Share via


Ondersteuning voor Apache Kafka Connect integreren in Azure Event Hubs

Apache Kafka Connect is een framework voor het verbinden en importeren/exporteren van gegevens van/naar elk extern systeem, zoals MySQL, HDFS en bestandssysteem via een Kafka-cluster. In dit artikel wordt u begeleid bij het gebruik van kafka Connect-framework met Event Hubs.

Dit artikel begeleidt u bij het integreren van Kafka Connect met een Event Hub en het implementeren van basis FileStreamSource - en FileStreamSink connectors. Hoewel deze connectors niet bedoeld zijn voor productiegebruik, laten ze een end-to-end Kafka Connect-scenario zien waarin Azure Event Hubs fungeert als een Kafka-broker.

Notitie

Dit voorbeeld is beschikbaar op GitHub.

Vereisten

Zorg ervoor dat u aan de volgende vereisten voldoet om deze stappen uit te voeren:

Een Event Hubs-naamruimte maken

Er is een Event Hubs-naamruimte vereist om gegevens te verzenden naar en te ontvangen van Event Hubs-services. Zie Een Event Hub maken voor instructies voor het maken van een naamruimte en een Event Hub. Haal de Event Hubs-verbindingsreeks en de Fully Qualified Domain Name (FQDN) op voor later gebruik. Zie Get an Event Hubs connection string. (Een Event Hubs-verbindingsreeks ophalen).

Het voorbeeldproject klonen

Kloon de Azure Event Hubs-opslagplaats en ga naar de submap tutorials/connect:

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/connect

Kafka Connect configureren voor Event Hubs

Er is een minimale herconfiguratie vereist als u doorvoer van Kafka Connect wilt omleiden van Kafka naar Event Hubs. In het volgende connect-distributed.properties-voorbeeld wordt getoond hoe u Connect kunt configureren om op Event Hubs het Kafka-eindpunt te verifiëren en hoe ermee te communiceren:

# e.g. namespace.servicebus.windows.net:9093
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=connect-cluster-group

# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status

# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

rest.advertised.host.name=connect
offset.flush.interval.ms=10000

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

# path to the libs directory within the Kafka release
plugin.path={KAFKA.DIRECTORY}/libs 

Belangrijk

Vervang {YOUR.EVENTHUBS.CONNECTION.STRING} door de verbindingsreeks voor uw Event Hubs-naamruimte. Zie Een verbindingsreeks voor Event Hubs ophalen voor instructies voor het ophalen van de verbindingsreeks. Hier volgt een voorbeeldconfiguratie: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

Kafka Connect uitvoeren

In deze stap wordt een Kafka Connect-werkrol lokaal wordt gestart in gedistribueerde modus, waarbij Event Hubs wordt gebruikt om de clusterstatus te handhaven.

  1. Sla het connect-distributed.properties bestand lokaal op. Vervang alle waarden tussen accolades.
  2. Ga naar de locatie van de Kafka-release op uw computer.
  3. Voer ./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties uit. De REST API van de Connect-werkrol is klaar voor interactie als u 'INFO Finished starting connectors and tasks' ziet.

Notitie

Kafka Connect maakt gebruik van de Kafka AdminClient-API om automatisch onderwerpen te maken met aanbevolen configuraties, waaronder compressie. Een snelle controle van de naamruimte in de Azure-portal laat zien dat de interne onderwerpen van de Connect-werkrol automatisch zijn gemaakt.

Interne onderwerpen van Kafka Connect moeten gebruikmaken van compressie. Het Event Hubs-team is niet verantwoordelijk voor het oplossen van onjuiste configuraties als interne Connect-onderwerpen onjuist zijn geconfigureerd.

Connectors maken

In deze sectie wordt uitgelegd hoe u connectors kunt maken FileStreamSource en FileStreamSink connectors kunt maken.

  1. Maak een map voor gegevensbestanden voor in- en uitvoer.

    mkdir ~/connect-quickstart
    
  2. Maak twee bestanden: één bestand met seed-gegevens waaruit de FileStreamSource connector leest, en een ander bestand waarnaar onze FileStreamSink connector schrijft.

    seq 1000 > ~/connect-quickstart/input.txt
    touch ~/connect-quickstart/output.txt
    
  3. Maak een FileStreamSource connector. Vervang de accolades door het pad naar uw basismap.

    curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"connect-quickstart","file": "{YOUR/HOME/PATH}/connect-quickstart/input.txt"}}' http://localhost:8083/connectors
    

    U ziet nu de Event Hub connect-quickstart in uw Event Hubs-exemplaar nadat u de opdracht hebt uitgevoerd.

  4. Controleer de status van de bronconnector.

    curl -s http://localhost:8083/connectors/file-source/status
    

    U kunt Service Bus Explorer desgewenst gebruiken om te controleren of er gebeurtenissen zijn aangekomen in het connect-quickstart onderwerp.

  5. Maak een FileStreamSink-connector. Vervang ook hier de accolades door het pad naar uw basismap.

    curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-quickstart", "file": "{YOUR/HOME/PATH}/connect-quickstart/output.txt"}}' http://localhost:8083/connectors
    
  6. Controleer de status van de sinkconnector.

    curl -s http://localhost:8083/connectors/file-sink/status
    
  7. Verifieer of de gegevens van de bestanden zijn gerepliceerd en of de gegevens in beide bestanden identiek zijn.

    # read the file
    cat ~/connect-quickstart/output.txt
    # diff the input and output files
    diff ~/connect-quickstart/input.txt ~/connect-quickstart/output.txt
    

Opschonen

Kafka Connect maakt Event Hubs-onderwerpen voor het opslaan van configuraties, offsets en statussen die behouden blijven, zelfs nadat het Connect-cluster is uitgeschakeld. Tenzij deze persistentie gewenst is, raden we u aan deze onderwerpen te verwijderen. Mogelijk wilt u ook de connect-quickstart Event Hubs verwijderen die tijdens dit scenario zijn gemaakt.

Zie de volgende artikelen voor meer informatie over Event Hubs voor Kafka: