Nawiązywanie połączenia z platformą Apache Kafka w usłudze HDInsight przy użyciu sieci wirtualnej platformy Azure

Dowiedz się, jak bezpośrednio nawiązać połączenie z platformą Apache Kafka w usłudze HDInsight za pośrednictwem sieci wirtualnej platformy Azure. Ten dokument zawiera informacje dotyczące nawiązywania połączenia z platformą Kafka przy użyciu następujących konfiguracji:

  • Z zasobów w sieci lokalnej. To połączenie jest ustanawiane przy użyciu urządzenia sieci VPN (oprogramowania lub sprzętu) w sieci lokalnej.
  • Ze środowiska programistycznego przy użyciu klienta oprogramowania sieci VPN.

Uwaga

Do interakcji z platformą Azure zalecamy używanie modułu Azure Az w programie PowerShell. Zobacz Instalowanie programu Azure PowerShell, aby rozpocząć. Aby dowiedzieć się, jak przeprowadzić migrację do modułu Az PowerShell, zobacz Migracja programu Azure PowerShell z modułu AzureRM do modułu Az.

Architektura i planowanie

Usługa HDInsight nie zezwala na bezpośrednie połączenie z platformą Kafka za pośrednictwem publicznego Internetu. Zamiast tego klienci platformy Kafka (producenci i konsumenci) muszą używać jednej z następujących metod połączenia:

  • Uruchom klienta w tej samej sieci wirtualnej co platforma Kafka w usłudze HDInsight. Ta konfiguracja jest używana w dokumencie Start with Apache Kafka on HDInsight (Rozpoczynanie pracy z platformą Apache Kafka w usłudze HDInsight ). Klient działa bezpośrednio w węzłach klastra usługi HDInsight lub na innej maszynie wirtualnej w tej samej sieci.

  • Połączenie sieci prywatnej, takiej jak sieć lokalna, do sieci wirtualnej. Ta konfiguracja umożliwia klientom w sieci lokalnej bezpośrednią pracę z platformą Kafka. Aby włączyć tę konfigurację, wykonaj następujące zadania:

    1. Utwórz sieć prywatną.

    2. Utwórz bramę sieci VPN, która używa konfiguracji lokacja-lokacja. Konfiguracja używana w tym dokumencie łączy się z urządzeniem bramy sieci VPN w sieci lokalnej.

    3. Utwórz serwer DNS w sieci wirtualnej.

    4. Skonfiguruj przekazywanie między serwerem DNS w każdej sieci.

    5. Utwórz kafka w klastrze usługi HDInsight w sieci wirtualnej.

      Aby uzyskać więcej informacji, zobacz sekcję Połączenie do platformy Apache Kafka z sekcji sieci lokalnej.

  • Połączenie poszczególnych maszyn do sieci wirtualnej przy użyciu bramy sieci VPN i klienta sieci VPN. Aby włączyć tę konfigurację, wykonaj następujące zadania:

    1. Utwórz sieć prywatną.

    2. Utwórz bramę sieci VPN, która używa konfiguracji typu punkt-lokacja. Tej konfiguracji można używać zarówno z klientami systemu Windows, jak i MacOS.

    3. Utwórz kafka w klastrze usługi HDInsight w sieci wirtualnej.

    4. Konfigurowanie platformy Kafka pod kątem reklam IP. Ta konfiguracja umożliwia klientowi nawiązywanie połączenia przy użyciu adresów IP brokera zamiast nazw domen.

    5. Pobierz klienta sieci VPN i użyj go w systemie programistycznym.

      Aby uzyskać więcej informacji, zobacz sekcję Połączenie na platformie Apache Kafka z klientem sieci VPN.

      Ostrzeżenie

      Ta konfiguracja jest zalecana tylko do celów programistycznych z powodu następujących ograniczeń:

      • Każdy klient musi nawiązać połączenie przy użyciu klienta oprogramowania sieci VPN.
      • Klient sieci VPN nie przekazuje żądań rozpoznawania nazw do sieci wirtualnej, dlatego należy użyć adresowania IP do komunikowania się z platformą Kafka. Komunikacja z adresem IP wymaga dodatkowej konfiguracji w klastrze platformy Kafka.

Aby uzyskać więcej informacji na temat korzystania z usługi HDInsight w sieci wirtualnej, zobacz Planowanie sieci wirtualnej dla klastrów usługi Azure HDInsight.

Połączenie do platformy Apache Kafka z sieci lokalnej

Aby utworzyć klaster platformy Kafka komunikujący się z siecią lokalną, wykonaj kroki opisane w dokumencie Połączenie HDInsight w lokalnej sieci.

Ważne

Podczas tworzenia klastra usługi HDInsight wybierz typ klastra Kafka .

Te kroki umożliwiają utworzenie następującej konfiguracji:

  • Azure Virtual Network
  • Brama sieci VPN typu lokacja-lokacja
  • Konto usługi Azure Storage (używane przez usługę HDInsight)
  • Usługa Kafka w usłudze HDInsight

Aby sprawdzić, czy klient platformy Kafka może nawiązać połączenie z klastrem ze środowiska lokalnego, wykonaj kroki opisane w sekcji Przykład: klient języka Python.

Połączenie na platformę Apache Kafka przy użyciu klienta sieci VPN

Wykonaj kroki opisane w tej sekcji, aby utworzyć następującą konfigurację:

  • Azure Virtual Network
  • Brama sieci VPN typu punkt-lokacja
  • Konto usługi Azure Storage (używane przez usługę HDInsight)
  • Usługa Kafka w usłudze HDInsight
  1. Wykonaj kroki opisane w dokumencie Praca z certyfikatami z podpisem własnym dla połączeń punkt-lokacja . Ten dokument tworzy certyfikaty wymagane dla bramy.

  2. Otwórz wiersz polecenia programu PowerShell i użyj następującego kodu, aby zalogować się do subskrypcji platformy Azure:

    Connect-AzAccount
    # If you have multiple subscriptions, uncomment to set the subscription
    #Select-AzSubscription -SubscriptionName "name of your subscription"
    
  3. Użyj następującego kodu, aby utworzyć zmienne zawierające informacje o konfiguracji:

    # Prompt for generic information
    $resourceGroupName = Read-Host "What is the resource group name?"
    $baseName = Read-Host "What is the base name? It is used to create names for resources, such as 'net-basename' and 'kafka-basename':"
    $location = Read-Host "What Azure Region do you want to create the resources in?"
    $rootCert = Read-Host "What is the file path to the root certificate? It is used to secure the VPN gateway."
    
    # Prompt for HDInsight credentials
    $adminCreds = Get-Credential -Message "Enter the HTTPS user name and password for the HDInsight cluster" -UserName "admin"
    $sshCreds = Get-Credential -Message "Enter the SSH user name and password for the HDInsight cluster" -UserName "sshuser"
    
    # Names for Azure resources
    $networkName = "net-$baseName"
    $clusterName = "kafka-$baseName"
    $storageName = "store$baseName" # Can't use dashes in storage names
    $defaultContainerName = $clusterName
    $defaultSubnetName = "default"
    $gatewaySubnetName = "GatewaySubnet"
    $gatewayPublicIpName = "GatewayIp"
    $gatewayIpConfigName = "GatewayConfig"
    $vpnRootCertName = "rootcert"
    $vpnName = "VPNGateway"
    
    # Network settings
    $networkAddressPrefix = "10.0.0.0/16"
    $defaultSubnetPrefix = "10.0.0.0/24"
    $gatewaySubnetPrefix = "10.0.1.0/24"
    $vpnClientAddressPool = "172.16.201.0/24"
    
    # HDInsight settings
    $hdiWorkerNodes = 4
    $hdiVersion = "3.6"
    $hdiType = "Kafka"
    
  4. Użyj następującego kodu, aby utworzyć grupę zasobów platformy Azure i sieć wirtualną:

    # Create the resource group that contains everything
    New-AzResourceGroup -Name $resourceGroupName -Location $location
    
    # Create the subnet configuration
    $defaultSubnetConfig = New-AzVirtualNetworkSubnetConfig -Name $defaultSubnetName `
        -AddressPrefix $defaultSubnetPrefix
    $gatewaySubnetConfig = New-AzVirtualNetworkSubnetConfig -Name $gatewaySubnetName `
        -AddressPrefix $gatewaySubnetPrefix
    
    # Create the subnet
    New-AzVirtualNetwork -Name $networkName `
        -ResourceGroupName $resourceGroupName `
        -Location $location `
        -AddressPrefix $networkAddressPrefix `
        -Subnet $defaultSubnetConfig, $gatewaySubnetConfig
    
    # Get the network & subnet that were created
    $network = Get-AzVirtualNetwork -Name $networkName `
        -ResourceGroupName $resourceGroupName
    $gatewaySubnet = Get-AzVirtualNetworkSubnetConfig -Name $gatewaySubnetName `
        -VirtualNetwork $network
    $defaultSubnet = Get-AzVirtualNetworkSubnetConfig -Name $defaultSubnetName `
        -VirtualNetwork $network
    
    # Set a dynamic public IP address for the gateway subnet
    $gatewayPublicIp = New-AzPublicIpAddress -Name $gatewayPublicIpName `
        -ResourceGroupName $resourceGroupName `
        -Location $location `
        -AllocationMethod Dynamic
    $gatewayIpConfig = New-AzVirtualNetworkGatewayIpConfig -Name $gatewayIpConfigName `
        -Subnet $gatewaySubnet `
        -PublicIpAddress $gatewayPublicIp
    
    # Get the certificate info
    # Get the full path in case a relative path was passed
    $rootCertFile = Get-ChildItem $rootCert
    $cert = New-Object System.Security.Cryptography.X509Certificates.X509Certificate2($rootCertFile)
    $certBase64 = [System.Convert]::ToBase64String($cert.RawData)
    $p2sRootCert = New-AzVpnClientRootCertificate -Name $vpnRootCertName `
        -PublicCertData $certBase64
    
    # Create the VPN gateway
    New-AzVirtualNetworkGateway -Name $vpnName `
        -ResourceGroupName $resourceGroupName `
        -Location $location `
        -IpConfigurations $gatewayIpConfig `
        -GatewayType Vpn `
        -VpnType RouteBased `
        -EnableBgp $false `
        -GatewaySku Standard `
        -VpnClientAddressPool $vpnClientAddressPool `
        -VpnClientRootCertificates $p2sRootCert
    

    Ostrzeżenie

    Ukończenie tego procesu może potrwać kilka minut.

  5. Użyj następującego kodu, aby utworzyć konto usługi Azure Storage i kontener obiektów blob:

    # Create the storage account
    New-AzStorageAccount `
        -ResourceGroupName $resourceGroupName `
        -Name $storageName `
        -SkuName Standard_GRS `
        -Location $location `
        -Kind StorageV2 `
        -EnableHttpsTrafficOnly 1
    
    # Get the storage account keys and create a context
    $defaultStorageKey = (Get-AzStorageAccountKey -ResourceGroupName $resourceGroupName `
        -Name $storageName)[0].Value
    $storageContext = New-AzStorageContext -StorageAccountName $storageName `
        -StorageAccountKey $defaultStorageKey
    
    # Create the default storage container
    New-AzStorageContainer -Name $defaultContainerName `
        -Context $storageContext
    
  6. Użyj następującego kodu, aby utworzyć klaster usługi HDInsight:

    # Create the HDInsight cluster
    New-AzHDInsightCluster `
        -ResourceGroupName $resourceGroupName `
        -ClusterName $clusterName `
        -Location $location `
        -ClusterSizeInNodes $hdiWorkerNodes `
        -ClusterType $hdiType `
        -OSType Linux `
        -Version $hdiVersion `
        -HttpCredential $adminCreds `
        -SshCredential $sshCreds `
        -DefaultStorageAccountName "$storageName.blob.core.windows.net" `
        -DefaultStorageAccountKey $defaultStorageKey `
        -DefaultStorageContainer $defaultContainerName `
        -DisksPerWorkerNode 2 `
        -VirtualNetworkId $network.Id `
        -SubnetName $defaultSubnet.Id
    

    Ostrzeżenie

    Ukończenie tego procesu trwa około 15 minut.

Konfigurowanie platformy Kafka na potrzeby anonsowania adresów IP

Domyślnie usługa Apache Zookeeper zwraca nazwę domeny brokerów platformy Kafka klientom. Ta konfiguracja nie działa z klientem oprogramowania sieci VPN, ponieważ nie może używać rozpoznawania nazw dla jednostek w sieci wirtualnej. W przypadku tej konfiguracji wykonaj następujące kroki, aby skonfigurować platformę Kafka do anonsowania adresów IP zamiast nazw domen:

  1. Korzystając z przeglądarki internetowej, przejdź do adresu https://CLUSTERNAME.azurehdinsight.net. Zastąp CLUSTERNAME ciąg nazwą platformy Kafka w klastrze usługi HDInsight.

    Po wyświetleniu monitu użyj nazwy użytkownika i hasła HTTPS dla klastra. Zostanie wyświetlony internetowy interfejs użytkownika systemu Ambari dla klastra.

  2. Aby wyświetlić informacje na platformie Kafka, wybierz pozycję Kafka z listy po lewej stronie.

    Service list with Kafka highlighted.

  3. Aby wyświetlić konfigurację platformy Kafka, wybierz pozycję Konfiguracje z górnego środka.

    Apache Ambari services configuration.

  4. Aby znaleźć konfigurację kafka-env, wprowadź kafka-env w polu Filtr w prawym górnym rogu.

    Kafka configuration, for kafka-env.

  5. Aby skonfigurować platformę Kafka do anonsowania adresów IP, dodaj następujący tekst w dolnej części pola kafka-env-template :

    # Configure Kafka to advertise IP addresses instead of FQDN
    IP_ADDRESS=$(hostname -i)
    echo advertised.listeners=$IP_ADDRESS
    sed -i.bak -e '/advertised/{/advertised@/!d;}' /usr/hdp/current/kafka-broker/conf/server.properties
    echo "advertised.listeners=PLAINTEXT://$IP_ADDRESS:9092" >> /usr/hdp/current/kafka-broker/conf/server.properties
    
  6. Aby skonfigurować interfejs, na który nasłuchuje platforma Kafka, wprowadź listeners w polu Filtr w prawym górnym rogu.

  7. Aby skonfigurować platformę Kafka do nasłuchiwania we wszystkich interfejsach sieciowych, zmień wartość w polu odbiorników na PLAINTEXT://0.0.0.0:9092.

  8. Aby zapisać zmiany konfiguracji, użyj przycisku Zapisz . Wprowadź wiadomość TEKSTową opisującą zmiany. Wybierz przycisk OK po zapisaniu zmian.

    Apache Ambari save configuration.

  9. Aby zapobiec błędom podczas ponownego uruchamiania platformy Kafka, użyj przycisku Akcje usługi i wybierz pozycję Włącz tryb konserwacji. Wybierz przycisk OK, aby ukończyć tę operację.

    Service actions, with turn on maintenance highlighted.

  10. Aby ponownie uruchomić platformę Kafka, użyj przycisku Uruchom ponownie i wybierz pozycję Uruchom ponownie wszystkie, których dotyczy problem. Potwierdź ponowne uruchomienie, a następnie użyj przycisku OK po zakończeniu operacji.

    Restart button with restart all affected highlighted.

  11. Aby wyłączyć tryb konserwacji, użyj przycisku Akcje usługi i wybierz pozycję Wyłącz tryb konserwacji. Wybierz przycisk OK , aby ukończyć tę operację.

Połączenie do bramy sieci VPN

Aby nawiązać połączenie z bramą sieci VPN, użyj sekcji Połączenie z platformą Azure w dokumencie Konfigurowanie połączenia punkt-lokacja.

Przykład: klient języka Python

Aby zweryfikować łączność z platformą Kafka, wykonaj następujące kroki, aby utworzyć i uruchomić producenta i konsumenta języka Python:

  1. Użyj jednej z następujących metod, aby pobrać w pełni kwalifikowaną nazwę domeny (FQDN) i adresy IP węzłów w klastrze Kafka:

    $resourceGroupName = "The resource group that contains the virtual network used with HDInsight"
    
    $clusterNICs = Get-AzNetworkInterface -ResourceGroupName $resourceGroupName | where-object {$_.Name -like "*node*"}
    
    $nodes = @()
    foreach($nic in $clusterNICs) {
        $node = new-object System.Object
        $node | add-member -MemberType NoteProperty -name "Type" -value $nic.Name.Split('-')[1]
        $node | add-member -MemberType NoteProperty -name "InternalIP" -value $nic.IpConfigurations.PrivateIpAddress
        $node | add-member -MemberType NoteProperty -name "InternalFQDN" -value $nic.DnsSettings.InternalFqdn
        $nodes += $node
    }
    $nodes | sort-object Type
    
    az network nic list --resource-group <resourcegroupname> --output table --query "[?contains(name,'node')].{NICname:name,InternalIP:ipConfigurations[0].privateIpAddress,InternalFQDN:dnsSettings.internalFqdn}"
    

    Ten skrypt zakłada, że $resourceGroupName jest to nazwa grupy zasobów platformy Azure, która zawiera sieć wirtualną.

    Zapisz zwrócone informacje do użycia w następnych krokach.

  2. Zainstaluj klienta platformy kafka-python przy użyciu następujących elementów:

    pip install kafka-python
    
  3. Aby wysłać dane do platformy Kafka, użyj następującego kodu w języku Python:

    from kafka import KafkaProducer
    # Replace the `ip_address` entries with the IP address of your worker nodes
    # NOTE: you don't need the full list of worker nodes, just one or two.
    producer = KafkaProducer(bootstrap_servers=['kafka_broker_1','kafka_broker_2'])
    for _ in range(50):
       producer.send('testtopic', b'test message')
    

    'kafka_broker' Zastąp wpisy adresami zwróconymi z kroku 1 w tej sekcji:

    • Jeśli używasz klienta sieci VPN oprogramowania, zastąp kafka_broker wpisy adresem IP węzłów procesu roboczego.

    • Jeśli włączono rozpoznawanie nazw za pomocą niestandardowego serwera DNS, zastąp kafka_broker wpisy nazwą FQDN węzłów procesu roboczego.

      Uwaga

      Ten kod wysyła ciąg test message do tematu testtopic. Domyślną konfiguracją platformy Kafka w usłudze HDInsight nie jest utworzenie tematu, jeśli nie istnieje. Zobacz Jak skonfigurować platformę Apache Kafka w usłudze HDInsight, aby automatycznie tworzyć tematy. Alternatywnie można tworzyć tematy ręcznie przed utworzeniem komunikatów.

  4. Aby pobrać komunikaty z platformy Kafka, użyj następującego kodu w języku Python:

    from kafka import KafkaConsumer
    # Replace the `ip_address` entries with the IP address of your worker nodes
    # Again, you only need one or two, not the full list.
    # Note: auto_offset_reset='earliest' resets the starting offset to the beginning
    #       of the topic
    consumer = KafkaConsumer(bootstrap_servers=['kafka_broker_1','kafka_broker_2'],auto_offset_reset='earliest')
    consumer.subscribe(['testtopic'])
    for msg in consumer:
      print (msg)
    

    'kafka_broker' Zastąp wpisy adresami zwróconymi z kroku 1 w tej sekcji:

    • Jeśli używasz klienta sieci VPN oprogramowania, zastąp kafka_broker wpisy adresem IP węzłów procesu roboczego.

    • Jeśli włączono rozpoznawanie nazw za pomocą niestandardowego serwera DNS, zastąp kafka_broker wpisy nazwą FQDN węzłów procesu roboczego.

Następne kroki

Aby uzyskać więcej informacji na temat korzystania z usługi HDInsight z siecią wirtualną, zobacz dokument Planowanie wdrożenia sieci wirtualnej dla klastrów usługi Azure HDInsight.

Aby uzyskać więcej informacji na temat tworzenia sieci wirtualnej platformy Azure z bramą sieci VPN typu punkt-lokacja, zobacz następujące dokumenty:

Aby uzyskać więcej informacji na temat pracy z platformą Apache Kafka w usłudze HDInsight, zobacz następujące dokumenty: