共用方式為


使用 REST Proxy 與 Azure HDInsight 中的 Apache Kafka 叢集互動

Kafka REST Proxy 可讓您透過 HTTPS 透過 REST API 與 Kafka 叢集互動。 此動作表示 Kafka 用戶端可以位於虛擬網路外部。 用戶端可以簡單、安全的 HTTPS 呼叫 Kafka 叢集,而不是依賴 Kafka 連結庫。 本文說明如何建立已啟用 REST Proxy 的 Kafka 叢集。 也提供範例程式代碼,示範如何呼叫 REST Proxy。

REST API 參考資料

如需 Kafka REST API 支援的作業,請參閱 HDInsight Kafka REST Proxy API 參考

背景

Kafka REST proxy design.

如需 API 所支援作業的完整規格,請參閱 Apache Kafka REST Proxy API

REST Proxy 端點

使用 REST Proxy 建立 HDInsight Kafka 叢集會為您的叢集建立新的公用端點,您可以在 Azure 入口網站 上的 HDInsight 叢集屬性中找到此端點。

安全性

存取使用 Microsoft Entra 安全組管理的 Kafka REST Proxy。 建立 Kafka 叢集時,請提供具有 REST 端點存取權的 Microsoft Entra 安全組。 需要存取 REST Proxy 的 Kafka 客戶端應該由群組擁有者註冊至此群組。 群組擁有者可以透過入口網站或PowerShell註冊。

針對 REST Proxy 端點要求,用戶端應用程式應取得 OAuth 令牌。 令牌會使用 來驗證安全組成員資格。 尋找用戶端應用程式範例會示範如何取得 OAuth 令牌。 用戶端應用程式會將 HTTPS 要求中的 OAuth 令牌傳遞至 REST Proxy。

注意

若要深入瞭解 Microsoft Entra 安全組,請參閱 使用 Microsoft Entra 群組管理應用程式和資源存取。 如需 OAuth 令牌運作方式的詳細資訊,請參閱 使用 OAuth 2.0 程式代碼授與流程授權 Microsoft Entra Web 應用程式的存取權。

具有網路安全組的 Kafka REST Proxy

如果您攜帶自己的 VNet 並控制網路安全組的網路流量,除了埠 443 之外,還允許9400 上的輸入流量。 這可確保可連線到 Kafka REST Proxy 伺服器。

必要條件

  1. 使用 Microsoft Entra ID 註冊應用程式。 您撰寫以與 Kafka REST Proxy 互動的用戶端應用程式會使用此應用程式的識別碼和秘密向 Azure 進行驗證。

  2. 建立 Microsoft Entra 安全組。 將您向 Microsoft Entra ID 註冊的應用程式新增至安全組作為 群組的成員 。 此安全組將用來控制哪些應用程式允許與 REST Proxy 互動。 如需建立 Microsoft Entra 群組的詳細資訊,請參閱 使用 Microsoft Entra 標識符建立基本群組和新增成員。

    驗證群組的類型 為 SecuritySecurity Group.

    驗證應用程式是否為 Group 的成員。 Check Membership.

建立已啟用 REST Proxy 的 Kafka 叢集

這些步驟會使用 Azure 入口網站。 如需使用 Azure CLI 的範例,請參閱 使用 Azure CLI 建立 Apache Kafka REST Proxy 叢集。

  1. 在 Kafka 叢集建立工作流程期間,在 [ 安全性 + 網络] 索引 標籤中,核取 [ 啟用 Kafka REST Proxy ] 選項。

    Screenshot shows the Create HDInsight cluster page with Security + networking selected.

  2. 按兩下 [ 選取安全組]。 從安全組清單中,選取您想要存取 REST Proxy 的安全組。 您可以使用搜尋方塊來尋找適當的安全組。 按兩下底部的 [ 選取 ] 按鈕。

    Screenshot shows the Create HDInsight cluster page with the option to select a security group.

  3. 完成建立叢集的其餘步驟,如使用 Azure 入口網站 在 Azure HDInsight 中建立 Apache Kafka 叢集中所述

  4. 建立叢集之後,請移至叢集屬性以記錄 Kafka REST Proxy URL。

    view REST proxy URL.

用戶端應用程式範例

您可以使用 Python 程式代碼與 Kafka 叢集上的 REST Proxy 互動。 若要使用程式代碼範例,請遵循下列步驟:

  1. 將範例程式代碼儲存在已安裝 Python 的電腦上。

  2. 執行 pip3 install msal來安裝必要的 Python 相依性。

  3. 修改程式代碼區段 設定這些屬性 ,並更新您環境的下列屬性:

    屬性 說明
    租用戶識別碼 訂用帳戶所在的 Azure 租使用者。
    用戶端識別碼 您在安全組中註冊之應用程式的識別碼。
    用戶端祕密 您在安全組中註冊之應用程式的秘密。
    Kafkarest_endpoint 叢集概觀中的 [屬性] 索引標籤取得此值,如部署一 所述。 其格式應如下: https://<clustername>-kafkarest.azurehdinsight.net
  4. 從命令行執行 Python 檔案 sudo python3 <filename.py>

此程式代碼會執行下列動作:

  1. 從 Microsoft Entra 識別符擷取 OAuth 令牌。
  2. 示範如何向 Kafka REST Proxy 提出要求。

如需在 Python 中取得 OAuth 令牌的詳細資訊,請參閱 Python AuthenticationContext 類別。 您可能會在未透過 Kafka REST Proxy 建立或刪除時 topics 看到延遲。 此延遲是因為快取重新整理。 已增強產生者 API 的值欄位。 現在,它會接受 JSON 物件和任何串行化表單。

#Required Python packages
#pip3 install msal

import json
import msal
import random
import requests
import string
import sys
import time

def get_random_string():
    letters = string.ascii_letters
    random_string = ''.join(random.choice(letters) for i in range(7))

    return random_string


#--------------------------Configure these properties-------------------------------#
# Tenant ID for your Azure Subscription
tenant_id = 'aaaabbbb-0000-cccc-1111-dddd2222eeee'
# Your Client Application Id
client_id = '00001111-aaaa-2222-bbbb-3333cccc4444'
# Your Client Credentials
client_secret = 'password'
# kafka rest proxy -endpoint
kafkarest_endpoint = "https://<clustername>-kafkarest.azurehdinsight.net"
#--------------------------Configure these properties-------------------------------#

# Get access token
# Scope
scope = 'https://hib.azurehdinsight.net/.default'
#Authority
authority = 'https://login.microsoftonline.com/' + tenant_id

app = msal.ConfidentialClientApplication(
    client_id , client_secret, authority,
    #cache - For details on how look at this example: https://github.com/Azure-Samples/ms-identity-python-webapp/blob/master/app.py
)

# The pattern to acquire a token looks like this.
result = None
result = app.acquire_token_for_client(scopes=[scope])
accessToken = result['access_token']
verify_https = True
request_timeout = 10

# Print access token
print("Access token: " + accessToken)

# API format
api_version = 'v1'
api_format = kafkarest_endpoint + '/{api_version}/{rest_api}'
get_topic_api = 'metadata/topics'
topic_api_format = 'topics/{topic_name}'
producer_api_format = 'producer/topics/{topic_name}'
consumer_api_format = 'consumer/topics/{topic_name}/partitions/{partition_id}/offsets/{offset}?count={count}'  # by default count = 1
partitions_api_format = 'metadata/topics/{topic_name}/partitions'
partition_api_format = 'metadata/topics/{topic_name}/partitions/{partition_id}'

# Request header
headers = {
    'Authorization': 'Bearer ' + accessToken,
    'Content-type': 'application/json'          # set Content-type to 'application/json'
}

# New topic
new_topic = 'hello_topic_' + get_random_string()
print("Topic " + new_topic + " is going to be used for demo.")

topics = []

# Create a  new topic
# Example of topic config
topic_config = {
    "partition_count": 1,
    "replication_factor": 1,
    "topic_properties": {
        "retention.ms": 604800000,
        "min.insync.replicas": "1"
    }
}

create_topic_url = api_format.format(api_version=api_version, rest_api=topic_api_format.format(topic_name=new_topic))
response = requests.put(create_topic_url, headers=headers, json=topic_config, timeout=request_timeout, verify=verify_https)
print(response.content)

if response.ok:
    while new_topic not in topics:
        print("The new topic " + new_topic + " is not visible yet. sleep 30 seconds...")
        time.sleep(30)
        # List Topic
        get_topic_url = api_format.format(api_version=api_version, rest_api=get_topic_api)

        response = requests.get(get_topic_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
        topic_list = response.json()
        topics = topic_list.get("topics", [])
else:
    print("Topic " + new_topic + " was created. Exit.")
    sys.exit(1)

# Produce messages to new_topic
# Example payload of Producer REST API
payload_json = {
    "records": [
        {
            "key": "key1",
            "value": "**********"         # A string                              
        },
        {
            "partition": 0,
            "value": 5                    # An integer
        },
        {
            "value": 3.14                 # A floating number
        },
        {
            "value": {                    # A JSON object
                "id": 1,
                "name": "HDInsight Kafka REST proxy"
            }
        },
        {
            "value": [                    # A list of JSON objects
                {
                    "id": 1,
                    "name": "HDInsight Kafka REST proxy 1"
                },
                {
                    "id": 2,
                    "name": "HDInsight Kafka REST proxy 2"
                },
                {
                    "id": 3,
                    "name": "HDInsight Kafka REST proxy 3"
                }
            ]
        },
        {
            "value": {                  # A nested JSON object
                "group id": 1,
                "HDI Kafka REST": {
                    "id": 1,
                    "name": "HDInsight Kafka REST proxy 1"
                },
                "HDI Kafka REST server info": {
                    "id": 1,
                    "name": "HDInsight Kafka REST proxy 1",
                    "servers": [
                        {
                            "server id": 1,
                            "server name": "HDInsight Kafka REST proxy server 1"
                        },
                        {
                            "server id": 2,
                            "server name": "HDInsight Kafka REST proxy server 2"
                        },
                        {
                            "server id": 3,
                            "server name": "HDInsight Kafka REST proxy server 3"
                        }
                    ]
                }
            }
        }
    ]
}

print("Payloads in a Producer request: \n", payload_json)
producer_url = api_format.format(api_version=api_version, rest_api=producer_api_format.format(topic_name=new_topic))
response = requests.post(producer_url, headers=headers, json=payload_json, timeout=request_timeout, verify=verify_https)
print(response.content)

# Consume messages from the topic
partition_id = 0
offset = 0
count = 2

while True:
    consumer_url = api_format.format(api_version=api_version, rest_api=consumer_api_format.format(topic_name=new_topic, partition_id=partition_id, offset=offset, count=count))
    print("Consuming " + str(count) + " messages from offset " + str(offset))

    response = requests.get(consumer_url, headers=headers, timeout=request_timeout, verify=verify_https)

    if response.ok:
        messages = response.json()
        print("Consumed messages: \n" + json.dumps(messages, indent=2))
        next_offset = response.headers.get("NextOffset")
        if offset == next_offset or not messages.get("records", []):
            print("Consumer caught up with producer. Exit for now...")
            break

        offset = next_offset

    else:
        print("Error " + str(response.status_code))
        break
        
# List partitions
get_partitions_url = api_format.format(api_version=api_version, rest_api=partitions_api_format.format(topic_name=new_topic))
print("Fetching partitions from  " + get_partitions_url)

response = requests.get(get_partitions_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
partition_list = response.json()
print("Partition list: \n" + json.dumps(partition_list, indent=2))

# List a partition
get_partition_url = api_format.format(api_version=api_version, rest_api=partition_api_format.format(topic_name=new_topic, partition_id=partition_id))
print("Fetching metadata of a partition from  " + get_partition_url)

response = requests.get(get_partition_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
partition = response.json()
print("Partition metadata: \n" + json.dumps(partition, indent=2))

請尋找以下另一個範例,瞭解如何使用 curl 命令從 Azure 取得 REST Proxy 的令牌。 請注意,取得權杖時,我們需要 scope=https://hib.azurehdinsight.net/.default 指定的 。

curl -X POST -H "Content-Type: application/x-www-form-urlencoded" -d 'client_id=<clientid>&client_secret=<clientsecret>&grant_type=client_credentials&scope=https://hib.azurehdinsight.net/.default' 'https://login.microsoftonline.com/<tenantid>/oauth2/v2.0/token'

下一步