Az Apache Kafka használata HDInsighton az Azure IoT Hubbal
Megtudhatja, hogyan helyezhet át adatokat az Apache Kafka a HDInsighton és az Azure IoT Hubon az Apache Kafka Csatlakozás Azure IoT Hub-összekötő használatával. Ebben a dokumentumban megtudhatja, hogyan futtathatja az IoT Hub-összekötőt a fürt peremcsomópontjáról.
A Kafka Csatlakozás API lehetővé teszi olyan összekötők implementálását, amelyek folyamatosan adatokat kérnek le a Kafkába, vagy adatokat küldhetnek a Kafkából egy másik rendszerbe. Az Apache Kafka Csatlakozás Azure IoT Hub egy összekötő, amely adatokat kér le az Azure IoT Hubból a Kafkába. Adatokat is leküldhet a Kafkából az IoT Hubba.
Az IoT Hubról való lekéréskor egy forrás-összekötőt használ. Az IoT Hubra való leküldéskor fogadó-összekötőt használ. Az IoT Hub-összekötő a forrás- és fogadó-összekötőket is biztosítja.
Az alábbi ábra az Azure IoT Hub és a HDInsighton futó Kafka közötti adatfolyamot mutatja be az összekötő használatakor.
Az Csatlakozás API-val kapcsolatos további információkért lásd: https://kafka.apache.org/documentation/#connect.
Előfeltételek
Apache Kafka-fürt a HDInsighton. További információkért lásd a Kafka on HDInsight használatába bevezető dokumentumot.
Egy élcsomópont a Kafka-fürtben. További információ: Élcsomópontok használata HDInsight-dokumentummal .
Egy SSH-ügyfél. További információért lásd: Csatlakozás a HDInsighthoz (Apache Hadoop) SSH-val.
Egy Azure IoT Hub és -eszköz. Ebben a cikkben érdemes Csatlakozás Raspberry Pi online szimulátort használni az Azure IoT Hubhoz.
Az összekötő létrehozása
Töltse le az összekötő forrását a helyi környezetbe https://github.com/Azure/toketi-kafka-connect-iothub/ .
A parancssorból keresse meg a
toketi-kafka-connect-iothub-master
könyvtárat. Ezután a következő paranccsal hozza létre és csomagolja be a projektet:sbt assembly
A build végrehajtása néhány percet vesz igénybe. A parancs létrehoz egy fájlt
kafka-connect-iothub-assembly_2.11-0.7.0.jar
atoketi-kafka-connect-iothub-master\target\scala-2.11
projekt könyvtárában.
Az összekötő telepítése
Töltse fel a .jar fájlt a Kafka peremcsomópontjára a HDInsight-fürtön. Szerkessze a következő parancsot a fürt tényleges nevére cserélve
CLUSTERNAME
. A rendszer szükség szerint az SSH-felhasználói fiók alapértelmezett értékeit és az élcsomópont nevét használja és módosítja.scp kafka-connect-iothub-assembly*.jar sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net:
Miután a fájlmásolás befejeződött, csatlakozzon az élcsomóponthoz az SSH használatával:
ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
Az összekötő kafka
libs
könyvtárba való telepítéséhez használja a következő parancsot:sudo mv kafka-connect-iothub-assembly*.jar /usr/hdp/current/kafka-broker/libs/
Az SSH-kapcsolat aktív marad a többi lépésnél.
Az Apache Kafka konfigurálása
Az SSH-kapcsolatról a peremcsomópontra az alábbi lépésekkel konfigurálhatja a Kafkát az összekötő önálló módban való futtatásához:
Jelszóváltozó beállítása. Cserélje le a JELSZÓ elemet a fürt bejelentkezési jelszavára, majd írja be a következő parancsot:
export password='PASSWORD'
Telepítse a jq segédprogramot. A jq megkönnyíti az Ambari-lekérdezésekből visszaadott JSON-dokumentumok feldolgozását. Írja be az alábbi parancsot:
sudo apt -y install jq
Kérje le a Kafka-közvetítők címét. Előfordulhat, hogy sok közvetítő található a fürtben, de csak egy vagy kettőre kell hivatkoznia. Két közvetítő gazdagép címének lekéréséhez használja a következő parancsot:
export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name') export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2` echo $KAFKABROKERS
Másolja ki az értékeket későbbi használatra. A visszaadott érték az alábbi szöveghez hasonló:
<brokername1>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092,<brokername2>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092
Kérje le az Apache Zookeeper-csomópontok címét. A fürtben számos Zookeeper-csomópont található, de csak egy vagy két csomópontra kell hivatkoznia. A következő paranccsal tárolja a címeket a változóban
KAFKAZKHOSTS
:export KAFKAZKHOSTS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2`
Ha az összekötőt önálló módban futtatja, a
/usr/hdp/current/kafka-broker/config/connect-standalone.properties
fájl a Kafka-közvetítőkkel való kommunikációra szolgál. A fájl szerkesztéséhezconnect-standalone.properties
használja a következő parancsot:sudo nano /usr/hdp/current/kafka-broker/config/connect-standalone.properties
Végezze el a következő módosításokat:
Aktuális érték Új érték Megjegyzés bootstrap.servers=localhost:9092
Cserélje le az localhost:9092
értéket az előző lépésben szereplő közvetítő gazdagépekreA peremcsomópont önálló konfigurációját konfigurálja a Kafka-közvetítők megkereséséhez. key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
Ez a módosítás lehetővé teszi a Tesztelést a Kafka konzolgyártójával. Előfordulhat, hogy más gyártóknak és fogyasztóknak különböző konverterekre van szüksége. További információ az egyéb konverterértékek használatáról: https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md. value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
Ugyanaz, mint a megadott. n/a consumer.max.poll.records=10
Hozzáadás a fájl végéhez. Ez a módosítás megakadályozza az időtúllépéseket a fogadó-összekötőben azáltal, hogy egyszerre 10 rekordra korlátozza. További információ: https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md. A fájl mentéséhez használja a Ctrl + X, Y, majd az Enter billentyűkombinációt.
Az összekötő által használt témakörök létrehozásához használja az alábbi parancsokat:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotin --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotout --zookeeper $KAFKAZKHOSTS
A témakörök és
iotout
aiotin
témakörök meglétének ellenőrzéséhez használja a következő parancsot:/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $KAFKAZKHOSTS
A
iotin
témakör az IoT Hubtól érkező üzenetek fogadására szolgál. Aiotout
témakör az IoT Hubnak küldött üzenetekre szolgál.
IoT Hub kapcsolati adatainak lekérése
Az összekötő által használt IoT Hub-információk lekéréséhez kövesse az alábbi lépéseket:
Szerezze be az Event Hub-kompatibilis végpontot és az Event Hub-kompatibilis végpont nevét az IoT Hubhoz. Az információk beszerzéséhez használja az alábbi módszerek egyikét:
Az Azure Portalon kövesse az alábbi lépéseket:
Lépjen az IoT Hubra, és válassza a Végpontok lehetőséget.
A beépített végpontok közül válassza az Események lehetőséget.
Másolja a tulajdonságok közül a következő mezők értékét:
- Event Hub-kompatibilis név
- Event Hub-kompatibilis végpont
- Partíciók
Fontos
A portál végpontértéke olyan további szöveget tartalmazhat, amelyre ebben a példában nincs szükség. Bontsa ki az ennek a mintának
sb://<randomnamespace>.servicebus.windows.net/
megfelelő szöveget.
Az Azure CLI-ben használja a következő parancsot:
az iot hub show --name myhubname --query "{EventHubCompatibleName:properties.eventHubEndpoints.events.path,EventHubCompatibleEndpoint:properties.eventHubEndpoints.events.endpoint,Partitions:properties.eventHubEndpoints.events.partitionCount}"
Cserélje le
myhubname
az IoT Hub nevére. A válasz hasonló a következő szöveghez:"EventHubCompatibleEndpoint": "sb://ihsuprodbnres006dednamespace.servicebus.windows.net/", "EventHubCompatibleName": "iothub-ehub-myhub08-207673-d44b2a856e", "Partitions": 2
Szerezze be a megosztott hozzáférési szabályzatot és kulcsot. Ebben a példában használja a szolgáltatáskulcsot . Az információk beszerzéséhez használja az alábbi módszerek egyikét:
Az Azure Portalon kövesse az alábbi lépéseket:
- Válassza a Megosztott hozzáférési szabályzatok lehetőséget, majd válassza ki a szolgáltatást.
- Másolja ki az elsődleges kulcs értékét.
- Másolja ki a Csatlakozás ion sztring elsődleges kulcs értékét.
Az Azure CLI-ben használja a következő parancsot:
Az elsődleges kulcs értékének lekéréséhez használja a következő parancsot:
az iot hub policy show --hub-name myhubname --name service --query "primaryKey"
Cserélje le
myhubname
az IoT Hub nevére. A válasz a központ házirendjénekservice
elsődleges kulcsa.A szabályzathoz tartozó kapcsolati sztring a
service
következő paranccsal szerezheti be:az iot hub show-connection-string --name myhubname --policy-name service --query "connectionString"
Cserélje le
myhubname
az IoT Hub nevére. A válasz a szabályzat kapcsolati sztringservice
.
A forráskapcsolat konfigurálása
Ha úgy szeretné konfigurálni a forrást, hogy működjön az IoT Hubbal, hajtsa végre a következő műveleteket egy SSH-kapcsolat és a peremcsomópont között:
Hozzon létre egy másolatot a
connect-iot-source.properties
fájlról a/usr/hdp/current/kafka-broker/config/
könyvtárban. A fájl toketi-kafka-connect-iothub projektből való letöltéséhez használja a következő parancsot:sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-source.properties
A fájl szerkesztéséhez
connect-iot-source.properties
és az IoT Hub adatainak hozzáadásához használja a következő parancsot:sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
A szerkesztőben keresse meg és módosítsa a következő bejegyzéseket:
Aktuális érték Szerkesztés Kafka.Topic=PLACEHOLDER
Cserélje le a PLACEHOLDER
elemet aiotin
kérdésre. Az IoT Hubtól kapott üzenetek aiotin
témakörbe kerülnek.IotHub.EventHubCompatibleName=PLACEHOLDER
Cserélje le PLACEHOLDER
az Event Hub-kompatibilis névre.IotHub.EventHubCompatibleEndpoint=PLACEHOLDER
Cserélje le PLACEHOLDER
az Event Hub-kompatibilis végpontra.IotHub.AccessKeyName=PLACEHOLDER
Cserélje le a PLACEHOLDER
elemet aservice
kérdésre.IotHub.AccessKeyValue=PLACEHOLDER
Cserélje le PLACEHOLDER
a szabályzat elsődleges kulcsáraservice
.IotHub.Partitions=PLACEHOLDER
Cserélje le PLACEHOLDER
az előző lépések partícióinak számát.IotHub.StartTime=PLACEHOLDER
Cserélje le PLACEHOLDER
utc dátumra. Ez az a dátum, amikor az összekötő elkezdi ellenőrizni az üzeneteket. A dátumformátum a következőyyyy-mm-ddThh:mm:ssZ
: .BatchSize=100
Cserélje le a 100
elemet a5
kérdésre. Ez a változás azt eredményezi, hogy az összekötő öt új üzenetet olvas be a Kafkába az IoT Hubban.Példakonfiguráció: Kafka Csatlakozás Source Csatlakozás or for Azure IoT Hub.
A módosítások mentéséhez használja a Ctrl + X, Y, majd az Enter billentyűkombinációt.
További információ az összekötő forrásának konfigurálásáról: https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Source.md.
A fogadókapcsolat konfigurálása
Ha úgy szeretné konfigurálni a fogadókapcsolatot, hogy működjön az IoT Hubbal, hajtsa végre a következő műveleteket egy SSH-kapcsolatról a peremcsomópontra:
Hozzon létre egy másolatot a
connect-iothub-sink.properties
fájlról a/usr/hdp/current/kafka-broker/config/
könyvtárban. A fájl toketi-kafka-connect-iothub projektből való letöltéséhez használja a következő parancsot:sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-sink.properties
A fájl szerkesztéséhez
connect-iothub-sink.properties
és az IoT Hub adatainak hozzáadásához használja a következő parancsot:sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
A szerkesztőben keresse meg és módosítsa a következő bejegyzéseket:
Aktuális érték Szerkesztés topics=PLACEHOLDER
Cserélje le a PLACEHOLDER
elemet aiotout
kérdésre. A témakörbeiotout
írt üzeneteket a rendszer az IoT Hubra továbbítja.IotHub.ConnectionString=PLACEHOLDER
Cserélje le PLACEHOLDER
a szabályzat kapcsolati sztringservice
.Példakonfiguráció: Kafka Csatlakozás Sink Csatlakozás or for Azure IoT Hub.
A módosítások mentéséhez használja a Ctrl + X, Y, majd az Enter billentyűkombinációt.
Az összekötő fogadójának konfigurálásával kapcsolatos további információkért lásd: https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
A forrás-összekötő indítása
A forrás-összekötő elindításához használja a következő parancsot egy SSH-kapcsolatból a peremcsomóponthoz:
/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
Az összekötő elindítása után küldjön üzeneteket az IoT Hubnak az eszköz(ek)ről. Ahogy az összekötő felolvassa az üzeneteket az IoT Hubról, és a Kafka-témakörben tárolja őket, naplózza az adatokat a konzolon:
[2017-08-29 20:15:46,112] INFO Polling for data - Obtained 5 SourceRecords from IotHub (com.microsoft.azure.iot.kafka.connect.IotHubSourceTask:39) [2017-08-29 20:15:54,106] INFO Finished WorkerSourceTask{id=AzureIotHubConnector-0} commitOffsets successfully in 4 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:356)
Feljegyzés
Az összekötő indításakor számos figyelmeztetés jelenhet meg. Ezek a figyelmeztetések nem okoznak problémát az IoT Hubtól érkező üzenetek fogadásával kapcsolatban.
Állítsa le az összekötőt néhány perc múlva kétszer a Ctrl + C billentyűkombinációval. Az összekötő leállása néhány percet vesz igénybe.
A fogadó összekötő indítása
Az SSH-kapcsolatról a peremcsomópontra az alábbi paranccsal indíthatja el a fogadó-összekötőt önálló módban:
/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
Az összekötő futtatásakor a következő szöveghez hasonló információk jelennek meg:
[2017-08-30 17:49:16,150] INFO Started tasks to send 1 messages to devices. (com.microsoft.azure.iot.kafka.connect.sink.
IotHubSinkTask:47)
[2017-08-30 17:49:16,150] INFO WorkerSinkTask{id=AzureIotHubSinkConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
Feljegyzés
Az összekötő indításakor számos figyelmeztetés jelenhet meg. Ezek biztonságosan figyelmen kívül hagyhatók.
Üzenetek küldése
Ha az összekötőn keresztül szeretne üzeneteket küldeni, kövesse az alábbi lépéseket:
Nyisson meg egy második SSH-munkamenetet a Kafka-fürtben:
ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
Kérje le a Kafka-közvetítők címét az új ssh-munkamenethez. Cserélje le a JELSZÓ elemet a fürt bejelentkezési jelszavára, majd írja be a következő parancsot:
export password='PASSWORD' export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name') export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
Ha üzeneteket szeretne küldeni a
iotout
témakörbe, használja a következő parancsot:/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKABROKERS --topic iotout
Ez a parancs nem adja vissza a normál Bash-parancssorba. Ehelyett billentyűzetbemenetet küld a
iotout
témakörnek.Ha üzenetet szeretne küldeni az eszköznek, illessze be a JSON-dokumentumot az SSH-munkamenetbe.
kafka-console-producer
Fontos
A bejegyzés értékét az
"deviceId"
eszköz azonosítójára kell állítania. Az alábbi példában az eszköz nevemyDeviceId
:{"messageId":"msg1","message":"Turn On","deviceId":"myDeviceId"}
A JSON-dokumentum sémáját részletesebben itt ismertetjük https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md: .
Ha a szimulált Raspberry Pi-eszközt használja, és fut, az eszköz a következő üzenetet naplózza:
Receive message: Turn On
Resend the JSON document, but change the value of the `"message"` entry. The new value is logged by the device.
A fogadó összekötő használatával kapcsolatos további információkért lásd https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md: .
Következő lépések
Ebben a dokumentumban megtanulta, hogyan indíthatja el az IoT Kafka Csatlakozás ort a HDInsighton az Apache Kafka Csatlakozás API használatával. A Kafka használatának egyéb módjait az alábbi hivatkozások segítségével ismerheti meg:
Visszajelzés
https://aka.ms/ContentUserFeedback.
Hamarosan elérhető: 2024-ben fokozatosan kivezetjük a GitHub-problémákat a tartalom visszajelzési mechanizmusaként, és lecseréljük egy új visszajelzési rendszerre. További információ:Visszajelzés küldése és megtekintése a következőhöz: