Interagire con cluster di Apache Kafka in Azure HDInsight usando un proxy REST

Il proxy REST Kafka consente di interagire con il cluster Kafka tramite un'API REST tramite HTTPS. Questa azione indica che i client Kafka possono trovarsi all'esterno della rete virtuale. I client possono effettuare chiamate HTTPS semplici e sicure al cluster Kafka, invece di basarsi sulle librerie Kafka. Questo articolo illustra come creare un cluster Kafka abilitato per il proxy REST. Fornisce inoltre un esempio di codice che illustra come effettuare chiamate al proxy REST.

Informazioni di riferimento sulle API REST

Per le operazioni supportate dall'API REST Kafka, vedere le informazioni di riferimento sull'API del proxy REST Kafka in HDInsight.

Background

Kafka REST proxy design.

Per la specifica completa delle operazioni supportate dall'API, vedere API del proxy REST Apache Kafka.

Endpoint proxy REST

Quando si crea un cluster Kafka HDInsight con il proxy REST, viene creato un nuovo endpoint pubblico per il cluster, che è disponibile nelle proprietà del cluster HDInsight nel portale di Azure.

Sicurezza

Accesso al proxy REST Kafka gestito con i gruppi di sicurezza Microsoft Entra. Quando si crea il cluster Kafka, fornire al gruppo di sicurezza Microsoft Entra l'accesso all'endpoint REST. I client Kafka che devono accedere al proxy REST devono essere registrati in questo gruppo dal proprietario del gruppo. Il proprietario del gruppo può eseguire la registrazione tramite il portale o PowerShell.

Per le richieste di endpoint proxy REST, le applicazioni client devono ottenere un token OAuth. Il token usa per verificare l'appartenenza al gruppo di sicurezza. Trovare un esempio di applicazione client illustra come ottenere un token OAuth. L'applicazione client passa il token OAuth nella richiesta HTTPS al proxy REST.

Nota

Per altre informazioni sui gruppi di sicurezza di Microsoft Entra, vedere Gestire l'accesso alle app e alle risorse usando i gruppi di Microsoft Entra. Per altre informazioni sul funzionamento dei token OAuth, vedere Autorizzare l'accesso alle applicazioni Web di Microsoft Entra usando il flusso di concessione del codice OAuth 2.0.

Proxy REST Kafka con gruppi di sicurezza di rete

Se si porta la propria rete virtuale e si controlla il traffico di rete con gruppi di sicurezza di rete, consentire il traffico in ingresso sulla porta 9400 oltre alla porta 443. In questo modo si garantisce che il server proxy REST Kafka sia raggiungibile.

Prerequisiti

  1. Registrare un'applicazione con Microsoft Entra ID. Le applicazioni client scritte per interagire con il proxy REST Kafka usano l'ID e il segreto dell'applicazione per l'autenticazione in Azure.

  2. Creare un gruppo di sicurezza Microsoft Entra. Aggiungere l'applicazione registrata con Microsoft Entra ID al gruppo di sicurezza come membro del gruppo. Questo gruppo di sicurezza verrà usato per controllare quali applicazioni consentono di interagire con il proxy REST. Per altre informazioni sulla creazione di gruppi di Microsoft Entra, vedere Creare un gruppo di base e aggiungere membri con Microsoft Entra ID.

    Verificare che il gruppo sia di tipo Sicurezza. Security Group.

    Verificare che l'applicazione sia membro del gruppo. Check Membership.

Creare un cluster Kafka con proxy REST abilitato

I passaggi usano il portale di Azure. Per un esempio in cui viene usata l'interfaccia della riga di comando di Azure, vedere Creare il cluster del proxy REST Apache Kafka con l'interfaccia della riga di comando di Azure.

  1. Durante il flusso di lavoro di creazione del cluster Kafka nella scheda Sicurezza + rete selezionare l'opzione Abilitare il proxy REST Kafka.

    Screenshot shows the Create HDInsight cluster page with Security + networking selected.

  2. Fare clic su Seleziona il gruppo di sicurezza. Nell'elenco dei gruppi di sicurezza selezionare il gruppo di sicurezza che deve accedere al proxy REST. È possibile usare la casella di ricerca per trovare il gruppo di sicurezza appropriato. Fare clic sul pulsante Seleziona in basso.

    Screenshot shows the Create HDInsight cluster page with the option to select a security group.

  3. Completare i passaggi rimanenti per creare il cluster come descritto in Creare il cluster Apache Kafka in Azure HDInsight con il portale di Azure.

  4. Dopo aver creato il cluster, passare alle proprietà del cluster per prendere nota dell'URL del proxy REST Kafka.

    view REST proxy URL.

Esempio di applicazione client

È possibile usare il codice Python per interagire con il proxy REST nel cluster Kafka. Per eseguire il codice di esempio, seguire questa procedura:

  1. Salvare il codice di esempio in un computer in cui è installato Python.

  2. Installare le dipendenze Python necessarie eseguendo pip3 install msal.

  3. Modificare la sezione di codice Configure these properties e aggiornare le proprietà seguenti per l'ambiente:

    Proprietà Descrizione
    ID tenant Tenant di Azure in cui si trova la sottoscrizione.
    Client ID ID dell'applicazione registrata nel gruppo di sicurezza.
    Segreto client Segreto per l'applicazione registrata nel gruppo di sicurezza.
    kafkarest_endpoint Questo valore è indicato nella scheda Proprietà della panoramica del cluster, come descritto nella sezione relativa alla distribuzione. Deve essere specificato nel formato seguente: https://<clustername>-kafkarest.azurehdinsight.net
  4. Dalla riga di comando eseguire il file Python eseguendo sudo python3 <filename.py>

Il codice esegue le azioni seguenti:

  1. Recupera un token OAuth dall'ID Microsoft Entra.
  2. Mostra come effettuare una richiesta al proxy REST Kafka.

Per altre informazioni sul recupero di token OAuth in Python, vedere Classe Python AuthenticationContext. È possibile che venga visualizzato un ritardo durante topics la creazione o l'eliminazione tramite il proxy REST Kafka. Questo ritardo è dovuto all'aggiornamento della cache. Il campo valore dell'API Producer è stato migliorato. Accetta ora oggetti JSON e qualsiasi modulo serializzato.

#Required Python packages
#pip3 install msal

import json
import msal
import random
import requests
import string
import sys
import time

def get_random_string():
    letters = string.ascii_letters
    random_string = ''.join(random.choice(letters) for i in range(7))

    return random_string


#--------------------------Configure these properties-------------------------------#
# Tenant ID for your Azure Subscription
tenant_id = 'ABCDEFGH-1234-1234-1234-ABCDEFGHIJKL'
# Your Client Application Id
client_id = 'XYZABCDE-1234-1234-1234-ABCDEFGHIJKL'
# Your Client Credentials
client_secret = 'password'
# kafka rest proxy -endpoint
kafkarest_endpoint = "https://<clustername>-kafkarest.azurehdinsight.net"
#--------------------------Configure these properties-------------------------------#

# Get access token
# Scope
scope = 'https://hib.azurehdinsight.net/.default'
#Authority
authority = 'https://login.microsoftonline.com/' + tenant_id

app = msal.ConfidentialClientApplication(
    client_id , client_secret, authority,
    #cache - For details on how look at this example: https://github.com/Azure-Samples/ms-identity-python-webapp/blob/master/app.py
)

# The pattern to acquire a token looks like this.
result = None
result = app.acquire_token_for_client(scopes=[scope])
accessToken = result['access_token']
verify_https = True
request_timeout = 10

# Print access token
print("Access token: " + accessToken)

# API format
api_version = 'v1'
api_format = kafkarest_endpoint + '/{api_version}/{rest_api}'
get_topic_api = 'metadata/topics'
topic_api_format = 'topics/{topic_name}'
producer_api_format = 'producer/topics/{topic_name}'
consumer_api_format = 'consumer/topics/{topic_name}/partitions/{partition_id}/offsets/{offset}?count={count}'  # by default count = 1
partitions_api_format = 'metadata/topics/{topic_name}/partitions'
partition_api_format = 'metadata/topics/{topic_name}/partitions/{partition_id}'

# Request header
headers = {
    'Authorization': 'Bearer ' + accessToken,
    'Content-type': 'application/json'          # set Content-type to 'application/json'
}

# New topic
new_topic = 'hello_topic_' + get_random_string()
print("Topic " + new_topic + " is going to be used for demo.")

topics = []

# Create a  new topic
# Example of topic config
topic_config = {
    "partition_count": 1,
    "replication_factor": 1,
    "topic_properties": {
        "retention.ms": 604800000,
        "min.insync.replicas": "1"
    }
}

create_topic_url = api_format.format(api_version=api_version, rest_api=topic_api_format.format(topic_name=new_topic))
response = requests.put(create_topic_url, headers=headers, json=topic_config, timeout=request_timeout, verify=verify_https)
print(response.content)

if response.ok:
    while new_topic not in topics:
        print("The new topic " + new_topic + " is not visible yet. sleep 30 seconds...")
        time.sleep(30)
        # List Topic
        get_topic_url = api_format.format(api_version=api_version, rest_api=get_topic_api)

        response = requests.get(get_topic_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
        topic_list = response.json()
        topics = topic_list.get("topics", [])
else:
    print("Topic " + new_topic + " was created. Exit.")
    sys.exit(1)

# Produce messages to new_topic
# Example payload of Producer REST API
payload_json = {
    "records": [
        {
            "key": "key1",
            "value": "**********"         # A string                              
        },
        {
            "partition": 0,
            "value": 5                    # An integer
        },
        {
            "value": 3.14                 # A floating number
        },
        {
            "value": {                    # A JSON object
                "id": 1,
                "name": "HDInsight Kafka REST proxy"
            }
        },
        {
            "value": [                    # A list of JSON objects
                {
                    "id": 1,
                    "name": "HDInsight Kafka REST proxy 1"
                },
                {
                    "id": 2,
                    "name": "HDInsight Kafka REST proxy 2"
                },
                {
                    "id": 3,
                    "name": "HDInsight Kafka REST proxy 3"
                }
            ]
        },
        {
            "value": {                  # A nested JSON object
                "group id": 1,
                "HDI Kafka REST": {
                    "id": 1,
                    "name": "HDInsight Kafka REST proxy 1"
                },
                "HDI Kafka REST server info": {
                    "id": 1,
                    "name": "HDInsight Kafka REST proxy 1",
                    "servers": [
                        {
                            "server id": 1,
                            "server name": "HDInsight Kafka REST proxy server 1"
                        },
                        {
                            "server id": 2,
                            "server name": "HDInsight Kafka REST proxy server 2"
                        },
                        {
                            "server id": 3,
                            "server name": "HDInsight Kafka REST proxy server 3"
                        }
                    ]
                }
            }
        }
    ]
}

print("Payloads in a Producer request: \n", payload_json)
producer_url = api_format.format(api_version=api_version, rest_api=producer_api_format.format(topic_name=new_topic))
response = requests.post(producer_url, headers=headers, json=payload_json, timeout=request_timeout, verify=verify_https)
print(response.content)

# Consume messages from the topic
partition_id = 0
offset = 0
count = 2

while True:
    consumer_url = api_format.format(api_version=api_version, rest_api=consumer_api_format.format(topic_name=new_topic, partition_id=partition_id, offset=offset, count=count))
    print("Consuming " + str(count) + " messages from offset " + str(offset))

    response = requests.get(consumer_url, headers=headers, timeout=request_timeout, verify=verify_https)

    if response.ok:
        messages = response.json()
        print("Consumed messages: \n" + json.dumps(messages, indent=2))
        next_offset = response.headers.get("NextOffset")
        if offset == next_offset or not messages.get("records", []):
            print("Consumer caught up with producer. Exit for now...")
            break

        offset = next_offset

    else:
        print("Error " + str(response.status_code))
        break
        
# List partitions
get_partitions_url = api_format.format(api_version=api_version, rest_api=partitions_api_format.format(topic_name=new_topic))
print("Fetching partitions from  " + get_partitions_url)

response = requests.get(get_partitions_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
partition_list = response.json()
print("Partition list: \n" + json.dumps(partition_list, indent=2))

# List a partition
get_partition_url = api_format.format(api_version=api_version, rest_api=partition_api_format.format(topic_name=new_topic, partition_id=partition_id))
print("Fetching metadata of a partition from  " + get_partition_url)

response = requests.get(get_partition_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
partition = response.json()
print("Partition metadata: \n" + json.dumps(partition, indent=2))

Ecco un altro esempio su come ottenere un token da Azure per il proxy REST usando un comando curl. Tenere presente che è necessario specificare scope=https://hib.azurehdinsight.net/.default durante il recupero di un token.

curl -X POST -H "Content-Type: application/x-www-form-urlencoded" -d 'client_id=<clientid>&client_secret=<clientsecret>&grant_type=client_credentials&scope=https://hib.azurehdinsight.net/.default' 'https://login.microsoftonline.com/<tenantid>/oauth2/v2.0/token'

Passaggi successivi