Integrera stöd för Apache Kafka Connect i Azure Event Hubs
Apache Kafka Connect är ett ramverk för att ansluta och importera/exportera data från/till ett externt system som MySQL, HDFS och filsystem via ett Kafka-kluster. Den här artikeln beskriver hur du använder Kafka Connect-ramverket med Event Hubs.
Den här artikeln beskriver hur du integrerar Kafka Connect med en händelsehubb och distribuerar grundläggande FileStreamSource
anslutningar och FileStreamSink
anslutningsappar. Även om dessa anslutningsappar inte är avsedda för produktionsanvändning, visar de ett Kafka Connect-scenario från slutpunkt till slutpunkt där Azure Event Hubs fungerar som en Kafka-mäklare.
Kommentar
Det här exemplet finns på GitHub.
Förutsättningar
För att kunna slutföra den här genomgången behöver du följande:
- En Azure-prenumeration. Om du inte har en skapar du ett kostnadsfritt konto.
- Git
- Linux/MacOS
- Senaste Kafka-versionen tillgänglig från kafka.apache.org
- Läs introduktionsartikeln Event Hubs för Apache Kafka
Skapa ett Event Hubs-namnområde
En Event Hubs-namnrymd krävs för att skicka och ta emot från Event Hubs-tjänster. Se Skapa en händelsehubb för instruktioner för att skapa ett namnområde och en händelsehubb. Hämta Event Hubs-anslutningssträngen och fullständigt domännamn (FQDN) för senare användning. Anvisningar finns i avsnittet om att hämta en Event Hubs-anslutningssträng.
Klona exempelprojektet
Klona Azure Event Hubs-lagringsplatsen och navigera till undermappen tutorials/connect:
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/connect
Konfigurera Kafka Connect för Event Hubs
Minimal omkonfiguration krävs vid omdirigering av Kafka Connect-dataflöde från Kafka till Event Hubs. Följande exempel med connect-distributed.properties
illustrerar hur du konfigurerar Connect för att autentisera och kommunicera med Kafka-slutpunkten i Event Hubs:
# e.g. namespace.servicebus.windows.net:9093
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=connect-cluster-group
# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status
# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
rest.advertised.host.name=connect
offset.flush.interval.ms=10000
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
# path to the libs directory within the Kafka release
plugin.path={KAFKA.DIRECTORY}/libs
Viktigt!
Ersätt {YOUR.EVENTHUBS.CONNECTION.STRING}
med niska veze för Event Hubs-namnområdet. Anvisningar om hur du hämtar niska veze finns i Hämta en händelsehubb niska veze. Här är ett exempel på konfiguration: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
Köra Kafka Connect
I det här steget startas en Kafka Connect-arbetare lokalt i distribuerat läge med hjälp av Event Hubs för att upprätthålla klustertillståndet.
connect-distributed.properties
Spara filen lokalt. Se till att ersätta alla värden inom klammerparenteser.- Navigera till platsen för Kafka-versionen på datorn.
- Kör
./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties
. REST API för Connect-arbetaren är redo för interaktion när du ser'INFO Finished starting connectors and tasks'
.
Kommentar
Kafka Connect använder Kafka AdminClient API för att automatiskt skapa ämnen med rekommenderade konfigurationer, inklusive komprimering. En snabb kontroll av namnrymden i Azure-portalen visar att Connect-arbetarens interna ämnen har skapats automatiskt.
Kafka Connect interna ämnen måste använda komprimering. Event Hubs-teamet ansvarar inte för att åtgärda felaktiga konfigurationer om interna Connect-ämnen är felaktigt konfigurerade.
Skapa anslutningsappar
Det här avsnittet beskriver hur du snurrar upp FileStreamSource
och FileStreamSink
ansluter.
Skapa en katalog för indata- och utdatafiler.
mkdir ~/connect-quickstart
Skapa två filer: en fil med startdata som anslutningsappen läser från och en annan som anslutningsappen
FileStreamSource
FileStreamSink
skriver till.seq 1000 > ~/connect-quickstart/input.txt touch ~/connect-quickstart/output.txt
Skapa en
FileStreamSource
anslutningsapp. Se till att ersätta klamrarna med din startkatalogsökväg.curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"connect-quickstart","file": "{YOUR/HOME/PATH}/connect-quickstart/input.txt"}}' http://localhost:8083/connectors
Du bör se händelsehubben
connect-quickstart
på din Event Hubs-instans när du har kört kommandot.Kontrollera statusen för källanslutningsappen.
curl -s http://localhost:8083/connectors/file-source/status
Du kan också använda Service Bus Explorer för att verifiera att händelser har kommit till ämnet
connect-quickstart
.Skapa en FileStreamSink-anslutningsapp. Se även denna gång till att ersätta klamrarna med din startkatalogsökväg.
curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-quickstart", "file": "{YOUR/HOME/PATH}/connect-quickstart/output.txt"}}' http://localhost:8083/connectors
Kontrollera statusen för kanalmottagarens anslutningsapp.
curl -s http://localhost:8083/connectors/file-sink/status
Kontrollera att data har replikerats mellan filer och att data är identiska i båda filerna.
# read the file cat ~/connect-quickstart/output.txt # diff the input and output files diff ~/connect-quickstart/input.txt ~/connect-quickstart/output.txt
Rensa
Kafka Connect skapar Event Hubs-ämnen för att lagra konfigurationer, förskjutningar och status som bevaras även efter att Connect-klustret har tagits bort. Om du inte vill ha den här beständigheten rekommenderar vi att du tar bort de här ämnena. Du kanske också vill ta bort de connect-quickstart
händelsehubbar som skapades under den här genomgången.
Relaterat innehåll
Mer information om Event Hubs för Kafka finns i följande artiklar: