Share via


Verwendung von Apache Kafka in HDInsight mit Azure IoT Hub

Hier erfahren Sie, wie Sie mithilfe des Apache Kafka Connect-Azure IoT Hub-Connectors Daten zwischen Apache Kafka in HDInsight und Azure IoT Hub verschieben. In diesem Dokument erfahren Sie, wie der IoT Hub-Connector über einen Edgeknoten im Cluster ausgeführt wird.

Mit der Kafka Connect-API können Sie Connector implementieren, die kontinuierlich Daten in Kafka abrufen oder Daten über Kafka mithilfe von Push an ein anderes System übertragen. Apache Kafka Connect Azure IoT Hub ist ein Connector, der Daten aus Azure IoT Hub in Kafka pullt. Dieser kann Daten auch über Kafka mithilfe von Push an IoT Hub übertragen.

Für den Abruf von Daten aus IoT Hub verwenden Sie einen Quellconnector. Für die Übertragung von Daten mithilfe von Push an IoT Hub verwenden Sie einen Senkenconnector. Der IoT Hub-Connector stellt sowohl Quell- als auch Senkenconnector bereit.

In der folgenden Abbildung wird der Datenfluss zwischen Azure IoT Hub und Kafka in HDInsight unter Verwendung des Connectors gezeigt.

Image showing data flowing from IoT Hub to Kafka through the connector.

Weitere Informationen zur Connect-API finden Sie unter https://kafka.apache.org/documentation/#connect.

Voraussetzungen

Erstellen des Connectors

  1. Laden Sie die Quelle für den Connector unter https://github.com/Azure/toketi-kafka-connect-iothub/ in Ihre lokale Umgebung herunter.

  2. Navigieren Sie in einer Eingabeaufforderung zum Verzeichnis toketi-kafka-connect-iothub-master. Erstellen und packen Sie das Projekt dann mit dem folgenden Befehl:

    sbt assembly
    

    Die Erstellung kann einige Minuten dauern. Mit diesem Befehl wird die Datei kafka-connect-iothub-assembly_2.11-0.7.0.jar im Zielverzeichnis toketi-kafka-connect-iothub-master\target\scala-2.11 für das Projekt erstellt.

Installieren des Connectors

  1. Laden Sie die JAR-Datei auf den Edgeknoten Ihres Kafka in HDInsight-Clusters hoch. Bearbeiten Sie den folgenden Befehl, indem Sie CLUSTERNAME durch den tatsächlichen Namen Ihres Clusters ersetzen. Die Standardwerte für das SSH-Benutzerkonto und der Name des Edgeknotens werden verwendet und nach Bedarf geändert.

    scp kafka-connect-iothub-assembly*.jar sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net:
    
  2. Stellen Sie nach dem Dateikopiervorgang über SSH eine Verbindung mit dem Edgeknoten her:

    ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
    
  3. Verwenden Sie für die Installation des Connectors im Kafka-Verzeichnis libs den folgenden Befehl:

    sudo mv kafka-connect-iothub-assembly*.jar /usr/hdp/current/kafka-broker/libs/
    

Halten Sie die SSH-Verbindung für die restlichen Schritte aufrecht.

Konfigurieren von Apache Kafka

Führen Sie über Ihre SSH-Verbindung mit dem Edgeknoten die folgenden Schritte durch, um Kafka für die Ausführung des Connectors im eigenständigen Modus zu konfigurieren:

  1. Richten Sie eine Kennwortvariable ein. Ersetzen Sie PASSWORD durch das Kennwort für die Clusteranmeldung, und geben Sie dann den folgenden Befehl ein:

    export password='PASSWORD'
    
  2. Installieren Sie das Hilfsprogramm jq. Durch jq können die von Ambari-Abfragen zurückgegebenen JSON-Dokumente einfacher verarbeitet werden. Geben Sie folgenden Befehl ein:

    sudo apt -y install jq
    
  3. Rufen Sie die Adresse der Kafka-Broker ab. Auch wenn in Ihrem Cluster möglicherweise viele Broker vorhanden sind, müssen Sie nur auf einen oder zwei verweisen. Um die Adresse der zwei Brokerhosts zu ermitteln, verwenden Sie den folgenden Befehl:

    export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    
    export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
    echo $KAFKABROKERS
    

    Kopieren Sie diese Werte zur späteren Verwendung. Der zurückgegebene Wert ähnelt dem folgenden Text:

    <brokername1>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092,<brokername2>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092

  4. Rufen Sie die Adresse der Apache ZooKeeper-Knoten ab. Auch wenn in Ihrem Cluster möglicherweise mehrere ZooKeeper-Knoten vorhanden sind, müssen Sie nur auf einen oder zwei verweisen. Verwenden Sie den folgenden Befehl, um die Adressen in der Variable KAFKAZKHOSTS zu speichern:

    export KAFKAZKHOSTS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2`
    
  5. Bei der Ausführung des Connectors im eigenständigen Modus wird für die Kommunikation mit den Kafka-Brokern die Datei /usr/hdp/current/kafka-broker/config/connect-standalone.properties verwendet. Verwenden Sie zum Bearbeiten der Datei connect-standalone.properties den folgenden Befehl:

    sudo nano /usr/hdp/current/kafka-broker/config/connect-standalone.properties
    
  6. Nehmen Sie die folgenden Bearbeitungen vor:

    Aktueller Wert Neuer Wert Comment
    bootstrap.servers=localhost:9092 Ersetzen Sie den Wert localhost:9092 durch die Brokerhosts aus dem vorherigen Schritt. Konfiguriert die eigenständige Konfiguration für den Edgeknoten zum Suchen der Kafka-Broker.
    key.converter=org.apache.kafka.connect.json.JsonConverter key.converter=org.apache.kafka.connect.storage.StringConverter Durch diese Änderung können Sie Tests mit dem Konsolenproducer, der bei Kafka enthalten ist, durchführen. Eventuell benötigen Sie für andere Producer und Consumer unterschiedliche Konverter. Informationen zur Verwendung anderer Konverterwerte finden Sie unter https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
    value.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.storage.StringConverter Identisch mit der Angabe.
    N/V consumer.max.poll.records=10 Am Ende der Datei hinzufügen. Durch diese Änderung soll Timeouts im Senkenconnector verhindert werden, indem dieser auf jeweils 10 Datensätze begrenzt wird. Weitere Informationen finden Sie unter https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
  7. Drücken Sie zum Speichern der Datei STRG+X, Y und dann die EINGABETASTE.

  8. Um die vom Connector verwendeten Themen zu erstellen, verwenden Sie die folgenden Befehle:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotin --zookeeper $KAFKAZKHOSTS
    
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotout --zookeeper $KAFKAZKHOSTS
    

    Um zu bestätigen, dass die Themen iotin und iotout erstellt wurden, verwenden Sie den folgenden Befehl:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $KAFKAZKHOSTS
    

    Das Thema iotin dient zum Empfangen von Nachrichten von IoT Hub. Das Thema iotout dient zum Senden von Nachrichten an IoT Hub.

Abrufen der IoT Hub-Verbindungsinformationen

Führen Sie zum Abrufen von IoT Hub-Informationen, die vom Connector verwendet werden, die folgenden Schritte durch:

  1. Rufen Sie den Event Hub-kompatiblen Endpunkt und den zugehörigen Endpunktnamen für Ihren IoT Hub ab. Sie können diese Informationen mit einer der folgenden Methoden ermitteln:

    • Führen Sie die folgenden Schritte über das Azure-Portal aus:

      1. Navigieren Sie zu Ihrer IoT Hub-Instanz, und klicken Sie auf Endpunkte.

      2. Wählen Sie unter Integrierte Endpunkte die Option Ereignisse aus.

      3. Kopieren Sie unter Eigenschaften den Wert der folgenden Felder:

        • Event Hub-kompatibler Name
        • Event Hub-kompatibler Endpunkt
        • Partitionen

        Wichtig

        Der Endpunktwert aus dem Portal enthält möglicherweise zusätzlichen Text, der in diesem Beispiel nicht benötigt wird. Extrahieren Sie den Text, der dem Muster sb://<randomnamespace>.servicebus.windows.net/ entspricht.

    • Geben Sie über die Azure CLI den folgenden Befehl ein:

      az iot hub show --name myhubname --query "{EventHubCompatibleName:properties.eventHubEndpoints.events.path,EventHubCompatibleEndpoint:properties.eventHubEndpoints.events.endpoint,Partitions:properties.eventHubEndpoints.events.partitionCount}"
      

      Ersetzen Sie myhubname durch den Namen Ihrer IoT Hub-Instanz. Die Antwort ähnelt dem folgenden Text:

      "EventHubCompatibleEndpoint": "sb://ihsuprodbnres006dednamespace.servicebus.windows.net/",
      "EventHubCompatibleName": "iothub-ehub-myhub08-207673-d44b2a856e",
      "Partitions": 2
      
  2. Rufen Sie die SAS-Richtlinie und den Schlüssel ab. In diesem Beispiel wird der Dienstschlüssel verwendet. Sie können diese Informationen mit einer der folgenden Methoden ermitteln:

    • Führen Sie die folgenden Schritte über das Azure-Portal aus:

      1. Wählen Sie SAS-Richtlinien aus, und klicken Sie dann auf Dienst.
      2. Kopieren Sie den Wert unter Primärschlüssel.
      3. Kopieren Sie den Wert von Verbindungszeichenfolge – Primärschlüssel.
    • Geben Sie über die Azure CLI den folgenden Befehl ein:

      1. Um den Wert des Primärschlüssels zu ermitteln, verwenden Sie den folgenden Befehl:

        az iot hub policy show --hub-name myhubname --name service --query "primaryKey"
        

        Ersetzen Sie myhubname durch den Namen Ihrer IoT Hub-Instanz. Die Antwort ist der Primärschlüssel für die Richtlinie service für diesen Hub.

      2. Verwenden Sie zum Abrufen der Verbindungszeichenfolge für die Richtlinie service den folgenden Befehl:

        az iot hub show-connection-string --name myhubname --policy-name service --query "connectionString"
        

        Ersetzen Sie myhubname durch den Namen Ihrer IoT Hub-Instanz. Die Antwort ist die Verbindungszeichenfolge für die Richtlinie service.

Konfigurieren der Quellverbindung

Um die Quelle für die Arbeit mit Ihrer IoT Hub-Instanz zu konfigurieren, führen Sie die folgenden Aktionen über eine SSH-Verbindung mit dem Edgeknoten aus:

  1. Erstellen Sie eine Kopie von der Datei connect-iot-source.properties im Verzeichnis /usr/hdp/current/kafka-broker/config/. Um die Datei aus dem Projekt „toketi-kafka-connect-iothub“ herunterzuladen, verwenden Sie den folgenden Befehl:

    sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-source.properties
    
  2. Verwenden Sie zum Bearbeiten der Datei connect-iot-source.properties und Hinzufügen der IoT Hub-Informationen den folgenden Befehl:

    sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
    
  3. Suchen Sie im Editor nach den folgenden Einträgen, und ändern Sie sie:

    Aktueller Wert Edit (Bearbeiten)
    Kafka.Topic=PLACEHOLDER Ersetzen Sie PLACEHOLDER durch iotin. Die vom IoT Hub empfangenen Nachrichten werden in das Thema iotin platziert.
    IotHub.EventHubCompatibleName=PLACEHOLDER Ersetzen Sie PLACEHOLDER durch den Event Hub-kompatiblen Namen.
    IotHub.EventHubCompatibleEndpoint=PLACEHOLDER Ersetzen Sie PLACEHOLDER durch den Event Hub-kompatiblen Endpunkt.
    IotHub.AccessKeyName=PLACEHOLDER Ersetzen Sie PLACEHOLDER durch service.
    IotHub.AccessKeyValue=PLACEHOLDER Ersetzen Sie PLACEHOLDER durch den Primärschlüssel der Richtlinie service.
    IotHub.Partitions=PLACEHOLDER Ersetzen Sie PLACEHOLDER durch die Anzahl von Partitionen aus den vorherigen Schritten.
    IotHub.StartTime=PLACEHOLDER Ersetzen Sie PLACEHOLDER durch ein Datum im UTC-Format. Dieses Datum bezieht sich auf den Zeitpunkt, an dem der Connector mit der Prüfung auf Nachrichten begonnen hat. Das Datumsformat ist yyyy-mm-ddThh:mm:ssZ.
    BatchSize=100 Ersetzen Sie 100 durch 5. Diese Änderung bewirkt, dass der Connector Nachrichten in Kafka liest, sobald fünf neue Nachrichten in IoT Hub vorhanden sind.

    Eine Beispielkonfiguration finden Sie unter Kafka Connect Source Connector for Azure IoT Hub (Kafka Connect-Quellconnector für Azure IoT Hub).

  4. Drücken Sie zum Speichern der Änderungen STRG+X, Y und dann die EINGABETASTE.

Weitere Informationen zum Konfigurieren der Connectorquelle finden Sie unter https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Source.md.

Konfigurieren der Senkenverbindung

Um die Senkenverbindung für die Arbeit mit Ihrer IoT Hub-Instanz zu konfigurieren, führen Sie die folgenden Aktionen über eine SSH-Verbindung mit dem Edgeknoten aus:

  1. Erstellen Sie eine Kopie von der Datei connect-iothub-sink.properties im Verzeichnis /usr/hdp/current/kafka-broker/config/. Um die Datei aus dem Projekt „toketi-kafka-connect-iothub“ herunterzuladen, verwenden Sie den folgenden Befehl:

    sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-sink.properties
    
  2. Verwenden Sie zum Bearbeiten der Datei connect-iothub-sink.properties und Hinzufügen der IoT Hub-Informationen den folgenden Befehl:

    sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
    
  3. Suchen Sie im Editor nach den folgenden Einträgen, und ändern Sie sie:

    Aktueller Wert Edit (Bearbeiten)
    topics=PLACEHOLDER Ersetzen Sie PLACEHOLDER durch iotout. Nachrichten, die in das Thema iotout geschrieben wurden, werden an IoT Hub weitergeleitet.
    IotHub.ConnectionString=PLACEHOLDER Ersetzen Sie PLACEHOLDER durch die Verbindungszeichenfolge für die Richtlinie service.

    Eine Beispielkonfiguration finden Sie unter Kafka Connect Sink Connector for Azure IoT Hub (Kafka Connect-Senkenconnector für Azure IoT Hub).

  4. Drücken Sie zum Speichern der Änderungen STRG+X, Y und dann die EINGABETASTE.

Weitere Informationen zum Konfigurieren der Connectorsenke finden Sie unter https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.

Starten des Quellconnectors

  1. Geben Sie zum Starten des Quellconnectors den folgenden Befehl über eine SSH-Verbindung mit dem Edgeknoten ein:

    /usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
    

    Sobald der Connector gestartet wurde, senden Sie Nachrichten an IoT Hub von Ihren Geräten. Während der Connector Nachrichten von IoT Hub liest und im Kafka-Thema speichert, protokolliert er Informationen in der Konsole:

    [2017-08-29 20:15:46,112] INFO Polling for data - Obtained 5 SourceRecords from IotHub (com.microsoft.azure.iot.kafka.connect.IotHubSourceTask:39)
    [2017-08-29 20:15:54,106] INFO Finished WorkerSourceTask{id=AzureIotHubConnector-0} commitOffsets successfully in 4 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:356)
    

    Hinweis

    Beim Starten des Connectors werden möglicherweise mehrere Warnungen angezeigt. Diese Warnungen führen nicht zu Problemen mit dem Empfang von Nachrichten von IoT Hub.

  2. Brechen Sie den Connector nach einigen Minuten ab, indem Sie zweimal STRG+C drücken. Das Anhalten des Connectors dauert einige Minuten.

Starten des Senkenconnectors

Geben Sie über eine SSH-Verbindung mit dem Edgeknoten den folgenden Befehl ein, um den Senkenconnector zu starten:

/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties

Bei der Ausführung des Connectors werden Informationen ähnlich dem folgenden Text angezeigt:

[2017-08-30 17:49:16,150] INFO Started tasks to send 1 messages to devices. (com.microsoft.azure.iot.kafka.connect.sink.
IotHubSinkTask:47)
[2017-08-30 17:49:16,150] INFO WorkerSinkTask{id=AzureIotHubSinkConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)

Hinweis

Beim Starten des Connectors werden möglicherweise mehrere Warnungen angezeigt. Sie können dies ignorieren.

Senden von Nachrichten

Führen Sie zum Senden von Nachrichten über den Connector folgende Schritte durch:

  1. Öffnen Sie eine zweite SSH-Sitzung mit dem Kafka-Cluster:

    ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
    
  2. Rufen Sie die Adresse der Kafka-Broker für die neue SSH-Sitzung ab. Ersetzen Sie PASSWORD durch das Kennwort für die Clusteranmeldung, und geben Sie dann den folgenden Befehl ein:

    export password='PASSWORD'
    
    export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    
    export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
    
  3. Um Nachrichten an das Thema iotout zu senden, verwenden Sie folgenden Befehl:

    /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKABROKERS --topic iotout
    

    Bei diesem Befehl werden Sie nicht zur gewohnten Bash-Eingabeaufforderung weitergeleitet. Stattdessen sendet diese Tastatureingaben an das Thema iotout.

  4. Fügen Sie zum Senden einer Nachricht an Ihr Gerät ein JSON-Dokument in der SSH-Sitzung für kafka-console-producer ein.

    Wichtig

    Sie müssen den Wert des Eintrags "deviceId" auf die ID Ihres Geräts festlegen. Im folgenden Beispiel ist das Gerät mit dem Namen myDeviceId versehen:

    {"messageId":"msg1","message":"Turn On","deviceId":"myDeviceId"}
    

    Das Schema für dieses JSON-Dokument wird unter https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md ausführlicher beschrieben.

Wenn Sie das simulierte Raspberry Pi-Gerät verwenden und dieses ausgeführt wird, protokolliert das Gerät die folgende Nachricht:

Receive message: Turn On


Resend the JSON document, but change the value of the `"message"` entry. The new value is logged by the device.

Weitere Informationen zur Verwendung des Senkenconnectors finden Sie unter https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.

Nächste Schritte

In diesem Dokument haben Sie gelernt, wie Sie mithilfe der Apache Kafka Connect-API den IoT Kafka-Connector in HDInsight starten. Verwenden Sie die folgenden Links, um weitere Möglichkeiten zur Arbeit mit Kafka kennenzulernen: