Change Data Capture (CDC) de la table PostgreSQL à l’aide d’Apache Flink®
Remarque
Nous allons mettre hors service Azure HDInsight sur AKS le 31 janvier 2025. Avant le 31 janvier 2025, vous devrez migrer vos charges de travail vers Microsoft Fabric ou un produit Azure équivalent afin d’éviter leur arrêt brutal. Les clusters restants de votre abonnement seront arrêtés et supprimés de l’hôte.
Seul le support de base sera disponible jusqu’à la date de mise hors service.
Important
Cette fonctionnalité est disponible actuellement en mode Aperçu. Les Conditions d’utilisation supplémentaires pour les préversions de Microsoft Azure contiennent davantage de conditions légales qui s’appliquent aux fonctionnalités Azure en version bêta, en préversion ou ne se trouvant pas encore en disponibilité générale. Pour plus d’informations sur cette préversion spécifique, consultez les Informations sur la préversion d’Azure HDInsight sur AKS. Si vous avez des questions ou des suggestions de fonctionnalités, soumettez une demande sur AskHDInsight avec les détails et suivez-nous pour obtenir les dernières actualités sur la Communauté Azure HDInsight.
Capture des changements de données (CDC) est une technique que vous pouvez utiliser pour suivre les modifications au niveau des lignes dans les tables de base de données en réponse aux opérations de création, de mise à jour et de suppression. Dans cet article, nous utilisons les Connecteurs CDC pour Apache Flink®, qui proposent un ensemble de connecteurs sources pour Apache Flink. Les connecteurs intègrent Debezium® comme moteur pour capturer les modifications des données.
Flink prend en charge l'interprétation des messages Debezium JSON et Avro en tant que messages INSERT/UPDATE/DELETE dans le système Apache Flink SQL.
Ce support est utile dans de nombreux cas pour :
- Synchronisez les données incrémentielles des bases de données avec d'autres systèmes
- Journaux d’audit
- Créez des vues matérialisées en temps réel sur des bases de données
- Afficher l'historique des modifications de jointure temporelle d'une table de base de données
Voyons maintenant comment surveiller les modifications sur la table PostgreSQL à l'aide de Flink-SQL CDC. Le connecteur PostgreSQL CDC permet de lire les données instantanées et les données incrémentielles de la base de données PostgreSQL.
Prérequis
- Serveur flexible Azure PostgresQL version 14.7
- Cluster Apache Flink sur HDInsight sur AKS
- Machine virtuelle Linux pour utiliser le client PostgreSQL
- Ajoutez la règle NSG qui autorise les connexions entrantes et sortantes sur le port 5432 dans HDInsight sur le sous-réseau du pool AKS.
Préparer la table et le client PostgreSQL
À l'aide d'une machine virtuelle Linux, installez le client PostgreSQL à l'aide des commandes ci-dessous
sudo apt-get update sudo apt-get install postgresql-client
Installez le certificat pour vous connecter au serveur PostgreSQL via SSL
wget --no-check-certificate https://dl.cacerts.digicert.com/DigiCertGlobalRootCA.crt.pem
Connectez-vous au serveur (remplacez l'hôte, le nom d'utilisateur et le nom de la base de données en conséquence)
psql --host=flinkpostgres.postgres.database.azure.com --port=5432 --username=admin --dbname=postgres --set=sslmode=require --set=sslrootcert=DigiCertGlobalRootCA.crt.pem
Après vous être connecté avec succès à la base de données, créez un exemple de table
CREATE TABLE shipments ( shipment_id SERIAL NOT NULL PRIMARY KEY, order_id SERIAL NOT NULL, origin VARCHAR(255) NOT NULL, destination VARCHAR(255) NOT NULL, is_arrived BOOLEAN NOT NULL ); ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001; ALTER TABLE public.shipments REPLICA IDENTITY FULL; INSERT INTO shipments VALUES (default,10001,'Beijing','Shanghai',false), (default,10002,'Hangzhou','Shanghai',false), (default,10003,'Shanghai','Hangzhou',false);
Pour activer CDC sur la base de données PostgreSQL, vous devez apporter les modifications suivantes.
Créer une table CDC Apache Flink PostgreSQL
Pour créer la table CDC Flink PostgreSQL, téléchargez tous les fichiers jar dépendants. Utilisez le fichier
pom.xml
avec le contenu suivant.<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.dep.download</groupId> <artifactId>dep-download</artifactId> <version>1.0-SNAPSHOT</version> <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-sqlserver-cdc --> <dependencies> <dependency> <groupId>com.ververica</groupId> <artifactId>flink-sql-connector-postgres-cdc</artifactId> <version>2.4.2</version> </dependency> </dependencies> </project>
Utilisez la commande maven pour télécharger tous les fichiers jar dépendants
mvn -DoutputDirectory=target -f pom.xml dependency:copy-dependencies -X
Remarque
- Si votre pod Web SSH ne contient pas Maven, veuillez suivre les liens pour le télécharger et l'installer.
- Pour télécharger le fichier jar jsr, utilisez la commande suivante
wget https://repo1.maven.org/maven2/net/java/loci/jsr308-all/1.1.2/jsr308-all-1.1.2.jar
Une fois les jars dépendants téléchargés, démarrez le client Flink SQL, avec ces jars à importer dans la session. Complétez la commande comme suit,
/opt/flink-webssh/bin/sql-client.sh -j /opt/flink-webssh/target/flink-sql-connector-postgres-cdc-2.4.2.jar -j /opt/flink-webssh/target/slf4j-api-1.7.15.jar -j /opt/flink-webssh/target/hamcrest-2.1.jar -j /opt/flink-webssh/target/flink-shaded-guava-31.1-jre-17.0.jar-j /opt/flink-webssh/target/awaitility-4.0.1.jar -j /opt/flink-webssh/target/jsr308-all-1.1.2.jar
Ces commandes démarrent le client SQL avec les dépendances telles que,
user@sshnode-0 [ ~ ]$ bin/sql-client.sh -j flink-sql-connector-postgres-cdc-2.4.2.jar -j slf4j-api-1.7.15.jar -j hamcrest-2.1.jar -j flink-shaded-guava-31.1-jre-17.0.jar -j awaitility-4.0.1.jar -j jsr308-all-1.1.2.jar ???????? ???????????????? ??????? ??????? ? ???? ????????? ????? ??? ??????? ????? ??? ??? ????? ?? ??????????????? ?? ? ??? ?????? ????? ????? ???? ????? ????? ??????? ??? ??????? ??? ????????? ?? ?? ?????????? ???????? ?? ? ?? ??????? ???? ??? ? ?? ???????? ????? ???? ? ?? ? ?? ???????? ???? ?? ???? ???? ?????????? ??? ?? ???? ???? ?? ??? ??????????? ???? ? ? ??? ??? ?? ??? ????????? ???? ??? ?? ? ??????? ???????? ??? ?? ??? ??? ???????????????????? ???? ? ????? ??? ?????? ???????? ???? ?? ???????? ??????????????? ?? ?? ???? ??????? ??? ?????? ?? ??? ??? ??? ??? ??????? ???? ????????????? ??? ????? ???? ?? ?? ???? ??? ?? ??? ? ?? ?? ?? ?? ?? ?? ?? ???????? ?? ????? ?? ??????????? ?? ?? ???? ? ??????? ?? ??? ????? ?? ??????????? ???? ???? ??????? ???????? ????? ?? ???? ????? ????????????????????????????????? ????? ______ _ _ _ _____ ____ _ _____ _ _ _ BETA | ____| (_) | | / ____|/ __ \| | / ____| (_) | | | |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_ | __| | | | '_ \| |/ / \___ \| | | | | | | | | |/ _ \ '_ \| __| | | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_ |_| |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__| Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit. Command history file path: /home/xcao/.flink-sql-history Flink SQL>
Créer une table CDC Flink PostgreSQL à l'aide du connecteur CDC
CREATE TABLE shipments ( shipment_id INT, order_id INT, origin STRING, destination STRING, is_arrived BOOLEAN, PRIMARY KEY (shipment_id) NOT ENFORCED ) WITH ( 'connector' = 'postgres-cdc', 'hostname' = 'flinkpostgres.postgres.database.azure.com', 'port' = '5432', 'username' = 'username', 'password' = 'password', 'database-name' = 'postgres', 'schema-name' = 'public', 'table-name' = 'shipments', 'decoding.plugin.name' = 'pgoutput', 'slot.name' = 'flink' );
Validation
Référence
- Site web Apache Flink
- PostgreSQL CDC Connector est sous licence Apache 2.0
- Apache, Apache Flink, Flink et les noms de projet open source associés sont des marques de commerce d’Apache Software Foundation (ASF).