変更データ キャプチャ用に Azure Event Hubs の Apache Kafka Connect のサポートを Debezium と統合する
変更データ キャプチャ (CDC) は、作成、更新、削除操作に応答して、データベース テーブル内の行レベルの変更を追跡するために使用される手法です。 Debezium は、さまざまなデータベースで使用できる変更データ キャプチャ機能 (PostgreSQL の論理デコードなど) に基づいて構築された分散プラットフォームです。 これにより、Kafka Connect コネクタのセットが提供されます。これらのコネクタは、データベース テーブル内の行レベルの変更を活用し、それらを、後で Apache Kafka に送信されるイベント ストリームに変換します。
このチュートリアルでは、Event Hubs (Kafka 用)、Azure DB for PostgreSQL、Debezium を使用して、Azure で変更データ キャプチャ ベースのシステムを設定する方法について説明します。 これは、Debezium PostgreSQL コネクタを使用して、データベース変更を PostgreSQL から Event Hubs の Kafka トピックにストリーミングします。
Note
この記事には、Microsoft が使用しなくなった用語への言及が含まれています。 ソフトウェアからこの用語が削除された時点で、この記事から削除します。
このチュートリアルでは、次の手順を実行します。
- Event Hubs 名前空間を作成します
- Azure Database for PostgreSQL を設定および構成する
- Debezium PostgreSQL コネクタを使用して Kafka Connect を構成して実行する
- 変更データ キャプチャをテストする
- (省略可能)
FileStreamSink
コネクタを使用して変更データ イベントを消費する
前提条件
このチュートリアルを完了するには、次の作業が必要です。
- Azure のサブスクリプション。 アカウントがない場合は、無料アカウントを作成してください。
- Linux または macOS
- Kafka リリース (バージョン 1.1.1、Scala バージョン 2.11)。kafka.apache.org から入手できます。
- Apache Kafka 用 Event Hubs の概要に関する記事を読む。
Event Hubs 名前空間を作成します
Event Hubs サービスとの間で送受信を行うには、イベント ハブの名前空間が必要です。 名前空間とイベント ハブを作成する手順については、イベント ハブの作成に関するページを参照してください。 Event Hubs の接続文字列と完全修飾ドメイン名 (FQDN) を、後で使用するために取得します。 手順については、「Get an Event Hubs connection string (Event Hubs の接続文字列を取得する)」を参照してください。
Azure Database for PostgreSQL を設定および構成する
Azure Database for PostgreSQL は、オープン ソースの PostgreSQL データベース エンジンのコミュニティ バージョンに基づいたリレーショナル データベース サービスであり、シングル サーバー、フレキシブル サーバー、Cosmos DB for PostgreSQL の 3 つのデプロイ オプションで使用できます。 Azure portal を使用して Azure Database for PostgreSQL サーバーを作成するには、この手順に従ってください。
Kafka Connect を設定して実行する
このセクションは、次のトピックで構成されています。
- Debezium コネクタのインストール
- Event Hubs 用の Kafka Connect の構成
- Debezium コネクタを使用した Kafka Connect クラスターの起動
Debezium コネクタをダウンロードして設定する
Debezium のドキュメントにある最新の手順に従って、コネクタをダウンロードして設定します。
- コネクタのプラグイン アーカイブをダウンロードします。 たとえば、コネクタのバージョン
1.2.0
をダウンロードするには、リンク https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.2.0.Final/debezium-connector-postgres-1.2.0.Final-plugin.tar.gz を使用します。 - JAR ファイルを抽出し、それを Kafka Connect の plugin.path にコピーします。
Event Hubs 用に Kafka Connect を構成する
Kafka Connect のスループットを Kafka から Event Hubs にリダイレクトする際に、最小限の再構成が必要となります。 次の connect-distributed.properties
サンプルは、Event Hubs 上の Kafka エンドポイントに対して認証と通信を行うように Connect を構成する方法を示しています。
重要
- Debezium により、テーブルごとのトピックと、一連のメタデータ トピックが自動的に作成されます。 Kafka のトピックは、Event Hubs のインスタンス (イベント ハブ) に対応します。 Apache Kafka と Azure Event Hubs の対応については、「Kafka と Event Hubs の概念のマッピング」をご覧ください。
- Event Hubs 名前空間内のイベント ハブの数には、レベル (Basic、Standard、Premium、Dedicated) に応じて異なる制限があります。 これらの制限については、「クォータ」をご覧ください。
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093 # e.g. namespace.servicebus.windows.net:9093
group.id=connect-cluster-group
# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status
# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
rest.advertised.host.name=connect
offset.flush.interval.ms=10000
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
plugin.path={KAFKA.DIRECTORY}/libs # path to the libs directory within the Kafka release
重要
{YOUR.EVENTHUBS.CONNECTION.STRING}
を Event Hubs 名前空間への接続文字列に置き換えます。 接続文字列を取得する手順については、「Event Hubs の接続文字列の取得」を参照してください。 構成の例には、sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
などがあります。
Kafka Connect を実行する
この手順では、Event Hubs を使用し、Kafka Connect ワーカーをローカルから分散モードで開始して、クラスターの状態を維持します。
- 前出の
connect-distributed.properties
ファイルをローカルに保存します。 中かっこで囲んだ値はすべて置き換えてください。 - お使いのマシン上にある Kafka リリースの場所に移動します。
./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties
を実行し、クラスターが起動するまで待ちます。
Note
Kafka Connect は、Kafka AdminClient API を使用して、圧縮などの推奨される構成を含むトピックを自動的に作成します。 Azure portal で名前空間をざっとチェックすると、Connect ワーカーの内部的なトピックが自動的に作成されていることがわかります。
Kafka Connect の内部トピックでは、圧縮を使用する必要があります。 Connect の内部トピックが正しく構成されていない場合、Event Hubs チームでは不適切な構成を修正する責任を負いません。
Debezium PostgreSQL ソース コネクタを構成して起動する
PostgreSQL ソース コネクタの構成ファイル (pg-source-connector.json
) を作成し、Azure PostgreSQL インスタンスに従って値を置き換えます。
{
"name": "todo-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "<replace with Azure PostgreSQL instance name>.postgres.database.azure.com",
"database.port": "5432",
"database.user": "<replace with database user name>",
"database.password": "<replace with database password>",
"database.dbname": "postgres",
"database.server.name": "my-server",
"plugin.name": "wal2json",
"table.whitelist": "public.todos"
}
}
ヒント
database.server.name
属性は、監視対象の特定の PostgreSQL データベース サーバーおよびクラスターの名前空間を識別および提供する論理名です。
コネクタのインスタンスを作成するには、Kafka Connect の REST API エンドポイントを使用します。
curl -X POST -H "Content-Type: application/json" --data @pg-source-connector.json http://localhost:8083/connectors
コネクタの状態を確認するには、次のようにします。
curl -s http://localhost:8083/connectors/todo-connector/status
変更データ キャプチャをテストする
動作中の変更データ キャプチャを確認するには、Azure PostgreSQL データベースのレコードを作成、更新、削除する必要があります。
まず、Azure PostgreSQL データベースに接続します (次の例では psql を使用しています)。
psql -h <POSTGRES_INSTANCE_NAME>.postgres.database.azure.com -p 5432 -U <POSTGRES_USER_NAME> -W -d <POSTGRES_DB_NAME> --set=sslmode=require
e.g.
psql -h my-postgres.postgres.database.azure.com -p 5432 -U testuser@my-postgres -W -d postgres --set=sslmode=require
テーブルを作成し、レコードを挿入する
CREATE TABLE todos (id SERIAL, description VARCHAR(50), todo_status VARCHAR(12), PRIMARY KEY(id));
INSERT INTO todos (description, todo_status) VALUES ('setup postgresql on azure', 'complete');
INSERT INTO todos (description, todo_status) VALUES ('setup kafka connect', 'complete');
INSERT INTO todos (description, todo_status) VALUES ('configure and install connector', 'in-progress');
INSERT INTO todos (description, todo_status) VALUES ('start connector', 'pending');
これでコネクタが動作を開始し、my-server.public.todos
という名前を持つ Event Hubs トピックに変更データ イベントが送信されます。ここで、my-server
は database.server.name
の値であり、public.todos
は (table.whitelist
構成に従って) 変更を追跡しているテーブルであるとします。
Event Hubs トピックを確認する
すべてが期待どおりに動作していることを確認するために、このトピックの内容を調べてみましょう。 次の例では kafkacat
を使用しますが、ここに一覧表示されているオプションのいずれかを使用してコンシューマーを作成することもできます。
次の内容を含む kafkacat.conf
という名前のファイルを作成します。
metadata.broker.list=<enter event hubs namespace>.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=$ConnectionString
sasl.password=<enter event hubs connection string>
注意
kafkacat.conf
内の metadata.broker.list
および sasl.password
属性を Event Hubs の情報に従って更新してください。
別のターミナルで、コンシューマーを起動します。
export KAFKACAT_CONFIG=kafkacat.conf
export BROKER=<enter event hubs namespace>.servicebus.windows.net:9093
export TOPIC=my-server.public.todos
kafkacat -b $BROKER -t $TOPIC -o beginning
todos
テーブルに追加した行に応答して、PostgreSQL で生成された変更データ イベントを表す JSON ペイロードが表示されます。 そのペイロードのスニペットを次に示します。
{
"schema": {...},
"payload": {
"before": null,
"after": {
"id": 1,
"description": "setup postgresql on azure",
"todo_status": "complete"
},
"source": {
"version": "1.2.0.Final",
"connector": "postgresql",
"name": "fulfillment",
"ts_ms": 1593018069944,
"snapshot": "last",
"db": "postgres",
"schema": "public",
"table": "todos",
"txId": 602,
"lsn": 184579736,
"xmin": null
},
"op": "c",
"ts_ms": 1593018069947,
"transaction": null
}
このイベントは payload
とその schema
(簡潔にするために省略しています) で構成されています。 payload
セクションでは、作成操作 ("op": "c"
) がどのように表されているかに注意してください。たとえば、"before": null
は、それが新しく INSERT
された行であったことを示し、after
はその行内の列の値を示し、source
は、このイベントが選択された元の PostgreSQL インスタンス メタデータを示しています。
更新または削除操作でも同じことを行い、変更データ イベントを調べてみることができます。 たとえば、configure and install connector
(id
を 3
とします) のタスクの状態を更新するには、次のようにします。
UPDATE todos SET todo_status = 'complete' WHERE id = 3;
(省略可能) FileStreamSink コネクタをインストールする
todos
テーブルのすべての変更が Event Hubs トピックにキャプチャされたので、FileStreamSink コネクタ (Kafka Connect では既定で使用可能) を使用して、これらのイベントを使用します。
コネクタの構成ファイル (file-sink-connector.json
) を作成します (ファイル システムに従って file
属性を置き換えます)。
{
"name": "cdc-file-sink",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"tasks.max": "1",
"topics": "my-server.public.todos",
"file": "<enter full path to file e.g. /Users/foo/todos-cdc.txt>"
}
}
コネクタを作成し、その状態を確認するには、次のようにします。
curl -X POST -H "Content-Type: application/json" --data @file-sink-connector.json http://localhost:8083/connectors
curl http://localhost:8083/connectors/cdc-file-sink/status
データベース レコードを挿入/更新/削除し、構成された出力シンク ファイルでこれらのレコードを監視します。
tail -f /Users/foo/todos-cdc.txt
クリーンアップ
Kafka Connect では、イベント ハブのトピックを作成して、Kafka Connect クラスターが停止した後も永続的に保持される構成、オフセット、状態を格納します。 この永続化が必要な場合を除いて、これらのトピックを削除することをお勧めします。 また、このチュートリアルで作成された my-server.public.todos
イベント ハブを削除することもできます。
次のステップ
Kafka 用 Event Hubs の詳細については、次の記事を参照してください。