Ondersteuning voor Apache Kafka Connect integreren in Azure Event Hubs
Apache Kafka Connect is een framework voor het verbinden en importeren/exporteren van gegevens van/naar elk extern systeem, zoals MySQL, HDFS en bestandssysteem via een Kafka-cluster. In dit artikel wordt u begeleid bij het gebruik van kafka Connect-framework met Event Hubs.
Dit artikel begeleidt u bij het integreren van Kafka Connect met een Event Hub en het implementeren van basis FileStreamSource
- en FileStreamSink
connectors. Hoewel deze connectors niet bedoeld zijn voor productiegebruik, laten ze een end-to-end Kafka Connect-scenario zien waarin Azure Event Hubs fungeert als een Kafka-broker.
Notitie
Dit voorbeeld is beschikbaar op GitHub.
Vereisten
Zorg ervoor dat u aan de volgende vereisten voldoet om deze stappen uit te voeren:
- Azure-abonnement. Als u nog geen account hebt, kunt u een gratis account maken.
- Git
- Linux/Mac OS
- Nieuwste Kafka-release beschikbaar via kafka.apache.org
- Lees het inleidende artikel Event Hubs voor Apache Kafka door
Een Event Hubs-naamruimte maken
Er is een Event Hubs-naamruimte vereist om gegevens te verzenden naar en te ontvangen van Event Hubs-services. Zie Een Event Hub maken voor instructies voor het maken van een naamruimte en een Event Hub. Haal de Event Hubs-verbindingsreeks en de Fully Qualified Domain Name (FQDN) op voor later gebruik. Zie Get an Event Hubs connection string. (Een Event Hubs-verbindingsreeks ophalen).
Het voorbeeldproject klonen
Kloon de Azure Event Hubs-opslagplaats en ga naar de submap tutorials/connect:
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/connect
Kafka Connect configureren voor Event Hubs
Er is een minimale herconfiguratie vereist als u doorvoer van Kafka Connect wilt omleiden van Kafka naar Event Hubs. In het volgende connect-distributed.properties
-voorbeeld wordt getoond hoe u Connect kunt configureren om op Event Hubs het Kafka-eindpunt te verifiëren en hoe ermee te communiceren:
# e.g. namespace.servicebus.windows.net:9093
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=connect-cluster-group
# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status
# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
rest.advertised.host.name=connect
offset.flush.interval.ms=10000
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
# path to the libs directory within the Kafka release
plugin.path={KAFKA.DIRECTORY}/libs
Belangrijk
Vervang {YOUR.EVENTHUBS.CONNECTION.STRING}
door de verbindingsreeks voor uw Event Hubs-naamruimte. Zie Een verbindingsreeks voor Event Hubs ophalen voor instructies voor het ophalen van de verbindingsreeks. Hier volgt een voorbeeldconfiguratie: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
Kafka Connect uitvoeren
In deze stap wordt een Kafka Connect-werkrol lokaal wordt gestart in gedistribueerde modus, waarbij Event Hubs wordt gebruikt om de clusterstatus te handhaven.
- Sla het
connect-distributed.properties
bestand lokaal op. Vervang alle waarden tussen accolades. - Ga naar de locatie van de Kafka-release op uw computer.
- Voer
./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties
uit. De REST API van de Connect-werkrol is klaar voor interactie als u'INFO Finished starting connectors and tasks'
ziet.
Notitie
Kafka Connect maakt gebruik van de Kafka AdminClient-API om automatisch onderwerpen te maken met aanbevolen configuraties, waaronder compressie. Een snelle controle van de naamruimte in de Azure-portal laat zien dat de interne onderwerpen van de Connect-werkrol automatisch zijn gemaakt.
Interne onderwerpen van Kafka Connect moeten gebruikmaken van compressie. Het Event Hubs-team is niet verantwoordelijk voor het oplossen van onjuiste configuraties als interne Connect-onderwerpen onjuist zijn geconfigureerd.
Connectors maken
In deze sectie wordt uitgelegd hoe u connectors kunt maken FileStreamSource
en FileStreamSink
connectors kunt maken.
Maak een map voor gegevensbestanden voor in- en uitvoer.
mkdir ~/connect-quickstart
Maak twee bestanden: één bestand met seed-gegevens waaruit de
FileStreamSource
connector leest, en een ander bestand waarnaar onzeFileStreamSink
connector schrijft.seq 1000 > ~/connect-quickstart/input.txt touch ~/connect-quickstart/output.txt
Maak een
FileStreamSource
connector. Vervang de accolades door het pad naar uw basismap.curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"connect-quickstart","file": "{YOUR/HOME/PATH}/connect-quickstart/input.txt"}}' http://localhost:8083/connectors
U ziet nu de Event Hub
connect-quickstart
in uw Event Hubs-exemplaar nadat u de opdracht hebt uitgevoerd.Controleer de status van de bronconnector.
curl -s http://localhost:8083/connectors/file-source/status
U kunt Service Bus Explorer desgewenst gebruiken om te controleren of er gebeurtenissen zijn aangekomen in het
connect-quickstart
onderwerp.Maak een FileStreamSink-connector. Vervang ook hier de accolades door het pad naar uw basismap.
curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-quickstart", "file": "{YOUR/HOME/PATH}/connect-quickstart/output.txt"}}' http://localhost:8083/connectors
Controleer de status van de sinkconnector.
curl -s http://localhost:8083/connectors/file-sink/status
Verifieer of de gegevens van de bestanden zijn gerepliceerd en of de gegevens in beide bestanden identiek zijn.
# read the file cat ~/connect-quickstart/output.txt # diff the input and output files diff ~/connect-quickstart/input.txt ~/connect-quickstart/output.txt
Opschonen
Kafka Connect maakt Event Hubs-onderwerpen voor het opslaan van configuraties, offsets en statussen die behouden blijven, zelfs nadat het Connect-cluster is uitgeschakeld. Tenzij deze persistentie gewenst is, raden we u aan deze onderwerpen te verwijderen. Mogelijk wilt u ook de connect-quickstart
Event Hubs verwijderen die tijdens dit scenario zijn gemaakt.
Gerelateerde inhoud
Zie de volgende artikelen voor meer informatie over Event Hubs voor Kafka:
- Ontwikkelaarshandleiding voor Apache Kafka voor Azure Event Hubs
- Explore samples on our GitHub (Voorbeelden bekijken op GitHub)