Apache Kafka を使用して PostgreSQL から Azure Cosmos DB for Apache Cassandra アカウントにデータを移行する
適用対象: Cassandra
Azure Cosmos DB の Cassandra 用 API は、次のようなさまざまな理由により、Apache Cassandra 上で実行されているエンタープライズ ワークロードに適した選択肢になりました。
大幅なコスト削減: Azure Cosmos DB によってコストを節約できます。これには、VM、帯域幅、適用されるすべての Oracle ライセンスのコストが含まれます。 さらに、データ センター、サーバー、SSD ストレージ、ネットワーク、電気代を管理する必要がありません。
より優れたスケーラビリティと可用性: 単一障害点がなくなり、スケーラビリティとお使いのアプリケーションの可用性が向上します。
管理と監視のオーバーヘッドなし: Azure Cosmos DB は、フル マネージドのクラウド サービスとして、無数の設定を管理および監視する際のオーバーヘッドを解消します。
Kafka Connect は、Apache Kafka とその他のシステムとの間で、スケーラブルで信頼性の高い方法でデータをストリーム配信するためのプラットフォームです。 いくつかの既製のコネクタがサポートされているため、外部システムを Apache Kafka と統合するのにカスタム コードは不要です。
この記事では、Kafka コネクタの組み合わせを使用して、PostgreSQL などのリレーショナル データベースのレコードを Azure Cosmos DB for Apache Cassandra に継続的に同期するためにデータ パイプラインを設定する方法を示します。
概要
この記事で紹介するエンド ツー エンドのフローの概要を次に示します。
PostgreSQL テーブルのデータは、Kafka Connect ソース コネクタである Debezium PostgreSQL コネクタを使用して Apache Kafka にプッシュされます。 PostgreSQL テーブル内のレコードの挿入、更新、削除は change data
イベントとして取得され、Kafka トピックに送信されます。 DataStax Apache Kafka コネクタ (Kafka Connect シンク コネクタ) は、パイプラインの 2 番目の部分を形成します。 これにより、Kafka トピックの変更データ イベントが Azure Cosmos DB for Apache Cassandra テーブルに同期されます。
Note
DataStax Apache Kafka コネクタに固有の機能を使用すると、複数のテーブルにデータをプッシュできます。 この例では、当該コネクタを使用して、さまざまなクエリ要件をサポートできる 2 つの Cassandra テーブルに変更データ レコードを保持できるようにします。
前提条件
基本的な設定
PostgreSQL データベースをまだ設定していない場合は設定します。
これは、既存のオンプレミス データベースでも、ローカル コンピューターにダウンロードしてインストールしたものでも構いません。 また、Docker コンテナーを使用することもできます。
Note
次の例では、パブリック コンテナー イメージを Docker Hub からプルします。 匿名の pull request を行うのではなく、最初に Docker Hub アカウント (docker login
) で認証を行うことをお勧めします。 パブリック コンテンツを使用するときの信頼性を向上させるには、プライベートの Azure Container Registry にイメージをインポートして管理します。 パブリック イメージの操作に関する詳細を参照してください。
コンテナーを開始するには、次のようにします。
docker run --rm -p 5432:5432 -e POSTGRES_PASSWORD=<enter password> postgres
psql
クライアントを使用して PostgreSQL インスタンスに接続します。
psql -h localhost -p 5432 -U postgres -W -d postgres
サンプルの注文情報を格納するテーブルを作成します。
CREATE SCHEMA retail;
CREATE TABLE retail.orders_info (
orderid SERIAL NOT NULL PRIMARY KEY,
custid INTEGER NOT NULL,
amount INTEGER NOT NULL,
city VARCHAR(255) NOT NULL,
purchase_time VARCHAR(40) NOT NULL
);
Azure portal を使用して、デモ アプリケーションに必要な Cassandra キースペースとテーブルを作成します。
Note
以下のように、同じキースペースとテーブル名を使用してください
CREATE KEYSPACE retail WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'datacenter1' : 1};
CREATE TABLE retail.orders_by_customer (order_id int, customer_id int, purchase_amount int, city text, purchase_time timestamp, PRIMARY KEY (customer_id, purchase_time)) WITH CLUSTERING ORDER BY (purchase_time DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;
CREATE TABLE retail.orders_by_city (order_id int, customer_id int, purchase_amount int, city text, purchase_time timestamp, PRIMARY KEY (city,order_id)) WITH cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;
Apache Kafka を設定する
この記事ではローカル クラスターを使用しますが、他のオプションを選択することもできます。 Kafka をダウンロードして解凍し、Zookeeper と Kafka クラスターを開始します。
cd <KAFKA_HOME>/bin
#start zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
#start kafka (in another terminal)
bin/kafka-server-start.sh config/server.properties
コネクタを設定する
Debezium PostgreSQL および DataStax Apache Kafka のコネクタをインストールします。 Debezium PostgreSQL コネクタのプラグイン アーカイブをダウンロードします。 たとえば、(執筆時点で最新の) バージョン 1.3.0 のコネクタをダウンロードするには、こちらのリンクを使用します。 DataStax Apache Kafka コネクタは、こちらのリンクからダウンロードします。
両方のコネクタ アーカイブを解凍し、JAR ファイルを Kafka Connect plugin.path にコピーします。
cp <path_to_debezium_connector>/*.jar <KAFKA_HOME>/libs
cp <path_to_cassandra_connector>/*.jar <KAFKA_HOME>/libs
Kafka Connect を構成してデータ パイプラインを開始する
Kafka Connect クラスターを開始する
cd <KAFKA_HOME>/bin
./connect-distributed.sh ../config/connect-distributed.properties
PostgreSQL コネクタ インスタンスを開始する
コネクタ構成 (JSON) をファイルの例 pg-source-config.json
に保存します
{
"name": "pg-orders-source",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "localhost",
"database.port": "5432",
"database.user": "postgres",
"database.password": "password",
"database.dbname": "postgres",
"database.server.name": "myserver",
"plugin.name": "wal2json",
"table.include.list": "retail.orders_info",
"value.converter": "org.apache.kafka.connect.json.JsonConverter"
}
}
PostgreSQL コネクタ インスタンスを開始するには、次のようにします。
curl -X POST -H "Content-Type: application/json" --data @pg-source-config.json http://localhost:8083/connectors
Note
削除するには、curl -X DELETE http://localhost:8083/connectors/pg-orders-source
を使用します。
データの挿入
orders_info
テーブルには、注文 ID、顧客 ID、市区町村などの注文の詳細が含まれています。下の SQL を使用して、このテーブルにランダムなデータを設定します。
insert into retail.orders_info (
custid, amount, city, purchase_time
)
select
random() * 10000 + 1,
random() * 200,
('{New Delhi,Seattle,New York,Austin,Chicago,Cleveland}'::text[])[ceil(random()*3)],
NOW() + (random() * (interval '1 min'))
from generate_series(1, 10) s(i);
10 件のレコードがテーブルに挿入されます。 ご自分の要件に従って、下部にある generate_series(1, 10)
のレコード数を必ず更新してください。たとえば、100
件のレコードを挿入するには、generate_series(1, 100)
を使用します
確認するには、次のようにします。
select * from retail.orders_info;
Kafka トピックの変更データ キャプチャ イベントを確認します
cd <KAFKA_HOME>/bin
./kafka-console-consumer.sh --topic myserver.retail.orders_info --bootstrap-server localhost:9092 --from-beginning
変更データ イベントが JSON 形式で表示されます。
DataStax Apache Kafka コネクタ インスタンスを開始する
コネクタ構成 (JSON) をファイルの例、cassandra-sink-config.json
に保存し、お使いの環境に合わせてプロパティを更新します。
{
"name": "kafka-cosmosdb-sink",
"config": {
"connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
"tasks.max": "1",
"topics": "myserver.retail.orders_info",
"contactPoints": "<Azure Cosmos DB account name>.cassandra.cosmos.azure.com",
"loadBalancing.localDc": "<Azure Cosmos DB region e.g. Southeast Asia>",
"datastax-java-driver.advanced.connection.init-query-timeout": 5000,
"ssl.hostnameValidation": true,
"ssl.provider": "JDK",
"ssl.keystore.path": "<path to JDK keystore path e.g. <JAVA_HOME>/jre/lib/security/cacerts>",
"ssl.keystore.password": "<keystore password: it is 'changeit' by default>",
"port": 10350,
"maxConcurrentRequests": 500,
"maxNumberOfRecordsInBatch": 32,
"queryExecutionTimeout": 30,
"connectionPoolLocalSize": 4,
"auth.username": "<Azure Cosmos DB user name (same as account name)>",
"auth.password": "<Azure Cosmos DB password>",
"topic.myserver.retail.orders_info.retail.orders_by_customer.mapping": "order_id=value.orderid, customer_id=value.custid, purchase_amount=value.amount, city=value.city, purchase_time=value.purchase_time",
"topic.myserver.retail.orders_info.retail.orders_by_city.mapping": "order_id=value.orderid, customer_id=value.custid, purchase_amount=value.amount, city=value.city, purchase_time=value.purchase_time",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"offset.flush.interval.ms": 10000
}
}
コネクタ インスタンスを開始するには、次のようにします。
curl -X POST -H "Content-Type: application/json" --data @cassandra-sink-config.json http://localhost:8083/connectors
コネクタが動作を開始し、PostgreSQL から Azure Cosmos DB へのエンド ツー エンドのパイプラインが動作可能になります。
Azure Cosmos DB に対してクエリを実行する
Azure Cosmos DB の Cassandra テーブルを確認します。 試すことができるクエリをいくつか次に示します。
select count(*) from retail.orders_by_customer;
select count(*) from retail.orders_by_city;
select * from retail.orders_by_customer;
select * from retail.orders_by_city;
select * from retail.orders_by_city where city='Seattle';
select * from retail.orders_by_customer where customer_id = 10;
引き続き PostgreSQL にさらにデータを挿入し、レコードが Azure Cosmos DB に同期されていることを確認できます。
次のステップ
- Kafka Connect を使用して Apache Kafka と Azure Cosmos DB for Apache Cassandra を統合する
- 変更データ キャプチャ用に Azure Event Hubs の Apache Kafka Connect (プレビュー) を Debezium と統合する
- Arcion を使用して Oracle から Azure Cosmos DB for Apache Cassandra にデータを移行する
- コンテナーとデータベースのスループットのプロビジョニング
- パーティション キーのベスト プラクティス
- 「Azure Cosmos DB Capacity Planner を使用して RU/秒を見積もる」の記事