Kafka Connect を使用して Apache Kafka から Azure Cosmos DB for Apache Cassandra にデータを取り込む
適用対象: Cassandra
既存の Cassandra アプリケーションでは、CQLv4 ドライバーの互換性により、Azure Cosmos DB for Apache Cassandra を簡単に操作できます。 この機能を利用して Apache Kafka などのストリーミング プラットフォームと統合し、データを Azure Cosmos DB に取り込みます。
Apache Kafka (トピック) のデータは、他のアプリケーションによって使用されたり他のシステムに取り込まれたりした場合にのみ役立ちます。 選択した言語とクライアント SDK を使用して、Kafka Producer/Consumer API を使用してソリューションを構築することができます。 Kafka Connect には代替のソリューションがあります。 これは、Apache Kafka とその他のシステムとの間で、スケーラブルで信頼性の高い方法でデータをストリーム配信するためのプラットフォームです。 Kafka Connect は Cassandra を含む市販のコネクタをサポートしているため、Kafka を Azure Cosmos DB for Apache Cassandra と統合するためのカスタム コードを記述する必要はありません。
この記事では、オープンソースの DataStax Apache Kafka Connector を使用します。これは Kafka Connect フレームワーク上で動作し、Kafka トピックから 1 つ以上の Cassandra テーブルの行にレコードを取り込みます。 この例は、Docker Compose を使用して再利用可能なセットアップを示しています。 これは、1 つのコマンドで必要なすべてのコンポーネントをローカルにブートストラップできるため、非常に便利です。 これらのコンポーネントには、Kafka、Zookeeper、Kafka Connect ワーカー、およびサンプル データ ジェネレーター アプリケーションなどがあります。
ここでは、コンポーネントとそのサービス定義の詳細について説明します。完全な docker-compose
ファイルは GitHub リポジトリで参照できます。
- Kafka と Zookeeper は debezium イメージを使用します。
- Docker コンテナーとして実行するために、DataStax Apache Kafka Connector は既存の Docker イメージの上に組み込まれています (debezium/connect-base)。 このイメージには、Kafka とその Kafka Connect ライブラリのインストールが含まれているため、カスタム コネクタの追加が非常に便利です。 Dockerfile を参照してください。
data-generator
サービスは、ランダムに生成された (JSON) データをweather-data
Kafka トピックにシードします。 コードとDockerfile
は GitHub リポジトリで参照できます
[前提条件]
Docker と Docker Compose のインストール
キースペース、テーブルを作成し、統合パイプラインを開始する
Azure portal を使用して、デモ アプリケーションに必要な Cassandra キースペースとテーブルを作成します。
Note
以下のように、同じキースペースとテーブル名を使用してください
CREATE KEYSPACE weather WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'datacenter1' : 1};
CREATE TABLE weather.data_by_state (station_id text, temp int, state text, ts timestamp, PRIMARY KEY (state, ts)) WITH CLUSTERING ORDER BY (ts DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;
CREATE TABLE weather.data_by_station (station_id text, temp int, state text, ts timestamp, PRIMARY KEY (station_id, ts)) WITH CLUSTERING ORDER BY (ts DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;
GitHub レポジトリを複製します。
git clone https://github.com/Azure-Samples/cosmosdb-cassandra-kafka
cd cosmosdb-cassandra-kafka
すべてのサービスを開始します。
docker-compose --project-name kafka-cosmos-cassandra up --build
Note
コンテナーをダウンロードして起動するのにしばらく時間がかかる場合があります。これは、1 回だけ実行するプロセスです。
すべてのコンテナーが開始されているかどうかを確認するには、次のようにします。
docker-compose -p kafka-cosmos-cassandra ps
データ ジェネレーター アプリケーションにより、Kafka の weather-data
トピックへのデータのポンプが開始されます。 また、確認のために簡易サニティ チェックを実行することもできます。 Kafka Connect ワーカーを実行している Docker コンテナーを調べます。
docker exec -it kafka-cosmos-cassandra_cassandra-connector_1 bash
コンテナー シェルにドロップすると、通常の Kafka コンソールのコンシューマー プロセスを開始するだけで、気象データ (JSON 形式) が流れてくるのがわかるはずです。
cd ../bin
./kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic weather-data
Cassandra Sink コネクタのセットアップ
次の JSON コンテンツをファイルにコピーします (これには cassandra-sink-config.json
という名前を付けることができます)。 セットアップに従って更新する必要があります。このセクションの残りの部分では、このトピックに関するガイダンスを提供します。
{
"name": "kafka-cosmosdb-sink",
"config": {
"connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
"tasks.max": "1",
"topics": "weather-data",
"contactPoints": "<cosmos db account name>.cassandra.cosmos.azure.com",
"port": 10350,
"loadBalancing.localDc": "<cosmos db region e.g. Southeast Asia>",
"auth.username": "<enter username for cosmosdb account>",
"auth.password": "<enter password for cosmosdb account>",
"ssl.hostnameValidation": true,
"ssl.provider": "JDK",
"ssl.keystore.path": "/etc/alternatives/jre/lib/security/cacerts/",
"ssl.keystore.password": "changeit",
"datastax-java-driver.advanced.connection.init-query-timeout": 5000,
"maxConcurrentRequests": 500,
"maxNumberOfRecordsInBatch": 32,
"queryExecutionTimeout": 30,
"connectionPoolLocalSize": 4,
"topic.weather-data.weather.data_by_state.mapping": "station_id=value.stationid, temp=value.temp, state=value.state, ts=value.created",
"topic.weather-data.weather.data_by_station.mapping": "station_id=value.stationid, temp=value.temp, state=value.state, ts=value.created",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"offset.flush.interval.ms": 10000
}
}
属性の概要は次のとおりです。
基本的な接続
contactPoints
: Azure Cosmos DB Cassandra のコンタクト ポイントを入力しますloadBalancing.localDc
: Azure Cosmos DB アカウントのリージョン (例: 東南アジア) を入力しますauth.username
: ユーザー名を入力しますauth.password
: パスワードを入力しますport
: ポート値を入力します (これは10350
であり、9042
ではありません。そのままにしておきます)
SSL の構成
Azure Cosmos DB では SSL 経由のセキュリティで保護された接続が適用され、Kafka Connect コネクタでは SSL もサポートされます。
ssl.keystore.path
: コンテナー内の JDK キーストアへのパス-/etc/alternatives/jre/lib/security/cacerts/
ssl.keystore.password
:JDK キーストア (既定) のパスワードssl.hostnameValidation
:独自のノードのホスト名検証を有効にしますssl.provider
: SSL プロバイダーとしてJDK
が使用されます
ジェネリック パラメーター
key.converter
:文字列コンバーターorg.apache.kafka.connect.storage.StringConverter
を使用しますvalue.converter
: Kafka トピックのデータは JSON であるため、org.apache.kafka.connect.json.JsonConverter
を使用しますvalue.converter.schemas.enable
:JSON ペイロードには関連付けられたスキーマがないため (デモ アプリの目的のため)、この属性をfalse
に設定して、スキーマを検索しないように Kafka Connect に指示する必要があります。 そうしないと、エラーが発生します。
コネクタをインストールする
Kafka Connect REST エンドポイントを使用してコネクタをインストールします。
curl -X POST -H "Content-Type: application/json" --data @cassandra-sink-config.json http://localhost:8083/connectors
状態を確認するには、次の操作を行います。
curl http://localhost:8080/connectors/kafka-cosmosdb-sink/status
すべてがうまくいけば、コネクタがその魔法を紡ぎ始めるはずです。 Azure Cosmos DB に対して認証を行い、Kafka トピック (weather-data
) から Cassandra テーブル (weather.data_by_state
および weather.data_by_station
) へのデータの取り込みを開始する必要があります
これでテーブル内のデータに対してクエリを実行できるようになりました。 Azure portal に移動し、Azure Cosmos DB アカウントのホステッド CQL シェルを起動します。
Azure Cosmos DB からデータのクエリを実行する
data_by_state
および data_by_station
テーブルを確認します。 開始するためのサンプル クエリを以下にいくつか示します。
select * from weather.data_by_state where state = 'state-1';
select * from weather.data_by_state where state IN ('state-1', 'state-2');
select * from weather.data_by_state where state = 'state-3' and ts > toTimeStamp('2020-11-26');
select * from weather.data_by_station where station_id = 'station-1';
select * from weather.data_by_station where station_id IN ('station-1', 'station-2');
select * from weather.data_by_station where station_id IN ('station-2', 'station-3') and ts > toTimeStamp('2020-11-26');
リソースをクリーンアップする
アプリと Azure Cosmos DB アカウントの使用を完了したら、それ以上料金がかからないように、作成した Azure リソースを削除できます。 リソースを削除するには、次の手順に従います。
Azure portal の検索バーで、「リソース グループ」を検索して選択します。
一覧から、このクイック スタートで作成したリソース グループを選択します。
リソース グループの [概要] ページで、[リソース グループの削除] を選択します。
次のウィンドウで、削除するリソース グループの名前を入力し、[削除] を選択します。