使用 Apache Flink® 的 PostgreSQL 數據表異動數據擷取 (CDC)
重要
此功能目前為預覽功能。 適用於 Microsoft Azure 預覽版的補充使用規定包含適用於 Beta 版、預覽版或尚未發行至正式運作之 Azure 功能的更合法條款。 如需此特定預覽的相關信息,請參閱 AKS 預覽資訊的 Azure HDInsight。 如需問題或功能建議,請在 AskHDInsight 上提交要求,並提供詳細數據,並遵循我們在 Azure HDInsight 社群上取得更多更新。
異動數據擷取 (CDC) 是一種技術,可用來追蹤資料庫數據表中的數據列層級變更,以回應建立、更新和刪除作業。 在本文中,我們會使用適用於 Apache Flink 的 CDC 連線 ors,此連結提供一組 Apache Flink® 的來源連接器。 連接器會將 Debezium® 整合為引擎,以擷取數據變更。
Flink 支援將 Debezium JSON 和 Avro 訊息解譯為 INSERT/UPDATE/DELETE 訊息至 Apache Flink SQL 系統。
在許多情況下,這項支援很有用:
- 將累加數據從資料庫同步處理至其他系統
- 稽核記錄
- 在資料庫上建置即時具體化檢視
- 檢視時態聯結變更資料庫數據表的歷程記錄
現在,讓我們瞭解如何使用 Flink-SQL CDC 監視 PostgreSQL 數據表上的變更。 PostgreSQL CDC 連接器可讓您從 PostgreSQL 資料庫讀取快照集數據和增量數據。
必要條件
- Azure PostgresSQL 彈性伺服器 14.7 版
- AKS 上的 HDInsight 上的 Apache Flink 叢集
- Linux 虛擬機以使用PostgreSQL用戶端
- 新增 NSG 規則,以允許 AKS 集區子網上的 HDInsight 連接埠 5432 上的輸入和輸出連線。
準備 PostgreSQL 數據表和用戶端
使用 Linux 虛擬機,使用下列命令安裝 PostgreSQL 用戶端
sudo apt-get update sudo apt-get install postgresql-client
安裝憑證以使用 SSL 連線到 PostgreSQL 伺服器
wget --no-check-certificate https://dl.cacerts.digicert.com/DigiCertGlobalRootCA.crt.pem
連線 伺服器 (據以取代主機、使用者名稱與資料庫名稱 )
psql --host=flinkpostgres.postgres.database.azure.com --port=5432 --username=admin --dbname=postgres --set=sslmode=require --set=sslrootcert=DigiCertGlobalRootCA.crt.pem
成功連線到資料庫之後,請建立範例數據表
CREATE TABLE shipments ( shipment_id SERIAL NOT NULL PRIMARY KEY, order_id SERIAL NOT NULL, origin VARCHAR(255) NOT NULL, destination VARCHAR(255) NOT NULL, is_arrived BOOLEAN NOT NULL ); ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001; ALTER TABLE public.shipments REPLICA IDENTITY FULL; INSERT INTO shipments VALUES (default,10001,'Beijing','Shanghai',false), (default,10002,'Hangzhou','Shanghai',false), (default,10003,'Shanghai','Hangzhou',false);
若要在 PostgreSQL 資料庫上啟用 CDC,您必須進行下列變更。
建立 Apache Flink PostgreSQL CDC 數據表
若要建立 Flink PostgreSQL CDC 數據表,請下載所有相依 jar。 使用
pom.xml
檔案搭配下列內容。<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.dep.download</groupId> <artifactId>dep-download</artifactId> <version>1.0-SNAPSHOT</version> <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-sqlserver-cdc --> <dependencies> <dependency> <groupId>com.ververica</groupId> <artifactId>flink-sql-connector-postgres-cdc</artifactId> <version>2.4.2</version> </dependency> </dependencies> </project>
使用 maven 命令下載所有相依 jar
mvn -DoutputDirectory=target -f pom.xml dependency:copy-dependencies -X
注意
- 如果您的 Web ssh Pod 不包含 maven,請遵循連結來下載並安裝。
- 若要下載 jsr jar 檔案,請使用下列命令
wget https://repo1.maven.org/maven2/net/java/loci/jsr308-all/1.1.2/jsr308-all-1.1.2.jar
下載相依 jar 之後, 請啟動 Flink SQL 用戶端,並將這些 jar 匯入工作階段。 完成命令,如下所示:
/opt/flink-webssh/bin/sql-client.sh -j /opt/flink-webssh/target/flink-sql-connector-postgres-cdc-2.4.2.jar -j /opt/flink-webssh/target/slf4j-api-1.7.15.jar -j /opt/flink-webssh/target/hamcrest-2.1.jar -j /opt/flink-webssh/target/flink-shaded-guava-31.1-jre-17.0.jar-j /opt/flink-webssh/target/awaitility-4.0.1.jar -j /opt/flink-webssh/target/jsr308-all-1.1.2.jar
這些命令會啟動具有相依性的 sql 用戶端,
user@sshnode-0 [ ~ ]$ bin/sql-client.sh -j flink-sql-connector-postgres-cdc-2.4.2.jar -j slf4j-api-1.7.15.jar -j hamcrest-2.1.jar -j flink-shaded-guava-31.1-jre-17.0.jar -j awaitility-4.0.1.jar -j jsr308-all-1.1.2.jar ???????? ???????????????? ??????? ??????? ? ???? ????????? ????? ??? ??????? ????? ??? ??? ????? ?? ??????????????? ?? ? ??? ?????? ????? ????? ???? ????? ????? ??????? ??? ??????? ??? ????????? ?? ?? ?????????? ???????? ?? ? ?? ??????? ???? ??? ? ?? ???????? ????? ???? ? ?? ? ?? ???????? ???? ?? ???? ???? ?????????? ??? ?? ???? ???? ?? ??? ??????????? ???? ? ? ??? ??? ?? ??? ????????? ???? ??? ?? ? ??????? ???????? ??? ?? ??? ??? ???????????????????? ???? ? ????? ??? ?????? ???????? ???? ?? ???????? ??????????????? ?? ?? ???? ??????? ??? ?????? ?? ??? ??? ??? ??? ??????? ???? ????????????? ??? ????? ???? ?? ?? ???? ??? ?? ??? ? ?? ?? ?? ?? ?? ?? ?? ???????? ?? ????? ?? ??????????? ?? ?? ???? ? ??????? ?? ??? ????? ?? ??????????? ???? ???? ??????? ???????? ????? ?? ???? ????? ????????????????????????????????? ????? ______ _ _ _ _____ ____ _ _____ _ _ _ BETA | ____| (_) | | / ____|/ __ \| | / ____| (_) | | | |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_ | __| | | | '_ \| |/ / \___ \| | | | | | | | | |/ _ \ '_ \| __| | | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_ |_| |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__| Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit. Command history file path: /home/xcao/.flink-sql-history Flink SQL>
使用 CDC 連接器建立 Flink PostgreSQL CDC 數據表
CREATE TABLE shipments ( shipment_id INT, order_id INT, origin STRING, destination STRING, is_arrived BOOLEAN, PRIMARY KEY (shipment_id) NOT ENFORCED ) WITH ( 'connector' = 'postgres-cdc', 'hostname' = 'flinkpostgres.postgres.database.azure.com', 'port' = '5432', 'username' = 'username', 'password' = 'password', 'database-name' = 'postgres', 'schema-name' = 'public', 'table-name' = 'shipments', 'decoding.plugin.name' = 'pgoutput', 'slot.name' = 'flink' );
驗證
參考
- Apache Flink 網站
- PostgreSQL CDC 連線 or 已獲得 Apache 2.0 授權
- Apache、Apache Flink、Flink 和相關聯的開放原始碼專案名稱為 Apache Software Foundation (ASF) 的 商標。
意見反應
https://aka.ms/ContentUserFeedback。
即將登場:在 2024 年,我們將逐步淘汰 GitHub 問題作為內容的意見反應機制,並將它取代為新的意見反應系統。 如需詳細資訊,請參閱:提交並檢視相關的意見反應