Korzystanie z platformy Apache Kafka w usłudze HDInsight z usługą Azure IoT Hub

Dowiedz się, jak używać łącznika platformy Apache Kafka Połączenie Azure IoT Hub do przenoszenia danych między platformą Apache Kafka w usłudze HDInsight i usłudze Azure IoT Hub. Z tego dokumentu dowiesz się, jak uruchomić łącznik usługi IoT Hub z węzła brzegowego w klastrze.

Interfejs API platformy Kafka Połączenie umożliwia implementowanie łączników, które stale ściągają dane do platformy Kafka lub wypychają dane z platformy Kafka do innego systemu. Platforma Apache Kafka Połączenie Azure IoT Hub to łącznik, który ściąga dane z usługi Azure IoT Hub do platformy Kafka. Może również wypychać dane z platformy Kafka do usługi IoT Hub.

Podczas ściągania z usługi IoT Hub należy użyć łącznika źródłowego. Podczas wypychania do usługi IoT Hub należy użyć łącznika ujścia. Łącznik usługi IoT Hub udostępnia łączniki źródła i ujścia.

Na poniższym diagramie przedstawiono przepływ danych między usługą Azure IoT Hub i platformą Kafka w usłudze HDInsight podczas korzystania z łącznika.

Image showing data flowing from IoT Hub to Kafka through the connector.

Aby uzyskać więcej informacji na temat interfejsu API Połączenie, zobacz https://kafka.apache.org/documentation/#connect.

Wymagania wstępne

Tworzenie łącznika

  1. Pobierz źródło łącznika ze https://github.com/Azure/toketi-kafka-connect-iothub/ środowiska lokalnego.

  2. W wierszu polecenia przejdź do toketi-kafka-connect-iothub-master katalogu. Następnie użyj następującego polecenia, aby skompilować i spakować projekt:

    sbt assembly
    

    Ukończenie kompilacji trwa kilka minut. Polecenie tworzy plik o nazwie kafka-connect-iothub-assembly_2.11-0.7.0.jar w toketi-kafka-connect-iothub-master\target\scala-2.11 katalogu dla projektu.

Instalowanie łącznika

  1. Przekaż plik .jar do węzła brzegowego platformy Kafka w klastrze usługi HDInsight. Zmodyfikuj następujące polecenie, zastępując CLUSTERNAME element rzeczywistą nazwą klastra. Wartości domyślne dla konta użytkownika SSH i nazwy węzła brzegowego są używane i modyfikowane zgodnie z potrzebami.

    scp kafka-connect-iothub-assembly*.jar sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net:
    
  2. Po zakończeniu kopiowania pliku połącz się z węzłem krawędzi przy użyciu protokołu SSH:

    ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
    
  3. Aby zainstalować łącznik w katalogu platformy Kafka libs , użyj następującego polecenia:

    sudo mv kafka-connect-iothub-assembly*.jar /usr/hdp/current/kafka-broker/libs/
    

Zachowaj aktywne połączenie SSH, aby wykonać pozostałe kroki.

Konfigurowanie platformy Apache Kafka

Z poziomu połączenia SSH z węzłem krawędzi wykonaj następujące kroki, aby skonfigurować platformę Kafka do uruchamiania łącznika w trybie autonomicznym:

  1. Konfigurowanie zmiennej hasła. Zastąp ciąg PASSWORD hasłem logowania klastra, a następnie wprowadź polecenie:

    export password='PASSWORD'
    
  2. Zainstaluj narzędzie jq. jq ułatwia przetwarzanie dokumentów JSON zwracanych z zapytań ambari. Podaj następujące polecenie:

    sudo apt -y install jq
    
  3. Uzyskaj adres brokerów platformy Kafka. W klastrze może istnieć wiele brokerów, ale wystarczy odwołać się tylko do jednego lub dwóch brokerów. Aby uzyskać adres dwóch hostów brokera, użyj następującego polecenia:

    export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    
    export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
    echo $KAFKABROKERS
    

    Skopiuj wartości do późniejszego użycia. Zwrócona wartość będzie podobna do następującego tekstu:

    <brokername1>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092,<brokername2>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092

  4. Pobierz adres węzłów usługi Apache Zookeeper. W klastrze znajduje się kilka węzłów dozorców, ale wystarczy odwołać się tylko do jednego lub dwóch węzłów. Użyj następującego polecenia, aby zapisać adresy w zmiennej KAFKAZKHOSTS:

    export KAFKAZKHOSTS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2`
    
  5. Podczas uruchamiania łącznika w trybie /usr/hdp/current/kafka-broker/config/connect-standalone.properties autonomicznym plik jest używany do komunikowania się z brokerami platformy Kafka. Aby edytować connect-standalone.properties plik, użyj następującego polecenia:

    sudo nano /usr/hdp/current/kafka-broker/config/connect-standalone.properties
    
  6. Wprowadź następujące zmiany:

    Bieżąca wartość Nowa wartość Komentarz
    bootstrap.servers=localhost:9092 Zastąp localhost:9092 wartość hostami brokera z poprzedniego kroku Konfiguruje konfigurację autonomiczną węzła brzegowego w celu znalezienia brokerów platformy Kafka.
    key.converter=org.apache.kafka.connect.json.JsonConverter key.converter=org.apache.kafka.connect.storage.StringConverter Ta zmiana umożliwia przetestowanie przy użyciu producenta konsoli dołączonego do platformy Kafka. Mogą być potrzebne różne konwertery dla innych producentów i konsumentów. Aby uzyskać informacje na temat używania innych wartości konwertera, zobacz https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
    value.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.storage.StringConverter Takie same jak podane.
    Nie dotyczy consumer.max.poll.records=10 Dodaj do końca pliku. Ta zmiana polega na zapobieganiu przekroczeniom limitu czasu w łączniku ujścia przez ograniczenie go do 10 rekordów naraz. Aby uzyskać więcej informacji, zobacz https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
  7. Aby zapisać plik, użyj klawiszy Ctrl + X, Y, a następnie wprowadź.

  8. Aby utworzyć tematy używane przez łącznik, użyj następujących poleceń:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotin --zookeeper $KAFKAZKHOSTS
    
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotout --zookeeper $KAFKAZKHOSTS
    

    Aby sprawdzić, czy tematy iotin i iotout istnieją, użyj następującego polecenia:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $KAFKAZKHOSTS
    

    Temat iotin służy do odbierania komunikatów z usługi IoT Hub. Temat iotout służy do wysyłania komunikatów do usługi IoT Hub.

Uzyskiwanie informacji o połączeniu z usługą IoT Hub

Aby pobrać informacje o centrum IoT używane przez łącznik, wykonaj następujące kroki:

  1. Pobierz punkt końcowy zgodny z centrum zdarzeń i nazwę punktu końcowego zgodnego z centrum zdarzeń dla centrum IoT. Aby uzyskać te informacje, użyj jednej z następujących metod:

    • W witrynie Azure Portal wykonaj następujące czynności:

      1. Przejdź do centrum IoT Hub i wybierz pozycję Punkty końcowe.

      2. W obszarze Wbudowane punkty końcowe wybierz pozycję Zdarzenia.

      3. Z obszaru Właściwości skopiuj wartość następujących pól:

        • Nazwa zgodna z centrum zdarzeń
        • Punkt końcowy zgodny z centrum zdarzeń
        • Partycje

        Ważne

        Wartość punktu końcowego z portalu może zawierać dodatkowy tekst, który nie jest potrzebny w tym przykładzie. Wyodrębnij tekst pasujący do tego wzorca sb://<randomnamespace>.servicebus.windows.net/.

    • W interfejsie wiersza polecenia platformy Azure użyj następującego polecenia:

      az iot hub show --name myhubname --query "{EventHubCompatibleName:properties.eventHubEndpoints.events.path,EventHubCompatibleEndpoint:properties.eventHubEndpoints.events.endpoint,Partitions:properties.eventHubEndpoints.events.partitionCount}"
      

      Zastąp myhubname ciąg nazwą centrum IoT Hub. Odpowiedź jest podobna do następującego tekstu:

      "EventHubCompatibleEndpoint": "sb://ihsuprodbnres006dednamespace.servicebus.windows.net/",
      "EventHubCompatibleName": "iothub-ehub-myhub08-207673-d44b2a856e",
      "Partitions": 2
      
  2. Uzyskaj zasady dostępu współdzielonego i klucz. W tym przykładzie użyj klucza usługi . Aby uzyskać te informacje, użyj jednej z następujących metod:

    • W witrynie Azure Portal wykonaj następujące czynności:

      1. Wybierz pozycję Zasady dostępu współdzielonego, a następnie wybierz pozycję Usługa.
      2. Skopiuj wartość klucza podstawowego.
      3. Skopiuj wartość ciągu Połączenie ion --klucz podstawowy.
    • W interfejsie wiersza polecenia platformy Azure użyj następującego polecenia:

      1. Aby uzyskać wartość klucza podstawowego, użyj następującego polecenia:

        az iot hub policy show --hub-name myhubname --name service --query "primaryKey"
        

        Zastąp myhubname ciąg nazwą centrum IoT Hub. Odpowiedź jest kluczem service podstawowym zasad dla tego centrum.

      2. Aby uzyskać parametry połączenia dla service zasad, użyj następującego polecenia:

        az iot hub show-connection-string --name myhubname --policy-name service --query "connectionString"
        

        Zastąp myhubname ciąg nazwą centrum IoT Hub. Odpowiedź to parametry połączenia zasadservice.

Konfigurowanie połączenia źródłowego

Aby skonfigurować źródło do pracy z usługą IoT Hub, wykonaj następujące akcje z połączenia SSH z węzłem krawędzi:

  1. Utwórz kopię connect-iot-source.properties pliku w /usr/hdp/current/kafka-broker/config/ katalogu. Aby pobrać plik z projektu toketi-kafka-connect-iothub, użyj następującego polecenia:

    sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-source.properties
    
  2. Aby edytować connect-iot-source.properties plik i dodać informacje o centrum IoT Hub, użyj następującego polecenia:

    sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
    
  3. W edytorze znajdź i zmień następujące wpisy:

    Bieżąca wartość Edytuj
    Kafka.Topic=PLACEHOLDER Zamień PLACEHOLDER na iotin. Komunikaty odebrane z centrum IoT są umieszczane w temacie iotin .
    IotHub.EventHubCompatibleName=PLACEHOLDER Zastąp PLACEHOLDER ciąg nazwą zgodną z centrum zdarzeń.
    IotHub.EventHubCompatibleEndpoint=PLACEHOLDER Zastąp element PLACEHOLDER punktem końcowym zgodnym z centrum zdarzeń.
    IotHub.AccessKeyName=PLACEHOLDER Zamień PLACEHOLDER na service.
    IotHub.AccessKeyValue=PLACEHOLDER Zastąp PLACEHOLDER element kluczem service podstawowym zasad.
    IotHub.Partitions=PLACEHOLDER Zastąp PLACEHOLDER ciąg liczbą partycji z poprzednich kroków.
    IotHub.StartTime=PLACEHOLDER Zastąp element PLACEHOLDER datą UTC. Ta data to godzina rozpoczęcia sprawdzania komunikatów przez łącznik. Format daty to yyyy-mm-ddThh:mm:ssZ.
    BatchSize=100 Zamień 100 na 5. Ta zmiana powoduje, że łącznik odczytuje komunikaty na platformie Kafka po utworzeniu pięciu nowych komunikatów w centrum IoT.

    Aby uzyskać przykładową konfigurację, zobacz Kafka Połączenie Source Połączenie or for Azure IoT Hub (Platforma Kafka Połączenie source Połączenie or for Azure IoT Hub).

  4. Aby zapisać zmiany, użyj klawiszy Ctrl + X, Y, a następnie wprowadź.

Aby uzyskać więcej informacji na temat konfigurowania źródła łącznika, zobacz https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Source.md.

Konfigurowanie połączenia ujścia

Aby skonfigurować połączenie ujścia do pracy z usługą IoT Hub, wykonaj następujące akcje z połączenia SSH z węzłem krawędzi:

  1. Utwórz kopię connect-iothub-sink.properties pliku w /usr/hdp/current/kafka-broker/config/ katalogu. Aby pobrać plik z projektu toketi-kafka-connect-iothub, użyj następującego polecenia:

    sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-sink.properties
    
  2. Aby edytować connect-iothub-sink.properties plik i dodać informacje o centrum IoT Hub, użyj następującego polecenia:

    sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
    
  3. W edytorze znajdź i zmień następujące wpisy:

    Bieżąca wartość Edytuj
    topics=PLACEHOLDER Zamień PLACEHOLDER na iotout. Komunikaty zapisywane w iotout temacie są przekazywane do centrum IoT Hub.
    IotHub.ConnectionString=PLACEHOLDER Zastąp PLACEHOLDER element parametry połączenia zasadservice.

    Aby zapoznać się z przykładową konfiguracją, zobacz Kafka Połączenie Sink Połączenie or for Azure IoT Hub (Ujście Połączenie platformy Kafka dla usługi Azure IoT Hub).

  4. Aby zapisać zmiany, użyj klawiszy Ctrl + X, Y, a następnie wprowadź.

Aby uzyskać więcej informacji na temat konfigurowania ujścia łącznika, zobacz https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.

Uruchamianie łącznika źródłowego

  1. Aby uruchomić łącznik źródłowy, użyj następującego polecenia z połączenia SSH z węzłem krawędzi:

    /usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
    

    Po uruchomieniu łącznika wyślij komunikaty do centrum IoT Hub z urządzeń. Gdy łącznik odczytuje komunikaty z centrum IoT i przechowuje je w temacie platformy Kafka, rejestruje informacje w konsoli:

    [2017-08-29 20:15:46,112] INFO Polling for data - Obtained 5 SourceRecords from IotHub (com.microsoft.azure.iot.kafka.connect.IotHubSourceTask:39)
    [2017-08-29 20:15:54,106] INFO Finished WorkerSourceTask{id=AzureIotHubConnector-0} commitOffsets successfully in 4 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:356)
    

    Uwaga

    Podczas uruchamiania łącznika może zostać wyświetlonych kilka ostrzeżeń. Te ostrzeżenia nie powodują problemów z odbieraniem komunikatów z centrum IoT Hub.

  2. Zatrzymaj łącznik po kilku minutach za pomocą klawiszy Ctrl + C dwa razy. Zatrzymanie łącznika trwa kilka minut.

Uruchamianie łącznika ujścia

Z poziomu połączenia SSH z węzłem krawędzi użyj następującego polecenia, aby uruchomić łącznik ujścia w trybie autonomicznym:

/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties

W miarę uruchamiania łącznika wyświetlane są informacje podobne do następującego tekstu:

[2017-08-30 17:49:16,150] INFO Started tasks to send 1 messages to devices. (com.microsoft.azure.iot.kafka.connect.sink.
IotHubSinkTask:47)
[2017-08-30 17:49:16,150] INFO WorkerSinkTask{id=AzureIotHubSinkConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)

Uwaga

Podczas uruchamiania łącznika można zauważyć kilka ostrzeżeń. Możesz je bezpiecznie zignorować.

Wysyłanie komunikatów

Aby wysyłać komunikaty za pośrednictwem łącznika, wykonaj następujące czynności:

  1. Otwórz drugą sesję SSH w klastrze platformy Kafka:

    ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
    
  2. Uzyskaj adres brokerów platformy Kafka dla nowej sesji SSH. Zastąp ciąg PASSWORD hasłem logowania klastra, a następnie wprowadź polecenie:

    export password='PASSWORD'
    
    export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    
    export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
    
  3. Aby wysłać komunikaty do tematu iotout , użyj następującego polecenia:

    /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKABROKERS --topic iotout
    

    To polecenie nie zwraca normalnego wiersza polecenia powłoki Bash. Zamiast tego wysyła dane wejściowe klawiatury do tematu iotout .

  4. Aby wysłać komunikat do urządzenia, wklej dokument JSON do sesji SSH dla elementu kafka-console-producer.

    Ważne

    Musisz ustawić wartość "deviceId" wpisu na identyfikator urządzenia. W poniższym przykładzie urządzenie ma nazwę myDeviceId:

    {"messageId":"msg1","message":"Turn On","deviceId":"myDeviceId"}
    

    Schemat dla tego dokumentu JSON został opisany bardziej szczegółowo w temacie https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.

Jeśli używasz symulowanego urządzenia Raspberry Pi i jest ono uruchomione, urządzenie rejestruje następujący komunikat:

Receive message: Turn On


Resend the JSON document, but change the value of the `"message"` entry. The new value is logged by the device.

Aby uzyskać więcej informacji na temat korzystania z łącznika ujścia, zobacz https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.

Następne kroki

W tym dokumencie przedstawiono sposób uruchamiania usługi IoT Kafka Połączenie or w usłudze HDInsight przy użyciu interfejsu API Połączenie platformy Apache Kafka. Skorzystaj z poniższych linków, aby odnaleźć inne sposoby pracy z platformą Kafka: