Interagera med Apache Kafka-kluster i Azure HDInsight med hjälp av en REST-proxy
Med Kafka REST Proxy kan du interagera med ditt Kafka-kluster via ett REST API via HTTPS. Den här åtgärden innebär att dina Kafka-klienter kan finnas utanför ditt virtuella nätverk. Klienter kan göra enkla, säkra HTTPS-anrop till Kafka-klustret i stället för att förlita sig på Kafka-bibliotek. Den här artikeln visar hur du skapar ett REST-proxyaktiverat Kafka-kluster. Innehåller också en exempelkod som visar hur du gör anrop till REST-proxy.
Referens för REST-API
Åtgärder som stöds av Kafka REST API finns i REFERENS för HDInsight Kafka REST Proxy API.
Bakgrund
Fullständig specifikation av åtgärder som stöds av API:et finns i Apache Kafka REST Proxy API.
REST Proxy-slutpunkt
När du skapar ett HDInsight Kafka-kluster med REST-proxy skapas en ny offentlig slutpunkt för klustret, som du hittar i hdinsight-klusteregenskaperna på Azure-portalen.
Säkerhet
Åtkomst till Kafka REST-proxyn som hanteras med Microsoft Entra-säkerhetsgrupper. När du skapar Kafka-klustret anger du säkerhetsgruppen Microsoft Entra med REST-slutpunktsåtkomst. Kafka-klienter som behöver åtkomst till REST-proxyn bör registreras i den här gruppen av gruppägaren. Gruppägaren kan registrera sig via portalen eller via PowerShell.
För REST-proxyslutpunktsbegäranden bör klientprogram hämta en OAuth-token. Token används för att verifiera medlemskap i säkerhetsgrupper. Leta upp ett klientprogramexempel som visar hur du hämtar en OAuth-token. Klientprogrammet skickar OAuth-token i HTTPS-begäran till REST-proxyn.
Kommentar
Mer information om Microsoft Entra-säkerhetsgrupper finns i Hantera app- och resursåtkomst med Hjälp av Microsoft Entra-grupper. Mer information om hur OAuth-token fungerar finns i Auktorisera åtkomst till Microsoft Entra-webbprogram med hjälp av OAuth 2.0-kodbidragsflödet.
Kafka REST-proxy med nätverkssäkerhetsgrupper
Om du tar med ditt eget virtuella nätverk och styr nätverkstrafiken med nätverkssäkerhetsgrupper tillåter du inkommande trafik på port 9400 utöver port 443. Detta säkerställer att Kafka REST-proxyservern kan nås.
Förutsättningar
Registrera ett program med Microsoft Entra ID. De klientprogram som du skriver för att interagera med Kafka REST-proxyn använder programmets ID och hemlighet för att autentisera till Azure.
Skapa en Microsoft Entra-säkerhetsgrupp. Lägg till programmet som du har registrerat med Microsoft Entra-ID i säkerhetsgruppen som medlem i gruppen. Den här säkerhetsgruppen används för att styra vilka program som tillåts interagera med REST-proxyn. Mer information om hur du skapar Microsoft Entra-grupper finns i Skapa en grundläggande grupp och lägga till medlemmar med hjälp av Microsoft Entra-ID.
Kontrollera att gruppen är av typen Säkerhet.
Verifiera att programmet är medlem i Grupp.
Skapa ett Kafka-kluster med REST-proxy aktiverat
Stegen använder Azure-portalen. Ett exempel på hur du använder Azure CLI finns i Skapa Apache Kafka REST-proxykluster med Azure CLI.
Under arbetsflödet för att skapa Kafka-kluster går du till fliken Säkerhet + nätverk och markerar alternativet Aktivera Kafka REST-proxy .
Klicka på Välj säkerhetsgrupp. I listan över säkerhetsgrupper väljer du den säkerhetsgrupp som du vill ha åtkomst till REST-proxyn. Du kan använda sökrutan för att hitta rätt säkerhetsgrupp. Klicka på knappen Välj längst ned.
Slutför de återstående stegen för att skapa klustret enligt beskrivningen i Skapa Apache Kafka-kluster i Azure HDInsight med Hjälp av Azure-portalen.
När klustret har skapats går du till klusteregenskaperna för att registrera Kafka REST-proxy-URL:en.
Exempel på klientprogram
Du kan använda Python-koden för att interagera med REST-proxyn i ditt Kafka-kluster. Följ dessa steg om du vill använda kodexemplet:
Spara exempelkoden på en dator med Python installerat.
Installera nödvändiga Python-beroenden genom att
pip3 install msal
köra .Ändra kodavsnittet Konfigurera dessa egenskaper och uppdatera följande egenskaper för din miljö:
Från kommandoraden kör du Python-filen genom att köra
sudo python3 <filename.py>
Den här koden utför följande åtgärd:
- Hämtar en OAuth-token från Microsoft Entra-ID.
- Visar hur du gör en begäran till Kafka REST-proxyn.
Mer information om hur du hämtar OAuth-token i Python finns i Klassen Python AuthenticationContext. Du kan se en fördröjning när topics
den inte skapas eller tas bort via Kafka REST-proxyn återspeglas där. Den här fördröjningen beror på cacheuppdatering. Värdefältet för producent-API:et har förbättrats. Nu accepterar den JSON-objekt och alla serialiserade formulär.
#Required Python packages
#pip3 install msal
import json
import msal
import random
import requests
import string
import sys
import time
def get_random_string():
letters = string.ascii_letters
random_string = ''.join(random.choice(letters) for i in range(7))
return random_string
#--------------------------Configure these properties-------------------------------#
# Tenant ID for your Azure Subscription
tenant_id = 'aaaabbbb-0000-cccc-1111-dddd2222eeee'
# Your Client Application Id
client_id = '00001111-aaaa-2222-bbbb-3333cccc4444'
# Your Client Credentials
client_secret = 'password'
# kafka rest proxy -endpoint
kafkarest_endpoint = "https://<clustername>-kafkarest.azurehdinsight.net"
#--------------------------Configure these properties-------------------------------#
# Get access token
# Scope
scope = 'https://hib.azurehdinsight.net/.default'
#Authority
authority = 'https://login.microsoftonline.com/' + tenant_id
app = msal.ConfidentialClientApplication(
client_id , client_secret, authority,
#cache - For details on how look at this example: https://github.com/Azure-Samples/ms-identity-python-webapp/blob/master/app.py
)
# The pattern to acquire a token looks like this.
result = None
result = app.acquire_token_for_client(scopes=[scope])
accessToken = result['access_token']
verify_https = True
request_timeout = 10
# Print access token
print("Access token: " + accessToken)
# API format
api_version = 'v1'
api_format = kafkarest_endpoint + '/{api_version}/{rest_api}'
get_topic_api = 'metadata/topics'
topic_api_format = 'topics/{topic_name}'
producer_api_format = 'producer/topics/{topic_name}'
consumer_api_format = 'consumer/topics/{topic_name}/partitions/{partition_id}/offsets/{offset}?count={count}' # by default count = 1
partitions_api_format = 'metadata/topics/{topic_name}/partitions'
partition_api_format = 'metadata/topics/{topic_name}/partitions/{partition_id}'
# Request header
headers = {
'Authorization': 'Bearer ' + accessToken,
'Content-type': 'application/json' # set Content-type to 'application/json'
}
# New topic
new_topic = 'hello_topic_' + get_random_string()
print("Topic " + new_topic + " is going to be used for demo.")
topics = []
# Create a new topic
# Example of topic config
topic_config = {
"partition_count": 1,
"replication_factor": 1,
"topic_properties": {
"retention.ms": 604800000,
"min.insync.replicas": "1"
}
}
create_topic_url = api_format.format(api_version=api_version, rest_api=topic_api_format.format(topic_name=new_topic))
response = requests.put(create_topic_url, headers=headers, json=topic_config, timeout=request_timeout, verify=verify_https)
print(response.content)
if response.ok:
while new_topic not in topics:
print("The new topic " + new_topic + " is not visible yet. sleep 30 seconds...")
time.sleep(30)
# List Topic
get_topic_url = api_format.format(api_version=api_version, rest_api=get_topic_api)
response = requests.get(get_topic_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
topic_list = response.json()
topics = topic_list.get("topics", [])
else:
print("Topic " + new_topic + " was created. Exit.")
sys.exit(1)
# Produce messages to new_topic
# Example payload of Producer REST API
payload_json = {
"records": [
{
"key": "key1",
"value": "**********" # A string
},
{
"partition": 0,
"value": 5 # An integer
},
{
"value": 3.14 # A floating number
},
{
"value": { # A JSON object
"id": 1,
"name": "HDInsight Kafka REST proxy"
}
},
{
"value": [ # A list of JSON objects
{
"id": 1,
"name": "HDInsight Kafka REST proxy 1"
},
{
"id": 2,
"name": "HDInsight Kafka REST proxy 2"
},
{
"id": 3,
"name": "HDInsight Kafka REST proxy 3"
}
]
},
{
"value": { # A nested JSON object
"group id": 1,
"HDI Kafka REST": {
"id": 1,
"name": "HDInsight Kafka REST proxy 1"
},
"HDI Kafka REST server info": {
"id": 1,
"name": "HDInsight Kafka REST proxy 1",
"servers": [
{
"server id": 1,
"server name": "HDInsight Kafka REST proxy server 1"
},
{
"server id": 2,
"server name": "HDInsight Kafka REST proxy server 2"
},
{
"server id": 3,
"server name": "HDInsight Kafka REST proxy server 3"
}
]
}
}
}
]
}
print("Payloads in a Producer request: \n", payload_json)
producer_url = api_format.format(api_version=api_version, rest_api=producer_api_format.format(topic_name=new_topic))
response = requests.post(producer_url, headers=headers, json=payload_json, timeout=request_timeout, verify=verify_https)
print(response.content)
# Consume messages from the topic
partition_id = 0
offset = 0
count = 2
while True:
consumer_url = api_format.format(api_version=api_version, rest_api=consumer_api_format.format(topic_name=new_topic, partition_id=partition_id, offset=offset, count=count))
print("Consuming " + str(count) + " messages from offset " + str(offset))
response = requests.get(consumer_url, headers=headers, timeout=request_timeout, verify=verify_https)
if response.ok:
messages = response.json()
print("Consumed messages: \n" + json.dumps(messages, indent=2))
next_offset = response.headers.get("NextOffset")
if offset == next_offset or not messages.get("records", []):
print("Consumer caught up with producer. Exit for now...")
break
offset = next_offset
else:
print("Error " + str(response.status_code))
break
# List partitions
get_partitions_url = api_format.format(api_version=api_version, rest_api=partitions_api_format.format(topic_name=new_topic))
print("Fetching partitions from " + get_partitions_url)
response = requests.get(get_partitions_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
partition_list = response.json()
print("Partition list: \n" + json.dumps(partition_list, indent=2))
# List a partition
get_partition_url = api_format.format(api_version=api_version, rest_api=partition_api_format.format(topic_name=new_topic, partition_id=partition_id))
print("Fetching metadata of a partition from " + get_partition_url)
response = requests.get(get_partition_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
partition = response.json()
print("Partition metadata: \n" + json.dumps(partition, indent=2))
Nedan hittar du ett annat exempel på hur du hämtar en token från Azure for REST-proxyn med hjälp av ett curl-kommando. Observera att vi behöver den scope=https://hib.azurehdinsight.net/.default
angivna när du hämtar en token.
curl -X POST -H "Content-Type: application/x-www-form-urlencoded" -d 'client_id=<clientid>&client_secret=<clientsecret>&grant_type=client_credentials&scope=https://hib.azurehdinsight.net/.default' 'https://login.microsoftonline.com/<tenantid>/oauth2/v2.0/token'