Поделиться через


Изменение записи данных (CDC) таблицы PostgreSQL с помощью Apache Flink®

Примечание.

Мы отставим Azure HDInsight в AKS 31 января 2025 г. До 31 января 2025 г. необходимо перенести рабочие нагрузки в Microsoft Fabric или эквивалентный продукт Azure, чтобы избежать резкого прекращения рабочих нагрузок. Оставшиеся кластеры в подписке будут остановлены и удалены из узла.

До даты выхода на пенсию будет доступна только базовая поддержка.

Внимание

Эта функция в настоящее время доступна для предварительного ознакомления. Дополнительные условия использования для предварительных версий Microsoft Azure включают более юридические термины, применимые к функциям Azure, которые находятся в бета-версии, в предварительной версии или в противном случае еще не выпущены в общую доступность. Сведения об этой конкретной предварительной версии см. в статье Azure HDInsight в предварительной версии AKS. Для вопросов или предложений функций отправьте запрос на AskHDInsight с подробными сведениями и следуйте за нами для получения дополнительных обновлений в сообществе Azure HDInsight.

Запись измененных данных (CDC) — это способ отслеживания изменений на уровне строк в таблицах баз данных в ответ на создание, обновление и удаление операций. В этой статье мы используем соединители CDC для Apache Flink, которые предлагают набор исходных соединителей для Apache Flink®. Соединители интегрируют Debezium® в качестве обработчика для записи изменений данных.

Flink поддерживает интерпретацию сообщений JSON Debezium и Avro как INSERT/UPDATE/DELETE в систему Apache Flink SQL.

Эта поддержка полезна во многих случаях:

  • Синхронизация добавочных данных из баз данных в другие системы
  • Журналы аудита
  • Создание материализованных представлений в режиме реального времени на базах данных
  • Просмотр журнала изменения темпорального соединения таблицы базы данных

Теперь давайте узнаем, как отслеживать изменения в таблице PostgreSQL с помощью Flink-SQL CDC. Соединитель PostgreSQL CDC позволяет считывать данные моментального снимка и добавочные данные из базы данных PostgreSQL.

Необходимые компоненты

Подготовка таблицы PostgreSQL и клиента

  • С помощью виртуальной машины Linux установите клиент PostgreSQL с помощью приведенных ниже команд.

    sudo apt-get update
    sudo apt-get install postgresql-client
    
  • Установка сертификата для подключения к серверу PostgreSQL с помощью SSL

    wget --no-check-certificate https://dl.cacerts.digicert.com/DigiCertGlobalRootCA.crt.pem

  • Подключитесь к серверу (замените имя узла, имя пользователя и базы данных соответствующим образом)

    psql --host=flinkpostgres.postgres.database.azure.com --port=5432 --username=admin --dbname=postgres --set=sslmode=require --set=sslrootcert=DigiCertGlobalRootCA.crt.pem
    
  • После успешного подключения к базе данных создайте пример таблицы

    CREATE TABLE shipments (
        shipment_id SERIAL NOT NULL PRIMARY KEY,
        order_id SERIAL NOT NULL,
        origin VARCHAR(255) NOT NULL,
        destination VARCHAR(255) NOT NULL,
        is_arrived BOOLEAN NOT NULL
      );
      ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001;
      ALTER TABLE public.shipments REPLICA IDENTITY FULL;
      INSERT INTO shipments
      VALUES (default,10001,'Beijing','Shanghai',false),
         (default,10002,'Hangzhou','Shanghai',false),
         (default,10003,'Shanghai','Hangzhou',false);
    
  • Чтобы включить CDC в базе данных PostgreSQL, необходимо внести следующие изменения.

    • Уровень WAL должен быть изменен на логический. Это значение можно изменить в разделе параметров сервера портал Azure.

      Снимок экрана: включение cdc-on-postgres-database.

    • Для доступа пользователей к таблице должна быть добавлена роль REPLICATION

      ALTER USER <username> WITH REPLICATION;

  • Чтобы создать таблицу CDC Flink PostgreSQL, скачайте все зависимые JAR-файлы. pom.xml Используйте файл со следующим содержимым.

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0  http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.dep.download</groupId>
        <artifactId>dep-download</artifactId>
        <version>1.0-SNAPSHOT</version>
            <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-sqlserver-cdc -->
        <dependencies>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-postgres-cdc</artifactId>
            <version>2.4.2</version>
        </dependency>
        </dependencies>
    </project>
    
  • С помощью команды maven для скачивания всех зависимых jars

       mvn -DoutputDirectory=target -f pom.xml dependency:copy-dependencies -X
    

    Примечание.

    • Если веб-модуль ssh не содержит maven, следуйте ссылкам, чтобы скачать и установить его.
    • Чтобы скачать jar-файл jsr, выполните следующую команду.
      • wget https://repo1.maven.org/maven2/net/java/loci/jsr308-all/1.1.2/jsr308-all-1.1.2.jar
  • После скачивания зависимых jar-файлов запустите клиент Flink SQL с импортом этих jar-файлов в сеанс. Выполните следующую команду:

    /opt/flink-webssh/bin/sql-client.sh -j
    /opt/flink-webssh/target/flink-sql-connector-postgres-cdc-2.4.2.jar -j
    /opt/flink-webssh/target/slf4j-api-1.7.15.jar -j
    /opt/flink-webssh/target/hamcrest-2.1.jar -j
    /opt/flink-webssh/target/flink-shaded-guava-31.1-jre-17.0.jar-j
    /opt/flink-webssh/target/awaitility-4.0.1.jar -j
    /opt/flink-webssh/target/jsr308-all-1.1.2.jar
    

    Эти команды запускают клиент SQL с зависимостями как:

    user@sshnode-0 [ ~ ]$ bin/sql-client.sh -j flink-sql-connector-postgres-cdc-2.4.2.jar -j slf4j-api-1.7.15.jar -j hamcrest-2.1.jar -j flink-shaded-guava-31.1-jre-17.0.jar -j awaitility-4.0.1.jar -j jsr308-all-1.1.2.jar 
    
                                      ????????
                                  ????????????????
                               ???????        ??????? ?
                             ????   ?????????      ?????
                             ???         ???????    ?????
                               ???            ???   ?????
                                 ??       ???????????????
                               ?? ?   ???      ?????? ?????
                               ?????   ????     ????? ?????
                            ???????       ???   ??????? ???
                      ????????? ??         ??   ??????????
                     ????????  ??          ?   ?? ???????
                   ????  ???           ?  ?? ???????? ?????
                  ???? ? ??          ? ?? ????????    ???? ??
                 ???? ????          ??????????       ??? ?? ????
              ???? ?? ???       ???????????         ???? ? ?  ???
              ??? ?? ??? ?????????             ????           ???
              ??   ? ???????             ????????          ??? ??
              ???    ???   ????????????????????           ????  ?
             ????? ???   ??????  ????????                 ????  ??
             ????????  ???????????????                            ??
             ?? ????   ??????? ???       ??????    ??         ???
             ??? ???  ??? ???????            ????   ?????????????
              ??? ?????  ???? ??                ??      ????  ???
              ??  ???   ?     ??                ??              ??
               ??  ??         ??                 ??        ????????
                ?? ?????       ??                  ???????????    ??
                 ??   ????     ?                    ???????      ??
                  ???   ?????                         ?? ???????????
                   ????    ????                     ??????? ????????
                     ?????                          ??  ???? ?????
                        ????????????????????????????????? ?????
    
       ______ _ _       _      _____  ____  _        _____ _ _            _  BETA   
      | ____| (_)     | |     / ____|/ __ \| |       / ____| (_)          | | 
      | |__ | |_ _ __ | | __ | (___ | |  | | |      | |    | |_ ___ _ __ | |_ 
      |  __| | | | '_ \| |/ /  \___ \| |  | | |     | |    | | |/ _ \ '_ \| __|
      | |   | | | | | |   <   ____) | |__| | |____  | |____| | | __/ | | | |_ 
      |_|   |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__|
    
           Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
    
    Command history file path: /home/xcao/.flink-sql-history
    
    Flink SQL>
    
  • Создание таблицы CDC Flink PostgreSQL с помощью соединителя CDC

    CREATE TABLE shipments (
      shipment_id INT,
      order_id INT,
      origin STRING,
      destination STRING,
      is_arrived BOOLEAN,
      PRIMARY KEY (shipment_id) NOT ENFORCED
    ) WITH (
      'connector' = 'postgres-cdc',
      'hostname' = 'flinkpostgres.postgres.database.azure.com',
      'port' = '5432',
      'username' = 'username',
      'password' = 'password',
      'database-name' = 'postgres',
      'schema-name' = 'public',
      'table-name' = 'shipments',
      'decoding.plugin.name' = 'pgoutput',
      'slot.name' = 'flink'
    );
    

Проверка

  • Выполните команду select *, чтобы отслеживать изменения.

    select * from shipments;

    Снимок экрана: выполнение команды select-select.

Справочные материалы