使用 Apache Kafka 將資料從 PostgreSQL 移轉至 Azure Cosmos DB for Apache Cassandra 帳戶

適用於: Cassandra

因為各種原因,Azure Cosmos DB 中的 API for Cassandra 現已成為在 Apache Cassandra 上執行的企業工作負載的絕佳選擇,例如:

  • 可大幅節省成本:您可以使用 Azure Cosmos DB 來節省成本,其包括 VM、頻寬和任何適用 Oracle 授權的成本。 此外,您也不需要管理資料中心、伺服器、SSD 儲存體、網路和電力的成本。

  • 更好的可擴縮性和可用性:其可消除單一失敗點,讓應用程式有更好的可擴縮性和可用性。

  • 沒有管理和監視方面的額外負荷:作為完全受控的雲端服務,Azure Cosmos DB 去除了必須管理和監視龐大設定的額外負荷。

Kafka Connect 這個平台能透過可調整且可靠的方式,在 Apache Kafka 與其他系統之間串流資料。 其支援數個現成的連接器,這表示您不需要自訂程式碼,就能整合外部系統與 Apache Kafka。

本文示範如何使用 Kafka 連接器組合來設定資料管線,以持續將記錄從關聯式資料庫 (例如 PostgreSQL) 同步至 Azure Cosmos DB for Apache Cassandra

概觀

下面會概述本文所呈現的端對端流程。

PostgreSQL 資料表中的資料會使用 Debezium PostgreSQL 連接器 (這是 Kafka Connect 的來源連接器) 推送至 Apache Kafka。 針對在 PostgreSQL 資料表中插入、更新或刪除記錄的行為,系統均會將其擷取為 change data 事件,並傳送至 Kafka 主題。 DataStax Apache Kafka 連接器 (Kafka Connect 的接收連接器) 會形成管線的第二個部分。 其會將變更資料事件從 Kafka 主題同步至 Azure Cosmos DB for Apache Cassandra 資料表。

注意

使用 DataStax Apache Kafka 連接器的特定功能可讓我們將資料推送至多個資料表。 在此範例中,該連接器會協助我們將異動資料記錄保存到兩個可支援不同查詢需求的 Cassandra 資料表。

必要條件

基礎設定

設定 PostgreSQL 資料庫 (如果尚未設定)。

這可以是現有的內部部署資料庫,或者您也可以在本機電腦上下載並安裝一個資料庫。 您也可以使用 Docker 容器

注意

下列範例會從 Docker Hub 提取公用容器映像。 建議您先驗證 Docker Hub 帳戶 (docker login),而不是發出匿名提取要求。 若要改善使用公用內容時的可靠性,請在私人 Azure Container Registry 中匯入和管理映像。 深入了解公用映像的使用方式

若要啟動容器:

docker run --rm -p 5432:5432 -e POSTGRES_PASSWORD=<enter password> postgres

使用 psql 用戶端連線到 PostgreSQL 執行個體:

psql -h localhost -p 5432 -U postgres -W -d postgres

建立資料表來儲存訂單範例的資訊:

CREATE SCHEMA retail;

CREATE TABLE retail.orders_info (
	orderid SERIAL NOT NULL PRIMARY KEY,
	custid INTEGER NOT NULL,
	amount INTEGER NOT NULL,
	city VARCHAR(255) NOT NULL,
	purchase_time VARCHAR(40) NOT NULL
);

使用 Azure 入口網站建立示範應用程式所需的 Cassandra Keyspace 和資料表。

注意

使用如下的相同 Keyspace 和資料表名稱

CREATE KEYSPACE retail WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'datacenter1' : 1};

CREATE TABLE retail.orders_by_customer (order_id int, customer_id int, purchase_amount int, city text, purchase_time timestamp, PRIMARY KEY (customer_id, purchase_time)) WITH CLUSTERING ORDER BY (purchase_time DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;

CREATE TABLE retail.orders_by_city (order_id int, customer_id int, purchase_amount int, city text, purchase_time timestamp, PRIMARY KEY (city,order_id)) WITH cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;

設定 Apache Kafka

本文使用本機叢集,但您也可以選擇任何其他選項。 下載 Kafka、將其解壓縮,然後啟動 Zookeeper 和 Kafka 叢集。

cd <KAFKA_HOME>/bin

#start zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

#start kafka (in another terminal)
bin/kafka-server-start.sh config/server.properties

設定連接器

安裝 Debezium PostgreSQL 和 DataStax Apache Kafka 連接器。 下載 Debezium PostgreSQL 連接器外掛程式封存檔。 例如,若要下載 1.3.0 版的連接器 (本文撰寫當下的最新版本),請使用此連結。 從這個連結下載 DataStax Apache Kafka 連接器。

將兩個連接器封存檔解壓縮,並將 JAR 檔案複製到 Kafka Connect plugin.path

cp <path_to_debezium_connector>/*.jar <KAFKA_HOME>/libs
cp <path_to_cassandra_connector>/*.jar <KAFKA_HOME>/libs

如需詳細資訊,請參閱 DebeziumDataStax 文件。

設定 Kafka Connect 並啟動資料管線

啟動 Kafka Connect 叢集

cd <KAFKA_HOME>/bin
./connect-distributed.sh ../config/connect-distributed.properties

啟動 PostgreSQL 連接器執行個體

將連接器設定 (JSON) 儲存至檔案範例 pg-source-config.json

{
    "name": "pg-orders-source",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "localhost",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "password",
        "database.dbname": "postgres",
        "database.server.name": "myserver",
        "plugin.name": "wal2json",
        "table.include.list": "retail.orders_info",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter"
    }
}

若要啟動 PostgreSQL 連接器執行個體:

curl -X POST -H "Content-Type: application/json" --data @pg-source-config.json http://localhost:8083/connectors

注意

若要刪除,您可以使用:curl -X DELETE http://localhost:8083/connectors/pg-orders-source

插入資料

orders_info 資料表包含訂單識別碼、客戶識別碼、城市這類訂單詳細資料。請使用下列 SQL 在資料表中填入隨機資料。

insert into retail.orders_info (
    custid, amount, city, purchase_time
)
select
    random() * 10000 + 1,
    random() * 200,
    ('{New Delhi,Seattle,New York,Austin,Chicago,Cleveland}'::text[])[ceil(random()*3)],
    NOW() + (random() * (interval '1 min'))
from generate_series(1, 10) s(i);

其應該會在資料表中插入 10 筆記錄。 請務必根據您的需求範例來更新下面的 generate_series(1, 10) 中的記錄數目,若要插入 100 記錄,則請使用 generate_series(1, 100)

若要確認:

select * from retail.orders_info;

檢查 Kafka 主題中的異動資料擷取事件

cd <KAFKA_HOME>/bin

./kafka-console-consumer.sh --topic myserver.retail.orders_info --bootstrap-server localhost:9092 --from-beginning

您應該會看到 JSON 格式的異動資料事件。

啟動 DataStax Apache Kafka 連接器執行個體

將連接器設定 (JSON) 儲存至檔案範例 cassandra-sink-config.json,並按照您的環境來更新屬性。

{
    "name": "kafka-cosmosdb-sink",
    "config": {
        "connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
        "tasks.max": "1",
        "topics": "myserver.retail.orders_info",
        "contactPoints": "<Azure Cosmos DB account name>.cassandra.cosmos.azure.com",
        "loadBalancing.localDc": "<Azure Cosmos DB region e.g. Southeast Asia>",
        "datastax-java-driver.advanced.connection.init-query-timeout": 5000,
        "ssl.hostnameValidation": true,
        "ssl.provider": "JDK",
        "ssl.keystore.path": "<path to JDK keystore path e.g. <JAVA_HOME>/jre/lib/security/cacerts>",
        "ssl.keystore.password": "<keystore password: it is 'changeit' by default>",
        "port": 10350,
        "maxConcurrentRequests": 500,
        "maxNumberOfRecordsInBatch": 32,
        "queryExecutionTimeout": 30,
        "connectionPoolLocalSize": 4,
        "auth.username": "<Azure Cosmos DB user name (same as account name)>",
        "auth.password": "<Azure Cosmos DB password>",
        "topic.myserver.retail.orders_info.retail.orders_by_customer.mapping": "order_id=value.orderid, customer_id=value.custid, purchase_amount=value.amount, city=value.city, purchase_time=value.purchase_time",
        "topic.myserver.retail.orders_info.retail.orders_by_city.mapping": "order_id=value.orderid, customer_id=value.custid, purchase_amount=value.amount, city=value.city, purchase_time=value.purchase_time",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "offset.flush.interval.ms": 10000
    }
}

若要啟動連接器執行個體:

curl -X POST -H "Content-Type: application/json" --data @cassandra-sink-config.json http://localhost:8083/connectors

連接器應當會隨即開始運作,而從 PostgreSQL 到 Azure Cosmos DB 的端對端管線會正常運作。

查詢 Azure Cosmos DB

檢查 Azure Cosmos DB 中的 Cassandra 資料表。 以下是一些您可以嘗試的查詢:

select count(*) from retail.orders_by_customer;
select count(*) from retail.orders_by_city;

select * from retail.orders_by_customer;
select * from retail.orders_by_city;

select * from retail.orders_by_city where city='Seattle';
select * from retail.orders_by_customer where customer_id = 10;

您可以繼續在 PostgreSQL 中插入更多資料,並確認記錄會同步至 Azure Cosmos DB。

下一步