分享方式:


PostgreSQL 資料表上使用 Apache Flink® 進行的異動資料擷取 (CDC)

注意

AKS 上的 Azure HDInsight 將於 2025 年 1 月 31 日退場。 請於 2025 年 1 月 31 日之前,將工作負載移轉至 Microsoft Fabric 或對等的 Azure 產品,以免工作負載突然終止。 訂用帳戶中剩餘的叢集將會停止,並會從主機移除。

在淘汰日期之前,只有基本支援可用。

重要

此功能目前為預覽功能。 Microsoft Azure 預覽版增補使用規定包含適用於 Azure 功能 (搶鮮版 (Beta)、預覽版,或尚未正式發行的版本) 的更多法律條款。 若需此特定預覽版的相關資訊,請參閱 Azure HDInsight on AKS 預覽版資訊。 如有問題或功能建議,請在 AskHDInsight 上提交要求並附上詳細資料,並且在 Azure HDInsight 社群上追蹤我們以獲得更多更新資訊。

異動資料擷取 (CDC) 是一項技術,您可用來追蹤資料庫資料表的資料列層級變更,以便對建立、更新和刪除等作業做出回應。 我們會在本文中使用 適用於 Apache Flink® 的 CDC 連接器,它會提供一組適用於 Apache Flink 的來源連接器。 連接器會將 Debezium® 整合為引擎,以擷取資料變更。

Flink 支援將 Debezium JSON 和 Avro 訊息以 INSERT/UPDATE/DELETE 訊息形式解譯至 Apache Flink SQL 系統。

在許多情況下,這項支援很有用:

  • 將累加資料從資料庫同步處理至其他系統
  • 稽核記錄
  • 在資料庫上建置即時具體化檢視
  • 檢視時態性聯結變更資料庫資料表的歷程記錄

現在,讓我們了解如何使用 Flink-SQL CDC 監視 PostgreSQL 資料表上的變更。 PostgreSQL CDC 連接器可讓您從 PostgreSQL 資料庫讀取快照集資料和累加資料。

必要條件

準備 PostgreSQL 資料表和用戶端

  • 使用 Linux 虛擬機器,使用下列命令安裝 PostgreSQL 用戶端

    sudo apt-get update
    sudo apt-get install postgresql-client
    
  • 安裝憑證以使用 SSL 連線到 PostgreSQL 伺服器

    wget --no-check-certificate https://dl.cacerts.digicert.com/DigiCertGlobalRootCA.crt.pem

  • 連線到伺服器 (據以取代主機、使用者名稱與資料庫名稱)

    psql --host=flinkpostgres.postgres.database.azure.com --port=5432 --username=admin --dbname=postgres --set=sslmode=require --set=sslrootcert=DigiCertGlobalRootCA.crt.pem
    
  • 成功連線到資料庫之後,請建立範例資料表

    CREATE TABLE shipments (
        shipment_id SERIAL NOT NULL PRIMARY KEY,
        order_id SERIAL NOT NULL,
        origin VARCHAR(255) NOT NULL,
        destination VARCHAR(255) NOT NULL,
        is_arrived BOOLEAN NOT NULL
      );
      ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001;
      ALTER TABLE public.shipments REPLICA IDENTITY FULL;
      INSERT INTO shipments
      VALUES (default,10001,'Beijing','Shanghai',false),
         (default,10002,'Hangzhou','Shanghai',false),
         (default,10003,'Shanghai','Hangzhou',false);
    
  • 若要在 PostgreSQL 資料庫上啟用 CDC,您必須進行下列變更。

    • WAL 層級必須變更為邏輯。 您可以在 Azure 入口網站的 [伺服器參數] 區段中變更此值。

      顯示如何 enable-cdc-on-postgres-database 的螢幕擷取畫面。

    • 存取資料表的使用者必須新增「複寫」角色

      改變使用者 <username> 具有複寫;

  • 若要建立 Flink PostgreSQL CDC 資料表,請下載所有相依 jar。 使用 pom.xml 檔案的下列內容。

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0  http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.dep.download</groupId>
        <artifactId>dep-download</artifactId>
        <version>1.0-SNAPSHOT</version>
            <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-sqlserver-cdc -->
        <dependencies>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-postgres-cdc</artifactId>
            <version>2.4.2</version>
        </dependency>
        </dependencies>
    </project>
    
  • 使用 maven 命令下載所有相依 jar

       mvn -DoutputDirectory=target -f pom.xml dependency:copy-dependencies -X
    

    注意

  • 下載相依 jar 之後,請啟動 Flink SQL 用戶端,並將這些 jar 匯入工作階段。 完成命令,如下所示,

    /opt/flink-webssh/bin/sql-client.sh -j
    /opt/flink-webssh/target/flink-sql-connector-postgres-cdc-2.4.2.jar -j
    /opt/flink-webssh/target/slf4j-api-1.7.15.jar -j
    /opt/flink-webssh/target/hamcrest-2.1.jar -j
    /opt/flink-webssh/target/flink-shaded-guava-31.1-jre-17.0.jar-j
    /opt/flink-webssh/target/awaitility-4.0.1.jar -j
    /opt/flink-webssh/target/jsr308-all-1.1.2.jar
    

    這些命令會啟動具有相依性的 SQL 用戶端,

    user@sshnode-0 [ ~ ]$ bin/sql-client.sh -j flink-sql-connector-postgres-cdc-2.4.2.jar -j slf4j-api-1.7.15.jar -j hamcrest-2.1.jar -j flink-shaded-guava-31.1-jre-17.0.jar -j awaitility-4.0.1.jar -j jsr308-all-1.1.2.jar 
    
                                      ????????
                                  ????????????????
                               ???????        ??????? ?
                             ????   ?????????      ?????
                             ???         ???????    ?????
                               ???            ???   ?????
                                 ??       ???????????????
                               ?? ?   ???      ?????? ?????
                               ?????   ????     ????? ?????
                            ???????       ???   ??????? ???
                      ????????? ??         ??   ??????????
                     ????????  ??          ?   ?? ???????
                   ????  ???           ?  ?? ???????? ?????
                  ???? ? ??          ? ?? ????????    ???? ??
                 ???? ????          ??????????       ??? ?? ????
              ???? ?? ???       ???????????         ???? ? ?  ???
              ??? ?? ??? ?????????             ????           ???
              ??   ? ???????             ????????          ??? ??
              ???    ???   ????????????????????           ????  ?
             ????? ???   ??????  ????????                 ????  ??
             ????????  ???????????????                            ??
             ?? ????   ??????? ???       ??????    ??         ???
             ??? ???  ??? ???????            ????   ?????????????
              ??? ?????  ???? ??                ??      ????  ???
              ??  ???   ?     ??                ??              ??
               ??  ??         ??                 ??        ????????
                ?? ?????       ??                  ???????????    ??
                 ??   ????     ?                    ???????      ??
                  ???   ?????                         ?? ???????????
                   ????    ????                     ??????? ????????
                     ?????                          ??  ???? ?????
                        ????????????????????????????????? ?????
    
       ______ _ _       _      _____  ____  _        _____ _ _            _  BETA   
      | ____| (_)     | |     / ____|/ __ \| |       / ____| (_)          | | 
      | |__ | |_ _ __ | | __ | (___ | |  | | |      | |    | |_ ___ _ __ | |_ 
      |  __| | | | '_ \| |/ /  \___ \| |  | | |     | |    | | |/ _ \ '_ \| __|
      | |   | | | | | |   <   ____) | |__| | |____  | |____| | | __/ | | | |_ 
      |_|   |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__|
    
           Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
    
    Command history file path: /home/xcao/.flink-sql-history
    
    Flink SQL>
    
  • 使用 CDC 連接器建立 Flink PostgreSQL CDC 資料表

    CREATE TABLE shipments (
      shipment_id INT,
      order_id INT,
      origin STRING,
      destination STRING,
      is_arrived BOOLEAN,
      PRIMARY KEY (shipment_id) NOT ENFORCED
    ) WITH (
      'connector' = 'postgres-cdc',
      'hostname' = 'flinkpostgres.postgres.database.azure.com',
      'port' = '5432',
      'username' = 'username',
      'password' = 'password',
      'database-name' = 'postgres',
      'schema-name' = 'public',
      'table-name' = 'shipments',
      'decoding.plugin.name' = 'pgoutput',
      'slot.name' = 'flink'
    );
    

驗證

  • 執行 'select *' 命令以監視變更。

    select * from shipments;

    顯示如何 run-select-command 的螢幕擷取畫面。

參考