Freigeben über


Integrieren der Unterstützung für Apache Kafka Connect in Azure Event Hubs mit Debezium für Change Data Capture

Bei Change Data Capture (CDC) handelt es sich um eine Methode, die verwendet wird, um Änderungen auf Zeilenebene in Datenbanktabellen als Reaktion auf Erstellungs-, Aktualisierungs- und Löschvorgänge nachzuverfolgen. Debezium ist eine verteilte Plattform, die auf den in verschiedenen Datenbanken verfügbaren Change Data Capture-Funktionen aufbaut (z. B. logische Decodierung in PostgreSQL). Es bietet eine Reihe von Kafka Connect-Connectors, die auf Änderungen auf Zeilenebene in Datenbanktabellen zugreifen und diese in Ereignisstreams konvertieren, die dann an Apache Kafka gesendet werden.

In diesem Tutorial werden Sie schrittweise durch den Vorgang geführt, mit dem Sie ein Change Data Capture-basiertes System in Azure mithilfe von Event Hubs (für Kafka), Azure Database for PostgreSQL und Debezium einrichten. Es verwendet den Debezium PostgreSQL-Connector, um Datenbankänderungen aus PostgreSQL zu Kafka-Themen in Event Hubs zu streamen.

Hinweis

In diesem Artikel wird ein Begriff verwendet, der von Microsoft nicht mehr genutzt wird. Sobald der Begriff aus der Software entfernt wurde, wird er auch aus diesem Artikel entfernt.

In diesem Tutorial führen Sie die folgenden Schritte aus:

  • Erstellen eines Event Hubs-Namespace
  • Einrichten und Konfigurieren von Azure Database for PostgreSQL
  • Konfigurieren und Ausführen von Kafka Connect mit dem PostgreSQL-Connector von Debezium
  • Testen von Change Data Capture
  • (Optional) Nutzen von Datenänderungsereignissen mit einem FileStreamSink-Connector

Voraussetzungen

Im Rahmen dieser exemplarischen Vorgehensweise ist Folgendes erforderlich:

Erstellen eines Event Hubs-Namespace

Ein Event Hubs-Namespace ist erforderlich, um Nachrichten an einen Event Hubs-Dienst zu senden und von diesem zu empfangen. Anweisungen zum Erstellen eines Namespace und eines Event Hub finden Sie unter Erstellen eines Event Hubs. Rufen Sie die Event Hubs-Verbindungszeichenfolge und den vollqualifizierten Domänennamen (Fully Qualified Domain Name, FQDN) zur späteren Verwendung ab. Anweisungen hierzu finden Sie unter Get an Event Hubs connection string (Abrufen einer Event Hubs-Verbindungszeichenfolge).

Einrichten und Konfigurieren von Azure Database for PostgreSQL

Azure Database for PostgreSQL ist ein relationaler Datenbankdienst, der auf der Communityversion des Open-Source-Datenbankmoduls „PostgreSQL“ basiert und in drei Bereitstellungsoptionen verfügbar ist: Einzelserver, Flexibler Server und Cosmos DB for PostgreSQL. Befolgen Sie diese Anweisungen, um einen Azure Database for PostgreSQL-Server über das Azure-Portal zu erstellen.

Einrichten und Ausführen von Kafka Connect

Dieser Abschnitt enthält die folgenden Themen:

  • Installation des Debezium-Connectors
  • Konfigurieren von Kafka Connect für Event Hubs
  • Starten des Kafka Connect-Clusters mit dem Debezium-Connector

Herunterladen und Einrichten des Debezium-Connectors

Befolgen Sie die neuesten Anweisungen in der Debezium-Dokumentation, um den Connector herunterzuladen und einzurichten.

Konfigurieren von Kafka Connect für Event Hubs

Es ist nur eine minimale Neukonfiguration erforderlich, wenn Sie den Kafka Connect-Durchsatz von Kafka an Event Hubs umleiten. Im folgenden Beispiel connect-distributed.properties ist dargestellt, wie Sie Connect konfigurieren, um die Authentifizierung und Kommunikation mit dem Kafka-Endpunkt unter Event Hubs einzurichten:

Wichtig

  • Debezium erstellt automatisch ein Thema pro Tabelle und eine Reihe von Metadatenthemen. Das Kafka-Thema entspricht einer Event Hubs-Instanz (Event Hub). Weitere Zuordnungen zwischen Apache Kafka und Azure Event Hubs finden Sie unter Konzeptionelle Zuordnung zwischen Kafka und Event Hubs.
  • Je nach Tarif (Basic, Standard, Premium oder Dedicated) gelten unterschiedliche Grenzwerte für die Anzahl von Event Hubs in einem Event Hubs-Namespace. Informationen zu diesen Grenzwerten finden Sie unter Kontingente.
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093 # e.g. namespace.servicebus.windows.net:9093
group.id=connect-cluster-group

# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status

# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

rest.advertised.host.name=connect
offset.flush.interval.ms=10000

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

plugin.path={KAFKA.DIRECTORY}/libs # path to the libs directory within the Kafka release

Wichtig

Ersetzen Sie {YOUR.EVENTHUBS.CONNECTION.STRING} durch die Verbindungszeichenfolge für Ihren Event Hubs-Namespace. Anweisungen zum Abrufen der Verbindungszeichenfolge finden Sie unter Abrufen einer Event Hubs-Verbindungszeichenfolge. Hier sehen Sie eine Beispielkonfiguration: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

Ausführen von Kafka Connect

In diesem Schritt wird ein Kafka Connect-Worker lokal im verteilten Modus gestartet, indem Event Hubs zum Warten des Clusterzustands verwendet wird.

  1. Speichern Sie die obige Datei connect-distributed.properties lokal. Achten Sie darauf, alle Werte in geschweiften Klammern zu ersetzen.
  2. Navigieren Sie zum Speicherort des Kafka-Release auf Ihrem Computer.
  3. Führen Sie ./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties aus, und warten Sie, bis der Cluster startet.

Hinweis

Kafka Connect verwendet die Kafka-AdminClient-API, um automatisch Themen mit empfohlenen Konfigurationen, einschließlich Komprimierung, zu erstellen. Eine schnelle Überprüfung des Namespace im Azure-Portal ergibt, dass die internen Themen des Connect-Workers automatisch erstellt wurden.

Interne Kafka Connect-Themen müssen Komprimierung verwenden. Das Event Hubs-Team ist nicht für die Korrektur falscher Konfigurationen zuständig, sollten interne Connect-Themen nicht ordnungsgemäß konfiguriert sein.

Konfigurieren und Starten des Debezium PostgreSQL-Quellconnectors

Erstellen Sie eine Konfigurationsdatei (pg-source-connector.json) für den PostgreSQL-Quellconnector, und ersetzen Sie dabei die Werte gemäß Ihrer Azure PostgreSQL-Instanz.

{
    "name": "todo-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "<replace with Azure PostgreSQL instance name>.postgres.database.azure.com",
        "database.port": "5432",
        "database.user": "<replace with database user name>",
        "database.password": "<replace with database password>",
        "database.dbname": "postgres",
        "database.server.name": "my-server",
        "plugin.name": "wal2json",
        "table.whitelist": "public.todos"
    }
}

Tipp

Das database.server.name-Attribut ist ein logischer Name, der einen Namespace für den spezifischen überwachten PostgreSQL-Datenbankserver/-Cluster identifiziert und bereitstellt.

Verwenden Sie zum Erstellen einer Instanz des Connectors den Kafka Connect-REST-API-Endpunkt:

curl -X POST -H "Content-Type: application/json" --data @pg-source-connector.json http://localhost:8083/connectors

So überprüfen Sie den Status des Connectors

curl -s http://localhost:8083/connectors/todo-connector/status

Testen von Change Data Capture

Um Change Data Capture in Aktion zu sehen, müssen Sie Datensätze in der Azure PostgreSQL-Datenbank erstellen/aktualisieren/löschen.

Beginnen Sie damit, dass Sie eine Verbindung mit Ihrer Azure PostgreSQL-Datenbank herstellen (im folgenden Beispiel wird psql verwendet).

psql -h <POSTGRES_INSTANCE_NAME>.postgres.database.azure.com -p 5432 -U <POSTGRES_USER_NAME> -W -d <POSTGRES_DB_NAME> --set=sslmode=require

e.g. 

psql -h my-postgres.postgres.database.azure.com -p 5432 -U testuser@my-postgres -W -d postgres --set=sslmode=require

Erstellen einer Tabelle und Einfügen von Datensätzen

CREATE TABLE todos (id SERIAL, description VARCHAR(50), todo_status VARCHAR(12), PRIMARY KEY(id));

INSERT INTO todos (description, todo_status) VALUES ('setup postgresql on azure', 'complete');
INSERT INTO todos (description, todo_status) VALUES ('setup kafka connect', 'complete');
INSERT INTO todos (description, todo_status) VALUES ('configure and install connector', 'in-progress');
INSERT INTO todos (description, todo_status) VALUES ('start connector', 'pending');

Der Connector sollte jetzt aktiv werden und Datenänderungsereignisse an ein Event Hubs-Thema mit dem Namen my-server.public.todos senden, vorausgesetzt, Sie haben my-server als Wert für database.server.name festgelegt, und public.todos ist die Tabelle, deren Änderungen Sie nachverfolgen (gemäß Konfiguration table.whitelist).

Überprüfen des Event Hubs-Themas

Betrachten wir nun den Inhalt des Themas, um sicherzustellen, dass alles wie erwartet funktioniert. Im folgenden Beispiel wird kafkacat verwendet. Sie können aber auch einen Verbraucher mithilfe einer der hier aufgeführten Optionen erstellen.

Erstellen Sie eine Datei namens kafkacat.conf mit folgendem Inhalt:

metadata.broker.list=<enter event hubs namespace>.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=$ConnectionString
sasl.password=<enter event hubs connection string>

Hinweis

Aktualisieren Sie die Attribute metadata.broker.list und sasl.password in kafkacat.conf gemäß den Event Hubs-Informationen.

Starten Sie in einem anderen Terminal einen Consumer:

export KAFKACAT_CONFIG=kafkacat.conf
export BROKER=<enter event hubs namespace>.servicebus.windows.net:9093
export TOPIC=my-server.public.todos

kafkacat -b $BROKER -t $TOPIC -o beginning

Die JSON-Nutzdaten, die die Datenänderungsereignisse darstellen, die als Reaktion auf die Zeilen, die Sie in der Tabelle todos hinzugefügt haben, in PostgreSQL generiert wurden, sollten angezeigt werden. Hier ist ein Codeausschnitt der Nutzdaten:

{
    "schema": {...},
    "payload": {
        "before": null,
        "after": {
            "id": 1,
            "description": "setup postgresql on azure",
            "todo_status": "complete"
        },
        "source": {
            "version": "1.2.0.Final",
            "connector": "postgresql",
            "name": "fulfillment",
            "ts_ms": 1593018069944,
            "snapshot": "last",
            "db": "postgres",
            "schema": "public",
            "table": "todos",
            "txId": 602,
            "lsn": 184579736,
            "xmin": null
        },
        "op": "c",
        "ts_ms": 1593018069947,
        "transaction": null
    }

Das Ereignis besteht aus der payload zusammen mit dem schema (aus Gründen der Übersichtlichkeit ausgelassen). Beachten Sie im Abschnitt payload, wie der Erstellungsvorgang ("op": "c") dargestellt wird: "before": null bedeutet, dass es sich um eine neu eingefügte (INSERT) Zeile handelte; after stellt Werte für die Spalten in der Zeile bereit; source stellt die PostgreSQL-Instanzmetadaten von dem Ort bereit, an dem dieses Ereignis abgerufen wurde, usw.

Sie können dasselbe auch mit Aktualisierungs- oder Löschvorgängen ausprobieren und die Datenänderungsereignisse untersuchen. Um z. B. den Aufgabenstatus für configure and install connector zu aktualisieren (vorausgesetzt, seine id ist 3):

UPDATE todos SET todo_status = 'complete' WHERE id = 3;

(Optional) Installieren eines FileStreamSink-Connectors

Nachdem nun alle Änderungen an der Tabelle todos in dem Event Hubs-Thema aufgezeichnet werden, verwenden Sie den FileStreamSink-Connector (der standardmäßig in Kafka Connect verfügbar ist), um diese Ereignisse zu nutzen.

Erstellen Sie eine Konfigurationsdatei (file-sink-connector.json) für den Connector, und ersetzen Sie dabei das file-Attribut gemäß Ihrem Dateisystem.

{
    "name": "cdc-file-sink",
    "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "tasks.max": "1",
        "topics": "my-server.public.todos",
        "file": "<enter full path to file e.g. /Users/foo/todos-cdc.txt>"
    }
}

So erstellen Sie den Connector und überprüfen seinen Status

curl -X POST -H "Content-Type: application/json" --data @file-sink-connector.json http://localhost:8083/connectors

curl http://localhost:8083/connectors/cdc-file-sink/status

Datensätze einfügen/aktualisieren/löschen und die Datensätze in der konfigurierten Ausgabesenkendatei überwachen:

tail -f /Users/foo/todos-cdc.txt

Cleanup

Mit Kafka Connect werden Event Hub-Themen zum Speichern von Konfigurationen, Offsets und des Status erstellt, die auch dann beibehalten werden, nachdem der Kafka Connect-Cluster heruntergefahren wurde. Sofern diese Persistenz nicht gewünscht ist, empfehlen wir, diese Themen zu löschen. Sie können auch den Event Hub my-server.public.todos löschen, der in dieser exemplarischen Vorgehensweise erstellt wurde.

Nächste Schritte

Weitere Informationen zu Event Hubs für Kafka finden Sie in folgenden Artikeln: