Delen via


Ondersteuning voor Apache Kafka Connect integreren in Azure Event Hubs met Debezium voor Change Data Capture

Change Data Capture (CDC) is een techniek die wordt gebruikt om wijzigingen op rijniveau in databasetabellen bij te houden als reactie op het maken, bijwerken en verwijderen van bewerkingen. Debezium is een gedistribueerd platform dat voortbouwt op de functies Change Data Capture die beschikbaar zijn in verschillende databases (bijvoorbeeld logische decodering in PostgreSQL). Het biedt een set Kafka Connect-connectors die gebruikmaken van wijzigingen op rijniveau in databasetabellen en deze converteren naar gebeurtenisstromen die vervolgens naar Apache Kafka worden verzonden.

In deze zelfstudie leert u hoe u een systeem voor het vastleggen van wijzigingengegevens instelt op basis van Azure met behulp van Event Hubs (voor Kafka), Azure Database for PostgreSQL en Debezium. De connector Debezium PostgreSQL wordt gebruikt om databasewijzigingen van PostgreSQL naar Kafka-onderwerpen in Event Hubs te streamen.

Notitie

Dit artikel bevat verwijzingen naar een term die microsoft niet meer gebruikt. Zodra de term uit de software wordt verwijderd, verwijderen we deze uit dit artikel.

In deze zelfstudie voert u de volgende stappen uit:

  • Een Event Hubs-naamruimte maken
  • Azure Database for PostgreSQL instellen en configureren
  • Kafka Connect configureren en uitvoeren met Debezium PostgreSQL-connector
  • Test van het vastleggen van wijzigingsgegevens
  • (Optioneel) Maak gebruik van veranderingsgegevensgebeurtenissen met een FileStreamSink connector

Vereisten

U hebt het volgende nodig om deze procedure te voltooien:

Een Event Hubs-naamruimte maken

Er is een Event Hubs-naamruimte vereist om gegevens te verzenden naar en te ontvangen van Event Hubs-services. Zie Een Event Hub maken voor instructies voor het maken van een naamruimte en een Event Hub. Haal de Event Hubs-verbindingsreeks en de Fully Qualified Domain Name (FQDN) op voor later gebruik. Zie Get an Event Hubs connection string voor instructies.

Azure Database for PostgreSQL instellen en configureren

Azure Database for PostgreSQL is een relationele databaseservice op basis van de communityversie van opensource PostgreSQL-database-engine en is beschikbaar in drie implementatieopties: Enkele server, flexibele server en Cosmos DB for PostgreSQL. Volg deze instructies om een Azure Database for PostgreSQL-server te maken met behulp van Azure Portal.

Kafka Connect instellen en uitvoeren

In deze sectie worden de volgende onderwerpen behandeld:

  • Debezium connectorinstallatie
  • Kafka Connect configureren voor Event Hubs
  • Kafka Connect-cluster starten met Debezium-connector

Debezium-connector downloaden en instellen

Volg de nieuwste instructies in de Documentatie van Debezium om de connector te downloaden en in te stellen.

Kafka Connect configureren voor Event Hubs

Er is een minimale herconfiguratie vereist als u doorvoer van Kafka Connect wilt omleiden van Kafka naar Event Hubs. In het volgende connect-distributed.properties-voorbeeld wordt getoond hoe u Connect kunt configureren om op Event Hubs het Kafka-eindpunt te verifiëren en hoe ermee te communiceren:

Belangrijk

  • Met Debezium wordt automatisch een onderwerp per tabel en een aantal metagegevensonderwerpen gemaakt. Kafka-onderwerp komt overeen met een Event Hubs-exemplaar (Event Hub). Zie Kafka en Event Hubs conceptuele mapping voor Apache Kafka naar Azure Event Hubs.
  • Er gelden verschillende limieten voor het aantal Event Hubs in een Event Hubs-naamruimte, afhankelijk van de laag (Basic, Standard, Premium of Dedicated). Zie Quota voor deze limieten.
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093 # e.g. namespace.servicebus.windows.net:9093
group.id=connect-cluster-group

# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status

# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

rest.advertised.host.name=connect
offset.flush.interval.ms=10000

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

plugin.path={KAFKA.DIRECTORY}/libs # path to the libs directory within the Kafka release

Belangrijk

Vervang {YOUR.EVENTHUBS.CONNECTION.STRING} door de verbindingsreeks voor uw Event Hubs-naamruimte. Zie Een verbindingsreeks voor Event Hubs ophalen voor instructies voor het ophalen van de verbindingsreeks. Hier volgt een voorbeeldconfiguratie: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

Kafka Connect uitvoeren

In deze stap wordt een Kafka Connect-worker lokaal gestart in gedistribueerde modus, waarbij Event Hubs wordt gebruikt om de clusterstatus te handhaven.

  1. Sla het connect-distributed.properties bestand lokaal op. Zorg ervoor dat je alle waarden tussen accolades vervangt.
  2. Ga naar de locatie van de Kafka-release op uw computer.
  3. Voer ./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties uit en wacht totdat het cluster is gestart.

Notitie

Kafka Connect maakt gebruik van de Kafka AdminClient-API om automatisch onderwerpen te maken met aanbevolen configuraties, waaronder compressie. Een snelle controle van de naamruimte in de Azure-portal laat zien dat de interne topics van de Connect-worker automatisch zijn gemaakt.

Interne onderwerpen van Kafka Connect moeten gebruikmaken van compressie. Het Event Hubs-team is niet verantwoordelijk voor het oplossen van onjuiste configuraties als interne connect-onderwerpen onjuist zijn geconfigureerd.

De Debezium PostgreSQL-bronconnector configureren en starten

Maak een configuratiebestand (pg-source-connector.json) voor de PostgreSQL-bronconnector: vervang de waarden volgens uw Azure PostgreSQL-exemplaar.

{
    "name": "todo-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "<replace with Azure PostgreSQL instance name>.postgres.database.azure.com",
        "database.port": "5432",
        "database.user": "<replace with database user name>",
        "database.password": "<replace with database password>",
        "database.dbname": "postgres",
        "database.server.name": "my-server",
        "plugin.name": "wal2json",
        "table.whitelist": "public.todos"
    }
}

Tip

database.server.name kenmerk is een logische naam die een naamruimte identificeert en biedt voor de specifieke PostgreSQL-databaseserver/het cluster dat wordt bewaakt.

Als u een exemplaar van de connector wilt maken, gebruikt u het Kafka Connect REST API-eindpunt:

curl -X POST -H "Content-Type: application/json" --data @pg-source-connector.json http://localhost:8083/connectors

De status van de connector controleren:

curl -s http://localhost:8083/connectors/todo-connector/status

Het testen van gegevensvastlegging bij wijzigingen

Als u het vastleggen van gegevens in actie wilt zien, moet u records maken/bijwerken/verwijderen in de Azure PostgreSQL-database.

Maak eerst verbinding met uw Azure PostgreSQL-database (in het volgende voorbeeld wordt psql gebruikt).

psql -h <POSTGRES_INSTANCE_NAME>.postgres.database.azure.com -p 5432 -U <POSTGRES_USER_NAME> -W -d <POSTGRES_DB_NAME> --set=sslmode=require

e.g. 

psql -h my-postgres.postgres.database.azure.com -p 5432 -U testuser@my-postgres -W -d postgres --set=sslmode=require

Een tabel maken en records invoegen

CREATE TABLE todos (id SERIAL, description VARCHAR(50), todo_status VARCHAR(12), PRIMARY KEY(id));

INSERT INTO todos (description, todo_status) VALUES ('setup postgresql on azure', 'complete');
INSERT INTO todos (description, todo_status) VALUES ('setup kafka connect', 'complete');
INSERT INTO todos (description, todo_status) VALUES ('configure and install connector', 'in-progress');
INSERT INTO todos (description, todo_status) VALUES ('start connector', 'pending');

De connector moet nu in actie komen en gegevensgebeurtenissen van wijzigingen verzenden naar een Event Hubs-onderwerp met de volgende naam my-server.public.todos, ervan uitgaande dat u my-server als waarde hebt en public.todos de tabel is waarvan u de wijzigingen bijhoudt, zoals gedefinieerd in de table.whitelist-configuratie.

Event Hubs-onderwerp controleren

Laten we de inhoud van het onderwerp introspecteren om te controleren of alles naar verwachting werkt. In het volgende voorbeeld wordt kafkacat gebruikt, maar u kunt ook een consument aanmaken met behulp van een van de hier vermelde opties.

Maak een bestand met de naam kafkacat.conf met de volgende inhoud:

metadata.broker.list=<enter event hubs namespace>.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=$ConnectionString
sasl.password=<enter event hubs connection string>

Notitie

Update metadata.broker.list en sasl.password kenmerken in kafkacat.conf op basis van Event Hubs-informatie.

Start een consument in een andere terminal:

export KAFKACAT_CONFIG=kafkacat.conf
export BROKER=<enter event hubs namespace>.servicebus.windows.net:9093
export TOPIC=my-server.public.todos

kafkacat -b $BROKER -t $TOPIC -o beginning

U ziet nu de JSON-nettoladingen die de wijzigingsgegevensgebeurtenissen vertegenwoordigen die zijn gegenereerd in PostgreSQL als reactie op de rijen die u aan de todos tabel hebt toegevoegd. Dit is een fragment van de inhoud:

{
    "schema": {...},
    "payload": {
        "before": null,
        "after": {
            "id": 1,
            "description": "setup postgresql on azure",
            "todo_status": "complete"
        },
        "source": {
            "version": "1.2.0.Final",
            "connector": "postgresql",
            "name": "fulfillment",
            "ts_ms": 1593018069944,
            "snapshot": "last",
            "db": "postgres",
            "schema": "public",
            "table": "todos",
            "txId": 602,
            "lsn": 184579736,
            "xmin": null
        },
        "op": "c",
        "ts_ms": 1593018069947,
        "transaction": null
    }

De gebeurtenis bestaat uit de payload samen met de schema (weggelaten voor beknoptheid). In de sectie payload ziet u hoe de bewerking voor maken ("op": "c") wordt weergegeven - "before": null betekent dit dat het om een nieuwe INSERTrij gaat, after biedt waarden voor de kolommen in de rij, source biedt de metagegevens van het PostgreSQL-exemplaar van waaruit deze gebeurtenis is opgehaald, en verder.

U kunt hetzelfde proberen met update- of verwijderbewerkingen en de wijzigingsgegevensgebeurtenissen bekijken. Als u bijvoorbeeld de status van de taak configure and install connector wilt bijwerken (ervan uitgaande dat de id3 is):

UPDATE todos SET todo_status = 'complete' WHERE id = 3;

(Optioneel) FileStreamSink-connector installeren

Nu alle todos tabelwijzigingen worden vastgelegd in het Event Hubs-onderwerp, gebruikt u de FileStreamSink-connector (die standaard beschikbaar is in Kafka Connect) om deze gebeurtenissen te gebruiken.

Maak een configuratiebestand (file-sink-connector.json) voor de connector: vervang het file kenmerk op basis van uw bestandssysteem.

{
    "name": "cdc-file-sink",
    "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "tasks.max": "1",
        "topics": "my-server.public.todos",
        "file": "<enter full path to file e.g. /Users/foo/todos-cdc.txt>"
    }
}

De connector maken en de status ervan controleren:

curl -X POST -H "Content-Type: application/json" --data @file-sink-connector.json http://localhost:8083/connectors

curl http://localhost:8083/connectors/cdc-file-sink/status

Databaserecords invoegen/bijwerken/verwijderen en de records bewaken in het geconfigureerde uitvoersinkbestand:

tail -f /Users/foo/todos-cdc.txt

Opschonen

Kafka Connect maakt Event Hubs-onderwerpen voor het opslaan van configuraties, offsets en statussen die behouden blijven, zelfs nadat het Kafka Connect-cluster is uitgeschakeld. Tenzij deze persistentie gewenst is, raden we u aan deze onderwerpen te verwijderen. Mogelijk wilt u ook de my-server.public.todos Event Hub verwijderen die tijdens deze procedure is gemaakt.

Volgende stappen

Zie de volgende artikelen voor meer informatie over Event Hubs voor Kafka: