Condividi tramite


Integrare il supporto di Apache Kafka Connect in Hub eventi di Azure con Debezium per Change Data Capture

Change Data Capture (CDC) è una tecnica usata per tenere traccia delle modifiche a livello di riga nelle tabelle di database in risposta alle operazioni di creazione, aggiornamento ed eliminazione. Debezium è una piattaforma distribuita basata sulle funzionalità change data capture disponibili in database diversi, ad esempio la decodifica logica in PostgreSQL. Fornisce un set di connettori Kafka Connect che sfruttano le modifiche a livello di riga nelle tabelle di database e le convertono in flussi di eventi che vengono quindi inviati ad Apache Kafka.

Questa esercitazione illustra come configurare un sistema di Change Data Capture basato su Azure usando Hub eventi (per Kafka), Database di Azure per PostgreSQL e Debezium. Usa il connettore Debezium PostgreSQL per trasmettere le modifiche del database da PostgreSQL ad argomenti Kafka in Hub eventi.

Nota

Questo articolo contiene riferimenti a un termine che Microsoft non usa più. Quando il termine verrà rimosso dal software, verrà rimosso anche dall'articolo.

In questa esercitazione vengono completati i passaggi seguenti:

  • Creare uno spazio dei nomi di Hub eventi
  • Configurare Database di Azure per PostgreSQL
  • Configurare ed eseguire Kafka Connect con il connettore Debezium PostgreSQL
  • Testare Change Data Capture
  • (Facoltativo) Utilizzare gli eventi dei dati delle modifiche con un FileStreamSink connettore

Prerequisiti

Per completare questa procedura dettagliata, è necessario:

Creare uno spazio dei nomi di Hub eventi

Per l'invio e la ricezione da qualsiasi servizio Hub eventi è richiesto uno spazio dei nomi di Hub eventi. Vedere Creazione di un hub eventi per istruzioni su come creare uno spazio dei nomi e un hub eventi. Ottenere la stringa di connessione di Hub eventi e il nome di dominio completo (FQDN) da usare successivamente. Per istruzioni, vedere Ottenere una stringa di connessione ad Hub eventi.

Configurare e configurare Database di Azure per PostgreSQL

Database di Azure per PostgreSQL è un servizio di database relazionale basato sulla versione community del motore di database PostgreSQL open source ed è disponibile in tre opzioni di distribuzione: server singolo, server flessibile e Cosmos DB per PostgreSQL. Seguire queste istruzioni per creare un server Database di Azure per PostgreSQL usando il portale di Azure.

Configurare ed eseguire Kafka Connect

Questa sezione contiene gli argomenti seguenti:

  • Installazione del connettore Debezium
  • Configurazione di Kafka Connect per Hub eventi
  • Avviare il cluster Kafka Connect con il connettore Debezium

Scaricare e configurare il connettore Debezium

Seguire le istruzioni più recenti nella documentazione di Debezium per scaricare e configurare il connettore.

Configurare Kafka Connect per Hub eventi

Quando si reindirizza la velocità effettiva di Kafka Connect da Kafka a Hub eventi, è necessaria una riconfigurazione minima. Il codice di esempio connect-distributed.properties seguente illustra come configurare Connect per autenticare e comunicare con l'endpoint Kafka in Hub eventi:

Importante

  • Debezium creerà automaticamente un argomento per tabella e una serie di argomenti sui metadati. L'argomento Kafka corrisponde a un'istanza di Hub eventi (hub eventi). Per i mapping di Apache Kafka Hub eventi di Azure, vedere Mapping concettuale di Kafka e Hub eventi.
  • Esistono limiti diversi per il numero di hub eventi in uno spazio dei nomi di Hub eventi a seconda del livello (Basic, Standard, Premium o Dedicato). Per questi limiti, vedere Quote.
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093 # e.g. namespace.servicebus.windows.net:9093
group.id=connect-cluster-group

# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status

# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

rest.advertised.host.name=connect
offset.flush.interval.ms=10000

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

plugin.path={KAFKA.DIRECTORY}/libs # path to the libs directory within the Kafka release

Importante

Sostituire {YOUR.EVENTHUBS.CONNECTION.STRING} con la stringa di connessione per lo spazio dei nomi di Hub eventi. Per istruzioni su come ottenere la stringa di connessione, vedere Ottenere una stringa di connessione ad Hub eventi. Ecco un esempio di configurazione: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

Eseguire Kafka Connect

In questo passaggio un ruolo di lavoro Kafka Connect viene avviato localmente in modalità distribuita, usando Hub eventi per gestire lo stato del cluster.

  1. Salvare il file connect-distributed.properties in locale. Assicurarsi di sostituire tutti i valori racchiusi tra parentesi graffe.
  2. Passare alla posizione della versione di Kafka nel computer.
  3. Eseguire ./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties e attendere l'avvio del cluster.

Nota

Kafka Connect usa l'API Kafka AdminClient per creare automaticamente argomenti con le configurazioni consigliate, inclusa la compattazione. Un rapido controllo dello spazio dei nomi nel portale di Azure conforma che gli argomenti interni del ruolo di lavoro Connect sono stati creati automaticamente.

Gli argomenti interni di Kafka Connect devono usare la compattazione. Il team di Hub eventi non è responsabile della correzione di configurazioni non corrette se gli argomenti interni di Kafka Connect non sono configurati correttamente.

Configurare e avviare il connettore di origine Debezium PostgreSQL

Creare un file di configurazione (pg-source-connector.json) per il connettore di origine PostgreSQL: sostituire i valori in base all'istanza di PostgreSQL di Azure.

{
    "name": "todo-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "<replace with Azure PostgreSQL instance name>.postgres.database.azure.com",
        "database.port": "5432",
        "database.user": "<replace with database user name>",
        "database.password": "<replace with database password>",
        "database.dbname": "postgres",
        "database.server.name": "my-server",
        "plugin.name": "wal2json",
        "table.whitelist": "public.todos"
    }
}

Suggerimento

database.server.name l'attributo è un nome logico che identifica e fornisce uno spazio dei nomi per il server o il cluster di database PostgreSQL specifico monitorato.

Per creare un'istanza del connettore, usare l'endpoint dell'API REST Kafka Connect:

curl -X POST -H "Content-Type: application/json" --data @pg-source-connector.json http://localhost:8083/connectors

Per controllare lo stato del connettore:

curl -s http://localhost:8083/connectors/todo-connector/status

Testare Change Data Capture

Per visualizzare Change Data Capture in azione, è necessario creare/aggiornare/eliminare record nel database PostgreSQL di Azure.

Per iniziare, connettersi al database PostgreSQL di Azure (l'esempio seguente usa psql).

psql -h <POSTGRES_INSTANCE_NAME>.postgres.database.azure.com -p 5432 -U <POSTGRES_USER_NAME> -W -d <POSTGRES_DB_NAME> --set=sslmode=require

e.g. 

psql -h my-postgres.postgres.database.azure.com -p 5432 -U testuser@my-postgres -W -d postgres --set=sslmode=require

Creare una tabella e inserire record

CREATE TABLE todos (id SERIAL, description VARCHAR(50), todo_status VARCHAR(12), PRIMARY KEY(id));

INSERT INTO todos (description, todo_status) VALUES ('setup postgresql on azure', 'complete');
INSERT INTO todos (description, todo_status) VALUES ('setup kafka connect', 'complete');
INSERT INTO todos (description, todo_status) VALUES ('configure and install connector', 'in-progress');
INSERT INTO todos (description, todo_status) VALUES ('start connector', 'pending');

Il connettore dovrebbe ora attivare l'azione e inviare eventi di dati delle modifiche a un argomento di Hub eventi con il nome my-server.public.todosseguente , presupponendo che il my-server valore per database.server.name e public.todos sia la tabella le cui modifiche vengono monitorate (in base alla table.whitelist configurazione).

Controllare l'argomento hub eventi

Si esaminerà ora il contenuto dell'argomento per assicurarsi che tutto funzioni come previsto. L'esempio seguente usa kafkacat, ma è anche possibile creare un consumer usando una delle opzioni elencate qui.

Creare un file denominato kafkacat.conf con il contenuto seguente:

metadata.broker.list=<enter event hubs namespace>.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=$ConnectionString
sasl.password=<enter event hubs connection string>

Nota

Aggiornare metadata.broker.list gli attributi in sasl.password base alle kafkacat.conf informazioni di Hub eventi.

In un terminale diverso avviare un consumer:

export KAFKACAT_CONFIG=kafkacat.conf
export BROKER=<enter event hubs namespace>.servicebus.windows.net:9093
export TOPIC=my-server.public.todos

kafkacat -b $BROKER -t $TOPIC -o beginning

Verranno visualizzati i payload JSON che rappresentano gli eventi dei dati delle modifiche generati in PostgreSQL in risposta alle righe aggiunte alla todos tabella. Ecco un frammento di payload:

{
    "schema": {...},
    "payload": {
        "before": null,
        "after": {
            "id": 1,
            "description": "setup postgresql on azure",
            "todo_status": "complete"
        },
        "source": {
            "version": "1.2.0.Final",
            "connector": "postgresql",
            "name": "fulfillment",
            "ts_ms": 1593018069944,
            "snapshot": "last",
            "db": "postgres",
            "schema": "public",
            "table": "todos",
            "txId": 602,
            "lsn": 184579736,
            "xmin": null
        },
        "op": "c",
        "ts_ms": 1593018069947,
        "transaction": null
    }

L'evento è costituito da insieme al payload relativo schema (omesso per brevità). Nella payload sezione si noti che l'operazione di creazione ("op": "c") è rappresentata "before": null : significa che si tratta di una riga appena INSERTed, after fornisce i valori per le colonne nella riga, source fornisce i metadati dell'istanza di PostgreSQL da cui è stato prelevato l'evento e così via.

È possibile provare lo stesso con le operazioni di aggiornamento o eliminazione e osservare gli eventi dei dati delle modifiche. Ad esempio, per aggiornare lo stato dell'attività per configure and install connector (presupponendo che sia id3):

UPDATE todos SET todo_status = 'complete' WHERE id = 3;

(Facoltativo) Installare il connettore FileStreamSink

Ora che tutte le modifiche alla todos tabella vengono acquisite nell'argomento hub eventi, usare il connettore FileStreamSink (disponibile per impostazione predefinita in Kafka Connect) per utilizzare questi eventi.

Creare un file di configurazione (file-sink-connector.json) per il connettore: sostituire l'attributo in base al file file system.

{
    "name": "cdc-file-sink",
    "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "tasks.max": "1",
        "topics": "my-server.public.todos",
        "file": "<enter full path to file e.g. /Users/foo/todos-cdc.txt>"
    }
}

Per creare il connettore e controllarne lo stato:

curl -X POST -H "Content-Type: application/json" --data @file-sink-connector.json http://localhost:8083/connectors

curl http://localhost:8083/connectors/cdc-file-sink/status

Inserire/aggiornare/eliminare record di database e monitorare i record nel file sink di output configurato:

tail -f /Users/foo/todos-cdc.txt

Pulizia

Kafka Connect crea argomenti di Hub eventi per archiviare configurazioni, offset e stato che vengono mantenuti anche dopo l'arresto del cluster Kafka Connect. A meno che non si desideri questa persistenza, è consigliabile eliminare questi argomenti. È anche possibile eliminare l'hub my-server.public.todos eventi creato durante questa procedura dettagliata.

Passaggi successivi

Per altre informazioni su Hub eventi di Azure per Kafka, vedere gli articoli seguenti: