Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Change Data Capture (CDC) è una tecnica usata per tenere traccia delle modifiche a livello di riga nelle tabelle di database in risposta alle operazioni di creazione, aggiornamento ed eliminazione. Debezium è una piattaforma distribuita basata sulle funzionalità change data capture disponibili in database diversi, ad esempio la decodifica logica in PostgreSQL. Fornisce un set di connettori Kafka Connect che sfruttano le modifiche a livello di riga nelle tabelle di database e le convertono in flussi di eventi che vengono quindi inviati ad Apache Kafka.
Questa esercitazione illustra come configurare un sistema di Change Data Capture basato su Azure usando Hub eventi (per Kafka), Database di Azure per PostgreSQL e Debezium. Usa il connettore Debezium PostgreSQL per trasmettere le modifiche del database da PostgreSQL ad argomenti Kafka in Hub eventi.
Nota
Questo articolo contiene riferimenti a un termine che Microsoft non usa più. Quando il termine verrà rimosso dal software, verrà rimosso anche dall'articolo.
In questa esercitazione vengono completati i passaggi seguenti:
- Creare uno spazio dei nomi di Hub eventi
- Configurare Database di Azure per PostgreSQL
- Configurare ed eseguire Kafka Connect con il connettore Debezium PostgreSQL
- Testare Change Data Capture
- (Facoltativo) Utilizzare gli eventi dei dati delle modifiche con un
FileStreamSink
connettore
Prerequisiti
Per completare questa procedura dettagliata, è necessario:
- Sottoscrizione di Azure. Se non se ne ha una, creare un account gratuito.
- Linux/macOS
- Kafka (versione 1.1.1, versione di Scala 2.11), disponibile su kafka.apache.org
- Leggere con attenzione l'articolo introduttivo Hub eventi per Apache Kafka
Creare uno spazio dei nomi di Hub eventi
Per l'invio e la ricezione da qualsiasi servizio Hub eventi è richiesto uno spazio dei nomi di Hub eventi. Vedere Creazione di un hub eventi per istruzioni su come creare uno spazio dei nomi e un hub eventi. Ottenere la stringa di connessione di Hub eventi e il nome di dominio completo (FQDN) da usare successivamente. Per istruzioni, vedere Ottenere una stringa di connessione ad Hub eventi.
Configurare e configurare Database di Azure per PostgreSQL
Database di Azure per PostgreSQL è un servizio di database relazionale basato sulla versione community del motore di database PostgreSQL open source ed è disponibile in tre opzioni di distribuzione: server singolo, server flessibile e Cosmos DB per PostgreSQL. Seguire queste istruzioni per creare un server Database di Azure per PostgreSQL usando il portale di Azure.
Configurare ed eseguire Kafka Connect
Questa sezione contiene gli argomenti seguenti:
- Installazione del connettore Debezium
- Configurazione di Kafka Connect per Hub eventi
- Avviare il cluster Kafka Connect con il connettore Debezium
Scaricare e configurare il connettore Debezium
Seguire le istruzioni più recenti nella documentazione di Debezium per scaricare e configurare il connettore.
- Scaricare l'archivio plug-in del connettore. Ad esempio, per scaricare la versione
1.2.0
del connettore, usare questo collegamento: https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.2.0.Final/debezium-connector-postgres-1.2.0.Final-plugin.tar.gz - Estrarre i file JAR e copiarli nel plug.path Kafka Connect.
Configurare Kafka Connect per Hub eventi
Quando si reindirizza la velocità effettiva di Kafka Connect da Kafka a Hub eventi, è necessaria una riconfigurazione minima. Il codice di esempio connect-distributed.properties
seguente illustra come configurare Connect per autenticare e comunicare con l'endpoint Kafka in Hub eventi:
Importante
- Debezium creerà automaticamente un argomento per tabella e una serie di argomenti sui metadati. L'argomento Kafka corrisponde a un'istanza di Hub eventi (hub eventi). Per i mapping di Apache Kafka Hub eventi di Azure, vedere Mapping concettuale di Kafka e Hub eventi.
- Esistono limiti diversi per il numero di hub eventi in uno spazio dei nomi di Hub eventi a seconda del livello (Basic, Standard, Premium o Dedicato). Per questi limiti, vedere Quote.
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093 # e.g. namespace.servicebus.windows.net:9093
group.id=connect-cluster-group
# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status
# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
rest.advertised.host.name=connect
offset.flush.interval.ms=10000
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
plugin.path={KAFKA.DIRECTORY}/libs # path to the libs directory within the Kafka release
Importante
Sostituire {YOUR.EVENTHUBS.CONNECTION.STRING}
con la stringa di connessione per lo spazio dei nomi di Hub eventi. Per istruzioni su come ottenere la stringa di connessione, vedere Ottenere una stringa di connessione ad Hub eventi. Ecco un esempio di configurazione: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
Eseguire Kafka Connect
In questo passaggio un ruolo di lavoro Kafka Connect viene avviato localmente in modalità distribuita, usando Hub eventi per gestire lo stato del cluster.
- Salvare il file
connect-distributed.properties
in locale. Assicurarsi di sostituire tutti i valori racchiusi tra parentesi graffe. - Passare alla posizione della versione di Kafka nel computer.
- Eseguire
./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties
e attendere l'avvio del cluster.
Nota
Kafka Connect usa l'API Kafka AdminClient per creare automaticamente argomenti con le configurazioni consigliate, inclusa la compattazione. Un rapido controllo dello spazio dei nomi nel portale di Azure conforma che gli argomenti interni del ruolo di lavoro Connect sono stati creati automaticamente.
Gli argomenti interni di Kafka Connect devono usare la compattazione. Il team di Hub eventi non è responsabile della correzione di configurazioni non corrette se gli argomenti interni di Kafka Connect non sono configurati correttamente.
Configurare e avviare il connettore di origine Debezium PostgreSQL
Creare un file di configurazione (pg-source-connector.json
) per il connettore di origine PostgreSQL: sostituire i valori in base all'istanza di PostgreSQL di Azure.
{
"name": "todo-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "<replace with Azure PostgreSQL instance name>.postgres.database.azure.com",
"database.port": "5432",
"database.user": "<replace with database user name>",
"database.password": "<replace with database password>",
"database.dbname": "postgres",
"database.server.name": "my-server",
"plugin.name": "wal2json",
"table.whitelist": "public.todos"
}
}
Suggerimento
database.server.name
l'attributo è un nome logico che identifica e fornisce uno spazio dei nomi per il server o il cluster di database PostgreSQL specifico monitorato.
Per creare un'istanza del connettore, usare l'endpoint dell'API REST Kafka Connect:
curl -X POST -H "Content-Type: application/json" --data @pg-source-connector.json http://localhost:8083/connectors
Per controllare lo stato del connettore:
curl -s http://localhost:8083/connectors/todo-connector/status
Testare Change Data Capture
Per visualizzare Change Data Capture in azione, è necessario creare/aggiornare/eliminare record nel database PostgreSQL di Azure.
Per iniziare, connettersi al database PostgreSQL di Azure (l'esempio seguente usa psql).
psql -h <POSTGRES_INSTANCE_NAME>.postgres.database.azure.com -p 5432 -U <POSTGRES_USER_NAME> -W -d <POSTGRES_DB_NAME> --set=sslmode=require
e.g.
psql -h my-postgres.postgres.database.azure.com -p 5432 -U testuser@my-postgres -W -d postgres --set=sslmode=require
Creare una tabella e inserire record
CREATE TABLE todos (id SERIAL, description VARCHAR(50), todo_status VARCHAR(12), PRIMARY KEY(id));
INSERT INTO todos (description, todo_status) VALUES ('setup postgresql on azure', 'complete');
INSERT INTO todos (description, todo_status) VALUES ('setup kafka connect', 'complete');
INSERT INTO todos (description, todo_status) VALUES ('configure and install connector', 'in-progress');
INSERT INTO todos (description, todo_status) VALUES ('start connector', 'pending');
Il connettore dovrebbe ora attivare l'azione e inviare eventi di dati delle modifiche a un argomento di Hub eventi con il nome my-server.public.todos
seguente , presupponendo che il my-server
valore per database.server.name
e public.todos
sia la tabella le cui modifiche vengono monitorate (in base alla table.whitelist
configurazione).
Controllare l'argomento hub eventi
Si esaminerà ora il contenuto dell'argomento per assicurarsi che tutto funzioni come previsto. L'esempio seguente usa kafkacat
, ma è anche possibile creare un consumer usando una delle opzioni elencate qui.
Creare un file denominato kafkacat.conf
con il contenuto seguente:
metadata.broker.list=<enter event hubs namespace>.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=$ConnectionString
sasl.password=<enter event hubs connection string>
Nota
Aggiornare metadata.broker.list
gli attributi in sasl.password
base alle kafkacat.conf
informazioni di Hub eventi.
In un terminale diverso avviare un consumer:
export KAFKACAT_CONFIG=kafkacat.conf
export BROKER=<enter event hubs namespace>.servicebus.windows.net:9093
export TOPIC=my-server.public.todos
kafkacat -b $BROKER -t $TOPIC -o beginning
Verranno visualizzati i payload JSON che rappresentano gli eventi dei dati delle modifiche generati in PostgreSQL in risposta alle righe aggiunte alla todos
tabella. Ecco un frammento di payload:
{
"schema": {...},
"payload": {
"before": null,
"after": {
"id": 1,
"description": "setup postgresql on azure",
"todo_status": "complete"
},
"source": {
"version": "1.2.0.Final",
"connector": "postgresql",
"name": "fulfillment",
"ts_ms": 1593018069944,
"snapshot": "last",
"db": "postgres",
"schema": "public",
"table": "todos",
"txId": 602,
"lsn": 184579736,
"xmin": null
},
"op": "c",
"ts_ms": 1593018069947,
"transaction": null
}
L'evento è costituito da insieme al payload
relativo schema
(omesso per brevità). Nella payload
sezione si noti che l'operazione di creazione ("op": "c"
) è rappresentata "before": null
: significa che si tratta di una riga appena INSERT
ed, after
fornisce i valori per le colonne nella riga, source
fornisce i metadati dell'istanza di PostgreSQL da cui è stato prelevato l'evento e così via.
È possibile provare lo stesso con le operazioni di aggiornamento o eliminazione e osservare gli eventi dei dati delle modifiche. Ad esempio, per aggiornare lo stato dell'attività per configure and install connector
(presupponendo che sia id
3
):
UPDATE todos SET todo_status = 'complete' WHERE id = 3;
(Facoltativo) Installare il connettore FileStreamSink
Ora che tutte le modifiche alla todos
tabella vengono acquisite nell'argomento hub eventi, usare il connettore FileStreamSink (disponibile per impostazione predefinita in Kafka Connect) per utilizzare questi eventi.
Creare un file di configurazione (file-sink-connector.json
) per il connettore: sostituire l'attributo in base al file
file system.
{
"name": "cdc-file-sink",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"tasks.max": "1",
"topics": "my-server.public.todos",
"file": "<enter full path to file e.g. /Users/foo/todos-cdc.txt>"
}
}
Per creare il connettore e controllarne lo stato:
curl -X POST -H "Content-Type: application/json" --data @file-sink-connector.json http://localhost:8083/connectors
curl http://localhost:8083/connectors/cdc-file-sink/status
Inserire/aggiornare/eliminare record di database e monitorare i record nel file sink di output configurato:
tail -f /Users/foo/todos-cdc.txt
Pulizia
Kafka Connect crea argomenti di Hub eventi per archiviare configurazioni, offset e stato che vengono mantenuti anche dopo l'arresto del cluster Kafka Connect. A meno che non si desideri questa persistenza, è consigliabile eliminare questi argomenti. È anche possibile eliminare l'hub my-server.public.todos
eventi creato durante questa procedura dettagliata.
Passaggi successivi
Per altre informazioni su Hub eventi di Azure per Kafka, vedere gli articoli seguenti: