透過 Azure 虛擬網絡,連線 HDInsight 上的 Apache Kafka
瞭解如何透過 Azure 虛擬網絡 直接連線到 HDInsight 上的 Apache Kafka。 本檔提供使用下列組態連線到 Kafka 的資訊:
- 從內部部署網路中的資源。 此連線是使用局域網路上的 VPN 裝置(軟體或硬體)來建立的。
- 從使用 VPN 軟體客戶端的開發環境。
注意
建議您使用 Azure Az PowerShell 模組來與 Azure 互動。 請參閱安裝 Azure PowerShell 以開始使用。 若要了解如何移轉至 Az PowerShell 模組,請參閱將 Azure PowerShell 從 AzureRM 移轉至 Az。
架構與規劃
HDInsight 不允許透過公用因特網直接連線到 Kafka。 相反地,Kafka 用戶端(產生者和取用者)必須使用下列其中一種連接方法:
在 HDInsight 上與 Kafka 相同的虛擬網路中執行用戶端。 此設定用於 HDInsight 上的 Apache Kafka 開頭檔。 用戶端會直接在 HDInsight 叢集節點上或在相同網路中的另一部虛擬機上執行。
連線 專用網,例如內部部署網路,到虛擬網路。 此設定可讓內部部署網路中用戶端直接使用 Kafka。 若要啟用此設定,請執行下列工作:
建立虛擬網路。
建立使用站對站組態的 VPN 閘道。 本檔中所使用的組態會連線到內部部署網路中 VPN 閘道裝置。
在虛擬網路中建立 DNS 伺服器。
設定每個網路中 DNS 伺服器之間的轉送。
在虛擬網路中的 HDInsight 叢集上建立 Kafka。
如需詳細資訊,請參閱從內部部署網路 連線 至 Apache Kafka 一節。
連線 使用 VPN 閘道和 VPN 用戶端將個別機器連線到虛擬網路。 若要啟用此設定,請執行下列工作:
建立虛擬網路。
建立使用點對站組態的 VPN 閘道。 此設定可以搭配 Windows 和 MacOS 用戶端使用。
在虛擬網路中的 HDInsight 叢集上建立 Kafka。
設定 Kafka 以進行 IP 廣告。 此設定可讓用戶端使用訊息代理程式IP位址進行連線,而不是功能變數名稱。
下載並使用開發系統上的 VPN 用戶端。
如需詳細資訊,請參閱使用 VPN 用戶端 連線 至 Apache Kafka 一節。
警告
此設定僅供開發之用,因為有下列限制:
- 每個客戶端都必須使用 VPN 軟體用戶端進行連線。
- VPN 用戶端不會將名稱解析要求傳遞至虛擬網路,因此您必須使用IP位址來與 Kafka 通訊。 IP 通訊需要在 Kafka 叢集上進行額外的設定。
如需在虛擬網路中使用 HDInsight 的詳細資訊,請參閱 規劃 Azure HDInsight 叢集的虛擬網路。
從內部部署網路 連線 至 Apache Kafka
若要建立與內部部署網路通訊的 Kafka 叢集,請遵循 連線 HDInsight 到內部部署網路檔中的步驟。
重要
建立 HDInsight 叢集時,選取 Kafka 叢集類型。
這些步驟會建立下列組態:
- Azure 虛擬網路
- 站對站 VPN 閘道
- Azure 儲存體 帳戶(HDInsight 使用)
- HDInsight 上的 Kafka
若要確認 Kafka 用戶端可以從內部部署連線到叢集,請使用範例:Python 用戶端一節中的步驟。
使用 VPN 用戶端 連線 至 Apache Kafka
使用本節中的步驟來建立下列組態:
- Azure 虛擬網路
- 點對站 VPN 閘道
- Azure 儲存體 帳戶 (HDInsight 使用)
- HDInsight 上的 Kafka
請遵循使用點對站聯機檔的自我簽署憑證中的步驟。 本檔會建立閘道所需的憑證。
開啟 PowerShell 提示字元,並使用下列程式代碼登入您的 Azure 訂用帳戶:
Connect-AzAccount # If you have multiple subscriptions, uncomment to set the subscription #Select-AzSubscription -SubscriptionName "name of your subscription"
使用下列程式代碼來建立包含組態資訊的變數:
# Prompt for generic information $resourceGroupName = Read-Host "What is the resource group name?" $baseName = Read-Host "What is the base name? It is used to create names for resources, such as 'net-basename' and 'kafka-basename':" $location = Read-Host "What Azure Region do you want to create the resources in?" $rootCert = Read-Host "What is the file path to the root certificate? It is used to secure the VPN gateway." # Prompt for HDInsight credentials $adminCreds = Get-Credential -Message "Enter the HTTPS user name and password for the HDInsight cluster" -UserName "admin" $sshCreds = Get-Credential -Message "Enter the SSH user name and password for the HDInsight cluster" -UserName "sshuser" # Names for Azure resources $networkName = "net-$baseName" $clusterName = "kafka-$baseName" $storageName = "store$baseName" # Can't use dashes in storage names $defaultContainerName = $clusterName $defaultSubnetName = "default" $gatewaySubnetName = "GatewaySubnet" $gatewayPublicIpName = "GatewayIp" $gatewayIpConfigName = "GatewayConfig" $vpnRootCertName = "rootcert" $vpnName = "VPNGateway" # Network settings $networkAddressPrefix = "10.0.0.0/16" $defaultSubnetPrefix = "10.0.0.0/24" $gatewaySubnetPrefix = "10.0.1.0/24" $vpnClientAddressPool = "172.16.201.0/24" # HDInsight settings $hdiWorkerNodes = 4 $hdiVersion = "3.6" $hdiType = "Kafka"
使用下列程式代碼來建立 Azure 資源群組和虛擬網路:
# Create the resource group that contains everything New-AzResourceGroup -Name $resourceGroupName -Location $location # Create the subnet configuration $defaultSubnetConfig = New-AzVirtualNetworkSubnetConfig -Name $defaultSubnetName ` -AddressPrefix $defaultSubnetPrefix $gatewaySubnetConfig = New-AzVirtualNetworkSubnetConfig -Name $gatewaySubnetName ` -AddressPrefix $gatewaySubnetPrefix # Create the subnet New-AzVirtualNetwork -Name $networkName ` -ResourceGroupName $resourceGroupName ` -Location $location ` -AddressPrefix $networkAddressPrefix ` -Subnet $defaultSubnetConfig, $gatewaySubnetConfig # Get the network & subnet that were created $network = Get-AzVirtualNetwork -Name $networkName ` -ResourceGroupName $resourceGroupName $gatewaySubnet = Get-AzVirtualNetworkSubnetConfig -Name $gatewaySubnetName ` -VirtualNetwork $network $defaultSubnet = Get-AzVirtualNetworkSubnetConfig -Name $defaultSubnetName ` -VirtualNetwork $network # Set a dynamic public IP address for the gateway subnet $gatewayPublicIp = New-AzPublicIpAddress -Name $gatewayPublicIpName ` -ResourceGroupName $resourceGroupName ` -Location $location ` -AllocationMethod Dynamic $gatewayIpConfig = New-AzVirtualNetworkGatewayIpConfig -Name $gatewayIpConfigName ` -Subnet $gatewaySubnet ` -PublicIpAddress $gatewayPublicIp # Get the certificate info # Get the full path in case a relative path was passed $rootCertFile = Get-ChildItem $rootCert $cert = New-Object System.Security.Cryptography.X509Certificates.X509Certificate2($rootCertFile) $certBase64 = [System.Convert]::ToBase64String($cert.RawData) $p2sRootCert = New-AzVpnClientRootCertificate -Name $vpnRootCertName ` -PublicCertData $certBase64 # Create the VPN gateway New-AzVirtualNetworkGateway -Name $vpnName ` -ResourceGroupName $resourceGroupName ` -Location $location ` -IpConfigurations $gatewayIpConfig ` -GatewayType Vpn ` -VpnType RouteBased ` -EnableBgp $false ` -GatewaySku Standard ` -VpnClientAddressPool $vpnClientAddressPool ` -VpnClientRootCertificates $p2sRootCert
警告
此程式可能需要幾分鐘的時間才能完成。
使用下列程式代碼來建立 Azure 儲存體 帳戶和 Blob 容器:
# Create the storage account New-AzStorageAccount ` -ResourceGroupName $resourceGroupName ` -Name $storageName ` -SkuName Standard_GRS ` -Location $location ` -Kind StorageV2 ` -EnableHttpsTrafficOnly 1 # Get the storage account keys and create a context $defaultStorageKey = (Get-AzStorageAccountKey -ResourceGroupName $resourceGroupName ` -Name $storageName)[0].Value $storageContext = New-AzStorageContext -StorageAccountName $storageName ` -StorageAccountKey $defaultStorageKey # Create the default storage container New-AzStorageContainer -Name $defaultContainerName ` -Context $storageContext
使用下列程式代碼來建立 HDInsight 叢集:
# Create the HDInsight cluster New-AzHDInsightCluster ` -ResourceGroupName $resourceGroupName ` -ClusterName $clusterName ` -Location $location ` -ClusterSizeInNodes $hdiWorkerNodes ` -ClusterType $hdiType ` -OSType Linux ` -Version $hdiVersion ` -HttpCredential $adminCreds ` -SshCredential $sshCreds ` -DefaultStorageAccountName "$storageName.blob.core.windows.net" ` -DefaultStorageAccountKey $defaultStorageKey ` -DefaultStorageContainer $defaultContainerName ` -DisksPerWorkerNode 2 ` -VirtualNetworkId $network.Id ` -SubnetName $defaultSubnet.Id
警告
此程序大約需要15分鐘才能完成。
設定 Kafka 以進行 IP 廣告
根據預設,Apache Zookeeper 會將 Kafka 訊息代理程式功能變數名稱傳回給用戶端。 此設定不適用於 VPN 軟體客戶端,因為它無法使用虛擬網路中實體的名稱解析。 針對此設定,請使用下列步驟來設定 Kafka 來公告 IP 位址,而不是功能變數名稱:
使用網頁瀏覽器移至
https://CLUSTERNAME.azurehdinsight.net
。 將取代CLUSTERNAME
為 HDInsight 叢集上的 Kafka 名稱。出現提示時,請使用叢集的 HTTPS 使用者名稱和密碼。 叢集的Ambari Web UI隨即顯示。
若要檢視 Kafka 的資訊,請從左側清單中選取 Kafka 。
若要檢視 Kafka 組態,請從頂端中間選取 [設定 ]。
若要尋找 kafka-env 組態,請在右上方的 [篩選] 字段中輸入
kafka-env
。若要將 Kafka 設定為公告 IP 位址,請將下列文字新增至 kafka-env-template 字段底部:
# Configure Kafka to advertise IP addresses instead of FQDN IP_ADDRESS=$(hostname -i) echo advertised.listeners=$IP_ADDRESS sed -i.bak -e '/advertised/{/advertised@/!d;}' /usr/hdp/current/kafka-broker/conf/server.properties echo "advertised.listeners=PLAINTEXT://$IP_ADDRESS:9092" >> /usr/hdp/current/kafka-broker/conf/server.properties
若要設定 Kafka 接聽的介面,請在右上方的 [篩選] 字段中輸入
listeners
。若要將 Kafka 設定為在所有網路介面上接聽,請將接 聽程式 欄位中的值變更為
PLAINTEXT://0.0.0.0:9092
。若要儲存組態變更,請使用 [ 儲存] 按鈕。 輸入描述變更的簡訊。 儲存變更之後,請選取 [ 確定 ]。
若要防止重新啟動 Kafka 時發生錯誤,請使用 [ 服務動作] 按鈕,然後選取 [ 開啟維護模式]。 選取 [確定] 以完成這項作業。
若要重新啟動 Kafka,請使用 [重新啟動] 按鈕,然後選取 [重新啟動所有受影響的]。 確認重新啟動,然後在作業完成之後使用 [確定 ] 按鈕。
若要停用維護模式,請使用 [ 服務動作] 按鈕,然後選取 [ 關閉維護模式]。 選取 [ 確定 ] 以完成這項作業。
連線 至 VPN 閘道
若要連線到 VPN 閘道,請使用設定點對站連線檔的 連線 至 Azure 一節。
範例:Python 用戶端
若要驗證 Kafka 的連線,請使用下列步驟來建立和執行 Python 產生者和取用者:
使用下列其中一種方法來擷取 Kafka 叢集中節點的完整域名 (FQDN) 和 IP 位址:
$resourceGroupName = "The resource group that contains the virtual network used with HDInsight" $clusterNICs = Get-AzNetworkInterface -ResourceGroupName $resourceGroupName | where-object {$_.Name -like "*node*"} $nodes = @() foreach($nic in $clusterNICs) { $node = new-object System.Object $node | add-member -MemberType NoteProperty -name "Type" -value $nic.Name.Split('-')[1] $node | add-member -MemberType NoteProperty -name "InternalIP" -value $nic.IpConfigurations.PrivateIpAddress $node | add-member -MemberType NoteProperty -name "InternalFQDN" -value $nic.DnsSettings.InternalFqdn $nodes += $node } $nodes | sort-object Type
az network nic list --resource-group <resourcegroupname> --output table --query "[?contains(name,'node')].{NICname:name,InternalIP:ipConfigurations[0].privateIpAddress,InternalFQDN:dnsSettings.internalFqdn}"
此文稿假設
$resourceGroupName
是包含虛擬網路的 Azure 資源群組名稱。儲存傳回的資訊以供後續步驟使用。
使用下列命令來安裝 kafka-python 用戶端:
pip install kafka-python
若要將數據傳送至 Kafka,請使用下列 Python 程式代碼:
from kafka import KafkaProducer # Replace the `ip_address` entries with the IP address of your worker nodes # NOTE: you don't need the full list of worker nodes, just one or two. producer = KafkaProducer(bootstrap_servers=['kafka_broker_1','kafka_broker_2']) for _ in range(50): producer.send('testtopic', b'test message')
將
'kafka_broker'
專案取代為本節中步驟 1 傳回的位址:如果您使用 軟體 VPN 用戶端,請將專案取代
kafka_broker
為背景工作節點的 IP 位址。如果您已 透過自定義 DNS 伺服器啟用名稱解析,請將專案取代
kafka_broker
為背景工作節點的 FQDN。注意
此程式代碼會將字串
test message
傳送至 主題testtopic
。 HDInsight 上 Kafka 的預設設定,如果主題不存在,則不會建立主題。 請參閱 如何在 HDInsight 上設定 Apache Kafka 來自動建立主題。 或者,您可以在產生訊息之前手動建立主題。
若要從 Kafka 擷取訊息,請使用下列 Python 程式代碼:
from kafka import KafkaConsumer # Replace the `ip_address` entries with the IP address of your worker nodes # Again, you only need one or two, not the full list. # Note: auto_offset_reset='earliest' resets the starting offset to the beginning # of the topic consumer = KafkaConsumer(bootstrap_servers=['kafka_broker_1','kafka_broker_2'],auto_offset_reset='earliest') consumer.subscribe(['testtopic']) for msg in consumer: print (msg)
將
'kafka_broker'
專案取代為本節中步驟 1 傳回的位址:如果您使用 軟體 VPN 用戶端,請將專案取代
kafka_broker
為背景工作節點的 IP 位址。如果您已 透過自定義 DNS 伺服器啟用名稱解析,請將專案取代
kafka_broker
為背景工作節點的 FQDN。
下一步
如需搭配虛擬網路使用 HDInsight 的詳細資訊,請參閱 規劃 Azure HDInsight 叢集的 虛擬網路部署檔。
如需使用點對站 VPN 閘道建立 Azure 虛擬網絡 的詳細資訊,請參閱下列檔:
如需在 HDInsight 上使用 Apache Kafka 的詳細資訊,請參閱下列檔: