C# を使用し、Azure Cosmos DB for PostgreSQL に接続して SQL ステートメントを実行する
適用対象: Azure Cosmos DB for PostgreSQL (PostgreSQL の Citus データベース拡張機能を利用)
このクイックスタートでは、C# コードを使用してクラスターに接続する方法と、SQL ステートメントを使用してテーブルを作成する方法について説明します。 その後で、データベース内のデータの挿入、クエリ、更新、削除を実行します。 この記事内の手順では、C# の開発には慣れているものの、Azure Cosmos DB for PostgreSQL を使用するのは初めてというユーザーを対象とします。
PostgreSQL ライブラリをインストールする
この記事のコード例では、Npgsql ライブラリが必要です。 言語パッケージ マネージャー (Visual Studio の NuGet など) を使用して Npgsql をインストールする必要があります。
接続、テーブルの作成、およびデータの挿入
クラスターに接続し、CREATE TABLE および INSERT INTO SQL ステートメントを使用してデータを読み込みます。 このコードでは、NpgsqlCommand
クラスのメソッドを使用します。
- Open() では、Azure Cosmos DB for PostgreSQL への接続を確立します
- CreateCommand() は、CommandText プロパティを設定します
- ExecuteNonQuery() では、データベース コマンドを実行します
ヒント
下のサンプル コードでは、接続プールを使用して PostgreSQL への接続を作成および管理します。 次の理由から、アプリケーション側の接続プールを強くお勧めします。
- アプリケーションがデータベースへの接続を生成しすぎないようにするため、接続制限の超過を回避できます。
- 待機時間とスループットの両方のパフォーマンスを大幅に向上させるのに役立ちます。 PostgreSQL サーバー プロセスでは、新しい各接続を処理するためにフォークする必要があり、接続を再利用すると、そのオーバーヘッドが回避されます。
以下のコードでは、<cluster> をクラスター名に置き換え、<password> を管理者パスワードまたは Microsoft Entra ID トークンに置き換えます。
using System;
using Npgsql;
namespace Driver
{
public class AzurePostgresCreate
{
static void Main(string[] args)
{
// Replace <cluster> with your cluster name and <password> with your password:
var connStr = new NpgsqlConnectionStringBuilder("Server = c-<cluster>.<uniqueID>.postgres.cosmos.azure.com; Database = citus; Port = 5432; User Id = citus; Password = <password>; Ssl Mode = Require; Pooling = true; Minimum Pool Size=0; Maximum Pool Size =50 ");
connStr.TrustServerCertificate = true;
using (var conn = new NpgsqlConnection(connStr.ToString()))
{
Console.Out.WriteLine("Opening connection");
conn.Open();
using (var command = new NpgsqlCommand("DROP TABLE IF EXISTS pharmacy;", conn))
{
command.ExecuteNonQuery();
Console.Out.WriteLine("Finished dropping table (if existed)");
}
using (var command = new NpgsqlCommand("CREATE TABLE pharmacy (pharmacy_id integer ,pharmacy_name text,city text,state text,zip_code integer);", conn))
{
command.ExecuteNonQuery();
Console.Out.WriteLine("Finished creating table");
}
using (var command = new NpgsqlCommand("CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);", conn))
{
command.ExecuteNonQuery();
Console.Out.WriteLine("Finished creating index");
}
using (var command = new NpgsqlCommand("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (@n1, @q1, @a, @b, @c)", conn))
{
command.Parameters.AddWithValue("n1", 0);
command.Parameters.AddWithValue("q1", "Target");
command.Parameters.AddWithValue("a", "Sunnyvale");
command.Parameters.AddWithValue("b", "California");
command.Parameters.AddWithValue("c", 94001);
int nRows = command.ExecuteNonQuery();
Console.Out.WriteLine(String.Format("Number of rows inserted={0}", nRows));
}
}
Console.WriteLine("Press RETURN to exit");
Console.ReadLine();
}
}
}
テーブルの分散
Azure Cosmos DB for PostgreSQL は、スケーラビリティを確保するために複数のノード全体にテーブルを分散させる優れた能力をユーザーに提供するものです。 次のコードを使用してテーブルを分散させます。 create_distributed_table
と分散列の詳細については、「分散列 (別名シャード キー)」を参照してください。
Note
テーブルを分散させると、クラスターに追加されたすべてのワーカー ノード全体にテーブルを拡張することができます。
以下のコードでは、<cluster> をクラスター名に置き換え、<password> を管理者パスワードに置き換えます。
using System;
using Npgsql;
namespace Driver
{
public class AzurePostgresCreate
{
static void Main(string[] args)
{
// Replace <cluster> with your cluster name and <password> with your password:
var connStr = new NpgsqlConnectionStringBuilder("Server = c-<cluster>.<uniqueID>.postgres.cosmos.azure.com; Database = citus; Port = 5432; User Id = citus; Password = {your password}; Ssl Mode = Require; Pooling = true; Minimum Pool Size=0; Maximum Pool Size =50");
connStr.TrustServerCertificate = true;
using (var conn = new NpgsqlConnection(connStr.ToString()))
{
Console.Out.WriteLine("Opening connection");
conn.Open();
using (var command = new NpgsqlCommand("select create_distributed_table('pharmacy','pharmacy_id');", conn))
{
command.ExecuteNonQuery();
Console.Out.WriteLine("Finished distributing the table");
}
}
Console.WriteLine("Press RETURN to exit");
Console.ReadLine();
}
}
}
データの読み取り
接続し、SELECT SQL ステートメントを使用してデータを読み取るには、次のコードを使用します。 このコードでは、NpgsqlCommand
クラスのメソッドを使用します。
- Open() では、Azure Cosmos DB for PostgreSQL への接続を確立します。
- CreateCommand() および ExecuteReader(): データベース コマンドを実行します。
- Read(): 結果のレコードに進みます。
- GetInt32 () および GetString (): レコード内の値を解析します。
以下のコードでは、<cluster> をクラスター名に置き換え、<password> を管理者パスワードに置き換えます。
using System;
using Npgsql;
namespace Driver
{
public class read
{
static void Main(string[] args)
{
// Replace <cluster> with your cluster name and <password> with your password:
var connStr = new NpgsqlConnectionStringBuilder("Server = c-<cluster>.<uniqueID>.postgres.cosmos.azure.com; Database = citus; Port = 5432; User Id = citus; Password = <password>; Ssl Mode = Require; Pooling = true; Minimum Pool Size=0; Maximum Pool Size =50 ");
connStr.TrustServerCertificate = true;
using (var conn = new NpgsqlConnection(connStr.ToString()))
{
Console.Out.WriteLine("Opening connection");
conn.Open();
using (var command = new NpgsqlCommand("SELECT * FROM pharmacy", conn))
{
var reader = command.ExecuteReader();
while (reader.Read())
{
Console.WriteLine(
string.Format(
"Reading from table=({0}, {1}, {2}, {3}, {4})",
reader.GetInt32(0).ToString(),
reader.GetString(1),
reader.GetString(2),
reader.GetString(3),
reader.GetInt32(4).ToString()
)
);
}
reader.Close();
}
}
Console.WriteLine("Press RETURN to exit");
Console.ReadLine();
}
}
}
データの更新
接続して UPDATE SQL ステートメントを使ってデータを更新するには、以下のコードを使います。 このコードでは、<cluster> をクラスター名に置き換え、<password> を管理者パスワードに置き換えます。
using System;
using Npgsql;
namespace Driver
{
public class AzurePostgresUpdate
{
static void Main(string[] args)
{
// Replace <cluster> with your cluster name and <password> with your password:
var connStr = new NpgsqlConnectionStringBuilder("Server = c-<cluster>.<uniqueID>.postgres.cosmos.azure.com; Database = citus; Port = 5432; User Id = citus; Password = <password>; Ssl Mode = Require; Pooling = true; Minimum Pool Size=0; Maximum Pool Size =50 ");
connStr.TrustServerCertificate = true;
using (var conn = new NpgsqlConnection(connStr.ToString()))
{
Console.Out.WriteLine("Opening connection");
conn.Open();
using (var command = new NpgsqlCommand("UPDATE pharmacy SET city = @q WHERE pharmacy_id = @n", conn))
{
command.Parameters.AddWithValue("n", 0);
command.Parameters.AddWithValue("q", "guntur");
int nRows = command.ExecuteNonQuery();
Console.Out.WriteLine(String.Format("Number of rows updated={0}", nRows));
}
}
Console.WriteLine("Press RETURN to exit");
Console.ReadLine();
}
}
}
データの削除
以下のコードを使って接続し、DELETE SQL ステートメントを使ってデータを削除します。 このコードでは、<cluster> をクラスター名に置き換え、<password> を管理者パスワードに置き換えます。
using System;
using Npgsql;
namespace Driver
{
public class AzurePostgresDelete
{
static void Main(string[] args)
{
// Replace <cluster> with your cluster name and <password> with your password:
var connStr = new NpgsqlConnectionStringBuilder("Server = c-<cluster>.<uniqueID>.postgres.cosmos.azure.com; Database = citus; Port = 5432; User Id = citus; Password = {your password}; Ssl Mode = Require; Pooling = true; Minimum Pool Size=0; Maximum Pool Size =50 ");
connStr.TrustServerCertificate = true;
using (var conn = new NpgsqlConnection(connStr.ToString()))
{
Console.Out.WriteLine("Opening connection");
conn.Open();
using (var command = new NpgsqlCommand("DELETE FROM pharmacy WHERE pharmacy_id = @n", conn))
{
command.Parameters.AddWithValue("n", 0);
int nRows = command.ExecuteNonQuery();
Console.Out.WriteLine(String.Format("Number of rows deleted={0}", nRows));
}
}
Console.WriteLine("Press RETURN to exit");
Console.ReadLine();
}
}
}
高速取り込み用の COPY コマンド
COPY コマンドでは、Azure Cosmos DB for PostgreSQL にデータを取り込みながら、驚異的なスループットを達成できます。 COPY コマンドでは、ファイル内のデータを取り込んだり、メモリ内データのマイクロバッチからデータを取り込んだりすることで、リアルタイム インジェストを実現できます。
ファイルからデータを読み込む COPY コマンド
次のコード例では、CSV ファイルからデータベース テーブルにデータをコピーします。
このコード サンプルでは、ファイル pharmacies.csv が Documents フォルダーに含まれている必要があります。 このコードでは、<cluster> をクラスター名に置き換え、<password> を管理者パスワードに置き換えます。
using Npgsql;
public class csvtotable
{
static void Main(string[] args)
{
String sDestinationSchemaAndTableName = "pharmacy";
String sFromFilePath = "C:\\Users\\Documents\\pharmacies.csv";
// Replace <cluster> with your cluster name and <password> with your password:
var connStr = new NpgsqlConnectionStringBuilder("Server = c-<cluster>.<uniqueID>.postgres.cosmos.azure.com; Database = citus; Port = 5432; User Id = citus; Password = <password>; Ssl Mode = Require; Pooling = true; Minimum Pool Size=0; Maximum Pool Size =50 ");
connStr.TrustServerCertificate = true;
NpgsqlConnection conn = new NpgsqlConnection(connStr.ToString());
NpgsqlCommand cmd = new NpgsqlCommand();
conn.Open();
if (File.Exists(sFromFilePath))
{
using (var writer = conn.BeginTextImport("COPY " + sDestinationSchemaAndTableName + " FROM STDIN WITH(FORMAT CSV, HEADER true,NULL ''); "))
{
foreach (String sLine in File.ReadAllLines(sFromFilePath))
{
writer.WriteLine(sLine);
}
}
Console.WriteLine("csv file data copied sucessfully");
}
}
}
メモリ内データを読み込むための COPY コマンド
次のコード例では、メモリ内データをテーブルにコピーします。 このコードでは、<cluster> をクラスター名に置き換え、<password> を管理者パスワードに置き換えます。
using Npgsql;
using NpgsqlTypes;
namespace Driver
{
public class InMemory
{
static async Task Main(string[] args)
{
// Replace <cluster> with your cluster name and <password> with your password:
var connStr = new NpgsqlConnectionStringBuilder("Server = c-<cluster>.<uniqueID>.postgres.cosmos.azure.com; Database = citus; Port = 5432; User Id = citus; Password = <password>; Ssl Mode = Require; Pooling = true; Minimum Pool Size=0; Maximum Pool Size =50 ");
connStr.TrustServerCertificate = true;
using (var conn = new NpgsqlConnection(connStr.ToString()))
{
conn.Open();
var text = new dynamic[] { 0, "Target", "Sunnyvale", "California", 94001 };
using (var writer = conn.BeginBinaryImport("COPY pharmacy FROM STDIN (FORMAT BINARY)"))
{
writer.StartRow();
foreach (var item in text)
{
writer.Write(item);
}
writer.Complete();
}
Console.WriteLine("in-memory data copied sucessfully");
}
}
}
}
データベース要求の失敗によるアプリの再試行
アプリケーションからのデータベース要求が失敗する場合があります。 このような問題は、アプリとデータベース間のネットワーク障害や、パスワードの誤りなど、さまざまなシナリオで発生することがあります。一部の問題は一時的なもので、数秒から数分で自然に解決されます。 一時的なエラーを克服するために、アプリで再試行ロジックを構成することができます。
アプリで再試行ロジックを構成すると、エンド ユーザー エクスペリエンスの向上に役立ちます。 障害シナリオでは、ユーザーはエラーを経験するかわりに、アプリケーションで要求が処理されるまで少し長く待つことになります。
次の例は、アプリに再試行ロジックを実装する方法を示しています。 サンプル コード スニペットでは、成功するまで 60 秒ごとに (最大 5 回) データベース要求を試行します。 再試行の回数と頻度はアプリケーションのニーズに基づいて構成できます。
このコードでは、<cluster> をクラスター名に置き換え、<password> を管理者パスワードに置き換えます。
using System;
using System.Data;
using System.Runtime.InteropServices;
using System.Text;
using Npgsql;
namespace Driver
{
public class Reconnect
{
// Replace <cluster> with your cluster name and <password> with your password:
static string connStr = new NpgsqlConnectionStringBuilder("Server = c-<cluster>.<uniqueID>.postgres.cosmos.azure.com; Database = citus; Port = 5432; User Id = citus; Password = <password>; Ssl Mode = Require; Pooling = true; Minimum Pool Size=0; Maximum Pool Size =50;TrustServerCertificate = true").ToString();
static string executeRetry(string sql, int retryCount)
{
for (int i = 0; i < retryCount; i++)
{
try
{
using (var conn = new NpgsqlConnection(connStr))
{
conn.Open();
DataTable dt = new DataTable();
using (var _cmd = new NpgsqlCommand(sql, conn))
{
NpgsqlDataAdapter _dap = new NpgsqlDataAdapter(_cmd);
_dap.Fill(dt);
conn.Close();
if (dt != null)
{
if (dt.Rows.Count > 0)
{
int J = dt.Rows.Count;
StringBuilder sb = new StringBuilder();
for (int k = 0; k < dt.Rows.Count; k++)
{
for (int j = 0; j < dt.Columns.Count; j++)
{
sb.Append(dt.Rows[k][j] + ",");
}
sb.Remove(sb.Length - 1, 1);
sb.Append("\n");
}
return sb.ToString();
}
}
}
}
return null;
}
catch (Exception e)
{
Thread.Sleep(60000);
Console.WriteLine(e.Message);
}
}
return null;
}
static void Main(string[] args)
{
string result = executeRetry("select 1",5);
Console.WriteLine(result);
}
}
}
次のステップ
- Azure Cosmos DB for PostgreSQL API で PostgreSQL がどのように拡張されるかを確認し、便利な診断クエリを試す
- ワークロードに最適なクラスター サイズを選択する
- クラスター パフォーマンスを監視する