共用方式為


整合 Debezium 與 Azure 事件中樞上的 Apache Kafka Connect 支援功能以供異動資料擷取

異動資料擷取 (CDC) 技術是用來追蹤資料庫資料表的資料列層級變更,以便對建立、更新和刪除等作業做出回應。 Debezium 是一種分散式平台,以不同資料庫所提供的異動資料擷取功能為基礎 (例如 PostgreSQL 中的邏輯解碼)。 此平台會提供一組 Kafka Connect 連接器,可運用資料庫資料表中的資料列層級變更,並將變更轉換為事件串流再送往 Apache Kafka

本教學課程會逐步說明如何使用事件中樞 (適用於 Kafka)、適用於 PostgreSQL 的 Azure 資料庫及 Debezium,在 Azure 上建立異動資料擷取式的系統。 會使用到 Debezium PostgreSQL 連接器從 PostgreSQL 將串流資料庫修改項目傳送到事件中樞內的 Kafka 主題。

注意

本文參考了 Microsoft 不再使用的詞彙。 從軟體中移除該字詞時,我們也會將其從本文中移除。

在本教學課程中,您會執行下列步驟:

  • 建立事件中樞命名空間
  • 建立和設定適用於 PostgreSQL 的 Azure 資料庫
  • 使用 Debezium PostgreSQL 連接器設定和執行 Kafka Connect
  • 測試異動資料擷取
  • (選擇性) 以 FileStreamSink 連接器取用異動資料事件

必要條件

本次逐步教學需準備以下項目:

建立事件中樞命名空間

您需要事件中樞命名空間,才能從任何事件中樞服務傳送和接收。 請參閱建立事件中樞,以取得建立命名空間和事件中樞的指示。 請取得事件中樞連接字串和完整網域名稱 (FQDN) 以供稍後使用。 如需相關指示,請參閱取得事件中樞連接字串

建立和設定適用於 PostgreSQL 的 Azure 資料庫

適用於 PostgreSQL 的 Azure 資料庫是關聯式資料庫服務,以社群版的開放原始碼 PostgreSQL 資料庫引擎為基礎,提供三種部署選擇:「單一伺服器」、「彈性伺服器」和 Cosmos DB for PostgreSQL。 遵循這些指示來使用 Azure 入口網站建立適用於 PostgreSQL 的 Azure 資料庫。

設定和執行 Kafka Connect

本章節涵蓋下列主題:

  • Debezium 連接器安裝
  • 設定事件中樞的 Kafka Connect
  • 使用 Debezium 連接器啟動 Kafka Connect 叢集

下載並設定 Debezium 連接器

請按照 Debezium 文件中的最新說明,下載並設定該連接器。

設定事件中樞的 Kafka Connect

將 Kafka Connect 輸送量從 Kafka 重新導向至事件中樞時,需要稍微重新設定。 下列 connect-distributed.properties 範例說明如何設定 Connect,以向事件中樞上的 Kafka 端點進行驗證與通訊:

重要

  • Debezium 會自動建立每個資料表的主題和多個中繼資料主題。 Kafka 主題與事件中樞的執行個體 (事件中樞) 相對應。 關於 Apache Kafka 和 Azure 事件中樞的對應,請參閱 Kafka 和事件中樞的概念對應
  • 事件中樞命名空間內事件中樞的數量依照服務層級 (基本、標準、進階或專用) 有不同的限制。 關於這類限制,請參閱配額
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093 # e.g. namespace.servicebus.windows.net:9093
group.id=connect-cluster-group

# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status

# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

rest.advertised.host.name=connect
offset.flush.interval.ms=10000

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

plugin.path={KAFKA.DIRECTORY}/libs # path to the libs directory within the Kafka release

重要

{YOUR.EVENTHUBS.CONNECTION.STRING} 取代為事件中樞命名空間的連接字串。 如需有關取得連接字串的指示,請參閱取得事件中樞連接字串。 以下是範例組態:sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

執行 Kafka Connect

在此步驟中,Kafka Connect 背景工作角色會以分散模式在本機啟動,使用事件中樞來維持叢集的狀態。

  1. 將上述 connect-distributed.properties 檔案儲存在本機。 請務必取代括號中的所有值。
  2. 瀏覽至機器上的 Kafka 版本位置。
  3. 執行 ./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties 並等候叢集啟動。

注意

Kafka Connect 會使用 Kafka AdminClient API,以使用建議的設定 (包括壓縮) 自動建立主題。 在 Azure 入口網站中快速檢查命名空間,會顯露出 Connect 背景工作角色的內部主題已自動建立。

Kafka Connect 內部主題必須使用壓縮。 若未正確設定 [內部連線] 主題,事件中樞小組將不會負責修正不當的設定。

設定並啟動 Debezium PostgreSQL 來源連接器

請建立 PostgreSQL 來源連接器的設定檔 (pg-source-connector.json),並根據您的 Azure PostgreSQL 執行個體替換這些值。

{
    "name": "todo-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "<replace with Azure PostgreSQL instance name>.postgres.database.azure.com",
        "database.port": "5432",
        "database.user": "<replace with database user name>",
        "database.password": "<replace with database password>",
        "database.dbname": "postgres",
        "database.server.name": "my-server",
        "plugin.name": "wal2json",
        "table.whitelist": "public.todos"
    }
}

提示

database.server.name 屬性是一種邏輯名稱,可識別並提供所監視之特定 PostgreSQL 資料庫伺服器/叢集的命名空間。

若要建立連接器的執行個體,請使用 Kafka Connect REST API 端點:

curl -X POST -H "Content-Type: application/json" --data @pg-source-connector.json http://localhost:8083/connectors

檢查連接器的狀態:

curl -s http://localhost:8083/connectors/todo-connector/status

測試異動資料擷取

若要查看進行中的異動資料擷取,需建立/更新/刪除 Azure PostgreSQL 資料庫中的記錄。

首先請連線到您的 Azure PostgreSQL 資料庫 (以下範例使用 psql)。

psql -h <POSTGRES_INSTANCE_NAME>.postgres.database.azure.com -p 5432 -U <POSTGRES_USER_NAME> -W -d <POSTGRES_DB_NAME> --set=sslmode=require

e.g. 

psql -h my-postgres.postgres.database.azure.com -p 5432 -U testuser@my-postgres -W -d postgres --set=sslmode=require

建立資料表並插入記錄

CREATE TABLE todos (id SERIAL, description VARCHAR(50), todo_status VARCHAR(12), PRIMARY KEY(id));

INSERT INTO todos (description, todo_status) VALUES ('setup postgresql on azure', 'complete');
INSERT INTO todos (description, todo_status) VALUES ('setup kafka connect', 'complete');
INSERT INTO todos (description, todo_status) VALUES ('configure and install connector', 'in-progress');
INSERT INTO todos (description, todo_status) VALUES ('start connector', 'pending');

連接器應當隨即會開始運作,並將異動資料事件傳送到事件中樞主題,並使用以下名稱 my-server.public.todos (假設您以 my-server 作為 database.server.name 的值,且 public.todos 是依照 table.whitelist 設定正在追蹤其變更的資料表)。

查看事件中樞主題

接下來要檢查主題的內容,確定一切如預期運作。 以下範例使用 kafkacat,但您也可以使用此處所列的任何一種選項來建立取用者

建立名為 kafkacat.conf 的檔案,包含以下內容:

metadata.broker.list=<enter event hubs namespace>.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=$ConnectionString
sasl.password=<enter event hubs connection string>

注意

依據事件中樞的資訊,更新 kafkacat.conf 中的 metadata.broker.listsasl.password 屬性。

在不同的終端中啟動取用者:

export KAFKACAT_CONFIG=kafkacat.conf
export BROKER=<enter event hubs namespace>.servicebus.windows.net:9093
export TOPIC=my-server.public.todos

kafkacat -b $BROKER -t $TOPIC -o beginning

您應該會看到 JSON 酬載,呈現出 PostgreSQL 為了回應您在 todos 資料表中新增的資料列而產生的異動資料事件。 酬載的程式碼片段如下:

{
    "schema": {...},
    "payload": {
        "before": null,
        "after": {
            "id": 1,
            "description": "setup postgresql on azure",
            "todo_status": "complete"
        },
        "source": {
            "version": "1.2.0.Final",
            "connector": "postgresql",
            "name": "fulfillment",
            "ts_ms": 1593018069944,
            "snapshot": "last",
            "db": "postgres",
            "schema": "public",
            "table": "todos",
            "txId": 602,
            "lsn": 184579736,
            "xmin": null
        },
        "op": "c",
        "ts_ms": 1593018069947,
        "transaction": null
    }

該事件是以 payload 和其 schema (省略以求簡潔) 所組成。 在 payload 區段中,請注意建立作業 ("op": "c") 的呈現方式:"before": null代表這是剛進行 INSERT 作業的資料列,after 提供該資料列中資料行的值,source 提供 PostgreSQL 執行個體的中繼資料 (此事件即後續事件便是從這裡取得)。

更新或刪除作業也可以依樣進行,對異動資料事件的檢查也是如此。 舉例來說,更新 configure and install connector 工作狀態的方式如下 (假設其 id3):

UPDATE todos SET todo_status = 'complete' WHERE id = 3;

(選用) 安裝 FileStreamSink 連接器

現在所有 todos 資料表的變更都會擷取到事件中樞主題中,您可使用 FileStreamSink 連接器 (Kafka Connect 預設提供) 來取用這些事件。

請建立該連接器的設定檔 (file-sink-connector.json),並依據您的檔案系統取代 file 屬性。

{
    "name": "cdc-file-sink",
    "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "tasks.max": "1",
        "topics": "my-server.public.todos",
        "file": "<enter full path to file e.g. /Users/foo/todos-cdc.txt>"
    }
}

若要建立連接器並檢查其狀態:

curl -X POST -H "Content-Type: application/json" --data @file-sink-connector.json http://localhost:8083/connectors

curl http://localhost:8083/connectors/cdc-file-sink/status

插入/更新/刪除資料庫記錄,並監視所設輸出接收器檔案中的記錄:

tail -f /Users/foo/todos-cdc.txt

清理

Kafka Connect 會建立事件中樞主題,以儲存即使在 Kafka Connect 叢集關閉後仍會保存下來的設定、位移及狀態。 除非需要此持續性,否則建議您刪除這些主題。 也建議刪除此逐步教學期間建立的 my-server.public.todos 事件中樞。

下一步

若要深入了解適用於 Kafka 的事件中樞,請參閱下列文章: