透過 Azure 虛擬網絡,連線 HDInsight 上的 Apache Kafka

瞭解如何透過 Azure 虛擬網絡 直接連線到 HDInsight 上的 Apache Kafka。 本檔提供使用下列組態連線到 Kafka 的資訊:

  • 從內部部署網路中的資源。 此連線是使用局域網路上的 VPN 裝置(軟體或硬體)來建立的。
  • 從使用 VPN 軟體客戶端的開發環境。

注意

建議您使用 Azure Az PowerShell 模組來與 Azure 互動。 請參閱安裝 Azure PowerShell 以開始使用。 若要了解如何移轉至 Az PowerShell 模組,請參閱將 Azure PowerShell 從 AzureRM 移轉至 Az

架構與規劃

HDInsight 不允許透過公用因特網直接連線到 Kafka。 相反地,Kafka 用戶端(產生者和取用者)必須使用下列其中一種連接方法:

  • 在 HDInsight 上與 Kafka 相同的虛擬網路中執行用戶端。 此設定用於 HDInsight 上的 Apache Kafka 開頭檔。 用戶端會直接在 HDInsight 叢集節點上或在相同網路中的另一部虛擬機上執行。

  • 連線 專用網,例如內部部署網路,到虛擬網路。 此設定可讓內部部署網路中用戶端直接使用 Kafka。 若要啟用此設定,請執行下列工作:

    1. 建立虛擬網路。

    2. 建立使用站對站組態的 VPN 閘道。 本檔中所使用的組態會連線到內部部署網路中 VPN 閘道裝置。

    3. 在虛擬網路中建立 DNS 伺服器。

    4. 設定每個網路中 DNS 伺服器之間的轉送。

    5. 在虛擬網路中的 HDInsight 叢集上建立 Kafka。

      如需詳細資訊,請參閱從內部部署網路 連線 至 Apache Kafka 一節。

  • 連線 使用 VPN 閘道和 VPN 用戶端將個別機器連線到虛擬網路。 若要啟用此設定,請執行下列工作:

    1. 建立虛擬網路。

    2. 建立使用點對站組態的 VPN 閘道。 此設定可以搭配 Windows 和 MacOS 用戶端使用。

    3. 在虛擬網路中的 HDInsight 叢集上建立 Kafka。

    4. 設定 Kafka 以進行 IP 廣告。 此設定可讓用戶端使用訊息代理程式IP位址進行連線,而不是功能變數名稱。

    5. 下載並使用開發系統上的 VPN 用戶端。

      如需詳細資訊,請參閱使用 VPN 用戶端 連線 至 Apache Kafka 一節。

      警告

      此設定僅供開發之用,因為有下列限制:

      • 每個客戶端都必須使用 VPN 軟體用戶端進行連線。
      • VPN 用戶端不會將名稱解析要求傳遞至虛擬網路,因此您必須使用IP位址來與 Kafka 通訊。 IP 通訊需要在 Kafka 叢集上進行額外的設定。

如需在虛擬網路中使用 HDInsight 的詳細資訊,請參閱 規劃 Azure HDInsight 叢集的虛擬網路。

從內部部署網路 連線 至 Apache Kafka

若要建立與內部部署網路通訊的 Kafka 叢集,請遵循 連線 HDInsight 到內部部署網路檔中的步驟

重要

建立 HDInsight 叢集時,選取 Kafka 叢集類型。

這些步驟會建立下列組態:

  • Azure 虛擬網路
  • 站對站 VPN 閘道
  • Azure 儲存體 帳戶(HDInsight 使用)
  • HDInsight 上的 Kafka

若要確認 Kafka 用戶端可以從內部部署連線到叢集,請使用範例:Python 用戶端一節中的步驟。

使用 VPN 用戶端 連線 至 Apache Kafka

使用本節中的步驟來建立下列組態:

  • Azure 虛擬網路
  • 點對站 VPN 閘道
  • Azure 儲存體 帳戶 (HDInsight 使用)
  • HDInsight 上的 Kafka
  1. 請遵循使用點對站聯機檔的自我簽署憑證中的步驟。 本檔會建立閘道所需的憑證。

  2. 開啟 PowerShell 提示字元,並使用下列程式代碼登入您的 Azure 訂用帳戶:

    Connect-AzAccount
    # If you have multiple subscriptions, uncomment to set the subscription
    #Select-AzSubscription -SubscriptionName "name of your subscription"
    
  3. 使用下列程式代碼來建立包含組態資訊的變數:

    # Prompt for generic information
    $resourceGroupName = Read-Host "What is the resource group name?"
    $baseName = Read-Host "What is the base name? It is used to create names for resources, such as 'net-basename' and 'kafka-basename':"
    $location = Read-Host "What Azure Region do you want to create the resources in?"
    $rootCert = Read-Host "What is the file path to the root certificate? It is used to secure the VPN gateway."
    
    # Prompt for HDInsight credentials
    $adminCreds = Get-Credential -Message "Enter the HTTPS user name and password for the HDInsight cluster" -UserName "admin"
    $sshCreds = Get-Credential -Message "Enter the SSH user name and password for the HDInsight cluster" -UserName "sshuser"
    
    # Names for Azure resources
    $networkName = "net-$baseName"
    $clusterName = "kafka-$baseName"
    $storageName = "store$baseName" # Can't use dashes in storage names
    $defaultContainerName = $clusterName
    $defaultSubnetName = "default"
    $gatewaySubnetName = "GatewaySubnet"
    $gatewayPublicIpName = "GatewayIp"
    $gatewayIpConfigName = "GatewayConfig"
    $vpnRootCertName = "rootcert"
    $vpnName = "VPNGateway"
    
    # Network settings
    $networkAddressPrefix = "10.0.0.0/16"
    $defaultSubnetPrefix = "10.0.0.0/24"
    $gatewaySubnetPrefix = "10.0.1.0/24"
    $vpnClientAddressPool = "172.16.201.0/24"
    
    # HDInsight settings
    $hdiWorkerNodes = 4
    $hdiVersion = "3.6"
    $hdiType = "Kafka"
    
  4. 使用下列程式代碼來建立 Azure 資源群組和虛擬網路:

    # Create the resource group that contains everything
    New-AzResourceGroup -Name $resourceGroupName -Location $location
    
    # Create the subnet configuration
    $defaultSubnetConfig = New-AzVirtualNetworkSubnetConfig -Name $defaultSubnetName `
        -AddressPrefix $defaultSubnetPrefix
    $gatewaySubnetConfig = New-AzVirtualNetworkSubnetConfig -Name $gatewaySubnetName `
        -AddressPrefix $gatewaySubnetPrefix
    
    # Create the subnet
    New-AzVirtualNetwork -Name $networkName `
        -ResourceGroupName $resourceGroupName `
        -Location $location `
        -AddressPrefix $networkAddressPrefix `
        -Subnet $defaultSubnetConfig, $gatewaySubnetConfig
    
    # Get the network & subnet that were created
    $network = Get-AzVirtualNetwork -Name $networkName `
        -ResourceGroupName $resourceGroupName
    $gatewaySubnet = Get-AzVirtualNetworkSubnetConfig -Name $gatewaySubnetName `
        -VirtualNetwork $network
    $defaultSubnet = Get-AzVirtualNetworkSubnetConfig -Name $defaultSubnetName `
        -VirtualNetwork $network
    
    # Set a dynamic public IP address for the gateway subnet
    $gatewayPublicIp = New-AzPublicIpAddress -Name $gatewayPublicIpName `
        -ResourceGroupName $resourceGroupName `
        -Location $location `
        -AllocationMethod Dynamic
    $gatewayIpConfig = New-AzVirtualNetworkGatewayIpConfig -Name $gatewayIpConfigName `
        -Subnet $gatewaySubnet `
        -PublicIpAddress $gatewayPublicIp
    
    # Get the certificate info
    # Get the full path in case a relative path was passed
    $rootCertFile = Get-ChildItem $rootCert
    $cert = New-Object System.Security.Cryptography.X509Certificates.X509Certificate2($rootCertFile)
    $certBase64 = [System.Convert]::ToBase64String($cert.RawData)
    $p2sRootCert = New-AzVpnClientRootCertificate -Name $vpnRootCertName `
        -PublicCertData $certBase64
    
    # Create the VPN gateway
    New-AzVirtualNetworkGateway -Name $vpnName `
        -ResourceGroupName $resourceGroupName `
        -Location $location `
        -IpConfigurations $gatewayIpConfig `
        -GatewayType Vpn `
        -VpnType RouteBased `
        -EnableBgp $false `
        -GatewaySku Standard `
        -VpnClientAddressPool $vpnClientAddressPool `
        -VpnClientRootCertificates $p2sRootCert
    

    警告

    此程式可能需要幾分鐘的時間才能完成。

  5. 使用下列程式代碼來建立 Azure 儲存體 帳戶和 Blob 容器:

    # Create the storage account
    New-AzStorageAccount `
        -ResourceGroupName $resourceGroupName `
        -Name $storageName `
        -SkuName Standard_GRS `
        -Location $location `
        -Kind StorageV2 `
        -EnableHttpsTrafficOnly 1
    
    # Get the storage account keys and create a context
    $defaultStorageKey = (Get-AzStorageAccountKey -ResourceGroupName $resourceGroupName `
        -Name $storageName)[0].Value
    $storageContext = New-AzStorageContext -StorageAccountName $storageName `
        -StorageAccountKey $defaultStorageKey
    
    # Create the default storage container
    New-AzStorageContainer -Name $defaultContainerName `
        -Context $storageContext
    
  6. 使用下列程式代碼來建立 HDInsight 叢集:

    # Create the HDInsight cluster
    New-AzHDInsightCluster `
        -ResourceGroupName $resourceGroupName `
        -ClusterName $clusterName `
        -Location $location `
        -ClusterSizeInNodes $hdiWorkerNodes `
        -ClusterType $hdiType `
        -OSType Linux `
        -Version $hdiVersion `
        -HttpCredential $adminCreds `
        -SshCredential $sshCreds `
        -DefaultStorageAccountName "$storageName.blob.core.windows.net" `
        -DefaultStorageAccountKey $defaultStorageKey `
        -DefaultStorageContainer $defaultContainerName `
        -DisksPerWorkerNode 2 `
        -VirtualNetworkId $network.Id `
        -SubnetName $defaultSubnet.Id
    

    警告

    此程序大約需要15分鐘才能完成。

設定 Kafka 以進行 IP 廣告

根據預設,Apache Zookeeper 會將 Kafka 訊息代理程式功能變數名稱傳回給用戶端。 此設定不適用於 VPN 軟體客戶端,因為它無法使用虛擬網路中實體的名稱解析。 針對此設定,請使用下列步驟來設定 Kafka 來公告 IP 位址,而不是功能變數名稱:

  1. 使用網頁瀏覽器移至 https://CLUSTERNAME.azurehdinsight.net。 將取代 CLUSTERNAME 為 HDInsight 叢集上的 Kafka 名稱。

    出現提示時,請使用叢集的 HTTPS 使用者名稱和密碼。 叢集的Ambari Web UI隨即顯示。

  2. 若要檢視 Kafka 的資訊,請從左側清單中選取 Kafka

    Service list with Kafka highlighted.

  3. 若要檢視 Kafka 組態,請從頂端中間選取 [設定 ]。

    Apache Ambari services configuration.

  4. 若要尋找 kafka-env 組態,請在右上方的 [篩選] 字段中輸入 kafka-env

    Kafka configuration, for kafka-env.

  5. 若要將 Kafka 設定為公告 IP 位址,請將下列文字新增至 kafka-env-template 字段底部

    # Configure Kafka to advertise IP addresses instead of FQDN
    IP_ADDRESS=$(hostname -i)
    echo advertised.listeners=$IP_ADDRESS
    sed -i.bak -e '/advertised/{/advertised@/!d;}' /usr/hdp/current/kafka-broker/conf/server.properties
    echo "advertised.listeners=PLAINTEXT://$IP_ADDRESS:9092" >> /usr/hdp/current/kafka-broker/conf/server.properties
    
  6. 若要設定 Kafka 接聽的介面,請在右上方的 [篩選] 字段中輸入 listeners

  7. 若要將 Kafka 設定為在所有網路介面上接聽,請將接 聽程式 欄位中的值變更為 PLAINTEXT://0.0.0.0:9092

  8. 若要儲存組態變更,請使用 [ 儲存] 按鈕。 輸入描述變更的簡訊。 儲存變更之後,請選取 [ 確定 ]。

    Apache Ambari save configuration.

  9. 若要防止重新啟動 Kafka 時發生錯誤,請使用 [ 服務動作] 按鈕,然後選取 [ 開啟維護模式]。 選取 [確定] 以完成這項作業。

    Service actions, with turn on maintenance highlighted.

  10. 若要重新啟動 Kafka,請使用 [重新啟動] 按鈕,然後選取 [重新啟動所有受影響的]。 確認重新啟動,然後在作業完成之後使用 [確定 ] 按鈕。

    Restart button with restart all affected highlighted.

  11. 若要停用維護模式,請使用 [ 服務動作] 按鈕,然後選取 [ 關閉維護模式]。 選取 [ 確定 ] 以完成這項作業。

連線 至 VPN 閘道

若要連線到 VPN 閘道,請使用設定點對站連線檔的 連線 至 Azure 一節

範例:Python 用戶端

若要驗證 Kafka 的連線,請使用下列步驟來建立和執行 Python 產生者和取用者:

  1. 使用下列其中一種方法來擷取 Kafka 叢集中節點的完整域名 (FQDN) 和 IP 位址:

    $resourceGroupName = "The resource group that contains the virtual network used with HDInsight"
    
    $clusterNICs = Get-AzNetworkInterface -ResourceGroupName $resourceGroupName | where-object {$_.Name -like "*node*"}
    
    $nodes = @()
    foreach($nic in $clusterNICs) {
        $node = new-object System.Object
        $node | add-member -MemberType NoteProperty -name "Type" -value $nic.Name.Split('-')[1]
        $node | add-member -MemberType NoteProperty -name "InternalIP" -value $nic.IpConfigurations.PrivateIpAddress
        $node | add-member -MemberType NoteProperty -name "InternalFQDN" -value $nic.DnsSettings.InternalFqdn
        $nodes += $node
    }
    $nodes | sort-object Type
    
    az network nic list --resource-group <resourcegroupname> --output table --query "[?contains(name,'node')].{NICname:name,InternalIP:ipConfigurations[0].privateIpAddress,InternalFQDN:dnsSettings.internalFqdn}"
    

    此文稿假設 $resourceGroupName 是包含虛擬網路的 Azure 資源群組名稱。

    儲存傳回的資訊以供後續步驟使用。

  2. 使用下列命令來安裝 kafka-python 用戶端:

    pip install kafka-python
    
  3. 若要將數據傳送至 Kafka,請使用下列 Python 程式代碼:

    from kafka import KafkaProducer
    # Replace the `ip_address` entries with the IP address of your worker nodes
    # NOTE: you don't need the full list of worker nodes, just one or two.
    producer = KafkaProducer(bootstrap_servers=['kafka_broker_1','kafka_broker_2'])
    for _ in range(50):
       producer.send('testtopic', b'test message')
    

    'kafka_broker' 專案取代為本節中步驟 1 傳回的位址:

    • 如果您使用 軟體 VPN 用戶端,請將專案取代 kafka_broker 為背景工作節點的 IP 位址。

    • 如果您已 透過自定義 DNS 伺服器啟用名稱解析,請將專案取代 kafka_broker 為背景工作節點的 FQDN。

      注意

      此程式代碼會將字串 test message 傳送至 主題 testtopic。 HDInsight 上 Kafka 的預設設定,如果主題不存在,則不會建立主題。 請參閱 如何在 HDInsight 上設定 Apache Kafka 來自動建立主題。 或者,您可以在產生訊息之前手動建立主題。

  4. 若要從 Kafka 擷取訊息,請使用下列 Python 程式代碼:

    from kafka import KafkaConsumer
    # Replace the `ip_address` entries with the IP address of your worker nodes
    # Again, you only need one or two, not the full list.
    # Note: auto_offset_reset='earliest' resets the starting offset to the beginning
    #       of the topic
    consumer = KafkaConsumer(bootstrap_servers=['kafka_broker_1','kafka_broker_2'],auto_offset_reset='earliest')
    consumer.subscribe(['testtopic'])
    for msg in consumer:
      print (msg)
    

    'kafka_broker' 專案取代為本節中步驟 1 傳回的位址:

    • 如果您使用 軟體 VPN 用戶端,請將專案取代 kafka_broker 為背景工作節點的 IP 位址。

    • 如果您已 透過自定義 DNS 伺服器啟用名稱解析,請將專案取代 kafka_broker 為背景工作節點的 FQDN。

下一步

如需搭配虛擬網路使用 HDInsight 的詳細資訊,請參閱 規劃 Azure HDInsight 叢集的 虛擬網路部署檔。

如需使用點對站 VPN 閘道建立 Azure 虛擬網絡 的詳細資訊,請參閱下列檔:

如需在 HDInsight 上使用 Apache Kafka 的詳細資訊,請參閱下列檔: