Nawiązywanie połączenia i uruchamianie poleceń SQL w usłudze Azure Cosmos DB for PostgreSQL za pomocą języka C#

DOTYCZY: Usługa Azure Cosmos DB for PostgreSQL (obsługiwana przez rozszerzenie bazy danych Citus do bazy danych PostgreSQL)

W tym przewodniku Szybki start pokazano, jak używać kodu w języku C# do nawiązywania połączenia z klastrem i używać instrukcji SQL do tworzenia tabeli. Następnie wstawisz, wykonasz zapytanie, zaktualizujesz i usuniesz dane w bazie danych. W krokach opisanych w tym artykule założono, że znasz programowanie w języku C# i dopiero zaczynasz pracę z usługą Azure Cosmos DB for PostgreSQL.

Instalowanie biblioteki PostgreSQL

Przykłady kodu w tym artykule wymagają biblioteki Npgsql . Musisz zainstalować narzędzie Npgsql za pomocą menedżera pakietów językowych (np. NuGet w programie Visual Studio).

Łączenie, tworzenie tabeli i wstawianie danych

Połączymy się z klastrem i załadujemy dane przy użyciu instrukcji CREATE TABLE i INSERT INTO SQL. Kod używa następujących NpgsqlCommand metod klas:

  • Open() w celu nawiązania połączenia z usługą Azure Cosmos DB for PostgreSQL
  • CreateCommand() w celu ustawienia właściwości CommandText
  • ExecuteNonQuery() do uruchamiania poleceń bazy danych

Porada

Poniższy przykładowy kod używa puli połączeń do tworzenia połączeń z bazą danych PostgreSQL i zarządzania nimi. Buforowanie połączeń po stronie aplikacji jest zdecydowanie zalecane, ponieważ:

  • Gwarantuje to, że aplikacja nie generuje zbyt wielu połączeń z bazą danych, dlatego unika przekraczania limitów połączeń.
  • Może to pomóc drastycznie poprawić wydajność — zarówno opóźnienie, jak i przepływność. Proces serwera PostgreSQL musi obsługiwać każde nowe połączenie, a ponowne użycie połączenia pozwala uniknąć tego obciążenia.

W poniższym kodzie zastąp <klaster> nazwą klastra i <hasłem administratora> .

using System;
using Npgsql;
namespace Driver
{
    public class AzurePostgresCreate
    {
       
        static void Main(string[] args)
        {
            // Replace <cluster> with your cluster name and <password> with your password:
            var connStr = new NpgsqlConnectionStringBuilder("Server = c-<cluster>.<uniqueID>.postgres.cosmos.azure.com; Database = citus; Port = 5432; User Id = citus; Password = <password>; Ssl Mode = Require; Pooling = true; Minimum Pool Size=0; Maximum Pool Size =50 ");

            connStr.TrustServerCertificate = true;

            using (var conn = new NpgsqlConnection(connStr.ToString()))
            {
                Console.Out.WriteLine("Opening connection");
                conn.Open();
                using (var command = new NpgsqlCommand("DROP TABLE IF EXISTS pharmacy;", conn))
                {
                    command.ExecuteNonQuery();
                    Console.Out.WriteLine("Finished dropping table (if existed)");
                }
                using (var command = new NpgsqlCommand("CREATE TABLE pharmacy (pharmacy_id integer ,pharmacy_name text,city text,state text,zip_code integer);", conn))
                {
                    command.ExecuteNonQuery();
                    Console.Out.WriteLine("Finished creating table");
                }
                using (var command = new NpgsqlCommand("CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);", conn))
                {
                    command.ExecuteNonQuery();
                    Console.Out.WriteLine("Finished creating index");
                }
                using (var command = new NpgsqlCommand("INSERT INTO  pharmacy  (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (@n1, @q1, @a, @b, @c)", conn))
                {
                    command.Parameters.AddWithValue("n1", 0);
                    command.Parameters.AddWithValue("q1", "Target");
                    command.Parameters.AddWithValue("a", "Sunnyvale");
                    command.Parameters.AddWithValue("b", "California");
                    command.Parameters.AddWithValue("c", 94001);
                    int nRows = command.ExecuteNonQuery();
                    Console.Out.WriteLine(String.Format("Number of rows inserted={0}", nRows));
                }

            }
            Console.WriteLine("Press RETURN to exit");
            Console.ReadLine();
        }
    }
}

Dystrybuowanie tabel

Usługa Azure Cosmos DB for PostgreSQL zapewnia super możliwości dystrybucji tabel między wieloma węzłami w celu skalowalności. Użyj następującego kodu, aby rozpowszechnić tabelę. Więcej informacji na temat create_distributed_table kolumny dystrybucji można uzyskać w kolumnie Dystrybucja (znanej również jako klucz fragmentu).

Uwaga

Dystrybucja tabel umożliwia ich zwiększanie w obrębie wszystkich węzłów roboczych dodanych do klastra.

W poniższym kodzie zastąp <klaster> nazwą klastra i <hasłem administratora> .

using System;
using Npgsql;
namespace Driver
{
    public class AzurePostgresCreate
    {
      
        static void Main(string[] args)
        {
            // Replace <cluster> with your cluster name and <password> with your password:
            var connStr = new NpgsqlConnectionStringBuilder("Server = c-<cluster>.<uniqueID>.postgres.cosmos.azure.com; Database = citus; Port = 5432; User Id = citus; Password = {your password}; Ssl Mode = Require; Pooling = true; Minimum Pool Size=0; Maximum Pool Size =50");

            connStr.TrustServerCertificate = true;

            using (var conn = new NpgsqlConnection(connStr.ToString()))
            {
                Console.Out.WriteLine("Opening connection");
                conn.Open();
                using (var command = new NpgsqlCommand("select create_distributed_table('pharmacy','pharmacy_id');", conn))
                {
                    command.ExecuteNonQuery();
                    Console.Out.WriteLine("Finished distributing the table");
                }

            }
            Console.WriteLine("Press RETURN to exit");
            Console.ReadLine();
        }
    }
}

Odczyt danych

Użyj poniższego kodu, aby nawiązać połączenie i odczytać dane za pomocą instrukcji SELECT języka SQL. Kod używa następujących NpgsqlCommand metod klas:

W poniższym kodzie zastąp <klaster> nazwą klastra i <hasłem administratora> .

using System;
using Npgsql;
namespace Driver
{
    public class read
    {

        static void Main(string[] args)
        {
            // Replace <cluster> with your cluster name and <password> with your password:
            var connStr = new NpgsqlConnectionStringBuilder("Server = c-<cluster>.<uniqueID>.postgres.cosmos.azure.com; Database = citus; Port = 5432; User Id = citus; Password = <password>; Ssl Mode = Require; Pooling = true; Minimum Pool Size=0; Maximum Pool Size =50 ");

            connStr.TrustServerCertificate = true;

            using (var conn = new NpgsqlConnection(connStr.ToString()))
            {
                Console.Out.WriteLine("Opening connection");
                conn.Open();
                using (var command = new NpgsqlCommand("SELECT * FROM pharmacy", conn))
                {
                    var reader = command.ExecuteReader();
                    while (reader.Read())
                    {
                        Console.WriteLine(
                            string.Format(
                                "Reading from table=({0}, {1}, {2}, {3}, {4})",
                                reader.GetInt32(0).ToString(),
                                reader.GetString(1),
                                 reader.GetString(2),
                                 reader.GetString(3),
                                reader.GetInt32(4).ToString()
                                )
                            );
                    }
                    reader.Close();
                }
            }
            Console.WriteLine("Press RETURN to exit");
            Console.ReadLine();
        }
    }
}

Aktualizowanie danych

Użyj następującego kodu, aby nawiązać połączenie i zaktualizować dane przy użyciu instrukcji UPDATE SQL. W kodzie zastąp <klaster> nazwą klastra i <hasłem administratora> .

using System;
using Npgsql;
namespace Driver
{
    public class AzurePostgresUpdate
    {
        static void Main(string[] args)
        {
            // Replace <cluster> with your cluster name and <password> with your password:
            var connStr = new NpgsqlConnectionStringBuilder("Server = c-<cluster>.<uniqueID>.postgres.cosmos.azure.com; Database = citus; Port = 5432; User Id = citus; Password = <password>; Ssl Mode = Require; Pooling = true; Minimum Pool Size=0; Maximum Pool Size =50 ");

            connStr.TrustServerCertificate = true;

            using (var conn = new NpgsqlConnection(connStr.ToString()))
            {
                Console.Out.WriteLine("Opening connection");
                conn.Open();
                using (var command = new NpgsqlCommand("UPDATE pharmacy SET city = @q WHERE pharmacy_id = @n", conn))
                {
                    command.Parameters.AddWithValue("n", 0);
                    command.Parameters.AddWithValue("q", "guntur");
                    int nRows = command.ExecuteNonQuery();
                    Console.Out.WriteLine(String.Format("Number of rows updated={0}", nRows));
                }
            }
            Console.WriteLine("Press RETURN to exit");
            Console.ReadLine();
        }
    }
}

Usuwanie danych

Użyj następującego kodu, aby połączyć i usunąć dane przy użyciu instrukcji DELETE SQL. W kodzie zastąp <klaster> nazwą klastra i <hasłem administratora> .

using System;
using Npgsql;
namespace Driver
{
    public class AzurePostgresDelete
    {
       
        static void Main(string[] args)
        {
            // Replace <cluster> with your cluster name and <password> with your password:
            var connStr = new NpgsqlConnectionStringBuilder("Server = c-<cluster>.<uniqueID>.postgres.cosmos.azure.com; Database = citus; Port = 5432; User Id = citus; Password = {your password}; Ssl Mode = Require; Pooling = true; Minimum Pool Size=0; Maximum Pool Size =50 ");

            connStr.TrustServerCertificate = true;

            using (var conn = new NpgsqlConnection(connStr.ToString()))
            {

                Console.Out.WriteLine("Opening connection");
                conn.Open();
                using (var command = new NpgsqlCommand("DELETE FROM pharmacy WHERE pharmacy_id = @n", conn))
                {
                    command.Parameters.AddWithValue("n", 0);
                    int nRows = command.ExecuteNonQuery();
                    Console.Out.WriteLine(String.Format("Number of rows deleted={0}", nRows));
                }
            }
            Console.WriteLine("Press RETURN to exit");
            Console.ReadLine();
        }
    }
}

POLECENIE COPY do szybkiego pozyskiwania

Polecenie COPY może przynieść ogromną przepływność podczas pozyskiwania danych do usługi Azure Cosmos DB for PostgreSQL. Polecenie COPY może pozyskiwać dane w plikach lub z mikrosadów danych w pamięci na potrzeby pozyskiwania danych w czasie rzeczywistym.

POLECENIE COPY w celu załadowania danych z pliku

Poniższy przykładowy kod kopiuje dane z pliku CSV do tabeli bazy danych.

Przykładowy kod wymaga, aby plik pharmacies.csv znajdować się w folderze Dokumenty . W kodzie zastąp <klaster> nazwą klastra i <hasłem administratora> .

using Npgsql;
public class csvtotable
{

    static void Main(string[] args)
    {
        String sDestinationSchemaAndTableName = "pharmacy";
        String sFromFilePath = "C:\\Users\\Documents\\pharmacies.csv";
       
        // Replace <cluster> with your cluster name and <password> with your password:
        var connStr = new NpgsqlConnectionStringBuilder("Server = c-<cluster>.<uniqueID>.postgres.cosmos.azure.com; Database = citus; Port = 5432; User Id = citus; Password = <password>; Ssl Mode = Require; Pooling = true; Minimum Pool Size=0; Maximum Pool Size =50 ");
            
        connStr.TrustServerCertificate = true;

        NpgsqlConnection conn = new NpgsqlConnection(connStr.ToString());
        NpgsqlCommand cmd = new NpgsqlCommand();

        conn.Open();

        if (File.Exists(sFromFilePath))
        {
            using (var writer = conn.BeginTextImport("COPY " + sDestinationSchemaAndTableName + " FROM STDIN WITH(FORMAT CSV, HEADER true,NULL ''); "))
            {
                foreach (String sLine in File.ReadAllLines(sFromFilePath))
                {
                    writer.WriteLine(sLine);
                }
            }
            Console.WriteLine("csv file data copied sucessfully");
        }
    }
}

POLECENIE COPY w celu załadowania danych w pamięci

Poniższy przykładowy kod kopiuje dane w pamięci do tabeli. W kodzie zastąp <klaster> nazwą klastra i <hasłem administratora> .

using Npgsql;
using NpgsqlTypes;
namespace Driver
{
    public class InMemory
    {

        static async Task Main(string[] args)
        {
         
             // Replace <cluster> with your cluster name and <password> with your password:
            var connStr = new NpgsqlConnectionStringBuilder("Server = c-<cluster>.<uniqueID>.postgres.cosmos.azure.com; Database = citus; Port = 5432; User Id = citus; Password = <password>; Ssl Mode = Require; Pooling = true; Minimum Pool Size=0; Maximum Pool Size =50 ");

            connStr.TrustServerCertificate = true;

            using (var conn = new NpgsqlConnection(connStr.ToString()))
            {
                conn.Open();
                var text = new dynamic[] { 0, "Target", "Sunnyvale", "California", 94001 };
                using (var writer = conn.BeginBinaryImport("COPY pharmacy  FROM STDIN (FORMAT BINARY)"))
                {
                    writer.StartRow();
                    foreach (var item in text)
                    {
                        writer.Write(item);
                    }
                    writer.Complete();
                }
                Console.WriteLine("in-memory data copied sucessfully");
            }
        }
    }
}

Ponów próbę aplikacji dla błędów żądań bazy danych

Czasami możliwe jest, że żądania bazy danych z aplikacji kończą się niepowodzeniem. Takie problemy mogą wystąpić w różnych scenariuszach, takich jak awaria sieci między aplikacją a bazą danych, nieprawidłowe hasło itp. Niektóre problemy mogą być przejściowe i rozwiązać się w ciągu kilku sekund do minut. Logikę ponawiania prób można skonfigurować w aplikacji, aby przezwyciężyć błędy przejściowe.

Konfigurowanie logiki ponawiania prób w aplikacji pomaga ulepszyć środowisko użytkownika końcowego. W przypadku scenariuszy awarii użytkownicy będą tylko czekać nieco dłużej, aż aplikacja będzie obsługiwać żądania, a nie napotkać błędów.

W poniższym przykładzie pokazano, jak zaimplementować logikę ponawiania prób w aplikacji. Przykładowy fragment kodu próbuje żądać bazy danych co 60 sekund (maksymalnie pięć razy), dopóki nie powiedzie się. Liczbę i częstotliwość ponownych prób można skonfigurować na podstawie potrzeb aplikacji.

W tym kodzie zastąp <klaster> nazwą klastra i <hasłem administratora> .

using System;
using System.Data;
using System.Runtime.InteropServices;
using System.Text;
using Npgsql;

namespace Driver
{
    public class Reconnect
    {
        
        // Replace <cluster> with your cluster name and <password> with your password:
        static string connStr = new NpgsqlConnectionStringBuilder("Server = c-<cluster>.<uniqueID>.postgres.cosmos.azure.com; Database = citus; Port = 5432; User Id = citus; Password = <password>; Ssl Mode = Require; Pooling = true; Minimum Pool Size=0; Maximum Pool Size =50;TrustServerCertificate = true").ToString();
        static string executeRetry(string sql, int retryCount)
        {
            for (int i = 0; i < retryCount; i++)
            {
                try
                {
                    using (var conn = new NpgsqlConnection(connStr))
                    {
                        conn.Open();
                        DataTable dt = new DataTable();
                        using (var _cmd = new NpgsqlCommand(sql, conn))
                        {
                            NpgsqlDataAdapter _dap = new NpgsqlDataAdapter(_cmd);
                            _dap.Fill(dt);
                            conn.Close();
                            if (dt != null)
                            {
                                if (dt.Rows.Count > 0)
                                {
                                    int J = dt.Rows.Count;
                                    StringBuilder sb = new StringBuilder();

                                    for (int k = 0; k < dt.Rows.Count; k++)
                                    {
                                        for (int j = 0; j < dt.Columns.Count; j++)
                                        {
                                            sb.Append(dt.Rows[k][j] + ",");
                                        }
                                        sb.Remove(sb.Length - 1, 1);
                                        sb.Append("\n");
                                    }
                                    return sb.ToString();
                                }
                            }
                        }
                    }
                    return null;
                }
                catch (Exception e)
                {
                    Thread.Sleep(60000);
                    Console.WriteLine(e.Message);
                }
            }
            return null;
        }
        static void Main(string[] args)
        {
            string result = executeRetry("select 1",5);
            Console.WriteLine(result);
        }
    }
}

Następne kroki