Interactie met Apache Kafka-clusters in Azure HDInsight met behulp van een REST-proxy

Met de Kafka REST-proxy kunt u communiceren met uw Kafka-cluster via een REST API via HTTPS. Deze actie betekent dat uw Kafka-clients zich buiten uw virtuele netwerk kunnen bevinden. Clients kunnen eenvoudige, beveiligde HTTPS-aanroepen naar het Kafka-cluster maken in plaats van te vertrouwen op Kafka-bibliotheken. In dit artikel leest u hoe u een Kafka-cluster met REST-proxy maakt. Biedt ook een voorbeeldcode die laat zien hoe u aanroepen naar DE REST-proxy uitvoert.

Naslaginformatie over REST API

Zie voor bewerkingen die worden ondersteund door de Kafka REST API HDInsight Kafka REST Proxy-API.

Achtergrond

Kafka REST proxy design.

Zie de Apache Kafka REST Proxy-API voor de volledige specificatie van bewerkingen die worden ondersteund door de API.

REST Proxy-eindpunt

Als u een HDInsight Kafka-cluster met REST-proxy maakt, maakt u een nieuw openbaar eindpunt voor uw cluster, dat u kunt vinden in de eigenschappen van uw HDInsight-cluster in Azure Portal.

Beveiliging

Toegang tot de Kafka REST-proxy die wordt beheerd met Microsoft Entra-beveiligingsgroepen. Geef bij het maken van het Kafka-cluster de Microsoft Entra-beveiligingsgroep op met REST-eindpunttoegang. Kafka-clients die toegang nodig hebben tot de REST-proxy, moeten worden geregistreerd bij deze groep door de groepseigenaar. De groepseigenaar kan zich registreren via de portal of via PowerShell.

Voor REST-proxy-eindpuntaanvragen moeten clienttoepassingen een OAuth-token ophalen. Het token gebruikt om het lidmaatschap van de beveiligingsgroep te controleren. Een voorbeeld van een clienttoepassing zoeken laat zien hoe u een OAuth-token kunt ophalen. De clienttoepassing geeft het OAuth-token in de HTTPS-aanvraag door aan de REST-proxy.

Notitie

Zie App - en resourcetoegang beheren met Behulp van Microsoft Entra-groepen voor meer informatie over Microsoft Entra-beveiligingsgroepen. Zie Toegang tot Microsoft Entra-webtoepassingen autoriseren met behulp van de OAuth 2.0-codetoekenningenstroom voor meer informatie over de werking van OAuth-tokens.

Kafka REST-proxy met netwerkbeveiligingsgroepen

Als u uw eigen VNet gebruikt en netwerkverkeer met netwerkbeveiligingsgroepen controleert, staat u inkomend verkeer op poort 9400 toe naast poort 443. Dit zorgt ervoor dat de Kafka REST-proxyserver bereikbaar is.

Vereisten

  1. Een toepassing registreren bij Microsoft Entra ID. De clienttoepassingen die u schrijft om te communiceren met de Kafka REST-proxy, gebruiken de id en het geheim van deze toepassing om te verifiëren bij Azure.

  2. Maak een Microsoft Entra-beveiligingsgroep. Voeg de toepassing die u hebt geregistreerd bij Microsoft Entra ID toe aan de beveiligingsgroep als lid van de groep. Deze beveiligingsgroep wordt gebruikt om te bepalen welke toepassingen interactie met de REST-proxy toestaan. Zie Een basisgroep maken en leden toevoegen met behulp van Microsoft Entra-id voor meer informatie over het maken van Microsoft Entra-groepen.

    Controleer of de groep van het type Beveiliging is. Security Group.

    Controleer of de toepassing lid is van de groep. Check Membership.

Een Kafka-cluster maken waarvoor REST-proxy is ingeschakeld

In de stappen wordt Azure Portal gebruikt. Zie Apache Kafka REST-proxycluster maken met behulp van Azure CLI voor een voorbeeld van het gebruik van Azure CLI.

  1. Schakel tijdens het maken van het Kafka-cluster op het tabblad Beveiliging en netwerken de optie Kafka REST-proxy inschakelen in.

    Screenshot shows the Create HDInsight cluster page with Security + networking selected.

  2. Klik op Beveiligingsgroep selecteren. Selecteer in de lijst met beveiligingsgroepen de beveiligingsgroep waartoe u toegang wilt hebben tot de REST-proxy. U kunt het zoekvak gebruiken om de juiste beveiligingsgroep te vinden. Klik op de knop Selecteren onderaan.

    Screenshot shows the Create HDInsight cluster page with the option to select a security group.

  3. Voer de resterende stappen uit om uw cluster te maken, zoals beschreven in Een Apache Kafka-cluster maken in Azure HDInsight met behulp van Azure Portal.

  4. Zodra het cluster is gemaakt, gaat u naar de clustereigenschappen om de Kafka REST-proxy-URL vast te leggen.

    view REST proxy URL.

Voorbeeld van clienttoepassing

U kunt de Python-code gebruiken om te communiceren met de REST-proxy in uw Kafka-cluster. Voer de volgende stappen uit om het codevoorbeeld te gebruiken:

  1. Sla de voorbeeldcode op een computer op waarop Python is geïnstalleerd.

  2. Installeer vereiste Python-afhankelijkheden door uit te pip3 install msalvoeren.

  3. Wijzig de codesectie Configureer deze eigenschappen en werk de volgende eigenschappen voor uw omgeving bij:

    Eigenschappen Beschrijving
    Tenant-id De Azure-tenant waar uw abonnement zich bevindt.
    Client ID De id voor de toepassing die u hebt geregistreerd in de beveiligingsgroep.
    Clientgeheim Het geheim voor de toepassing die u hebt geregistreerd in de beveiligingsgroep.
    Kafkarest_endpoint Haal deze waarde op via het tabblad Eigenschappen in het clusteroverzicht, zoals beschreven in de implementatiesectie. Deze moet de volgende indeling hebben: https://<clustername>-kafkarest.azurehdinsight.net
  4. Voer vanaf de opdrachtregel het Python-bestand uit door het uit te voeren sudo python3 <filename.py>

Deze code voert de volgende actie uit:

  1. Hiermee wordt een OAuth-token opgehaald uit Microsoft Entra-id.
  2. Laat zien hoe u een aanvraag indient bij de Kafka REST-proxy.

Zie de python AuthenticationContext-klasse voor meer informatie over het ophalen van OAuth-tokens in Python. Mogelijk ziet u een vertraging terwijl topics deze niet is gemaakt of verwijderd via de Kafka REST-proxy daar worden weergegeven. Deze vertraging wordt veroorzaakt door het vernieuwen van de cache. Het waardeveld van de Producer-API is uitgebreid. Nu accepteert het JSON-objecten en een geserialiseerd formulier.

#Required Python packages
#pip3 install msal

import json
import msal
import random
import requests
import string
import sys
import time

def get_random_string():
    letters = string.ascii_letters
    random_string = ''.join(random.choice(letters) for i in range(7))

    return random_string


#--------------------------Configure these properties-------------------------------#
# Tenant ID for your Azure Subscription
tenant_id = 'ABCDEFGH-1234-1234-1234-ABCDEFGHIJKL'
# Your Client Application Id
client_id = 'XYZABCDE-1234-1234-1234-ABCDEFGHIJKL'
# Your Client Credentials
client_secret = 'password'
# kafka rest proxy -endpoint
kafkarest_endpoint = "https://<clustername>-kafkarest.azurehdinsight.net"
#--------------------------Configure these properties-------------------------------#

# Get access token
# Scope
scope = 'https://hib.azurehdinsight.net/.default'
#Authority
authority = 'https://login.microsoftonline.com/' + tenant_id

app = msal.ConfidentialClientApplication(
    client_id , client_secret, authority,
    #cache - For details on how look at this example: https://github.com/Azure-Samples/ms-identity-python-webapp/blob/master/app.py
)

# The pattern to acquire a token looks like this.
result = None
result = app.acquire_token_for_client(scopes=[scope])
accessToken = result['access_token']
verify_https = True
request_timeout = 10

# Print access token
print("Access token: " + accessToken)

# API format
api_version = 'v1'
api_format = kafkarest_endpoint + '/{api_version}/{rest_api}'
get_topic_api = 'metadata/topics'
topic_api_format = 'topics/{topic_name}'
producer_api_format = 'producer/topics/{topic_name}'
consumer_api_format = 'consumer/topics/{topic_name}/partitions/{partition_id}/offsets/{offset}?count={count}'  # by default count = 1
partitions_api_format = 'metadata/topics/{topic_name}/partitions'
partition_api_format = 'metadata/topics/{topic_name}/partitions/{partition_id}'

# Request header
headers = {
    'Authorization': 'Bearer ' + accessToken,
    'Content-type': 'application/json'          # set Content-type to 'application/json'
}

# New topic
new_topic = 'hello_topic_' + get_random_string()
print("Topic " + new_topic + " is going to be used for demo.")

topics = []

# Create a  new topic
# Example of topic config
topic_config = {
    "partition_count": 1,
    "replication_factor": 1,
    "topic_properties": {
        "retention.ms": 604800000,
        "min.insync.replicas": "1"
    }
}

create_topic_url = api_format.format(api_version=api_version, rest_api=topic_api_format.format(topic_name=new_topic))
response = requests.put(create_topic_url, headers=headers, json=topic_config, timeout=request_timeout, verify=verify_https)
print(response.content)

if response.ok:
    while new_topic not in topics:
        print("The new topic " + new_topic + " is not visible yet. sleep 30 seconds...")
        time.sleep(30)
        # List Topic
        get_topic_url = api_format.format(api_version=api_version, rest_api=get_topic_api)

        response = requests.get(get_topic_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
        topic_list = response.json()
        topics = topic_list.get("topics", [])
else:
    print("Topic " + new_topic + " was created. Exit.")
    sys.exit(1)

# Produce messages to new_topic
# Example payload of Producer REST API
payload_json = {
    "records": [
        {
            "key": "key1",
            "value": "**********"         # A string                              
        },
        {
            "partition": 0,
            "value": 5                    # An integer
        },
        {
            "value": 3.14                 # A floating number
        },
        {
            "value": {                    # A JSON object
                "id": 1,
                "name": "HDInsight Kafka REST proxy"
            }
        },
        {
            "value": [                    # A list of JSON objects
                {
                    "id": 1,
                    "name": "HDInsight Kafka REST proxy 1"
                },
                {
                    "id": 2,
                    "name": "HDInsight Kafka REST proxy 2"
                },
                {
                    "id": 3,
                    "name": "HDInsight Kafka REST proxy 3"
                }
            ]
        },
        {
            "value": {                  # A nested JSON object
                "group id": 1,
                "HDI Kafka REST": {
                    "id": 1,
                    "name": "HDInsight Kafka REST proxy 1"
                },
                "HDI Kafka REST server info": {
                    "id": 1,
                    "name": "HDInsight Kafka REST proxy 1",
                    "servers": [
                        {
                            "server id": 1,
                            "server name": "HDInsight Kafka REST proxy server 1"
                        },
                        {
                            "server id": 2,
                            "server name": "HDInsight Kafka REST proxy server 2"
                        },
                        {
                            "server id": 3,
                            "server name": "HDInsight Kafka REST proxy server 3"
                        }
                    ]
                }
            }
        }
    ]
}

print("Payloads in a Producer request: \n", payload_json)
producer_url = api_format.format(api_version=api_version, rest_api=producer_api_format.format(topic_name=new_topic))
response = requests.post(producer_url, headers=headers, json=payload_json, timeout=request_timeout, verify=verify_https)
print(response.content)

# Consume messages from the topic
partition_id = 0
offset = 0
count = 2

while True:
    consumer_url = api_format.format(api_version=api_version, rest_api=consumer_api_format.format(topic_name=new_topic, partition_id=partition_id, offset=offset, count=count))
    print("Consuming " + str(count) + " messages from offset " + str(offset))

    response = requests.get(consumer_url, headers=headers, timeout=request_timeout, verify=verify_https)

    if response.ok:
        messages = response.json()
        print("Consumed messages: \n" + json.dumps(messages, indent=2))
        next_offset = response.headers.get("NextOffset")
        if offset == next_offset or not messages.get("records", []):
            print("Consumer caught up with producer. Exit for now...")
            break

        offset = next_offset

    else:
        print("Error " + str(response.status_code))
        break
        
# List partitions
get_partitions_url = api_format.format(api_version=api_version, rest_api=partitions_api_format.format(topic_name=new_topic))
print("Fetching partitions from  " + get_partitions_url)

response = requests.get(get_partitions_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
partition_list = response.json()
print("Partition list: \n" + json.dumps(partition_list, indent=2))

# List a partition
get_partition_url = api_format.format(api_version=api_version, rest_api=partition_api_format.format(topic_name=new_topic, partition_id=partition_id))
print("Fetching metadata of a partition from  " + get_partition_url)

response = requests.get(get_partition_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
partition = response.json()
print("Partition metadata: \n" + json.dumps(partition, indent=2))

Hieronder vindt u een ander voorbeeld over het ophalen van een token uit Azure voor REST-proxy met behulp van een curl-opdracht. U ziet dat we de scope=https://hib.azurehdinsight.net/.default opgegeven gegevens nodig hebben tijdens het ophalen van een token.

curl -X POST -H "Content-Type: application/x-www-form-urlencoded" -d 'client_id=<clientid>&client_secret=<clientsecret>&grant_type=client_credentials&scope=https://hib.azurehdinsight.net/.default' 'https://login.microsoftonline.com/<tenantid>/oauth2/v2.0/token'

Volgende stappen