Gegevenswijzigingen vastleggen (CDC) van een PostgreSQL-tabel met Apache Flink®
Belangrijk
Azure HDInsight op AKS is op 31 januari 2025 buiten gebruik gesteld. Meer informatie met deze aankondiging.
U moet uw workloads migreren naar Microsoft Fabric- of een gelijkwaardig Azure-product om plotselinge beëindiging van uw workloads te voorkomen.
Belangrijk
Deze functie is momenteel beschikbaar als preview-versie. De aanvullende gebruiksvoorwaarden voor Microsoft Azure Previews meer juridische voorwaarden bevatten die van toepassing zijn op Azure-functies die bèta, in preview of anderszins nog niet in algemene beschikbaarheid zijn vrijgegeven. Voor meer informatie over deze specifieke preview, zie Azure HDInsight op AKS preview-informatie. Voor vragen of suggesties voor functies dient u een aanvraag in op AskHDInsight- met de details en volgt u ons voor meer updates over Azure HDInsight Community-.
Change Data Capture (CDC) is een techniek die u kunt gebruiken om wijzigingen op rijniveau in databasetabellen bij te houden als reactie op het maken, bijwerken en verwijderen van bewerkingen. In dit artikel gebruiken we CDC-connectors voor Apache Flink®, die een set bronconnectors bieden voor Apache Flink. De connectors integreren Debezium® als engine om wijzigingen in de gegevens vast te leggen.
Flink ondersteunt het interpreteren van Debezium JSON- en Avro-berichten als INSERT/UPDATE/DELETE-berichten in het Apache Flink SQL-systeem.
Deze ondersteuning is in veel gevallen nuttig voor:
- Incrementele gegevens van databases synchroniseren met andere systemen
- Auditlogboeken
- Materiële weergaven in realtime bouwen op databases
- Temporele join wijzigingsgeschiedenis van een databasetabel bekijken
Laten we nu leren hoe u wijzigingen in de PostgreSQL-tabel kunt bewaken met behulp van Flink-SQL CDC. Met de PostgreSQL CDC-connector kunt u momentopnamegegevens en incrementele gegevens lezen uit de PostgreSQL-database.
- flexibele Azure PostgresSQL-server versie 14.7
- Apache Flink Cluster op HDInsight op AKS
- Virtuele Linux-machine voor het gebruik van PostgreSQL-client
- Voeg de NSG-regel toe die binnenkomende en uitgaande verbindingen toestaat op poort 5432 in HDInsight op het subnet van de AKS-pool.
Installeer postgreSQL-client met behulp van onderstaande opdrachten met behulp van een virtuele Linux-machine
sudo apt-get update sudo apt-get install postgresql-client
Het certificaat installeren om verbinding te maken met postgreSQL-server met behulp van SSL
wget --no-check-certificate https://dl.cacerts.digicert.com/DigiCertGlobalRootCA.crt.pem
Verbinding maken met de server (vervang de host, gebruikersnaam en databasenaam dienovereenkomstig)
psql --host=flinkpostgres.postgres.database.azure.com --port=5432 --username=admin --dbname=postgres --set=sslmode=require --set=sslrootcert=DigiCertGlobalRootCA.crt.pem
Nadat u verbinding hebt gemaakt met de database, maakt u een voorbeeldtabel
CREATE TABLE shipments ( shipment_id SERIAL NOT NULL PRIMARY KEY, order_id SERIAL NOT NULL, origin VARCHAR(255) NOT NULL, destination VARCHAR(255) NOT NULL, is_arrived BOOLEAN NOT NULL ); ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001; ALTER TABLE public.shipments REPLICA IDENTITY FULL; INSERT INTO shipments VALUES (default,10001,'Beijing','Shanghai',false), (default,10002,'Hangzhou','Shanghai',false), (default,10003,'Shanghai','Hangzhou',false);
Als u CDC wilt inschakelen in postgreSQL-database, moet u de volgende wijzigingen aanbrengen.
Als u een Flink PostgreSQL CDC-tabel wilt maken, downloadt u alle afhankelijke JAR's. Gebruik het
pom.xml
-bestand met de volgende inhoud.<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.dep.download</groupId> <artifactId>dep-download</artifactId> <version>1.0-SNAPSHOT</version> <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-sqlserver-cdc --> <dependencies> <dependency> <groupId>com.ververica</groupId> <artifactId>flink-sql-connector-postgres-cdc</artifactId> <version>2.4.2</version> </dependency> </dependencies> </project>
Maven-opdracht gebruiken om alle afhankelijke JAR's te downloaden
mvn -DoutputDirectory=target -f pom.xml dependency:copy-dependencies -X
Notitie
- Als uw web-ssh-pod geen maven bevat, volgt u de koppelingen om deze te downloaden en te installeren.
- Gebruik de volgende opdracht om jsr JAR-bestand te downloaden
wget https://repo1.maven.org/maven2/net/java/loci/jsr308-all/1.1.2/jsr308-all-1.1.2.jar
Zodra de afhankelijke JAR's zijn gedownload, start u de Flink SQL-client, waarbij deze JAR's in de sessie moeten worden geïmporteerd. Voer de volgende opdracht uit:
/opt/flink-webssh/bin/sql-client.sh -j /opt/flink-webssh/target/flink-sql-connector-postgres-cdc-2.4.2.jar -j /opt/flink-webssh/target/slf4j-api-1.7.15.jar -j /opt/flink-webssh/target/hamcrest-2.1.jar -j /opt/flink-webssh/target/flink-shaded-guava-31.1-jre-17.0.jar-j /opt/flink-webssh/target/awaitility-4.0.1.jar -j /opt/flink-webssh/target/jsr308-all-1.1.2.jar
Met deze opdrachten start u de SQL-client met de afhankelijkheden als:
user@sshnode-0 [ ~ ]$ bin/sql-client.sh -j flink-sql-connector-postgres-cdc-2.4.2.jar -j slf4j-api-1.7.15.jar -j hamcrest-2.1.jar -j flink-shaded-guava-31.1-jre-17.0.jar -j awaitility-4.0.1.jar -j jsr308-all-1.1.2.jar ???????? ???????????????? ??????? ??????? ? ???? ????????? ????? ??? ??????? ????? ??? ??? ????? ?? ??????????????? ?? ? ??? ?????? ????? ????? ???? ????? ????? ??????? ??? ??????? ??? ????????? ?? ?? ?????????? ???????? ?? ? ?? ??????? ???? ??? ? ?? ???????? ????? ???? ? ?? ? ?? ???????? ???? ?? ???? ???? ?????????? ??? ?? ???? ???? ?? ??? ??????????? ???? ? ? ??? ??? ?? ??? ????????? ???? ??? ?? ? ??????? ???????? ??? ?? ??? ??? ???????????????????? ???? ? ????? ??? ?????? ???????? ???? ?? ???????? ??????????????? ?? ?? ???? ??????? ??? ?????? ?? ??? ??? ??? ??? ??????? ???? ????????????? ??? ????? ???? ?? ?? ???? ??? ?? ??? ? ?? ?? ?? ?? ?? ?? ?? ???????? ?? ????? ?? ??????????? ?? ?? ???? ? ??????? ?? ??? ????? ?? ??????????? ???? ???? ??????? ???????? ????? ?? ???? ????? ????????????????????????????????? ????? ______ _ _ _ _____ ____ _ _____ _ _ _ BETA | ____| (_) | | / ____|/ __ \| | / ____| (_) | | | |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_ | __| | | | '_ \| |/ / \___ \| | | | | | | | | |/ _ \ '_ \| __| | | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_ |_| |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__| Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit. Command history file path: /home/xcao/.flink-sql-history Flink SQL>
Een Flink PostgreSQL CDC-tabel maken met behulp van CDC-connector
CREATE TABLE shipments ( shipment_id INT, order_id INT, origin STRING, destination STRING, is_arrived BOOLEAN, PRIMARY KEY (shipment_id) NOT ENFORCED ) WITH ( 'connector' = 'postgres-cdc', 'hostname' = 'flinkpostgres.postgres.database.azure.com', 'port' = '5432', 'username' = 'username', 'password' = 'password', 'database-name' = 'postgres', 'schema-name' = 'public', 'table-name' = 'shipments', 'decoding.plugin.name' = 'pgoutput', 'slot.name' = 'flink' );
- Apache Flink Website
- PostgreSQL CDC Connector is gelicentieerd onder Apache 2.0-licentie
- Apache, Apache Flink, Flink en bijbehorende opensource-projectnamen zijn handelsmerken van de Apache Software Foundation (ASF).