Esercizio - Creare il producer Kafka

Completato

Ora che sono stati distribuiti i cluster Kafka e Spark, è possibile aggiungere un producer Kafka al nodo head Kafka. Questo producer è un simulatore di prezzi di titoli azionari, che genera prezzi fittizi.

Scaricare l'esempio

  1. Nel browser Internet passare a https://github.com/Azure/hdinsight-mslearn e scaricare o clonare l'esempio in locale, se non è già stato fatto in un modulo precedente.
  2. Aprire il file Spark Structured Streaming\python-producer-simulator-template.py in locale.

Recuperare gli URL dei broker Kafka

A questo punto, è necessario recuperare gli URL dei broker Kafka usando ssh sul nodo head e aggiungendo gli URL al file python.

  1. Per connettersi al nodo head primario del cluster di Apache Kafka, è necessario eseguire ssh nel nodo. Per connettersi, è consigliabile usare Azure Cloud Shell nel portale di Azure. Nel portale di Azure fare clic sul pulsante Azure Cloud Shell sulla barra degli strumenti superiore e selezionare Bash. È anche possibile usare un prompt dei comandi abilitato per ssh, ad esempio Git Bash.

  2. Se non si è usato Azure Cloud Shell in precedenza, viene visualizzata una notifica per segnalare che non è stata montata alcuna risorsa di archiviazione. Selezionare la sottoscrizione di Azure nella casella Sottoscrizione e fare clic su Crea archiviazione.

  3. Al prompt dei comandi cloud incollare il comando seguente. Sostituire sshuser con il nome utente SSH. Sostituire kafka-mslearn-stock con il nome del cluster di Apache Kafka. Si noti che è necessario includere -ssh dopo il nome.

    ssh sshuser@kafka-mslearn-stock-ssh.azurehdinsight.net
    
  4. Alla prima connessione al cluster, è possibile che il client SSH mostri un avviso relativo all'impossibilità di confermare l'autenticità dell'host. Al prompt digitare yese quindi premere INVIO per aggiungere l'host all'elenco dei server attendibili del client SSH.

  5. Quando richiesto, immettere la password per l'utente SSH.

    Dopo avere eseguito la connessione, vengono visualizzate informazioni simili al testo seguente:

        Welcome to Ubuntu 16.04.6 LTS (GNU/Linux 4.15.0-1063-azure x86_64)
    
        * Documentation:  https://help.ubuntu.com
        * Management:     https://landscape.canonical.com
        * Support:        https://ubuntu.com/advantage
    
        * Overheard at KubeCon: "microk8s.status just blew my mind".
    
            https://microk8s.io/docs/commands#microk8s.status
    
        0 packages can be updated.
        0 updates are security updates.
    
    
    
        Welcome to Kafka on HDInsight.
    
    
        The programs included with the Ubuntu system are free software;
        the exact distribution terms for each program are described in the
        individual files in /usr/share/doc/*/copyright.
    
        Ubuntu comes with ABSOLUTELY NO WARRANTY, to the extent permitted by
        applicable law.
    
        To run a command as administrator (user "root"), use "sudo <command>".
        See "man sudo_root" for details.
    
  6. Installare jq, un processore JSON da riga di comando. Questa utilità consente di analizzare i documenti JSON ed è utile nell'analisi delle informazioni sull'host. Dalla connessione SSH aperta, immettere il comando seguente per installare jq:

    sudo apt -y install jq
    
  7. Configurare la variabile di password. Sostituire PASSWORD con la password di accesso al cluster e quindi immettere il comando:

    export password='PASSWORD'
    
  8. Estrarre il nome del cluster con l'uso corretto di maiuscole e minuscole. L'uso effettivo di maiuscole e minuscole nel nome del cluster può differire dal previsto, a seconda della modalità di creazione del cluster. Questo comando otterrà la combinazione di maiuscole e minuscole effettiva e quindi la archivierà in una variabile. Immettere il comando seguente:

    export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    

    Questo comando non restituisce risposte.

  9. Per impostare una variabile di ambiente con le informazioni degli host Zookeeper, usare il comando seguente. Il comando recupera tutti gli host Zookeeper, quindi restituisce solo le prime due voci. per mantenere un certo livello di ridondanza nel caso in cui un host fosse irraggiungibile.

    export KAFKAZKHOSTS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2);
    

    Nota

    Questo comando richiede l'accesso ad Ambari. Se il cluster è protetto da un gruppo NSG, eseguire questo comando da un computer in grado di accedere ad Ambari.

    Anche questo comando non restituisce risposte.

  10. Usare il comando seguente per verificare che la variabile di ambiente sia impostata correttamente:

    echo $KAFKAZKHOSTS
    

    Questo comando restituisce informazioni simili al testo seguente:

    zk0-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181,zk2-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181

  11. Per impostare una variabile di ambiente con le informazioni degli host broker Apache Kafka, usare il comando seguente:

    export KAFKABROKERS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
    

    Nota

    Questo comando richiede l'accesso ad Ambari. Se il cluster è protetto da un gruppo NSG, eseguire questo comando da un computer in grado di accedere ad Ambari.

    Questo comando non restituisce output.

  12. Usare il comando seguente per verificare che la variabile di ambiente sia impostata correttamente:

    echo $KAFKABROKERS
    

    Questo comando restituisce informazioni simili al testo seguente:

    wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092,wn0-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092

  13. Copiare uno dei valori del broker Kafka restituiti nel passaggio precedente alla riga 19 del file python-producer-simulator-template.py e racchiudere il valore tra virgolette singole, ad esempio:

    kafkaBrokers = ['wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092']
    
  14. Salvare il file python-producer-simulator-template-simulator-template.py.

  15. Tornare alla finestra di connessione ssh e usare il comando seguente per creare un argomento.

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic stockVals --zookeeper $KAFKAZKHOSTS
    

Questo comando si connette a Zookeeper usando le informazioni sugli host archiviate in $KAFKAZKHOSTS. Crea quindi un argomento di Apache Kafka denominato stockVals, in modo che corrisponda al nome dell'argomento in python-producer-simulator-template.py.

Copiare il file Python nel nodo head ed eseguire il file per trasmettere i dati

  1. In una nuova finestra git passare al percorso del file python-producer-simulator-template.py e copiare il file dal computer locale al nodo head primario usando il comando seguente. Sostituire kafka-mslearn-stock con il nome del cluster di Apache Kafka. Si noti che è necessario includere -ssh dopo il nome.

    scp python-producer-simulator-template.py sshuser@kafka-mslearn-stock-ssh.azurehdinsight.net:
    

    Quando viene chiesto se si vuole continuare la connessione, digitare yes. Al prompt, immettere la password per il cluster. Al termine del trasferimento dei file, viene visualizzato l'output seguente.

    python-producer-simulator-template.py    100% 1896    71.9KB/s   00:00
    
  2. Tornare ora al prompt dei comandi di Azure in cui sono state recuperate le informazioni del broker ed eseguire il comando seguente per installare Kafka:

    sudo pip install kafka-python
    

    Al termine dell'installazione di Kafka, viene visualizzato l'output seguente.

    Installing collected packages: kafka-python
    Successfully installed kafka-python-1.4.7
    
  3. Nella stessa finestra installare le richieste usando il comando seguente:

    sudo apt-get install python-requests
    
  4. Quando viene visualizzato un messaggio simile al seguente: "Al termine dell'operazione verranno utilizzati 4.327 kB di spazio su disco aggiuntivo. Vuoi continuare? [Y/n]" digitare y.

    Dopo l'installazione delle richieste, viene visualizzato un output simile al seguente.

    Setting up python-urllib3 (1.13.1-2ubuntu0.16.04.3) ...
    Setting up python-requests (2.9.1-3ubuntu0.1) ...
    
  5. Nella stessa finestra usare il comando seguente per eseguire il file python.

    python python-producer-simulator-template.py
    

    L'output dovrebbe essere simile al seguente:

    No loops argument provided. Default loops are 1000
    Running in simulated mode
    [
    {
        "symbol": "MSFT",
        "size": 355,
        "price": 147.205,
        "time": 1578029521022
    },
    {
        "symbol": "BA",
        "size": 345,
        "price": 352.607,
        "time": 1578029521022
    },
    {
        "symbol": "JNJ",
        "size": 58,
        "price": 142.043,
        "time": 1578029521022
    },
    {
        "symbol": "F",
        "size": 380,
        "price": 8.545,
        "time": 1578029521022
    },
    {
        "symbol": "TSLA",
        "size": 442,
        "price": 329.342,
        "time": 1578029521022
    },
    {
        "symbol": "BAC",
        "size": 167,
        "price": 32.921,
        "time": 1578029521022
    },
    {
        "symbol": "GE",
        "size": 222,
        "price": 11.115,
        "time": 1578029521022
    },
    {
        "symbol": "MMM",
        "size": 312,
        "price": 174.643,
        "time": 1578029521022
    },
    {
        "symbol": "INTC",
        "size": 483,
        "price": 54.978,
        "time": 1578029521022
    },
    {
        "symbol": "WMT",
        "size": 387,
        "price": 120.355,
        "time": 1578029521022
    }
    ]
    stockVals
    2
    0
    stockVals
    1
    0
    stockVals
    3
    0
    stockVals
    2
    1
    stockVals
    7
    0
    stockVals
    7
    1
    stockVals
    1
    1
    stockVals
    4
    0
    stockVals
    4
    1
    stockVals
    1
    2
    

Questo output fornisce i prezzi simulati per i titoli azionari elencati nel file python-producer-simulated-template.py, seguiti dall'argomento, dalla partizione e dall'offset del messaggio nell'argomento. Si noterà che ogni volta che il producer viene attivato (ogni secondo), viene generato un nuovo batch di prezzi e ogni nuovo messaggio viene aggiunto a una partizione in base a un determinato offset.