Share via


HDInsight 上の Apache Kafka を Azure IoT Hub と共に使用する

Apache Kafka Connect Azure IoT Hub コネクタを使用して HDInsight 上の Apache Kafka と Azure IoT Hub の間でデータを移動する方法を学習します。 このドキュメントでは、クラスター内のエッジ ノードから IoT Hub コネクタを実行する方法を説明します。

Kafka Connect API では、Kafka へのデータのプルまたは Kafka から別のシステムへのデータのプッシュを継続的に行うコネクタを実装できます。 Apache Kafka Connect Azure IoT Hub は、Azure IoT Hub から Kafka にデータをプルするコネクタです。 Kafka から IoT Hub にデータをプッシュすることもことできます。

IoT Hub からプルする場合は、ソース コネクタを使用します。 IoT Hub にプッシュする場合は、シンク コネクタを使用します。 IoT Hub コネクタでは、ソース コネクタとシンク コネクタの両方が提供されます。

次の図は、コネクタを使用するときの Azure IoT Hub と HDInsight 上の Kafka の間のデータ フローを示しています。

Image showing data flowing from IoT Hub to Kafka through the connector.

Connect API について詳しくは、https://kafka.apache.org/documentation/#connect をご覧ください。

前提条件

コネクタを作成します

  1. コネクタのソースを https://github.com/Azure/toketi-kafka-connect-iothub/ から開発環境にダウンロードします。

  2. コマンドプロンプトから、toketi-kafka-connect-iothub-master ディレクトリに移動します。 次に、次のコマンドを使用して、プロジェクトをビルドおよびパッケージ化します。

    sbt assembly
    

    ビルドが完了するまで数分かかります。 このコマンドにより、プロジェクトの toketi-kafka-connect-iothub-master\target\scala-2.11 ディレクトリに kafka-connect-iothub-assembly_2.11-0.7.0.jar というファイルが作成されます。

コネクタをインストールする

  1. .jar ファイルを HDInsight クラスター上の Kafka のエッジ ノードにアップロードします。 以下のコマンドを編集して、CLUSTERNAME をクラスターの実際の名前に置き換えます。 SSH ユーザーアカウントの既定値と エッジノード の名前が使用されています。必要に応じて変更してください。

    scp kafka-connect-iothub-assembly*.jar sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net:
    
  2. ファイルのコピーが完了したら、SSH を使用してエッジ ノードに接続します。

    ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
    
  3. Kafka の libs ディレクトリにコネクタをインストールするには、次のコマンドを使用します。

    sudo mv kafka-connect-iothub-assembly*.jar /usr/hdp/current/kafka-broker/libs/
    

残りの手順では、SSH 接続を有効のままにしておきます。

Apache Kafka を構成する

エッジ ノードへの SSH 接続から、次の手順を使用して、コネクタをスタンドアロン モードで実行するように Kafka を構成します。

  1. パスワード変数を設定します。 PASSWORD をクラスターのログインパスワードに置き換え、次のコマンドを入力します。

    export password='PASSWORD'
    
  2. jq ユーティリティをインストールします。 jq は、Ambari クエリから返された JSON ドキュメントの処理をしやすくします。 次のコマンドを入力します。

    sudo apt -y install jq
    
  3. Kafka ブローカーのアドレスを取得します。 クラスターには多くのブローカーがある場合がありますが、参照する必要があるのは 1 つか 2 つだけです。 2 つのブローカー ホストのアドレスを取得するには、次のコマンドを使用します。

    export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    
    export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
    echo $KAFKABROKERS
    

    後で使用するために値をコピーします。 次のテキストのような値が返されます。

    <brokername1>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092,<brokername2>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092

  4. Apache ZooKeeper ノードのアドレスを取得します。 クラスターには複数の Zookeeper ノードがありますが、参照する必要があるのは 1 つか 2 つだけです。 次のコマンドを使用して、変数 KAFKAZKHOSTSにアドレスを格納します。

    export KAFKAZKHOSTS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2`
    
  5. コネクタをスタンドアロン モードで実行している場合は、Kafka ブローカーと通信するために /usr/hdp/current/kafka-broker/config/connect-standalone.properties ファイルが使用されます。 connect-standalone.properties ファイルを編集するには、次のコマンドを使用します。

    sudo nano /usr/hdp/current/kafka-broker/config/connect-standalone.properties
    
  6. 次の編集を行います。

    現在の値 新しい値 解説
    bootstrap.servers=localhost:9092 localhost:9092 値を前の手順のブローカー ホストに置き換えます。 エッジ ノードのスタンドアロン構成を設定して、Kafka ブローカーを検索します。
    key.converter=org.apache.kafka.connect.json.JsonConverter key.converter=org.apache.kafka.connect.storage.StringConverter この変更により、Kafka に含まれているコンソール プロデューサーを使用してテストすることができます。 他のプロデューサーとコンシューマーには別のコンバーターが必要な場合があります。 他のコンバーター値の使用について、https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md をご覧ください。
    value.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.storage.StringConverter 指定したものと同じです。
    該当なし consumer.max.poll.records=10 ファイルの末尾に追加します。 この変更により、一度に 10 個のレコードに限定することによって、シンク コネクタでのタイムアウトを防ぎます。 詳細については、「https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md」を参照してください。
  7. ファイルを保存するには、Ctrl + X キー、Y キー、Enter キーの順に押します。

  8. コネクタで使用されるトピックを作成するには、次のコマンドを使用します。

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotin --zookeeper $KAFKAZKHOSTS
    
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotout --zookeeper $KAFKAZKHOSTS
    

    iotin および iotout トピックが存在することを確認するには、次のコマンドを使用します。

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $KAFKAZKHOSTS
    

    iotin トピックは、IoT ハブからメッセージを受信するために使用されます。 iotout トピックは、IoT ハブにメッセージを送信するために使用されます。

IoT ハブ接続情報の取得

コネクタによって使用される IoT ハブ情報を取得するには、次の手順を使用します。

  1. IoT ハブのイベント ハブ互換エンドポイントとイベント ハブ互換エンドポイント名を取得します。 この情報を取得するには、次のいずれかの方法を使用します。

    • Azure Portal、次の手順に従います。

      1. IoT ハブに移動し、[エンドポイント] を選択します。

      2. [組み込みのエンドポイント] から、[イベント] を選択します。

      3. [プロパティ] から、次のフィールドの値をコピーします。

        • イベント ハブ互換の名前
        • Event Hub と互換性があるエンドポイント
        • パーティション

        重要

        ポータルからのエンドポイント値には、この例では必要のない余分なテキストが含まれていることがあります。 このパターン sb://<randomnamespace>.servicebus.windows.net/ に一致するテキストを抽出します。

    • Azure CLI、次のコマンドを使用します。

      az iot hub show --name myhubname --query "{EventHubCompatibleName:properties.eventHubEndpoints.events.path,EventHubCompatibleEndpoint:properties.eventHubEndpoints.events.endpoint,Partitions:properties.eventHubEndpoints.events.partitionCount}"
      

      myhubname は、お使いの IoT ハブの名前に置き換えます。 応答は次のテキストのようになります。

      "EventHubCompatibleEndpoint": "sb://ihsuprodbnres006dednamespace.servicebus.windows.net/",
      "EventHubCompatibleName": "iothub-ehub-myhub08-207673-d44b2a856e",
      "Partitions": 2
      
  2. 共有アクセス ポリシーキー を取得します。 たとえば、サービス キーを使用します。 この情報を取得するには、次のいずれかの方法を使用します。

    • Azure Portal、次の手順に従います。

      1. [共有アクセス ポリシー] を選択してから、[サービス] を選択します。
      2. [主キー] の値をコピーします。
      3. [接続文字列 -- 主キー] の値をコピーします。
    • Azure CLI、次のコマンドを使用します。

      1. 主キーの値を取得するには、次のコマンドを使用します。

        az iot hub policy show --hub-name myhubname --name service --query "primaryKey"
        

        myhubname は、お使いの IoT ハブの名前に置き換えます。 応答は、このハブの service ポリシーの主キーです。

      2. service ポリシーの接続文字列を取得するには、次のコマンドを使用します。

        az iot hub show-connection-string --name myhubname --policy-name service --query "connectionString"
        

        myhubname は、お使いの IoT ハブの名前に置き換えます。 応答は service ポリシー用の接続文字列です。

ソース接続の構成

IoT ハブを使用するようにソースを構成するには、エッジ ノードへの SSH 接続から次のアクションを実行します。

  1. /usr/hdp/current/kafka-broker/config/ ディレクトリに connect-iot-source.properties ファイルのコピーを作成します。 toketi-kafka-connect-iothub プロジェクトからファイルをダウンロードするには、次のコマンドを使用します。

    sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-source.properties
    
  2. connect-iot-source.properties ファイルを編集し、IoT ハブ情報を追加するには、次のコマンドを使用します。

    sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
    
  3. エディターで、次のエントリを検索し、変更します。

    現在の値 編集
    Kafka.Topic=PLACEHOLDER PLACEHOLDERiotin で置き換え IoT ハブから受信したメッセージは iotin トピックに配置されます。
    IotHub.EventHubCompatibleName=PLACEHOLDER PLACEHOLDER をイベント ハブ互換の名前に置き換えます。
    IotHub.EventHubCompatibleEndpoint=PLACEHOLDER PLACEHOLDER をイベント ハブ互換エンドポイントに置き換えます。
    IotHub.AccessKeyName=PLACEHOLDER PLACEHOLDERservice で置き換え
    IotHub.AccessKeyValue=PLACEHOLDER PLACEHOLDERservice ポリシーの主キーに置き換えます。
    IotHub.Partitions=PLACEHOLDER PLACEHOLDER を前の手順のパーティション数に置き換えます。
    IotHub.StartTime=PLACEHOLDER PLACEHOLDER を UTC 日付に置き換えます。 この日付は、コネクタがメッセージの検査を開始した日です。 日付の形式は yyyy-mm-ddThh:mm:ssZ です。
    BatchSize=100 1005 で置き換え この変更により、コネクタは IoT ハブに 5 つの新しいメッセージが入った後にメッセージを Kafka に読み取ります。

    構成例については、「Kafka Connect Source Connector for Azure IoT Hub」を参照してください。

  4. 変更を保存するには、Ctrl + X キー、Y キー、Enter キーの順に押します。

コネクタ ソースの構成について詳しくは、https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Source.md をご覧ください。

シンク接続の構成

IoT ハブを使用するようにシンク接続を構成するには、エッジ ノードへの SSH 接続から次のアクションを実行します。

  1. /usr/hdp/current/kafka-broker/config/ ディレクトリに connect-iothub-sink.properties ファイルのコピーを作成します。 toketi-kafka-connect-iothub プロジェクトからファイルをダウンロードするには、次のコマンドを使用します。

    sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-sink.properties
    
  2. connect-iothub-sink.properties ファイルを編集し、IoT ハブ情報を追加するには、次のコマンドを使用します。

    sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
    
  3. エディターで、次のエントリを検索し、変更します。

    現在の値 編集
    topics=PLACEHOLDER PLACEHOLDERiotout で置き換え iotout トピックに書き込まれたメッセージが IoT ハブに転送されます。
    IotHub.ConnectionString=PLACEHOLDER PLACEHOLDERservice ポリシーの接続文字列に置き換えます。

    構成例については、「Kafka Connect Source Connector for Azure IoT Hub」を参照してください。

  4. 変更を保存するには、Ctrl + X キー、Y キー、Enter キーの順に押します。

コネクタ シンクの構成について詳しくは、https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md をご覧ください。

ソース コネクタの開始

  1. ソース コネクタを開始するには、エッジ ノードへの SSH 接続から次のコマンドを使用します。

    /usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
    

    コネクタが開始された後で、デバイスから IoT ハブにメッセージを送信します。 コネクタは、IoT ハブからのメッセージを読み取り、Kafka トピックに格納するときに、コンソールに情報を記録します。

    [2017-08-29 20:15:46,112] INFO Polling for data - Obtained 5 SourceRecords from IotHub (com.microsoft.azure.iot.kafka.connect.IotHubSourceTask:39)
    [2017-08-29 20:15:54,106] INFO Finished WorkerSourceTask{id=AzureIotHubConnector-0} commitOffsets successfully in 4 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:356)
    

    Note

    コネクタが開始されると、複数の警告が表示されることがあります。 これらの警告は、IoT ハブからのメッセージの受信で問題が発生する原因にはなりません。

  2. Ctrl + C を2回押して、数分後にコネクタを停止します。 コネクタが停止するまで数分かかります。

シンク コネクタの開始

エッジ ノードへの SSH 接続から、次のコマンドを使用して、シンク コネクタをスタンドアロン モードで開始します。

/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties

コネクタが実行されると、次のテキストのような情報が表示されます。

[2017-08-30 17:49:16,150] INFO Started tasks to send 1 messages to devices. (com.microsoft.azure.iot.kafka.connect.sink.
IotHubSinkTask:47)
[2017-08-30 17:49:16,150] INFO WorkerSinkTask{id=AzureIotHubSinkConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)

Note

コネクタが開始されると、複数の警告が表示されることがあります。 これらは無視してかまいません。

メッセージを送信する

コネクタを通じてメッセージを送信するには、次の手順を使用します。

  1. Kafka クラスターへの 2 番目 の SSH セッションを開きます。

    ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
    
  2. 新しい SSH セッションの Kafka ブローカーのアドレスを取得します。 PASSWORD をクラスターのログインパスワードに置き換え、次のコマンドを入力します。

    export password='PASSWORD'
    
    export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    
    export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
    
  3. iotout トピックにメッセージを送信するには、次のコマンドを使用します。

    /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKABROKERS --topic iotout
    

    このコマンドでは、通常の Bash プロンプトに戻りません。 代わりに、キーボード入力を iotout トピックに送信します。

  4. デバイスにメッセージを送信するには、kafka-console-producer の SSH セッションに JSON ドキュメントを貼り付けます。

    重要

    "deviceId" エントリの値をデバイスの ID に設定する必要があります。 次の例では、デバイスの名前は myDeviceId です。

    {"messageId":"msg1","message":"Turn On","deviceId":"myDeviceId"}
    

    この JSON ドキュメントのスキーマについては、https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md で詳しく説明されています。

シミュレートされた Raspberry Pi デバイスを使用し、それが実行されている場合は、デバイスによって次のメッセージがログに記録されます。

Receive message: Turn On


Resend the JSON document, but change the value of the `"message"` entry. The new value is logged by the device.

シンク コネクタの使用について詳しくは、https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md をご覧ください。

次のステップ

このドキュメントでは、Apache Kafka Connect API を使用して HDInsight 上の IoT Kafka Connector を開始する方法について説明しました。 次のリンクを使用することで、Kafka のその他の活用方法を知ることができます。