Share via


Usar o Apache Kafka no HDInsight com o Hub IoT do Azure

Saiba como usar o conector Apache Kafka Connect do Hub IoT do Azure para mover dados entre o Apache Kafka no HDInsight e o Hub IoT do Azure. Neste documento, você aprenderá a executar o conector do Hub IoT a partir de um nó de borda no cluster.

A API do Kafka Connect permite implementar conectores que extraem dados continuamente para o Kafka ou enviam dados do Kafka para outro sistema. O Apache Kafka Connect Azure IoT Hub é um conector que extrai dados do Hub IoT do Azure para o Kafka. Ele também pode enviar dados de Kafka para o Hub IoT.

Ao extrair do Hub IoT, você usa um conector de origem . Ao enviar por push para o Hub IoT, você usa um conector de coletor . O conector do Hub IoT fornece os conectores de origem e coletor.

O diagrama a seguir mostra o fluxo de dados entre o Hub IoT do Azure e o Kafka no HDInsight ao usar o conector.

Image showing data flowing from IoT Hub to Kafka through the connector.

Para obter mais informações sobre a API Connect, consulte https://kafka.apache.org/documentation/#connect.

Pré-requisitos

Construa o conector

  1. Transfira a origem do https://github.com/Azure/toketi-kafka-connect-iothub/ conector para o seu ambiente local.

  2. Em um prompt de comando, navegue até o toketi-kafka-connect-iothub-master diretório. Em seguida, use o seguinte comando para criar e empacotar o projeto:

    sbt assembly
    

    A compilação leva alguns minutos para ser concluída. O comando cria um arquivo nomeado kafka-connect-iothub-assembly_2.11-0.7.0.jar no toketi-kafka-connect-iothub-master\target\scala-2.11 diretório para o projeto.

Instale o conector

  1. Carregue o arquivo .jar para o nó de borda do seu cluster Kafka no HDInsight. Edite o comando a seguir substituindo CLUSTERNAME pelo nome real do cluster. Os valores padrão para a conta de usuário SSH e o nome do nó de borda são usados e modificados conforme necessário.

    scp kafka-connect-iothub-assembly*.jar sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net:
    
  2. Quando a cópia do arquivo for concluída, conecte-se ao nó de borda usando SSH:

    ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
    
  3. Para instalar o conector no diretório Kafka libs , use o seguinte comando:

    sudo mv kafka-connect-iothub-assembly*.jar /usr/hdp/current/kafka-broker/libs/
    

Mantenha sua conexão SSH ativa para as etapas restantes.

Configurar o Apache Kafka

Da sua conexão SSH para o nó de borda, use as seguintes etapas para configurar o Kafka para executar o conector no modo autônomo:

  1. Configure a variável de senha. Substitua PASSWORD pela senha de login do cluster e digite o comando:

    export password='PASSWORD'
    
  2. Instale o utilitário jq . jq facilita o processamento de documentos JSON retornados de consultas Ambari. Introduza o seguinte comando:

    sudo apt -y install jq
    
  3. Obtenha o endereço dos corretores Kafka. Pode haver muitos corretores em seu cluster, mas você só precisa fazer referência a um ou dois. Para obter o endereço de dois hosts de broker, use o seguinte comando:

    export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    
    export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
    echo $KAFKABROKERS
    

    Copie os valores para uso posterior. O valor devolvido é semelhante ao seguinte texto:

    <brokername1>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092,<brokername2>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092

  4. Obtenha o endereço dos nós do Apache Zookeeper. Há vários nós do Zookeeper no cluster, mas você só precisa fazer referência a um ou dois. Use o seguinte comando para armazenar os endereços na variável KAFKAZKHOSTS:

    export KAFKAZKHOSTS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2`
    
  5. Ao executar o conector no modo autônomo, o /usr/hdp/current/kafka-broker/config/connect-standalone.properties arquivo é usado para se comunicar com os corretores Kafka. Para editar o connect-standalone.properties arquivo, use o seguinte comando:

    sudo nano /usr/hdp/current/kafka-broker/config/connect-standalone.properties
    
  6. Faça as seguintes edições:

    Valor atual Novo valor Comentário
    bootstrap.servers=localhost:9092 Substitua o localhost:9092 valor pelos hosts do broker da etapa anterior Configura a configuração autônoma para o nó de borda para localizar os corretores Kafka.
    key.converter=org.apache.kafka.connect.json.JsonConverter key.converter=org.apache.kafka.connect.storage.StringConverter Essa alteração permite que você teste usando o produtor do console incluído no Kafka. Pode necessitar de fabricantes de embalagens diferentes para outros produtores e consumidores. Para obter informações sobre como usar outros valores do conversor, consulte https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
    value.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.storage.StringConverter O mesmo que dado.
    N/A consumer.max.poll.records=10 Adicionar ao final do arquivo. Essa alteração é para evitar tempos limite no conector do coletor, limitando-o a 10 registros de cada vez. Para obter mais informações, veja https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
  7. Para guardar o ficheiro, utilize Ctrl + X, Y e, em seguida, Enter.

  8. Para criar os tópicos usados pelo conector, use os seguintes comandos:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotin --zookeeper $KAFKAZKHOSTS
    
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotout --zookeeper $KAFKAZKHOSTS
    

    Para verificar se os iotin tópicos e iotout existem, use o seguinte comando:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $KAFKAZKHOSTS
    

    O iotin tópico é usado para receber mensagens do Hub IoT. O iotout tópico é usado para enviar mensagens para o Hub IoT.

Obter informações de conexão do Hub IoT

Para recuperar informações do hub IoT usadas pelo conector, use as seguintes etapas:

  1. Obtenha o ponto de extremidade compatível com o Hub de Eventos e o nome do ponto de extremidade compatível com o Hub de Eventos para seu hub IoT. Para obter essas informações, use um dos seguintes métodos:

    • No portal do Azure, use as seguintes etapas:

      1. Navegue até o Hub IoT e selecione Pontos de extremidade.

      2. Em Pontos de extremidade internos, selecione Eventos.

      3. Em Propriedades, copie o valor dos seguintes campos:

        • Nome compatível com o Hub de Eventos
        • Ponto de extremidade compatível com o Hub de Eventos
        • Partições

        Importante

        O valor do ponto de extremidade do portal pode conter texto extra que não é necessário neste exemplo. Extraia o texto que corresponde a este padrão sb://<randomnamespace>.servicebus.windows.net/.

    • Na CLI do Azure, use o seguinte comando:

      az iot hub show --name myhubname --query "{EventHubCompatibleName:properties.eventHubEndpoints.events.path,EventHubCompatibleEndpoint:properties.eventHubEndpoints.events.endpoint,Partitions:properties.eventHubEndpoints.events.partitionCount}"
      

      Substitua myhubname pelo nome do seu hub IoT. A resposta é semelhante ao seguinte texto:

      "EventHubCompatibleEndpoint": "sb://ihsuprodbnres006dednamespace.servicebus.windows.net/",
      "EventHubCompatibleName": "iothub-ehub-myhub08-207673-d44b2a856e",
      "Partitions": 2
      
  2. Obtenha a política e a chave de acesso compartilhado. Para este exemplo, use a chave de serviço . Para obter essas informações, use um dos seguintes métodos:

    • No portal do Azure, use as seguintes etapas:

      1. Selecione Políticas de acesso compartilhado e, em seguida, selecione serviço.
      2. Copie o valor da chave primária.
      3. Copie a cadeia de conexão - valor da chave primária.
    • Na CLI do Azure, use o seguinte comando:

      1. Para obter o valor da chave primária, use o seguinte comando:

        az iot hub policy show --hub-name myhubname --name service --query "primaryKey"
        

        Substitua myhubname pelo nome do seu hub IoT. A resposta é a chave primária para a service política para este hub.

      2. Para obter a cadeia de conexão para a service política, use o seguinte comando:

        az iot hub show-connection-string --name myhubname --policy-name service --query "connectionString"
        

        Substitua myhubname pelo nome do seu hub IoT. A resposta é a cadeia de conexão para a service política.

Configurar a conexão de origem

Para configurar a origem para trabalhar com seu Hub IoT, execute as seguintes ações de uma conexão SSH com o nó de borda:

  1. Crie uma cópia do connect-iot-source.properties arquivo no /usr/hdp/current/kafka-broker/config/ diretório. Para baixar o arquivo do projeto toketi-kafka-connect-iothub, use o seguinte comando:

    sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-source.properties
    
  2. Para editar o connect-iot-source.properties arquivo e adicionar as informações do hub IoT, use o seguinte comando:

    sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
    
  3. No editor, localize e altere as seguintes entradas:

    Valor atual Editar
    Kafka.Topic=PLACEHOLDER Substituir PLACEHOLDER por iotin. As mensagens recebidas do hub IoT são colocadas no iotin tópico.
    IotHub.EventHubCompatibleName=PLACEHOLDER Substitua pelo nome compatível com o Hub de Eventos PLACEHOLDER .
    IotHub.EventHubCompatibleEndpoint=PLACEHOLDER Substitua pelo ponto de extremidade compatível com o Hub de Eventos PLACEHOLDER .
    IotHub.AccessKeyName=PLACEHOLDER Substituir PLACEHOLDER por service.
    IotHub.AccessKeyValue=PLACEHOLDER Substitua PLACEHOLDER pela chave primária da service política.
    IotHub.Partitions=PLACEHOLDER Substitua PLACEHOLDER pelo número de partições das etapas anteriores.
    IotHub.StartTime=PLACEHOLDER Substitua PLACEHOLDER por uma data UTC. Esta data é quando o conector começa a verificar se há mensagens. O formato de data é yyyy-mm-ddThh:mm:ssZ.
    BatchSize=100 Substituir 100 por 5. Essa alteração faz com que o conector leia mensagens no Kafka quando houver cinco novas mensagens no hub IoT.

    Para obter um exemplo de configuração, consulte Kafka Connect Source Connector for Azure IoT Hub.

  4. Para guardar as alterações, utilize Ctrl + X, Y e, em seguida, Enter.

Para obter mais informações sobre como configurar a origem do conector, consulte https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Source.md.

Configurar a conexão do coletor

Para configurar a conexão do coletor para funcionar com seu Hub IoT, execute as seguintes ações de uma conexão SSH com o nó de borda:

  1. Crie uma cópia do connect-iothub-sink.properties arquivo no /usr/hdp/current/kafka-broker/config/ diretório. Para baixar o arquivo do projeto toketi-kafka-connect-iothub, use o seguinte comando:

    sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-sink.properties
    
  2. Para editar o connect-iothub-sink.properties arquivo e adicionar as informações do hub IoT, use o seguinte comando:

    sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
    
  3. No editor, localize e altere as seguintes entradas:

    Valor atual Editar
    topics=PLACEHOLDER Substituir PLACEHOLDER por iotout. As mensagens gravadas no iotout tópico são encaminhadas para o hub IoT.
    IotHub.ConnectionString=PLACEHOLDER Substitua PLACEHOLDER pela cadeia de conexão da service política.

    Para obter um exemplo de configuração, consulte Kafka Connect Sink Connector for Azure IoT Hub.

  4. Para guardar as alterações, utilize Ctrl + X, Y e, em seguida, Enter.

Para obter mais informações sobre como configurar o coletor de conectores, consulte https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.

Inicie o conector de origem

  1. Para iniciar o conector de origem, use o seguinte comando de uma conexão SSH para o nó de borda:

    /usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
    

    Assim que o conector for iniciado, envie mensagens para o hub IoT a partir do(s) seu(s) dispositivo(s). À medida que o conector lê mensagens do hub IoT e as armazena no tópico Kafka, ele registra informações no console:

    [2017-08-29 20:15:46,112] INFO Polling for data - Obtained 5 SourceRecords from IotHub (com.microsoft.azure.iot.kafka.connect.IotHubSourceTask:39)
    [2017-08-29 20:15:54,106] INFO Finished WorkerSourceTask{id=AzureIotHubConnector-0} commitOffsets successfully in 4 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:356)
    

    Nota

    Você pode ver vários avisos quando o conector é iniciado. Esses avisos não causam problemas com o recebimento de mensagens do hub IoT.

  2. Pare o conector após alguns minutos usando Ctrl + C duas vezes. Demora alguns minutos para o conector parar.

Inicie o conector do coletor

De uma conexão SSH para o nó de borda, use o seguinte comando para iniciar o conector do coletor no modo autônomo:

/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties

À medida que o conector é executado, informações semelhantes ao seguinte texto são exibidas:

[2017-08-30 17:49:16,150] INFO Started tasks to send 1 messages to devices. (com.microsoft.azure.iot.kafka.connect.sink.
IotHubSinkTask:47)
[2017-08-30 17:49:16,150] INFO WorkerSinkTask{id=AzureIotHubSinkConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)

Nota

Você pode notar vários avisos quando o conector é iniciado. Pode ignorar com segurança estes avisos.

Enviar mensagens

Para enviar mensagens através do conector, use as seguintes etapas:

  1. Abra uma segunda sessão SSH para o cluster Kafka:

    ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
    
  2. Obtenha o endereço dos corretores Kafka para a nova sessão ssh. Substitua PASSWORD pela senha de login do cluster e digite o comando:

    export password='PASSWORD'
    
    export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    
    export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
    
  3. Para enviar mensagens para o iotout tópico, use o seguinte comando:

    /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKABROKERS --topic iotout
    

    Este comando não retorna ao prompt Bash normal. Em vez disso, ele envia a entrada do teclado para o iotout tópico.

  4. Para enviar uma mensagem para o seu dispositivo, cole um documento JSON na sessão SSH do kafka-console-producer.

    Importante

    Você deve definir o valor da "deviceId" entrada para o ID do seu dispositivo. No exemplo a seguir, o dispositivo é chamado myDeviceId:

    {"messageId":"msg1","message":"Turn On","deviceId":"myDeviceId"}
    

    O esquema para este documento JSON é descrito com mais detalhes em https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.

Se você estiver usando o dispositivo Raspberry Pi simulado e ele estiver em execução, o dispositivo registrará a seguinte mensagem.:

Receive message: Turn On


Resend the JSON document, but change the value of the `"message"` entry. The new value is logged by the device.

Para obter mais informações sobre como usar o conector do coletor, consulte https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.

Próximos passos

Neste documento, você aprendeu como usar a API Apache Kafka Connect para iniciar o IoT Kafka Connector no HDInsight. Use os links a seguir para descobrir outras maneiras de trabalhar com Kafka: