练习 - 创建 Kafka 制作者

已完成

现在已部署了 Kafka 和 Spark 群集,接下来可将 Kafka 制作者添加到 Kafka 头节点。 该制作者是股票价格模拟器,它生成人为股票价格。

下载示例

  1. 在 Internet 浏览器中转到 https://github.com/Azure/hdinsight-mslearn,并在本地下载或克隆示例(如果在上一模块中未执行此操作)。
  2. 在本地打开 Spark Structured Streaming\python-producer-simulator-template.py 文件。

检索 Kafka 中转站 URL

接下来,在头节点上使用 ssh 并将 URL 添加到 Python 文件,你需要通过这种方式来检索 Kafka 中转站 URL。

  1. 若要连接到 Apache Kafka 群集的主头节点,需要通过 ssh 连接到该节点。 建议使用 Azure 门户中的 Azure Cloud Shell 进行连接。 在 Azure 门户中,单击顶部工具栏中的“Azure Cloud Shell”按钮,然后选择“Bash”。 还可以使用启用了 ssh 的命令提示符,如 Git Bash。

  2. 如果之前尚未使用过 Azure Cloud Shell,则会显示指出你没有装载存储的通知。 从“订阅”框中选择 Azure 订阅,然后单击“创建存储”。

  3. 在云提示下,粘贴以下命令。 将 sshuser 替换为 SSH 用户名。 将 kafka-mslearn-stock 替换为 Apache Kafka 群集的名称,请注意,必须在群集名称后附上 -ssh。

    ssh sshuser@kafka-mslearn-stock-ssh.azurehdinsight.net
    
  4. 首次连接到群集时,SSH 客户端可能会显示一个警告,提示无法验证主机。 当系统提示时,请键入“yes”,然后按 Enter,将主机添加到 SSH 客户端的受信任服务器列表 。

  5. 出现提示时,请输入 SSH 用户名密码。

    连接后,显示的信息类似于以下文本:

        Welcome to Ubuntu 16.04.6 LTS (GNU/Linux 4.15.0-1063-azure x86_64)
    
        * Documentation:  https://help.ubuntu.com
        * Management:     https://landscape.canonical.com
        * Support:        https://ubuntu.com/advantage
    
        * Overheard at KubeCon: "microk8s.status just blew my mind".
    
            https://microk8s.io/docs/commands#microk8s.status
    
        0 packages can be updated.
        0 updates are security updates.
    
    
    
        Welcome to Kafka on HDInsight.
    
    
        The programs included with the Ubuntu system are free software;
        the exact distribution terms for each program are described in the
        individual files in /usr/share/doc/*/copyright.
    
        Ubuntu comes with ABSOLUTELY NO WARRANTY, to the extent permitted by
        applicable law.
    
        To run a command as administrator (user "root"), use "sudo <command>".
        See "man sudo_root" for details.
    
  6. 安装 jq,一个命令行 JSON 处理程序。 此实用程序用于分析 JSON 文档和主机信息。 在打开的 SSH 连接中,输入以下命令以安装 jq

    sudo apt -y install jq
    
  7. 设置密码变量。 将 PASSWORD 替换为群集登录密码,然后输入以下命令:

    export password='PASSWORD'
    
  8. 提取具有正确大小写格式的群集名称。 群集名称的实际大小写格式可能出乎预期,具体取决于群集的创建方式。 此命令将获取实际的大小写,然后将其存储在变量中。 输入以下命令:

    export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    

    此命令无响应。

  9. 若要使用 Zookeeper 主机信息来设置环境变量,请使用以下命令。 此命令检索所有 Zookeeper 主机,然后仅返回前两个条目。 这是由于某个主机无法访问时,需要一些冗余。

    export KAFKAZKHOSTS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2);
    

    注意

    此命令需要 Ambari 访问权限。 如果群集位于 NSG 后面,请在可访问 Ambari 的计算机上运行此命令。

    此命令也没有响应。

  10. 若要验证是否已正确设置了环境变量,请使用以下命令:

    echo $KAFKAZKHOSTS
    

    此命令返回类似于以下文本的信息:

    zk0-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181,zk2-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181

  11. 若要使用 Apache Kafka 代理主机信息来设置环境变量,请使用以下命令:

    export KAFKABROKERS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
    

    注意

    此命令需要 Ambari 访问权限。 如果群集位于 NSG 后面,请在可访问 Ambari 的计算机上运行此命令。

    此命令没有输出。

  12. 若要验证是否已正确设置了环境变量,请使用以下命令:

    echo $KAFKABROKERS
    

    此命令返回类似于以下文本的信息:

    wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092,wn0-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092

  13. 将上一步返回的某个 Kafka 中转站值复制到第 19 行的 python-producer-simulator-template.py 文件中,并将该值用单引号引起来,例如:

    kafkaBrokers = ['wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092']
    
  14. 保存 python-producer-simulator-template-simulator-template.py 文件。

  15. 返回到 ssh 连接窗口,使用以下命令创建主题。

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic stockVals --zookeeper $KAFKAZKHOSTS
    

此命令使用存储在 $KAFKAZKHOSTS 中的主机信息连接到 Zookeeper。 然后创建一个名为 stockVals 的 Apache Kafka 主题,以匹配 python-producer-simulator-template.py 中的主题名称。

将 Python 文件复制到头节点,并运行文件以流式处理数据

  1. 在新的 Git 窗口中,导航到 python-producer-simulator-template.py 文件所在的位置,并使用以下命令将该文件从本地计算机复制到主头节点。 将 kafka-mslearn-stock 替换为 Apache Kafka 群集的名称,请注意,必须在群集名称后附上 -ssh。

    scp python-producer-simulator-template.py sshuser@kafka-mslearn-stock-ssh.azurehdinsight.net:
    

    当系统询问是否要继续连接时,请键入“是”。 然后在出现提示时,输入群集的密码。 文件传输后,将显示以下输出。

    python-producer-simulator-template.py    100% 1896    71.9KB/s   00:00
    
  2. 现在切换回检索代理信息的 Azure 命令提示符,并运行以下命令安装 Kafka:

    sudo pip install kafka-python
    

    Kafka 安装成功后,将显示以下输出。

    Installing collected packages: kafka-python
    Successfully installed kafka-python-1.4.7
    
  3. 在同一个窗口中,使用以下命令安装请求:

    sudo apt-get install python-requests
    
  4. 当系统询问“此操作后,将使用 4,327 kB 的额外磁盘空间。 是否继续? [Y/n]”输入 y。

    请求成功安装后,将显示类似于以下内容的输出。

    Setting up python-urllib3 (1.13.1-2ubuntu0.16.04.3) ...
    Setting up python-requests (2.9.1-3ubuntu0.1) ...
    
  5. 在同一个窗口中,使用以下命令运行 Python 文件

    python python-producer-simulator-template.py
    

    应该会看到与下面类似的输出:

    No loops argument provided. Default loops are 1000
    Running in simulated mode
    [
    {
        "symbol": "MSFT",
        "size": 355,
        "price": 147.205,
        "time": 1578029521022
    },
    {
        "symbol": "BA",
        "size": 345,
        "price": 352.607,
        "time": 1578029521022
    },
    {
        "symbol": "JNJ",
        "size": 58,
        "price": 142.043,
        "time": 1578029521022
    },
    {
        "symbol": "F",
        "size": 380,
        "price": 8.545,
        "time": 1578029521022
    },
    {
        "symbol": "TSLA",
        "size": 442,
        "price": 329.342,
        "time": 1578029521022
    },
    {
        "symbol": "BAC",
        "size": 167,
        "price": 32.921,
        "time": 1578029521022
    },
    {
        "symbol": "GE",
        "size": 222,
        "price": 11.115,
        "time": 1578029521022
    },
    {
        "symbol": "MMM",
        "size": 312,
        "price": 174.643,
        "time": 1578029521022
    },
    {
        "symbol": "INTC",
        "size": 483,
        "price": 54.978,
        "time": 1578029521022
    },
    {
        "symbol": "WMT",
        "size": 387,
        "price": 120.355,
        "time": 1578029521022
    }
    ]
    stockVals
    2
    0
    stockVals
    1
    0
    stockVals
    3
    0
    stockVals
    2
    1
    stockVals
    7
    0
    stockVals
    7
    1
    stockVals
    1
    1
    stockVals
    4
    0
    stockVals
    4
    1
    stockVals
    1
    2
    

此输出提供了 python-producer-simulated-template.py 文件中列出的股票的模拟股票价格,然后是主题、分区和主题中消息的偏移量。 你可以看到,每次触发制作者时(每秒),都会生成新一批股票价格,并且每个新消息都在特定偏移量处被添加到分区中。