Aracılığıyla paylaş


Azure IoT Hub ile HDInsight üzerinde Apache Kafka kullanma

Verileri HDInsight üzerinde Apache Kafka ile Azure IoT Hub arasında taşımak için Apache Kafka Bağlan Azure IoT Hub bağlayıcısını kullanmayı öğrenin. Bu belgede IoT Hub bağlayıcısını kümedeki bir kenar düğümünden çalıştırmayı öğreneceksiniz.

Kafka Bağlan API'si, kafkaya sürekli veri çeken veya Kafka'dan başka bir sisteme veri gönderen bağlayıcılar uygulamanıza olanak tanır. Apache Kafka Bağlan Azure IoT Hub, Azure IoT Hub'dan Kafka'ya veri çeken bir bağlayıcıdır. Ayrıca Kafka'dan IoT Hub'a veri gönderebilirsiniz.

IoT Hub'dan çekerken bir kaynak bağlayıcısı kullanırsınız. IoT Hub'a gönderim yaparken bir havuz bağlayıcısı kullanırsınız. IoT Hub bağlayıcısı hem kaynak hem de havuz bağlayıcıları sağlar.

Aşağıdaki diyagramda bağlayıcı kullanılırken HDInsight üzerinde Azure IoT Hub ile Kafka arasındaki veri akışı gösterilmektedir.

Image showing data flowing from IoT Hub to Kafka through the connector.

Bağlan API hakkında daha fazla bilgi için bkzhttps://kafka.apache.org/documentation/#connect. .

Önkoşullar

Bağlayıcıyı oluşturma

  1. Bağlayıcının https://github.com/Azure/toketi-kafka-connect-iothub/ kaynağını yerel ortamınıza indirin.

  2. Komut isteminden dizinine toketi-kafka-connect-iothub-master gidin. Ardından projeyi derlemek ve paketlemek için aşağıdaki komutu kullanın:

    sbt assembly
    

    Derlemenin tamamlanması birkaç dakika sürer. komutu, projenin dizininde toketi-kafka-connect-iothub-master\target\scala-2.11 adlı kafka-connect-iothub-assembly_2.11-0.7.0.jar bir dosya oluşturur.

Bağlayıcıyı yükleme

  1. .jar dosyasını HDInsight kümesindeki Kafka'nızın kenar düğümüne yükleyin. değerini kümenizin gerçek adıyla değiştirerek CLUSTERNAME aşağıdaki komutu düzenleyin. SSH kullanıcı hesabı ve kenar düğümünün adı için varsayılan değerler kullanılır ve gerektiğinde değiştirilir.

    scp kafka-connect-iothub-assembly*.jar sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net:
    
  2. Dosya kopyalama işlemi tamamlandıktan sonra SSH kullanarak kenar düğümüne bağlanın:

    ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
    
  3. Bağlayıcıyı Kafka libs dizinine yüklemek için aşağıdaki komutu kullanın:

    sudo mv kafka-connect-iothub-assembly*.jar /usr/hdp/current/kafka-broker/libs/
    

Kalan adımlar için SSH bağlantınızı etkin tutun.

Apache Kafka'yi yapılandırma

SSH bağlantınızdan kenar düğümüne, Kafka'yı bağlayıcıyı tek başına modda çalıştıracak şekilde yapılandırmak için aşağıdaki adımları kullanın:

  1. Parola değişkenini ayarlayın. PASSWORD değerini küme oturum açma parolası ile değiştirin ve komutunu girin:

    export password='PASSWORD'
    
  2. jq yardımcı programını yükleyin. jq, Ambari sorgularından döndürülen JSON belgelerinin işlenmesini kolaylaştırır. Aşağıdaki komutu girin:

    sudo apt -y install jq
    
  3. Kafka aracılarının adresini alın. Kümenizde birçok aracı olabilir, ancak yalnızca bir veya iki aracıya başvurmanız gerekir. İki aracı ana bilgisayar adresini almak için aşağıdaki komutu kullanın:

    export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    
    export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
    echo $KAFKABROKERS
    

    Değerleri daha sonra kullanmak üzere kopyalayın. Döndürülen değer aşağıdaki metne benzer:

    <brokername1>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092,<brokername2>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092

  4. Apache Zookeeper düğümlerinin adresini alın. Kümede birkaç Zookeeper düğümü vardır, ancak yalnızca bir veya ikisine başvurmanız gerekir. Değişkeninde KAFKAZKHOSTSadresleri depolamak için aşağıdaki komutu kullanın:

    export KAFKAZKHOSTS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2`
    
  5. Bağlayıcıyı tek başına modda çalıştırırken, /usr/hdp/current/kafka-broker/config/connect-standalone.properties dosya Kafka aracılarıyla iletişim kurmak için kullanılır. Dosyayı düzenlemek connect-standalone.properties için aşağıdaki komutu kullanın:

    sudo nano /usr/hdp/current/kafka-broker/config/connect-standalone.properties
    
  6. Aşağıdaki düzenlemeleri yapın:

    Geçerli değer Yeni değer Yorum
    bootstrap.servers=localhost:9092 localhost:9092 değerini önceki adımda yer alan aracı konaklarıyla değiştirin Kafka aracılarını bulmak için kenar düğümü için tek başına yapılandırmayı yapılandırılır.
    key.converter=org.apache.kafka.connect.json.JsonConverter key.converter=org.apache.kafka.connect.storage.StringConverter Bu değişiklik, Kafka ile birlikte gelen konsol üreticisini kullanarak test etmenizi sağlar. Diğer üreticiler ve tüketiciler için farklı dönüştürücülere ihtiyacınız olabilir. Diğer dönüştürücü değerlerini kullanma hakkında bilgi için bkz https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md. .
    value.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.storage.StringConverter Verilenle aynı.
    Yok consumer.max.poll.records=10 Dosyanın sonuna ekleyin. Bu değişiklik, havuz bağlayıcısını bir kerede 10 kayıtla sınırlayarak zaman aşımlarını önlemektir. Daha fazla bilgi için bkz. https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
  7. Dosyayı kaydetmek için Ctrl + X, Y ve ardından Enter tuşlarını kullanın.

  8. Bağlayıcı tarafından kullanılan konuları oluşturmak için aşağıdaki komutları kullanın:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotin --zookeeper $KAFKAZKHOSTS
    
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotout --zookeeper $KAFKAZKHOSTS
    

    ve iotout konularının iotin mevcut olduğunu doğrulamak için aşağıdaki komutu kullanın:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $KAFKAZKHOSTS
    

    Konu iotin , IoT Hub'dan ileti almak için kullanılır. Konu iotout , IoT Hub'a ileti göndermek için kullanılır.

IoT Hub bağlantı bilgilerini alma

Bağlayıcı tarafından kullanılan IoT hub bilgilerini almak için aşağıdaki adımları kullanın:

  1. IoT hub'ınız için Event Hub ile uyumlu uç nokta ve Event Hub uyumlu uç nokta adını alın. Bu bilgileri almak için aşağıdaki yöntemlerden birini kullanın:

    • Azure portalında aşağıdaki adımları kullanın:

      1. IoT Hub'ınıza gidin ve Uç Noktalar'ı seçin.

      2. Yerleşik uç noktalardan Olaylar'ı seçin.

      3. Özellikler'den aşağıdaki alanların değerini kopyalayın:

        • Olay Hub'ı ile uyumlu ad
        • Event Hub uyumlu uç nokta
        • Bölümler

        Önemli

        Portaldaki uç nokta değeri, bu örnekte gerekli olmayan ek metinler içerebilir. Bu desenle sb://<randomnamespace>.servicebus.windows.net/eşleşen metni ayıklayın.

    • Azure CLI'dan aşağıdaki komutu kullanın:

      az iot hub show --name myhubname --query "{EventHubCompatibleName:properties.eventHubEndpoints.events.path,EventHubCompatibleEndpoint:properties.eventHubEndpoints.events.endpoint,Partitions:properties.eventHubEndpoints.events.partitionCount}"
      

      değerini IoT hub'ınızın adıyla değiştirin myhubname . Yanıt aşağıdaki metne benzer:

      "EventHubCompatibleEndpoint": "sb://ihsuprodbnres006dednamespace.servicebus.windows.net/",
      "EventHubCompatibleName": "iothub-ehub-myhub08-207673-d44b2a856e",
      "Partitions": 2
      
  2. Paylaşılan erişim ilkesini ve anahtarını alın. Bu örnekte hizmet anahtarını kullanın. Bu bilgileri almak için aşağıdaki yöntemlerden birini kullanın:

    • Azure portalında aşağıdaki adımları kullanın:

      1. Paylaşılan erişim ilkeleri'ni ve ardından hizmet'i seçin.
      2. Birincil anahtar değerini kopyalayın.
      3. Bağlan ion dizesini--birincil anahtar değerini kopyalayın.
    • Azure CLI'dan aşağıdaki komutu kullanın:

      1. Birincil anahtar değerini almak için aşağıdaki komutu kullanın:

        az iot hub policy show --hub-name myhubname --name service --query "primaryKey"
        

        değerini IoT hub'ınızın adıyla değiştirin myhubname . Yanıt, bu hub için ilkenin service birincil anahtarıdır.

      2. İlkenin bağlantı dizesi almak için service aşağıdaki komutu kullanın:

        az iot hub show-connection-string --name myhubname --policy-name service --query "connectionString"
        

        değerini IoT hub'ınızın adıyla değiştirin myhubname . Yanıt, ilkenin bağlantı dizesiservice.

Kaynak bağlantıyı yapılandırma

Kaynağı IoT Hub'ınızla çalışacak şekilde yapılandırmak için SSH bağlantısından kenar düğümüne aşağıdaki eylemleri gerçekleştirin:

  1. Dizininde connect-iot-source.properties /usr/hdp/current/kafka-broker/config/ dosyanın bir kopyasını oluşturun. Dosyayı toketi-kafka-connect-iothub projesinden indirmek için aşağıdaki komutu kullanın:

    sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-source.properties
    
  2. Dosyayı düzenlemek connect-iot-source.properties ve IoT hub bilgilerini eklemek için aşağıdaki komutu kullanın:

    sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
    
  3. Düzenleyicide aşağıdaki girdileri bulun ve değiştirin:

    Geçerli değer Düzenle
    Kafka.Topic=PLACEHOLDER PLACEHOLDER öğesini iotin ile değiştirin. IoT hub'ından alınan iletiler konuya yerleştirilir iotin .
    IotHub.EventHubCompatibleName=PLACEHOLDER değerini Event Hub ile uyumlu adla değiştirin PLACEHOLDER .
    IotHub.EventHubCompatibleEndpoint=PLACEHOLDER değerini Event Hub ile uyumlu uç noktayla değiştirin PLACEHOLDER .
    IotHub.AccessKeyName=PLACEHOLDER PLACEHOLDER öğesini service ile değiştirin.
    IotHub.AccessKeyValue=PLACEHOLDER değerini ilkenin birincil anahtarıyla service değiştirinPLACEHOLDER.
    IotHub.Partitions=PLACEHOLDER değerini önceki adımlardan bölüm sayısıyla değiştirin PLACEHOLDER .
    IotHub.StartTime=PLACEHOLDER değerini UTC tarihiyle değiştirin PLACEHOLDER . Bu tarih, bağlayıcının iletileri denetlemeye başladığı tarihtir. Tarih biçimi şeklindedir yyyy-mm-ddThh:mm:ssZ.
    BatchSize=100 100 öğesini 5 ile değiştirin. Bu değişiklik, IoT hub'ında beş yeni ileti olduğunda bağlayıcının Kafka'ya ileti okumasına neden olur.

    Örnek yapılandırma için bkz. Azure IoT Hub için Kafka Bağlan Kaynak Bağlan veya.

  4. Değişiklikleri kaydetmek için Ctrl + X, Y ve ardından Enter tuşlarını kullanın.

Bağlayıcı kaynağını yapılandırma hakkında daha fazla bilgi için bkz https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Source.md. .

Havuz bağlantısını yapılandırma

Havuz bağlantısını IoT Hub'ınızla çalışacak şekilde yapılandırmak için bir SSH bağlantısından kenar düğümüne aşağıdaki eylemleri gerçekleştirin:

  1. Dizininde connect-iothub-sink.properties /usr/hdp/current/kafka-broker/config/ dosyanın bir kopyasını oluşturun. Dosyayı toketi-kafka-connect-iothub projesinden indirmek için aşağıdaki komutu kullanın:

    sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-sink.properties
    
  2. Dosyayı düzenlemek connect-iothub-sink.properties ve IoT hub bilgilerini eklemek için aşağıdaki komutu kullanın:

    sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
    
  3. Düzenleyicide aşağıdaki girdileri bulun ve değiştirin:

    Geçerli değer Düzenle
    topics=PLACEHOLDER PLACEHOLDER öğesini iotout ile değiştirin. Konuya yazılan iotout iletiler IoT hub'ına iletilir.
    IotHub.ConnectionString=PLACEHOLDER değerini ilkenin bağlantı dizesi service ile değiştirinPLACEHOLDER.

    Örnek yapılandırma için bkz. Azure IoT Hub için Kafka Bağlan Sink Bağlan or.

  4. Değişiklikleri kaydetmek için Ctrl + X, Y ve ardından Enter tuşlarını kullanın.

Bağlayıcı havuzu yapılandırma hakkında daha fazla bilgi için bkz https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md. .

Kaynak bağlayıcıyı başlatma

  1. Kaynak bağlayıcıyı başlatmak için SSH bağlantısından kenar düğümüne aşağıdaki komutu kullanın:

    /usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
    

    Bağlayıcı başlatıldıktan sonra cihazınızdan IoT hub'ına ileti gönderin. Bağlayıcı IoT hub'ından gelen iletileri okuyup Kafka konusunda depoladıkça, bilgileri konsola kaydeder:

    [2017-08-29 20:15:46,112] INFO Polling for data - Obtained 5 SourceRecords from IotHub (com.microsoft.azure.iot.kafka.connect.IotHubSourceTask:39)
    [2017-08-29 20:15:54,106] INFO Finished WorkerSourceTask{id=AzureIotHubConnector-0} commitOffsets successfully in 4 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:356)
    

    Not

    Bağlayıcı başlatılırken birkaç uyarı görebilirsiniz. Bu uyarılar IoT hub'ından ileti almayla ilgili sorunlara neden olmaz.

  2. Ctrl + C tuşlarını iki kez kullanarak bağlayıcıyı birkaç dakika sonra durdurun. Bağlayıcının durması birkaç dakika sürer.

Havuz bağlayıcısını başlatma

Kenar düğümüne bir SSH bağlantısından havuz bağlayıcısını tek başına modda başlatmak için aşağıdaki komutu kullanın:

/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties

Bağlayıcı çalışırken, aşağıdaki metne benzer bilgiler görüntülenir:

[2017-08-30 17:49:16,150] INFO Started tasks to send 1 messages to devices. (com.microsoft.azure.iot.kafka.connect.sink.
IotHubSinkTask:47)
[2017-08-30 17:49:16,150] INFO WorkerSinkTask{id=AzureIotHubSinkConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)

Not

Bağlayıcı başlatılırken birkaç uyarıyla karşılaşabilirsiniz. Bunları güvenle yok sayabilirsiniz.

İleti gönderme

Bağlayıcı aracılığıyla ileti göndermek için aşağıdaki adımları kullanın:

  1. Kafka kümesine ikinci bir SSH oturumu açın:

    ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
    
  2. Yeni ssh oturumu için Kafka aracılarının adresini alın. PASSWORD değerini küme oturum açma parolası ile değiştirin ve komutunu girin:

    export password='PASSWORD'
    
    export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    
    export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
    
  3. Konuya ileti göndermek için iotout aşağıdaki komutu kullanın:

    /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKABROKERS --topic iotout
    

    Bu komut sizi normal Bash istemine döndürmez. Bunun yerine konuya klavye girişi iotout gönderir.

  4. Cihazınıza ileti göndermek için, SSH oturumuna bir JSON belgesi yapıştırın kafka-console-producer.

    Önemli

    Girdinin "deviceId" değerini cihazınızın kimliğine ayarlamanız gerekir. Aşağıdaki örnekte, cihaz olarak adlandırılmıştır myDeviceId:

    {"messageId":"msg1","message":"Turn On","deviceId":"myDeviceId"}
    

    Bu JSON belgesinin şeması adresinde https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.mddaha ayrıntılı olarak açıklanmıştır.

Raspberry Pi simülasyon cihazını kullanıyorsanız ve çalışıyorsa cihaz aşağıdaki iletiyi günlüğe kaydeder::

Receive message: Turn On


Resend the JSON document, but change the value of the `"message"` entry. The new value is logged by the device.

Havuz bağlayıcısını kullanma hakkında daha fazla bilgi için bkz https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md. .

Sonraki adımlar

Bu belgede, HDInsight'ta IoT Kafka Bağlan veya başlatmak için Apache Kafka Bağlan API'sini kullanmayı öğrendiniz. Kafka ile çalışmanın diğer yollarını keşfetmek için aşağıdaki bağlantıları kullanın: