Megosztás a következőn keresztül:


Az Apache Kafka Connect-támogatás és az Azure Event Hubs integrálása

Az Apache Kafka Connect egy keretrendszer, amellyel adatokat importálhat és exportálhat bármely külső rendszerből, például a MySQL-ből, a HDFS-ből és a fájlrendszerből egy Kafka-fürtön keresztül. Ez a cikk végigvezeti a Kafka Connect keretrendszer és az Event Hubs használatával.

Ez a cikk végigvezeti a Kafka Connect eseményközpontba való integrálásán, valamint az alapszintű FileStreamSource és FileStreamSink összekötők üzembe helyezésén. Bár ezek az összekötők nem éles használatra vannak szánva, egy teljes körű Kafka Connect-forgatókönyvet mutatnak be, amelyben az Azure Event Hubs Kafka-közvetítőként működik.

Feljegyzés

Ez a minta elérhető a GitHubon.

Előfeltételek

A bemutató elvégzéséhez győződjön meg arról, hogy rendelkezik az alábbiakkal:

Event Hubs-névtér létrehozása

Az Event Hubs-szolgáltatásokból való küldéshez és fogadáshoz szükség van egy Event Hubs-névtérre. A névtér és az eseményközpont létrehozásának utasításait az eseményközpont létrehozása című témakörben találja. Szerezze be az Event Hubs kapcsolati sztringjét és teljes tartománynevét (FQDN) későbbi használatra. Útmutatásért lásd az Event Hubs kapcsolati sztring lekérésével foglalkozó témakört.

A példaprojekt klónozása

Klónozza az Azure Event Hubs-adattárat, és keresse meg a tutorials/connect almappát:

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/connect

A Kafka Connect konfigurálása az Event Hubshoz

A Kafka Connect teljesítményének a Kafkából az Event Hubsba való átirányításához minimális újrakonfigurálásra van szükség. Az alábbi connect-distributed.properties példa bemutatja, hogyan konfigurálhatja a Connectet úgy, hogy az Event Hubs Kafka-végpontjával hitelesítsen és kommunikáljon:

# e.g. namespace.servicebus.windows.net:9093
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=connect-cluster-group

# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status

# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

rest.advertised.host.name=connect
offset.flush.interval.ms=10000

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

# path to the libs directory within the Kafka release
plugin.path={KAFKA.DIRECTORY}/libs 

Fontos

Cserélje le {YOUR.EVENTHUBS.CONNECTION.STRING} az Event Hubs-névtér kapcsolati sztring. A kapcsolati sztring beszerzésére vonatkozó útmutatásért lásd: Event Hubs-kapcsolati sztring beszerzése. Íme egy példakonfiguráció: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

A Kafka Connect futtatása

Ebben a lépésben helyileg el fog indítani egy Kafka Connect-feldolgozót elosztott módban, és az Event Hubsot fogja használni a fürtállapot fenntartásához.

  1. Mentse a connect-distributed.properties fájlt helyileg. Ne felejtse el lecserélni a zárójelbe foglalt értékeket.
  2. Keresse meg a Kafka-példány helyét a számítógépén.
  3. Futtassa az ./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties parancsot. A Connect-feldolgozó REST API akkor áll készen az interakcióra, amikor meglátja az 'INFO Finished starting connectors and tasks' szöveget.

Feljegyzés

A Kafka Connect a Kafka AdminClient API használatával automatikusan létrehoz olyan témaköröket, amelyek az ajánlott konfigurációkat tartalmazzák, beleértve a tömörítést is. A névtérből az Azure Portalon gyorsan ki lehet deríteni, hogy a Connect-feldolgozó belső témakörei automatikusan jöttek létre.

A Kafka Connect belső témakörei tömörítést használnak. Az Event Hubs csapata nem felelős a helytelen konfigurációk javításáért, ha a belső kapcsolódási témakörök helytelenül vannak konfigurálva.

Összekötők létrehozása

Ez a szakasz végigvezeti a pörgetésen FileStreamSource és FileStreamSink az összekötőken.

  1. Hozzon létre egy könyvtárat a bemeneti és kimeneti adatfájloknak.

    mkdir ~/connect-quickstart
    
  2. Hozzon létre két fájlt: egy fájlt olyan magadatokkal, amelyekből az FileStreamSource összekötő olvas, a másikat, amelyre FileStreamSink az összekötő ír.

    seq 1000 > ~/connect-quickstart/input.txt
    touch ~/connect-quickstart/output.txt
    
  3. Hozzon létre egy összekötőt FileStreamSource . Ne felejtse el lecserélni a kapcsos zárójelbe zárt kifejezést a kezdőkönyvtár elérési útjára.

    curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"connect-quickstart","file": "{YOUR/HOME/PATH}/connect-quickstart/input.txt"}}' http://localhost:8083/connectors
    

    A parancs futtatása után meg kell jelennie az eseményközpontnak connect-quickstart az Event Hubs-példányon.

  4. Ellenőrizze a forrásösszekötő állapotát.

    curl -s http://localhost:8083/connectors/file-source/status
    

    Igény szerint a Service Bus Explorerrel ellenőrizheti, hogy az események megérkeztek-e a connect-quickstart témakörbe.

  5. Hozzon létre egy FileStreamSink összekötőt. Most se felejtse el lecserélni a kapcsos zárójelbe zárt kifejezést a kezdőkönyvtár elérési útjára.

    curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-quickstart", "file": "{YOUR/HOME/PATH}/connect-quickstart/output.txt"}}' http://localhost:8083/connectors
    
  6. Ellenőrizze a fogadó összekötő állapotát.

    curl -s http://localhost:8083/connectors/file-sink/status
    
  7. Ellenőrizze, hogy az adatok replikálva lettek-e a fájlok között, és hogy a két fájl adatai azonosak-e.

    # read the file
    cat ~/connect-quickstart/output.txt
    # diff the input and output files
    diff ~/connect-quickstart/input.txt ~/connect-quickstart/output.txt
    

Felesleges tartalmak törlése

A Kafka Connect Event Hubs-témaköröket hoz létre a Connect-fürt levétele után is megmaradó konfigurációk, eltolások és állapotok tárolására. Ha nem kívánja ezt az adatmegőrzést, javasoljuk, hogy törölje ezeket a témaköröket. Az útmutató során létrehozott Event Hubs-központokat is érdemes törölni connect-quickstart .

A Kafkához készült Event Hubsról az alábbi cikkekben olvashat bővebben: