Azure IoT Hub ile HDInsight üzerinde Apache Kafka kullanma
Verileri HDInsight üzerinde Apache Kafka ile Azure IoT Hub arasında taşımak için Apache Kafka Bağlan Azure IoT Hub bağlayıcısını kullanmayı öğrenin. Bu belgede IoT Hub bağlayıcısını kümedeki bir kenar düğümünden çalıştırmayı öğreneceksiniz.
Kafka Bağlan API'si, kafkaya sürekli veri çeken veya Kafka'dan başka bir sisteme veri gönderen bağlayıcılar uygulamanıza olanak tanır. Apache Kafka Bağlan Azure IoT Hub, Azure IoT Hub'dan Kafka'ya veri çeken bir bağlayıcıdır. Ayrıca Kafka'dan IoT Hub'a veri gönderebilirsiniz.
IoT Hub'dan çekerken bir kaynak bağlayıcısı kullanırsınız. IoT Hub'a gönderim yaparken bir havuz bağlayıcısı kullanırsınız. IoT Hub bağlayıcısı hem kaynak hem de havuz bağlayıcıları sağlar.
Aşağıdaki diyagramda bağlayıcı kullanılırken HDInsight üzerinde Azure IoT Hub ile Kafka arasındaki veri akışı gösterilmektedir.
Bağlan API hakkında daha fazla bilgi için bkzhttps://kafka.apache.org/documentation/#connect. .
Önkoşullar
HDInsight üzerinde Apache Kafka kümesi. Daha fazla bilgi için HDInsight üzerinde Kafka hızlı başlangıcı belgesine bakın.
Kafka kümesindeki bir kenar düğümü. Daha fazla bilgi için HDInsight ile kenar düğümlerini kullanma belgesine bakın.
Bir SSH istemcisi. Daha fazla bilgi için bkz. SSH kullanarak HDInsight'a (Apache Hadoop) bağlanma.
Azure IoT Hub ve cihaz. Bu makale için Bağlan Raspberry Pi çevrimiçi simülatörünü Azure IoT Hub'a kullanmayı göz önünde bulundurun.
Bağlayıcıyı oluşturma
Bağlayıcının https://github.com/Azure/toketi-kafka-connect-iothub/ kaynağını yerel ortamınıza indirin.
Komut isteminden dizinine
toketi-kafka-connect-iothub-master
gidin. Ardından projeyi derlemek ve paketlemek için aşağıdaki komutu kullanın:sbt assembly
Derlemenin tamamlanması birkaç dakika sürer. komutu, projenin dizininde
toketi-kafka-connect-iothub-master\target\scala-2.11
adlıkafka-connect-iothub-assembly_2.11-0.7.0.jar
bir dosya oluşturur.
Bağlayıcıyı yükleme
.jar dosyasını HDInsight kümesindeki Kafka'nızın kenar düğümüne yükleyin. değerini kümenizin gerçek adıyla değiştirerek
CLUSTERNAME
aşağıdaki komutu düzenleyin. SSH kullanıcı hesabı ve kenar düğümünün adı için varsayılan değerler kullanılır ve gerektiğinde değiştirilir.scp kafka-connect-iothub-assembly*.jar sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net:
Dosya kopyalama işlemi tamamlandıktan sonra SSH kullanarak kenar düğümüne bağlanın:
ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
Bağlayıcıyı Kafka
libs
dizinine yüklemek için aşağıdaki komutu kullanın:sudo mv kafka-connect-iothub-assembly*.jar /usr/hdp/current/kafka-broker/libs/
Kalan adımlar için SSH bağlantınızı etkin tutun.
Apache Kafka'yi yapılandırma
SSH bağlantınızdan kenar düğümüne, Kafka'yı bağlayıcıyı tek başına modda çalıştıracak şekilde yapılandırmak için aşağıdaki adımları kullanın:
Parola değişkenini ayarlayın. PASSWORD değerini küme oturum açma parolası ile değiştirin ve komutunu girin:
export password='PASSWORD'
jq yardımcı programını yükleyin. jq, Ambari sorgularından döndürülen JSON belgelerinin işlenmesini kolaylaştırır. Aşağıdaki komutu girin:
sudo apt -y install jq
Kafka aracılarının adresini alın. Kümenizde birçok aracı olabilir, ancak yalnızca bir veya iki aracıya başvurmanız gerekir. İki aracı ana bilgisayar adresini almak için aşağıdaki komutu kullanın:
export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name') export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2` echo $KAFKABROKERS
Değerleri daha sonra kullanmak üzere kopyalayın. Döndürülen değer aşağıdaki metne benzer:
<brokername1>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092,<brokername2>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092
Apache Zookeeper düğümlerinin adresini alın. Kümede birkaç Zookeeper düğümü vardır, ancak yalnızca bir veya ikisine başvurmanız gerekir. Değişkeninde
KAFKAZKHOSTS
adresleri depolamak için aşağıdaki komutu kullanın:export KAFKAZKHOSTS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2`
Bağlayıcıyı tek başına modda çalıştırırken,
/usr/hdp/current/kafka-broker/config/connect-standalone.properties
dosya Kafka aracılarıyla iletişim kurmak için kullanılır. Dosyayı düzenlemekconnect-standalone.properties
için aşağıdaki komutu kullanın:sudo nano /usr/hdp/current/kafka-broker/config/connect-standalone.properties
Aşağıdaki düzenlemeleri yapın:
Geçerli değer Yeni değer Yorum bootstrap.servers=localhost:9092
localhost:9092
değerini önceki adımda yer alan aracı konaklarıyla değiştirinKafka aracılarını bulmak için kenar düğümü için tek başına yapılandırmayı yapılandırılır. key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
Bu değişiklik, Kafka ile birlikte gelen konsol üreticisini kullanarak test etmenizi sağlar. Diğer üreticiler ve tüketiciler için farklı dönüştürücülere ihtiyacınız olabilir. Diğer dönüştürücü değerlerini kullanma hakkında bilgi için bkz https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md. . value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
Verilenle aynı. Yok consumer.max.poll.records=10
Dosyanın sonuna ekleyin. Bu değişiklik, havuz bağlayıcısını bir kerede 10 kayıtla sınırlayarak zaman aşımlarını önlemektir. Daha fazla bilgi için bkz. https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md. Dosyayı kaydetmek için Ctrl + X, Y ve ardından Enter tuşlarını kullanın.
Bağlayıcı tarafından kullanılan konuları oluşturmak için aşağıdaki komutları kullanın:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotin --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotout --zookeeper $KAFKAZKHOSTS
ve
iotout
konularınıniotin
mevcut olduğunu doğrulamak için aşağıdaki komutu kullanın:/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $KAFKAZKHOSTS
Konu
iotin
, IoT Hub'dan ileti almak için kullanılır. Konuiotout
, IoT Hub'a ileti göndermek için kullanılır.
IoT Hub bağlantı bilgilerini alma
Bağlayıcı tarafından kullanılan IoT hub bilgilerini almak için aşağıdaki adımları kullanın:
IoT hub'ınız için Event Hub ile uyumlu uç nokta ve Event Hub uyumlu uç nokta adını alın. Bu bilgileri almak için aşağıdaki yöntemlerden birini kullanın:
Azure portalında aşağıdaki adımları kullanın:
IoT Hub'ınıza gidin ve Uç Noktalar'ı seçin.
Yerleşik uç noktalardan Olaylar'ı seçin.
Özellikler'den aşağıdaki alanların değerini kopyalayın:
- Olay Hub'ı ile uyumlu ad
- Event Hub uyumlu uç nokta
- Bölümler
Önemli
Portaldaki uç nokta değeri, bu örnekte gerekli olmayan ek metinler içerebilir. Bu desenle
sb://<randomnamespace>.servicebus.windows.net/
eşleşen metni ayıklayın.
Azure CLI'dan aşağıdaki komutu kullanın:
az iot hub show --name myhubname --query "{EventHubCompatibleName:properties.eventHubEndpoints.events.path,EventHubCompatibleEndpoint:properties.eventHubEndpoints.events.endpoint,Partitions:properties.eventHubEndpoints.events.partitionCount}"
değerini IoT hub'ınızın adıyla değiştirin
myhubname
. Yanıt aşağıdaki metne benzer:"EventHubCompatibleEndpoint": "sb://ihsuprodbnres006dednamespace.servicebus.windows.net/", "EventHubCompatibleName": "iothub-ehub-myhub08-207673-d44b2a856e", "Partitions": 2
Paylaşılan erişim ilkesini ve anahtarını alın. Bu örnekte hizmet anahtarını kullanın. Bu bilgileri almak için aşağıdaki yöntemlerden birini kullanın:
Azure portalında aşağıdaki adımları kullanın:
- Paylaşılan erişim ilkeleri'ni ve ardından hizmet'i seçin.
- Birincil anahtar değerini kopyalayın.
- Bağlan ion dizesini--birincil anahtar değerini kopyalayın.
Azure CLI'dan aşağıdaki komutu kullanın:
Birincil anahtar değerini almak için aşağıdaki komutu kullanın:
az iot hub policy show --hub-name myhubname --name service --query "primaryKey"
değerini IoT hub'ınızın adıyla değiştirin
myhubname
. Yanıt, bu hub için ilkeninservice
birincil anahtarıdır.İlkenin bağlantı dizesi almak için
service
aşağıdaki komutu kullanın:az iot hub show-connection-string --name myhubname --policy-name service --query "connectionString"
değerini IoT hub'ınızın adıyla değiştirin
myhubname
. Yanıt, ilkenin bağlantı dizesiservice
.
Kaynak bağlantıyı yapılandırma
Kaynağı IoT Hub'ınızla çalışacak şekilde yapılandırmak için SSH bağlantısından kenar düğümüne aşağıdaki eylemleri gerçekleştirin:
Dizininde
connect-iot-source.properties
/usr/hdp/current/kafka-broker/config/
dosyanın bir kopyasını oluşturun. Dosyayı toketi-kafka-connect-iothub projesinden indirmek için aşağıdaki komutu kullanın:sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-source.properties
Dosyayı düzenlemek
connect-iot-source.properties
ve IoT hub bilgilerini eklemek için aşağıdaki komutu kullanın:sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
Düzenleyicide aşağıdaki girdileri bulun ve değiştirin:
Geçerli değer Düzenle Kafka.Topic=PLACEHOLDER
PLACEHOLDER
öğesiniiotin
ile değiştirin. IoT hub'ından alınan iletiler konuya yerleştiriliriotin
.IotHub.EventHubCompatibleName=PLACEHOLDER
değerini Event Hub ile uyumlu adla değiştirin PLACEHOLDER
.IotHub.EventHubCompatibleEndpoint=PLACEHOLDER
değerini Event Hub ile uyumlu uç noktayla değiştirin PLACEHOLDER
.IotHub.AccessKeyName=PLACEHOLDER
PLACEHOLDER
öğesiniservice
ile değiştirin.IotHub.AccessKeyValue=PLACEHOLDER
değerini ilkenin birincil anahtarıyla service
değiştirinPLACEHOLDER
.IotHub.Partitions=PLACEHOLDER
değerini önceki adımlardan bölüm sayısıyla değiştirin PLACEHOLDER
.IotHub.StartTime=PLACEHOLDER
değerini UTC tarihiyle değiştirin PLACEHOLDER
. Bu tarih, bağlayıcının iletileri denetlemeye başladığı tarihtir. Tarih biçimi şeklindediryyyy-mm-ddThh:mm:ssZ
.BatchSize=100
100
öğesini5
ile değiştirin. Bu değişiklik, IoT hub'ında beş yeni ileti olduğunda bağlayıcının Kafka'ya ileti okumasına neden olur.Örnek yapılandırma için bkz. Azure IoT Hub için Kafka Bağlan Kaynak Bağlan veya.
Değişiklikleri kaydetmek için Ctrl + X, Y ve ardından Enter tuşlarını kullanın.
Bağlayıcı kaynağını yapılandırma hakkında daha fazla bilgi için bkz https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Source.md. .
Havuz bağlantısını yapılandırma
Havuz bağlantısını IoT Hub'ınızla çalışacak şekilde yapılandırmak için bir SSH bağlantısından kenar düğümüne aşağıdaki eylemleri gerçekleştirin:
Dizininde
connect-iothub-sink.properties
/usr/hdp/current/kafka-broker/config/
dosyanın bir kopyasını oluşturun. Dosyayı toketi-kafka-connect-iothub projesinden indirmek için aşağıdaki komutu kullanın:sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-sink.properties
Dosyayı düzenlemek
connect-iothub-sink.properties
ve IoT hub bilgilerini eklemek için aşağıdaki komutu kullanın:sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
Düzenleyicide aşağıdaki girdileri bulun ve değiştirin:
Geçerli değer Düzenle topics=PLACEHOLDER
PLACEHOLDER
öğesiniiotout
ile değiştirin. Konuya yazılaniotout
iletiler IoT hub'ına iletilir.IotHub.ConnectionString=PLACEHOLDER
değerini ilkenin bağlantı dizesi service
ile değiştirinPLACEHOLDER
.Örnek yapılandırma için bkz. Azure IoT Hub için Kafka Bağlan Sink Bağlan or.
Değişiklikleri kaydetmek için Ctrl + X, Y ve ardından Enter tuşlarını kullanın.
Bağlayıcı havuzu yapılandırma hakkında daha fazla bilgi için bkz https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md. .
Kaynak bağlayıcıyı başlatma
Kaynak bağlayıcıyı başlatmak için SSH bağlantısından kenar düğümüne aşağıdaki komutu kullanın:
/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
Bağlayıcı başlatıldıktan sonra cihazınızdan IoT hub'ına ileti gönderin. Bağlayıcı IoT hub'ından gelen iletileri okuyup Kafka konusunda depoladıkça, bilgileri konsola kaydeder:
[2017-08-29 20:15:46,112] INFO Polling for data - Obtained 5 SourceRecords from IotHub (com.microsoft.azure.iot.kafka.connect.IotHubSourceTask:39) [2017-08-29 20:15:54,106] INFO Finished WorkerSourceTask{id=AzureIotHubConnector-0} commitOffsets successfully in 4 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:356)
Not
Bağlayıcı başlatılırken birkaç uyarı görebilirsiniz. Bu uyarılar IoT hub'ından ileti almayla ilgili sorunlara neden olmaz.
Ctrl + C tuşlarını iki kez kullanarak bağlayıcıyı birkaç dakika sonra durdurun. Bağlayıcının durması birkaç dakika sürer.
Havuz bağlayıcısını başlatma
Kenar düğümüne bir SSH bağlantısından havuz bağlayıcısını tek başına modda başlatmak için aşağıdaki komutu kullanın:
/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
Bağlayıcı çalışırken, aşağıdaki metne benzer bilgiler görüntülenir:
[2017-08-30 17:49:16,150] INFO Started tasks to send 1 messages to devices. (com.microsoft.azure.iot.kafka.connect.sink.
IotHubSinkTask:47)
[2017-08-30 17:49:16,150] INFO WorkerSinkTask{id=AzureIotHubSinkConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
Not
Bağlayıcı başlatılırken birkaç uyarıyla karşılaşabilirsiniz. Bunları güvenle yok sayabilirsiniz.
İleti gönderme
Bağlayıcı aracılığıyla ileti göndermek için aşağıdaki adımları kullanın:
Kafka kümesine ikinci bir SSH oturumu açın:
ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
Yeni ssh oturumu için Kafka aracılarının adresini alın. PASSWORD değerini küme oturum açma parolası ile değiştirin ve komutunu girin:
export password='PASSWORD' export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name') export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
Konuya ileti göndermek için
iotout
aşağıdaki komutu kullanın:/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKABROKERS --topic iotout
Bu komut sizi normal Bash istemine döndürmez. Bunun yerine konuya klavye girişi
iotout
gönderir.Cihazınıza ileti göndermek için, SSH oturumuna bir JSON belgesi yapıştırın
kafka-console-producer
.Önemli
Girdinin
"deviceId"
değerini cihazınızın kimliğine ayarlamanız gerekir. Aşağıdaki örnekte, cihaz olarak adlandırılmıştırmyDeviceId
:{"messageId":"msg1","message":"Turn On","deviceId":"myDeviceId"}
Bu JSON belgesinin şeması adresinde https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.mddaha ayrıntılı olarak açıklanmıştır.
Raspberry Pi simülasyon cihazını kullanıyorsanız ve çalışıyorsa cihaz aşağıdaki iletiyi günlüğe kaydeder::
Receive message: Turn On
Resend the JSON document, but change the value of the `"message"` entry. The new value is logged by the device.
Havuz bağlayıcısını kullanma hakkında daha fazla bilgi için bkz https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md. .
Sonraki adımlar
Bu belgede, HDInsight'ta IoT Kafka Bağlan veya başlatmak için Apache Kafka Bağlan API'sini kullanmayı öğrendiniz. Kafka ile çalışmanın diğer yollarını keşfetmek için aşağıdaki bağlantıları kullanın:
Geri Bildirim
https://aka.ms/ContentUserFeedback.
Çok yakında: 2024 boyunca, içerik için geri bildirim mekanizması olarak GitHub Sorunları’nı kullanımdan kaldıracak ve yeni bir geri bildirim sistemiyle değiştireceğiz. Daha fazla bilgi için bkz.Gönderin ve geri bildirimi görüntüleyin