練習 - 建立 Kafka 生產者

已完成

既然已部署 Kafka 和 Spark 叢集,那麼讓我們將 Kafka 生產者新增至 Kafka 前端節點。 此生產者是股票價格模擬器,其會產生人工股票價格。

下載範例

  1. 在您的網際網路瀏覽器中,前往 https://github.com/Azure/hdinsight-mslearn,並在本機下載或複製範例,如果您尚未在先前的課程模組中這樣做的話。
  2. 在本機開啟 Spark 結構化串流\python-producer-simulator-template.py 檔案。

取出 Kafka 訊息代理程式 URL

接下來,您需要取得 Kafka 訊息代理程式 URL,方法是在前端節點上使用 ssh,並將這些 URL 新增至 python 檔案。

  1. 若要連線到 Apache Kafka 叢集的主要前端節點,您需要透過 ssh 連線到節點。 Azure 入口網站中的 Azure Cloud Shell 是建議的連線方式。 在 Azure 入口網站中,按一下頂端工具列中的 [Azure Cloud Shell] 按鈕,然後選取 [Bash]。 您也可以使用已啟用 ssh 的命令提示字元,例如 Git Bash。

  2. 如果您之前未使用過 Azure Cloud Shell,則會顯示一則通知,指出您沒有裝載任何儲存體。 從 [訂用帳戶] 方塊中選取您的 Azure 訂用帳戶,然後按一下 [建立儲存體]。

  3. 在雲端提示中,貼上下列命令。 將 sshuser 取代為 SSH 使用者名稱。 將 kafka-mslearn-stock 取代為 Apache Kafka 叢集的名稱,並請注意,您必須在叢集名稱之後包含 -ssh。

    ssh sshuser@kafka-mslearn-stock-ssh.azurehdinsight.net
    
  4. 當您初次連線到叢集時,您的 SSH 用戶端可能會顯示警告,指出無法確認主機的真確性。 在系統提示時,輸入 yes,然後按 Enter 鍵,以將主機新增至 SSH 用戶端信任的伺服器清單。

  5. 出現提示時,請輸入 SSH 使用者的密碼。

    連線之後,您會看到類似下列文字的資訊:

        Welcome to Ubuntu 16.04.6 LTS (GNU/Linux 4.15.0-1063-azure x86_64)
    
        * Documentation:  https://help.ubuntu.com
        * Management:     https://landscape.canonical.com
        * Support:        https://ubuntu.com/advantage
    
        * Overheard at KubeCon: "microk8s.status just blew my mind".
    
            https://microk8s.io/docs/commands#microk8s.status
    
        0 packages can be updated.
        0 updates are security updates.
    
    
    
        Welcome to Kafka on HDInsight.
    
    
        The programs included with the Ubuntu system are free software;
        the exact distribution terms for each program are described in the
        individual files in /usr/share/doc/*/copyright.
    
        Ubuntu comes with ABSOLUTELY NO WARRANTY, to the extent permitted by
        applicable law.
    
        To run a command as administrator (user "root"), use "sudo <command>".
        See "man sudo_root" for details.
    
  6. 安裝 jq,這是命令列 JSON 處理器。 此公用程式可用來剖析 JSON 文件,而且在剖析主機資訊時很有用。 從開啟的 SSH 連線,輸入下列命令來安裝 jq

    sudo apt -y install jq
    
  7. 設定密碼變數。 請將 PASSWORD 取代為叢集登入密碼,然後輸入下列命令:

    export password='PASSWORD'
    
  8. 擷取正確大小寫的叢集名稱。 視叢集的建立方式而定,叢集名稱的實際大小寫可能與您預期的不同。 此命令會取得實際的大小寫,然後將其儲存在變數中。 輸入下列命令:

    export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    

    此命令沒有回應。

  9. 若要使用 Zookeeper 主機資訊設定環境變數,請使用以下命令。 此命令會擷取所有的 Zookeeper 主機,然後只傳回前兩個項目。 這是因為考量備援之故,以防某一部主機無法連線。

    export KAFKAZKHOSTS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2);
    

    注意

    此命令需要 Ambari 存取權。 如果您的叢集位於 NSG 後方,請從可存取 Ambari 的機器執行此命令。

    此命令也沒有回應。

  10. 若要確認是否已正確設定環境變數,請使用下列命令:

    echo $KAFKAZKHOSTS
    

    此命令會傳回類似以下文字的資訊:

    zk0-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181,zk2-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181

  11. 若要使用 Apache Kafka 訊息代理程式主機資訊來設定環境變數,請使用下列命令:

    export KAFKABROKERS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
    

    注意

    此命令需要 Ambari 存取權。 如果您的叢集位於 NSG 後方,請從可存取 Ambari 的機器執行此命令。

    此命令沒有輸出。

  12. 若要確認是否已正確設定環境變數,請使用下列命令:

    echo $KAFKABROKERS
    

    此命令會傳回類似以下文字的資訊:

    wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092,wn0-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092

  13. 將上一個步驟中所傳回的其中一個 Kafka 值複製到第 19 行上的 python-producer-simulator-template.py 檔案,並在該值前後加上單引號,例如:

    kafkaBrokers = ['wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092']
    
  14. 儲存 python-producer-simulator-template-simulator-template.py 檔案。

  15. 回到 ssh 連線視窗,請使用下列命令來建立主題。

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic stockVals --zookeeper $KAFKAZKHOSTS
    

此命令會使用 KAFKAZKHOSTS 中儲存的主機資訊連線到 Zookeeper。 然後,其會建立名為 stockVals 的 Apache Kafka 主題,以符合 python-producer-simulator-template.py 中的主題名稱。

將 python 檔案複製到前端節點,並執行該檔案以串流資料

  1. 在新的 git 視窗中,瀏覽至 python-producer-simulator-template.py 檔案的位置,然後使用下列命令,將該檔案從本機電腦複製到主要前端節點。 將 kafka-mslearn-stock 取代為 Apache Kafka 叢集的名稱,並請注意,您必須在叢集名稱之後包含 -ssh。

    scp python-producer-simulator-template.py sshuser@kafka-mslearn-stock-ssh.azurehdinsight.net:
    

    當系統詢問您是否要繼續連線時,請鍵入 yes。 然後,在提示中輸入叢集的密碼。 在檔案傳輸之後,會顯示下列輸出。

    python-producer-simulator-template.py    100% 1896    71.9KB/s   00:00
    
  2. 現在會切回 Azure 命令提示字元,您可在其中取出訊息代理程式資訊,並執行下列命令來安裝 Kafka:

    sudo pip install kafka-python
    

    在 Kafka 成功安裝之後,就會顯示下列輸出。

    Installing collected packages: kafka-python
    Successfully installed kafka-python-1.4.7
    
  3. 在相同的視窗中,使用下列命令來安裝要求:

    sudo apt-get install python-requests
    
  4. 當系統詢問「在此作業之後,將會使用 4327 kB 的額外磁碟空間。 您要繼續嗎? [Y/n]」時,請鍵入 y。

    當要求安裝成功時,就會顯示如下的輸出。

    Setting up python-urllib3 (1.13.1-2ubuntu0.16.04.3) ...
    Setting up python-requests (2.9.1-3ubuntu0.1) ...
    
  5. 在同一視窗中,使用下列命令來執行 python 檔案

    python python-producer-simulator-template.py
    

    您應該會看到如下輸出:

    No loops argument provided. Default loops are 1000
    Running in simulated mode
    [
    {
        "symbol": "MSFT",
        "size": 355,
        "price": 147.205,
        "time": 1578029521022
    },
    {
        "symbol": "BA",
        "size": 345,
        "price": 352.607,
        "time": 1578029521022
    },
    {
        "symbol": "JNJ",
        "size": 58,
        "price": 142.043,
        "time": 1578029521022
    },
    {
        "symbol": "F",
        "size": 380,
        "price": 8.545,
        "time": 1578029521022
    },
    {
        "symbol": "TSLA",
        "size": 442,
        "price": 329.342,
        "time": 1578029521022
    },
    {
        "symbol": "BAC",
        "size": 167,
        "price": 32.921,
        "time": 1578029521022
    },
    {
        "symbol": "GE",
        "size": 222,
        "price": 11.115,
        "time": 1578029521022
    },
    {
        "symbol": "MMM",
        "size": 312,
        "price": 174.643,
        "time": 1578029521022
    },
    {
        "symbol": "INTC",
        "size": 483,
        "price": 54.978,
        "time": 1578029521022
    },
    {
        "symbol": "WMT",
        "size": 387,
        "price": 120.355,
        "time": 1578029521022
    }
    ]
    stockVals
    2
    0
    stockVals
    1
    0
    stockVals
    3
    0
    stockVals
    2
    1
    stockVals
    7
    0
    stockVals
    7
    1
    stockVals
    1
    1
    stockVals
    4
    0
    stockVals
    4
    1
    stockVals
    1
    2
    

此輸出會提供 python-producer-simulated-template.py 檔中所列股票的模擬股票價格,後面接著主題、分割區,以及主題中訊息的位移。 您可以看到每次觸發生產者 (每秒),就會產生新的一批股票價格,並將每則新訊息新增至特定位移的分割區。