Condividi tramite


Integrare il supporto di Apache Kafka Connect in Hub eventi di Azure

Apache Kafka Connect è un framework che consente di connettere e importare/esportare dati da/a qualsiasi sistema esterno come MySQL, HDFS e il file system tramite un cluster Kafka. Questo articolo illustra in dettaglio come usare il framework di Kafka Connect con Hub eventi.

L’articolo mostra come integrare Kafka Connect con un hub eventi e come distribuire i connettori di base FileStreamSource e FileStreamSink. Questi connettori non sono destinati all'uso in ambiente di produzione, ma servono semplicemente a dimostrare uno scenario completo di Kafka Connect in cui il servizio Hub eventi di Azure funge da broker Kafka.

Nota

Questo esempio è disponibile in GitHub.

Prerequisiti

Per completare questa procedura dettagliata, verificare di disporre dei prerequisiti seguenti:

Creare uno spazio dei nomi di Hub eventi

Per l'invio e la ricezione da qualsiasi servizio Hub eventi è richiesto uno spazio dei nomi di Hub eventi. Vedere Creazione di un hub eventi per istruzioni su come creare uno spazio dei nomi e un hub eventi. Ottenere la stringa di connessione di Hub eventi e il nome di dominio completo (FQDN) da usare successivamente. Per istruzioni, vedere Ottenere una stringa di connessione ad Hub eventi.

Clonare il progetto di esempio

Clonare il repository di Hub eventi di Azure e passare alla sottocartella tutorials/connect:

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/connect

Configurare Kafka Connect per Hub eventi

Quando si reindirizza la velocità effettiva di Kafka Connect da Kafka a Hub eventi, è necessaria una riconfigurazione minima. Il codice di esempio connect-distributed.properties seguente illustra come configurare Connect per autenticare e comunicare con l'endpoint Kafka in Hub eventi:

# e.g. namespace.servicebus.windows.net:9093
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=connect-cluster-group

# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status

# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

rest.advertised.host.name=connect
offset.flush.interval.ms=10000

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

# path to the libs directory within the Kafka release
plugin.path={KAFKA.DIRECTORY}/libs 

Importante

Sostituire {YOUR.EVENTHUBS.CONNECTION.STRING} con la stringa di connessione per lo spazio dei nomi di Hub eventi. Per istruzioni su come ottenere la stringa di connessione, vedere Ottenere una stringa di connessione ad Hub eventi. Ecco un esempio di configurazione: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

Eseguire Kafka Connect

In questo passaggio un ruolo di lavoro Kafka Connect viene avviato localmente in modalità distribuita, usando Hub eventi per gestire lo stato del cluster.

  1. Salvare il file in locale connect-distributed.properties. Assicurarsi di sostituire tutti i valori racchiusi tra parentesi graffe.
  2. Passare alla posizione della versione di Kafka nel computer.
  3. Eseguire ./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties. Quando viene visualizzato l'avviso 'INFO Finished starting connectors and tasks', l'API REST del ruolo di lavoro Connect è pronta per l'interazione.

Nota

Kafka Connect usa l'API Kafka AdminClient per creare automaticamente argomenti con le configurazioni consigliate, inclusa la compattazione. Un rapido controllo dello spazio dei nomi nel portale di Azure conforma che gli argomenti interni del ruolo di lavoro Connect sono stati creati automaticamente.

Gli argomenti interni di Kafka Connect devono usare la compattazione. Il team di Hub eventi non è responsabile della correzione di configurazioni non corrette se gli argomenti interni di Kafka Connect non sono configurati correttamente.

Creare i connettori

Questa sezione illustra in modo dettagliato i connettori FileStreamSource e FileStreamSink.

  1. Creare una directory per i file di dati di input e output.

    mkdir ~/connect-quickstart
    
  2. Creare due file: uno con i dati di inizializzazione che vengono letti dal connettore FileStreamSource e l'altro in cui il connettore FileStreamSink scrive.

    seq 1000 > ~/connect-quickstart/input.txt
    touch ~/connect-quickstart/output.txt
    
  3. Creare un connettore FileStreamSource. Assicurarsi di sostituire i valori nelle parentesi graffe con il percorso della home directory.

    curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"connect-quickstart","file": "{YOUR/HOME/PATH}/connect-quickstart/input.txt"}}' http://localhost:8083/connectors
    

    Dopo l'esecuzione del comando si dovrebbe vedere il comando connect-quickstart dell'Hub eventi nell'istanza di Hub eventi.

  4. Controllare lo stato del connettore di origine.

    curl -s http://localhost:8083/connectors/file-source/status
    

    Facoltativamente, è possibile usare Service Bus Explorer per verificare che gli eventi abbiamo raggiunto l'argomento connect-quickstart.

  5. Creare un connettore FileStreamSink. Anche in questo caso, assicurarsi di sostituire i valori nelle parentesi graffe con il percorso della home directory.

    curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-quickstart", "file": "{YOUR/HOME/PATH}/connect-quickstart/output.txt"}}' http://localhost:8083/connectors
    
  6. Controllare lo stato del connettore sink.

    curl -s http://localhost:8083/connectors/file-sink/status
    
  7. Verificare che i dati siano stati replicati tra i file e che siano identici in entrambi i file.

    # read the file
    cat ~/connect-quickstart/output.txt
    # diff the input and output files
    diff ~/connect-quickstart/input.txt ~/connect-quickstart/output.txt
    

Pulizia

Kafka Connect crea argomenti di Hub eventi in cui archiviare le configurazioni, gli offset e lo stato che rimangono persistenti anche dopo che il cluster Connect viene disattivato. A meno che non si desideri questa persistenza, è consigliabile eliminare questi argomenti. Si potrebbe anche desiderare di eliminare il codice connect-quickstart di Hub eventi creato nel corso della procedura dettagliata.

Per altre informazioni su Hub eventi di Azure per Kafka, vedere gli articoli seguenti: