Interakció Apache Kafka-fürtökkel az Azure HDInsightban REST-proxy használatával
A Kafka REST Proxy lehetővé teszi a Kafka-fürttel való interakciót EGY REST API-n keresztül HTTPS-en keresztül. Ez a művelet azt jelenti, hogy a Kafka-ügyfelek a virtuális hálózaton kívül is lehetnek. Az ügyfelek a Kafka-kódtárak helyett egyszerű, biztonságos HTTPS-hívásokat kezdeményezhetnek a Kafka-fürthöz. Ez a cikk bemutatja, hogyan hozhat létre REST-proxyval kompatibilis Kafka-fürtöt. Emellett egy mintakódot is biztosít, amely bemutatja, hogyan kezdeményezhet hívásokat a REST-proxy felé.
REST API-referencia
A Kafka REST API által támogatott műveletekről lásd : HDInsight Kafka REST Proxy API-referencia.
Háttér
Az API által támogatott műveletek teljes specifikációját lásd : Apache Kafka REST Proxy API.
REST proxyvégpont
A HDInsight Kafka-fürt REST-proxyval való létrehozása új nyilvános végpontot hoz létre a fürt számára, amelyet az Azure PortalON található HDInsight-fürt tulajdonságai között talál.
Biztonság
Hozzáférés a Microsoft Entra biztonsági csoportokkal felügyelt Kafka REST-proxyhoz. A Kafka-fürt létrehozásakor adja meg a Microsoft Entra biztonsági csoportot REST-végpont-hozzáféréssel. A REST-proxyhoz hozzáférést igénylő Kafka-ügyfeleket a csoport tulajdonosának kell regisztrálnia ebbe a csoportba. A csoport tulajdonosa regisztrálhat a portálon vagy a PowerShellen keresztül.
REST-proxyvégpont-kérelmek esetén az ügyfélalkalmazásoknak OAuth-jogkivonatot kell kapniuk. A jogkivonat a biztonsági csoporttagság ellenőrzésére használja. Az ügyfélalkalmazás mintájának megkeresése bemutatja, hogyan szerezhet be OAuth-jogkivonatot. Az ügyfélalkalmazás átadja az OAuth-jogkivonatot a HTTPS-kérelemben a REST-proxynak.
Feljegyzés
További információ a Microsoft Entra biztonsági csoportjairól: Alkalmazás- és erőforrás-hozzáférés kezelése a Microsoft Entra-csoportokkal. Az OAuth-jogkivonatok működésével kapcsolatos további információkért lásd : Hozzáférés engedélyezése Microsoft Entra-webalkalmazásokhoz az OAuth 2.0 kódhozzáadási folyamat használatával.
Kafka REST-proxy hálózati biztonsági csoportokkal
Ha saját virtuális hálózatot hoz létre, és hálózati biztonsági csoportokkal szabályozza a hálózati forgalmat, engedélyezze a bejövő forgalmat a 9400-s porton a 443-as porton kívül. Ez biztosítja, hogy a Kafka REST proxykiszolgáló elérhető legyen.
Előfeltételek
Alkalmazás regisztrálása a Microsoft Entra-azonosítóval. A Kafka REST-proxyval való interakcióhoz írt ügyfélalkalmazások az alkalmazás azonosítóját és titkos kódját használják az Azure-beli hitelesítéshez.
Hozzon létre egy Microsoft Entra biztonsági csoportot. Adja hozzá a Microsoft Entra-azonosítóval regisztrált alkalmazást a biztonsági csoporthoz a csoport tagjaként. Ezzel a biztonsági csoporttal szabályozható, hogy mely alkalmazások használhatják a REST-proxyt. További információ a Microsoft Entra-csoportok létrehozásáról: Alapszintű csoport létrehozása és tagok hozzáadása a Microsoft Entra-azonosító használatával.
Ellenőrizze, hogy a csoport biztonsági típusú-e.
Ellenőrizze, hogy az alkalmazás tagja-e a Csoportnak.
Kafka-fürt létrehozása REST-proxyval engedélyezve
A lépések az Azure Portalt használják. Az Azure CLI-t használó példa: Apache Kafka REST proxyfürt létrehozása az Azure CLI használatával.
A Kafka-fürtlétrehozási munkafolyamat során a Biztonság + hálózatkezelés lapon ellenőrizze a Kafka REST-proxy engedélyezése lehetőséget.
Kattintson a Biztonsági csoport kiválasztása gombra. A biztonsági csoportok listájában válassza ki azt a biztonsági csoportot, amelyhez hozzá szeretne férni a REST-proxyhoz. A keresőmezővel megkeresheti a megfelelő biztonsági csoportot. Kattintson a Kiválasztás gombra az alján.
Végezze el a fürt létrehozásának további lépéseit az Azure HDInsightban az Apache Kafka-fürt azure HDInsightban az Azure Portallal való létrehozásával kapcsolatban leírtak szerint.
A fürt létrehozása után lépjen a fürt tulajdonságaira a Kafka REST-proxy URL-címének rögzítéséhez.
Ügyfélalkalmazás-minta
A Python-kód használatával használhatja a REST-proxyt a Kafka-fürtön. A kódminta használatához kövesse az alábbi lépéseket:
Mentse a mintakódot egy számítógépen, amelyen telepítve van a Python.
Telepítse a szükséges Python-függőségeket a végrehajtással
pip3 install msal
.Módosítsa a kódszakaszt Konfigurálja ezeket a tulajdonságokat , és frissítse a következő tulajdonságokat a környezetéhez:
Tulajdonság Leírás Bérlőazonosító Az Az Azure-bérlő, ahol az előfizetése található. Ügyfél azonosítója A biztonsági csoportban regisztrált alkalmazás azonosítója. Titkos ügyfélkulcs A biztonsági csoportban regisztrált alkalmazás titkos kódja. Kafkarest_endpoint Kérje le ezt az értéket a fürt Tulajdonságok lapjának áttekintéséből az üzembe helyezési szakaszban leírtak szerint. A következő formátumban kell lennie : https://<clustername>-kafkarest.azurehdinsight.net
A parancssorból hajtsa végre a Python-fájlt a végrehajtással
sudo python3 <filename.py>
Ez a kód a következő műveletet hajtja végre:
- OAuth-jogkivonatot hív le a Microsoft Entra-azonosítóból.
- Bemutatja, hogyan lehet kérelmet küldeni a Kafka REST-proxyhoz.
Az OAuth-jogkivonatok Pythonban való lekéréséről további információt a Python AuthenticationContext osztályban talál. Előfordulhat, hogy a Kafka REST-proxyn keresztül nem létrehozott vagy törölt késés topics
jelenik meg. Ezt a késést a gyorsítótár frissítése okozza. A Producer API értékmezője ki lett javítva. Most elfogadja a JSON-objektumokat és minden szerializált űrlapot.
#Required Python packages
#pip3 install msal
import json
import msal
import random
import requests
import string
import sys
import time
def get_random_string():
letters = string.ascii_letters
random_string = ''.join(random.choice(letters) for i in range(7))
return random_string
#--------------------------Configure these properties-------------------------------#
# Tenant ID for your Azure Subscription
tenant_id = 'aaaabbbb-0000-cccc-1111-dddd2222eeee'
# Your Client Application Id
client_id = '00001111-aaaa-2222-bbbb-3333cccc4444'
# Your Client Credentials
client_secret = 'password'
# kafka rest proxy -endpoint
kafkarest_endpoint = "https://<clustername>-kafkarest.azurehdinsight.net"
#--------------------------Configure these properties-------------------------------#
# Get access token
# Scope
scope = 'https://hib.azurehdinsight.net/.default'
#Authority
authority = 'https://login.microsoftonline.com/' + tenant_id
app = msal.ConfidentialClientApplication(
client_id , client_secret, authority,
#cache - For details on how look at this example: https://github.com/Azure-Samples/ms-identity-python-webapp/blob/master/app.py
)
# The pattern to acquire a token looks like this.
result = None
result = app.acquire_token_for_client(scopes=[scope])
accessToken = result['access_token']
verify_https = True
request_timeout = 10
# Print access token
print("Access token: " + accessToken)
# API format
api_version = 'v1'
api_format = kafkarest_endpoint + '/{api_version}/{rest_api}'
get_topic_api = 'metadata/topics'
topic_api_format = 'topics/{topic_name}'
producer_api_format = 'producer/topics/{topic_name}'
consumer_api_format = 'consumer/topics/{topic_name}/partitions/{partition_id}/offsets/{offset}?count={count}' # by default count = 1
partitions_api_format = 'metadata/topics/{topic_name}/partitions'
partition_api_format = 'metadata/topics/{topic_name}/partitions/{partition_id}'
# Request header
headers = {
'Authorization': 'Bearer ' + accessToken,
'Content-type': 'application/json' # set Content-type to 'application/json'
}
# New topic
new_topic = 'hello_topic_' + get_random_string()
print("Topic " + new_topic + " is going to be used for demo.")
topics = []
# Create a new topic
# Example of topic config
topic_config = {
"partition_count": 1,
"replication_factor": 1,
"topic_properties": {
"retention.ms": 604800000,
"min.insync.replicas": "1"
}
}
create_topic_url = api_format.format(api_version=api_version, rest_api=topic_api_format.format(topic_name=new_topic))
response = requests.put(create_topic_url, headers=headers, json=topic_config, timeout=request_timeout, verify=verify_https)
print(response.content)
if response.ok:
while new_topic not in topics:
print("The new topic " + new_topic + " is not visible yet. sleep 30 seconds...")
time.sleep(30)
# List Topic
get_topic_url = api_format.format(api_version=api_version, rest_api=get_topic_api)
response = requests.get(get_topic_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
topic_list = response.json()
topics = topic_list.get("topics", [])
else:
print("Topic " + new_topic + " was created. Exit.")
sys.exit(1)
# Produce messages to new_topic
# Example payload of Producer REST API
payload_json = {
"records": [
{
"key": "key1",
"value": "**********" # A string
},
{
"partition": 0,
"value": 5 # An integer
},
{
"value": 3.14 # A floating number
},
{
"value": { # A JSON object
"id": 1,
"name": "HDInsight Kafka REST proxy"
}
},
{
"value": [ # A list of JSON objects
{
"id": 1,
"name": "HDInsight Kafka REST proxy 1"
},
{
"id": 2,
"name": "HDInsight Kafka REST proxy 2"
},
{
"id": 3,
"name": "HDInsight Kafka REST proxy 3"
}
]
},
{
"value": { # A nested JSON object
"group id": 1,
"HDI Kafka REST": {
"id": 1,
"name": "HDInsight Kafka REST proxy 1"
},
"HDI Kafka REST server info": {
"id": 1,
"name": "HDInsight Kafka REST proxy 1",
"servers": [
{
"server id": 1,
"server name": "HDInsight Kafka REST proxy server 1"
},
{
"server id": 2,
"server name": "HDInsight Kafka REST proxy server 2"
},
{
"server id": 3,
"server name": "HDInsight Kafka REST proxy server 3"
}
]
}
}
}
]
}
print("Payloads in a Producer request: \n", payload_json)
producer_url = api_format.format(api_version=api_version, rest_api=producer_api_format.format(topic_name=new_topic))
response = requests.post(producer_url, headers=headers, json=payload_json, timeout=request_timeout, verify=verify_https)
print(response.content)
# Consume messages from the topic
partition_id = 0
offset = 0
count = 2
while True:
consumer_url = api_format.format(api_version=api_version, rest_api=consumer_api_format.format(topic_name=new_topic, partition_id=partition_id, offset=offset, count=count))
print("Consuming " + str(count) + " messages from offset " + str(offset))
response = requests.get(consumer_url, headers=headers, timeout=request_timeout, verify=verify_https)
if response.ok:
messages = response.json()
print("Consumed messages: \n" + json.dumps(messages, indent=2))
next_offset = response.headers.get("NextOffset")
if offset == next_offset or not messages.get("records", []):
print("Consumer caught up with producer. Exit for now...")
break
offset = next_offset
else:
print("Error " + str(response.status_code))
break
# List partitions
get_partitions_url = api_format.format(api_version=api_version, rest_api=partitions_api_format.format(topic_name=new_topic))
print("Fetching partitions from " + get_partitions_url)
response = requests.get(get_partitions_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
partition_list = response.json()
print("Partition list: \n" + json.dumps(partition_list, indent=2))
# List a partition
get_partition_url = api_format.format(api_version=api_version, rest_api=partition_api_format.format(topic_name=new_topic, partition_id=partition_id))
print("Fetching metadata of a partition from " + get_partition_url)
response = requests.get(get_partition_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
partition = response.json()
print("Partition metadata: \n" + json.dumps(partition, indent=2))
Az alábbiakban egy másik példát talál arra vonatkozóan, hogyan kérhet le jogkivonatot az Azure for REST-proxyból curl-paranccsal. Figyelje meg, hogy a jogkivonat lekérésekor szükségünk van a scope=https://hib.azurehdinsight.net/.default
megadottra.
curl -X POST -H "Content-Type: application/x-www-form-urlencoded" -d 'client_id=<clientid>&client_secret=<clientsecret>&grant_type=client_credentials&scope=https://hib.azurehdinsight.net/.default' 'https://login.microsoftonline.com/<tenantid>/oauth2/v2.0/token'