Compartilhar via


Use o Apache Kafka no HDInsight com o Microsoft Azure Hub IoT

Aprenda como usar o conector do Hub IoT do Azure do Apache Kafka Connect para mover dados entre o Apache Kafka no HDInsight e o Azure IoT Hub. Neste documento, você aprenderá a executar o conector do Hub IoT de um nó de borda do cluster.

A API de conexão do Kafka permite que você implemente os conectores que continuamente extraem dados para Kafka ou enviam dados do Kafka para outro sistema. O Hub Apache Kafka Connect do Azure IoT é um conector que extrai dados do Hub do Azure IoT para o Kafka. Também pode enviar dados do Kafka para o Hub IoT.

Quando efetuar pull do Hub IoT, você usa um conector de fonte. Quando efetuar pull do Hub IoT, você usa um conector de coletor. O conector de Hub IoT fornece a origem e os conectores de coletor.

O diagrama a seguir mostra o fluxo de dados entre o Hub IoT do Microsoft zure e Kafka no HDInsight ao usar o conector.

Image showing data flowing from IoT Hub to Kafka through the connector.

Para obter mais informações sobre a estrutura a API de Conexão, consulte https://kafka.apache.org/documentation/#connect.

Pré-requisitos

Compilar o conector

  1. Baixe a fonte para o conector em https://github.com/Azure/toketi-kafka-connect-iothub/ para seu ambiente local.

  2. Em um prompt de comando, navegue até o diretório toketi-kafka-connect-iothub-master. Em seguida, use o seguinte comando para criar e empacotar o projeto:

    sbt assembly
    

    A compilação leva alguns minutos para ser concluído. Esse comando cria um arquivo chamado kafka-connect-iothub-assembly_2.11-0.7.0.jar no diretório toketi-kafka-connect-iothub-master\target\scala-2.11 do projeto.

Instalar o conector

  1. Carregue o arquivo .jar no nó de borda do Kafka no cluster do HDInsight. Edite o comando a seguir substituindo CLUSTERNAME pelo nome real do cluster. Os valores padrão para a conta de usuário SSH e o nome do nó de borda são usados e modificados conforme necessário.

    scp kafka-connect-iothub-assembly*.jar sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net:
    
  2. Uma vez concluída a cópia do arquivo, conecte-se ao nó de borda usando SSH:

    ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
    
  3. Para instalar o conector ao diretório do Kafka libs, use o seguinte comando:

    sudo mv kafka-connect-iothub-assembly*.jar /usr/hdp/current/kafka-broker/libs/
    

Mantenha a conexão SSH ativa para as etapas restantes.

Configure Apache Kafka

De uma conexão SSH para o nó de borda, use as etapas a seguir para configurar o Kafka para executar o conector no modo autônomo:

  1. Configurar variável de senha. Substitua PASSWORD pela senha de logon do cluster e insira o comando:

    export password='PASSWORD'
    
  2. Instale o utilitário jq. O utilitário jq torna mais fácil processar documentos JSON retornados de consultas do Ambari. Insira o seguinte comando:

    sudo apt -y install jq
    
  3. Coletar endereço IP do agente do Kafka. Pode haver muitos agentes no seu cluster, mas você só precisa fazer referência a um ou dois. Para obter o endereço dos dois hosts de agente, use o seguinte comando:

    export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    
    export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
    echo $KAFKABROKERS
    

    Copie os valores para uso posterior. O valor retornado é semelhante ao seguinte texto:

    <brokername1>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092,<brokername2>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092

  4. Obter o endereço de nós de Apache Zookeeper. Há vários nós do Zookeeper no cluster, mas você só precisa referenciar um ou dois. Use o seguinte comando para armazenar os endereços na variável KAFKAZKHOSTS:

    export KAFKAZKHOSTS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2`
    
  5. Ao executar o conector no modo autônomo, o /usr/hdp/current/kafka-broker/config/connect-standalone.properties arquivo é usado para se comunicar com os agentes de Kafka. Para editar esse arquivo connect-standalone.properties, use o seguinte comando:

    sudo nano /usr/hdp/current/kafka-broker/config/connect-standalone.properties
    
  6. Faça as seguintes edições:

    Valor atual Novo valor Comentário
    bootstrap.servers=localhost:9092 Substitua o valor localhost:9092 pelos hosts de agente da etapa anterior Define a configuração autônoma para o nó de borda para localizar os agentes Kafka.
    key.converter=org.apache.kafka.connect.json.JsonConverter key.converter=org.apache.kafka.connect.storage.StringConverter Essa alteração permite que você teste usando o produtor do console incluído com Kafka. Talvez seja necessário conversores diferentes para outros produtores e consumidores. Para obter informações sobre como usar outros valores de conversor, consulte https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
    value.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.storage.StringConverter O mesmo que foi dado.
    N/D consumer.max.poll.records=10 Adicione ao final do arquivo. Essa alteração é evitar os tempos limite no conector do coletor limitando a 10 registros por vez. Para obter mais informações, consulte https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
  7. Para salvar o arquivo, use Ctrl + X, Y e, em seguida, Enter.

  8. Para criar os tópicos usados pela conector use os comandos a seguir:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotin --zookeeper $KAFKAZKHOSTS
    
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotout --zookeeper $KAFKAZKHOSTS
    

    Para verificar que os tópicos iotin e iotout existem, use o comando a seguir:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $KAFKAZKHOSTS
    

    O tópico iotin é usado para receber mensagens de Hub IoT. O tópico iotout é usado para enviar mensagens de Hub IoT.

Obter a cadeia de conexão do Hub IoT

Para recuperar informações de Hub IoT usadas pelo conector, use as seguintes etapas:

  1. Obtenha o ponto de extremidade compatível com o evento de Hub e nome de ponto de extremidade compatível para o seu Hub IoT. Para obter essas informações, use um dos métodos a seguir:

    • Do portal do Azure, use as etapas a seguir:

      1. Navegue até seu Hub IoT e clique em Pontos de Extremidade.

      2. Em Pontos de extremidade internos, selecione Eventos.

      3. De Propriedades, copie o valor dos campos a seguir:

        • Nome compatível com o Hub de eventos
        • Ponto de extremidade compatível com o hub de eventos
        • Partições

        Importante

        O valor de ponto de extremidade do portal pode conter texto extra que não é necessário neste exemplo. Extrair o texto que corresponde a esse padrão sb://<randomnamespace>.servicebus.windows.net/.

    • Na CLI do Azure digite o seguinte comando:

      az iot hub show --name myhubname --query "{EventHubCompatibleName:properties.eventHubEndpoints.events.path,EventHubCompatibleEndpoint:properties.eventHubEndpoints.events.endpoint,Partitions:properties.eventHubEndpoints.events.partitionCount}"
      

      Substitua myhubname pelo nome do seu Hub IoT. A resposta é semelhante ao texto a seguir:

      "EventHubCompatibleEndpoint": "sb://ihsuprodbnres006dednamespace.servicebus.windows.net/",
      "EventHubCompatibleName": "iothub-ehub-myhub08-207673-d44b2a856e",
      "Partitions": 2
      
  2. Obtenha a política de acesso compartilhado e chave. Para este exemplo, use a chave de serviço. Para obter essas informações, use um dos métodos a seguir:

    • Do portal do Azure, use as etapas a seguir:

      1. Selecione Políticas de acesso compartilhado e selecione serviço.
      2. Copie o valor da chave primária.
      3. Copie o valor de Cadeia de conexão – chave primária.
    • Na CLI do Azure digite o seguinte comando:

      1. Para obter o valor de chave primária, use o seguinte comando:

        az iot hub policy show --hub-name myhubname --name service --query "primaryKey"
        

        Substitua myhubname pelo nome do seu Hub IoT. A resposta é a chave primária para o service política para esse hub.

      2. Para obter a cadeia de conexão para a service política, use o seguinte comando:

        az iot hub show-connection-string --name myhubname --policy-name service --query "connectionString"
        

        Substitua myhubname pelo nome do seu Hub IoT. A resposta e a string de conexão para a política service.

Configurar a conexão da fonte

Para configurar a fonte para trabalhar com o Hub IoT, execute as seguintes ações de uma conexão SSH para o nó de borda:

  1. Criar uma cópia do arquivo connect-iot-source.properties no /usr/hdp/current/kafka-broker/config/ diretório. Para baixar o arquivo do projeto toketi-kafka-connect-iothub, use o seguinte comando:

    sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-source.properties
    
  2. Para editar o arquivo connect-iot-source.properties e adicionar as informações de Hub IoT, use o seguinte comando:

    sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
    
  3. No editor, localize e altere as seguintes entradas:

    Valor atual Editar
    Kafka.Topic=PLACEHOLDER Substitua PLACEHOLDER por iotin. Mensagens recebidas do Hub IoT são colocadas no iotin tópico.
    IotHub.EventHubCompatibleName=PLACEHOLDER Substitua PLACEHOLDER pelo Hub de Eventos - nome compatível.
    IotHub.EventHubCompatibleEndpoint=PLACEHOLDER Substitua PLACEHOLDER pelo Hub de Eventos - ponto de extremidade compatível.
    IotHub.AccessKeyName=PLACEHOLDER Substitua PLACEHOLDER por service.
    IotHub.AccessKeyValue=PLACEHOLDER Substitua PLACEHOLDER pela chave primária da service política.
    IotHub.Partitions=PLACEHOLDER Substitua PLACEHOLDER pelo número de partições nas etapas anteriores.
    IotHub.StartTime=PLACEHOLDER Substitua PLACEHOLDER por uma data UTC. Esta data é quando o conector inicia a verificação de mensagens. O formato de data é yyyy-mm-ddThh:mm:ssZ.
    BatchSize=100 Substitua 100 por 5. Essa alteração faz com que o conector lei as mensagens em Kafka, uma vez que há cinco novas mensagens no Hub IoT.

    Para obter um exemplo de configuração, veja Conector de origem do Kafka Connect para o Hub IoT do Azure.

  4. Para salvar as alterações, use Ctrl + X, Y e, em seguida, Enter.

Para obter mais informações sobre como configurar a fonte do connector, consulte https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Source.md.

Configurar a conexão do coletor

Para configurar a conexão do coletor para trabalhar com o Hub IoT, execute as seguintes ações de uma conexão SSH para o nó de borda:

  1. Criar uma cópia do arquivo connect-iothub-sink.properties no /usr/hdp/current/kafka-broker/config/ diretório. Para baixar o arquivo do projeto toketi-kafka-connect-iothub, use o seguinte comando:

    sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-sink.properties
    
  2. Para editar o arquivo connect-iothub-sink.properties e adicionar as informações de Hub IoT, use o seguinte comando:

    sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
    
  3. No editor, localize e altere as seguintes entradas:

    Valor atual Editar
    topics=PLACEHOLDER Substitua PLACEHOLDER por iotout. Mensagens gravadas no iotout tópico são encaminhadas para o Hub IoT.
    IotHub.ConnectionString=PLACEHOLDER Substitua PLACEHOLDER pela cadeia de conexão obtida para a service política.

    Para obter um exemplo de configuração, veja Conector do coletor do Kafka Connect para o Hub IoT do Azure.

  4. Para salvar as alterações, use Ctrl + X, Y e, em seguida, Enter.

Para obter mais informações sobre como configurar a fonte do conector, consulte https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.

Iniciar o conector de origem

  1. Para iniciar o conector de origem, use o comando a seguir a partir de uma conexão SSH para o nó de borda:

    /usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
    

    Depois que o conector é iniciado, envie mensagens para o Hub IoT do seu dispositivo. Como o conector lê mensagens do Hub IoT e os armazena no tópico Kafka, ele registra informações no console:

    [2017-08-29 20:15:46,112] INFO Polling for data - Obtained 5 SourceRecords from IotHub (com.microsoft.azure.iot.kafka.connect.IotHubSourceTask:39)
    [2017-08-29 20:15:54,106] INFO Finished WorkerSourceTask{id=AzureIotHubConnector-0} commitOffsets successfully in 4 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:356)
    

    Observação

    Você pode ver avisos que o conector for iniciado. Esses avisos não causam problemas para receber mensagens de hub IoT.

  2. Pare o conector após alguns minutos usando Ctrl + C duas vezes. Leva alguns minutos para que o conector pare.

Iniciar o coletor do conector

Para uma conexão SSH ao nó de conexão, use o comando a seguir para iniciar o conector do coletor no modo autônomo:

/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties

Informações semelhantes ao seguinte texto serão exibidas são exibidas:

[2017-08-30 17:49:16,150] INFO Started tasks to send 1 messages to devices. (com.microsoft.azure.iot.kafka.connect.sink.
IotHubSinkTask:47)
[2017-08-30 17:49:16,150] INFO WorkerSinkTask{id=AzureIotHubSinkConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)

Observação

Você pode ver vários avisos que o conector foi iniciado. Você pode ignorá-los com segurança.

Enviar mensagens

Para enviar mensagens por meio do conector, use as seguintes etapas:

  1. Abra uma segunda sessão SSH para o cluster Kafka:

    ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
    
  2. Obtenha o endereço dos agentes Kafka para a nova sessão SSH. Substitua PASSWORD pela senha de logon do cluster e insira o comando:

    export password='PASSWORD'
    
    export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    
    export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
    
  3. Para enviar mensagens ao tópico iotout, use o comando a seguir:

    /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKABROKERS --topic iotout
    

    Esse comando não retorna ao prompt do Bash normal. Em vez disso, envia a entrada do teclado para o tópico iotout.

  4. Para enviar uma mensagem para o seu dispositivo, colar um documento JSON para a sessão SSH para a kafka-console-producer.

    Importante

    Você deve definir o valor de "deviceId" entrada para a ID do dispositivo. O exemplo a seguir, o dispositivo é denominado myDeviceId:

    {"messageId":"msg1","message":"Turn On","deviceId":"myDeviceId"}
    

    O esquema para este documento JSON é descrito mais detalhadamente no https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.

Se você estiver usando o dispositivo Raspberry Pi simulado e ele estiver em execução, o dispositivo registrará a seguinte mensagem:

Receive message: Turn On


Resend the JSON document, but change the value of the `"message"` entry. The new value is logged by the device.

Para obter mais informações sobre como usar o conector do coletor, confira https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.

Próximas etapas

Neste documento, você aprendeu a usar a API do Apache Kafka Connect para iniciar o IoT Kafka Connector no HDInsight. Use os links a seguir para descobrir outras maneiras de trabalhar com Kafka: