Kafka コネクタを使用してデータを移動する
Apache Kafka は、分散された方法でイベントをストリーミングするために使用されるオープンソース プラットフォームです。 多くの企業では、大規模なハイパフォーマンスのデータ統合シナリオに Kafka を使用しています。 Kafka Connect は、Kafka と他のデータ システムの間でデータをストリーミングする、スイート内のツールです。 当然ながら、これには、データのソースまたはデータのターゲット (シンク) として Azure Cosmos DB が含まれる場合があります。
セットアップ
Azure Cosmos DB 用の Kafka Connect コネクタは、GitHub (microsoft/kafka-connect-cosmosdb) でオープンソース プロジェクトとして利用できます。 JAR ファイルを手動でダウンロードしてインストールする手順については、リポジトリを参照してください。
構成
Azure Cosmos DB for NoSQL アカウントへの接続を適切に構成するには、4 つの構成プロパティを設定する必要があります。
| プロパティ | 価値 |
|---|---|
| connect.cosmos.connection.endpoint | アカウント エンドポイント URI |
| connect.cosmos.master.key | アカウント キー |
| connect.cosmos.databasename | データベース リソースの名前 |
| connect.cosmos.containers.topicmap | CSV 形式を使用した、コンテナーへの Kafka トピックのマッピング |
コンテナーへのトピックのマップ
各コンテナーをトピックにマップする必要があります。 たとえば、products コンテナーを prodlistener トピックにマップし、customers コンテナーを custlistener トピックにマップするとします。 その場合は、CSV マッピング文字列 prodlistener#products,custlistener#customers を使用する必要があります。
Azure Cosmos DB に書き込む
トピックを作成して、Azure Cosmos DB にデータを書き込みしましょう。 Apache Kafka では、すべてのメッセージがトピックを介して送信されます。
新しいトピックを作成するには、kafka-topics コマンドを使用します。 この例では、prodlistener という名前の新しいトピックを作成します。
kafka-topics --create \
--zookeeper localhost:2181 \
--topic prodlistener \
--replication-factor 1 \
--partitions 1
次のコマンドによりプロデューサーが開始され、prodlistener トピックに 3 つのレコードを書き込むことができます。
kafka-console-producer \
--broker-list localhost:9092 \
--topic prodlistener
次に、コンソールで、これらの 3 つのレコードをトピックに入力できます。 これが完了すると、これらのレコードは、トピックにマップされた Azure Cosmos DB for NoSQL コンテナー (products) にコミットされます。
{"id": "0ac8b014-c3f4-4db0-8a1f-434bab460938", "name": "handlebar", "categoryId": "78148556-4e84-44be-abae-9755dde9c9e3"}
{"id": "54ba00da-50cf-44d8-b122-1d18bd1db400", "name": "handlebar", "categoryId": "eb642a5e-0c6f-4c83-b96b-bb2903b85e59"}
{"id": "381dde84-e6c2-4583-b66c-e4a4116f7d6e", "name": "handlebar", "categoryId": "cf8ae707-6d74-4563-831a-06e15a70a0dc"}
Azure Cosmos DB から読み込む
JSON 構成オブジェクトを使用して、Kafka Connect でソース コネクタを作成できます。 次のサンプル構成では、ほとんどのプロパティは変更しませんが、次の値は必ず変更してください。
| プロパティ | 説明 |
|---|---|
| connect.cosmos.connection.endpoint | 実際のアカウント エンドポイント URI |
| connect.cosmos.master.key | 実際のアカウント キー |
| connect.cosmos.databasename | 実際のアカウント データベース リソースの名前 |
| connect.cosmos.containers.topicmap | CSV 形式を使用した、コンテナーへの実際の Kafka トピックのマッピング |
{
"name": "cosmosdb-source-connector",
"config": {
"connector.class": "com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"connect.cosmos.task.poll.interval": "100",
"connect.cosmos.connection.endpoint": "<cosmos-endpoint>",
"connect.cosmos.master.key": "<cosmos-key>",
"connect.cosmos.databasename": "<cosmos-database>",
"connect.cosmos.containers.topicmap": "<kafka-topic>#<cosmos-container>",
"connect.cosmos.offset.useLatest": false,
"value.converter.schemas.enable": "false",
"key.converter.schemas.enable": "false"
}
}
わかりやすい例として、次の構成テーブルの例を使用します。
| プロパティ | 説明 |
|---|---|
| connect.cosmos.connection.endpoint | https://dp420.documents.azure.com:443/ |
| connect.cosmos.master.key | C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw== |
| connect.cosmos.databasename | cosmicworks |
| connect.cosmos.containers.topicmap | prodlistener#products |
構成ファイルの例を次に示します。
{
"name": "cosmosdb-source-connector",
"config": {
"connector.class": "com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"connect.cosmos.task.poll.interval": "100",
"connect.cosmos.connection.endpoint": "https://dp420.documents.azure.com:443/",
"connect.cosmos.master.key": "C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==",
"connect.cosmos.databasename": "cosmicworks",
"connect.cosmos.containers.topicmap": "prodlistener#products",
"connect.cosmos.offset.useLatest": false,
"value.converter.schemas.enable": "false",
"key.converter.schemas.enable": "false"
}
}
構成が完了すると、Azure Cosmos DB 変更フィードのデータが Kafka トピックに発行されます。