Uso de C# para conectarse y ejecutar comandos SQL en Azure Cosmos DB for PostgreSQL

SE APLICA A: Azure Cosmos DB for PostgreSQL (con tecnología de la extensión de base de datos de Citus en PostgreSQL)

En este inicio rápido se muestra cómo usar el código de C# para conectarse a un clúster y usar instrucciones SQL para crear una tabla. A continuación, insertará, consultará, actualizará y eliminará datos de la base de datos. En los pasos de este artículo se da por hecho que está familiarizado con el desarrollo en C#, pero que nunca ha trabajado con Azure Cosmos DB for PostgreSQL.

Instalación de la biblioteca de PostgreSQL

Los ejemplos de código de este artículo requieren la biblioteca Npgsql. Deberá instalar Npgsql con el administrador de paquetes de idioma (como NuGet en Visual Studio).

Conexión, creación de una tabla e inserción de datos

Nos conectaremos a un clúster y cargaremos los datos mediante las instrucciones SQL como CREATE TABLE e INSERT INTO. Este código incluye los siguientes métodos de clase NpgsqlCommand:

  • Open() para establecer una conexión a Azure Cosmos DB for PostgreSQL.
  • CreateCommand() para establecer la propiedad CommandText.
  • ExecuteNonQuery() para ejecutar los comandos de base de datos.

Sugerencia

El siguiente código de ejemplo usa un grupo de conexiones para crear y administrar las conexiones a PostgreSQL. Se recomienda encarecidamente la agrupación de conexiones en el lado de la aplicación porque:

  • Garantiza que la aplicación no genere demasiadas conexiones a la base de datos, lo que evita que se superen los límites de conexiones.
  • Puede ayudar a mejorar drásticamente el rendimiento, tanto la latencia como el procesamiento. El proceso del servidor PostgreSQL debe bifurcarse para controlar cada nueva conexión y reutilizar una conexión evita esa sobrecarga.

En el código siguiente, reemplace <cluster> por el nombre del clúster y <password> por la contraseña de administrador.

using System;
using Npgsql;
namespace Driver
{
    public class AzurePostgresCreate
    {
       
        static void Main(string[] args)
        {
            // Replace <cluster> with your cluster name and <password> with your password:
            var connStr = new NpgsqlConnectionStringBuilder("Server = c-<cluster>.<uniqueID>.postgres.cosmos.azure.com; Database = citus; Port = 5432; User Id = citus; Password = <password>; Ssl Mode = Require; Pooling = true; Minimum Pool Size=0; Maximum Pool Size =50 ");

            connStr.TrustServerCertificate = true;

            using (var conn = new NpgsqlConnection(connStr.ToString()))
            {
                Console.Out.WriteLine("Opening connection");
                conn.Open();
                using (var command = new NpgsqlCommand("DROP TABLE IF EXISTS pharmacy;", conn))
                {
                    command.ExecuteNonQuery();
                    Console.Out.WriteLine("Finished dropping table (if existed)");
                }
                using (var command = new NpgsqlCommand("CREATE TABLE pharmacy (pharmacy_id integer ,pharmacy_name text,city text,state text,zip_code integer);", conn))
                {
                    command.ExecuteNonQuery();
                    Console.Out.WriteLine("Finished creating table");
                }
                using (var command = new NpgsqlCommand("CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);", conn))
                {
                    command.ExecuteNonQuery();
                    Console.Out.WriteLine("Finished creating index");
                }
                using (var command = new NpgsqlCommand("INSERT INTO  pharmacy  (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (@n1, @q1, @a, @b, @c)", conn))
                {
                    command.Parameters.AddWithValue("n1", 0);
                    command.Parameters.AddWithValue("q1", "Target");
                    command.Parameters.AddWithValue("a", "Sunnyvale");
                    command.Parameters.AddWithValue("b", "California");
                    command.Parameters.AddWithValue("c", 94001);
                    int nRows = command.ExecuteNonQuery();
                    Console.Out.WriteLine(String.Format("Number of rows inserted={0}", nRows));
                }

            }
            Console.WriteLine("Press RETURN to exit");
            Console.ReadLine();
        }
    }
}

Distribución de tablas

Azure Cosmos DB for PostgreSQL le proporciona la habilidad de distribuir tablas entre varios nodos para mejorar la escalabilidad. Use el código siguiente para distribuir una tabla. Puede obtener más información sobre create_distributed_table y la columna de distribución en Columna de distribución (también conocida como clave de partición).

Nota

La distribución de tablas les permite crecer en todos los nodos de trabajo que se han agregado al clúster.

En el código siguiente, reemplace <cluster> por el nombre del clúster y <password> por la contraseña de administrador.

using System;
using Npgsql;
namespace Driver
{
    public class AzurePostgresCreate
    {
      
        static void Main(string[] args)
        {
            // Replace <cluster> with your cluster name and <password> with your password:
            var connStr = new NpgsqlConnectionStringBuilder("Server = c-<cluster>.<uniqueID>.postgres.cosmos.azure.com; Database = citus; Port = 5432; User Id = citus; Password = {your password}; Ssl Mode = Require; Pooling = true; Minimum Pool Size=0; Maximum Pool Size =50");

            connStr.TrustServerCertificate = true;

            using (var conn = new NpgsqlConnection(connStr.ToString()))
            {
                Console.Out.WriteLine("Opening connection");
                conn.Open();
                using (var command = new NpgsqlCommand("select create_distributed_table('pharmacy','pharmacy_id');", conn))
                {
                    command.ExecuteNonQuery();
                    Console.Out.WriteLine("Finished distributing the table");
                }

            }
            Console.WriteLine("Press RETURN to exit");
            Console.ReadLine();
        }
    }
}

Lectura de datos

Use el código siguiente para conectarse y leer los datos mediante la instrucción SQL SELECT. Este código incluye los siguientes métodos de clase NpgsqlCommand:

En el código siguiente, reemplace <cluster> por el nombre del clúster y <password> por la contraseña de administrador.

using System;
using Npgsql;
namespace Driver
{
    public class read
    {

        static void Main(string[] args)
        {
            // Replace <cluster> with your cluster name and <password> with your password:
            var connStr = new NpgsqlConnectionStringBuilder("Server = c-<cluster>.<uniqueID>.postgres.cosmos.azure.com; Database = citus; Port = 5432; User Id = citus; Password = <password>; Ssl Mode = Require; Pooling = true; Minimum Pool Size=0; Maximum Pool Size =50 ");

            connStr.TrustServerCertificate = true;

            using (var conn = new NpgsqlConnection(connStr.ToString()))
            {
                Console.Out.WriteLine("Opening connection");
                conn.Open();
                using (var command = new NpgsqlCommand("SELECT * FROM pharmacy", conn))
                {
                    var reader = command.ExecuteReader();
                    while (reader.Read())
                    {
                        Console.WriteLine(
                            string.Format(
                                "Reading from table=({0}, {1}, {2}, {3}, {4})",
                                reader.GetInt32(0).ToString(),
                                reader.GetString(1),
                                 reader.GetString(2),
                                 reader.GetString(3),
                                reader.GetInt32(4).ToString()
                                )
                            );
                    }
                    reader.Close();
                }
            }
            Console.WriteLine("Press RETURN to exit");
            Console.ReadLine();
        }
    }
}

Actualización de datos

Use el código siguiente para conectarse y actualizar los datos mediante la instrucción SQL UPDATE. En el código, reemplace <cluster> por el nombre del clúster y <password> por la contraseña de administrador.

using System;
using Npgsql;
namespace Driver
{
    public class AzurePostgresUpdate
    {
        static void Main(string[] args)
        {
            // Replace <cluster> with your cluster name and <password> with your password:
            var connStr = new NpgsqlConnectionStringBuilder("Server = c-<cluster>.<uniqueID>.postgres.cosmos.azure.com; Database = citus; Port = 5432; User Id = citus; Password = <password>; Ssl Mode = Require; Pooling = true; Minimum Pool Size=0; Maximum Pool Size =50 ");

            connStr.TrustServerCertificate = true;

            using (var conn = new NpgsqlConnection(connStr.ToString()))
            {
                Console.Out.WriteLine("Opening connection");
                conn.Open();
                using (var command = new NpgsqlCommand("UPDATE pharmacy SET city = @q WHERE pharmacy_id = @n", conn))
                {
                    command.Parameters.AddWithValue("n", 0);
                    command.Parameters.AddWithValue("q", "guntur");
                    int nRows = command.ExecuteNonQuery();
                    Console.Out.WriteLine(String.Format("Number of rows updated={0}", nRows));
                }
            }
            Console.WriteLine("Press RETURN to exit");
            Console.ReadLine();
        }
    }
}

Eliminación de datos

Use el código siguiente para conectarse y eliminar los datos mediante la instrucción SQL DELETE. En el código, reemplace <cluster> por el nombre del clúster y <password> por la contraseña de administrador.

using System;
using Npgsql;
namespace Driver
{
    public class AzurePostgresDelete
    {
       
        static void Main(string[] args)
        {
            // Replace <cluster> with your cluster name and <password> with your password:
            var connStr = new NpgsqlConnectionStringBuilder("Server = c-<cluster>.<uniqueID>.postgres.cosmos.azure.com; Database = citus; Port = 5432; User Id = citus; Password = {your password}; Ssl Mode = Require; Pooling = true; Minimum Pool Size=0; Maximum Pool Size =50 ");

            connStr.TrustServerCertificate = true;

            using (var conn = new NpgsqlConnection(connStr.ToString()))
            {

                Console.Out.WriteLine("Opening connection");
                conn.Open();
                using (var command = new NpgsqlCommand("DELETE FROM pharmacy WHERE pharmacy_id = @n", conn))
                {
                    command.Parameters.AddWithValue("n", 0);
                    int nRows = command.ExecuteNonQuery();
                    Console.Out.WriteLine(String.Format("Number of rows deleted={0}", nRows));
                }
            }
            Console.WriteLine("Press RETURN to exit");
            Console.ReadLine();
        }
    }
}

Comando COPY para llevar a cabo una ingesta rápida

El comando COPY es capaz de conseguir un rendimiento enorme cuando ingiere datos en Azure Cosmos DB for PostgreSQL. El comando COPY puede ingerir datos ubicados en archivos o microprocesos de datos ubicados en memoria durante un proceso de ingesta en tiempo real.

Uso del comando COPY para cargar datos desde un archivo

El código de ejemplo siguiente copia datos de un archivo .csv a una tabla de base de datos.

El ejemplo de código requiere que el archivo pharmacies.csv esté en la carpeta Documentos. En el código, reemplace <cluster> por el nombre del clúster y <password> por la contraseña de administrador.

using Npgsql;
public class csvtotable
{

    static void Main(string[] args)
    {
        String sDestinationSchemaAndTableName = "pharmacy";
        String sFromFilePath = "C:\\Users\\Documents\\pharmacies.csv";
       
        // Replace <cluster> with your cluster name and <password> with your password:
        var connStr = new NpgsqlConnectionStringBuilder("Server = c-<cluster>.<uniqueID>.postgres.cosmos.azure.com; Database = citus; Port = 5432; User Id = citus; Password = <password>; Ssl Mode = Require; Pooling = true; Minimum Pool Size=0; Maximum Pool Size =50 ");
            
        connStr.TrustServerCertificate = true;

        NpgsqlConnection conn = new NpgsqlConnection(connStr.ToString());
        NpgsqlCommand cmd = new NpgsqlCommand();

        conn.Open();

        if (File.Exists(sFromFilePath))
        {
            using (var writer = conn.BeginTextImport("COPY " + sDestinationSchemaAndTableName + " FROM STDIN WITH(FORMAT CSV, HEADER true,NULL ''); "))
            {
                foreach (String sLine in File.ReadAllLines(sFromFilePath))
                {
                    writer.WriteLine(sLine);
                }
            }
            Console.WriteLine("csv file data copied sucessfully");
        }
    }
}

Comando COPY para cargar datos en memoria

El siguiente código de ejemplo copia en una tabla los datos en memoria. En el código, reemplace <cluster> por el nombre del clúster y <password> por la contraseña de administrador.

using Npgsql;
using NpgsqlTypes;
namespace Driver
{
    public class InMemory
    {

        static async Task Main(string[] args)
        {
         
             // Replace <cluster> with your cluster name and <password> with your password:
            var connStr = new NpgsqlConnectionStringBuilder("Server = c-<cluster>.<uniqueID>.postgres.cosmos.azure.com; Database = citus; Port = 5432; User Id = citus; Password = <password>; Ssl Mode = Require; Pooling = true; Minimum Pool Size=0; Maximum Pool Size =50 ");

            connStr.TrustServerCertificate = true;

            using (var conn = new NpgsqlConnection(connStr.ToString()))
            {
                conn.Open();
                var text = new dynamic[] { 0, "Target", "Sunnyvale", "California", 94001 };
                using (var writer = conn.BeginBinaryImport("COPY pharmacy  FROM STDIN (FORMAT BINARY)"))
                {
                    writer.StartRow();
                    foreach (var item in text)
                    {
                        writer.Write(item);
                    }
                    writer.Complete();
                }
                Console.WriteLine("in-memory data copied sucessfully");
            }
        }
    }
}

Reintento de la aplicación para errores de solicitud de la base de datos

A veces es posible que las solicitudes de base de datos de la aplicación produzcan un error. Estos problemas pueden producirse en diferentes escenarios, como errores de red entre la aplicación y la base de datos, contraseñas incorrectas, etc. Algunos problemas pueden ser transitorios y resolverse por sí mismos en unos segundos o minutos. Puede configurar la lógica de reintento en la aplicación para resolver los errores transitorios.

Configurar la lógica de reintento en la aplicación ayuda a mejorar la experiencia del usuario final. En escenarios de error, los usuarios simplemente esperarán un poco más para que la aplicación atienda las solicitudes, en lugar de experimentar errores.

En el ejemplo siguiente se muestra cómo implementar la lógica de reintento en la aplicación. El fragmento de código de ejemplo intenta una solicitud de base de datos cada 60 segundos (hasta cinco veces) hasta que se realiza correctamente. El número y la frecuencia de los reintentos se pueden configurar en función de las necesidades de la aplicación.

En este código, reemplace el <clúster> por el nombre del clúster y la <contraseña> por la contraseña de administrador.

using System;
using System.Data;
using System.Runtime.InteropServices;
using System.Text;
using Npgsql;

namespace Driver
{
    public class Reconnect
    {
        
        // Replace <cluster> with your cluster name and <password> with your password:
        static string connStr = new NpgsqlConnectionStringBuilder("Server = c-<cluster>.<uniqueID>.postgres.cosmos.azure.com; Database = citus; Port = 5432; User Id = citus; Password = <password>; Ssl Mode = Require; Pooling = true; Minimum Pool Size=0; Maximum Pool Size =50;TrustServerCertificate = true").ToString();
        static string executeRetry(string sql, int retryCount)
        {
            for (int i = 0; i < retryCount; i++)
            {
                try
                {
                    using (var conn = new NpgsqlConnection(connStr))
                    {
                        conn.Open();
                        DataTable dt = new DataTable();
                        using (var _cmd = new NpgsqlCommand(sql, conn))
                        {
                            NpgsqlDataAdapter _dap = new NpgsqlDataAdapter(_cmd);
                            _dap.Fill(dt);
                            conn.Close();
                            if (dt != null)
                            {
                                if (dt.Rows.Count > 0)
                                {
                                    int J = dt.Rows.Count;
                                    StringBuilder sb = new StringBuilder();

                                    for (int k = 0; k < dt.Rows.Count; k++)
                                    {
                                        for (int j = 0; j < dt.Columns.Count; j++)
                                        {
                                            sb.Append(dt.Rows[k][j] + ",");
                                        }
                                        sb.Remove(sb.Length - 1, 1);
                                        sb.Append("\n");
                                    }
                                    return sb.ToString();
                                }
                            }
                        }
                    }
                    return null;
                }
                catch (Exception e)
                {
                    Thread.Sleep(60000);
                    Console.WriteLine(e.Message);
                }
            }
            return null;
        }
        static void Main(string[] args)
        {
            string result = executeRetry("select 1",5);
            Console.WriteLine(result);
        }
    }
}

Pasos siguientes