Ondersteuning voor Apache Kafka Connect integreren in Azure Event Hubs

Apache Kafka Verbinding maken is een framework voor het verbinden en importeren/exporteren van gegevens van/naar elk extern systeem, zoals MySQL, HDFS en bestandssysteem via een Kafka-cluster. In deze zelfstudie wordt u begeleid bij het gebruik van kafka Verbinding maken framework met Event Hubs.

Deze zelfstudie begeleidt u bij het integreren van Kafka-Verbinding maken met een Event Hub en het implementeren van eenvoudige FileStreamSource- en FileStreamSink-connectors. Hoewel deze connectors niet bedoeld zijn voor productiegebruik, demonstreren ze een end-to-end Kafka-Verbinding maken scenario waarin Azure Event Hubs fungeert als een Kafka-broker.

Notitie

Dit voorbeeld is beschikbaar op GitHub.

In deze zelfstudie voert u de volgende stappen uit:

  • Een Event Hubs-naamruimte maken
  • Het voorbeeldproject klonen
  • Kafka Connect configureren voor Event Hubs
  • Kafka Connect uitvoeren
  • Connectors maken

Vereisten

Zorg ervoor dat u aan de volgende vereisten voldoet om deze stappen uit te voeren:

Een Event Hubs-naamruimte maken

Er is een Event Hubs-naamruimte vereist om gegevens te verzenden naar en te ontvangen van Event Hubs-services. Zie Een Event Hub maken voor instructies voor het maken van een naamruimte en een Event Hub. Haal de Event Hubs-verbindingsreeks en de Fully Qualified Domain Name (FQDN) op voor later gebruik. Zie Get an Event Hubs connection string. (Een Event Hubs-verbindingsreeks ophalen).

Het voorbeeldproject klonen

Kloon de Azure Event Hubs-opslagplaats en ga naar de submap tutorials/connect:

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/connect

Kafka Connect configureren voor Event Hubs

Er is een minimale herconfiguratie vereist als u doorvoer van Kafka Connect wilt omleiden van Kafka naar Event Hubs. In het volgende connect-distributed.properties-voorbeeld wordt getoond hoe u Connect kunt configureren om op Event Hubs het Kafka-eindpunt te verifiëren en hoe ermee te communiceren:

# e.g. namespace.servicebus.windows.net:9093
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=connect-cluster-group

# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status

# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

rest.advertised.host.name=connect
offset.flush.interval.ms=10000

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

plugin.path={KAFKA.DIRECTORY}/libs # path to the libs directory within the Kafka release

Belangrijk

Vervang {YOUR.EVENTHUBS.CONNECTION.STRING} door de verbindingsreeks voor uw Event Hubs-naamruimte. Zie Een verbindingsreeks voor Event Hubs ophalen voor instructies voor het ophalen van de verbindingsreeks. Hier volgt een voorbeeldconfiguratie: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

Kafka Connect uitvoeren

In deze stap wordt een Kafka Connect-werkrol lokaal wordt gestart in gedistribueerde modus, waarbij Event Hubs wordt gebruikt om de clusterstatus te handhaven.

  1. Sla het bovenstaande connect-distributed.properties-bestand lokaal op. Vervang alle waarden tussen accolades.
  2. Ga naar de locatie van de Kafka-release op uw computer.
  3. Voer ./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties uit. De REST API van de Connect-werkrol is klaar voor interactie als u 'INFO Finished starting connectors and tasks' ziet.

Notitie

Kafka Verbinding maken maakt gebruik van de Kafka Beheer Client-API om automatisch onderwerpen te maken met aanbevolen configuraties, waaronder compressie. Een snelle controle van de naamruimte in de Azure-portal laat zien dat de interne onderwerpen van de Connect-werkrol automatisch zijn gemaakt.

Kafka Verbinding maken interne onderwerpen moeten compressie gebruiken. Het Event Hubs-team is niet verantwoordelijk voor het oplossen van onjuiste configuraties als interne Verbinding maken onderwerpen onjuist zijn geconfigureerd.

Connectors maken

In deze sectie wordt u begeleidt bij het starten van de FileStreamSource- en FileStreamSink-connectors.

  1. Maak een map voor gegevensbestanden voor in- en uitvoer.

    mkdir ~/connect-quickstart
    
  2. Maak twee bestanden: een met seedgegevens die door de FileStreamSource-connector worden gelezen en een waarnaar de FileStreamSink-connector schrijft.

    seq 1000 > ~/connect-quickstart/input.txt
    touch ~/connect-quickstart/output.txt
    
  3. Maak een FileStreamSource-connector. Vervang de accolades door het pad naar uw basismap.

    curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"connect-quickstart","file": "{YOUR/HOME/PATH}/connect-quickstart/input.txt"}}' http://localhost:8083/connectors
    

    U ziet de Event Hub connect-quickstart in uw Event Hubs-exemplaar nadat u de bovenstaande opdracht hebt uitgevoerd.

  4. Controleer de status van de bronconnector.

    curl -s http://localhost:8083/connectors/file-source/status
    

    U kunt eventueel Service Bus Explorer gebruiken om te verifiëren of er gebeurtenissen in het connect-quickstart-onderwerp zijn aangekomen.

  5. Maak een FileStreamSink-connector. Vervang ook hier de accolades door het pad naar uw basismap.

    curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-quickstart", "file": "{YOUR/HOME/PATH}/connect-quickstart/output.txt"}}' http://localhost:8083/connectors
    
  6. Controleer de status van de sinkconnector.

    curl -s http://localhost:8083/connectors/file-sink/status
    
  7. Verifieer of de gegevens van de bestanden zijn gerepliceerd en of de gegevens in beide bestanden identiek zijn.

    # read the file
    cat ~/connect-quickstart/output.txt
    # diff the input and output files
    diff ~/connect-quickstart/input.txt ~/connect-quickstart/output.txt
    

Opschonen

Kafka Verbinding maken Event Hubs-onderwerpen maakt voor het opslaan van configuraties, offsets en statussen die behouden blijven, zelfs nadat het Verbinding maken cluster is uitgeschakeld. Tenzij deze persistentie gewenst is, wordt aanbevolen dat deze onderwerpen worden verwijderd. Mogelijk wilt u ook de connect-quickstart Event Hubs verwijderen die tijdens deze procedure zijn gemaakt.

Volgende stappen

Zie de volgende artikelen voor meer informatie over Event Hubs voor Kafka: