変更データ キャプチャ用に Azure Event Hubs の Apache Kafka Connect のサポートを Debezium と統合する

変更データ キャプチャ (CDC) は、作成、更新、削除操作に応答して、データベース テーブル内の行レベルの変更を追跡するために使用される手法です。 Debezium は、さまざまなデータベースで使用できる変更データ キャプチャ機能 (PostgreSQL の論理デコードなど) に基づいて構築された分散プラットフォームです。 これにより、Kafka Connect コネクタのセットが提供されます。これらのコネクタは、データベース テーブル内の行レベルの変更を活用し、それらを、後で Apache Kafka に送信されるイベント ストリームに変換します。

このチュートリアルでは、Event Hubs (Kafka 用)、Azure DB for PostgreSQL、Debezium を使用して、Azure で変更データ キャプチャ ベースのシステムを設定する方法について説明します。 これは、Debezium PostgreSQL コネクタを使用して、データベース変更を PostgreSQL から Event Hubs の Kafka トピックにストリーミングします。

Note

この記事には、Microsoft が使用しなくなった用語への言及が含まれています。 ソフトウェアからこの用語が削除された時点で、この記事から削除します。

このチュートリアルでは、次の手順を実行します。

  • Event Hubs 名前空間を作成します
  • Azure Database for PostgreSQL を設定および構成する
  • Debezium PostgreSQL コネクタを使用して Kafka Connect を構成して実行する
  • 変更データ キャプチャをテストする
  • (省略可能) FileStreamSink コネクタを使用して変更データ イベントを消費する

前提条件

このチュートリアルを完了するには、次の作業が必要です。

Event Hubs 名前空間を作成します

Event Hubs サービスとの間で送受信を行うには、イベント ハブの名前空間が必要です。 名前空間とイベント ハブを作成する手順については、イベント ハブの作成に関するページを参照してください。 Event Hubs の接続文字列と完全修飾ドメイン名 (FQDN) を、後で使用するために取得します。 手順については、「Get an Event Hubs connection string (Event Hubs の接続文字列を取得する)」を参照してください。

Azure Database for PostgreSQL を設定および構成する

Azure Database for PostgreSQL は、オープン ソースの PostgreSQL データベース エンジンのコミュニティ バージョンに基づいたリレーショナル データベース サービスであり、シングル サーバー、フレキシブル サーバー、Cosmos DB for PostgreSQL の 3 つのデプロイ オプションで使用できます。 Azure portal を使用して Azure Database for PostgreSQL サーバーを作成するには、この手順に従ってください。

Kafka Connect を設定して実行する

このセクションは、次のトピックで構成されています。

  • Debezium コネクタのインストール
  • Event Hubs 用の Kafka Connect の構成
  • Debezium コネクタを使用した Kafka Connect クラスターの起動

Debezium コネクタをダウンロードして設定する

Debezium のドキュメントにある最新の手順に従って、コネクタをダウンロードして設定します。

Event Hubs 用に Kafka Connect を構成する

Kafka Connect のスループットを Kafka から Event Hubs にリダイレクトする際に、最小限の再構成が必要となります。 次の connect-distributed.properties サンプルは、Event Hubs 上の Kafka エンドポイントに対して認証と通信を行うように Connect を構成する方法を示しています。

重要

  • Debezium により、テーブルごとのトピックと、一連のメタデータ トピックが自動的に作成されます。 Kafka のトピックは、Event Hubs のインスタンス (イベント ハブ) に対応します。 Apache Kafka と Azure Event Hubs の対応については、「Kafka と Event Hubs の概念のマッピング」をご覧ください。
  • Event Hubs 名前空間内のイベント ハブの数には、レベル (Basic、Standard、Premium、Dedicated) に応じて異なる制限があります。 これらの制限については、「クォータ」をご覧ください。
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093 # e.g. namespace.servicebus.windows.net:9093
group.id=connect-cluster-group

# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status

# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

rest.advertised.host.name=connect
offset.flush.interval.ms=10000

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

plugin.path={KAFKA.DIRECTORY}/libs # path to the libs directory within the Kafka release

重要

{YOUR.EVENTHUBS.CONNECTION.STRING} を Event Hubs 名前空間への接続文字列に置き換えます。 接続文字列を取得する手順については、「Event Hubs の接続文字列の取得」を参照してください。 構成の例には、sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX"; などがあります。

Kafka Connect を実行する

この手順では、Event Hubs を使用し、Kafka Connect ワーカーをローカルから分散モードで開始して、クラスターの状態を維持します。

  1. 前出の connect-distributed.properties ファイルをローカルに保存します。 中かっこで囲んだ値はすべて置き換えてください。
  2. お使いのマシン上にある Kafka リリースの場所に移動します。
  3. ./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties を実行し、クラスターが起動するまで待ちます。

Note

Kafka Connect は、Kafka AdminClient API を使用して、圧縮などの推奨される構成を含むトピックを自動的に作成します。 Azure portal で名前空間をざっとチェックすると、Connect ワーカーの内部的なトピックが自動的に作成されていることがわかります。

Kafka Connect の内部トピックでは、圧縮を使用する必要があります。 Connect の内部トピックが正しく構成されていない場合、Event Hubs チームでは不適切な構成を修正する責任を負いません。

Debezium PostgreSQL ソース コネクタを構成して起動する

PostgreSQL ソース コネクタの構成ファイル (pg-source-connector.json) を作成し、Azure PostgreSQL インスタンスに従って値を置き換えます。

{
    "name": "todo-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "<replace with Azure PostgreSQL instance name>.postgres.database.azure.com",
        "database.port": "5432",
        "database.user": "<replace with database user name>",
        "database.password": "<replace with database password>",
        "database.dbname": "postgres",
        "database.server.name": "my-server",
        "plugin.name": "wal2json",
        "table.whitelist": "public.todos"
    }
}

ヒント

database.server.name 属性は、監視対象の特定の PostgreSQL データベース サーバーおよびクラスターの名前空間を識別および提供する論理名です。

コネクタのインスタンスを作成するには、Kafka Connect の REST API エンドポイントを使用します。

curl -X POST -H "Content-Type: application/json" --data @pg-source-connector.json http://localhost:8083/connectors

コネクタの状態を確認するには、次のようにします。

curl -s http://localhost:8083/connectors/todo-connector/status

変更データ キャプチャをテストする

動作中の変更データ キャプチャを確認するには、Azure PostgreSQL データベースのレコードを作成、更新、削除する必要があります。

まず、Azure PostgreSQL データベースに接続します (次の例では psql を使用しています)。

psql -h <POSTGRES_INSTANCE_NAME>.postgres.database.azure.com -p 5432 -U <POSTGRES_USER_NAME> -W -d <POSTGRES_DB_NAME> --set=sslmode=require

e.g. 

psql -h my-postgres.postgres.database.azure.com -p 5432 -U testuser@my-postgres -W -d postgres --set=sslmode=require

テーブルを作成し、レコードを挿入する

CREATE TABLE todos (id SERIAL, description VARCHAR(50), todo_status VARCHAR(12), PRIMARY KEY(id));

INSERT INTO todos (description, todo_status) VALUES ('setup postgresql on azure', 'complete');
INSERT INTO todos (description, todo_status) VALUES ('setup kafka connect', 'complete');
INSERT INTO todos (description, todo_status) VALUES ('configure and install connector', 'in-progress');
INSERT INTO todos (description, todo_status) VALUES ('start connector', 'pending');

これでコネクタが動作を開始し、my-server.public.todos という名前を持つ Event Hubs トピックに変更データ イベントが送信されます。ここで、my-serverdatabase.server.name の値であり、public.todos は (table.whitelist 構成に従って) 変更を追跡しているテーブルであるとします。

Event Hubs トピックを確認する

すべてが期待どおりに動作していることを確認するために、このトピックの内容を調べてみましょう。 次の例では kafkacat を使用しますが、ここに一覧表示されているオプションのいずれかを使用してコンシューマーを作成することもできます。

次の内容を含む kafkacat.conf という名前のファイルを作成します。

metadata.broker.list=<enter event hubs namespace>.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=$ConnectionString
sasl.password=<enter event hubs connection string>

注意

kafkacat.conf 内の metadata.broker.list および sasl.password 属性を Event Hubs の情報に従って更新してください。

別のターミナルで、コンシューマーを起動します。

export KAFKACAT_CONFIG=kafkacat.conf
export BROKER=<enter event hubs namespace>.servicebus.windows.net:9093
export TOPIC=my-server.public.todos

kafkacat -b $BROKER -t $TOPIC -o beginning

todos テーブルに追加した行に応答して、PostgreSQL で生成された変更データ イベントを表す JSON ペイロードが表示されます。 そのペイロードのスニペットを次に示します。

{
    "schema": {...},
    "payload": {
        "before": null,
        "after": {
            "id": 1,
            "description": "setup postgresql on azure",
            "todo_status": "complete"
        },
        "source": {
            "version": "1.2.0.Final",
            "connector": "postgresql",
            "name": "fullfillment",
            "ts_ms": 1593018069944,
            "snapshot": "last",
            "db": "postgres",
            "schema": "public",
            "table": "todos",
            "txId": 602,
            "lsn": 184579736,
            "xmin": null
        },
        "op": "c",
        "ts_ms": 1593018069947,
        "transaction": null
    }

このイベントは payload とその schema (簡潔にするために省略しています) で構成されています。 payload セクションでは、作成操作 ("op": "c") がどのように表されているかに注意してください。たとえば、"before": null は、それが新しく INSERT された行であったことを示し、after はその行内の列の値を示し、source は、このイベントが選択された元の PostgreSQL インスタンス メタデータを示しています。

更新または削除操作でも同じことを行い、変更データ イベントを調べてみることができます。 たとえば、configure and install connector (id3 とします) のタスクの状態を更新するには、次のようにします。

UPDATE todos SET todo_status = 'complete' WHERE id = 3;

(省略可能) FileStreamSink コネクタをインストールする

todos テーブルのすべての変更が Event Hubs トピックにキャプチャされたので、FileStreamSink コネクタ (Kafka Connect では既定で使用可能) を使用して、これらのイベントを使用します。

コネクタの構成ファイル (file-sink-connector.json) を作成します (ファイル システムに従って file 属性を置き換えます)。

{
    "name": "cdc-file-sink",
    "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "tasks.max": "1",
        "topics": "my-server.public.todos",
        "file": "<enter full path to file e.g. /Users/foo/todos-cdc.txt>"
    }
}

コネクタを作成し、その状態を確認するには、次のようにします。

curl -X POST -H "Content-Type: application/json" --data @file-sink-connector.json http://localhost:8083/connectors

curl http://localhost:8083/connectors/cdc-file-sink/status

データベース レコードを挿入/更新/削除し、構成された出力シンク ファイルでこれらのレコードを監視します。

tail -f /Users/foo/todos-cdc.txt

クリーンアップ

Kafka Connect では、イベント ハブのトピックを作成して、Kafka Connect クラスターが停止した後も永続的に保持される構成、オフセット、状態を格納します。 この永続化が必要な場合を除いて、これらのトピックを削除することをお勧めします。 また、このチュートリアルで作成された my-server.public.todos イベント ハブを削除することもできます。

次のステップ

Kafka 用 Event Hubs の詳細については、次の記事を参照してください。