Az Apache Kafka Connect-támogatás és az Azure Event Hubs integrálása
Az Apache Kafka Connect egy keretrendszer, amellyel adatokat importálhat és exportálhat bármely külső rendszerből, például a MySQL-ből, a HDFS-ből és a fájlrendszerből egy Kafka-fürtön keresztül. Ez a cikk végigvezeti a Kafka Connect keretrendszer és az Event Hubs használatával.
Ez a cikk végigvezeti a Kafka Connect eseményközpontba való integrálásán, valamint az alapszintű FileStreamSource
és FileStreamSink
összekötők üzembe helyezésén. Bár ezek az összekötők nem éles használatra vannak szánva, egy teljes körű Kafka Connect-forgatókönyvet mutatnak be, amelyben az Azure Event Hubs Kafka-közvetítőként működik.
Feljegyzés
Ez a minta elérhető a GitHubon.
Előfeltételek
A bemutató elvégzéséhez győződjön meg arról, hogy rendelkezik az alábbiakkal:
- Egy Azure-előfizetés. Ha még nincs előfizetése, hozzon létre egy ingyenes fiókot.
- Git
- Linux/MacOS
- A legújabb Kafka-kiadás elérhető a kafka.apache.org
- Az Apache Kafkához készült Event Hubsot ismertető cikket is mindenképpen olvassa át.
Event Hubs-névtér létrehozása
Az Event Hubs-szolgáltatásokból való küldéshez és fogadáshoz szükség van egy Event Hubs-névtérre. A névtér és az eseményközpont létrehozásának utasításait az eseményközpont létrehozása című témakörben találja. Szerezze be az Event Hubs kapcsolati sztringjét és teljes tartománynevét (FQDN) későbbi használatra. Útmutatásért lásd az Event Hubs kapcsolati sztring lekérésével foglalkozó témakört.
A példaprojekt klónozása
Klónozza az Azure Event Hubs-adattárat, és keresse meg a tutorials/connect almappát:
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/connect
A Kafka Connect konfigurálása az Event Hubshoz
A Kafka Connect teljesítményének a Kafkából az Event Hubsba való átirányításához minimális újrakonfigurálásra van szükség. Az alábbi connect-distributed.properties
példa bemutatja, hogyan konfigurálhatja a Connectet úgy, hogy az Event Hubs Kafka-végpontjával hitelesítsen és kommunikáljon:
# e.g. namespace.servicebus.windows.net:9093
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=connect-cluster-group
# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status
# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
rest.advertised.host.name=connect
offset.flush.interval.ms=10000
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
# path to the libs directory within the Kafka release
plugin.path={KAFKA.DIRECTORY}/libs
Fontos
Cserélje le {YOUR.EVENTHUBS.CONNECTION.STRING}
az Event Hubs-névtér kapcsolati sztring. A kapcsolati sztring beszerzésére vonatkozó útmutatásért lásd: Event Hubs-kapcsolati sztring beszerzése. Íme egy példakonfiguráció: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
A Kafka Connect futtatása
Ebben a lépésben helyileg el fog indítani egy Kafka Connect-feldolgozót elosztott módban, és az Event Hubsot fogja használni a fürtállapot fenntartásához.
- Mentse a
connect-distributed.properties
fájlt helyileg. Ne felejtse el lecserélni a zárójelbe foglalt értékeket. - Keresse meg a Kafka-példány helyét a számítógépén.
- Futtassa az
./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties
parancsot. A Connect-feldolgozó REST API akkor áll készen az interakcióra, amikor meglátja az'INFO Finished starting connectors and tasks'
szöveget.
Feljegyzés
A Kafka Connect a Kafka AdminClient API használatával automatikusan létrehoz olyan témaköröket, amelyek az ajánlott konfigurációkat tartalmazzák, beleértve a tömörítést is. A névtérből az Azure Portalon gyorsan ki lehet deríteni, hogy a Connect-feldolgozó belső témakörei automatikusan jöttek létre.
A Kafka Connect belső témakörei tömörítést használnak. Az Event Hubs csapata nem felelős a helytelen konfigurációk javításáért, ha a belső kapcsolódási témakörök helytelenül vannak konfigurálva.
Összekötők létrehozása
Ez a szakasz végigvezeti a pörgetésen FileStreamSource
és FileStreamSink
az összekötőken.
Hozzon létre egy könyvtárat a bemeneti és kimeneti adatfájloknak.
mkdir ~/connect-quickstart
Hozzon létre két fájlt: egy fájlt olyan magadatokkal, amelyekből az
FileStreamSource
összekötő olvas, a másikat, amelyreFileStreamSink
az összekötő ír.seq 1000 > ~/connect-quickstart/input.txt touch ~/connect-quickstart/output.txt
Hozzon létre egy összekötőt
FileStreamSource
. Ne felejtse el lecserélni a kapcsos zárójelbe zárt kifejezést a kezdőkönyvtár elérési útjára.curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"connect-quickstart","file": "{YOUR/HOME/PATH}/connect-quickstart/input.txt"}}' http://localhost:8083/connectors
A parancs futtatása után meg kell jelennie az eseményközpontnak
connect-quickstart
az Event Hubs-példányon.Ellenőrizze a forrásösszekötő állapotát.
curl -s http://localhost:8083/connectors/file-source/status
Igény szerint a Service Bus Explorerrel ellenőrizheti, hogy az események megérkeztek-e a
connect-quickstart
témakörbe.Hozzon létre egy FileStreamSink összekötőt. Most se felejtse el lecserélni a kapcsos zárójelbe zárt kifejezést a kezdőkönyvtár elérési útjára.
curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-quickstart", "file": "{YOUR/HOME/PATH}/connect-quickstart/output.txt"}}' http://localhost:8083/connectors
Ellenőrizze a fogadó összekötő állapotát.
curl -s http://localhost:8083/connectors/file-sink/status
Ellenőrizze, hogy az adatok replikálva lettek-e a fájlok között, és hogy a két fájl adatai azonosak-e.
# read the file cat ~/connect-quickstart/output.txt # diff the input and output files diff ~/connect-quickstart/input.txt ~/connect-quickstart/output.txt
Felesleges tartalmak törlése
A Kafka Connect Event Hubs-témaköröket hoz létre a Connect-fürt levétele után is megmaradó konfigurációk, eltolások és állapotok tárolására. Ha nem kívánja ezt az adatmegőrzést, javasoljuk, hogy törölje ezeket a témaköröket. Az útmutató során létrehozott Event Hubs-központokat is érdemes törölni connect-quickstart
.
Kapcsolódó tartalom
A Kafkához készült Event Hubsról az alábbi cikkekben olvashat bővebben: