在 Azure 服務匯流排佇列 (Python) 中傳送和接收訊息
在本教學課程中,您會完成下列步驟:
- 使用 Azure 入口網站建立服務匯流排命名空間。
- 使用 Azure 入口網站建立服務匯流排佇列。
- 撰寫 Python 程式碼以使用 azure-servicebus 套件:
- 將一組訊息傳送到佇列。
- 從佇列接收這些訊息。
注意
本快速入門提供將訊息傳送至服務匯流排佇列,並加以接收的簡單案例逐步指示。 您可以在 GitHub 上適用於 Python 存放庫的 Azure SDK 中找到適用於 Azure 服務匯流排的預先建置 JavaScript 和 TypeScript 樣本。
必要條件
如果您不熟悉服務,請先參閱服務匯流排概觀,再執行本快速入門。
Azure 訂用帳戶。 若要完成此教學課程,您需要 Azure 帳戶。 您可以啟用自己的 MSDN 訂戶權益或是註冊免費帳戶。
Python 3.8 或更新版本。
若要搭配您自己的 Azure 帳戶使用此快速入門:
- 安裝 Azure CLI,為開發人員電腦提供無密碼驗證。
- 在終端機或命令提示字元中,使用
az login
登入您的 Azure 帳戶。 - 當您將適當的資料角色新增至資源時,請使用相同的帳戶。
- 在相同的終端機或命令提示字元中執行程式碼。
- 記下您的服務匯流排命名空間的佇列名稱。 程式碼中需要這些資訊。
注意
本教學課程使用您可以透過 Python 來複製和執行的範例。 如需如何建立 Python 應用程式的指示,請參閱建立 Python 應用程式並部署至 Azure 網站。 如需安裝本教學課程中所使用套件的詳細資訊,請參閱 Python 安裝指南。
在 Azure 入口網站中建立命名空間
若要開始在 Azure 中使用服務匯流排傳訊實體,您必須先使用 Azure 中的唯一名稱建立命名空間。 命名空間為應用程式內的服務匯流排資源 (佇列、主題等) 提供範圍容器。
若要建立命名空間:
登入 Azure 入口網站。
瀏覽至 [所有服務] 頁面。
在左側導覽列上,從類別清單中選取 [整合],將滑鼠停留在 [服務匯流排] 上方,然後選取 [服務匯流排] 圖格上的 + 按鈕。
在 [建立命名空間] 頁面的 [基本] 標籤中,遵循下列步驟:
針對 [訂用帳戶],選擇要在其中建立命名空間的 Azure 訂用帳戶。
針對 [資源群組],選擇將存留命名空間的現有資源群組,或是建立新的資源群組。
輸入命名空間的名稱。 命名空間名稱應遵循下列命名慣例:
- 名稱在整個 Azure 中必須是唯一的。 系統會立即檢查此名稱是否可用。
- 名稱長度至少 6 個字元,最多 50 個字元。
- 名稱僅可包含字母、數字和連字號 "-"。
- 名稱開頭必須為字母,且結尾必須為字母或數字。
- 名稱結尾不得為「-sb」或「-mgmt」。
針對 [位置],選擇應裝載命名空間的區域。
針對定價層,選取命名空間的定價層 (基本、標準或進階)。 針對本快速入門,選取 [標準]。
重要
如果您想要使用主題和訂用帳戶,請選擇 [標準] 或 [進階]。 基本定價層不支援主題/訂用帳戶。
若已選取 [進階] 定價層,請指定傳訊單位數目。 進階層可讓您的資源在 CPU 和記憶體層級上獲得隔離,讓每個工作負載能夠獨立執行。 此資源容器稱為傳訊單位。 進階命名空間都至少有一個傳訊單位。 您可以為每個服務匯流排的進階命名空間選取 1、2、4、8 或 16 個傳訊單位。 如需詳細資訊,請參閱服務匯流排進階傳訊。
選取頁面底部的 [檢閱 + 建立] 。
在 [檢閱 + 建立] 頁面上檢閱設定,然後選取 [建立]。
一旦部署資源成功,請在部署頁面上選取 [移至資源]。
您會看到服務匯流排命名空間的首頁。
在 Azure 入口網站中建立佇列
在 [服務匯流排命名空間] 頁面的左側導覽功能表中選取 [佇列]。
在 [佇列] 頁面上,選取工具列上的 [+ 佇列]。
輸入佇列的 [名稱],並且讓其他值保留其預設值。
現在,選取 [建立]。
向 Azure 驗證應用程式
本快速入門顯示兩種連線到 Azure 服務匯流排的方式:無密碼和連接字串。
第一個選項顯示如何使用 Microsoft Entra ID 中的安全性主體和角色型存取控制 (RBAC),來連線到服務匯流排命名空間。 您不需要擔心在程式碼或設定檔或 Azure Key Vault 等安全儲存體中,有硬式編碼連接字串。
第二個選項顯示如何使用連接字串來連線到服務匯流排命名空間。 如果您不熟悉 Azure,則可能會發現連接字串選項更容易遵循。 建議在真實世界應用程式和實際執行環境中使用無密碼選項。 如需詳細資訊,請參閱驗證與授權。 您也可以在概觀頁面上,深入了解無密碼驗證。
將角色指派給 Microsoft Entra 使用者
在本機開發時,請確定連線到 Azure 服務匯流排的使用者帳戶具有正確的權限。 您需要 Azure 服務匯流排資料擁有者角色,才能傳送和接收訊息。 若要將此角色指派給您自己,您需要使用者存取管理員角色,或另一個包含 Microsoft.Authorization/roleAssignments/write
動作的角色。 您可以使用 Azure 入口網站、Azure CLI 或 Azure PowerShell,將 Azure RBAC 角色指派給使用者。 您可以在範圍概觀頁面上,深入了解角色指派的可用範圍。
下列範例會將 Azure Service Bus Data Owner
角色指派給您的使用者帳戶,該角色提供對 Azure 服務匯流排資源的完整存取權。 在實際案例中,遵循最低權限原則,只為使用者提供更安全實際執行環境所需的最低權限。
Azure 服務匯流排的 Azure 內建角色
對於 Azure 服務匯流排來說,透過 Azure 入口網站和 Azure 資源管理 API 來的管理命名空間和所有相關資源的作業,皆已使用 Azure RBAC 模型來加以保護。 Azure 提供下列 Azure 內建角色,以授權存取服務匯流排命名空間:
- Azure 服務匯流排資料擁有者:啟用對服務匯流排命名空間及其實體 (佇列、主題、訂用帳戶和篩選條件) 的資料存取。 此角色的成員可以從佇列或主題/訂用帳戶傳送和接收訊息。
- Azure 服務匯流排資料傳送者:使用此角色來授與服務匯流排命名空間及其實體的資料傳送權限。
- Azure 服務匯流排資料接收者:使用此角色來授與服務匯流排命名空間及其實體的資料接收權限。
如果您想要建立自訂角色,請參閱服務匯流排作業所需的權限。
將 Microsoft Entra 使用者新增至 Azure 服務匯流排擁有者角色
將您的 Microsoft Entra 使用者名稱新增至服務匯流排命名空間層級的 Azure 服務匯流排資料擁有者角色。 其可讓在使用者帳戶內容中執行的應用程式將訊息傳送至佇列或主題,以及從佇列或主題的訂用帳戶接收訊息。
重要
在大部分情況下,角色指派在 Azure 中傳播只需要一兩分鐘。 在少數情況下,可能需要長達八分鐘的時間。 如果您第一次執行程式碼時收到驗證錯誤,請稍候片刻再試一次。
如果沒有在 Azure 入口網站中開啟 [服務匯流排命名空間] 頁面,請使用主要搜尋列或左側導覽找出您的服務匯流排命名空間。
在概觀頁面上,從左側功能表中選取 [存取控制 (IAM)]。
在 [存取控制 (IAM)] 頁面上,選取 [角色指派] 索引標籤。
從頂端功能表選取 [+ 新增],然後從產生的下拉功能表中選取 [新增角色指派]。
使用搜尋方塊,從結果篩選出所需的角色。 在此範例中,搜尋
Azure Service Bus Data Owner
並選取相符的結果。 接著,選擇 [下一步]。在 [存取權指派對象為] 下,選取 [使用者、群組或服務主體],然後選擇 [+ 選取成員]。
在對話方塊中,搜尋 Microsoft Entra 使用者名稱 (通常是您的 user@domain 電子郵件地址),然後在對話方塊底部選擇 [選取]。
選取 [檢閱 + 指派] 以移至最終頁面,然後再次選取 [檢閱 + 指派] 以完成此程序。
使用 PIP 安裝套件
若要安裝本服務匯流排教學課程所需的 Python 套件,請開啟在路徑中有 Python 的命令提示字元,並將目錄切換至您要用來存放範例的資料夾。
安裝下列套件:
pip install azure-servicebus pip install azure-identity pip install aiohttp
傳送訊息至佇列
下列範例程式碼示範如何將訊息傳送至佇列。 開啟您慣用的編輯器 (例如 Visual Studio Code),建立檔案 send.py,並將下列程式碼新增至其中。
新增 import 陳述式。
import asyncio from azure.servicebus.aio import ServiceBusClient from azure.servicebus import ServiceBusMessage from azure.identity.aio import DefaultAzureCredential
新增常數並定義認證。
FULLY_QUALIFIED_NAMESPACE = "FULLY_QUALIFIED_NAMESPACE" QUEUE_NAME = "QUEUE_NAME" credential = DefaultAzureCredential()
重要
- 將
FULLY_QUALIFIED_NAMESPACE
取代為服務匯流排命名空間的完整命名空間。 - 將
QUEUE_NAME
取代為佇列名稱。
- 將
新增方法來傳送單一訊息。
async def send_single_message(sender): # Create a Service Bus message and send it to the queue message = ServiceBusMessage("Single Message") await sender.send_messages(message) print("Sent a single message")
傳送者是物件,可作為您所建立之佇列的用戶端。 您稍後會建立此項目,並以引數的形式傳送至此函式。
新增方法來傳送訊息清單。
async def send_a_list_of_messages(sender): # Create a list of messages and send it to the queue messages = [ServiceBusMessage("Message in list") for _ in range(5)] await sender.send_messages(messages) print("Sent a list of 5 messages")
新增方法來傳送訊息批次。
async def send_batch_message(sender): # Create a batch of messages async with sender: batch_message = await sender.create_message_batch() for _ in range(10): try: # Add a message to the batch batch_message.add_message(ServiceBusMessage("Message inside a ServiceBusMessageBatch")) except ValueError: # ServiceBusMessageBatch object reaches max_size. # New ServiceBusMessageBatch object can be created here to send more data. break # Send the batch of messages to the queue await sender.send_messages(batch_message) print("Sent a batch of 10 messages")
建立服務匯流排用戶端,然後建立傳送訊息的佇列傳送者物件。
async def run(): # create a Service Bus client using the credential async with ServiceBusClient( fully_qualified_namespace=FULLY_QUALIFIED_NAMESPACE, credential=credential, logging_enable=True) as servicebus_client: # get a Queue Sender object to send messages to the queue sender = servicebus_client.get_queue_sender(queue_name=QUEUE_NAME) async with sender: # send one message await send_single_message(sender) # send a list of messages await send_a_list_of_messages(sender) # send a batch of messages await send_batch_message(sender) # Close credential when no longer needed. await credential.close()
呼叫
run
方法並列印訊息。asyncio.run(run()) print("Done sending messages") print("-----------------------")
從佇列接收訊息
下列範例程式碼示範如何從佇列接收訊息。 下方所示程式碼會持續收到新訊息,直到 5 (max_wait_time
) 秒未收到任何新訊息為止。
開啟您慣用的編輯器 (例如 Visual Studio Code),建立檔案 recv.py,並將下列程式碼新增至其中。
類似於傳送範例,請新增
import
陳述式、定義您應該以自己的值取代的常數,以及定義認證。import asyncio from azure.servicebus.aio import ServiceBusClient from azure.identity.aio import DefaultAzureCredential FULLY_QUALIFIED_NAMESPACE = "FULLY_QUALIFIED_NAMESPACE" QUEUE_NAME = "QUEUE_NAME" credential = DefaultAzureCredential()
建立服務匯流排用戶端,然後建立接收訊息的佇列接收者物件。
async def run(): # create a Service Bus client using the connection string async with ServiceBusClient( fully_qualified_namespace=FULLY_QUALIFIED_NAMESPACE, credential=credential, logging_enable=True) as servicebus_client: async with servicebus_client: # get the Queue Receiver object for the queue receiver = servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME) async with receiver: received_msgs = await receiver.receive_messages(max_wait_time=5, max_message_count=20) for msg in received_msgs: print("Received: " + str(msg)) # complete the message so that the message is removed from the queue await receiver.complete_message(msg) # Close credential when no longer needed. await credential.close()
呼叫
run
方法。asyncio.run(run())
執行應用程式
開啟在路徑中包含 Python 的命令提示字元,然後執行程式碼以從佇列傳送和接收訊息。
python send.py; python recv.py
您應該會看見下列輸出:
Sent a single message
Sent a list of 5 messages
Sent a batch of 10 messages
Done sending messages
-----------------------
Received: Single Message
Received: Message in list
Received: Message in list
Received: Message in list
Received: Message in list
Received: Message in list
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
瀏覽至 Azure 入口網站中您的服務匯流排命名空間。 在概觀頁面上,確認傳入和傳出訊息計數為 16。 如果未看到此計數,請等候幾分鐘後重新整理頁面。
選取此概觀頁面上的佇列,瀏覽至服務匯流排佇列頁面。 您也會在此頁面上看到傳入和傳出訊息計數。 您也會看到其他資訊,例如佇列的目前大小和作用中訊息計數等等。
下一步
請參閱下列文件和範例:
- 適用於 Python 的 Azure 服務匯流排用戶端程式庫
- 範例。
- sync_samples 資料夾中有範例,示範如何以同步方式與服務匯流排互動。
- sync_samples 資料夾中有範例,示範如何以同步方式與服務匯流排互動。 在本快速入門中,您使用了這個方法。
- azure-servicebus 參考文件