다음을 통해 공유


C#을 사용하여 Azure Cosmos DB for PostgreSQL에서 SQL 명령 연결 및 실행

적용 대상: Azure Cosmos DB for PostgreSQL(PostgreSQL에 대한 Citus 데이터베이스 확장 기반)

이 빠른 시작에서는 C# 코드를 사용하여 클러스터에 연결하고 SQL 문을 사용하여 테이블을 만드는 방법을 보여 줍니다. 그런 다음, 데이터베이스에서 데이터를 삽입, 쿼리, 업데이트 및 삭제합니다. 이 문서의 단계에서는 사용자가 C# 개발에 익숙하고 Azure Cosmos DB for PostgreSQL을 처음 사용한다고 가정합니다.

PostgreSQL 라이브러리 설치

이 문서의 코드 예에는 Npgsql 라이브러리가 필요합니다. 언어 패키지 관리자(예: Visual Studio의 NuGet)와 함께 Npgsql을 설치해야 합니다.

연결, 테이블 만들기 및 데이터 삽입

CREATE TABLE 및 INSERT INTO SQL 문을 사용하여 클러스터에 연결하고 데이터를 로드합니다. 코드는 다음 NpgsqlCommand 클래스 메서드를 사용합니다.

  • Open(): Azure Cosmos DB for PostgreSQL에 대한 연결을 설정합니다.
  • CreateCommand(): CommandText 속성을 설정합니다.
  • ExecuteNonQuery(): 데이터베이스 명령을 실행합니다.

아래 샘플 코드는 연결 풀을 사용하여 PostgreSQL에 대한 연결을 만들고 관리합니다. 애플리케이션 측 연결 풀링은 다음과 같은 이유로 강력히 권장됩니다.

  • 애플리케이션이 데이터베이스에 너무 많은 연결을 생성하지 않도록 하여 연결 제한을 초과하지 않도록 합니다.
  • 대기 시간과 처리량 모두에서 성능을 크게 개선시키는 데 도움이 될 수 있습니다. PostgreSQL 서버 프로세스는 각각의 새로운 연결을 처리하기 위해 분기해야 하며 연결을 재사용하면 이러한 오버헤드를 피할 수 있습니다.

다음 코드에서 <cluster>를 클러스터 이름으로 바꾸고 <password>를 관리자 암호로 바꿉니다.

using System;
using Npgsql;
namespace Driver
{
    public class AzurePostgresCreate
    {
       
        static void Main(string[] args)
        {
            // Replace <cluster> with your cluster name and <password> with your password:
            var connStr = new NpgsqlConnectionStringBuilder("Server = c-<cluster>.<uniqueID>.postgres.cosmos.azure.com; Database = citus; Port = 5432; User Id = citus; Password = <password>; Ssl Mode = Require; Pooling = true; Minimum Pool Size=0; Maximum Pool Size =50 ");

            connStr.TrustServerCertificate = true;

            using (var conn = new NpgsqlConnection(connStr.ToString()))
            {
                Console.Out.WriteLine("Opening connection");
                conn.Open();
                using (var command = new NpgsqlCommand("DROP TABLE IF EXISTS pharmacy;", conn))
                {
                    command.ExecuteNonQuery();
                    Console.Out.WriteLine("Finished dropping table (if existed)");
                }
                using (var command = new NpgsqlCommand("CREATE TABLE pharmacy (pharmacy_id integer ,pharmacy_name text,city text,state text,zip_code integer);", conn))
                {
                    command.ExecuteNonQuery();
                    Console.Out.WriteLine("Finished creating table");
                }
                using (var command = new NpgsqlCommand("CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);", conn))
                {
                    command.ExecuteNonQuery();
                    Console.Out.WriteLine("Finished creating index");
                }
                using (var command = new NpgsqlCommand("INSERT INTO  pharmacy  (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (@n1, @q1, @a, @b, @c)", conn))
                {
                    command.Parameters.AddWithValue("n1", 0);
                    command.Parameters.AddWithValue("q1", "Target");
                    command.Parameters.AddWithValue("a", "Sunnyvale");
                    command.Parameters.AddWithValue("b", "California");
                    command.Parameters.AddWithValue("c", 94001);
                    int nRows = command.ExecuteNonQuery();
                    Console.Out.WriteLine(String.Format("Number of rows inserted={0}", nRows));
                }

            }
            Console.WriteLine("Press RETURN to exit");
            Console.ReadLine();
        }
    }
}

테이블 배포

Azure Cosmos DB for PostgreSQL은 확장성을 위해 여러 노드에 걸쳐 테이블을 분산하는 강력한 기능을 제공합니다. 다음 코드를 사용하여 테이블을 배포합니다. 배포 열(분할 키라고도 함)에서 create_distributed_table 및 분포 열에 대해 자세히 알아볼 수 있습니다.

참고 항목

테이블을 분산하면 클러스터에 추가된 모든 작업자 노드에서 확장할 수 있습니다.

다음 코드에서 <cluster>를 클러스터 이름으로 바꾸고 <password>를 관리자 암호로 바꿉니다.

using System;
using Npgsql;
namespace Driver
{
    public class AzurePostgresCreate
    {
      
        static void Main(string[] args)
        {
            // Replace <cluster> with your cluster name and <password> with your password:
            var connStr = new NpgsqlConnectionStringBuilder("Server = c-<cluster>.<uniqueID>.postgres.cosmos.azure.com; Database = citus; Port = 5432; User Id = citus; Password = {your password}; Ssl Mode = Require; Pooling = true; Minimum Pool Size=0; Maximum Pool Size =50");

            connStr.TrustServerCertificate = true;

            using (var conn = new NpgsqlConnection(connStr.ToString()))
            {
                Console.Out.WriteLine("Opening connection");
                conn.Open();
                using (var command = new NpgsqlCommand("select create_distributed_table('pharmacy','pharmacy_id');", conn))
                {
                    command.ExecuteNonQuery();
                    Console.Out.WriteLine("Finished distributing the table");
                }

            }
            Console.WriteLine("Press RETURN to exit");
            Console.ReadLine();
        }
    }
}

데이터 읽기

SELECT SQL 문을 사용하여 데이터를 연결하고 읽으려면 다음 코드를 사용하세요. 코드는 다음 NpgsqlCommand 클래스 메서드를 사용합니다.

다음 코드에서 <cluster>를 클러스터 이름으로 바꾸고 <password>를 관리자 암호로 바꿉니다.

using System;
using Npgsql;
namespace Driver
{
    public class read
    {

        static void Main(string[] args)
        {
            // Replace <cluster> with your cluster name and <password> with your password:
            var connStr = new NpgsqlConnectionStringBuilder("Server = c-<cluster>.<uniqueID>.postgres.cosmos.azure.com; Database = citus; Port = 5432; User Id = citus; Password = <password>; Ssl Mode = Require; Pooling = true; Minimum Pool Size=0; Maximum Pool Size =50 ");

            connStr.TrustServerCertificate = true;

            using (var conn = new NpgsqlConnection(connStr.ToString()))
            {
                Console.Out.WriteLine("Opening connection");
                conn.Open();
                using (var command = new NpgsqlCommand("SELECT * FROM pharmacy", conn))
                {
                    var reader = command.ExecuteReader();
                    while (reader.Read())
                    {
                        Console.WriteLine(
                            string.Format(
                                "Reading from table=({0}, {1}, {2}, {3}, {4})",
                                reader.GetInt32(0).ToString(),
                                reader.GetString(1),
                                 reader.GetString(2),
                                 reader.GetString(3),
                                reader.GetInt32(4).ToString()
                                )
                            );
                    }
                    reader.Close();
                }
            }
            Console.WriteLine("Press RETURN to exit");
            Console.ReadLine();
        }
    }
}

데이터 업데이트

UPDATE SQL 문을 사용하여 데이터를 연결하고 업데이트하려면 다음 코드를 사용하세요. 코드에서 <cluster>를 클러스터 이름으로 바꾸고 <password>를 관리자 암호로 바꿉니다.

using System;
using Npgsql;
namespace Driver
{
    public class AzurePostgresUpdate
    {
        static void Main(string[] args)
        {
            // Replace <cluster> with your cluster name and <password> with your password:
            var connStr = new NpgsqlConnectionStringBuilder("Server = c-<cluster>.<uniqueID>.postgres.cosmos.azure.com; Database = citus; Port = 5432; User Id = citus; Password = <password>; Ssl Mode = Require; Pooling = true; Minimum Pool Size=0; Maximum Pool Size =50 ");

            connStr.TrustServerCertificate = true;

            using (var conn = new NpgsqlConnection(connStr.ToString()))
            {
                Console.Out.WriteLine("Opening connection");
                conn.Open();
                using (var command = new NpgsqlCommand("UPDATE pharmacy SET city = @q WHERE pharmacy_id = @n", conn))
                {
                    command.Parameters.AddWithValue("n", 0);
                    command.Parameters.AddWithValue("q", "guntur");
                    int nRows = command.ExecuteNonQuery();
                    Console.Out.WriteLine(String.Format("Number of rows updated={0}", nRows));
                }
            }
            Console.WriteLine("Press RETURN to exit");
            Console.ReadLine();
        }
    }
}

데이터 삭제

DELETE SQL 문을 사용하여 데이터를 연결하고 삭제하려면 다음 코드를 사용하세요. 코드에서 <cluster>를 클러스터 이름으로 바꾸고 <password>를 관리자 암호로 바꿉니다.

using System;
using Npgsql;
namespace Driver
{
    public class AzurePostgresDelete
    {
       
        static void Main(string[] args)
        {
            // Replace <cluster> with your cluster name and <password> with your password:
            var connStr = new NpgsqlConnectionStringBuilder("Server = c-<cluster>.<uniqueID>.postgres.cosmos.azure.com; Database = citus; Port = 5432; User Id = citus; Password = {your password}; Ssl Mode = Require; Pooling = true; Minimum Pool Size=0; Maximum Pool Size =50 ");

            connStr.TrustServerCertificate = true;

            using (var conn = new NpgsqlConnection(connStr.ToString()))
            {

                Console.Out.WriteLine("Opening connection");
                conn.Open();
                using (var command = new NpgsqlCommand("DELETE FROM pharmacy WHERE pharmacy_id = @n", conn))
                {
                    command.Parameters.AddWithValue("n", 0);
                    int nRows = command.ExecuteNonQuery();
                    Console.Out.WriteLine(String.Format("Number of rows deleted={0}", nRows));
                }
            }
            Console.WriteLine("Press RETURN to exit");
            Console.ReadLine();
        }
    }
}

빠른 수집을 위한 COPY 명령

COPY 명령은 데이터를 Azure Cosmos DB for PostgreSQL로 수집하는 동안 엄청난 처리량을 낼 수 있습니다. COPY 명령은 실시간 수집을 위해 파일의 데이터 또는 메모리에 있는 데이터의 마이크로 일괄 처리에서 데이터를 수집할 수 있습니다.

파일에서 데이터를 로드하는 COPY 명령

다음 코드 예는 CSV 파일에서 데이터베이스 테이블로 데이터를 복사합니다.

코드 샘플을 사용하려면 pharmacy.csv 파일이 Documents 폴더에 있어야 합니다. 코드에서 <cluster>를 클러스터 이름으로 바꾸고 <password>를 관리자 암호로 바꿉니다.

using Npgsql;
public class csvtotable
{

    static void Main(string[] args)
    {
        String sDestinationSchemaAndTableName = "pharmacy";
        String sFromFilePath = "C:\\Users\\Documents\\pharmacies.csv";
       
        // Replace <cluster> with your cluster name and <password> with your password:
        var connStr = new NpgsqlConnectionStringBuilder("Server = c-<cluster>.<uniqueID>.postgres.cosmos.azure.com; Database = citus; Port = 5432; User Id = citus; Password = <password>; Ssl Mode = Require; Pooling = true; Minimum Pool Size=0; Maximum Pool Size =50 ");
            
        connStr.TrustServerCertificate = true;

        NpgsqlConnection conn = new NpgsqlConnection(connStr.ToString());
        NpgsqlCommand cmd = new NpgsqlCommand();

        conn.Open();

        if (File.Exists(sFromFilePath))
        {
            using (var writer = conn.BeginTextImport("COPY " + sDestinationSchemaAndTableName + " FROM STDIN WITH(FORMAT CSV, HEADER true,NULL ''); "))
            {
                foreach (String sLine in File.ReadAllLines(sFromFilePath))
                {
                    writer.WriteLine(sLine);
                }
            }
            Console.WriteLine("csv file data copied sucessfully");
        }
    }
}

메모리 내 데이터를 로드하는 COPY 명령

다음 코드 예는 메모리 내 데이터를 테이블에 복사합니다. 코드에서 <cluster>를 클러스터 이름으로 바꾸고 <password>를 관리자 암호로 바꿉니다.

using Npgsql;
using NpgsqlTypes;
namespace Driver
{
    public class InMemory
    {

        static async Task Main(string[] args)
        {
         
             // Replace <cluster> with your cluster name and <password> with your password:
            var connStr = new NpgsqlConnectionStringBuilder("Server = c-<cluster>.<uniqueID>.postgres.cosmos.azure.com; Database = citus; Port = 5432; User Id = citus; Password = <password>; Ssl Mode = Require; Pooling = true; Minimum Pool Size=0; Maximum Pool Size =50 ");

            connStr.TrustServerCertificate = true;

            using (var conn = new NpgsqlConnection(connStr.ToString()))
            {
                conn.Open();
                var text = new dynamic[] { 0, "Target", "Sunnyvale", "California", 94001 };
                using (var writer = conn.BeginBinaryImport("COPY pharmacy  FROM STDIN (FORMAT BINARY)"))
                {
                    writer.StartRow();
                    foreach (var item in text)
                    {
                        writer.Write(item);
                    }
                    writer.Complete();
                }
                Console.WriteLine("in-memory data copied sucessfully");
            }
        }
    }
}

데이터베이스 요청 실패에 대한 앱 다시 시도

애플리케이션의 데이터베이스 요청이 실패하는 경우가 있습니다. 이러한 문제는 앱과 데이터베이스 간의 네트워크 오류, 잘못된 암호 등과 같은 다양한 시나리오에서 발생할 수 있습니다. 일부 문제는 일시적일 수 있으며 몇 초에서 몇 분 안에 자체적으로 해결됩니다. 일시적인 오류를 극복하도록 앱에서 다시 시도 논리를 구성할 수 있습니다.

앱에서 다시 시도 논리를 구성하면 최종 사용자 환경을 향상시키는 데 도움이 됩니다. 오류 시나리오에서 사용자는 오류가 발생하지 않고 애플리케이션에서 요청을 처리할 때까지 조금 더 기다리게 됩니다.

아래 예제에서는 앱에서 다시 시도 논리를 구현하는 방법을 보여 줍니다. 이 샘플 코드 조각은 성공할 때까지 60초마다(최대 5회) 데이터베이스 요청을 시도합니다. 다시 시도 횟수 및 빈도는 애플리케이션의 요구 사항에 따라 구성할 수 있습니다.

이 코드에서 <cluster>를 클러스터 이름으로, <password>를 관리자 암호로 바꿉니다.

using System;
using System.Data;
using System.Runtime.InteropServices;
using System.Text;
using Npgsql;

namespace Driver
{
    public class Reconnect
    {
        
        // Replace <cluster> with your cluster name and <password> with your password:
        static string connStr = new NpgsqlConnectionStringBuilder("Server = c-<cluster>.<uniqueID>.postgres.cosmos.azure.com; Database = citus; Port = 5432; User Id = citus; Password = <password>; Ssl Mode = Require; Pooling = true; Minimum Pool Size=0; Maximum Pool Size =50;TrustServerCertificate = true").ToString();
        static string executeRetry(string sql, int retryCount)
        {
            for (int i = 0; i < retryCount; i++)
            {
                try
                {
                    using (var conn = new NpgsqlConnection(connStr))
                    {
                        conn.Open();
                        DataTable dt = new DataTable();
                        using (var _cmd = new NpgsqlCommand(sql, conn))
                        {
                            NpgsqlDataAdapter _dap = new NpgsqlDataAdapter(_cmd);
                            _dap.Fill(dt);
                            conn.Close();
                            if (dt != null)
                            {
                                if (dt.Rows.Count > 0)
                                {
                                    int J = dt.Rows.Count;
                                    StringBuilder sb = new StringBuilder();

                                    for (int k = 0; k < dt.Rows.Count; k++)
                                    {
                                        for (int j = 0; j < dt.Columns.Count; j++)
                                        {
                                            sb.Append(dt.Rows[k][j] + ",");
                                        }
                                        sb.Remove(sb.Length - 1, 1);
                                        sb.Append("\n");
                                    }
                                    return sb.ToString();
                                }
                            }
                        }
                    }
                    return null;
                }
                catch (Exception e)
                {
                    Thread.Sleep(60000);
                    Console.WriteLine(e.Message);
                }
            }
            return null;
        }
        static void Main(string[] args)
        {
            string result = executeRetry("select 1",5);
            Console.WriteLine(result);
        }
    }
}

다음 단계