Изменение записи данных (CDC) таблицы PostgreSQL с помощью Apache Flink®
Примечание.
Мы отставим Azure HDInsight в AKS 31 января 2025 г. До 31 января 2025 г. необходимо перенести рабочие нагрузки в Microsoft Fabric или эквивалентный продукт Azure, чтобы избежать резкого прекращения рабочих нагрузок. Оставшиеся кластеры в подписке будут остановлены и удалены из узла.
До даты выхода на пенсию будет доступна только базовая поддержка.
Внимание
Эта функция в настоящее время доступна для предварительного ознакомления. Дополнительные условия использования для предварительных версий Microsoft Azure включают более юридические термины, применимые к функциям Azure, которые находятся в бета-версии, в предварительной версии или в противном случае еще не выпущены в общую доступность. Сведения об этой конкретной предварительной версии см. в статье Azure HDInsight в предварительной версии AKS. Для вопросов или предложений функций отправьте запрос на AskHDInsight с подробными сведениями и следуйте за нами для получения дополнительных обновлений в сообществе Azure HDInsight.
Запись измененных данных (CDC) — это способ отслеживания изменений на уровне строк в таблицах баз данных в ответ на создание, обновление и удаление операций. В этой статье мы используем соединители CDC для Apache Flink, которые предлагают набор исходных соединителей для Apache Flink®. Соединители интегрируют Debezium® в качестве обработчика для записи изменений данных.
Flink поддерживает интерпретацию сообщений JSON Debezium и Avro как INSERT/UPDATE/DELETE в систему Apache Flink SQL.
Эта поддержка полезна во многих случаях:
- Синхронизация добавочных данных из баз данных в другие системы
- Журналы аудита
- Создание материализованных представлений в режиме реального времени на базах данных
- Просмотр журнала изменения темпорального соединения таблицы базы данных
Теперь давайте узнаем, как отслеживать изменения в таблице PostgreSQL с помощью Flink-SQL CDC. Соединитель PostgreSQL CDC позволяет считывать данные моментального снимка и добавочные данные из базы данных PostgreSQL.
Необходимые компоненты
- Гибкий сервер Azure PostgresSQL версии 14.7
- Кластер Apache Flink в HDInsight в AKS
- Виртуальная машина Linux для использования клиента PostgreSQL
- Добавьте правило NSG, которое разрешает входящий и исходящий подключения через порт 5432 в HDInsight в подсети пула AKS.
Подготовка таблицы PostgreSQL и клиента
С помощью виртуальной машины Linux установите клиент PostgreSQL с помощью приведенных ниже команд.
sudo apt-get update sudo apt-get install postgresql-client
Установка сертификата для подключения к серверу PostgreSQL с помощью SSL
wget --no-check-certificate https://dl.cacerts.digicert.com/DigiCertGlobalRootCA.crt.pem
Подключитесь к серверу (замените имя узла, имя пользователя и базы данных соответствующим образом)
psql --host=flinkpostgres.postgres.database.azure.com --port=5432 --username=admin --dbname=postgres --set=sslmode=require --set=sslrootcert=DigiCertGlobalRootCA.crt.pem
После успешного подключения к базе данных создайте пример таблицы
CREATE TABLE shipments ( shipment_id SERIAL NOT NULL PRIMARY KEY, order_id SERIAL NOT NULL, origin VARCHAR(255) NOT NULL, destination VARCHAR(255) NOT NULL, is_arrived BOOLEAN NOT NULL ); ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001; ALTER TABLE public.shipments REPLICA IDENTITY FULL; INSERT INTO shipments VALUES (default,10001,'Beijing','Shanghai',false), (default,10002,'Hangzhou','Shanghai',false), (default,10003,'Shanghai','Hangzhou',false);
Чтобы включить CDC в базе данных PostgreSQL, необходимо внести следующие изменения.
Создание таблицы CDC Apache Flink PostgreSQL
Чтобы создать таблицу CDC Flink PostgreSQL, скачайте все зависимые JAR-файлы.
pom.xml
Используйте файл со следующим содержимым.<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.dep.download</groupId> <artifactId>dep-download</artifactId> <version>1.0-SNAPSHOT</version> <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-sqlserver-cdc --> <dependencies> <dependency> <groupId>com.ververica</groupId> <artifactId>flink-sql-connector-postgres-cdc</artifactId> <version>2.4.2</version> </dependency> </dependencies> </project>
С помощью команды maven для скачивания всех зависимых jars
mvn -DoutputDirectory=target -f pom.xml dependency:copy-dependencies -X
Примечание.
- Если веб-модуль ssh не содержит maven, следуйте ссылкам, чтобы скачать и установить его.
- Чтобы скачать jar-файл jsr, выполните следующую команду.
wget https://repo1.maven.org/maven2/net/java/loci/jsr308-all/1.1.2/jsr308-all-1.1.2.jar
После скачивания зависимых jar-файлов запустите клиент Flink SQL с импортом этих jar-файлов в сеанс. Выполните следующую команду:
/opt/flink-webssh/bin/sql-client.sh -j /opt/flink-webssh/target/flink-sql-connector-postgres-cdc-2.4.2.jar -j /opt/flink-webssh/target/slf4j-api-1.7.15.jar -j /opt/flink-webssh/target/hamcrest-2.1.jar -j /opt/flink-webssh/target/flink-shaded-guava-31.1-jre-17.0.jar-j /opt/flink-webssh/target/awaitility-4.0.1.jar -j /opt/flink-webssh/target/jsr308-all-1.1.2.jar
Эти команды запускают клиент SQL с зависимостями как:
user@sshnode-0 [ ~ ]$ bin/sql-client.sh -j flink-sql-connector-postgres-cdc-2.4.2.jar -j slf4j-api-1.7.15.jar -j hamcrest-2.1.jar -j flink-shaded-guava-31.1-jre-17.0.jar -j awaitility-4.0.1.jar -j jsr308-all-1.1.2.jar ???????? ???????????????? ??????? ??????? ? ???? ????????? ????? ??? ??????? ????? ??? ??? ????? ?? ??????????????? ?? ? ??? ?????? ????? ????? ???? ????? ????? ??????? ??? ??????? ??? ????????? ?? ?? ?????????? ???????? ?? ? ?? ??????? ???? ??? ? ?? ???????? ????? ???? ? ?? ? ?? ???????? ???? ?? ???? ???? ?????????? ??? ?? ???? ???? ?? ??? ??????????? ???? ? ? ??? ??? ?? ??? ????????? ???? ??? ?? ? ??????? ???????? ??? ?? ??? ??? ???????????????????? ???? ? ????? ??? ?????? ???????? ???? ?? ???????? ??????????????? ?? ?? ???? ??????? ??? ?????? ?? ??? ??? ??? ??? ??????? ???? ????????????? ??? ????? ???? ?? ?? ???? ??? ?? ??? ? ?? ?? ?? ?? ?? ?? ?? ???????? ?? ????? ?? ??????????? ?? ?? ???? ? ??????? ?? ??? ????? ?? ??????????? ???? ???? ??????? ???????? ????? ?? ???? ????? ????????????????????????????????? ????? ______ _ _ _ _____ ____ _ _____ _ _ _ BETA | ____| (_) | | / ____|/ __ \| | / ____| (_) | | | |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_ | __| | | | '_ \| |/ / \___ \| | | | | | | | | |/ _ \ '_ \| __| | | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_ |_| |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__| Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit. Command history file path: /home/xcao/.flink-sql-history Flink SQL>
Создание таблицы CDC Flink PostgreSQL с помощью соединителя CDC
CREATE TABLE shipments ( shipment_id INT, order_id INT, origin STRING, destination STRING, is_arrived BOOLEAN, PRIMARY KEY (shipment_id) NOT ENFORCED ) WITH ( 'connector' = 'postgres-cdc', 'hostname' = 'flinkpostgres.postgres.database.azure.com', 'port' = '5432', 'username' = 'username', 'password' = 'password', 'database-name' = 'postgres', 'schema-name' = 'public', 'table-name' = 'shipments', 'decoding.plugin.name' = 'pgoutput', 'slot.name' = 'flink' );
Проверка
Справочные материалы
- Веб-сайт Apache Flink
- Соединитель CDC PostgreSQL лицензирован в соответствии с лицензией Apache 2.0
- Apache, Apache Flink, Flink и связанные открытый код имена проектов являются товарными знаками Apache Software Foundation (ASF).