Share via


Usare Apache Kafka in HDInsight con l'hub IoT

Informazioni su come usare il connettore di Apache Kafka Connect Azure IoT Hub per spostare dati tra Apache Kafka in HDInsight e l'hub IoT di Azure. In questo documento sono incluse informazioni su come eseguire il connettore dell'hub IoT da un nodo perimetrale del cluster.

L'API Kafka Connect consente di implementare i connettori che eseguono in modo continuativo il pull dei dati in Kafka o eseguono il push dei dati da Kafka a un altro sistema. Apache Kafka Connect Azure IoT Hub è un connettore che esegue il pull dei dati dall'hub IoT di Azure a Kafka. Può anche eseguire il push dei dati da Kafka all'hub IoT.

Quando si esegue il pull dall'hub IoT, si usa un connettore di origine. Quando si esegue il push all'hub IoT, si usa un connettore sink. Il connettore hub IoT offre sia il connettore di origine che il connettore sink.

Nel diagramma seguente viene illustrato il flusso di dati tra l'hub IoT e Kafka in HDInsight quando si usa il connettore.

Image showing data flowing from IoT Hub to Kafka through the connector.

Per altre informazioni sull'API Connect, vedere https://kafka.apache.org/documentation/#connect.

Prerequisiti

Compilare il connettore

  1. Scaricare l'origine per il connettore da https://github.com/Azure/toketi-kafka-connect-iothub/ all'ambiente locale.

  2. Da un prompt dei comandi passare alla toketi-kafka-connect-iothub-master directory. Usare quindi il comando seguente per compilare e creare il pacchetto del progetto:

    sbt assembly
    

    Il completamento della compilazione richiede alcuni minuti. Il comando crea un file denominato kafka-connect-iothub-assembly_2.11-0.7.0.jar nella toketi-kafka-connect-iothub-master\target\scala-2.11 directory per il progetto.

Installare il connettore

  1. Caricare il file .jar nel nodo perimetrale del cluster Kafka in HDInsight. Modificare il comando seguente sostituendo CLUSTERNAME con il nome effettivo del cluster. I valori predefiniti per l'account utente SSH e il nome del nodo perimetrale vengono usati e modificati in base alle esigenze.

    scp kafka-connect-iothub-assembly*.jar sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net:
    
  2. Una volta completata la copia del file, connettersi al nodo perimetrale mediante SSH:

    ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
    
  3. Per installare il connettore nella directory libs di Kafka, usare il comando seguente:

    sudo mv kafka-connect-iothub-assembly*.jar /usr/hdp/current/kafka-broker/libs/
    

Mantenere attiva la connessione SSH per i passaggi rimanenti.

Configurare Apache Kafka

Dalla connessione SSH al nodo perimetrale, seguire questa procedura per configurare Kafka per eseguire il connettore in modalità autonoma:

  1. Configurare la variabile di password. Sostituire PASSWORD con la password di accesso del cluster, quindi immettere il comando :

    export password='PASSWORD'
    
  2. Installare l'utilità jq . jq semplifica l'elaborazione dei documenti JSON restituiti dalle query Ambari. Immettere il comando seguente:

    sudo apt -y install jq
    
  3. Ottenere l'indirizzo dei broker Kafka. Nel cluster possono esserci molti broker, ma è sufficiente fare riferimento a uno o due di essi. Per ottenere l'indirizzo di due host di broker, usare il comando seguente:

    export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    
    export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
    echo $KAFKABROKERS
    

    Copiare i valori per usarli in un secondo momento. Il valore restituito è simile al testo seguente:

    <brokername1>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092,<brokername2>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092

  4. Ottenere l'indirizzo dei nodi Apache ZooKeeper. Esistono diversi nodi Zookeeper nel cluster, ma è sufficiente fare riferimento a uno o due di essi. Usare il comando seguente per archiviare gli indirizzi nella variabile KAFKAZKHOSTS:

    export KAFKAZKHOSTS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2`
    
  5. Quando si esegue il connettore in modalità autonoma, viene usato il file /usr/hdp/current/kafka-broker/config/connect-standalone.properties per comunicare con i broker Kafka. Per modificare il file connect-standalone.properties, usare il comando seguente:

    sudo nano /usr/hdp/current/kafka-broker/config/connect-standalone.properties
    
  6. Apportare le modifiche seguenti:

    Valore corrente Nuovo valore Commento
    bootstrap.servers=localhost:9092 Sostituire il localhost:9092 valore con gli host broker del passaggio precedente Configura la configurazione autonoma per il nodo perimetrale per trovare i broker Kafka.
    key.converter=org.apache.kafka.connect.json.JsonConverter key.converter=org.apache.kafka.connect.storage.StringConverter La modifica consente di eseguire test usando il producer di console incluso in Kafka. Potrebbero essere necessari diversi convertitori per altri producer e consumer. Per informazioni sull'uso di altri valori del convertitore, vedere https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
    value.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.storage.StringConverter Uguale a quanto specificato.
    N/D consumer.max.poll.records=10 Aggiungi alla fine del file. Questa modifica è finalizzata a evitare i timeout nel connettore sink mediante l'impostazione di un limite di 10 record alla volta. Per ulteriori informazioni, vedere https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
  7. Per salvare il file, usare Ctrl + X, Y e INVIO.

  8. Per creare gli argomenti usati dal connettore, usare i comandi seguenti:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotin --zookeeper $KAFKAZKHOSTS
    
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotout --zookeeper $KAFKAZKHOSTS
    

    Per verificare che gli argomenti iotin e iotout esistano, usare il comando seguente:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $KAFKAZKHOSTS
    

    L'argomento iotin viene usato per ricevere messaggi dall'hub IoT. L'argomento iotout viene usato per inviare messaggi all'hub IoT.

Ottenere informazioni di connessione dell'hub IoT

Per recuperare informazioni sull'hub IoT usato dal connettore, attenersi alla procedura seguente:

  1. Ottenere l'endpoint compatibile con hub eventi e il nome dell'endpoint compatibile con hub eventi per l'hub IoT. Per ottenere tali informazioni, usare uno dei metodi seguenti:

    • Dal portale di Azure, attenersi alla procedura seguente:

      1. Passare all'hub IoT e selezionare Endpoint.

      2. Da Endpoint predefiniti, selezionare Eventi.

      3. Da Proprietà, copiare il valore dei campi seguenti:

        • Nome compatibile con l'hub eventi
        • Endpoint compatibile con l'hub eventi
        • Partitions

        Importante

        Il valore dell'endpoint del portale può contenere testo aggiuntivo che non è necessario in questo esempio. Estrarre il testo che corrisponde al criterio sb://<randomnamespace>.servicebus.windows.net/.

    • Dall'interfaccia della riga di comando di Azure, usare il comando seguente:

      az iot hub show --name myhubname --query "{EventHubCompatibleName:properties.eventHubEndpoints.events.path,EventHubCompatibleEndpoint:properties.eventHubEndpoints.events.endpoint,Partitions:properties.eventHubEndpoints.events.partitionCount}"
      

      Sostituire myhubname con il nome dell'hub IoT. La risposta restituita è simile al testo seguente:

      "EventHubCompatibleEndpoint": "sb://ihsuprodbnres006dednamespace.servicebus.windows.net/",
      "EventHubCompatibleName": "iothub-ehub-myhub08-207673-d44b2a856e",
      "Partitions": 2
      
  2. Ottenere i criteri di accesso condiviso e la chiave. Per questo esempio, usare la chiave service. Per ottenere tali informazioni, usare uno dei metodi seguenti:

    • Dal portale di Azure, attenersi alla procedura seguente:

      1. Selezionare Criteri di accesso condiviso e quindi service.
      2. Copiare il valore di Chiave primaria.
      3. Copiare il valore di Stringa di connessione - chiave primaria.
    • Dall'interfaccia della riga di comando di Azure, usare il comando seguente:

      1. Per ottenere il valore della chiave primaria, usare il comando seguente:

        az iot hub policy show --hub-name myhubname --name service --query "primaryKey"
        

        Sostituire myhubname con il nome dell'hub IoT. La risposta è la chiave primaria dei criteri service per l'hub.

      2. Per ottenere la stringa di connessione per i criteri service, usare il comando seguente:

        az iot hub show-connection-string --name myhubname --policy-name service --query "connectionString"
        

        Sostituire myhubname con il nome dell'hub IoT. La risposta è la stringa di connessione per i criteri service.

Configurare la connessione di origine

Per configurare l'origine per l'hub IoT, eseguire le azioni seguenti da una connessione SSH al nodo perimetrale:

  1. Creare una copia del file connect-iot-source.properties nella directory /usr/hdp/current/kafka-broker/config/. Per scaricare il file dal progetto toketi-kafka-connect-iothub, usare il comando seguente:

    sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-source.properties
    
  2. Per modificare il file connect-iot-source.properties e aggiungere le informazioni dell'hub IoT, usare il comando seguente:

    sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
    
  3. Nell'editor individuare e modificare le voci seguenti:

    Valore corrente Modifica
    Kafka.Topic=PLACEHOLDER Sostituisci PLACEHOLDER con iotin. I messaggi ricevuti dall'hub IoT vengono salvati nell'argomento iotin.
    IotHub.EventHubCompatibleName=PLACEHOLDER sostituire PLACEHOLDER con il nome compatibile con l'hub eventi.
    IotHub.EventHubCompatibleEndpoint=PLACEHOLDER sostituire PLACEHOLDER con l'endpoint compatibile con l'hub eventi.
    IotHub.AccessKeyName=PLACEHOLDER Sostituisci PLACEHOLDER con service.
    IotHub.AccessKeyValue=PLACEHOLDER sostituire PLACEHOLDER con la chiave primaria dei criteri service.
    IotHub.Partitions=PLACEHOLDER sostituire PLACEHOLDER con il numero di partizioni dei passaggi precedenti.
    IotHub.StartTime=PLACEHOLDER sostituire PLACEHOLDER con una data UTC. Tale data indica il momento in cui il connettore inizia a controllare i messaggi. Il formato della data è yyyy-mm-ddThh:mm:ssZ.
    BatchSize=100 Sostituisci 100 con 5. Questa modifica fa sì che il connettore legga i messaggi in Kafka quando sono presenti cinque nuovi messaggi nell'hub IoT.

    Per un esempio di configurazione, vedere Kafka Connessione Source Connessione or per hub IoT di Azure.

  4. Per salvare le modifiche, usare CTRL + X, Y e quindi INVIO.

Per altre informazioni sulla configurazione dell'origine del connettore, vedere https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Source.md.

Configurare la connessione sink

Per configurare la connessione sink per l'hub IoT, eseguire le azioni seguenti da una connessione SSH al nodo perimetrale:

  1. Creare una copia del file connect-iothub-sink.properties nella directory /usr/hdp/current/kafka-broker/config/. Per scaricare il file dal progetto toketi-kafka-connect-iothub, usare il comando seguente:

    sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-sink.properties
    
  2. Per modificare il file connect-iothub-sink.properties e aggiungere le informazioni dell'hub IoT, usare il comando seguente:

    sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
    
  3. Nell'editor individuare e modificare le voci seguenti:

    Valore corrente Modifica
    topics=PLACEHOLDER Sostituisci PLACEHOLDER con iotout. I messaggi scritti nell'argomento iotout vengono inoltrati all'hub IoT.
    IotHub.ConnectionString=PLACEHOLDER sostituire PLACEHOLDER con la stringa di connessione per i criteri service.

    Per un esempio di configurazione, vedere Connessione sink kafka Connessione or per hub IoT di Azure.

  4. Per salvare le modifiche, usare CTRL + X, Y e quindi INVIO.

Per altre informazioni sulla configurazione del sink del connettore, vedere https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.

Avviare il connettore di origine

  1. Per avviare il connettore di origine, usare il comando seguente da una connessione SSH al nodo perimetrale:

    /usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
    

    Una volta avviato il connettore, inviare messaggi all'hub IoT dal proprio dispositivo. Quando il connettore legge i messaggi dall'hub IoT e li archivia nell'argomento di Kafka, registra informazioni nella console:

    [2017-08-29 20:15:46,112] INFO Polling for data - Obtained 5 SourceRecords from IotHub (com.microsoft.azure.iot.kafka.connect.IotHubSourceTask:39)
    [2017-08-29 20:15:54,106] INFO Finished WorkerSourceTask{id=AzureIotHubConnector-0} commitOffsets successfully in 4 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:356)
    

    Nota

    All'avvio del connettore è possibile che vengano visualizzati diversi avvisi. Tali avvisi non causano problemi alla ricezione dei messaggi dall'hub IoT.

  2. Arrestare il connettore dopo alcuni minuti usando CTRL+C due volte. L'arresto del connettore richiede alcuni minuti.

Avviare il connettore sink

Per avviare il connettore sink in modalità autonoma, usare il comando seguente da una connessione SSH al nodo perimetrale:

/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties

Quando viene avviata l'esecuzione del connettore, vengono visualizzate informazioni simili al testo seguente:

[2017-08-30 17:49:16,150] INFO Started tasks to send 1 messages to devices. (com.microsoft.azure.iot.kafka.connect.sink.
IotHubSinkTask:47)
[2017-08-30 17:49:16,150] INFO WorkerSinkTask{id=AzureIotHubSinkConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)

Nota

All'avvio del connettore è possibile che vengano visualizzati diversi avvisi. È possibile ignorare questi avvisi.

Inviare messaggi

Per inviare messaggi tramite il connettore, attenersi alla procedura seguente:

  1. Aprire una seconda sessione SSH per il cluster Kafka:

    ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
    
  2. Ottenere l'indirizzo dei broker Kafka per la nuova sessione SSH. Sostituire PASSWORD con la password di accesso del cluster, quindi immettere il comando :

    export password='PASSWORD'
    
    export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    
    export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
    
  3. Per inviare messaggi all'argomento iotout, usare il comando seguente:

    /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKABROKERS --topic iotout
    

    Questo comando non restituisce il normale prompt di Bash. Invece, invia input da tastiera all'argomento iotout.

  4. Per inviare un messaggio al dispositivo, incollare un documento con estensione json nella sessione SSH per kafka-console-producer.

    Importante

    È necessario impostare il valore della voce "deviceId" sull'ID del dispositivo. Nell'esempio seguente il dispositivo è denominato myDeviceId:

    {"messageId":"msg1","message":"Turn On","deviceId":"myDeviceId"}
    

    Lo schema per il documento con estensione json è descritto in modo più dettagliato all'indirizzo https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.

Se si usa il dispositivo Raspberry Pi simulato ed è in esecuzione, il dispositivo registra il messaggio seguente:

Receive message: Turn On


Resend the JSON document, but change the value of the `"message"` entry. The new value is logged by the device.

Per altre informazioni sull'uso del connettore sink, vedere https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.

Passaggi successivi

In questo documento è stato descritto come usare l'API Apache Kafka Connect per avviare il connettore IoT per Kafka in HDInsight. Per trovare altri modi per lavorare con Kafka, vedere i collegamenti seguenti: