Az Apache Kafka Connect-támogatás és az Azure Event Hubs integrálása

Az Apache Kafka Csatlakozás egy keretrendszer, amellyel adatokat csatlakoztathat és importálhat/exportálhat bármely külső rendszerből, például a MySQL-ből, a HDFS-ből és a fájlrendszerből egy Kafka-fürtön keresztül. Ez az oktatóanyag végigvezeti a Kafka Csatlakozás-keretrendszer és az Event Hubs használatával.

Ez az oktatóanyag végigvezeti a Kafka Csatlakozás eseményközpontba való integrálásán, valamint az alapvető FileStreamSource- és FileStreamSink-összekötők üzembe helyezésén. Bár ezek az összekötők nem éles használatra vannak szánva, egy teljes körű Kafka-Csatlakozás forgatókönyvet mutatnak be, amelyben az Azure Event Hubs Kafka-közvetítőként működik.

Feljegyzés

Ez a minta elérhető a GitHubon.

Ebben az oktatóanyagban a következő lépéseket hajtja végre:

  • Event Hubs-névtér létrehozása
  • A példaprojekt klónozása
  • A Kafka Connect konfigurálása az Event Hubshoz
  • A Kafka Connect futtatása
  • Összekötők létrehozása

Előfeltételek

A bemutató elvégzéséhez győződjön meg arról, hogy rendelkezik az alábbiakkal:

Event Hubs-névtér létrehozása

Az Event Hubs-szolgáltatásokból való küldéshez és fogadáshoz szükség van egy Event Hubs-névtérre. A névtér és az eseményközpont létrehozásának utasításait az eseményközpont létrehozása című témakörben találja. Szerezze be az Event Hubs kapcsolati sztringjét és teljes tartománynevét (FQDN) későbbi használatra. Útmutatásért lásd az Event Hubs kapcsolati sztring lekérésével foglalkozó témakört.

A példaprojekt klónozása

Klónozza az Azure Event Hubs-adattárat, és keresse meg a tutorials/connect almappát:

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/connect

A Kafka Connect konfigurálása az Event Hubshoz

A Kafka Connect teljesítményének a Kafkából az Event Hubsba való átirányításához minimális újrakonfigurálásra van szükség. Az alábbi connect-distributed.properties példa bemutatja, hogyan konfigurálhatja a Connectet úgy, hogy az Event Hubs Kafka-végpontjával hitelesítsen és kommunikáljon:

# e.g. namespace.servicebus.windows.net:9093
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=connect-cluster-group

# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status

# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

rest.advertised.host.name=connect
offset.flush.interval.ms=10000

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

plugin.path={KAFKA.DIRECTORY}/libs # path to the libs directory within the Kafka release

Fontos

Cserélje le {YOUR.EVENTHUBS.CONNECTION.STRING} az Event Hubs-névtér kapcsolati sztring. A kapcsolati sztring beszerzésére vonatkozó útmutatásért lásd: Event Hubs-kapcsolati sztring beszerzése. Íme egy példakonfiguráció: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

A Kafka Connect futtatása

Ebben a lépésben helyileg el fog indítani egy Kafka Connect-feldolgozót elosztott módban, és az Event Hubsot fogja használni a fürtállapot fenntartásához.

  1. Mentse helyileg a fenti connect-distributed.properties fájlt. Ne felejtse el lecserélni a zárójelbe foglalt értékeket.
  2. Keresse meg a Kafka-példány helyét a számítógépén.
  3. Futtassa az ./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties parancsot. A Connect-feldolgozó REST API akkor áll készen az interakcióra, amikor meglátja az 'INFO Finished starting connectors and tasks' szöveget.

Feljegyzés

A Kafka Csatlakozás a Kafka Rendszergazda Client API használatával automatikusan létrehoz olyan témaköröket, amelyek az ajánlott konfigurációkat tartalmazzák, beleértve a tömörítést is. A névtérből az Azure Portalon gyorsan ki lehet deríteni, hogy a Connect-feldolgozó belső témakörei automatikusan jöttek létre.

A Kafka Csatlakozás belső témaköröknek tömörítést kell használniuk. Az Event Hubs csapata nem felelős a helytelen konfigurációk javításáért, ha a belső Csatlakozás témakörök helytelenül vannak konfigurálva.

Összekötők létrehozása

Ez a szakasz végigvezeti a FileStreamSource és a FileStreamSink összekötő elindításán.

  1. Hozzon létre egy könyvtárat a bemeneti és kimeneti adatfájloknak.

    mkdir ~/connect-quickstart
    
  2. Hozzon létre két fájlt: egyet a kiindulási adatokkal, amelyből a FileStreamSource összekötő olvas, és egy másikat, amelybe a FileStreamSink összekötő ír.

    seq 1000 > ~/connect-quickstart/input.txt
    touch ~/connect-quickstart/output.txt
    
  3. Hozzon létre egy FileStreamSource összekötőt. Ne felejtse el lecserélni a kapcsos zárójelbe zárt kifejezést a kezdőkönyvtár elérési útjára.

    curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"connect-quickstart","file": "{YOUR/HOME/PATH}/connect-quickstart/input.txt"}}' http://localhost:8083/connectors
    

    A fenti parancs futtatása után meg kell jelennie az eseményközpontnak connect-quickstart az Event Hubs-példányon.

  4. Ellenőrizze a forrásösszekötő állapotát.

    curl -s http://localhost:8083/connectors/file-source/status
    

    Igény szerint a Service Bus Explorerrel is ellenőrizheti, hogy beérkeztek-e az események a connect-quickstart témakörbe.

  5. Hozzon létre egy FileStreamSink összekötőt. Most se felejtse el lecserélni a kapcsos zárójelbe zárt kifejezést a kezdőkönyvtár elérési útjára.

    curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-quickstart", "file": "{YOUR/HOME/PATH}/connect-quickstart/output.txt"}}' http://localhost:8083/connectors
    
  6. Ellenőrizze a fogadó összekötő állapotát.

    curl -s http://localhost:8083/connectors/file-sink/status
    
  7. Ellenőrizze, hogy az adatok replikálva lettek-e a fájlok között, és hogy a két fájl adatai azonosak-e.

    # read the file
    cat ~/connect-quickstart/output.txt
    # diff the input and output files
    diff ~/connect-quickstart/input.txt ~/connect-quickstart/output.txt
    

Felesleges tartalmak törlése

A Kafka Csatlakozás Event Hubs-témaköröket hoz létre a Csatlakozás-fürt levétele után is megmaradó konfigurációk, eltolások és állapotok tárolására. Ha nem kívánja ezt az adatmegőrzést, javasoljuk, hogy törölje ezeket a témaköröket. Az útmutató során létrehozott Eseményközpontokat is törölheti connect-quickstart .

Következő lépések

A Kafkához készült Event Hubsról az alábbi cikkekben olvashat bővebben: