Integrieren der Apache Kafka Connect-Unterstützung in Azure Event Hubs
Artikel
Apache Kafka Connect ist ein Framework zum Verbinden und Importieren/Exportieren von Daten aus einem beliebigen externen System bzw. in ein beliebiges externes System wie MySQL, HDFS und ein Dateisystem über einen Kafka-Cluster. In diesem Artikel wird die Nutzung eines Kafka Connect-Frameworks mit Event Hubs Schritt für Schritt beschrieben.
Dieser Artikel führt Sie schrittweise durch die Vorgehensweise zum Integrieren von Kafka Connect in einen Event Hub und zum Bereitstellen von einfachen FileStreamSource- und FileStreamSink-Connectors. Die Connectors sind zwar nicht für die Verwendung in der Produktion bestimmt, aber sie stellen ein End-to-End-Szenario für Kafka Connect dar, bei dem Azure Event Hubs als Kafka-Broker fungiert.
Ein Event Hubs-Namespace ist erforderlich, um Nachrichten an einen Event Hubs-Dienst zu senden und von diesem zu empfangen. Anweisungen zum Erstellen eines Namespace und eines Event Hub finden Sie unter Erstellen eines Event Hubs. Rufen Sie die Event Hubs-Verbindungszeichenfolge und den vollqualifizierten Domänennamen (Fully Qualified Domain Name, FQDN) zur späteren Verwendung ab. Anweisungen hierzu finden Sie unter Get an Event Hubs connection string (Abrufen einer Event Hubs-Verbindungszeichenfolge).
Klonen des Beispielprojekts
Klonen Sie das Azure Event Hubs-Repository, und navigieren Sie zum Unterordner „tutorials/connect“:
Bash
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/connect
Konfigurieren von Kafka Connect für Event Hubs
Es ist nur eine minimale Neukonfiguration erforderlich, wenn Sie den Kafka Connect-Durchsatz von Kafka an Event Hubs umleiten. Im folgenden Beispiel connect-distributed.properties ist dargestellt, wie Sie Connect konfigurieren, um die Authentifizierung und Kommunikation mit dem Kafka-Endpunkt unter Event Hubs einzurichten:
properties
# e.g. namespace.servicebus.windows.net:9093bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093group.id=connect-cluster-group
# connect internal topic names, auto-created if not existsconfig.storage.topic=connect-cluster-configsoffset.storage.topic=connect-cluster-offsetsstatus.storage.topic=connect-cluster-status
# internal topic replication factors - auto 3x replication in Azure Storageconfig.storage.replication.factor=1offset.storage.replication.factor=1status.storage.replication.factor=1rest.advertised.host.name=connectoffset.flush.interval.ms=10000key.converter=org.apache.kafka.connect.json.JsonConvertervalue.converter=org.apache.kafka.connect.json.JsonConverterinternal.key.converter=org.apache.kafka.connect.json.JsonConverterinternal.value.converter=org.apache.kafka.connect.json.JsonConverterinternal.key.converter.schemas.enable=falseinternal.value.converter.schemas.enable=false
# required EH Kafka security settingssecurity.protocol=SASL_SSLsasl.mechanism=PLAINsasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";producer.security.protocol=SASL_SSLproducer.sasl.mechanism=PLAINproducer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";consumer.security.protocol=SASL_SSLconsumer.sasl.mechanism=PLAINconsumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
# path to the libs directory within the Kafka releaseplugin.path={KAFKA.DIRECTORY}/libs
Wichtig
Ersetzen Sie {YOUR.EVENTHUBS.CONNECTION.STRING} durch die Verbindungszeichenfolge für Ihren Event Hubs-Namespace. Anweisungen zum Abrufen der Verbindungszeichenfolge finden Sie unter Abrufen einer Event Hubs-Verbindungszeichenfolge. Hier sehen Sie eine Beispielkonfiguration: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
Ausführen von Kafka Connect
In diesem Schritt wird ein Kafka Connect-Worker lokal im verteilten Modus gestartet, indem Event Hubs zum Warten des Clusterzustands verwendet wird.
Speichern Sie die Datei connect-distributed.properties lokal. Achten Sie darauf, alle Werte in geschweiften Klammern zu ersetzen.
Navigieren Sie zum Speicherort des Kafka-Release auf Ihrem Computer.
Führen Sie ./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties aus. Die Connect-Worker-REST-API ist für die Interaktion bereit, wenn 'INFO Finished starting connectors and tasks' angezeigt wird.
Hinweis
Kafka Connect verwendet die Kafka-AdminClient-API, um automatisch Themen mit empfohlenen Konfigurationen, einschließlich Komprimierung, zu erstellen. Eine schnelle Überprüfung des Namespace im Azure-Portal ergibt, dass die internen Themen des Connect-Workers automatisch erstellt wurden.
Interne Kafka Connect-Themen müssen Komprimierung verwenden. Das Event Hubs-Team ist nicht für die Korrektur falscher Konfigurationen zuständig, sollten interne Connect-Themen nicht ordnungsgemäß konfiguriert sein.
Erstellen von Connectors
In diesem Abschnitt wird das Hochfahren von FileStreamSource- und FileStreamSink-Connectors erläutert.
Erstellen Sie ein Verzeichnis für Ein- und Ausgabedatendateien.
Bash
mkdir ~/connect-quickstart
Erstellen Sie zwei Dateien: eine Datei mit Startdaten, aus der der FileStreamSource-Connector liest, und eine weitere Datei, in die der FileStreamSink-Connector schreibt.
Erstellen Sie einen FileStreamSource-Connector. Achten Sie darauf, dass Sie den Inhalt der geschweiften Klammern durch den Pfad Ihres Basisverzeichnisses ersetzen.
Optional können Sie Service Bus Explorer nutzen, um zu überprüfen, ob Ereignisse im Thema connect-quickstart eingegangen sind.
Erstellen Sie einen FileStreamSink-Connector. Achten Sie auch hier wieder darauf, dass Sie den Inhalt der geschweiften Klammern durch den Pfad Ihres Basisverzeichnisses ersetzen.
Stellen Sie sicher, dass die Daten zwischen den Dateien repliziert wurden und in beiden Dateien identisch sind.
Bash
# read the file
cat ~/connect-quickstart/output.txt
# diff the input and output files
diff ~/connect-quickstart/input.txt ~/connect-quickstart/output.txt
Cleanup
Mit Kafka Connect werden Event Hub-Themen zum Speichern von Konfigurationen, Offsets und des Status erstellt, die auch dann beibehalten werden, nachdem der Connect-Cluster heruntergefahren wurde. Sofern diese Persistenz nicht gewünscht ist, empfehlen wir, diese Themen zu löschen. Sie können auch die Event Hubs-Instanzen vom Typ connect-quickstart löschen, die in dieser exemplarischen Vorgehensweise erstellt wurden.
Zugehöriger Inhalt
Weitere Informationen zu Event Hubs für Kafka finden Sie in folgenden Artikeln:
Hier erfahren Sie, was Apache Kafka in Azure Event Hubs ist und wie Sie damit Daten aus Apache Kafka-Anwendungen streamen, ohne selbst einen Kafka-Cluster einzurichten.