Megosztás a következőn keresztül:


Az Apache Kafka használata HDInsighton az Azure IoT Hubbal

Megtudhatja, hogyan helyezhet át adatokat az Apache Kafka a HDInsighton és az Azure IoT Hubon az Apache Kafka Csatlakozás Azure IoT Hub-összekötő használatával. Ebben a dokumentumban megtudhatja, hogyan futtathatja az IoT Hub-összekötőt a fürt peremcsomópontjáról.

A Kafka Csatlakozás API lehetővé teszi olyan összekötők implementálását, amelyek folyamatosan adatokat kérnek le a Kafkába, vagy adatokat küldhetnek a Kafkából egy másik rendszerbe. Az Apache Kafka Csatlakozás Azure IoT Hub egy összekötő, amely adatokat kér le az Azure IoT Hubból a Kafkába. Adatokat is leküldhet a Kafkából az IoT Hubba.

Az IoT Hubról való lekéréskor egy forrás-összekötőt használ. Az IoT Hubra való leküldéskor fogadó-összekötőt használ. Az IoT Hub-összekötő a forrás- és fogadó-összekötőket is biztosítja.

Az alábbi ábra az Azure IoT Hub és a HDInsighton futó Kafka közötti adatfolyamot mutatja be az összekötő használatakor.

Image showing data flowing from IoT Hub to Kafka through the connector.

Az Csatlakozás API-val kapcsolatos további információkért lásd: https://kafka.apache.org/documentation/#connect.

Előfeltételek

Az összekötő létrehozása

  1. Töltse le az összekötő forrását a helyi környezetbe https://github.com/Azure/toketi-kafka-connect-iothub/ .

  2. A parancssorból keresse meg a toketi-kafka-connect-iothub-master könyvtárat. Ezután a következő paranccsal hozza létre és csomagolja be a projektet:

    sbt assembly
    

    A build végrehajtása néhány percet vesz igénybe. A parancs létrehoz egy fájlt kafka-connect-iothub-assembly_2.11-0.7.0.jar a toketi-kafka-connect-iothub-master\target\scala-2.11 projekt könyvtárában.

Az összekötő telepítése

  1. Töltse fel a .jar fájlt a Kafka peremcsomópontjára a HDInsight-fürtön. Szerkessze a következő parancsot a fürt tényleges nevére cserélve CLUSTERNAME . A rendszer szükség szerint az SSH-felhasználói fiók alapértelmezett értékeit és az élcsomópont nevét használja és módosítja.

    scp kafka-connect-iothub-assembly*.jar sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net:
    
  2. Miután a fájlmásolás befejeződött, csatlakozzon az élcsomóponthoz az SSH használatával:

    ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
    
  3. Az összekötő kafka libs könyvtárba való telepítéséhez használja a következő parancsot:

    sudo mv kafka-connect-iothub-assembly*.jar /usr/hdp/current/kafka-broker/libs/
    

Az SSH-kapcsolat aktív marad a többi lépésnél.

Az Apache Kafka konfigurálása

Az SSH-kapcsolatról a peremcsomópontra az alábbi lépésekkel konfigurálhatja a Kafkát az összekötő önálló módban való futtatásához:

  1. Jelszóváltozó beállítása. Cserélje le a JELSZÓ elemet a fürt bejelentkezési jelszavára, majd írja be a következő parancsot:

    export password='PASSWORD'
    
  2. Telepítse a jq segédprogramot. A jq megkönnyíti az Ambari-lekérdezésekből visszaadott JSON-dokumentumok feldolgozását. Írja be az alábbi parancsot:

    sudo apt -y install jq
    
  3. Kérje le a Kafka-közvetítők címét. Előfordulhat, hogy sok közvetítő található a fürtben, de csak egy vagy kettőre kell hivatkoznia. Két közvetítő gazdagép címének lekéréséhez használja a következő parancsot:

    export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    
    export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
    echo $KAFKABROKERS
    

    Másolja ki az értékeket későbbi használatra. A visszaadott érték az alábbi szöveghez hasonló:

    <brokername1>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092,<brokername2>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092

  4. Kérje le az Apache Zookeeper-csomópontok címét. A fürtben számos Zookeeper-csomópont található, de csak egy vagy két csomópontra kell hivatkoznia. A következő paranccsal tárolja a címeket a változóban KAFKAZKHOSTS:

    export KAFKAZKHOSTS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2`
    
  5. Ha az összekötőt önálló módban futtatja, a /usr/hdp/current/kafka-broker/config/connect-standalone.properties fájl a Kafka-közvetítőkkel való kommunikációra szolgál. A fájl szerkesztéséhez connect-standalone.properties használja a következő parancsot:

    sudo nano /usr/hdp/current/kafka-broker/config/connect-standalone.properties
    
  6. Végezze el a következő módosításokat:

    Aktuális érték Új érték Megjegyzés
    bootstrap.servers=localhost:9092 Cserélje le az localhost:9092 értéket az előző lépésben szereplő közvetítő gazdagépekre A peremcsomópont önálló konfigurációját konfigurálja a Kafka-közvetítők megkereséséhez.
    key.converter=org.apache.kafka.connect.json.JsonConverter key.converter=org.apache.kafka.connect.storage.StringConverter Ez a módosítás lehetővé teszi a Tesztelést a Kafka konzolgyártójával. Előfordulhat, hogy más gyártóknak és fogyasztóknak különböző konverterekre van szüksége. További információ az egyéb konverterértékek használatáról: https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
    value.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.storage.StringConverter Ugyanaz, mint a megadott.
    n/a consumer.max.poll.records=10 Hozzáadás a fájl végéhez. Ez a módosítás megakadályozza az időtúllépéseket a fogadó-összekötőben azáltal, hogy egyszerre 10 rekordra korlátozza. További információ: https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
  7. A fájl mentéséhez használja a Ctrl + X, Y, majd az Enter billentyűkombinációt.

  8. Az összekötő által használt témakörök létrehozásához használja az alábbi parancsokat:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotin --zookeeper $KAFKAZKHOSTS
    
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotout --zookeeper $KAFKAZKHOSTS
    

    A témakörök és iotout a iotin témakörök meglétének ellenőrzéséhez használja a következő parancsot:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $KAFKAZKHOSTS
    

    A iotin témakör az IoT Hubtól érkező üzenetek fogadására szolgál. A iotout témakör az IoT Hubnak küldött üzenetekre szolgál.

IoT Hub kapcsolati adatainak lekérése

Az összekötő által használt IoT Hub-információk lekéréséhez kövesse az alábbi lépéseket:

  1. Szerezze be az Event Hub-kompatibilis végpontot és az Event Hub-kompatibilis végpont nevét az IoT Hubhoz. Az információk beszerzéséhez használja az alábbi módszerek egyikét:

    • Az Azure Portalon kövesse az alábbi lépéseket:

      1. Lépjen az IoT Hubra, és válassza a Végpontok lehetőséget.

      2. A beépített végpontok közül válassza az Események lehetőséget.

      3. Másolja a tulajdonságok közül a következő mezők értékét:

        • Event Hub-kompatibilis név
        • Event Hub-kompatibilis végpont
        • Partíciók

        Fontos

        A portál végpontértéke olyan további szöveget tartalmazhat, amelyre ebben a példában nincs szükség. Bontsa ki az ennek a mintának sb://<randomnamespace>.servicebus.windows.net/megfelelő szöveget.

    • Az Azure CLI-ben használja a következő parancsot:

      az iot hub show --name myhubname --query "{EventHubCompatibleName:properties.eventHubEndpoints.events.path,EventHubCompatibleEndpoint:properties.eventHubEndpoints.events.endpoint,Partitions:properties.eventHubEndpoints.events.partitionCount}"
      

      Cserélje le myhubname az IoT Hub nevére. A válasz hasonló a következő szöveghez:

      "EventHubCompatibleEndpoint": "sb://ihsuprodbnres006dednamespace.servicebus.windows.net/",
      "EventHubCompatibleName": "iothub-ehub-myhub08-207673-d44b2a856e",
      "Partitions": 2
      
  2. Szerezze be a megosztott hozzáférési szabályzatot és kulcsot. Ebben a példában használja a szolgáltatáskulcsot . Az információk beszerzéséhez használja az alábbi módszerek egyikét:

    • Az Azure Portalon kövesse az alábbi lépéseket:

      1. Válassza a Megosztott hozzáférési szabályzatok lehetőséget, majd válassza ki a szolgáltatást.
      2. Másolja ki az elsődleges kulcs értékét.
      3. Másolja ki a Csatlakozás ion sztring elsődleges kulcs értékét.
    • Az Azure CLI-ben használja a következő parancsot:

      1. Az elsődleges kulcs értékének lekéréséhez használja a következő parancsot:

        az iot hub policy show --hub-name myhubname --name service --query "primaryKey"
        

        Cserélje le myhubname az IoT Hub nevére. A válasz a központ házirendjének service elsődleges kulcsa.

      2. A szabályzathoz tartozó kapcsolati sztring a service következő paranccsal szerezheti be:

        az iot hub show-connection-string --name myhubname --policy-name service --query "connectionString"
        

        Cserélje le myhubname az IoT Hub nevére. A válasz a szabályzat kapcsolati sztringservice.

A forráskapcsolat konfigurálása

Ha úgy szeretné konfigurálni a forrást, hogy működjön az IoT Hubbal, hajtsa végre a következő műveleteket egy SSH-kapcsolat és a peremcsomópont között:

  1. Hozzon létre egy másolatot a connect-iot-source.properties fájlról a /usr/hdp/current/kafka-broker/config/ könyvtárban. A fájl toketi-kafka-connect-iothub projektből való letöltéséhez használja a következő parancsot:

    sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-source.properties
    
  2. A fájl szerkesztéséhez connect-iot-source.properties és az IoT Hub adatainak hozzáadásához használja a következő parancsot:

    sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
    
  3. A szerkesztőben keresse meg és módosítsa a következő bejegyzéseket:

    Aktuális érték Szerkesztés
    Kafka.Topic=PLACEHOLDER Cserélje le a PLACEHOLDER elemet a iotin kérdésre. Az IoT Hubtól kapott üzenetek a iotin témakörbe kerülnek.
    IotHub.EventHubCompatibleName=PLACEHOLDER Cserélje le PLACEHOLDER az Event Hub-kompatibilis névre.
    IotHub.EventHubCompatibleEndpoint=PLACEHOLDER Cserélje le PLACEHOLDER az Event Hub-kompatibilis végpontra.
    IotHub.AccessKeyName=PLACEHOLDER Cserélje le a PLACEHOLDER elemet a service kérdésre.
    IotHub.AccessKeyValue=PLACEHOLDER Cserélje le PLACEHOLDER a szabályzat elsődleges kulcsára service .
    IotHub.Partitions=PLACEHOLDER Cserélje le PLACEHOLDER az előző lépések partícióinak számát.
    IotHub.StartTime=PLACEHOLDER Cserélje le PLACEHOLDER utc dátumra. Ez az a dátum, amikor az összekötő elkezdi ellenőrizni az üzeneteket. A dátumformátum a következő yyyy-mm-ddThh:mm:ssZ: .
    BatchSize=100 Cserélje le a 100 elemet a 5 kérdésre. Ez a változás azt eredményezi, hogy az összekötő öt új üzenetet olvas be a Kafkába az IoT Hubban.

    Példakonfiguráció: Kafka Csatlakozás Source Csatlakozás or for Azure IoT Hub.

  4. A módosítások mentéséhez használja a Ctrl + X, Y, majd az Enter billentyűkombinációt.

További információ az összekötő forrásának konfigurálásáról: https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Source.md.

A fogadókapcsolat konfigurálása

Ha úgy szeretné konfigurálni a fogadókapcsolatot, hogy működjön az IoT Hubbal, hajtsa végre a következő műveleteket egy SSH-kapcsolatról a peremcsomópontra:

  1. Hozzon létre egy másolatot a connect-iothub-sink.properties fájlról a /usr/hdp/current/kafka-broker/config/ könyvtárban. A fájl toketi-kafka-connect-iothub projektből való letöltéséhez használja a következő parancsot:

    sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-sink.properties
    
  2. A fájl szerkesztéséhez connect-iothub-sink.properties és az IoT Hub adatainak hozzáadásához használja a következő parancsot:

    sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
    
  3. A szerkesztőben keresse meg és módosítsa a következő bejegyzéseket:

    Aktuális érték Szerkesztés
    topics=PLACEHOLDER Cserélje le a PLACEHOLDER elemet a iotout kérdésre. A témakörbe iotout írt üzeneteket a rendszer az IoT Hubra továbbítja.
    IotHub.ConnectionString=PLACEHOLDER Cserélje le PLACEHOLDER a szabályzat kapcsolati sztringservice.

    Példakonfiguráció: Kafka Csatlakozás Sink Csatlakozás or for Azure IoT Hub.

  4. A módosítások mentéséhez használja a Ctrl + X, Y, majd az Enter billentyűkombinációt.

Az összekötő fogadójának konfigurálásával kapcsolatos további információkért lásd: https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.

A forrás-összekötő indítása

  1. A forrás-összekötő elindításához használja a következő parancsot egy SSH-kapcsolatból a peremcsomóponthoz:

    /usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
    

    Az összekötő elindítása után küldjön üzeneteket az IoT Hubnak az eszköz(ek)ről. Ahogy az összekötő felolvassa az üzeneteket az IoT Hubról, és a Kafka-témakörben tárolja őket, naplózza az adatokat a konzolon:

    [2017-08-29 20:15:46,112] INFO Polling for data - Obtained 5 SourceRecords from IotHub (com.microsoft.azure.iot.kafka.connect.IotHubSourceTask:39)
    [2017-08-29 20:15:54,106] INFO Finished WorkerSourceTask{id=AzureIotHubConnector-0} commitOffsets successfully in 4 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:356)
    

    Feljegyzés

    Az összekötő indításakor számos figyelmeztetés jelenhet meg. Ezek a figyelmeztetések nem okoznak problémát az IoT Hubtól érkező üzenetek fogadásával kapcsolatban.

  2. Állítsa le az összekötőt néhány perc múlva kétszer a Ctrl + C billentyűkombinációval. Az összekötő leállása néhány percet vesz igénybe.

A fogadó összekötő indítása

Az SSH-kapcsolatról a peremcsomópontra az alábbi paranccsal indíthatja el a fogadó-összekötőt önálló módban:

/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties

Az összekötő futtatásakor a következő szöveghez hasonló információk jelennek meg:

[2017-08-30 17:49:16,150] INFO Started tasks to send 1 messages to devices. (com.microsoft.azure.iot.kafka.connect.sink.
IotHubSinkTask:47)
[2017-08-30 17:49:16,150] INFO WorkerSinkTask{id=AzureIotHubSinkConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)

Feljegyzés

Az összekötő indításakor számos figyelmeztetés jelenhet meg. Ezek biztonságosan figyelmen kívül hagyhatók.

Üzenetek küldése

Ha az összekötőn keresztül szeretne üzeneteket küldeni, kövesse az alábbi lépéseket:

  1. Nyisson meg egy második SSH-munkamenetet a Kafka-fürtben:

    ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
    
  2. Kérje le a Kafka-közvetítők címét az új ssh-munkamenethez. Cserélje le a JELSZÓ elemet a fürt bejelentkezési jelszavára, majd írja be a következő parancsot:

    export password='PASSWORD'
    
    export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    
    export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
    
  3. Ha üzeneteket szeretne küldeni a iotout témakörbe, használja a következő parancsot:

    /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKABROKERS --topic iotout
    

    Ez a parancs nem adja vissza a normál Bash-parancssorba. Ehelyett billentyűzetbemenetet küld a iotout témakörnek.

  4. Ha üzenetet szeretne küldeni az eszköznek, illessze be a JSON-dokumentumot az SSH-munkamenetbe.kafka-console-producer

    Fontos

    A bejegyzés értékét az "deviceId" eszköz azonosítójára kell állítania. Az alábbi példában az eszköz neve myDeviceId:

    {"messageId":"msg1","message":"Turn On","deviceId":"myDeviceId"}
    

    A JSON-dokumentum sémáját részletesebben itt ismertetjük https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md: .

Ha a szimulált Raspberry Pi-eszközt használja, és fut, az eszköz a következő üzenetet naplózza:

Receive message: Turn On


Resend the JSON document, but change the value of the `"message"` entry. The new value is logged by the device.

A fogadó összekötő használatával kapcsolatos további információkért lásd https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md: .

Következő lépések

Ebben a dokumentumban megtanulta, hogyan indíthatja el az IoT Kafka Csatlakozás ort a HDInsighton az Apache Kafka Csatlakozás API használatával. A Kafka használatának egyéb módjait az alábbi hivatkozások segítségével ismerheti meg: