Integrate Apache Kafka Connect support on Azure Event Hubs
Apache Kafka Connect is a framework for connecting a Kafka cluster to external systems such as MySQL, HDFS, and file systems, so that data can be imported from or exported to them.
This article walks you through integrating Kafka Connect with an event hub and deploying basic FileStreamSource and FileStreamSink connectors. While these connectors aren't meant for production use, they demonstrate an end-to-end Kafka Connect scenario where Azure Event Hubs acts as a Kafka broker.
An Event Hubs namespace is required to send to and receive from any Event Hubs service. See Creating an event hub for instructions to create a namespace and an event hub. Get the Event Hubs connection string and fully qualified domain name (FQDN) for later use. For instructions, see Get an Event Hubs connection string.
Clone the example project
Clone the Azure Event Hubs repository and navigate to the tutorials/connect subfolder:
Bash
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/connect
Configure Kafka Connect for Event Hubs
Minimal reconfiguration is necessary when redirecting Kafka Connect throughput from Kafka to Event Hubs. The following connect-distributed.properties sample illustrates how to configure Connect to authenticate and communicate with the Kafka endpoint on Event Hubs:
properties
# e.g. namespace.servicebus.windows.net:9093
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=connect-cluster-group
# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status
# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
rest.advertised.host.name=connect
offset.flush.interval.ms=10000
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
# path to the libs directory within the Kafka release
plugin.path={KAFKA.DIRECTORY}/libs
Important
Replace {YOUR.EVENTHUBS.CONNECTION.STRING} with the connection string for your Event Hubs namespace. For instructions on getting the connection string, see Get an Event Hubs connection string. Here's an example configuration: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
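The host name in the connection string's Endpoint is the same FQDN that bootstrap.servers expects. The following is a minimal sketch for pulling it out, assuming the connection string begins with Endpoint=sb://<host>/ as in the example above. The function name eventhubs_fqdn is a hypothetical helper, not part of any Azure tooling.

```shell
# Hypothetical helper: print the namespace FQDN found in an Event Hubs connection string.
# Assumes the string starts with "Endpoint=sb://<host>/", as in the example above.
eventhubs_fqdn() {
  _rest="${1#Endpoint=sb://}"    # drop the leading "Endpoint=sb://"
  printf '%s\n' "${_rest%%/*}"   # keep everything before the first "/"
}

# Example with the placeholder connection string from this article.
eventhubs_fqdn 'Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX'
```

Appending :9093 to the printed host gives the value for bootstrap.servers.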
Run Kafka Connect
In this step, a Kafka Connect worker is started locally in distributed mode, using Event Hubs to maintain cluster state.
Save the connect-distributed.properties file locally. Be sure to replace all values in braces.
Navigate to the location of the Kafka release on your machine.
Run ./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties. The Connect worker REST API is ready for interaction when you see 'INFO Finished starting connectors and tasks'.
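Before starting the worker, it can help to confirm that no brace placeholders are left in the properties file. The following sketch assumes the placeholders follow the {UPPER.CASE} pattern used in the sample configuration. The function name find_placeholders and the file location in the current directory are illustrative assumptions.

```shell
# Hypothetical helper: list lines that still contain {PLACEHOLDER} tokens.
# Follows grep semantics: exit status 0 if any are found, 1 if the file is clean.
find_placeholders() {
  grep -n '{[A-Z][A-Z./]*}' "$1"
}

# Example run against a properties file in the current directory, if one exists.
if [ -f connect-distributed.properties ]; then
  find_placeholders connect-distributed.properties \
    || echo "No unreplaced placeholders found."
fi
```

Each reported line is prefixed with its line number, which makes the remaining values easy to locate.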
Note
Kafka Connect uses the Kafka AdminClient API to automatically create topics with recommended configurations, including compaction. A quick check of the namespace in the Azure portal reveals that the Connect worker's internal topics have been created automatically.
Kafka Connect internal topics must use compaction. The Event Hubs team isn't responsible for fixing improper configurations if internal Connect topics are incorrectly configured.
Create connectors
This section walks you through spinning up FileStreamSource and FileStreamSink connectors.
Create a directory for input and output data files.
Bash
mkdir ~/connect-quickstart
Create two files: one file with seed data from which the FileStreamSource connector reads, and another to which our FileStreamSink connector writes.
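The commands for this step and for registering the two connectors aren't shown above, so the following is a sketch under stated assumptions rather than the article's exact commands. It assumes the Connect worker's REST API listens on the default http://localhost:8083, that both connectors use a topic named connect-quickstart (the event hub mentioned in the cleanup section), and that seq is available. The function names and the QUICKSTART_DIR and CONNECT_URL variables are hypothetical.

```shell
# Directory from the mkdir step; override QUICKSTART_DIR to use another location.
QUICKSTART_DIR="${QUICKSTART_DIR:-$HOME/connect-quickstart}"
mkdir -p "$QUICKSTART_DIR"

# Seed file for the source connector (1000 numbered lines) and an empty sink target.
seq 1000 > "$QUICKSTART_DIR/input.txt"
touch "$QUICKSTART_DIR/output.txt"

# Assumed default REST endpoint of a local Connect worker.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Build the JSON body for a FileStream connector.
# $1 = connector name, $2 = connector class,
# $3 = topic key ("topic" for the source, "topics" for the sink), $4 = file path
connector_payload() {
  printf '{"name":"%s","config":{"connector.class":"%s","tasks.max":"1","%s":"connect-quickstart","file":"%s"}}' \
    "$1" "$2" "$3" "$4"
}

# POST a connector definition to the Connect REST API.
create_connector() {
  curl -s -X POST -H "Content-Type: application/json" --data "$1" "$CONNECT_URL/connectors"
}

# Query a connector's status by name.
connector_status() {
  curl -s "$CONNECT_URL/connectors/$1/status"
}

# Usage, once the worker is running (left commented out so nothing is sent by default):
# create_connector "$(connector_payload file-source org.apache.kafka.connect.file.FileStreamSourceConnector topic "$QUICKSTART_DIR/input.txt")"
# connector_status file-source
# create_connector "$(connector_payload file-sink org.apache.kafka.connect.file.FileStreamSinkConnector topics "$QUICKSTART_DIR/output.txt")"
# connector_status file-sink
```

The source connector takes a single topic key while the sink takes topics, which is why the key name is passed as a parameter instead of being hard-coded.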
Verify that data has been replicated between files and that the data is identical across both files.
Bash
# read the file
cat ~/connect-quickstart/output.txt
# diff the input and output files
diff ~/connect-quickstart/input.txt ~/connect-quickstart/output.txt
Cleanup
Kafka Connect creates Event Hubs topics to store configurations, offsets, and status, and these topics persist even after the Connect cluster has been taken down. Unless you want to keep this state, we recommend that you delete these topics. You might also want to delete the connect-quickstart event hub that was created during this walkthrough.
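One way to perform the cleanup is with the Azure CLI. The sketch below only prints the delete commands so they can be reviewed first. It assumes the Azure CLI is installed and signed in, and that the internal topic names match the sample configuration. RESOURCE_GROUP, NAMESPACE, and the function name print_cleanup_commands are placeholder assumptions.

```shell
# Placeholder values (assumptions): replace with your own resource group and namespace.
RESOURCE_GROUP="${RESOURCE_GROUP:-my-resource-group}"
NAMESPACE="${NAMESPACE:-mynamespace}"

# Print one Azure CLI delete command per event hub created in this walkthrough.
# Nothing is deleted here; the commands are only echoed for review.
print_cleanup_commands() {
  for hub in connect-cluster-configs connect-cluster-offsets connect-cluster-status connect-quickstart; do
    printf 'az eventhubs eventhub delete --resource-group %s --namespace-name %s --name %s\n' \
      "$RESOURCE_GROUP" "$NAMESPACE" "$hub"
  done
}

print_cleanup_commands
```

After checking the output, the commands can be executed by piping them to a shell, for example print_cleanup_commands | sh.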
Related content
To learn more about Event Hubs for Kafka, see the following articles: