Udostępnij za pośrednictwem


Interakcja z klastrami platformy Apache Kafka w usłudze Azure HDInsight przy użyciu serwera proxy REST

Serwer proxy REST platformy Kafka umożliwia interakcję z klastrem Platformy Kafka za pośrednictwem interfejsu API REST za pośrednictwem protokołu HTTPS. Ta akcja oznacza, że klienci platformy Kafka mogą znajdować się poza siecią wirtualną. Klienci mogą tworzyć proste, bezpieczne wywołania HTTPS do klastra Kafka zamiast polegać na bibliotekach platformy Kafka. W tym artykule pokazano, jak utworzyć klaster platformy Kafka z włączonym serwerem proxy REST. Zawiera również przykładowy kod, który pokazuje, jak wykonywać wywołania serwera proxy REST.

Dokumentacja referencyjna interfejsu API REST

Aby uzyskać informacje o operacjach obsługiwanych przez interfejs API REST platformy Kafka, zobacz Dokumentacja interfejsu REST serwera proxy Kafka w usłudze HDInsight.

Kontekst

projekt serwera proxy REST platformy Kafka.

Aby uzyskać pełną specyfikację operacji obsługiwanych przez interfejs API, zobacz interfejs API REST Apache Kafka.

Punkt końcowy serwera proxy REST

Tworzenie klastra platformy Kafka usługi HDInsight za pomocą serwera proxy REST powoduje utworzenie nowego publicznego punktu końcowego dla klastra, który można znaleźć w klastrze usługi HDInsight właściwości w witrynie Azure Portal.

Bezpieczeństwo

Dostęp do serwera proxy REST platformy Kafka zarządzanego za pomocą grup zabezpieczeń firmy Microsoft Entra. Podczas tworzenia klastra platformy Kafka należy zapewnić grupie zabezpieczeń firmy Microsoft Entra dostęp do punktu końcowego REST. Klienci platformy Kafka, którzy potrzebują dostępu do serwera proxy REST, powinni być zarejestrowani w tej grupie przez właściciela grupy. Właściciel grupy może zarejestrować się za pośrednictwem portalu lub za pośrednictwem programu PowerShell.

W przypadku żądań punktu końcowego serwera proxy REST aplikacje klienckie powinny uzyskać token OAuth. Token używany do weryfikacji członkostwa w grupie zabezpieczeń. Znajdź przykładową aplikację klienta pokazano, jak uzyskać token OAuth. Aplikacja kliencka przekazuje token OAuth w żądaniu HTTPS do serwera proxy REST.

Uwaga

Zobacz Zarządzanie dostępem do aplikacji i zasobów przy użyciu grup firmy Microsoft Entra, aby dowiedzieć się więcej o grupach zabezpieczeń firmy Microsoft Entra. Aby uzyskać więcej informacji na temat sposobu działania tokenów OAuth, zobacz Autoryzuj dostęp do aplikacji internetowych Microsoft Entra przy użyciu przepływu kodu autoryzacyjnego OAuth 2.0.

Serwer proxy REST platformy Kafka z sieciowymi grupami zabezpieczeń

Jeśli używasz własnej sieci wirtualnej i kontrolujesz ruch sieciowy za pomocą sieciowych grup zabezpieczeń, zezwól na ruch przychodzący na port 9400 oprócz portu 443. Dzięki temu serwer proxy REST platformy Kafka jest osiągalny.

Wymagania wstępne

  1. Zarejestruj aplikację przy użyciu identyfikatora Entra firmy Microsoft. Aplikacje klienckie, które piszesz, aby komunikować się z serwerem proxy REST platformy Kafka, używają identyfikatora i tajnego klucza tej aplikacji do uwierzytelniania na platformie Azure.

  2. Utwórz grupę zabezpieczeń Entra firmy Microsoft. Dodaj aplikację zarejestrowaną w usłudze Microsoft Entra ID do grupy zabezpieczeń jako członek grupy. Ta grupa zabezpieczeń będzie używana do kontrolowania, które aplikacje umożliwiają interakcję z serwerem proxy REST. Aby uzyskać więcej informacji na temat tworzenia grup Entra firmy Microsoft, zobacz Stwórz podstawową grupę i dodaj członków przy użyciu Microsoft Entra ID.

    Sprawdź, czy grupa jest typu Security. Grupa Bezpieczeństwa.

    Sprawdź, czy aplikacja jest członkiem grupy. sprawdź członkostwo.

Tworzenie klastra platformy Kafka z włączonym serwerem proxy REST

W krokach jest używana witryna Azure Portal. Aby zapoznać się z przykładem użycia interfejsu wiersza polecenia platformy Azure, zobacz Create Apache Kafka REST proxy cluster using Azure CLI (Tworzenie klastra proxy REST platformy Apache Kafka przy użyciu interfejsu wiersza polecenia platformy Azure).

  1. Podczas przepływu pracy tworzenia klastra platformy Kafka na karcie zabezpieczenia i sieć sprawdź opcję Włącz serwer proxy REST platformy Kafka.

    Zrzut ekranu przedstawia stronę Tworzenie klastra usługi HDInsight z wybraną pozycją Zabezpieczenia + Sieć.

  2. Kliknij pozycję Wybierz grupę zabezpieczeń. Z listy grup zabezpieczeń wybierz grupę zabezpieczeń, do której chcesz mieć dostęp do serwera proxy REST. Możesz użyć pola wyszukiwania, aby znaleźć odpowiednią grupę zabezpieczeń. U dołu kliknij przycisk Wybierz.

    Zrzut ekranu przedstawia stronę Tworzenie klastra usługi HDInsight z opcją wybrania grupy zabezpieczeń.

  3. Wykonaj pozostałe kroki tworzenia klastra zgodnie z opisem w Tworzenie klastra Apache Kafka w usłudze Azure HDInsight przy użyciu witryny Azure Portal.

  4. Po utworzeniu klastra przejdź do właściwości klastra, aby zarejestrować adres URL serwera proxy REST platformy Kafka.

    wyświetlić adres URL serwera proxy REST.

Przykład aplikacji klienckiej

Kod języka Python umożliwia interakcję z serwerem proxy REST w klastrze platformy Kafka. Aby użyć przykładowego kodu, wykonaj następujące kroki:

  1. Zapisz przykładowy kod na maszynie z zainstalowanym językiem Python.

  2. Zainstaluj wymagane zależności języka Python, wykonując pip3 install msal.

  3. Zmodyfikuj sekcję kodu Skonfiguruj te właściwości i zaktualizuj następujące właściwości środowiska:

    Nieruchomość Opis
    Identyfikator dzierżawy Dzierżawca platformy Azure, w którym znajduje się Twoja subskrypcja.
    Identyfikator klienta Identyfikator aplikacji zarejestrowanej w grupie zabezpieczeń.
    Klucz tajny klienta Hasło (klucz lub sekret) dla aplikacji, którą zarejestrowano w grupie zabezpieczeń.
    Kafkarest_endpoint Pobierz tę wartość z karty właściwości w przeglądzie klastra, zgodnie z opisem w sekcji wdrażania . Powinien być w następującym formacie — https://<clustername>-kafkarest.azurehdinsight.net
  4. W wierszu polecenia wykonaj plik języka Python, wykonując sudo python3 <filename.py>

Ten kod wykonuje następującą akcję:

  1. Pobiera token OAuth z identyfikatora Entra firmy Microsoft.
  2. Pokazuje, jak wysłać żądanie do serwera proxy REST platformy Kafka.

Aby uzyskać więcej informacji na temat pobierania tokenów OAuth w języku Python, zobacz Python AuthenticationContext class. Może wystąpić opóźnienie w odzwierciedlaniu topics, które nie zostały utworzone ani usunięte za pośrednictwem serwera proxy REST platformy Kafka. To opóźnienie wynika z odświeżania pamięci podręcznej. Pole wartości w interfejsie API producenta zostało ulepszone. Teraz akceptuje obiekty JSON i dowolną serializowaną formę.

#Required Python packages
#pip3 install msal

import json
import msal
import random
import requests
import string
import sys
import time

def get_random_string():
    letters = string.ascii_letters
    random_string = ''.join(random.choice(letters) for i in range(7))

    return random_string


#--------------------------Configure these properties-------------------------------#
# Tenant ID for your Azure Subscription
tenant_id = 'aaaabbbb-0000-cccc-1111-dddd2222eeee'
# Your Client Application Id
client_id = '00001111-aaaa-2222-bbbb-3333cccc4444'
# Your Client Credentials
client_secret = 'password'
# kafka rest proxy -endpoint
kafkarest_endpoint = "https://<clustername>-kafkarest.azurehdinsight.net"
#--------------------------Configure these properties-------------------------------#

# Get access token
# Scope
scope = 'https://hib.azurehdinsight.net/.default'
#Authority
authority = 'https://login.microsoftonline.com/' + tenant_id

app = msal.ConfidentialClientApplication(
    client_id , client_secret, authority,
    #cache - For details on how look at this example: https://github.com/Azure-Samples/ms-identity-python-webapp/blob/master/app.py
)

# The pattern to acquire a token looks like this.
result = None
result = app.acquire_token_for_client(scopes=[scope])
accessToken = result['access_token']
verify_https = True
request_timeout = 10

# Print access token
print("Access token: " + accessToken)

# API format
api_version = 'v1'
api_format = kafkarest_endpoint + '/{api_version}/{rest_api}'
get_topic_api = 'metadata/topics'
topic_api_format = 'topics/{topic_name}'
producer_api_format = 'producer/topics/{topic_name}'
consumer_api_format = 'consumer/topics/{topic_name}/partitions/{partition_id}/offsets/{offset}?count={count}'  # by default count = 1
partitions_api_format = 'metadata/topics/{topic_name}/partitions'
partition_api_format = 'metadata/topics/{topic_name}/partitions/{partition_id}'

# Request header
headers = {
    'Authorization': 'Bearer ' + accessToken,
    'Content-type': 'application/json'          # set Content-type to 'application/json'
}

# New topic
new_topic = 'hello_topic_' + get_random_string()
print("Topic " + new_topic + " is going to be used for demo.")

topics = []

# Create a  new topic
# Example of topic config
topic_config = {
    "partition_count": 1,
    "replication_factor": 1,
    "topic_properties": {
        "retention.ms": 604800000,
        "min.insync.replicas": "1"
    }
}

create_topic_url = api_format.format(api_version=api_version, rest_api=topic_api_format.format(topic_name=new_topic))
response = requests.put(create_topic_url, headers=headers, json=topic_config, timeout=request_timeout, verify=verify_https)
print(response.content)

if response.ok:
    while new_topic not in topics:
        print("The new topic " + new_topic + " is not visible yet. sleep 30 seconds...")
        time.sleep(30)
        # List Topic
        get_topic_url = api_format.format(api_version=api_version, rest_api=get_topic_api)

        response = requests.get(get_topic_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
        topic_list = response.json()
        topics = topic_list.get("topics", [])
else:
    print("Topic " + new_topic + " was created. Exit.")
    sys.exit(1)

# Produce messages to new_topic
# Example payload of Producer REST API
payload_json = {
    "records": [
        {
            "key": "key1",
            "value": "**********"         # A string                              
        },
        {
            "partition": 0,
            "value": 5                    # An integer
        },
        {
            "value": 3.14                 # A floating number
        },
        {
            "value": {                    # A JSON object
                "id": 1,
                "name": "HDInsight Kafka REST proxy"
            }
        },
        {
            "value": [                    # A list of JSON objects
                {
                    "id": 1,
                    "name": "HDInsight Kafka REST proxy 1"
                },
                {
                    "id": 2,
                    "name": "HDInsight Kafka REST proxy 2"
                },
                {
                    "id": 3,
                    "name": "HDInsight Kafka REST proxy 3"
                }
            ]
        },
        {
            "value": {                  # A nested JSON object
                "group id": 1,
                "HDI Kafka REST": {
                    "id": 1,
                    "name": "HDInsight Kafka REST proxy 1"
                },
                "HDI Kafka REST server info": {
                    "id": 1,
                    "name": "HDInsight Kafka REST proxy 1",
                    "servers": [
                        {
                            "server id": 1,
                            "server name": "HDInsight Kafka REST proxy server 1"
                        },
                        {
                            "server id": 2,
                            "server name": "HDInsight Kafka REST proxy server 2"
                        },
                        {
                            "server id": 3,
                            "server name": "HDInsight Kafka REST proxy server 3"
                        }
                    ]
                }
            }
        }
    ]
}

print("Payloads in a Producer request: \n", payload_json)
producer_url = api_format.format(api_version=api_version, rest_api=producer_api_format.format(topic_name=new_topic))
response = requests.post(producer_url, headers=headers, json=payload_json, timeout=request_timeout, verify=verify_https)
print(response.content)

# Consume messages from the topic
partition_id = 0
offset = 0
count = 2

while True:
    consumer_url = api_format.format(api_version=api_version, rest_api=consumer_api_format.format(topic_name=new_topic, partition_id=partition_id, offset=offset, count=count))
    print("Consuming " + str(count) + " messages from offset " + str(offset))

    response = requests.get(consumer_url, headers=headers, timeout=request_timeout, verify=verify_https)

    if response.ok:
        messages = response.json()
        print("Consumed messages: \n" + json.dumps(messages, indent=2))
        next_offset = response.headers.get("NextOffset")
        if offset == next_offset or not messages.get("records", []):
            print("Consumer caught up with producer. Exit for now...")
            break

        offset = next_offset

    else:
        print("Error " + str(response.status_code))
        break
        
# List partitions
get_partitions_url = api_format.format(api_version=api_version, rest_api=partitions_api_format.format(topic_name=new_topic))
print("Fetching partitions from  " + get_partitions_url)

response = requests.get(get_partitions_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
partition_list = response.json()
print("Partition list: \n" + json.dumps(partition_list, indent=2))

# List a partition
get_partition_url = api_format.format(api_version=api_version, rest_api=partition_api_format.format(topic_name=new_topic, partition_id=partition_id))
print("Fetching metadata of a partition from  " + get_partition_url)

response = requests.get(get_partition_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
partition = response.json()
print("Partition metadata: \n" + json.dumps(partition, indent=2))

Poniżej znajduje się kolejny przykład dotyczący pobierania tokenu z serwera proxy REST platformy Azure za pomocą polecenia curl. Zwróć uwagę, że potrzebujemy scope=https://hib.azurehdinsight.net/.default określonego podczas uzyskiwania tokenu.

curl -X POST -H "Content-Type: application/x-www-form-urlencoded" -d 'client_id=<clientid>&client_secret=<clientsecret>&grant_type=client_credentials&scope=https://hib.azurehdinsight.net/.default' 'https://login.microsoftonline.com/<tenantid>/oauth2/v2.0/token'

Następne kroki