Cvičení – vytvoření producenta Kafka

Dokončeno

Teď, když jsou clustery Kafka a Spark nasazené, umožňuje přidat do hlavního uzlu Kafka producenta Kafka. Tento producent je akciový stimulátor, který vytváří ceny umělých akcií.

Stažení ukázky

  1. Pokud jste to ještě neudělali v předchozím modulu, přejděte do https://github.com/Azure/hdinsight-mslearn internetového prohlížeče a stáhněte nebo naklonujte ukázku místně.
  2. Místně otevřete soubor Spark Structured Streaming\python-producer-simulator-template.py.

Načtení adres URL zprostředkovatele Kafka

Dále je potřeba načíst adresy URL zprostředkovatele Kafka pomocí ssh na hlavním uzlu a přidáním adres URL do souboru Python.

  1. Pokud se chcete připojit k primárnímu hlavnímu uzlu clusteru Apache Kafka, musíte se k uzlu připojit pomocí SSH. Azure Cloud Shell na webu Azure Portal se doporučuje připojit. Na webu Azure Portal klikněte na tlačítko Azure Cloud Shellu na horním panelu nástrojů a vyberte Bash. Můžete také použít příkazový řádek ssh s povoleným příkazem, jako je Git Bash.

  2. Pokud jste azure Cloud Shell ještě nepoužívali, zobrazí se oznámení, že nemáte připojené úložiště. V poli Předplatné vyberte své předplatné Azure a klikněte na Vytvořit úložiště.

  3. Na příkazovém řádku cloudu vložte následující příkaz. Nahraďte sshuser uživatelským jménem SSH. Nahraďte kafka-mslearn-stock názvem clusteru Apache Kafka a všimněte si, že za název clusteru musíte zahrnout -ssh.

    ssh sshuser@kafka-mslearn-stock-ssh.azurehdinsight.net
    
  4. Když se ke clusteru poprvé připojíte, ve vašem klientovi SSH se může zobrazit upozornění na nemožnost potvrzení pravosti hostitele. Po zobrazení výzvy zadejte yes (ano) a pak stisknutím klávesy Enter přidejte hostitele na seznam důvěryhodných serverů vašeho klienta SSH.

  5. Po zobrazení výzvy zadejte heslo uživatele SSH.

    Po připojení se zobrazí informace podobné tomuto textu:

        Welcome to Ubuntu 16.04.6 LTS (GNU/Linux 4.15.0-1063-azure x86_64)
    
        * Documentation:  https://help.ubuntu.com
        * Management:     https://landscape.canonical.com
        * Support:        https://ubuntu.com/advantage
    
        * Overheard at KubeCon: "microk8s.status just blew my mind".
    
            https://microk8s.io/docs/commands#microk8s.status
    
        0 packages can be updated.
        0 updates are security updates.
    
    
    
        Welcome to Kafka on HDInsight.
    
    
        The programs included with the Ubuntu system are free software;
        the exact distribution terms for each program are described in the
        individual files in /usr/share/doc/*/copyright.
    
        Ubuntu comes with ABSOLUTELY NO WARRANTY, to the extent permitted by
        applicable law.
    
        To run a command as administrator (user "root"), use "sudo <command>".
        See "man sudo_root" for details.
    
  6. Nainstalujte jq, procesor JSON příkazového řádku. Tento nástroj slouží k analýze dokumentů JSON a je užitečný při analýze informací o hostiteli. Z otevřeného připojení SSH zadejte následující příkaz, který chcete nainstalovat jq:

    sudo apt -y install jq
    
  7. Nastavte proměnnou hesla. Nahraďte PASSWORD přihlašovacím heslem clusteru a pak zadejte příkaz:

    export password='PASSWORD'
    
  8. Extrahujte název clusteru se správnými písmeny. Skutečná velikost výskytu názvu clusteru se může lišit od očekávání podle toho, jak byl cluster vytvořen. Tento příkaz získá skutečné velikostí a uloží ho do proměnné. Zadejte tento příkaz:

    export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    

    Tento příkaz nemá odpověď.

  9. Pokud chcete nastavit proměnnou prostředí s informacemi o hostiteli Zookeeper, použijte následující příkaz. Příkaz načte všechny hostitele Zookeeper a pak vrátí pouze první dvě položky. Je to proto, že chcete určitou redundanci pro případ, že jeden hostitel bude nedosažitelný.

    export KAFKAZKHOSTS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2);
    

    Poznámka:

    Tento příkaz vyžaduje přístup k Ambari. Pokud je váš cluster za skupinou zabezpečení sítě, spusťte tento příkaz z počítače, který má přístup k Ambari.

    Tento příkaz také nemá žádnou odpověď.

  10. Pokud chcete ověřit správné nastavení proměnné prostředí, použijte následující příkaz:

    echo $KAFKAZKHOSTS
    

    Tento příkaz by měl vrátit informace podobné následujícímu textu:

    zk0-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181,zk2-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181

  11. K nastavení proměnné prostředí s použitím informací o hostiteli zprostředkovatele Apache Kafka použijte následující příkaz:

    export KAFKABROKERS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
    

    Poznámka:

    Tento příkaz vyžaduje přístup k Ambari. Pokud je váš cluster za skupinou zabezpečení sítě, spusťte tento příkaz z počítače, který má přístup k Ambari.

    Tento příkaz nemá žádný výstup.

  12. Pokud chcete ověřit správné nastavení proměnné prostředí, použijte následující příkaz:

    echo $KAFKABROKERS
    

    Tento příkaz by měl vrátit informace podobné následujícímu textu:

    wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092,wn0-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092

  13. Zkopírujte jednu z hodnot zprostředkovatele Kafka vrácenou v předchozím kroku do souboru python-producer-simulator-template.py na řádku 19 a uveďte jednoduché uvozovky kolem hodnoty, například:

    kafkaBrokers = ['wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092']
    
  14. Uložte soubor python-producer-simulator-template-simulator-template.py.

  15. V okně připojení SSH vytvořte téma pomocí následujícího příkazu.

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic stockVals --zookeeper $KAFKAZKHOSTS
    

Tento příkaz se připojí k Zookeeperu pomocí informací o hostiteli uložených v $KAFKAZKHOSTS. Potom vytvoří téma Apache Kafka s názvem stockVals, které bude odpovídat názvu tématu v python-producer-simulator-template.py.

Zkopírujte soubor Pythonu do hlavního uzlu a spusťte soubor pro streamování dat.

  1. V novém okně Git přejděte do umístění souboru python-producer-simulator-template.py a pomocí následujícího příkazu zkopírujte soubor z místního počítače do primárního hlavního uzlu. Nahraďte kafka-mslearn-stock názvem clusteru Apache Kafka a všimněte si, že za název clusteru musíte zahrnout -ssh.

    scp python-producer-simulator-template.py sshuser@kafka-mslearn-stock-ssh.azurehdinsight.net:
    

    Když se zobrazí dotaz, jestli chcete pokračovat v připojování, zadejte ano. Pak na příkazovém řádku zadejte heslo clusteru. Po přenosu souboru se zobrazí následující výstup.

    python-producer-simulator-template.py    100% 1896    71.9KB/s   00:00
    
  2. Teď přepněte zpět na příkazový řádek Azure, kde jste načetli informace o zprostředkovatele, a spuštěním následujícího příkazu nainstalujte Kafka:

    sudo pip install kafka-python
    

    Po úspěšné instalaci Kafka se zobrazí následující výstup.

    Installing collected packages: kafka-python
    Successfully installed kafka-python-1.4.7
    
  3. Ve stejném okně nainstalujte požadavky pomocí následujícího příkazu:

    sudo apt-get install python-requests
    
  4. Po zobrazení výzvy "Po této operaci se použije 4 327 kB dalšího místa na disku. Chcete pokračovat? [Y/n]" zadejte y.

    Když se požadavky úspěšně nainstalují, zobrazí se výstup podobný následujícímu.

    Setting up python-urllib3 (1.13.1-2ubuntu0.16.04.3) ...
    Setting up python-requests (2.9.1-3ubuntu0.1) ...
    
  5. Ve stejném okně spusťte soubor Python pomocí následujícího příkazu.

    python python-producer-simulator-template.py
    

    Zobrazený výstup by měl vypadat přibližně takto:

    No loops argument provided. Default loops are 1000
    Running in simulated mode
    [
    {
        "symbol": "MSFT",
        "size": 355,
        "price": 147.205,
        "time": 1578029521022
    },
    {
        "symbol": "BA",
        "size": 345,
        "price": 352.607,
        "time": 1578029521022
    },
    {
        "symbol": "JNJ",
        "size": 58,
        "price": 142.043,
        "time": 1578029521022
    },
    {
        "symbol": "F",
        "size": 380,
        "price": 8.545,
        "time": 1578029521022
    },
    {
        "symbol": "TSLA",
        "size": 442,
        "price": 329.342,
        "time": 1578029521022
    },
    {
        "symbol": "BAC",
        "size": 167,
        "price": 32.921,
        "time": 1578029521022
    },
    {
        "symbol": "GE",
        "size": 222,
        "price": 11.115,
        "time": 1578029521022
    },
    {
        "symbol": "MMM",
        "size": 312,
        "price": 174.643,
        "time": 1578029521022
    },
    {
        "symbol": "INTC",
        "size": 483,
        "price": 54.978,
        "time": 1578029521022
    },
    {
        "symbol": "WMT",
        "size": 387,
        "price": 120.355,
        "time": 1578029521022
    }
    ]
    stockVals
    2
    0
    stockVals
    1
    0
    stockVals
    3
    0
    stockVals
    2
    1
    stockVals
    7
    0
    stockVals
    7
    1
    stockVals
    1
    1
    stockVals
    4
    0
    stockVals
    4
    1
    stockVals
    1
    2
    

Tento výstup poskytuje simulované ceny akcií pro akcie uvedené v souboru python-producer-simulated-template.py následované tématem, oddílem a posunem zprávy v tématu. Uvidíte, že pokaždé, když se producent aktivuje (každou sekundu), se vygeneruje nová dávka cen akcií a každá nová zpráva se přidá do oddílu v určitém posunu.