Connettersi ad Apache Kafka in HDInsight tramite una rete virtuale di Azure

Informazioni su come connettersi direttamente ad Apache Kafka in HDInsight tramite una rete virtuale di Azure. Questo documento contiene informazioni sulla connessione a Kafka con le configurazioni seguenti:

  • Dalle risorse in una rete locale. La connessione viene stabilita tramite un dispositivo VPN, un software o un hardware, nella rete locale.
  • Da un ambiente di sviluppo tramite un client software VPN.

Nota

È consigliabile usare il modulo Azure Az PowerShell per interagire con Azure. Per iniziare, vedere Installare Azure PowerShell. Per informazioni su come eseguire la migrazione al modulo AZ PowerShell, vedere Eseguire la migrazione di Azure PowerShell da AzureRM ad Az.

Architettura e pianificazione

HDInsight non consente la connessione diretta a Kafka nella rete Internet pubblica. I client di Kafka, produttori e clienti, invece, devono usare uno dei seguenti metodi di connessione:

  • Eseguire il client nella stessa rete virtuale di Kafka in HDInsight. Questa configurazione viene usata nel documento Iniziare a usare Apache Kafka in HDInsight. Il client viene eseguito direttamente nei nodi del cluster di HDInsight o in un'altra macchina virtuale nella stessa rete.

  • Collegare una rete privata, ad esempio la rete locale, alla rete virtuale. Questa configurazione consente ai client nella rete locale per lavorare direttamente con Kafka. Per abilitare questa configurazione, eseguire le attività seguenti:

    1. Crea una rete virtuale.

    2. Creare un gateway VPN che usi una configurazione da sito a sito. La configurazione usata in questo documento si connette a un dispositivo gateway VPN nella rete locale.

    3. Creare un server DNS nella rete virtuale.

    4. Configurare l'inoltro tra il server DNS in ogni rete.

    5. Creare un cluster Kafka in HDInsight nella rete virtuale.

      Per altre informazioni, vedere la sezione Connettersi ad Apache Kafka da una rete locale.

  • Connettere i computer individualmente alla rete virtuale usando un gateway VPN e il client VPN. Per abilitare questa configurazione, eseguire le attività seguenti:

    1. Crea una rete virtuale.

    2. Creare un gateway VPN che usi una configurazione da punto a sito. Questa configurazione può essere usata sia con i client Windows che con i client MacOS.

    3. Creare un cluster Kafka in HDInsight nella rete virtuale.

    4. Configurare Kafka per la pubblicità IP. Questa configurazione consente al client di connettersi usando l'indirizzo IP broker anziché i nomi di dominio.

    5. Scaricare e usare il client VPN nel sistema di sviluppo.

      Per altre informazioni, vedere la sezione Connettersi ad Apache Kafka con un client VPN.

      Avviso

      Questa configurazione è consigliata solo per scopi di sviluppo a causa delle limitazioni seguenti:

      • Ogni client deve connettersi usando un client software VPN.
      • Il client VPN non passa le richieste di risoluzione dei nomi alla rete virtuale, quindi per comunicare con Kafka è necessario usare gli indirizzi IP. La comunicazione tramite IP richiede una configurazione aggiuntiva nel cluster Kafka.

Per altre informazioni sull'uso di HDInsight in una rete virtuale, vedere Pianificare una rete virtuale per i cluster Azure HDInsight.

Connessione ad Apache Kafka da una rete locale

Per creare un cluster Kafka che comunichi con la rete locale, seguire i passaggi descritti nel documento Connect HDInsight to your on-premises network (Connettere HDInsight alla rete locale).

Importante

Quando si crea il cluster HDInsight, selezionare il tipo di cluster Kafka.

Questi passaggi creano la configurazione seguente:

  • Rete virtuale di Azure
  • Gateway VPN da sito a sito
  • Account di archiviazione di Azure, usato da HDInsight
  • Kafka in HDInsight

Per verificare che un client Kafka riesce a connettersi al cluster da locale, usare la procedura descritta nella sezione Esempio: client Python.

Connessione ad Apache Kafka con un client VPN

Usare la procedura descritta in questa sezione per creare la configurazione seguente:

  • Rete virtuale di Azure
  • Gateway VPN da punto a sito
  • Account di archiviazione di Azure, usato da HDInsight
  • Kafka in HDInsight
  1. Seguire la procedura nel documento Usare i certificati autofirmati per le connessioni da punto a sito. Questo documento consente di creare i certificati necessari per il gateway.

  2. Aprire un prompt di PowerShell e usare il codice seguente per accedere alla sottoscrizione di Azure:

    Connect-AzAccount
    # If you have multiple subscriptions, uncomment to set the subscription
    #Select-AzSubscription -SubscriptionName "name of your subscription"
    
  3. Usare il codice seguente per creare le variabili che contengono le informazioni di configurazione:

    # Prompt for generic information
    $resourceGroupName = Read-Host "What is the resource group name?"
    $baseName = Read-Host "What is the base name? It is used to create names for resources, such as 'net-basename' and 'kafka-basename':"
    $location = Read-Host "What Azure Region do you want to create the resources in?"
    $rootCert = Read-Host "What is the file path to the root certificate? It is used to secure the VPN gateway."
    
    # Prompt for HDInsight credentials
    $adminCreds = Get-Credential -Message "Enter the HTTPS user name and password for the HDInsight cluster" -UserName "admin"
    $sshCreds = Get-Credential -Message "Enter the SSH user name and password for the HDInsight cluster" -UserName "sshuser"
    
    # Names for Azure resources
    $networkName = "net-$baseName"
    $clusterName = "kafka-$baseName"
    $storageName = "store$baseName" # Can't use dashes in storage names
    $defaultContainerName = $clusterName
    $defaultSubnetName = "default"
    $gatewaySubnetName = "GatewaySubnet"
    $gatewayPublicIpName = "GatewayIp"
    $gatewayIpConfigName = "GatewayConfig"
    $vpnRootCertName = "rootcert"
    $vpnName = "VPNGateway"
    
    # Network settings
    $networkAddressPrefix = "10.0.0.0/16"
    $defaultSubnetPrefix = "10.0.0.0/24"
    $gatewaySubnetPrefix = "10.0.1.0/24"
    $vpnClientAddressPool = "172.16.201.0/24"
    
    # HDInsight settings
    $hdiWorkerNodes = 4
    $hdiVersion = "3.6"
    $hdiType = "Kafka"
    
  4. Usare il codice seguente per creare il gruppo di risorse di Azure e la rete virtuale:

    # Create the resource group that contains everything
    New-AzResourceGroup -Name $resourceGroupName -Location $location
    
    # Create the subnet configuration
    $defaultSubnetConfig = New-AzVirtualNetworkSubnetConfig -Name $defaultSubnetName `
        -AddressPrefix $defaultSubnetPrefix
    $gatewaySubnetConfig = New-AzVirtualNetworkSubnetConfig -Name $gatewaySubnetName `
        -AddressPrefix $gatewaySubnetPrefix
    
    # Create the subnet
    New-AzVirtualNetwork -Name $networkName `
        -ResourceGroupName $resourceGroupName `
        -Location $location `
        -AddressPrefix $networkAddressPrefix `
        -Subnet $defaultSubnetConfig, $gatewaySubnetConfig
    
    # Get the network & subnet that were created
    $network = Get-AzVirtualNetwork -Name $networkName `
        -ResourceGroupName $resourceGroupName
    $gatewaySubnet = Get-AzVirtualNetworkSubnetConfig -Name $gatewaySubnetName `
        -VirtualNetwork $network
    $defaultSubnet = Get-AzVirtualNetworkSubnetConfig -Name $defaultSubnetName `
        -VirtualNetwork $network
    
    # Set a dynamic public IP address for the gateway subnet
    $gatewayPublicIp = New-AzPublicIpAddress -Name $gatewayPublicIpName `
        -ResourceGroupName $resourceGroupName `
        -Location $location `
        -AllocationMethod Dynamic
    $gatewayIpConfig = New-AzVirtualNetworkGatewayIpConfig -Name $gatewayIpConfigName `
        -Subnet $gatewaySubnet `
        -PublicIpAddress $gatewayPublicIp
    
    # Get the certificate info
    # Get the full path in case a relative path was passed
    $rootCertFile = Get-ChildItem $rootCert
    $cert = New-Object System.Security.Cryptography.X509Certificates.X509Certificate2($rootCertFile)
    $certBase64 = [System.Convert]::ToBase64String($cert.RawData)
    $p2sRootCert = New-AzVpnClientRootCertificate -Name $vpnRootCertName `
        -PublicCertData $certBase64
    
    # Create the VPN gateway
    New-AzVirtualNetworkGateway -Name $vpnName `
        -ResourceGroupName $resourceGroupName `
        -Location $location `
        -IpConfigurations $gatewayIpConfig `
        -GatewayType Vpn `
        -VpnType RouteBased `
        -EnableBgp $false `
        -GatewaySku Standard `
        -VpnClientAddressPool $vpnClientAddressPool `
        -VpnClientRootCertificates $p2sRootCert
    

    Avviso

    Il completamento del processo richiede qualche minuto.

  5. Per creare un contenitore BLOB e l'account di archiviazione di Azure, usare il codice seguente:

    # Create the storage account
    New-AzStorageAccount `
        -ResourceGroupName $resourceGroupName `
        -Name $storageName `
        -SkuName Standard_GRS `
        -Location $location `
        -Kind StorageV2 `
        -EnableHttpsTrafficOnly 1
    
    # Get the storage account keys and create a context
    $defaultStorageKey = (Get-AzStorageAccountKey -ResourceGroupName $resourceGroupName `
        -Name $storageName)[0].Value
    $storageContext = New-AzStorageContext -StorageAccountName $storageName `
        -StorageAccountKey $defaultStorageKey
    
    # Create the default storage container
    New-AzStorageContainer -Name $defaultContainerName `
        -Context $storageContext
    
  6. Per creare il cluster HDInsight usare il codice seguente:

    # Create the HDInsight cluster
    New-AzHDInsightCluster `
        -ResourceGroupName $resourceGroupName `
        -ClusterName $clusterName `
        -Location $location `
        -ClusterSizeInNodes $hdiWorkerNodes `
        -ClusterType $hdiType `
        -OSType Linux `
        -Version $hdiVersion `
        -HttpCredential $adminCreds `
        -SshCredential $sshCreds `
        -DefaultStorageAccountName "$storageName.blob.core.windows.net" `
        -DefaultStorageAccountKey $defaultStorageKey `
        -DefaultStorageContainer $defaultContainerName `
        -DisksPerWorkerNode 2 `
        -VirtualNetworkId $network.Id `
        -SubnetName $defaultSubnet.Id
    

    Avviso

    Il completamento di questo processo richiede circa 15 minuti.

Configurare Kafka per la pubblicità IP

Per impostazione predefinita, Apache ZooKeeper restituisce il nome di dominio dei broker di Kafka ai client. Questa configurazione non funziona con il client software VPN, poiché non può usare la risoluzione dei nomi per le entità nella rete virtuale. Per questa configurazione usare la procedura seguente per configurare Kafka in HDInsight affinché possa creare pubblicità per gli indirizzi IP anziché per i nomi di dominio:

  1. Usando un Web browser, passare a https://CLUSTERNAME.azurehdinsight.net. Sostituire CLUSTERNAME con il nome del cluster Kafka in HDInsight.

    Quando richiesto, usare il nome utente HTTPS e la password per il cluster. Viene visualizzata l'interfaccia utente di Ambari Web per il cluster.

  2. Per visualizzare le informazioni su Kafka, selezionare Kafka dall'elenco a sinistra.

    Service list with Kafka highlighted.

  3. Per visualizzare la configurazione di Kafka, selezionare Configs (Configurazioni) nella parte centrale in alto.

    Apache Ambari services configuration.

  4. Per trovare la configurazione kafka-env, immettere kafka-env nel campo Filtro in alto a destra.

    Kafka configuration, for kafka-env.

  5. Per configurare Kafka affinché generi pubblicità per gli indirizzi IP, aggiungere il testo seguente nella parte inferiore del campo kafka-env-template:

    # Configure Kafka to advertise IP addresses instead of FQDN
    IP_ADDRESS=$(hostname -i)
    echo advertised.listeners=$IP_ADDRESS
    sed -i.bak -e '/advertised/{/advertised@/!d;}' /usr/hdp/current/kafka-broker/conf/server.properties
    echo "advertised.listeners=PLAINTEXT://$IP_ADDRESS:9092" >> /usr/hdp/current/kafka-broker/conf/server.properties
    
  6. Per configurare l'interfaccia per cui è in ascolto Kafka immettere listeners nel campo Filtro in alto a destra.

  7. Per configurare tutte le interfacce di rete per cui Kafka è in ascolto, modificare il valore nel campo Listener su PLAINTEXT://0.0.0.0:9092.

  8. Per salvare le modifiche alla configurazione usare il pulsante Salva. Immettere un messaggio di testo che descrive le modifiche. Selezionare OK dopo aver salvato le modifiche.

    Apache Ambari save configuration.

  9. Per evitare errori al riavvio di Kafka, usare il pulsante Service Actions (Azioni del servizio) e selezionare Attiva modalità di manutenzione. Per completare questa operazione selezionare OK.

    Service actions, with turn on maintenance highlighted.

  10. Per riavviare Kafka, utilizzare il pulsante Riavvia e selezionare Restart All Affected (Riavviare tutti gli elementi interessati). Confermare il riavvio, quindi usare il pulsante OK dopo aver completato l'operazione.

    Restart button with restart all affected highlighted.

  11. Per disabilitare la modalità di manutenzione, usare il pulsante Service Actions (Azioni del servizio) e selezionare Disattiva modalità di manutenzione. Per completare questa operazione selezionare OK.

Connettersi al gateway VPN

Per connettersi al gateway VPN, usare la sezione Connettersi ad Azure del documento Configurare una connessione da punto a sito.

Esempio: client Python

Per convalidare la connettività a Kafka, usare la procedura seguente per creare ed eseguire un produttore e un utente di Python:

  1. Per recuperare il nome di dominio completo e gli indirizzi IP dei nodi nel cluster Kafka, usare uno dei metodi seguenti:

    $resourceGroupName = "The resource group that contains the virtual network used with HDInsight"
    
    $clusterNICs = Get-AzNetworkInterface -ResourceGroupName $resourceGroupName | where-object {$_.Name -like "*node*"}
    
    $nodes = @()
    foreach($nic in $clusterNICs) {
        $node = new-object System.Object
        $node | add-member -MemberType NoteProperty -name "Type" -value $nic.Name.Split('-')[1]
        $node | add-member -MemberType NoteProperty -name "InternalIP" -value $nic.IpConfigurations.PrivateIpAddress
        $node | add-member -MemberType NoteProperty -name "InternalFQDN" -value $nic.DnsSettings.InternalFqdn
        $nodes += $node
    }
    $nodes | sort-object Type
    
    az network nic list --resource-group <resourcegroupname> --output table --query "[?contains(name,'node')].{NICname:name,InternalIP:ipConfigurations[0].privateIpAddress,InternalFQDN:dnsSettings.internalFqdn}"
    

    Questo script presuppone che $resourceGroupName sia il nome del gruppo di risorse di Azure che contiene la rete virtuale.

    Salvare le informazioni restituite da usare nei passaggi successivi.

  2. Per installare il client kafka-python usare il codice seguente:

    pip install kafka-python
    
  3. Per inviare dati a Kafka, usare il seguente codice Python:

    from kafka import KafkaProducer
    # Replace the `ip_address` entries with the IP address of your worker nodes
    # NOTE: you don't need the full list of worker nodes, just one or two.
    producer = KafkaProducer(bootstrap_servers=['kafka_broker_1','kafka_broker_2'])
    for _ in range(50):
       producer.send('testtopic', b'test message')
    

    Sostituire le voci 'kafka_broker' con gli indirizzi restituiti dal passaggio 1 in questa sezione:

    • Se si usa un client VPN del software, sostituire le voci kafka_broker con l'indirizzo IP dei nodi di lavoro.

    • Se è abilitata la risoluzione dei nomi tramite un server DNS personalizzato, sostituire le voci kafka_broker con il nome di dominio completo dei nodi di lavoro.

      Nota

      Questo codice invia la stringa test message all'argomento testtopic. La configurazione predefinita di Kafka in HDInsight non consiste nel creare l'argomento se non esiste. Vedere Come configurare Apache Kafka in HDInsight per creare automaticamente argomenti. In alternativa, è possibile creare argomenti manualmente prima di produrre messaggi.

  4. Per recuperare i messaggi da Kafka, usare il seguente codice Python:

    from kafka import KafkaConsumer
    # Replace the `ip_address` entries with the IP address of your worker nodes
    # Again, you only need one or two, not the full list.
    # Note: auto_offset_reset='earliest' resets the starting offset to the beginning
    #       of the topic
    consumer = KafkaConsumer(bootstrap_servers=['kafka_broker_1','kafka_broker_2'],auto_offset_reset='earliest')
    consumer.subscribe(['testtopic'])
    for msg in consumer:
      print (msg)
    

    Sostituire le voci 'kafka_broker' con gli indirizzi restituiti dal passaggio 1 in questa sezione:

    • Se si usa un client VPN del software, sostituire le voci kafka_broker con l'indirizzo IP dei nodi di lavoro.

    • Se è abilitata la risoluzione dei nomi tramite un server DNS personalizzato, sostituire le voci kafka_broker con il nome di dominio completo dei nodi di lavoro.

Passaggi successivi

Per altre informazioni sull'uso di HDInsight con una rete virtuale, vedere il documento Pianificare una distribuzione di rete virtuale per cluster Azure HDInsight.

Per altre informazioni sulla creazione di una rete virtuale di Azure con gateway VPN da punto a sito, vedere i seguenti documenti:

Per altre informazioni sull'uso della gestione di Apache Kafka in HDInsight, vedere i documenti seguenti: