你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

快速入门:使用 PowerShell 在 Azure HDInsight 中创建 Apache Kafka 群集

Apache Kafka 是开源分布式流式处理平台。 通常用作消息代理,因为它可提供类似于发布-订阅消息队列的功能。

在此快速入门中,了解如何使用 Azure PowerShell 创建 Apache Kafka 群集。 还介绍了如何使用已包含的实用工具发送并接收使用 Kafka 的信息。

警告

HDInsight 群集是基于分钟按比例计费,而不管用户是否使用它们。 请务必在使用完群集之后将其删除。 请参阅如何删除 HDInsight 群集

只有同一虚拟网络中的资源可以访问 Kafka API。 本快速入门使用 SSH 直接访问群集。 若要将其他服务、网络或虚拟机连接到 Kafka,则必须首先创建虚拟机,然后才能在网络中创建资源。 有关详细信息,请参阅使用虚拟网络连接到 Apache Kafka 文档。

如果没有 Azure 订阅,请在开始之前创建一个免费帐户

先决条件

注意

建议使用 Azure Az PowerShell 模块与 Azure 交互。 请参阅安装 Azure PowerShell 以开始使用。 若要了解如何迁移到 Az PowerShell 模块,请参阅 将 Azure PowerShell 从 AzureRM 迁移到 Az

登录 Azure

使用 Connect-AzAccount cmdlet 登录到 Azure 订阅,并按照屏幕上的说明进行操作。

# Login to your Azure subscription
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
    Connect-AzAccount
}

# If you have multiple subscriptions, set the one to use
# Select-AzSubscription -SubscriptionId "<SUBSCRIPTIONID>"

创建资源组

使用 New-AzResourceGroup 创建 Azure 资源组。 资源组是在其中部署和管理 Azure 资源的逻辑容器。 以下示例将提示输入名称和地址,然后创建新的资源组:

$resourceGroup = Read-Host -Prompt "Enter the resource group name"
$location = Read-Host -Prompt "Enter the Azure region to use"

New-AzResourceGroup -Name $resourceGroup -Location $location

创建存储帐户

当 Kafka on HDInsight 使用 Azure 托管磁盘存储 Kafka 数据时,该群集还使用 Azure 存储来存储信息,例如日志。 使用 New-AzStorageAccount 创建新的存储帐户。

重要

存储帐户类型 BlobStorage 仅可用作 HDInsight 群集的辅助存储器。

$storageName = Read-Host -Prompt "Enter the storage account name"

New-AzStorageAccount `
    -ResourceGroupName $resourceGroup `
    -Name $storageName `
    -Location $location `
    -SkuName Standard_LRS `
    -Kind StorageV2 `
    -EnableHttpsTrafficOnly 1

HDInsight 在 blob 容器中的存储帐户中存储数据。 使用 New-AzStorageContainer 创建新容器。

$containerName = Read-Host -Prompt "Enter the container name"

$storageKey = (Get-AzStorageAccountKey `
                -ResourceGroupName $resourceGroup `
                -Name $storageName)[0].Value
$storageContext = New-AzStorageContext `
                    -StorageAccountName $storageName `
                    -StorageAccountKey $storageKey

New-AzStorageContainer -Name $containerName -Context $storageContext

创建 Apache Kafka 群集

通过 New-AzHDInsightCluster 创建 Apache Kafka on HDInsight 群集。

# Create a Kafka 2.4.1 cluster
$clusterName = Read-Host -Prompt "Enter the name of the Kafka cluster"
$httpCredential = Get-Credential -Message "Enter the cluster login credentials" -UserName "admin"
$sshCredentials = Get-Credential -Message "Enter the SSH user credentials" -UserName "sshuser"

$numberOfWorkerNodes = "4"
$clusterVersion = "5.0"
$clusterType="Kafka"
$disksPerNode=2

$kafkaConfig = New-Object "System.Collections.Generic.Dictionary``2[System.String,System.String]"
$kafkaConfig.Add("kafka", "2.4.1")

New-AzHDInsightCluster `
        -ResourceGroupName $resourceGroup `
        -ClusterName $clusterName `
        -Location $location `
        -ClusterSizeInNodes $numberOfWorkerNodes `
        -ClusterType $clusterType `
        -OSType "Linux" `
        -Version $clusterVersion `
        -ComponentVersion $kafkaConfig `
        -HttpCredential $httpCredential `
        -DefaultStorageAccountName "$storageName.blob.core.windows.net" `
        -DefaultStorageAccountKey $storageKey `
        -DefaultStorageContainer $clusterName `
        -SshCredential $sshCredentials `
        -DisksPerWorkerNode $disksPerNode

创建 HDInsight 群集可能需要最多 20 分钟。

-DisksPerWorkerNode 参数配置 Kafka on HDInsight 的可伸缩性。 Kafka on HDInsight 在群集中使用虚拟机的本地磁盘来存储数据。 由于 Kafka 的 I/O 很高,因此会使用 Azure 托管磁盘为每个节点提供高吞吐量和更多存储。

托管磁盘的类型可以为“标准”(HDD) 或“高级”(SSD)。 磁盘类型取决于辅助角色节点(Kafka 代理)所使用的 VM 大小。 高级磁盘可自动与 DS 和 GS 系列 VM 一起使用。 所有其他的 VM 类型使用“标准”。 可以使用 -WorkerNodeSize 参数设置 VM 类型。 有关参数的详细信息,请参阅 New-AzHDInsightCluster 文档。

如果计划使用 32 个以上的辅助角色节点(在创建群集时配置或者是在创建之后通过扩展群集来配置),则必须使用 -HeadNodeSize 参数指定至少具有 8 个核心和 14 GB RAM 的 VM 大小。 有关节点大小和相关费用的详细信息,请参阅 HDInsight 定价

连接至群集

  1. 若要连接到 Kafka 群集的主要头节点,请使用以下命令。 将 sshuser 替换为 SSH 用户名。 将 mykafka 替换为 Kafka 群集的名称

    ssh sshuser@mykafka-ssh.azurehdinsight.net
    
  2. 首次连接到群集时,SSH 客户端可能会显示一个警告,提示无法验证主机。 当系统提示时,请键入“yes”,然后按 Enter,将主机添加到 SSH 客户端的受信任服务器列表 。

  3. 出现提示时,请输入 SSH 用户名密码。

连接后,显示的信息类似于以下文本:

Authorized uses only. All activity may be monitored and reported.
Welcome to Ubuntu 16.04.4 LTS (GNU/Linux 4.13.0-1011-azure x86_64)

 * Documentation:  https://help.ubuntu.com
 * Management:     https://landscape.canonical.com
 * Support:        https://ubuntu.com/advantage

  Get cloud support with Ubuntu Advantage Cloud Guest:
    https://www.ubuntu.com/business/services/cloud

83 packages can be updated.
37 updates are security updates.



Welcome to Kafka on HDInsight.

Last login: Thu Mar 29 13:25:27 2018 from 108.252.109.241

获取 Apache Zookeeper 主机和代理主机信息

使用 Kafka 时,必须了解 Apache Zookeeper 和代理主机 。 Kafka API 以及 Kafka 随附的许多实用工具都使用这些主机。

在本部分中,可以从群集上的 Apache Ambari REST API 获取主机信息。

  1. 从 SSH 连接到群集,使用以下命令安装 jq 实用工具。 此实用工具用于分析 JSON 文档且有助于检索主机的信息:

    sudo apt -y install jq
    
  2. 若要将环境变量设置为群集名称,请使用以下命令:

    read -p "Enter the Kafka on HDInsight cluster name: " CLUSTERNAME
    

    出现提示时,请输入 Kafka 群集的名称。

  3. 若要使用 Zookeeper 主机信息来设置环境变量,请使用以下命令。 此命令检索所有 Zookeeper 主机,然后仅返回前两个条目。 这是由于某个主机无法访问时,需要一些冗余。

    export KAFKAZKHOSTS=`curl -sS -u admin -G https://$CLUSTERNAME.azurehdinsight.net/api/v1/clusters/$CLUSTERNAME/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2`
    

    出现提示时,请输入群集登录帐户(不是 SSH 帐户)的密码。

  4. 若要验证是否已正确设置了环境变量,请使用以下命令:

     echo '$KAFKAZKHOSTS='$KAFKAZKHOSTS
    

    此命令返回类似于以下文本的信息:

    <zookeepername1>.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181,<zookeepername2>.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181

  5. 若要使用 Kafka 代理主机信息来设置环境变量,请使用以下命令:

    export KAFKABROKERS=`curl -sS -u admin -G https://$CLUSTERNAME.azurehdinsight.net/api/v1/clusters/$CLUSTERNAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
    

    出现提示时,请输入群集登录帐户(不是 SSH 帐户)的密码。

  6. 若要验证是否已正确设置了环境变量,请使用以下命令:

    echo '$KAFKABROKERS='$KAFKABROKERS
    

    此命令返回类似于以下文本的信息:

    <brokername1>.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092,<brokername2>.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092

管理 Apache Kafka 主题

Kafka 在主题中存储数据流。 可以使用 kafka-topics.sh 实用工具来管理主题。

  • 若要创建主题,请在 SSH 连接中使用以下命令:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic test --zookeeper $KAFKAZKHOSTS
    

    此命令使用存储在 $KAFKAZKHOSTS 中的主机信息连接到 Zookeeper, 然后创建名为 test 的 Kafka 主题。

    • 本主题中存储的数据已分区到八个分区。

    • 每个分区在群集中的三个辅助角色节点上进行复制。

      如果在 Azure 区域中已创建提供三个容错域的群集,则复制因子使用 3。 否则,复制因子使用 4.

      在具有三个容错域的区域中,复制因子为 3 可让副本分布在容错域中。 在具有两个容错域的区域中,复制因子为 4 可将副本均匀分布在域中。

      有关区域中容错域数的信息,请参阅 Linux 虚拟机的可用性文档。

      Kafka 不识别 Azure 容错域。 在创建主题的分区副本时,它可能未针对高可用性正确分发副本。

      若要确保高可用性,请使用 Apache Kafka 分区重新均衡工具。 必须通过 SSH 连接运行此工具,以便连接到 Kafka 群集的头节点。

      为确保 Kafka 数据的最高可用性,应在出现以下情况时为主题重新均衡分区副本:

      • 创建新主题或分区

      • 纵向扩展群集

  • 若要列出主题,请使用以下命令:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $KAFKAZKHOSTS
    

    此命令列出 Kafka 群集上可用的主题。

  • 若要删除主题,使用以下命令:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic topicname --zookeeper $KAFKAZKHOSTS
    

    此命令删除名为 topicname 的主题。

    警告

    如果删除了之前创建的 test 主题,则必须重新创建。 稍后会在本文档中使用此主题。

有关适用于 kafka-topics.sh 实用工具的命令的详细信息,请使用以下命令:

/usr/hdp/current/kafka-broker/bin/kafka-topics.sh

生成和使用记录

Kafka 将记录存储在主题中。 记录由生成者生成,由使用者使用。 生产者与使用者通过 Kafka 代理服务通信。 HDInsight 群集中的每个工作节点都是 Kafka 代理主机。

若要将记录存储到之前创建的测试主题,并通过使用者对其进行读取,请使用以下步骤:

  1. 若要为该主题写入记录,请从 SSH 连接使用 kafka-console-producer.sh 实用工具:

    /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKABROKERS --topic test
    

    此命令之后是一个空行。

  2. 在空行中键入文本消息,然后点击 Enter。 以这种方式输入一些消息,然后使用 Ctrl + C 返回到正常的提示符处。 每行均作为单独的记录发送到 Kafka 主题。

  3. 若要读取该主题的记录,请从 SSH 连接使用 kafka-console-consumer.sh 实用工具:

    /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $KAFKABROKERS --topic test --from-beginning
    

    此命令从主题中检索并显示记录。 使用 --from-beginning 告知使用者从流的开头开始,以检索所有记录。

    如果使用的是较旧版本的 Kafka,请将 --bootstrap-server $KAFKABROKERS 替换为 --zookeeper $KAFKAZKHOSTS

  4. 使用 Ctrl + C 阻止使用者。

还可以以编程方式创建生产者和使用者。 有关如何使用此 API 的示例,请参阅将 Apache Kafka 生产者和使用者 API 与 HDInsight 配合使用文档。

清理资源

如果不再需要资源组、HDInsight 和所有相关的资源,可以使用 Remove-AzResourceGroup 命令将其删除。

Remove-AzResourceGroup -Name $resourceGroup

警告

HDInsight 群集计费在创建群集之后便会开始,删除群集后才会停止。 HDInsight 群集按分钟收费,因此不再需要使用群集时,应将其删除。

删除 Kafka on HDInsight 群集会删除存储在 Kafka 中的任何数据。

后续步骤