Exercício – Criar o produtor do Kafka

Concluído

Agora que os clusters Kafka e Spark foram implantados, vamos adicionar um produtor do Kafka ao nó de cabeçalho do Kafka. Este produtor é um estimulador de preço de estoque, que produz preços de estoque artificiais.

Baixar o exemplo

  1. No navegador da Internet, acesse https://github.com/Azure/hdinsight-mslearn e baixe ou clone o exemplo localmente se você ainda não fez isso em um módulo anterior.
  2. Abra o arquivo Streaming\python-producer-simulator-template.py Estruturado do Spark localmente.

Recuperar as URLs do agente Kafka

A seguir, você precisa recuperar as URLs do agente Kafka usando o ssh no cabeçalho e adicionando as URLs ao arquivo Python.

  1. Para conectar-se ao nó principal primário do cluster Apache Kafka, você precisará realizar ssh no nó. A Azure Cloud Shell no portal do Azure é a maneira recomendada de conectar-se. No portal do Azure, clique no botão Azure Cloud Shell na barra de ferramentas superior e selecione Bash. Você também pode usar um prompt de comando habilitado para ssh, como o Git Bash.

  2. Se você não tiver usado o Azure Cloud Shell antes, será exibida uma notificação informando de que não há nenhum armazenamento montado. Selecione sua assinatura do Azure na caixa Assinatura e clique em Criar Armazenamento.

  3. No prompt de nuvem, cole o comando a seguir. Substitua sshuser pelo nome de usuário SSH. Substitua kafka-mslearn-stock pelo nome do cluster de Apache Kafka e observe que você deve incluir -ssh após o nome do cluster.

    ssh sshuser@kafka-mslearn-stock-ssh.azurehdinsight.net
    
  4. Quando você se conectar pela primeira vez ao cluster, seu cliente SSH poderá exibir um aviso de que a autenticidade do host não pode ser estabelecida. Quando for solicitado, digite sim e pressione Enter para adicionar o host à lista de servidores confiáveis do cliente SSH.

  5. Quando solicitado, insira a senha do usuário SSH.

    Após a conexão, você verá informações semelhantes ao seguinte texto:

        Welcome to Ubuntu 16.04.6 LTS (GNU/Linux 4.15.0-1063-azure x86_64)
    
        * Documentation:  https://help.ubuntu.com
        * Management:     https://landscape.canonical.com
        * Support:        https://ubuntu.com/advantage
    
        * Overheard at KubeCon: "microk8s.status just blew my mind".
    
            https://microk8s.io/docs/commands#microk8s.status
    
        0 packages can be updated.
        0 updates are security updates.
    
    
    
        Welcome to Kafka on HDInsight.
    
    
        The programs included with the Ubuntu system are free software;
        the exact distribution terms for each program are described in the
        individual files in /usr/share/doc/*/copyright.
    
        Ubuntu comes with ABSOLUTELY NO WARRANTY, to the extent permitted by
        applicable law.
    
        To run a command as administrator (user "root"), use "sudo <command>".
        See "man sudo_root" for details.
    
  6. Instale jq, um processador JSON de linha de comando. Esse utilitário é usado para analisar documentos JSON e é útil para analisar as informações do host. Na conexão SSH aberta, digite o seguinte comando para instalar o jq:

    sudo apt -y install jq
    
  7. Configurar variável de senha. Substitua PASSWORD pela senha de logon do cluster e insira o comando:

    export password='PASSWORD'
    
  8. Extraia o nome do cluster com grafia correta de maiúsculas e minúsculas. A grafia de maiúsculas e minúsculas real do nome do cluster pode ser diferente do esperado, dependendo de como o cluster foi criado. Esse comando obterá a grafia de maiúsculas e minúsculas real e a armazenará em uma variável. Insira o seguinte comando:

    export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    

    Esse comando não tem resposta.

  9. Para definir uma variável de ambiente com informações de host Zookeeper, use o comando especificado abaixo. O comando recupera todos os hosts Zookeeper e retorna apenas as duas primeiras entradas. Isso ocorre porque você deseja certa redundância no caso de um host ficar inacessível.

    export KAFKAZKHOSTS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2);
    

    Observação

    Este comando requer acesso do Ambari. Se o cluster estiver atrás de um NSG, execute esse comando em um computador que possa acessar o Ambari.

    Esse comando também não tem resposta.

  10. Para verificar se a variável de ambiente é definida corretamente, use o seguinte comando:

    echo $KAFKAZKHOSTS
    

    Esse comando retorna informações semelhantes ao seguinte texto:

    zk0-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181,zk2-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181

  11. Para definir uma variável de ambiente com informações de host agente do Apache Kafka, use o seguinte comando:

    export KAFKABROKERS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
    

    Observação

    Este comando requer acesso do Ambari. Se o cluster estiver atrás de um NSG, execute esse comando em um computador que possa acessar o Ambari.

    Esse comando não tem nenhuma saída.

  12. Para verificar se a variável de ambiente é definida corretamente, use o seguinte comando:

    echo $KAFKABROKERS
    

    Esse comando retorna informações semelhantes ao seguinte texto:

    wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092,wn0-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092

  13. Copie um dos valores do agente Kafka retornados na etapa anterior para o arquivo python-producer-simulator-template.py na linha 19 e inclua aspas simples em torno do valor, por exemplo:

    kafkaBrokers = ['wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092']
    
  14. Salve o arquivo python-producer-simulator-template-simulator-template.py.

  15. De volta à janela conexão SSH, use o comando a seguir para criar um tópico.

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic stockVals --zookeeper $KAFKAZKHOSTS
    

Esse comando se conecta ao ZooKeeper usando as informações de host armazenadas em $KAFKAZKHOSTS. A seguir, ele cria um tópico de Apache Kafka chamado stockVals, para corresponder ao nome do tópico em python-producer-simulator-template.py.

Copie o arquivo do Python para o nó principal e execute o arquivo para transmitir dados

  1. Em uma nova janela do Git, navegue até a localização do arquivo python-producer-simulator-template.py e copie-o do computador local para o nó principal primário usando o comando a seguir. Substitua kafka-mslearn-stock pelo nome do cluster de Apache Kafka e observe que você deve incluir -ssh após o nome do cluster.

    scp python-producer-simulator-template.py sshuser@kafka-mslearn-stock-ssh.azurehdinsight.net:
    

    Quando for perguntado se você deseja continuar a conexão, digite yes. No prompt, insira a senha do cluster. Depois que o arquivo é transferido, a saída a seguir é exibida.

    python-producer-simulator-template.py    100% 1896    71.9KB/s   00:00
    
  2. Agora, volte para o prompt de comando do Azure em que você recuperou as informações do agente e execute o seguinte comando para instalar o Kafka:

    sudo pip install kafka-python
    

    Depois que o Kafka for instalado com êxito, a saída a seguir será exibida.

    Installing collected packages: kafka-python
    Successfully installed kafka-python-1.4.7
    
  3. Na mesma janela, instale as solicitações usando o seguinte comando:

    sudo apt-get install python-requests
    
  4. Quando perguntado "Após esta operação, serão usados 4.327 kB de espaço em disco adicional. Deseja continuar? [Y/n]”, digite y.

    Quando as solicitações estiverem instaladas com êxito, uma saída semelhante à mostrada abaixo será exibida.

    Setting up python-urllib3 (1.13.1-2ubuntu0.16.04.3) ...
    Setting up python-requests (2.9.1-3ubuntu0.1) ...
    
  5. Na mesma janela, use o comando a seguir para executar o arquivo do Python

    python python-producer-simulator-template.py
    

    Será exibida uma saída semelhante à seguinte:

    No loops argument provided. Default loops are 1000
    Running in simulated mode
    [
    {
        "symbol": "MSFT",
        "size": 355,
        "price": 147.205,
        "time": 1578029521022
    },
    {
        "symbol": "BA",
        "size": 345,
        "price": 352.607,
        "time": 1578029521022
    },
    {
        "symbol": "JNJ",
        "size": 58,
        "price": 142.043,
        "time": 1578029521022
    },
    {
        "symbol": "F",
        "size": 380,
        "price": 8.545,
        "time": 1578029521022
    },
    {
        "symbol": "TSLA",
        "size": 442,
        "price": 329.342,
        "time": 1578029521022
    },
    {
        "symbol": "BAC",
        "size": 167,
        "price": 32.921,
        "time": 1578029521022
    },
    {
        "symbol": "GE",
        "size": 222,
        "price": 11.115,
        "time": 1578029521022
    },
    {
        "symbol": "MMM",
        "size": 312,
        "price": 174.643,
        "time": 1578029521022
    },
    {
        "symbol": "INTC",
        "size": 483,
        "price": 54.978,
        "time": 1578029521022
    },
    {
        "symbol": "WMT",
        "size": 387,
        "price": 120.355,
        "time": 1578029521022
    }
    ]
    stockVals
    2
    0
    stockVals
    1
    0
    stockVals
    3
    0
    stockVals
    2
    1
    stockVals
    7
    0
    stockVals
    7
    1
    stockVals
    1
    1
    stockVals
    4
    0
    stockVals
    4
    1
    stockVals
    1
    2
    

Essa saída fornece os preços de ações simulados para as ações listadas no arquivo python-producer-simulated-template.py seguidos do tópico, da partição e do deslocamento da mensagem no tópico. Você pode ver que sempre que o produtor é disparado (a cada segundo), um novo lote de preços de estoque é gerado e cada nova mensagem é adicionada a uma partição em um determinado deslocamento.