Ruby を使用し、Azure Cosmos DB for PostgreSQL に接続して SQL コマンドを実行する

適用対象: Azure Cosmos DB for PostgreSQL (PostgreSQL の Citus データベース拡張機能を利用)

このクイックスタートでは、Ruby コードを使用してクラスターに接続する方法と、SQL ステートメントを使用してテーブルを作成する方法について説明します。 その後で、データベース内のデータの挿入、クエリ、更新、削除を実行します。 この記事内の手順では、Ruby の開発には慣れているものの、Azure Cosmos DB for PostgreSQL を使用するのは初めてというユーザーを対象とします。

PostgreSQL ライブラリをインストールする

この記事のコード例では、pg gem が必要です。 言語パッケージ マネージャー (bundler など) を使用して pg をインストールする必要があります。

接続、テーブルの作成、およびデータの挿入

接続し、CREATE TABLE SQL ステートメントでテーブルを作成してから、INSERT INTO SQL ステートメントでそのテーブルに行を追加するには、次のコードを使用します。 このコードでは、コンストラクターを使用した PG::Connection オブジェクトを使用して、Azure Cosmos DB for PostgreSQL に接続します。 次に、exec() メソッドを呼び出して、DROP、CREATE TABLE、INSERT INTO の各コマンドを実行します。 PG::Error クラスを使用して、エラーをチェックします。 その後、close() メソッドを呼び出して、終了する前に接続を閉じます。

このコードでは、<cluster> をクラスター名に置き換え、<password> を管理者パスワードに置き換えます。

require 'pg'
begin
    # NOTE: Replace <cluster> and <password> in the connection string.
    connection = PG::Connection.new("host=c-<cluster>.<uniqueID>.postgres.cosmos.azure.com port=5432 dbname=citus user=citus password=<password> sslmode=require")
    puts 'Successfully created connection to database'

    # Drop previous table of same name if one exists
    connection.exec('DROP TABLE IF EXISTS pharmacy;')
    puts 'Finished dropping table (if existed).'

    # Drop previous table of same name if one exists.
    connection.exec('CREATE TABLE pharmacy (pharmacy_id integer ,pharmacy_name text,city text,state text,zip_code integer);')
    puts 'Finished creating table.'

    # Insert some data into table.
    connection.exec("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (0,'Target','Sunnyvale','California',94001);")
    connection.exec("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (1,'CVS','San Francisco','California',94002);")
    puts 'Inserted 2 rows of data.'

    # Create index
    connection.exec("CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);") 
rescue PG::Error => e
    puts e.message
ensure
    connection.close if connection
end

テーブルの分散

Azure Cosmos DB for PostgreSQL は、スケーラビリティを確保するために複数のノード全体にテーブルを分散させる優れた能力をユーザーに提供するものです。 テーブルを分散させるには、次のコマンドを使用します。 create_distributed_table および分散列の詳細については、こちらを参照してください。

注意

テーブルを分散させると、クラスターに追加されたすべてのワーカー ノード全体にテーブルを拡張することができます。

データベースに接続してテーブルを分散させるには、次のコードを使用します。 このコードでは、<cluster> をクラスター名に置き換え、<password> を管理者パスワードに置き換えます。

require 'pg'
begin
    # NOTE: Replace <cluster> and <password> in the connection string.
    connection = PG::Connection.new("host=c-<cluster>.<uniqueID>.postgres.cosmos.azure.com port=5432 dbname=citus user=citus password=<password> sslmode=require")
    puts 'Successfully created connection to database.'

    # Super power of distributed tables.
    connection.exec("select create_distributed_table('pharmacy','pharmacy_id');") 
rescue PG::Error => e
    puts e.message
ensure
    connection.close if connection
end

データの読み取り

接続し、SELECT SQL ステートメントを使用してデータを読み取るには、次のコードを使用します。

このコードでは、exec() メソッドを呼び出して SELECT コマンドを実行します。その際、結果は結果セットに保持されます。 結果セットのコレクションは resultSet.each do ループを使用して反復処理され、現在の行の値が row 変数に保持されます。 このコードでは、<cluster> をクラスター名に置き換え、<password> を管理者パスワードに置き換えます。

require 'pg'
begin
    # NOTE: Replace <cluster> and <password> in the connection string.
    connection = PG::Connection.new("host=c-<cluster>.<uniqueID>.postgres.cosmos.azure.com port=5432 dbname=citus user=citus password=<password> sslmode=require")
    puts 'Successfully created connection to database.'

    resultSet = connection.exec('SELECT * from pharmacy')
    resultSet.each do |row|
        puts 'Data row = (%s, %s, %s, %s, %s)' % [row['pharmacy_id'], row['pharmacy_name'], row['city'], row['state'], row['zip_code ']]
    end
rescue PG::Error => e
    puts e.message
ensure
    connection.close if connection
end

データの更新

接続し、UPDATE SQL ステートメントを使用してデータを更新するには、次のコードを使用します。 このコードでは、<cluster> をクラスター名に置き換え、<password> を管理者パスワードに置き換えます。

require 'pg'
begin
    # NOTE: Replace <cluster> and <password> in the connection string.
    connection = PG::Connection.new("host=c-<cluster>.<uniqueID>.postgres.cosmos.azure.com port=5432 dbname=citus user=citus password=<password> sslmode=require")
    puts 'Successfully created connection to database.'

    # Modify some data in table.
    connection.exec('UPDATE pharmacy SET city = %s WHERE pharmacy_id = %d;' % ['\'guntur\'',100])
    puts 'Updated 1 row of data.'
rescue PG::Error => e
    puts e.message
ensure
    connection.close if connection
end

データの削除

接続し、DELETE SQL ステートメントを使用してデータを削除するには、次のコードを使用します。 このコードでは、<cluster> をクラスター名に置き換え、<password> を管理者パスワードに置き換えます。

require 'pg'
begin
    # NOTE: Replace <cluster> and <password> in the connection string.
    connection = PG::Connection.new("host=c-<cluster>.<uniqueID>.postgres.cosmos.azure.com port=5432 dbname=citus user=citus password=<password> sslmode=require")
    puts 'Successfully created connection to database.'

    # Delete some data in table.
    connection.exec('DELETE FROM pharmacy WHERE city = %s;' % ['\'guntur\''])
    puts 'Deleted 1 row of data.'
rescue PG::Error => e
    puts e.message
ensure
    connection.close if connection
end

COPY コマンドによる超高速取り込み

COPY コマンドでは、Azure Cosmos DB for PostgreSQL にデータを取り込みながら、驚異的なスループットを達成できます。 COPY コマンドでは、ファイル内のデータを取り込んだり、メモリ内データのマイクロバッチからデータを取り込んだりすることで、リアルタイム インジェストを実現できます。

ファイルからデータを読み込む COPY コマンド

次のコードは、CSV ファイルからデータベース テーブルにデータをコピーします。 pharmacies.csv というファイルが必要になります。 このコードでは、<cluster> をクラスター名に置き換え、<password> を管理者パスワードに置き換えます。

require 'pg'
begin
    filename = String('pharmacies.csv')

    # NOTE: Replace <cluster> and <password> in the connection string.
    connection = PG::Connection.new("host=c-<cluster>.<uniqueID>.postgres.cosmos.azure.com port=5432 dbname=citus user=citus password=<password> sslmode=require")
    puts 'Successfully created connection to database.'

    # Copy the data from Csv to table.
    result = connection.copy_data "COPY pharmacy FROM STDIN with csv" do
        File.open(filename , 'r').each do |line|
            connection.put_copy_data line
        end
    puts 'Copied csv data successfully.'
    end      
rescue PG::Error => e
    puts e.message
ensure
    connection.close if connection
end

メモリ内データを読み込むための COPY コマンド

次のコードは、メモリ内データをテーブルにコピーします。 このコードでは、<cluster> をクラスター名に置き換え、<password> を管理者パスワードに置き換えます。

require 'pg'
begin
    # NOTE: Replace <cluster> and <password> in the connection string.
    connection = PG::Connection.new("host=c-<cluster>.<uniqueID>.postgres.cosmos.azure.com port=5432 dbname=citus user=citus password=<password> sslmode=require")
    puts 'Successfully created connection to database.'

    enco = PG::TextEncoder::CopyRow.new
    connection.copy_data "COPY pharmacy FROM STDIN", enco do
        connection.put_copy_data [5000,'Target','Sunnyvale','California','94001']
        connection.put_copy_data [5001, 'CVS','San Francisco','California','94002']
        puts 'Copied in-memory data successfully.'
    end
rescue PG::Error => e
    puts e.message
ensure
    connection.close if connection
end

データベース要求の失敗によるアプリの再試行

アプリケーションからのデータベース要求が失敗する場合があります。 このような問題は、アプリとデータベース間のネットワーク障害や、パスワードの誤りなど、さまざまなシナリオで発生することがあります。一部の問題は一時的なもので、数秒から数分で自然に解決されます。 一時的なエラーを克服するために、アプリで再試行ロジックを構成することができます。

アプリで再試行ロジックを構成すると、エンド ユーザー エクスペリエンスの向上に役立ちます。 障害シナリオでは、ユーザーはエラーを経験するかわりに、アプリケーションで要求が処理されるまで少し長く待つことになります。

次の例は、アプリに再試行ロジックを実装する方法を示しています。 サンプル コード スニペットでは、成功するまで 60 秒ごとに (最大 5 回) データベース要求を試行します。 再試行の回数と頻度はアプリケーションのニーズに基づいて構成できます。

このコードでは、<cluster> をクラスター名に置き換え、<password> を管理者パスワードに置き換えます。

require 'pg'

def executeretry(sql,retryCount)
  begin
    for a in 1..retryCount do
      begin
        # NOTE: Replace <cluster> and <password> in the connection string.
        connection = PG::Connection.new("host=c-<cluster>.<uniqueID>.postgres.cosmos.azure.com port=5432 dbname=citus user=citus password=<password> sslmode=require")
        resultSet = connection.exec(sql)
        return resultSet.each
      rescue PG::Error => e
        puts e.message
        sleep 60
      ensure
        connection.close if connection
      end
    end
  end
  return nil
end

var = executeretry('select 1',5)

if var !=nil then
  var.each do |row|
    puts 'Data row = (%s)' % [row]
  end
end

次の手順