Использование C# для подключения и выполнения команд SQL в Azure Cosmos DB для PostgreSQL

ПРИМЕНИМО К: Azure Cosmos DB для PostgreSQL (на базе расширения базы данных Citus для PostgreSQL)

В этом кратком руководстве показано, как использовать код C# для подключения к кластеру и использовать инструкции SQL для создания таблицы. Затем вы будете вставлять, запрашивать, обновлять и удалять данные в базе данных. В этой статье предполагается, что вы знакомы с разработкой на C# и не знакомы с Azure Cosmos DB для PostgreSQL.

Установка библиотеки PostgreSQL

Для примеров кода в этой статье требуется библиотека Npgsql . Необходимо установить Npgsql с помощью диспетчера языковых пакетов (например, NuGet в Visual Studio).

Подключение, создание таблицы и вставка данных

Мы подключимся к кластеру и загрузим данные с помощью инструкций SQL CREATE TABLE и INSERT INTO. В коде используются следующие методы класса NpgsqlCommand:

  • Open() для установки подключения к Azure Cosmos DB для PostgreSQL
  • CreateCommand(), чтобы задать свойство CommandText;
  • ExecuteNonQuery() для выполнения команд базы данных

Совет

В приведенном ниже примере кода используется пул подключений для создания подключений к PostgreSQL и управления ими. Настоятельно рекомендуется использовать пул подключений на стороне приложения, так как:

  • Это гарантирует, что приложение не будет создавать слишком много подключений к базе данных, поэтому можно будет избежать превышения ограничений на подключения.
  • Это может значительно повысить производительность — как с точки зрения задержки, так и с точки зрения пропускной способности. Процесс сервера PostgreSQL должен разделиться, чтобы обработать каждое новое подключение, а повторное использование подключения позволяет избежать этих издержек.

В следующем коде замените <кластер> именем кластера, а <пароль> — паролем администратора.

using System;
using Npgsql;
namespace Driver
{
    public class AzurePostgresCreate
    {
       
        static void Main(string[] args)
        {
            // Replace <cluster> with your cluster name and <password> with your password:
            var connStr = new NpgsqlConnectionStringBuilder("Server = c-<cluster>.<uniqueID>.postgres.cosmos.azure.com; Database = citus; Port = 5432; User Id = citus; Password = <password>; Ssl Mode = Require; Pooling = true; Minimum Pool Size=0; Maximum Pool Size =50 ");

            connStr.TrustServerCertificate = true;

            using (var conn = new NpgsqlConnection(connStr.ToString()))
            {
                Console.Out.WriteLine("Opening connection");
                conn.Open();
                using (var command = new NpgsqlCommand("DROP TABLE IF EXISTS pharmacy;", conn))
                {
                    command.ExecuteNonQuery();
                    Console.Out.WriteLine("Finished dropping table (if existed)");
                }
                using (var command = new NpgsqlCommand("CREATE TABLE pharmacy (pharmacy_id integer ,pharmacy_name text,city text,state text,zip_code integer);", conn))
                {
                    command.ExecuteNonQuery();
                    Console.Out.WriteLine("Finished creating table");
                }
                using (var command = new NpgsqlCommand("CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);", conn))
                {
                    command.ExecuteNonQuery();
                    Console.Out.WriteLine("Finished creating index");
                }
                using (var command = new NpgsqlCommand("INSERT INTO  pharmacy  (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (@n1, @q1, @a, @b, @c)", conn))
                {
                    command.Parameters.AddWithValue("n1", 0);
                    command.Parameters.AddWithValue("q1", "Target");
                    command.Parameters.AddWithValue("a", "Sunnyvale");
                    command.Parameters.AddWithValue("b", "California");
                    command.Parameters.AddWithValue("c", 94001);
                    int nRows = command.ExecuteNonQuery();
                    Console.Out.WriteLine(String.Format("Number of rows inserted={0}", nRows));
                }

            }
            Console.WriteLine("Press RETURN to exit");
            Console.ReadLine();
        }
    }
}

Распределение таблиц

Azure Cosmos DB для PostgreSQL предоставляет супер возможности распределения таблиц между несколькими узлами для обеспечения масштабируемости. Используйте следующий код для распространения таблицы. Дополнительные сведения о create_distributed_table столбце распределения и см. в разделе Столбец распределения (также известный как ключ сегмента).

Примечание

Распределение таблиц позволяет увеличивать их между любыми рабочими узлами, добавленными в кластер.

В следующем коде замените <кластер> именем кластера, а <пароль> — паролем администратора.

using System;
using Npgsql;
namespace Driver
{
    public class AzurePostgresCreate
    {
      
        static void Main(string[] args)
        {
            // Replace <cluster> with your cluster name and <password> with your password:
            var connStr = new NpgsqlConnectionStringBuilder("Server = c-<cluster>.<uniqueID>.postgres.cosmos.azure.com; Database = citus; Port = 5432; User Id = citus; Password = {your password}; Ssl Mode = Require; Pooling = true; Minimum Pool Size=0; Maximum Pool Size =50");

            connStr.TrustServerCertificate = true;

            using (var conn = new NpgsqlConnection(connStr.ToString()))
            {
                Console.Out.WriteLine("Opening connection");
                conn.Open();
                using (var command = new NpgsqlCommand("select create_distributed_table('pharmacy','pharmacy_id');", conn))
                {
                    command.ExecuteNonQuery();
                    Console.Out.WriteLine("Finished distributing the table");
                }

            }
            Console.WriteLine("Press RETURN to exit");
            Console.ReadLine();
        }
    }
}

Чтение данных

Используйте указанный ниже код с инструкцией SQL SELECT для подключения и чтения данных. В коде используются следующие методы класса NpgsqlCommand:

  • Open(), чтобы установить подключение к Azure Cosmos DB для PostgreSQL.
  • CreateCommand() и ExecuteReader(), чтобы выполнять команды базы данных.
  • Read(), чтобы перейти к записи в результирующем наборе.
  • GetInt32() и GetString(), чтобы анализировать значения в записи.

В следующем коде замените <кластер> именем кластера, а <пароль> — паролем администратора.

using System;
using Npgsql;
namespace Driver
{
    public class read
    {

        static void Main(string[] args)
        {
            // Replace <cluster> with your cluster name and <password> with your password:
            var connStr = new NpgsqlConnectionStringBuilder("Server = c-<cluster>.<uniqueID>.postgres.cosmos.azure.com; Database = citus; Port = 5432; User Id = citus; Password = <password>; Ssl Mode = Require; Pooling = true; Minimum Pool Size=0; Maximum Pool Size =50 ");

            connStr.TrustServerCertificate = true;

            using (var conn = new NpgsqlConnection(connStr.ToString()))
            {
                Console.Out.WriteLine("Opening connection");
                conn.Open();
                using (var command = new NpgsqlCommand("SELECT * FROM pharmacy", conn))
                {
                    var reader = command.ExecuteReader();
                    while (reader.Read())
                    {
                        Console.WriteLine(
                            string.Format(
                                "Reading from table=({0}, {1}, {2}, {3}, {4})",
                                reader.GetInt32(0).ToString(),
                                reader.GetString(1),
                                 reader.GetString(2),
                                 reader.GetString(3),
                                reader.GetInt32(4).ToString()
                                )
                            );
                    }
                    reader.Close();
                }
            }
            Console.WriteLine("Press RETURN to exit");
            Console.ReadLine();
        }
    }
}

Обновление данных

Используйте указанный ниже код с инструкцией SQL UPDATE для подключения и обновления данных. В коде замените <кластер> именем кластера, а <пароль> — паролем администратора.

using System;
using Npgsql;
namespace Driver
{
    public class AzurePostgresUpdate
    {
        static void Main(string[] args)
        {
            // Replace <cluster> with your cluster name and <password> with your password:
            var connStr = new NpgsqlConnectionStringBuilder("Server = c-<cluster>.<uniqueID>.postgres.cosmos.azure.com; Database = citus; Port = 5432; User Id = citus; Password = <password>; Ssl Mode = Require; Pooling = true; Minimum Pool Size=0; Maximum Pool Size =50 ");

            connStr.TrustServerCertificate = true;

            using (var conn = new NpgsqlConnection(connStr.ToString()))
            {
                Console.Out.WriteLine("Opening connection");
                conn.Open();
                using (var command = new NpgsqlCommand("UPDATE pharmacy SET city = @q WHERE pharmacy_id = @n", conn))
                {
                    command.Parameters.AddWithValue("n", 0);
                    command.Parameters.AddWithValue("q", "guntur");
                    int nRows = command.ExecuteNonQuery();
                    Console.Out.WriteLine(String.Format("Number of rows updated={0}", nRows));
                }
            }
            Console.WriteLine("Press RETURN to exit");
            Console.ReadLine();
        }
    }
}

Удаление данных

Используйте указанный ниже код, чтобы подключиться и удалить данные с помощью инструкции SQL DELETE. В коде замените <кластер> именем кластера, а <пароль> — паролем администратора.

using System;
using Npgsql;
namespace Driver
{
    public class AzurePostgresDelete
    {
       
        static void Main(string[] args)
        {
            // Replace <cluster> with your cluster name and <password> with your password:
            var connStr = new NpgsqlConnectionStringBuilder("Server = c-<cluster>.<uniqueID>.postgres.cosmos.azure.com; Database = citus; Port = 5432; User Id = citus; Password = {your password}; Ssl Mode = Require; Pooling = true; Minimum Pool Size=0; Maximum Pool Size =50 ");

            connStr.TrustServerCertificate = true;

            using (var conn = new NpgsqlConnection(connStr.ToString()))
            {

                Console.Out.WriteLine("Opening connection");
                conn.Open();
                using (var command = new NpgsqlCommand("DELETE FROM pharmacy WHERE pharmacy_id = @n", conn))
                {
                    command.Parameters.AddWithValue("n", 0);
                    int nRows = command.ExecuteNonQuery();
                    Console.Out.WriteLine(String.Format("Number of rows deleted={0}", nRows));
                }
            }
            Console.WriteLine("Press RETURN to exit");
            Console.ReadLine();
        }
    }
}

Команда COPY для быстрого приема

Команда COPY может обеспечить огромную пропускную способность при приеме данных в Azure Cosmos DB для PostgreSQL. Команда COPY может принимать данные в файлах или из микропакетов данных в памяти для приема в реальном времени.

Команда COPY для загрузки данных из файла

В следующем примере кода данные копируются из CSV-файла в таблицу базы данных.

Для примера кода требуется, чтобы файлpharmacies.csv был в папке Документы . В коде замените <кластер> именем кластера, а <пароль> — паролем администратора.

using Npgsql;
public class csvtotable
{

    static void Main(string[] args)
    {
        String sDestinationSchemaAndTableName = "pharmacy";
        String sFromFilePath = "C:\\Users\\Documents\\pharmacies.csv";
       
        // Replace <cluster> with your cluster name and <password> with your password:
        var connStr = new NpgsqlConnectionStringBuilder("Server = c-<cluster>.<uniqueID>.postgres.cosmos.azure.com; Database = citus; Port = 5432; User Id = citus; Password = <password>; Ssl Mode = Require; Pooling = true; Minimum Pool Size=0; Maximum Pool Size =50 ");
            
        connStr.TrustServerCertificate = true;

        NpgsqlConnection conn = new NpgsqlConnection(connStr.ToString());
        NpgsqlCommand cmd = new NpgsqlCommand();

        conn.Open();

        if (File.Exists(sFromFilePath))
        {
            using (var writer = conn.BeginTextImport("COPY " + sDestinationSchemaAndTableName + " FROM STDIN WITH(FORMAT CSV, HEADER true,NULL ''); "))
            {
                foreach (String sLine in File.ReadAllLines(sFromFilePath))
                {
                    writer.WriteLine(sLine);
                }
            }
            Console.WriteLine("csv file data copied sucessfully");
        }
    }
}

Команда COPY для загрузки данных в памяти

В следующем примере кода данные в памяти копируются в таблицу. В коде замените <кластер> именем кластера, а <пароль> — паролем администратора.

using Npgsql;
using NpgsqlTypes;
namespace Driver
{
    public class InMemory
    {

        static async Task Main(string[] args)
        {
         
             // Replace <cluster> with your cluster name and <password> with your password:
            var connStr = new NpgsqlConnectionStringBuilder("Server = c-<cluster>.<uniqueID>.postgres.cosmos.azure.com; Database = citus; Port = 5432; User Id = citus; Password = <password>; Ssl Mode = Require; Pooling = true; Minimum Pool Size=0; Maximum Pool Size =50 ");

            connStr.TrustServerCertificate = true;

            using (var conn = new NpgsqlConnection(connStr.ToString()))
            {
                conn.Open();
                var text = new dynamic[] { 0, "Target", "Sunnyvale", "California", 94001 };
                using (var writer = conn.BeginBinaryImport("COPY pharmacy  FROM STDIN (FORMAT BINARY)"))
                {
                    writer.StartRow();
                    foreach (var item in text)
                    {
                        writer.Write(item);
                    }
                    writer.Complete();
                }
                Console.WriteLine("in-memory data copied sucessfully");
            }
        }
    }
}

Повторные попытки приложения при сбоях запросов к базе данных

Иногда запросы к базе данных из приложения могут завершаться ошибкой. Такие проблемы могут возникать при различных сценариях, включая сбой сети между приложением и базой данных, неправильный пароль и т. д. Некоторые проблемы могут быть временными и самостоятельно устраняться в течение нескольких секунд или минут. Для устранения временных ошибок можно настроить в приложении логику повторных попыток.

Настройка логики повторных попыток в приложении помогает улучшить взаимодействие с пользователем. В сценариях сбоя пользователи просто будут немного дольше ждать, пока приложение обслужит запросы, а не столкнутся с ошибками.

В приведенном ниже примере показано, как реализовать логику повторных попыток в вашем приложении. Фрагмент примера кода пытается выполнить запрос к базе данных каждые 60 секунд (до пяти раз) до достижения требуемого результата. Количество и частоту повторных попыток можно настроить в зависимости от требований вашего приложения.

В этом коде замените <кластер> именем кластера, а <пароль> — паролем администратора.

using System;
using System.Data;
using System.Runtime.InteropServices;
using System.Text;
using Npgsql;

namespace Driver
{
    public class Reconnect
    {
        
        // Replace <cluster> with your cluster name and <password> with your password:
        static string connStr = new NpgsqlConnectionStringBuilder("Server = c-<cluster>.<uniqueID>.postgres.cosmos.azure.com; Database = citus; Port = 5432; User Id = citus; Password = <password>; Ssl Mode = Require; Pooling = true; Minimum Pool Size=0; Maximum Pool Size =50;TrustServerCertificate = true").ToString();
        static string executeRetry(string sql, int retryCount)
        {
            for (int i = 0; i < retryCount; i++)
            {
                try
                {
                    using (var conn = new NpgsqlConnection(connStr))
                    {
                        conn.Open();
                        DataTable dt = new DataTable();
                        using (var _cmd = new NpgsqlCommand(sql, conn))
                        {
                            NpgsqlDataAdapter _dap = new NpgsqlDataAdapter(_cmd);
                            _dap.Fill(dt);
                            conn.Close();
                            if (dt != null)
                            {
                                if (dt.Rows.Count > 0)
                                {
                                    int J = dt.Rows.Count;
                                    StringBuilder sb = new StringBuilder();

                                    for (int k = 0; k < dt.Rows.Count; k++)
                                    {
                                        for (int j = 0; j < dt.Columns.Count; j++)
                                        {
                                            sb.Append(dt.Rows[k][j] + ",");
                                        }
                                        sb.Remove(sb.Length - 1, 1);
                                        sb.Append("\n");
                                    }
                                    return sb.ToString();
                                }
                            }
                        }
                    }
                    return null;
                }
                catch (Exception e)
                {
                    Thread.Sleep(60000);
                    Console.WriteLine(e.Message);
                }
            }
            return null;
        }
        static void Main(string[] args)
        {
            string result = executeRetry("select 1",5);
            Console.WriteLine(result);
        }
    }
}

Дальнейшие действия