Подключение к Apache Kafka в HDInsight с помощью виртуальной сети Azure

Узнайте, как подключиться напрямую к Apache Kafka в HDInsight с помощью виртуальной сети Azure. В этой статье приведены сведения о подключении к Kafka с использованием приведенных ниже конфигураций.

  • Из ресурсов в локальной сети. Это подключение устанавливается с помощью VPN-устройства (программного или аппаратного) в локальной сети.
  • Из среды разработки с помощью программного VPN-клиента.

Примечание.

Мы рекомендуем использовать модуль Azure Az PowerShell для взаимодействия с Azure. Чтобы начать работу, см. статью Установка Azure PowerShell. Дополнительные сведения см. в статье Перенос Azure PowerShell с AzureRM на Az.

Архитектура и планирование

HDInsight не разрешает прямое подключение к Kafka через общедоступный Интернет. Вместо этого клиенты Kafka (производители и потребители) должны использовать один из следующих методов подключения.

  • Запустите клиент в той же виртуальной сети, что и Kafka в HDInsight. Сведения об использовании этой конфигурации см. в статье Приступая к работе с Apache Kafka в HDInsight. Клиент работает непосредственно на узлах кластера HDInsight или на другой виртуальной машине в той же сети.

  • Подключите частную сеть, например локальную, к виртуальной сети. Эта конфигурация разрешает клиентам в локальной сети напрямую работать с Kafka. Чтобы включить эту конфигурацию, сделайте следующее:

    1. Создайте виртуальную сеть.

    2. Создайте VPN-шлюз, использующий конфигурацию типа "сеть — сеть". Конфигурация, используемая в этой статье, подключается к устройству шлюза VPN в локальной сети.

    3. Создайте DNS-сервер в виртуальной сети.

    4. Настройте переадресацию между DNS-серверами в каждой сети.

    5. Создайте Kafka в кластере HDInsight в виртуальной сети.

      Дополнительные сведения см. в разделе Подключение к Apache Kafka из локальной сети.

  • Подключите отдельные виртуальные машины к виртуальной сети с помощью VPN-шлюза и VPN-клиента. Чтобы включить эту конфигурацию, сделайте следующее:

    1. Создайте виртуальную сеть.

    2. Создайте VPN-шлюз, использующий конфигурацию типа "точка — сеть". Эту конфигурацию можно использовать с клиентами Windows и MacOS.

    3. Создайте Kafka в кластере HDInsight в виртуальной сети.

    4. Настройте Kafka для объявления IP-адресов. Такая конфигурация позволяет клиенту подключаться с помощью IP-адресов брокера вместо использования доменных имен.

    5. Скачайте и используйте VPN-клиент в системе разработки.

      Дополнительные сведения см. в разделе Подключение к Apache Kafka с помощью VPN-клиента.

      Предупреждение

      Эта конфигурация рекомендуется только для целей разработки из-за следующих ограничений:

      • Каждый клиент должен подключиться с помощью программного VPN-клиента.
      • Клиент VPN не передает запросы на разрешения имен в виртуальную сеть, поэтому для обмена данными с Kafka необходимо использовать IP-адрес. Для взаимодействия с IP-адресом требуется дополнительная настройка кластера Kafka.

Дополнительные сведения об использовании HDInsight в виртуальной сети см. в статье Планирование виртуальной сети для кластеров Azure HDInsight.

Подключение в Apache Kafka из локальной сети

Чтобы создать кластер Kafka, который взаимодействует с локальной сетью, выполните действия, описанные в статье Подключение HDInsight к локальной сети.

Внимание

При создании кластера HDInsight выберите тип кластера Kafka.

Выполнив эти действия, вы создадите следующую конфигурацию:

  • Виртуальная сеть Azure
  • VPN-шлюз типа "сеть — сеть";
  • учетная запись хранения Azure (используемая HDInsight);
  • Kafka в HDInsight

Чтобы убедиться, что клиент Kafka может подключиться к кластеру из локальной сети, ознакомьтесь с разделом Пример: клиент Python.

Подключение в Apache Kafka с VPN-клиентом

В этом разделе описаны действия по созданию следующей конфигурации:

  • Виртуальная сеть Azure
  • VPN-шлюз типа "точка — сеть"
  • Учетная запись хранения Azure (используется HDInsight)
  • Kafka в HDInsight
  1. Дополнительные сведения см. в статье Создание и экспорт сертификатов для подключений типа "точка — сеть" с помощью PowerShell в Windows 10. Там приведены действия по созданию сертификатов, необходимых для шлюза.

  2. Откройте командную строку PowerShell и используйте следующий код, чтобы войти в подписку Azure:

    Connect-AzAccount
    # If you have multiple subscriptions, uncomment to set the subscription
    #Select-AzSubscription -SubscriptionName "name of your subscription"
    
  3. Используйте следующий код, чтобы создать переменные, которые содержат сведения о конфигурации:

    # Prompt for generic information
    $resourceGroupName = Read-Host "What is the resource group name?"
    $baseName = Read-Host "What is the base name? It is used to create names for resources, such as 'net-basename' and 'kafka-basename':"
    $location = Read-Host "What Azure Region do you want to create the resources in?"
    $rootCert = Read-Host "What is the file path to the root certificate? It is used to secure the VPN gateway."
    
    # Prompt for HDInsight credentials
    $adminCreds = Get-Credential -Message "Enter the HTTPS user name and password for the HDInsight cluster" -UserName "admin"
    $sshCreds = Get-Credential -Message "Enter the SSH user name and password for the HDInsight cluster" -UserName "sshuser"
    
    # Names for Azure resources
    $networkName = "net-$baseName"
    $clusterName = "kafka-$baseName"
    $storageName = "store$baseName" # Can't use dashes in storage names
    $defaultContainerName = $clusterName
    $defaultSubnetName = "default"
    $gatewaySubnetName = "GatewaySubnet"
    $gatewayPublicIpName = "GatewayIp"
    $gatewayIpConfigName = "GatewayConfig"
    $vpnRootCertName = "rootcert"
    $vpnName = "VPNGateway"
    
    # Network settings
    $networkAddressPrefix = "10.0.0.0/16"
    $defaultSubnetPrefix = "10.0.0.0/24"
    $gatewaySubnetPrefix = "10.0.1.0/24"
    $vpnClientAddressPool = "172.16.201.0/24"
    
    # HDInsight settings
    $hdiWorkerNodes = 4
    $hdiVersion = "3.6"
    $hdiType = "Kafka"
    
  4. Используйте следующий код, чтобы создать группу ресурсов и виртуальную сеть Azure:

    # Create the resource group that contains everything
    New-AzResourceGroup -Name $resourceGroupName -Location $location
    
    # Create the subnet configuration
    $defaultSubnetConfig = New-AzVirtualNetworkSubnetConfig -Name $defaultSubnetName `
        -AddressPrefix $defaultSubnetPrefix
    $gatewaySubnetConfig = New-AzVirtualNetworkSubnetConfig -Name $gatewaySubnetName `
        -AddressPrefix $gatewaySubnetPrefix
    
    # Create the subnet
    New-AzVirtualNetwork -Name $networkName `
        -ResourceGroupName $resourceGroupName `
        -Location $location `
        -AddressPrefix $networkAddressPrefix `
        -Subnet $defaultSubnetConfig, $gatewaySubnetConfig
    
    # Get the network & subnet that were created
    $network = Get-AzVirtualNetwork -Name $networkName `
        -ResourceGroupName $resourceGroupName
    $gatewaySubnet = Get-AzVirtualNetworkSubnetConfig -Name $gatewaySubnetName `
        -VirtualNetwork $network
    $defaultSubnet = Get-AzVirtualNetworkSubnetConfig -Name $defaultSubnetName `
        -VirtualNetwork $network
    
    # Set a dynamic public IP address for the gateway subnet
    $gatewayPublicIp = New-AzPublicIpAddress -Name $gatewayPublicIpName `
        -ResourceGroupName $resourceGroupName `
        -Location $location `
        -AllocationMethod Dynamic
    $gatewayIpConfig = New-AzVirtualNetworkGatewayIpConfig -Name $gatewayIpConfigName `
        -Subnet $gatewaySubnet `
        -PublicIpAddress $gatewayPublicIp
    
    # Get the certificate info
    # Get the full path in case a relative path was passed
    $rootCertFile = Get-ChildItem $rootCert
    $cert = New-Object System.Security.Cryptography.X509Certificates.X509Certificate2($rootCertFile)
    $certBase64 = [System.Convert]::ToBase64String($cert.RawData)
    $p2sRootCert = New-AzVpnClientRootCertificate -Name $vpnRootCertName `
        -PublicCertData $certBase64
    
    # Create the VPN gateway
    New-AzVirtualNetworkGateway -Name $vpnName `
        -ResourceGroupName $resourceGroupName `
        -Location $location `
        -IpConfigurations $gatewayIpConfig `
        -GatewayType Vpn `
        -VpnType RouteBased `
        -EnableBgp $false `
        -GatewaySku Standard `
        -VpnClientAddressPool $vpnClientAddressPool `
        -VpnClientRootCertificates $p2sRootCert
    

    Предупреждение

    Этот процесс может занять несколько минут.

  5. Используйте следующий код, чтобы создать учетную запись хранения Azure и контейнер BLOB-объектов:

    # Create the storage account
    New-AzStorageAccount `
        -ResourceGroupName $resourceGroupName `
        -Name $storageName `
        -SkuName Standard_GRS `
        -Location $location `
        -Kind StorageV2 `
        -EnableHttpsTrafficOnly 1
    
    # Get the storage account keys and create a context
    $defaultStorageKey = (Get-AzStorageAccountKey -ResourceGroupName $resourceGroupName `
        -Name $storageName)[0].Value
    $storageContext = New-AzStorageContext -StorageAccountName $storageName `
        -StorageAccountKey $defaultStorageKey
    
    # Create the default storage container
    New-AzStorageContainer -Name $defaultContainerName `
        -Context $storageContext
    
  6. Используйте следующий код, чтобы создать кластер HDInsight:

    # Create the HDInsight cluster
    New-AzHDInsightCluster `
        -ResourceGroupName $resourceGroupName `
        -ClusterName $clusterName `
        -Location $location `
        -ClusterSizeInNodes $hdiWorkerNodes `
        -ClusterType $hdiType `
        -OSType Linux `
        -Version $hdiVersion `
        -HttpCredential $adminCreds `
        -SshCredential $sshCreds `
        -DefaultStorageAccountName "$storageName.blob.core.windows.net" `
        -DefaultStorageAccountKey $defaultStorageKey `
        -DefaultStorageContainer $defaultContainerName `
        -DisksPerWorkerNode 2 `
        -VirtualNetworkId $network.Id `
        -SubnetName $defaultSubnet.Id
    

    Предупреждение

    Эта процедура занимает около 15 минут.

Настройка Kafka для объявления IP-адресов

По умолчанию Apache Zookeeper возвращает клиентам доменное имя брокеров Kafka. Эта конфигурация не работает для программного VPN-клиента, так как он не может использовать разрешение имен для сущностей в виртуальной сети. Для этой конфигурации выполните следующие действия, чтобы настроить Kafka для объявления IP-адресов вместо доменных имен:

  1. Откройте веб-браузер и перейдите по адресу https://CLUSTERNAME.azurehdinsight.net. Замените CLUSTERNAME именем Kafka в кластере HDInsight.

    При появлении запроса введите имя пользователя и пароль HTTPS для кластера. Отобразится веб-интерфейс Ambari для кластера.

  2. Чтобы просмотреть сведения о Kafka, из списка слева выберите Kafka.

    Service list with Kafka highlighted.

  3. Чтобы просмотреть конфигурацию Kafka, выберите пункт Configs (Конфигурации) в верхней части окна.

    Apache Ambari services configuration.

  4. Чтобы найти конфигурацию kafka-env, введите kafka-env в поле фильтра в правом верхнем углу.

    Kafka configuration, for kafka-env.

  5. Чтобы настроить Kafka для объявления IP-адресов, добавьте следующий текст в нижнюю часть поля kafka-env template (шаблон kafka-env):

    # Configure Kafka to advertise IP addresses instead of FQDN
    IP_ADDRESS=$(hostname -i)
    echo advertised.listeners=$IP_ADDRESS
    sed -i.bak -e '/advertised/{/advertised@/!d;}' /usr/hdp/current/kafka-broker/conf/server.properties
    echo "advertised.listeners=PLAINTEXT://$IP_ADDRESS:9092" >> /usr/hdp/current/kafka-broker/conf/server.properties
    
  6. Чтобы настроить интерфейс, через который Kafka ожидает передачи данных, введите listeners в поле фильтра в правом верхнем углу.

  7. Чтобы настроить Kafka для ожидания передачи данных через все сетевые интерфейсы, измените значение в поле listeners на PLAINTEXT://0.0.0.0:9092.

  8. Нажмите кнопку Save (Сохранить), чтобы сохранить изменения в конфигурации. Введите текст, описывающий изменения. После сохранения изменений нажмите кнопку ОК.

    Apache Ambari save configuration.

  9. Для предотвращения ошибок при перезапуске Kafka нажмите кнопку Service Actions (Действия со службой) и выберите Turn On Maintenance Mode (Включить режим обслуживания). Чтобы завершить эту операцию, нажмите кнопку ОК.

    Service actions, with turn on maintenance highlighted.

  10. Чтобы перезапустить Kafka, нажмите кнопку Restart (Перезапустить) и выберите Restart All Affected (Перезапустить все затронутые). Подтвердите перезапуск, а после завершения операции нажмите кнопку ОК.

    Restart button with restart all affected highlighted.

  11. Чтобы отключить режим обслуживания нажмите кнопку Service Actions (Действия со службой) и выберите Turn Off Maintenance Mode (Отключить режим обслуживания). Чтобы завершить эту операцию, нажмите кнопку ОК.

Подключение к VPN-шлюзу

Для подключения к VPN-шлюзу используйте раздел Подключение к Azure статьи Настройка подключения типа "точка — сеть".

Пример: клиент Python

Чтобы проверить подключение к Kafka, выполните следующие действия для создания и запуска производителя и потребителя Python:

  1. Чтобы получить полное доменное имя (FQDN) и IP-адреса узлов кластера Kafka, используйте один из следующих методов.

    $resourceGroupName = "The resource group that contains the virtual network used with HDInsight"
    
    $clusterNICs = Get-AzNetworkInterface -ResourceGroupName $resourceGroupName | where-object {$_.Name -like "*node*"}
    
    $nodes = @()
    foreach($nic in $clusterNICs) {
        $node = new-object System.Object
        $node | add-member -MemberType NoteProperty -name "Type" -value $nic.Name.Split('-')[1]
        $node | add-member -MemberType NoteProperty -name "InternalIP" -value $nic.IpConfigurations.PrivateIpAddress
        $node | add-member -MemberType NoteProperty -name "InternalFQDN" -value $nic.DnsSettings.InternalFqdn
        $nodes += $node
    }
    $nodes | sort-object Type
    
    az network nic list --resource-group <resourcegroupname> --output table --query "[?contains(name,'node')].{NICname:name,InternalIP:ipConfigurations[0].privateIpAddress,InternalFQDN:dnsSettings.internalFqdn}"
    

    В этом сценарии предполагается, что $resourceGroupName — это имя группы ресурсов Azure, содержащей виртуальную сеть.

    Сохраните полученные данные для использования на последующих шагах.

  2. Чтобы установить клиент kafka-python, используйте следующую команду:

    pip install kafka-python
    
  3. Чтобы отправить данные в Kafka, используйте следующий код Python:

    from kafka import KafkaProducer
    # Replace the `ip_address` entries with the IP address of your worker nodes
    # NOTE: you don't need the full list of worker nodes, just one or two.
    producer = KafkaProducer(bootstrap_servers=['kafka_broker_1','kafka_broker_2'])
    for _ in range(50):
       producer.send('testtopic', b'test message')
    

    Замените записи 'kafka_broker' адресами, полученными на шаге 1 этого раздела.

    • При использовании программного VPN-клиента замените записи kafka_broker IP-адресом рабочих узлов.

    • Если вы включили разрешение имен через пользовательский DNS-сервер, замените записи kafka_broker полным доменным именем рабочих узлов.

      Примечание.

      Этот код отправляет строку test message в раздел testtopic. По умолчанию Kafka в HDInsight не создает раздел, если он не существует. См. раздел Настройка автоматического создания разделов в Apache Kafka в HDInsight. Кроме того, можно создавать разделы вручную перед созданием сообщений.

  4. Для получения сообщений от Kafka используйте следующий код Python:

    from kafka import KafkaConsumer
    # Replace the `ip_address` entries with the IP address of your worker nodes
    # Again, you only need one or two, not the full list.
    # Note: auto_offset_reset='earliest' resets the starting offset to the beginning
    #       of the topic
    consumer = KafkaConsumer(bootstrap_servers=['kafka_broker_1','kafka_broker_2'],auto_offset_reset='earliest')
    consumer.subscribe(['testtopic'])
    for msg in consumer:
      print (msg)
    

    Замените записи 'kafka_broker' адресами, полученными на шаге 1 этого раздела.

    • При использовании программного VPN-клиента замените записи kafka_broker IP-адресом рабочих узлов.

    • Если вы включили разрешение имен через пользовательский DNS-сервер, замените записи kafka_broker полным доменным именем рабочих узлов.

Следующие шаги

Дополнительные сведения об использовании HDInsight в виртуальной сети см. в статье Планирование развертывания виртуальной сети для кластеров Azure HDInsight.

Дополнительные сведения о создании виртуальной сети Azure с VPN-шлюзом типа "точка — сеть" см. в следующих документах:

Дополнительные сведения о работе с Apache Kafka HDInsight см. в следующих документах: