Kafka コネクタを使用してデータを移動する

完了

Apache Kafka は、分散された方法でイベントをストリーミングするために使用されるオープンソース プラットフォームです。 多くの企業では、大規模なハイパフォーマンスのデータ統合シナリオに Kafka を使用しています。 Kafka Connect は、Kafka と他のデータ システムの間でデータをストリーミングする、スイート内のツールです。 当然ながら、これには、データのソースまたはデータのターゲット (シンク) として Azure Cosmos DB が含まれる場合があります。

セットアップ

Azure Cosmos DB 用の Kafka Connect コネクタは、GitHub (microsoft/kafka-connect-cosmosdb) でオープンソース プロジェクトとして利用できます。 JAR ファイルを手動でダウンロードしてインストールする手順については、リポジトリを参照してください。

構成

Azure Cosmos DB for NoSQL アカウントへの接続を適切に構成するには、4 つの構成プロパティを設定する必要があります。

プロパティ 価値
connect.cosmos.connection.endpoint アカウント エンドポイント URI
connect.cosmos.master.key アカウント キー
connect.cosmos.databasename データベース リソースの名前
connect.cosmos.containers.topicmap CSV 形式を使用した、コンテナーへの Kafka トピックのマッピング

コンテナーへのトピックのマップ

各コンテナーをトピックにマップする必要があります。 たとえば、products コンテナーを prodlistener トピックにマップし、customers コンテナーを custlistener トピックにマップするとします。 その場合は、CSV マッピング文字列 prodlistener#products,custlistener#customers を使用する必要があります。

Azure Cosmos DB に書き込む

トピックを作成して、Azure Cosmos DB にデータを書き込みしましょう。 Apache Kafka では、すべてのメッセージがトピックを介して送信されます。

新しいトピックを作成するには、kafka-topics コマンドを使用します。 この例では、prodlistener という名前の新しいトピックを作成します。

kafka-topics --create \
    --zookeeper localhost:2181 \
    --topic prodlistener \
    --replication-factor 1 \
    --partitions 1

次のコマンドによりプロデューサーが開始され、prodlistener トピックに 3 つのレコードを書き込むことができます。

kafka-console-producer \
    --broker-list localhost:9092 \
    --topic prodlistener

次に、コンソールで、これらの 3 つのレコードをトピックに入力できます。 これが完了すると、これらのレコードは、トピックにマップされた Azure Cosmos DB for NoSQL コンテナー (products) にコミットされます。

{"id": "0ac8b014-c3f4-4db0-8a1f-434bab460938", "name": "handlebar", "categoryId": "78148556-4e84-44be-abae-9755dde9c9e3"}
{"id": "54ba00da-50cf-44d8-b122-1d18bd1db400", "name": "handlebar", "categoryId": "eb642a5e-0c6f-4c83-b96b-bb2903b85e59"}
{"id": "381dde84-e6c2-4583-b66c-e4a4116f7d6e", "name": "handlebar", "categoryId": "cf8ae707-6d74-4563-831a-06e15a70a0dc"}

Azure Cosmos DB から読み込む

JSON 構成オブジェクトを使用して、Kafka Connect でソース コネクタを作成できます。 次のサンプル構成では、ほとんどのプロパティは変更しませんが、次の値は必ず変更してください。

プロパティ 説明
connect.cosmos.connection.endpoint 実際のアカウント エンドポイント URI
connect.cosmos.master.key 実際のアカウント キー
connect.cosmos.databasename 実際のアカウント データベース リソースの名前
connect.cosmos.containers.topicmap CSV 形式を使用した、コンテナーへの実際の Kafka トピックのマッピング
{
  "name": "cosmosdb-source-connector",
  "config": {
    "connector.class": "com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "connect.cosmos.task.poll.interval": "100",
    "connect.cosmos.connection.endpoint": "<cosmos-endpoint>",
    "connect.cosmos.master.key": "<cosmos-key>",
    "connect.cosmos.databasename": "<cosmos-database>",
    "connect.cosmos.containers.topicmap": "<kafka-topic>#<cosmos-container>",
    "connect.cosmos.offset.useLatest": false,
    "value.converter.schemas.enable": "false",
    "key.converter.schemas.enable": "false"
  }
}

わかりやすい例として、次の構成テーブルの例を使用します。

プロパティ 説明
connect.cosmos.connection.endpoint https://dp420.documents.azure.com:443/
connect.cosmos.master.key C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==
connect.cosmos.databasename cosmicworks
connect.cosmos.containers.topicmap prodlistener#products

構成ファイルの例を次に示します。

{
  "name": "cosmosdb-source-connector",
  "config": {
    "connector.class": "com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "connect.cosmos.task.poll.interval": "100",
    "connect.cosmos.connection.endpoint": "https://dp420.documents.azure.com:443/",
    "connect.cosmos.master.key": "C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==",
    "connect.cosmos.databasename": "cosmicworks",
    "connect.cosmos.containers.topicmap": "prodlistener#products",
    "connect.cosmos.offset.useLatest": false,
    "value.converter.schemas.enable": "false",
    "key.converter.schemas.enable": "false"
  }
}

構成が完了すると、Azure Cosmos DB 変更フィードのデータが Kafka トピックに発行されます。