PostgreSQL 資料表上使用 Apache Flink® 進行的異動資料擷取 (CDC)
注意
AKS 上的 Azure HDInsight 將於 2025 年 1 月 31 日退場。 請於 2025 年 1 月 31 日之前,將工作負載移轉至 Microsoft Fabric 或對等的 Azure 產品,以免工作負載突然終止。 訂用帳戶中剩餘的叢集將會停止,並會從主機移除。
在淘汰日期之前,只有基本支援可用。
重要
此功能目前為預覽功能。 Microsoft Azure 預覽版增補使用規定包含適用於 Azure 功能 (搶鮮版 (Beta)、預覽版,或尚未正式發行的版本) 的更多法律條款。 若需此特定預覽版的相關資訊,請參閱 Azure HDInsight on AKS 預覽版資訊。 如有問題或功能建議,請在 AskHDInsight 上提交要求並附上詳細資料,並且在 Azure HDInsight 社群上追蹤我們以獲得更多更新資訊。
異動資料擷取 (CDC) 是一項技術,您可用來追蹤資料庫資料表的資料列層級變更,以便對建立、更新和刪除等作業做出回應。 我們會在本文中使用 適用於 Apache Flink® 的 CDC 連接器,它會提供一組適用於 Apache Flink 的來源連接器。 連接器會將 Debezium® 整合為引擎,以擷取資料變更。
Flink 支援將 Debezium JSON 和 Avro 訊息以 INSERT/UPDATE/DELETE 訊息形式解譯至 Apache Flink SQL 系統。
在許多情況下,這項支援很有用:
- 將累加資料從資料庫同步處理至其他系統
- 稽核記錄
- 在資料庫上建置即時具體化檢視
- 檢視時態性聯結變更資料庫資料表的歷程記錄
現在,讓我們了解如何使用 Flink-SQL CDC 監視 PostgreSQL 資料表上的變更。 PostgreSQL CDC 連接器可讓您從 PostgreSQL 資料庫讀取快照集資料和累加資料。
必要條件
- Azure PostgresSQL 彈性伺服器 14.7 版
- AKS 上的 HDInsight 上的 Apache Flink 叢集
- Linux 虛擬機器以使用 PostgreSQL 用戶端
- 新增 NSG 規則,允許 AKS 集區子網路上 HDInsight 連接埠 5432 上的輸入和輸出連線。
準備 PostgreSQL 資料表和用戶端
使用 Linux 虛擬機器,使用下列命令安裝 PostgreSQL 用戶端
sudo apt-get update sudo apt-get install postgresql-client
安裝憑證以使用 SSL 連線到 PostgreSQL 伺服器
wget --no-check-certificate https://dl.cacerts.digicert.com/DigiCertGlobalRootCA.crt.pem
連線到伺服器 (據以取代主機、使用者名稱與資料庫名稱)
psql --host=flinkpostgres.postgres.database.azure.com --port=5432 --username=admin --dbname=postgres --set=sslmode=require --set=sslrootcert=DigiCertGlobalRootCA.crt.pem
成功連線到資料庫之後,請建立範例資料表
CREATE TABLE shipments ( shipment_id SERIAL NOT NULL PRIMARY KEY, order_id SERIAL NOT NULL, origin VARCHAR(255) NOT NULL, destination VARCHAR(255) NOT NULL, is_arrived BOOLEAN NOT NULL ); ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001; ALTER TABLE public.shipments REPLICA IDENTITY FULL; INSERT INTO shipments VALUES (default,10001,'Beijing','Shanghai',false), (default,10002,'Hangzhou','Shanghai',false), (default,10003,'Shanghai','Hangzhou',false);
若要在 PostgreSQL 資料庫上啟用 CDC,您必須進行下列變更。
建立 Apache Flink PostgreSQL CDC 資料表
若要建立 Flink PostgreSQL CDC 資料表,請下載所有相依 jar。 使用
pom.xml
檔案的下列內容。<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.dep.download</groupId> <artifactId>dep-download</artifactId> <version>1.0-SNAPSHOT</version> <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-sqlserver-cdc --> <dependencies> <dependency> <groupId>com.ververica</groupId> <artifactId>flink-sql-connector-postgres-cdc</artifactId> <version>2.4.2</version> </dependency> </dependencies> </project>
使用 maven 命令下載所有相依 jar
mvn -DoutputDirectory=target -f pom.xml dependency:copy-dependencies -X
注意
- 如果您的 Web SSH Pod 不包含 maven,請遵循連結來下載並安裝。
- 若要下載 jsr jar 檔案,請使用下列命令
wget https://repo1.maven.org/maven2/net/java/loci/jsr308-all/1.1.2/jsr308-all-1.1.2.jar
下載相依 jar 之後,請啟動 Flink SQL 用戶端,並將這些 jar 匯入工作階段。 完成命令,如下所示,
/opt/flink-webssh/bin/sql-client.sh -j /opt/flink-webssh/target/flink-sql-connector-postgres-cdc-2.4.2.jar -j /opt/flink-webssh/target/slf4j-api-1.7.15.jar -j /opt/flink-webssh/target/hamcrest-2.1.jar -j /opt/flink-webssh/target/flink-shaded-guava-31.1-jre-17.0.jar-j /opt/flink-webssh/target/awaitility-4.0.1.jar -j /opt/flink-webssh/target/jsr308-all-1.1.2.jar
這些命令會啟動具有相依性的 SQL 用戶端,
user@sshnode-0 [ ~ ]$ bin/sql-client.sh -j flink-sql-connector-postgres-cdc-2.4.2.jar -j slf4j-api-1.7.15.jar -j hamcrest-2.1.jar -j flink-shaded-guava-31.1-jre-17.0.jar -j awaitility-4.0.1.jar -j jsr308-all-1.1.2.jar ???????? ???????????????? ??????? ??????? ? ???? ????????? ????? ??? ??????? ????? ??? ??? ????? ?? ??????????????? ?? ? ??? ?????? ????? ????? ???? ????? ????? ??????? ??? ??????? ??? ????????? ?? ?? ?????????? ???????? ?? ? ?? ??????? ???? ??? ? ?? ???????? ????? ???? ? ?? ? ?? ???????? ???? ?? ???? ???? ?????????? ??? ?? ???? ???? ?? ??? ??????????? ???? ? ? ??? ??? ?? ??? ????????? ???? ??? ?? ? ??????? ???????? ??? ?? ??? ??? ???????????????????? ???? ? ????? ??? ?????? ???????? ???? ?? ???????? ??????????????? ?? ?? ???? ??????? ??? ?????? ?? ??? ??? ??? ??? ??????? ???? ????????????? ??? ????? ???? ?? ?? ???? ??? ?? ??? ? ?? ?? ?? ?? ?? ?? ?? ???????? ?? ????? ?? ??????????? ?? ?? ???? ? ??????? ?? ??? ????? ?? ??????????? ???? ???? ??????? ???????? ????? ?? ???? ????? ????????????????????????????????? ????? ______ _ _ _ _____ ____ _ _____ _ _ _ BETA | ____| (_) | | / ____|/ __ \| | / ____| (_) | | | |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_ | __| | | | '_ \| |/ / \___ \| | | | | | | | | |/ _ \ '_ \| __| | | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_ |_| |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__| Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit. Command history file path: /home/xcao/.flink-sql-history Flink SQL>
使用 CDC 連接器建立 Flink PostgreSQL CDC 資料表
CREATE TABLE shipments ( shipment_id INT, order_id INT, origin STRING, destination STRING, is_arrived BOOLEAN, PRIMARY KEY (shipment_id) NOT ENFORCED ) WITH ( 'connector' = 'postgres-cdc', 'hostname' = 'flinkpostgres.postgres.database.azure.com', 'port' = '5432', 'username' = 'username', 'password' = 'password', 'database-name' = 'postgres', 'schema-name' = 'public', 'table-name' = 'shipments', 'decoding.plugin.name' = 'pgoutput', 'slot.name' = 'flink' );
驗證
參考
- Apache Flink 網站
- PostgreSQL CDC 連接器是在 Apache 2.0 授權之下獲得授權
- Apache、Apache Flink、Flink 和相關聯的開放原始碼專案名稱為 Apache Software Foundation (ASF) 的 商標。