適用于 Python 的 Azure 儲存體佇列用戶端程式庫 - 12.9.0 版

Azure 佇列儲存體是一項儲存大量訊息的服務,全球任何地方都可利用 HTTP 或 HTTPS 並透過驗證的呼叫來存取這些訊息。 單一佇列訊息的大小上限為 64 KiB,而佇列可以包含數百萬則訊息,最多可達儲存體帳戶的總容量限制。

佇列儲存體的一般用途包括:

  • 建立積存的工作供非同步處理
  • 在分散式應用程式的不同部分之間傳遞訊息

| 原始程式碼套件 (PyPI) | 封裝 (Conda) | API 參考檔 | 產品檔 | 樣品

開始使用

Prerequisites

安裝套件

使用 pip安裝適用于 Python 的 Azure 儲存體佇列用戶端程式庫:

pip install azure-storage-queue

建立儲存體帳戶

如果您想要建立新的儲存體帳戶,您可以使用Azure 入口網站Azure PowerShellAzure CLI

# Create a new resource group to hold the storage account -
# if using an existing resource group, skip this step
az group create --name my-resource-group --location westus2

# Create the storage account
az storage account create -n my-storage-account-name -g my-resource-group

建立用戶端

適用于 Python 的 Azure 儲存體佇列用戶端程式庫可讓您與三種類型的資源互動:儲存體帳戶本身、佇列和訊息。 與這些資源的互動會從 用戶端的實例開始。 若要建立用戶端物件,您需要儲存體帳戶的佇列服務端點 URL,以及可讓您存取儲存體帳戶的認證:

from azure.storage.queue import QueueServiceClient

service = QueueServiceClient(account_url="https://<my-storage-account-name>.queue.core.windows.net/", credential=credential)

查閱帳戶 URL

您可以使用Azure入口網站、Azure PowerShellAzure CLI來尋找儲存體帳戶的佇列服務 URL:

# Get the queue service URL for the storage account
az storage account show -n my-storage-account-name -g my-resource-group --query "primaryEndpoints.queue"

認證類型

credential根據您想要使用的授權類型而定,參數可能會以許多不同的形式提供:

  1. 若要使用 共用存取簽章 (SAS) 權杖,請提供權杖作為字串。 如果您的帳戶 URL 包含 SAS 權杖,請省略認證參數。 您可以從 Azure 入口網站在 [共用存取簽章] 下產生 SAS 權杖,或使用其中一個 generate_sas() 函式建立儲存體帳戶或佇列的 SAS 權杖:

    from datetime import datetime, timedelta
    from azure.storage.queue import QueueServiceClient, generate_account_sas, ResourceTypes, AccountSasPermissions
    
    sas_token = generate_account_sas(
        account_name="<storage-account-name>",
        account_key="<account-access-key>",
        resource_types=ResourceTypes(service=True),
        permission=AccountSasPermissions(read=True),
        start=datetime.utcnow(),
        expiry=datetime.utcnow() + timedelta(hours=1)
    )
    
    queue_service_client = QueueServiceClient(account_url="https://<my_account_name>.queue.core.windows.net", credential=sas_token)
    
  2. 若要使用儲存體帳戶 共用金鑰 (也稱為帳戶金鑰或存取金鑰) ,請提供金鑰作為字串。 您可以在 Azure 入口網站的 [存取金鑰] 區段下,或執行下列 Azure CLI 命令來找到:

    az storage account keys list -g MyResourceGroup -n MyStorageAccount

    使用金鑰作為認證參數來驗證用戶端:

    from azure.storage.queue import QueueServiceClient
    service = QueueServiceClient(account_url="https://<my_account_name>.queue.core.windows.net", credential="<account_access_key>")
    
  3. 若要使用 Azure Active Directory (AAD) 權杖認證,請提供從 azure-identity 程式庫取得所需認證類型的實例。 例如, DefaultAzureCredential 可用來驗證用戶端。

    這需要一些初始設定:

    • 安裝 azure-identity
    • 註冊新的 AAD 應用程式 ,並提供存取 Azure 儲存體的許可權
    • Azure 入口網站中使用 RBAC 授與 Azure 佇列資料的存取權
    • 將 AAD 應用程式的用戶端識別碼、租使用者識別碼和用戶端密碼的值設定為環境變數:AZURE_TENANT_ID、AZURE_CLIENT_ID、AZURE_CLIENT_SECRET

    使用傳回的權杖認證來驗證用戶端:

        from azure.identity import DefaultAzureCredential
        from azure.storage.queue import QueueServiceClient
        token_credential = DefaultAzureCredential()
    
        queue_service_client = QueueServiceClient(
            account_url="https://<my_account_name>.queue.core.windows.net",
            credential=token_credential
        )
    

從連接字串建立用戶端

根據您的使用案例和授權方法,您可能偏好使用儲存體連接字串初始化用戶端實例,而不是個別提供帳戶 URL 和認證。 若要這樣做,請將儲存體連接字串傳遞至用戶端的 from_connection_string 類別方法:

from azure.storage.queue import QueueServiceClient

connection_string = "DefaultEndpointsProtocol=https;AccountName=xxxx;AccountKey=xxxx;EndpointSuffix=core.windows.net"
service = QueueServiceClient.from_connection_string(conn_str=connection_string)

您可以在 Azure 入口網站的 [存取金鑰] 區段下,或執行下列 CLI 命令,找到儲存體帳戶連接字串:

az storage account show-connection-string -g MyResourceGroup -n MyStorageAccount

重要概念

下列元件組成 Azure 佇列服務:

  • 儲存體帳戶本身
  • 儲存體帳戶內的佇列,其中包含一組訊息
  • 佇列中的訊息,格式上限為 64 KiB

適用于 Python 的 Azure 儲存體佇列用戶端程式庫可讓您透過使用專用用戶端物件,與每個元件互動。

非同步用戶端

此程式庫包含 Python 3.5+ 上支援的完整非同步 API。 若要使用它,您必須先安裝非同步傳輸,例如 aioHTTP。 如需詳細資訊 ,請參閱 azure 核心檔

不再需要非同步用戶端和認證時,應該關閉它們。 這些物件是非同步內容管理員,並定義非同步 close 方法。

用戶端

提供兩個不同的用戶端來與佇列服務的各種元件互動:

  1. QueueServiceClient - 此用戶端代表與 Azure 儲存體帳戶本身的互動,並可讓您取得預先設定的用戶端實例來存取其中的佇列。 它會提供作業來擷取和設定帳戶屬性,以及在帳戶內列出、建立和刪除佇列。 若要在特定佇列上執行作業,請使用 get_queue_client 方法擷取用戶端。
  2. QueueClient - 此用戶端代表與特定佇列 (的互動,這些佇列尚未存在) 。 它提供建立、刪除或設定佇列的作業,並包含傳送、接收、查看、刪除和更新其中訊息的作業。

訊息

  • 傳送 - 將訊息新增至佇列,並選擇性地設定訊息的可見度逾時。
  • 接收 - 從佇列擷取訊息,並讓其他取用者看不到訊息。
  • 預覽- 從佇列前端擷取訊息,而不變更訊息可見度。
  • 更新- 匯報訊息和/或訊息內容的可見度逾時。
  • 刪除 - 從佇列中刪除指定的訊息。
  • 清除 - 清除佇列中的所有訊息。

範例

下列各節提供數個程式碼片段,涵蓋一些最常見的儲存體佇列工作,包括:

建立佇列

在儲存體帳戶中建立佇列

from azure.storage.queue import QueueClient

queue = QueueClient.from_connection_string(conn_str="<connection_string>", queue_name="myqueue")
queue.create_queue()

使用非同步用戶端建立佇列

from azure.storage.queue.aio import QueueClient

queue = QueueClient.from_connection_string(conn_str="<connection_string>", queue_name="myqueue")
await queue.create_queue()

傳送訊息

將訊息傳送至佇列

from azure.storage.queue import QueueClient

queue = QueueClient.from_connection_string(conn_str="<connection_string>", queue_name="myqueue")
queue.send_message("I'm using queues!")
queue.send_message("This is my second message")

以非同步方式傳送訊息

import asyncio
from azure.storage.queue.aio import QueueClient

queue = QueueClient.from_connection_string(conn_str="<connection_string>", queue_name="myqueue")
await asyncio.gather(
    queue.send_message("I'm using queues!"),
    queue.send_message("This is my second message")
)

接收訊息

接收和處理來自佇列的訊息

from azure.storage.queue import QueueClient

queue = QueueClient.from_connection_string(conn_str="<connection_string>", queue_name="myqueue")
response = queue.receive_messages()

for message in response:
    print(message.content)
    queue.delete_message(message)

# Printed messages from the front of the queue:
# >> I'm using queues!
# >> This is my second message

以批次方式接收和處理訊息

from azure.storage.queue import QueueClient

queue = QueueClient.from_connection_string(conn_str="<connection_string>", queue_name="myqueue")
response = queue.receive_messages(messages_per_page=10)

for message_batch in response.by_page():
    for message in message_batch:
        print(message.content)
        queue.delete_message(message)

以非同步方式接收和處理訊息

from azure.storage.queue.aio import QueueClient

queue = QueueClient.from_connection_string(conn_str="<connection_string>", queue_name="myqueue")
response = queue.receive_messages()

async for message in response:
    print(message.content)
    await queue.delete_message(message)

選用組態

可在用戶端和每個作業層級傳入的選擇性關鍵字引數。

重試原則設定

具現化用戶端以設定重試原則時,請使用下列關鍵字引數:

  • retry_total (int) :允許的重試次數總數。 優先于其他計數。 如果您不想在要求上重試,請傳入 retry_total=0 。 預設值為 10。
  • retry_connect (int) :要重試的連線相關錯誤數目。 預設值為 3。
  • retry_read (int) :讀取錯誤時重試的次數。 預設值為 3。
  • retry_status (int) :重試錯誤狀態碼的次數。 預設值為 3。
  • retry_to_secondary (bool) :如果能夠,是否應將要求重試至次要。 這應該只啟用 RA-GRS 帳戶的使用,而且可能會處理過時的資料。 預設值為 False

其他用戶端/每個作業組態

可在用戶端或個別作業上指定的其他選擇性組態關鍵字引數。

用戶端關鍵字引數:

  • connection_timeout (int) :用戶端將等候建立與伺服器的連線的秒數。 預設值為 20 秒。
  • read_timeout (int) :用戶端在連續讀取作業之間等候的秒數,以取得來自伺服器的回應。 這是通訊端層級逾時,不會受到整體資料大小的影響。 會自動重試用戶端讀取逾時。 預設值是 60 秒。
  • 傳輸 (任何) :使用者提供的傳輸以傳送 HTTP 要求。

每個作業關鍵字引數:

  • raw_response_hook (可呼叫) :指定的回呼會使用從服務傳回的回應。
  • raw_request_hook (可呼叫) :指定的回呼會在傳送至服務之前使用要求。
  • client_request_id (str) :選擇性使用者指定的要求識別。
  • user_agent (str) :將自訂值附加至要與要求一起傳送的使用者代理程式標頭。
  • logging_enable (bool) :啟用偵錯層級的記錄。 預設為 False。 也可以在用戶端層級傳入,以針對所有要求啟用它。
  • logging_body (bool) :啟用記錄要求和回應本文。 預設為 False。 也可以在用戶端層級傳入,以針對所有要求啟用它。
  • 標頭 (聽寫) :傳入自訂標頭做為索引鍵、值組。 例如 headers={'CustomValue': value}

疑難排解

一般

儲存體佇列用戶端會引發 Azure Core中定義的例外狀況。

此清單可用於參考,以攔截擲回的例外狀況。 若要取得例外狀況的特定錯誤碼,請使用 error_code 屬性,也就是 exception.error_code

記錄

此程式庫會使用標準記錄程式庫進行 記錄 。 HTTP 會話的基本資訊 (URL、標頭等。) 會記錄在 INFO 層級。

您可以在具有 引數的用戶端 logging_enable 上啟用詳細的 DEBUG 層級記錄,包括要求/回應主體和未回應標頭:

import sys
import logging
from azure.storage.queue import QueueServiceClient

# Create a logger for the 'azure.storage.queue' SDK
logger = logging.getLogger('azure.storage.queue')
logger.setLevel(logging.DEBUG)

# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)

# This client will log detailed information about its HTTP sessions, at DEBUG level
service_client = QueueServiceClient.from_connection_string("your_connection_string", logging_enable=True)

同樣地,logging_enable 可對單一作業啟用詳細記錄,即使未對用戶端啟用也可行:

service_client.get_service_stats(logging_enable=True)

下一步

更多的程式碼範例

開始使用 我們的佇列範例

SDK 的 GitHub 存放庫中有數個儲存體佇列 Python SDK 範例可供您使用。 這些範例提供使用儲存體佇列時常見之其他案例的範例程式碼:

其他文件

如需有關 Azure 佇列儲存體的詳細資訊,請參閱 azure 佇列儲存體檔 docs.microsoft.com。

參與

此專案歡迎參與和提供建議。 大部分的參與都要求您同意「參與者授權合約 (CLA)」,宣告您有權且確實授與我們使用投稿的權利。 如需詳細資料,請前往 https://cla.microsoft.com

當您提交提取要求時,CLA Bot 會自動判斷您是否需要提供 CLA,並適當地裝飾 PR (例如標籤、註解)。 請遵循 bot 提供的指示。 您只需要使用我們的 CLA 在所有存放庫上執行此動作一次。

此專案採用 Microsoft Open Source Code of Conduct (Microsoft 開放原始碼管理辦法)。 如需詳細資訊,請參閱管理辦法常見問題集,如有任何其他問題或意見請連絡 opencode@microsoft.com