Partage via


Intégrer la prise en charge d’Apache Kafka Connect sur Azure Event Hubs avec Debezium pour la capture des changements de données

La capture des changements de données (CDC) est une technique utilisée pour effectuer le suivi des modifications au niveau des lignes dans les tables de base de données en réponse à des opérations de création, de mise à jour et de suppression. Debezium est une plateforme distribuée qui s’appuie sur les fonctionnalités de capture des changements de données disponibles dans différentes bases de données (par exemple, le décodage logique dans PostgreSQL). Elle fournit un ensemble de connecteurs Kafka Connect qui récupèrent les modifications des lignes dans les tables de base de données et les convertissent en flux d’événements qui sont ensuite envoyés à Apache Kafka.

Ce tutoriel vous explique comment configurer un système basé sur la capture de modifications de données sur Azure à l’aide de Event Hubs (pour Kafka), Azure Database pour PostgreSQL et Debezium. Il utilise le connecteur Debezium PostgreSQL pour diffuser des modifications de base de données de PostgreSQL vers des rubriques Kafka dans Event Hubs.

Remarque

Cet article contient des références à un terme qui n’est plus utilisé par Microsoft. Lorsque le terme sera supprimé du logiciel, nous le supprimerons de cet article.

Dans ce tutoriel, vous effectuez les étapes suivantes :

  • Créer un espace de noms Event Hubs
  • Installer et configurer Azure Database pour PostgreSQL
  • Configurer et exécuter Kafka Connect avec le connecteur Debezium PostgreSQL
  • Tester la capture des changements de données
  • (Facultatif) Utiliser des événements de changement de données avec un connecteur FileStreamSink

Prérequis

Pour effectuer cette procédure pas à pas, vous devez effectuer les étapes suivantes :

Créer un espace de noms Event Hubs

Un espace de noms Event Hubs est requis pour échanger avec tout service Event Hubs. Pour obtenir des instructions sur la création d'un espace de noms et d'un Event Hub, consultez Créer un Event Hub. Obtenez la chaîne de connexion Event Hubs et le nom de domaine complet (FQDN) pour une utilisation ultérieure. Pour obtenir des instructions, consultez Obtenir une chaîne de connexion Event Hubs.

Installer et configurer Azure Database pour PostgreSQL

Azure Database pour PostgreSQL est un service de base de données relationnelle basé sur la version communautaire du moteur de base de données PostgreSQL open source et disponible dans trois options de déploiement : serveur unique, serveur flexible et Cosmos DB pour PostgreSQL. Procédez comme suit pour créer un serveur Azure Database pour PostgreSQL à l’aide du Portail Azure.

Configurer et exécuter Kafka Connect

Cette section couvre les sujets suivants :

  • Installation du connecteur Debezium
  • Configuration de Kafka Connect pour Event Hubs
  • Démarrer le cluster Kafka Connect avec le connecteur Debezium

Télécharger et configurer le connecteur Debezium

Suivez les dernières instructions de la documentation Debezium pour télécharger et configurer le connecteur.

Configurer Kafka Connect pour Event Hubs

Une reconfiguration minimale est nécessaire pour rediriger le débit Kafka Connect de Kafka vers Event Hubs. L’exemple connect-distributed.properties suivant montre comment configurer Connect pour s’authentifier et communiquer avec le point de terminaison Kafka sur Event Hubs :

Important

  • Debezium crée automatiquement une rubrique par table et une série de rubriques de métadonnées. La rubrique Kafka correspond à une instance d’Event Hubs (hub d’événements). Pour connaître les mappages entre Apache Kafka et Azure Event Hubs, consultez Mappage des concepts Kafka et Event Hubs.
  • Il existe différentes limites sur le nombre de hubs d’événements dans un espace de noms Event Hubs en fonction du niveau (De base, Standard, Premium ou Dedicated). Pour connaître ces limites, consultez Quotas.
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093 # e.g. namespace.servicebus.windows.net:9093
group.id=connect-cluster-group

# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status

# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

rest.advertised.host.name=connect
offset.flush.interval.ms=10000

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

plugin.path={KAFKA.DIRECTORY}/libs # path to the libs directory within the Kafka release

Important

Remplacez {YOUR.EVENTHUBS.CONNECTION.STRING} par la chaîne de connexion de votre espace de noms Event Hubs. Pour savoir comment obtenir la chaîne de connexion, consultez Obtenir une chaîne de connexion Event Hubs. Voici un exemple de configuration : sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

Exécuter Kafka Connect

Dans cette étape, un Worker Kafka Connect est démarré en local en mode distribué, à l’aide d’Event Hubs pour maintenir l’état du cluster.

  1. Enregistrez le fichier connect-distributed.properties ci-dessus en local. Veillez à remplacer toutes les valeurs entre accolades.
  2. Accédez à l’emplacement de la version de Kafka sur votre ordinateur.
  3. Exécutez ./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties et attendez que le cluster démarre.

Notes

Kafka Connect utilise l’API Kafka AdminClient pour créer automatiquement des rubriques avec les configurations recommandées, y compris le compactage. Une vérification rapide de l’espace de noms dans le portail Azure révèle que les rubriques internes du Worker Connect ont été créées automatiquement.

Les rubriques internes Kafka Connect doivent utiliser le compactage. L’équipe Event Hubs n’est pas responsable de la résolution des configurations incorrectes si les rubriques Connect internes ne sont pas correctement configurées.

Configurer et démarrer le connecteur Debezium PostgreSQL source

Créez un fichier de configuration (pg-source-connector.json) pour le connecteur source PostgreSQL : remplacez les valeurs comme pour votre instance Azure PostgreSQL.

{
    "name": "todo-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "<replace with Azure PostgreSQL instance name>.postgres.database.azure.com",
        "database.port": "5432",
        "database.user": "<replace with database user name>",
        "database.password": "<replace with database password>",
        "database.dbname": "postgres",
        "database.server.name": "my-server",
        "plugin.name": "wal2json",
        "table.whitelist": "public.todos"
    }
}

Conseil

L’attribut database.server.name est un nom logique qui identifie et fournit un espace de noms pour le serveur/cluster de base de données PostgreSQL particulier surveillé.

Pour créer une instance du connecteur, utilisez le point de terminaison de l’API REST Kafka Connect :

curl -X POST -H "Content-Type: application/json" --data @pg-source-connector.json http://localhost:8083/connectors

Pour vérifier l’état du connecteur :

curl -s http://localhost:8083/connectors/todo-connector/status

Tester la capture des changements de données

Pour voir la capture des données modifiées en action, vous devez créer/mettre à jour/supprimer des enregistrements dans la base de données Azure PostgreSQL.

Commencez par vous connecter à votre base de données Azure PostgreSQL (l’exemple suivant utilise psql).

psql -h <POSTGRES_INSTANCE_NAME>.postgres.database.azure.com -p 5432 -U <POSTGRES_USER_NAME> -W -d <POSTGRES_DB_NAME> --set=sslmode=require

e.g. 

psql -h my-postgres.postgres.database.azure.com -p 5432 -U testuser@my-postgres -W -d postgres --set=sslmode=require

Créer une table et insérer des enregistrements

CREATE TABLE todos (id SERIAL, description VARCHAR(50), todo_status VARCHAR(12), PRIMARY KEY(id));

INSERT INTO todos (description, todo_status) VALUES ('setup postgresql on azure', 'complete');
INSERT INTO todos (description, todo_status) VALUES ('setup kafka connect', 'complete');
INSERT INTO todos (description, todo_status) VALUES ('configure and install connector', 'in-progress');
INSERT INTO todos (description, todo_status) VALUES ('start connector', 'pending');

Le connecteur doit maintenant entrer en action et envoyer des événements de données de modification à une rubrique Event Hubs avec le nom suivant my-server.public.todos, en supposant que vous avez my-server comme valeur pour database.server.name et que public.todos est la table dont vous suivez les modifications (conformément à la configuration table.whitelist).

Consulter la rubrique Check Event Hubs

Examinons le contenu de la rubrique pour s’assurer que tout fonctionne comme prévu. L’exemple ci-dessous utilise kafkacat, mais vous pouvez également créer un contrôle serveur consommateur à l’aide de l’une des options répertoriées ici.

Créez un fichier nommé kafkacat.conf avec le contenu suivant :

metadata.broker.list=<enter event hubs namespace>.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=$ConnectionString
sasl.password=<enter event hubs connection string>

Notes

Mettez à jour les attributs metadata.broker.list et sasl.password dans kafkacat.conf en fonction des informations d’Event Hubs.

Dans un autre terminal, démarrez un contrôle serveur consommateur :

export KAFKACAT_CONFIG=kafkacat.conf
export BROKER=<enter event hubs namespace>.servicebus.windows.net:9093
export TOPIC=my-server.public.todos

kafkacat -b $BROKER -t $TOPIC -o beginning

Vous devez voir les charges utiles JSON représentant les événements de changement des données générés dans PostgreSQL en réponse aux lignes que vous avez ajoutées à la table todos. Voici un extrait de la charge utile :

{
    "schema": {...},
    "payload": {
        "before": null,
        "after": {
            "id": 1,
            "description": "setup postgresql on azure",
            "todo_status": "complete"
        },
        "source": {
            "version": "1.2.0.Final",
            "connector": "postgresql",
            "name": "fullfillment",
            "ts_ms": 1593018069944,
            "snapshot": "last",
            "db": "postgres",
            "schema": "public",
            "table": "todos",
            "txId": 602,
            "lsn": 184579736,
            "xmin": null
        },
        "op": "c",
        "ts_ms": 1593018069947,
        "transaction": null
    }

L’événement se compose de payload , ainsi que de son schema (omis par souci de concision). Dans la section payload, notez comment l’opération de création ("op": "c") est représentée : "before": null signifie qu’il s’agit d’une nouvelle ligne insérée (INSERT), after fournit des valeurs pour les colonnes de la ligne, source fournit les métadonnées de l’instance PostgreSQL à partir desquelles cet événement a été récupéré et ainsi de suite.

Vous pouvez également essayer les mêmes opérations de mise à jour ou de suppression, et examiner les événements de changement des données. Par exemple, pour mettre à jour l’état de la tâche pour configure and install connector (en supposant que son id est 3) :

UPDATE todos SET todo_status = 'complete' WHERE id = 3;

(Facultatif) Installer le connecteur FileStreamSink

Maintenant que toutes les modifications apportées à la table todos sont capturées dans la rubrique Event Hubs, vous utilisez le connecteur FileStreamSink (disponible par défaut dans Kafka Connect) pour consommer ces événements.

Créez un fichier de configuration (file-sink-connector.json) pour le connecteur : remplacez l’attribut file en fonction de votre système de fichiers.

{
    "name": "cdc-file-sink",
    "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "tasks.max": "1",
        "topics": "my-server.public.todos",
        "file": "<enter full path to file e.g. /Users/foo/todos-cdc.txt>"
    }
}

Pour créer le connecteur et vérifier son état :

curl -X POST -H "Content-Type: application/json" --data @file-sink-connector.json http://localhost:8083/connectors

curl http://localhost:8083/connectors/cdc-file-sink/status

Insérez/mettez à jour/supprimez des enregistrements de base de données et surveillez les enregistrements dans le fichier récepteur de sortie configuré :

tail -f /Users/foo/todos-cdc.txt

Nettoyage

Kafka Connect crée des rubriques Event Hubs pour stocker les configurations, les décalages et l’état qui persistent même après la prise en charge du cluster Kafka Connect. Sauf si cette persistance est souhaitée, nous vous recommandons de supprimer ces rubriques. Vous pouvez également supprimer le hub d’événements my-server.public.todos qui ont été créés pendant cette procédure pas à pas.

Étapes suivantes

Pour plus d’informations sur Event Hubs pour Kafka, consultez les articles suivants :