Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Change Data Capture (CDC) is een techniek die wordt gebruikt om wijzigingen op rijniveau in databasetabellen bij te houden als reactie op het maken, bijwerken en verwijderen van bewerkingen. Debezium is een gedistribueerd platform dat voortbouwt op de functies Change Data Capture die beschikbaar zijn in verschillende databases (bijvoorbeeld logische decodering in PostgreSQL). Het biedt een set Kafka Connect-connectors die gebruikmaken van wijzigingen op rijniveau in databasetabellen en deze converteren naar gebeurtenisstromen die vervolgens naar Apache Kafka worden verzonden.
In deze zelfstudie leert u hoe u een systeem voor het vastleggen van wijzigingengegevens instelt op basis van Azure met behulp van Event Hubs (voor Kafka), Azure Database for PostgreSQL en Debezium. De connector Debezium PostgreSQL wordt gebruikt om databasewijzigingen van PostgreSQL naar Kafka-onderwerpen in Event Hubs te streamen.
Notitie
Dit artikel bevat verwijzingen naar een term die microsoft niet meer gebruikt. Zodra de term uit de software wordt verwijderd, verwijderen we deze uit dit artikel.
In deze zelfstudie voert u de volgende stappen uit:
- Een Event Hubs-naamruimte maken
- Azure Database for PostgreSQL instellen en configureren
- Kafka Connect configureren en uitvoeren met Debezium PostgreSQL-connector
- Test van het vastleggen van wijzigingsgegevens
- (Optioneel) Maak gebruik van veranderingsgegevensgebeurtenissen met een
FileStreamSink
connector
Vereisten
U hebt het volgende nodig om deze procedure te voltooien:
- Azure-abonnement. Als u nog geen account hebt, kunt u een gratis account maken.
- Linux/macOS
- Kafka-release (versie 1.1.1, Scala-versie 2.11), beschikbaar op kafka.apache.org
- Lees het inleidende artikel Event Hubs voor Apache Kafka door
Een Event Hubs-naamruimte maken
Er is een Event Hubs-naamruimte vereist om gegevens te verzenden naar en te ontvangen van Event Hubs-services. Zie Een Event Hub maken voor instructies voor het maken van een naamruimte en een Event Hub. Haal de Event Hubs-verbindingsreeks en de Fully Qualified Domain Name (FQDN) op voor later gebruik. Zie Get an Event Hubs connection string voor instructies.
Azure Database for PostgreSQL instellen en configureren
Azure Database for PostgreSQL is een relationele databaseservice op basis van de communityversie van opensource PostgreSQL-database-engine en is beschikbaar in drie implementatieopties: Enkele server, flexibele server en Cosmos DB for PostgreSQL. Volg deze instructies om een Azure Database for PostgreSQL-server te maken met behulp van Azure Portal.
Kafka Connect instellen en uitvoeren
In deze sectie worden de volgende onderwerpen behandeld:
- Debezium connectorinstallatie
- Kafka Connect configureren voor Event Hubs
- Kafka Connect-cluster starten met Debezium-connector
Debezium-connector downloaden en instellen
Volg de nieuwste instructies in de Documentatie van Debezium om de connector te downloaden en in te stellen.
- Download het plug-inarchief van de connector. Als u bijvoorbeeld de versie
1.2.0
van de connector wilt downloaden, gebruikt u deze koppeling - https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.2.0.Final/debezium-connector-postgres-1.2.0.Final-plugin.tar.gz - Pak de JAR-bestanden uit en kopieer ze naar de Kafka Connect-invoegtoepassing.path.
Kafka Connect configureren voor Event Hubs
Er is een minimale herconfiguratie vereist als u doorvoer van Kafka Connect wilt omleiden van Kafka naar Event Hubs. In het volgende connect-distributed.properties
-voorbeeld wordt getoond hoe u Connect kunt configureren om op Event Hubs het Kafka-eindpunt te verifiëren en hoe ermee te communiceren:
Belangrijk
- Met Debezium wordt automatisch een onderwerp per tabel en een aantal metagegevensonderwerpen gemaakt. Kafka-onderwerp komt overeen met een Event Hubs-exemplaar (Event Hub). Zie Kafka en Event Hubs conceptuele mapping voor Apache Kafka naar Azure Event Hubs.
- Er gelden verschillende limieten voor het aantal Event Hubs in een Event Hubs-naamruimte, afhankelijk van de laag (Basic, Standard, Premium of Dedicated). Zie Quota voor deze limieten.
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093 # e.g. namespace.servicebus.windows.net:9093
group.id=connect-cluster-group
# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status
# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
rest.advertised.host.name=connect
offset.flush.interval.ms=10000
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
plugin.path={KAFKA.DIRECTORY}/libs # path to the libs directory within the Kafka release
Belangrijk
Vervang {YOUR.EVENTHUBS.CONNECTION.STRING}
door de verbindingsreeks voor uw Event Hubs-naamruimte. Zie Een verbindingsreeks voor Event Hubs ophalen voor instructies voor het ophalen van de verbindingsreeks. Hier volgt een voorbeeldconfiguratie: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
Kafka Connect uitvoeren
In deze stap wordt een Kafka Connect-worker lokaal gestart in gedistribueerde modus, waarbij Event Hubs wordt gebruikt om de clusterstatus te handhaven.
- Sla het
connect-distributed.properties
bestand lokaal op. Zorg ervoor dat je alle waarden tussen accolades vervangt. - Ga naar de locatie van de Kafka-release op uw computer.
- Voer
./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties
uit en wacht totdat het cluster is gestart.
Notitie
Kafka Connect maakt gebruik van de Kafka AdminClient-API om automatisch onderwerpen te maken met aanbevolen configuraties, waaronder compressie. Een snelle controle van de naamruimte in de Azure-portal laat zien dat de interne topics van de Connect-worker automatisch zijn gemaakt.
Interne onderwerpen van Kafka Connect moeten gebruikmaken van compressie. Het Event Hubs-team is niet verantwoordelijk voor het oplossen van onjuiste configuraties als interne connect-onderwerpen onjuist zijn geconfigureerd.
De Debezium PostgreSQL-bronconnector configureren en starten
Maak een configuratiebestand (pg-source-connector.json
) voor de PostgreSQL-bronconnector: vervang de waarden volgens uw Azure PostgreSQL-exemplaar.
{
"name": "todo-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "<replace with Azure PostgreSQL instance name>.postgres.database.azure.com",
"database.port": "5432",
"database.user": "<replace with database user name>",
"database.password": "<replace with database password>",
"database.dbname": "postgres",
"database.server.name": "my-server",
"plugin.name": "wal2json",
"table.whitelist": "public.todos"
}
}
Tip
database.server.name
kenmerk is een logische naam die een naamruimte identificeert en biedt voor de specifieke PostgreSQL-databaseserver/het cluster dat wordt bewaakt.
Als u een exemplaar van de connector wilt maken, gebruikt u het Kafka Connect REST API-eindpunt:
curl -X POST -H "Content-Type: application/json" --data @pg-source-connector.json http://localhost:8083/connectors
De status van de connector controleren:
curl -s http://localhost:8083/connectors/todo-connector/status
Het testen van gegevensvastlegging bij wijzigingen
Als u het vastleggen van gegevens in actie wilt zien, moet u records maken/bijwerken/verwijderen in de Azure PostgreSQL-database.
Maak eerst verbinding met uw Azure PostgreSQL-database (in het volgende voorbeeld wordt psql gebruikt).
psql -h <POSTGRES_INSTANCE_NAME>.postgres.database.azure.com -p 5432 -U <POSTGRES_USER_NAME> -W -d <POSTGRES_DB_NAME> --set=sslmode=require
e.g.
psql -h my-postgres.postgres.database.azure.com -p 5432 -U testuser@my-postgres -W -d postgres --set=sslmode=require
Een tabel maken en records invoegen
CREATE TABLE todos (id SERIAL, description VARCHAR(50), todo_status VARCHAR(12), PRIMARY KEY(id));
INSERT INTO todos (description, todo_status) VALUES ('setup postgresql on azure', 'complete');
INSERT INTO todos (description, todo_status) VALUES ('setup kafka connect', 'complete');
INSERT INTO todos (description, todo_status) VALUES ('configure and install connector', 'in-progress');
INSERT INTO todos (description, todo_status) VALUES ('start connector', 'pending');
De connector moet nu in actie komen en gegevensgebeurtenissen van wijzigingen verzenden naar een Event Hubs-onderwerp met de volgende naam my-server.public.todos
, ervan uitgaande dat u my-server
als waarde hebt en public.todos
de tabel is waarvan u de wijzigingen bijhoudt, zoals gedefinieerd in de table.whitelist
-configuratie.
Event Hubs-onderwerp controleren
Laten we de inhoud van het onderwerp introspecteren om te controleren of alles naar verwachting werkt. In het volgende voorbeeld wordt kafkacat
gebruikt, maar u kunt ook een consument aanmaken met behulp van een van de hier vermelde opties.
Maak een bestand met de naam kafkacat.conf
met de volgende inhoud:
metadata.broker.list=<enter event hubs namespace>.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=$ConnectionString
sasl.password=<enter event hubs connection string>
Notitie
Update metadata.broker.list
en sasl.password
kenmerken in kafkacat.conf
op basis van Event Hubs-informatie.
Start een consument in een andere terminal:
export KAFKACAT_CONFIG=kafkacat.conf
export BROKER=<enter event hubs namespace>.servicebus.windows.net:9093
export TOPIC=my-server.public.todos
kafkacat -b $BROKER -t $TOPIC -o beginning
U ziet nu de JSON-nettoladingen die de wijzigingsgegevensgebeurtenissen vertegenwoordigen die zijn gegenereerd in PostgreSQL als reactie op de rijen die u aan de todos
tabel hebt toegevoegd. Dit is een fragment van de inhoud:
{
"schema": {...},
"payload": {
"before": null,
"after": {
"id": 1,
"description": "setup postgresql on azure",
"todo_status": "complete"
},
"source": {
"version": "1.2.0.Final",
"connector": "postgresql",
"name": "fulfillment",
"ts_ms": 1593018069944,
"snapshot": "last",
"db": "postgres",
"schema": "public",
"table": "todos",
"txId": 602,
"lsn": 184579736,
"xmin": null
},
"op": "c",
"ts_ms": 1593018069947,
"transaction": null
}
De gebeurtenis bestaat uit de payload
samen met de schema
(weggelaten voor beknoptheid). In de sectie payload
ziet u hoe de bewerking voor maken ("op": "c"
) wordt weergegeven - "before": null
betekent dit dat het om een nieuwe INSERT
rij gaat, after
biedt waarden voor de kolommen in de rij, source
biedt de metagegevens van het PostgreSQL-exemplaar van waaruit deze gebeurtenis is opgehaald, en verder.
U kunt hetzelfde proberen met update- of verwijderbewerkingen en de wijzigingsgegevensgebeurtenissen bekijken. Als u bijvoorbeeld de status van de taak configure and install connector
wilt bijwerken (ervan uitgaande dat de id
3
is):
UPDATE todos SET todo_status = 'complete' WHERE id = 3;
(Optioneel) FileStreamSink-connector installeren
Nu alle todos
tabelwijzigingen worden vastgelegd in het Event Hubs-onderwerp, gebruikt u de FileStreamSink-connector (die standaard beschikbaar is in Kafka Connect) om deze gebeurtenissen te gebruiken.
Maak een configuratiebestand (file-sink-connector.json
) voor de connector: vervang het file
kenmerk op basis van uw bestandssysteem.
{
"name": "cdc-file-sink",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"tasks.max": "1",
"topics": "my-server.public.todos",
"file": "<enter full path to file e.g. /Users/foo/todos-cdc.txt>"
}
}
De connector maken en de status ervan controleren:
curl -X POST -H "Content-Type: application/json" --data @file-sink-connector.json http://localhost:8083/connectors
curl http://localhost:8083/connectors/cdc-file-sink/status
Databaserecords invoegen/bijwerken/verwijderen en de records bewaken in het geconfigureerde uitvoersinkbestand:
tail -f /Users/foo/todos-cdc.txt
Opschonen
Kafka Connect maakt Event Hubs-onderwerpen voor het opslaan van configuraties, offsets en statussen die behouden blijven, zelfs nadat het Kafka Connect-cluster is uitgeschakeld. Tenzij deze persistentie gewenst is, raden we u aan deze onderwerpen te verwijderen. Mogelijk wilt u ook de my-server.public.todos
Event Hub verwijderen die tijdens deze procedure is gemaakt.
Volgende stappen
Zie de volgende artikelen voor meer informatie over Event Hubs voor Kafka: