異動資料擷取 (CDC) 技術是用來追蹤資料庫資料表的資料列層級變更,以便對建立、更新和刪除等作業做出回應。 Debezium 是一種分散式平台,以不同資料庫所提供的異動資料擷取功能為基礎 (例如 PostgreSQL 中的邏輯解碼)。 此平台會提供一組 Kafka Connect 連接器,可運用資料庫資料表中的資料列層級變更,並將變更轉換為事件串流再送往 Apache Kafka。
本教學課程會逐步說明如何使用事件中樞 (適用於 Kafka)、適用於 PostgreSQL 的 Azure 資料庫及 Debezium,在 Azure 上建立異動資料擷取式的系統。 會使用到 Debezium PostgreSQL 連接器從 PostgreSQL 將串流資料庫修改項目傳送到事件中樞內的 Kafka 主題。
注意
本文參考了 Microsoft 不再使用的詞彙。 從軟體中移除該字詞時,我們也會將其從本文中移除。
在本教學課程中,您會執行下列步驟:
- 建立事件中樞命名空間
- 建立和設定適用於 PostgreSQL 的 Azure 資料庫
- 使用 Debezium PostgreSQL 連接器設定和執行 Kafka Connect
- 測試異動資料擷取
- (選擇性) 以
FileStreamSink
連接器取用異動資料事件
必要條件
本次逐步教學需準備以下項目:
- Azure 訂閱。 如果您沒有訂用帳戶,請建立免費帳戶。
- Linux/macOS
- Kafka 版本 (1.1.1 版,Scala 2.11 版),取得來源為 kafka.apache.org
- 請參閱適用於 Apache Kafka 的事件中樞簡介文章
建立事件中樞命名空間
您需要事件中樞命名空間,才能從任何事件中樞服務傳送和接收。 請參閱建立事件中樞,以取得建立命名空間和事件中樞的指示。 請取得事件中樞連接字串和完整網域名稱 (FQDN) 以供稍後使用。 如需相關指示,請參閱取得事件中樞連接字串。
建立和設定適用於 PostgreSQL 的 Azure 資料庫
適用於 PostgreSQL 的 Azure 資料庫是關聯式資料庫服務,以社群版的開放原始碼 PostgreSQL 資料庫引擎為基礎,提供三種部署選擇:「單一伺服器」、「彈性伺服器」和 Cosmos DB for PostgreSQL。 遵循這些指示來使用 Azure 入口網站建立適用於 PostgreSQL 的 Azure 資料庫。
設定和執行 Kafka Connect
本章節涵蓋下列主題:
- Debezium 連接器安裝
- 設定事件中樞的 Kafka Connect
- 使用 Debezium 連接器啟動 Kafka Connect 叢集
下載並設定 Debezium 連接器
請按照 Debezium 文件中的最新說明,下載並設定該連接器。
- 下載連接器的外掛程式封存檔。 舉例來說,若要下載版本
1.2.0
的連接器,請使用此連結 - https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.2.0.Final/debezium-connector-postgres-1.2.0.Final-plugin.tar.gz - 將 JAR 檔案解壓縮,並複製到 Kafka Connect plugin.path。
設定事件中樞的 Kafka Connect
將 Kafka Connect 輸送量從 Kafka 重新導向至事件中樞時,需要稍微重新設定。 下列 connect-distributed.properties
範例說明如何設定 Connect,以向事件中樞上的 Kafka 端點進行驗證與通訊:
重要
- Debezium 會自動建立每個資料表的主題和多個中繼資料主題。 Kafka 主題與事件中樞的執行個體 (事件中樞) 相對應。 關於 Apache Kafka 和 Azure 事件中樞的對應,請參閱 Kafka 和事件中樞的概念對應。
- 事件中樞命名空間內事件中樞的數量依照服務層級 (基本、標準、進階或專用) 有不同的限制。 關於這類限制,請參閱配額。
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093 # e.g. namespace.servicebus.windows.net:9093
group.id=connect-cluster-group
# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status
# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
rest.advertised.host.name=connect
offset.flush.interval.ms=10000
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
plugin.path={KAFKA.DIRECTORY}/libs # path to the libs directory within the Kafka release
重要
將 {YOUR.EVENTHUBS.CONNECTION.STRING}
取代為事件中樞命名空間的連接字串。 如需有關取得連接字串的指示,請參閱取得事件中樞連接字串。 以下是範例組態:sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
執行 Kafka Connect
在此步驟中,Kafka Connect 背景工作角色會以分散模式在本機啟動,使用事件中樞來維持叢集的狀態。
- 將上述
connect-distributed.properties
檔案儲存在本機。 請務必取代括號中的所有值。 - 瀏覽至機器上的 Kafka 版本位置。
- 執行
./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties
並等候叢集啟動。
注意
Kafka Connect 會使用 Kafka AdminClient API,以使用建議的設定 (包括壓縮) 自動建立主題。 在 Azure 入口網站中快速檢查命名空間,會顯露出 Connect 背景工作角色的內部主題已自動建立。
Kafka Connect 內部主題必須使用壓縮。 若未正確設定 [內部連線] 主題,事件中樞小組將不會負責修正不當的設定。
設定並啟動 Debezium PostgreSQL 來源連接器
請建立 PostgreSQL 來源連接器的設定檔 (pg-source-connector.json
),並根據您的 Azure PostgreSQL 執行個體替換這些值。
{
"name": "todo-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "<replace with Azure PostgreSQL instance name>.postgres.database.azure.com",
"database.port": "5432",
"database.user": "<replace with database user name>",
"database.password": "<replace with database password>",
"database.dbname": "postgres",
"database.server.name": "my-server",
"plugin.name": "wal2json",
"table.whitelist": "public.todos"
}
}
提示
database.server.name
屬性是一種邏輯名稱,可識別並提供所監視之特定 PostgreSQL 資料庫伺服器/叢集的命名空間。
若要建立連接器的執行個體,請使用 Kafka Connect REST API 端點:
curl -X POST -H "Content-Type: application/json" --data @pg-source-connector.json http://localhost:8083/connectors
檢查連接器的狀態:
curl -s http://localhost:8083/connectors/todo-connector/status
測試異動資料擷取
若要查看進行中的異動資料擷取,需建立/更新/刪除 Azure PostgreSQL 資料庫中的記錄。
首先請連線到您的 Azure PostgreSQL 資料庫 (以下範例使用 psql)。
psql -h <POSTGRES_INSTANCE_NAME>.postgres.database.azure.com -p 5432 -U <POSTGRES_USER_NAME> -W -d <POSTGRES_DB_NAME> --set=sslmode=require
e.g.
psql -h my-postgres.postgres.database.azure.com -p 5432 -U testuser@my-postgres -W -d postgres --set=sslmode=require
建立資料表並插入記錄
CREATE TABLE todos (id SERIAL, description VARCHAR(50), todo_status VARCHAR(12), PRIMARY KEY(id));
INSERT INTO todos (description, todo_status) VALUES ('setup postgresql on azure', 'complete');
INSERT INTO todos (description, todo_status) VALUES ('setup kafka connect', 'complete');
INSERT INTO todos (description, todo_status) VALUES ('configure and install connector', 'in-progress');
INSERT INTO todos (description, todo_status) VALUES ('start connector', 'pending');
連接器應當隨即會開始運作,並將異動資料事件傳送到事件中樞主題,並使用以下名稱 my-server.public.todos
(假設您以 my-server
作為 database.server.name
的值,且 public.todos
是依照 table.whitelist
設定正在追蹤其變更的資料表)。
查看事件中樞主題
接下來要檢查主題的內容,確定一切如預期運作。 以下範例使用 kafkacat
,但您也可以使用此處所列的任何一種選項來建立取用者。
建立名為 kafkacat.conf
的檔案,包含以下內容:
metadata.broker.list=<enter event hubs namespace>.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=$ConnectionString
sasl.password=<enter event hubs connection string>
注意
依據事件中樞的資訊,更新 kafkacat.conf
中的 metadata.broker.list
和 sasl.password
屬性。
在不同的終端中啟動取用者:
export KAFKACAT_CONFIG=kafkacat.conf
export BROKER=<enter event hubs namespace>.servicebus.windows.net:9093
export TOPIC=my-server.public.todos
kafkacat -b $BROKER -t $TOPIC -o beginning
您應該會看到 JSON 酬載,呈現出 PostgreSQL 為了回應您在 todos
資料表中新增的資料列而產生的異動資料事件。 酬載的程式碼片段如下:
{
"schema": {...},
"payload": {
"before": null,
"after": {
"id": 1,
"description": "setup postgresql on azure",
"todo_status": "complete"
},
"source": {
"version": "1.2.0.Final",
"connector": "postgresql",
"name": "fulfillment",
"ts_ms": 1593018069944,
"snapshot": "last",
"db": "postgres",
"schema": "public",
"table": "todos",
"txId": 602,
"lsn": 184579736,
"xmin": null
},
"op": "c",
"ts_ms": 1593018069947,
"transaction": null
}
該事件是以 payload
和其 schema
(省略以求簡潔) 所組成。 在 payload
區段中,請注意建立作業 ("op": "c"
) 的呈現方式:"before": null
代表這是剛進行 INSERT
作業的資料列,after
提供該資料列中資料行的值,source
提供 PostgreSQL 執行個體的中繼資料 (此事件即後續事件便是從這裡取得)。
更新或刪除作業也可以依樣進行,對異動資料事件的檢查也是如此。 舉例來說,更新 configure and install connector
工作狀態的方式如下 (假設其 id
為 3
):
UPDATE todos SET todo_status = 'complete' WHERE id = 3;
(選用) 安裝 FileStreamSink 連接器
現在所有 todos
資料表的變更都會擷取到事件中樞主題中,您可使用 FileStreamSink 連接器 (Kafka Connect 預設提供) 來取用這些事件。
請建立該連接器的設定檔 (file-sink-connector.json
),並依據您的檔案系統取代 file
屬性。
{
"name": "cdc-file-sink",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"tasks.max": "1",
"topics": "my-server.public.todos",
"file": "<enter full path to file e.g. /Users/foo/todos-cdc.txt>"
}
}
若要建立連接器並檢查其狀態:
curl -X POST -H "Content-Type: application/json" --data @file-sink-connector.json http://localhost:8083/connectors
curl http://localhost:8083/connectors/cdc-file-sink/status
插入/更新/刪除資料庫記錄,並監視所設輸出接收器檔案中的記錄:
tail -f /Users/foo/todos-cdc.txt
清理
Kafka Connect 會建立事件中樞主題,以儲存即使在 Kafka Connect 叢集關閉後仍會保存下來的設定、位移及狀態。 除非需要此持續性,否則建議您刪除這些主題。 也建議刪除此逐步教學期間建立的 my-server.public.todos
事件中樞。
下一步
若要深入了解適用於 Kafka 的事件中樞,請參閱下列文章: