分享方式:


在 Azure 服務匯流排佇列 (Python) 中傳送和接收訊息

在本教學課程中,您會完成下列步驟:

  1. 使用 Azure 入口網站建立服務匯流排命名空間。
  2. 使用 Azure 入口網站建立服務匯流排佇列。
  3. 撰寫 Python 程式碼以使用 azure-servicebus 套件:
    1. 將一組訊息傳送到佇列。
    2. 從佇列接收這些訊息。

注意

本快速入門提供將訊息傳送至服務匯流排佇列,並加以接收的簡單案例逐步指示。 您可以在 GitHub 上適用於 Python 存放庫的 Azure SDK 中找到適用於 Azure 服務匯流排的預先建置 JavaScript 和 TypeScript 樣本。

必要條件

如果您不熟悉服務,請先參閱服務匯流排概觀,再執行本快速入門。

若要搭配您自己的 Azure 帳戶使用此快速入門:

  • 安裝 Azure CLI,為開發人員電腦提供無密碼驗證。
  • 在終端機或命令提示字元中,使用 az login 登入您的 Azure 帳戶。
  • 當您將適當的資料角色新增至資源時,請使用相同的帳戶。
  • 在相同的終端機或命令提示字元中執行程式碼。
  • 記下您的服務匯流排命名空間的佇列名稱。 程式碼中需要這些資訊。

注意

本教學課程使用您可以透過 Python 來複製和執行的範例。 如需如何建立 Python 應用程式的指示,請參閱建立 Python 應用程式並部署至 Azure 網站。 如需安裝本教學課程中所使用套件的詳細資訊,請參閱 Python 安裝指南

在 Azure 入口網站中建立命名空間

若要開始在 Azure 中使用服務匯流排傳訊實體,您必須先使用 Azure 中的唯一名稱建立命名空間。 命名空間為應用程式內的服務匯流排資源 (佇列、主題等) 提供範圍容器。

若要建立命名空間:

  1. 登入 Azure 入口網站

  2. 瀏覽至 [所有服務] 頁面

  3. 在左側導覽列上,從類別清單中選取 [整合],將滑鼠停留在 [服務匯流排] 上方,然後選取 [服務匯流排] 圖格上的 + 按鈕。

    此圖顯示在功能表中依序選取 [建立資源]、[整合] 及 [服務匯流排]。

  4. 在 [建立命名空間] 頁面的 [基本] 標籤中,遵循下列步驟:

    1. 針對 [訂用帳戶],選擇要在其中建立命名空間的 Azure 訂用帳戶。

    2. 針對 [資源群組],選擇將存留命名空間的現有資源群組,或是建立新的資源群組。

    3. 輸入命名空間的名稱。 命名空間名稱應遵循下列命名慣例:

      • 名稱在整個 Azure 中必須是唯一的。 系統會立即檢查此名稱是否可用。
      • 名稱長度至少 6 個字元,最多 50 個字元。
      • 名稱僅可包含字母、數字和連字號 "-"。
      • 名稱開頭必須為字母,且結尾必須為字母或數字。
      • 名稱結尾不得為「-sb」或「-mgmt」。
    4. 針對 [位置],選擇應裝載命名空間的區域。

    5. 針對定價層,選取命名空間的定價層 (基本、標準或進階)。 針對本快速入門,選取 [標準]

      重要

      如果您想要使用主題和訂用帳戶,請選擇 [標準] 或 [進階]。 基本定價層不支援主題/訂用帳戶。

      若已選取 [進階] 定價層,請指定傳訊單位數目。 進階層可讓您的資源在 CPU 和記憶體層級上獲得隔離,讓每個工作負載能夠獨立執行。 此資源容器稱為傳訊單位。 進階命名空間都至少有一個傳訊單位。 您可以為每個服務匯流排的進階命名空間選取 1、2、4、8 或 16 個傳訊單位。 如需詳細資訊,請參閱服務匯流排進階傳訊

    6. 選取頁面底部的 [檢閱 + 建立] 。

      此圖顯示 [建立命名空間] 頁面

    7. 在 [檢閱 + 建立] 頁面上檢閱設定,然後選取 [建立]

  5. 一旦部署資源成功,請在部署頁面上選取 [移至資源]

    此圖顯示部署成功頁面及 [前往資源] 連結。

  6. 您會看到服務匯流排命名空間的首頁。

    此圖顯示所建立服務匯流排命名空間的首頁。

在 Azure 入口網站中建立佇列

  1. 在 [服務匯流排命名空間] 頁面的左側導覽功能表中選取 [佇列]

  2. 在 [佇列] 頁面上,選取工具列上的 [+ 佇列]

  3. 輸入佇列的 [名稱],並且讓其他值保留其預設值。

  4. 現在,選取 [建立]

    此圖顯示在入口網站中建立佇列

向 Azure 驗證應用程式

本快速入門顯示兩種連線到 Azure 服務匯流排的方式:無密碼連接字串

第一個選項顯示如何使用 Microsoft Entra ID 中的安全性主體和角色型存取控制 (RBAC),來連線到服務匯流排命名空間。 您不需要擔心在程式碼或設定檔或 Azure Key Vault 等安全儲存體中,有硬式編碼連接字串。

第二個選項顯示如何使用連接字串來連線到服務匯流排命名空間。 如果您不熟悉 Azure,則可能會發現連接字串選項更容易遵循。 建議在真實世界應用程式和實際執行環境中使用無密碼選項。 如需詳細資訊,請參閱驗證與授權。 您也可以在概觀頁面上,深入了解無密碼驗證。

將角色指派給 Microsoft Entra 使用者

在本機開發時,請確定連線到 Azure 服務匯流排的使用者帳戶具有正確的權限。 您需要 Azure 服務匯流排資料擁有者角色,才能傳送和接收訊息。 若要將此角色指派給您自己,您需要使用者存取管理員角色,或另一個包含 Microsoft.Authorization/roleAssignments/write 動作的角色。 您可以使用 Azure 入口網站、Azure CLI 或 Azure PowerShell,將 Azure RBAC 角色指派給使用者。 您可以在範圍概觀頁面上,深入了解角色指派的可用範圍。

下列範例會將 Azure Service Bus Data Owner 角色指派給您的使用者帳戶,該角色提供對 Azure 服務匯流排資源的完整存取權。 在實際案例中,遵循最低權限原則,只為使用者提供更安全實際執行環境所需的最低權限。

Azure 服務匯流排的 Azure 內建角色

對於 Azure 服務匯流排來說,透過 Azure 入口網站和 Azure 資源管理 API 來的管理命名空間和所有相關資源的作業,皆已使用 Azure RBAC 模型來加以保護。 Azure 提供下列 Azure 內建角色,以授權存取服務匯流排命名空間:

如果您想要建立自訂角色,請參閱服務匯流排作業所需的權限

將 Microsoft Entra 使用者新增至 Azure 服務匯流排擁有者角色

將您的 Microsoft Entra 使用者名稱新增至服務匯流排命名空間層級的 Azure 服務匯流排資料擁有者角色。 其可讓在使用者帳戶內容中執行的應用程式將訊息傳送至佇列或主題,以及從佇列或主題的訂用帳戶接收訊息。

重要

在大部分情況下,角色指派在 Azure 中傳播只需要一兩分鐘。 在少數情況下,可能需要長達八分鐘的時間。 如果您第一次執行程式碼時收到驗證錯誤,請稍候片刻再試一次。

  1. 如果沒有在 Azure 入口網站中開啟 [服務匯流排命名空間] 頁面,請使用主要搜尋列或左側導覽找出您的服務匯流排命名空間。

  2. 在概觀頁面上,從左側功能表中選取 [存取控制 (IAM)]

  3. 在 [存取控制 (IAM)] 頁面上,選取 [角色指派] 索引標籤。

  4. 從頂端功能表選取 [+ 新增],然後從產生的下拉功能表中選取 [新增角色指派]

    顯示如何指派角色的螢幕擷取畫面。

  5. 使用搜尋方塊,從結果篩選出所需的角色。 在此範例中,搜尋 Azure Service Bus Data Owner 並選取相符的結果。 接著,選擇 [下一步]

  6. 在 [存取權指派對象為] 下,選取 [使用者、群組或服務主體],然後選擇 [+ 選取成員]

  7. 在對話方塊中,搜尋 Microsoft Entra 使用者名稱 (通常是您的 user@domain 電子郵件地址),然後在對話方塊底部選擇 [選取]

  8. 選取 [檢閱 + 指派] 以移至最終頁面,然後再次選取 [檢閱 + 指派] 以完成此程序。

使用 PIP 安裝套件

  1. 若要安裝本服務匯流排教學課程所需的 Python 套件,請開啟在路徑中有 Python 的命令提示字元,並將目錄切換至您要用來存放範例的資料夾。

  2. 安裝下列套件:

    pip install azure-servicebus
    pip install azure-identity
    pip install aiohttp
    

傳送訊息至佇列

下列範例程式碼示範如何將訊息傳送至佇列。 開啟您慣用的編輯器 (例如 Visual Studio Code),建立檔案 send.py,並將下列程式碼新增至其中。

  1. 新增 import 陳述式。

    import asyncio
    from azure.servicebus.aio import ServiceBusClient
    from azure.servicebus import ServiceBusMessage
    from azure.identity.aio import DefaultAzureCredential
    
  2. 新增常數並定義認證。

    FULLY_QUALIFIED_NAMESPACE = "FULLY_QUALIFIED_NAMESPACE"
    QUEUE_NAME = "QUEUE_NAME"
    
    credential = DefaultAzureCredential()
    

    重要

    • FULLY_QUALIFIED_NAMESPACE 取代為服務匯流排命名空間的完整命名空間。
    • QUEUE_NAME 取代為佇列名稱。
  3. 新增方法來傳送單一訊息。

    async def send_single_message(sender):
        # Create a Service Bus message and send it to the queue
        message = ServiceBusMessage("Single Message")
        await sender.send_messages(message)
        print("Sent a single message")
    

    傳送者是物件,可作為您所建立之佇列的用戶端。 您稍後會建立此項目,並以引數的形式傳送至此函式。

  4. 新增方法來傳送訊息清單。

    async def send_a_list_of_messages(sender):
        # Create a list of messages and send it to the queue
        messages = [ServiceBusMessage("Message in list") for _ in range(5)]
        await sender.send_messages(messages)
        print("Sent a list of 5 messages")
    
  5. 新增方法來傳送訊息批次。

    async def send_batch_message(sender):
        # Create a batch of messages
        async with sender:
            batch_message = await sender.create_message_batch()
            for _ in range(10):
                try:
                    # Add a message to the batch
                    batch_message.add_message(ServiceBusMessage("Message inside a ServiceBusMessageBatch"))
                except ValueError:
                    # ServiceBusMessageBatch object reaches max_size.
                    # New ServiceBusMessageBatch object can be created here to send more data.
                    break
            # Send the batch of messages to the queue
            await sender.send_messages(batch_message)
        print("Sent a batch of 10 messages")
    
  6. 建立服務匯流排用戶端,然後建立傳送訊息的佇列傳送者物件。

    async def run():
        # create a Service Bus client using the credential
        async with ServiceBusClient(
            fully_qualified_namespace=FULLY_QUALIFIED_NAMESPACE,
            credential=credential,
            logging_enable=True) as servicebus_client:
            # get a Queue Sender object to send messages to the queue
            sender = servicebus_client.get_queue_sender(queue_name=QUEUE_NAME)
            async with sender:
                # send one message
                await send_single_message(sender)
                # send a list of messages
                await send_a_list_of_messages(sender)
                # send a batch of messages
                await send_batch_message(sender)
    
            # Close credential when no longer needed.
            await credential.close()
    
  7. 呼叫 run 方法並列印訊息。

    asyncio.run(run())
    print("Done sending messages")
    print("-----------------------")
    

從佇列接收訊息

下列範例程式碼示範如何從佇列接收訊息。 下方所示程式碼會持續收到新訊息,直到 5 (max_wait_time) 秒未收到任何新訊息為止。

開啟您慣用的編輯器 (例如 Visual Studio Code),建立檔案 recv.py,並將下列程式碼新增至其中。

  1. 類似於傳送範例,請新增 import 陳述式、定義您應該以自己的值取代的常數,以及定義認證。

    import asyncio
    
    from azure.servicebus.aio import ServiceBusClient
    from azure.identity.aio import DefaultAzureCredential
    
    FULLY_QUALIFIED_NAMESPACE = "FULLY_QUALIFIED_NAMESPACE"
    QUEUE_NAME = "QUEUE_NAME"
    
    credential = DefaultAzureCredential()
    
  2. 建立服務匯流排用戶端,然後建立接收訊息的佇列接收者物件。

    async def run():
        # create a Service Bus client using the connection string
        async with ServiceBusClient(
            fully_qualified_namespace=FULLY_QUALIFIED_NAMESPACE,
            credential=credential,
            logging_enable=True) as servicebus_client:
    
            async with servicebus_client:
                # get the Queue Receiver object for the queue
                receiver = servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME)
                async with receiver:
                    received_msgs = await receiver.receive_messages(max_wait_time=5, max_message_count=20)
                    for msg in received_msgs:
                        print("Received: " + str(msg))
                        # complete the message so that the message is removed from the queue
                        await receiver.complete_message(msg)
    
            # Close credential when no longer needed.
            await credential.close()
    
  3. 呼叫 run 方法。

    asyncio.run(run())
    

執行應用程式

開啟在路徑中包含 Python 的命令提示字元,然後執行程式碼以從佇列傳送和接收訊息。

python send.py; python recv.py

您應該會看見下列輸出:

Sent a single message
Sent a list of 5 messages
Sent a batch of 10 messages
Done sending messages
-----------------------
Received: Single Message
Received: Message in list
Received: Message in list
Received: Message in list
Received: Message in list
Received: Message in list
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch

瀏覽至 Azure 入口網站中您的服務匯流排命名空間。 在概觀頁面上,確認傳入傳出訊息計數為 16。 如果未看到此計數,請等候幾分鐘後重新整理頁面。

內送和外寄郵件計數

選取此概觀頁面上的佇列,瀏覽至服務匯流排佇列頁面。 您也會在此頁面上看到傳入傳出訊息計數。 您也會看到其他資訊,例如佇列的目前大小作用中訊息計數等等。

佇列詳細資料

下一步

請參閱下列文件和範例: