你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

适用于 Python 的 Azure 存储队列客户端库 - 版本 12.9.0

Azure 队列存储是一项可存储大量消息的服务,用户可以通过经验证的呼叫,使用 HTTP 或 HTTPS 从世界任何地方访问这些消息。 单个队列消息的大小最多可为 64 KiB,一个队列可以包含数百万条消息,最高可达到存储帐户的总容量限制。

队列存储的常见用途包括:

  • 创建积压工作以进行异步处理
  • 在分布式应用程序的不同部分之间传递消息

源代码 | 包 (PyPI) | 包 (Conda) | API 参考文档 | 产品文档 | 样品

入门

先决条件

安装包

使用 pip 安装适用于 Python 的 Azure 存储队列客户端库:

pip install azure-storage-queue

创建存储帐户

如果要创建新的存储帐户,可以使用 Azure 门户Azure PowerShellAzure CLI

# Create a new resource group to hold the storage account -
# if using an existing resource group, skip this step
az group create --name my-resource-group --location westus2

# Create the storage account
az storage account create -n my-storage-account-name -g my-resource-group

创建客户端

使用适用于 Python 的 Azure 存储队列客户端库,可以与三种类型的资源进行交互:存储帐户本身、队列和消息。 与这些资源的交互从 客户端的实例开始。 若要创建客户端对象,需要存储帐户的队列服务终结点 URL 和可用于访问存储帐户的凭据:

from azure.storage.queue import QueueServiceClient

service = QueueServiceClient(account_url="https://<my-storage-account-name>.queue.core.windows.net/", credential=credential)

查找帐户 URL

可以使用 Azure 门户Azure PowerShellAzure CLI 查找存储帐户的队列服务 URL:

# Get the queue service URL for the storage account
az storage account show -n my-storage-account-name -g my-resource-group --query "primaryEndpoints.queue"

凭据类型

参数 credential 可能以多种不同的形式提供,具体取决于要使用的 授权 类型:

  1. 若要使用 共享访问签名 (SAS) 令牌,请以字符串的形式提供令牌。 如果帐户 URL 包含 SAS 令牌,请省略凭据参数。 可以从 Azure 门户的“共享访问签名”下生成 SAS 令牌,或使用以下 generate_sas() 函数之一为存储帐户或队列创建 sas 令牌:

    from datetime import datetime, timedelta
    from azure.storage.queue import QueueServiceClient, generate_account_sas, ResourceTypes, AccountSasPermissions
    
    sas_token = generate_account_sas(
        account_name="<storage-account-name>",
        account_key="<account-access-key>",
        resource_types=ResourceTypes(service=True),
        permission=AccountSasPermissions(read=True),
        start=datetime.utcnow(),
        expiry=datetime.utcnow() + timedelta(hours=1)
    )
    
    queue_service_client = QueueServiceClient(account_url="https://<my_account_name>.queue.core.windows.net", credential=sas_token)
    
  2. 若要使用存储帐户 共享密钥 (又名帐户密钥或访问密钥) ,请以字符串的形式提供密钥。 这可以在 Azure 门户的“访问密钥”部分下找到,也可以通过运行以下 Azure CLI 命令找到:

    az storage account keys list -g MyResourceGroup -n MyStorageAccount

    使用密钥作为凭据参数对客户端进行身份验证:

    from azure.storage.queue import QueueServiceClient
    service = QueueServiceClient(account_url="https://<my_account_name>.queue.core.windows.net", credential="<account_access_key>")
    
  3. 若要使用 Azure Active Directory (AAD) 令牌凭据,请提供从 azure 标识 库获取的所需凭据类型的实例。 例如, DefaultAzureCredential 可用于对客户端进行身份验证。

    这需要一些初始设置:

    • 安装 azure-identity
    • 注册新的 AAD 应用程序 并授予访问 Azure 存储的权限
    • Azure 门户中使用 RBAC 授予对 Azure 队列数据的访问权限
    • 将 AAD 应用程序的客户端 ID、租户 ID 和客户端密码的值设置为环境变量:AZURE_TENANT_ID、AZURE_CLIENT_ID、AZURE_CLIENT_SECRET

    使用返回的令牌凭据对客户端进行身份验证:

        from azure.identity import DefaultAzureCredential
        from azure.storage.queue import QueueServiceClient
        token_credential = DefaultAzureCredential()
    
        queue_service_client = QueueServiceClient(
            account_url="https://<my_account_name>.queue.core.windows.net",
            credential=token_credential
        )
    

从连接字符串创建客户端

根据用例和授权方法,你可能更喜欢使用存储连接字符串初始化客户端实例,而不是单独提供帐户 URL 和凭据。 为此,请将存储连接字符串传递给客户端的 from_connection_string 类方法:

from azure.storage.queue import QueueServiceClient

connection_string = "DefaultEndpointsProtocol=https;AccountName=xxxx;AccountKey=xxxx;EndpointSuffix=core.windows.net"
service = QueueServiceClient.from_connection_string(conn_str=connection_string)

存储帐户的连接字符串可以在 Azure 门户的“访问密钥”部分下找到,也可以通过运行以下 CLI 命令找到:

az storage account show-connection-string -g MyResourceGroup -n MyStorageAccount

关键概念

以下组件构成 Azure 队列服务:

  • 存储帐户本身
  • 存储帐户中的队列,其中包含一组消息
  • 队列中任何格式的消息,最大为 64 KiB

使用适用于 Python 的 Azure 存储队列客户端库,可以使用专用客户端对象来与其中每个组件进行交互。

异步客户端

此库包含 Python 3.5+ 上支持的完整异步 API。 若要使用它,必须先安装异步传输,例如 aiohttp。 有关详细信息,请参阅 azure-core 文档

不再需要异步客户端和凭据时,应将其关闭。 这些对象是异步上下文管理器并定义异步 close 方法。

客户端

提供了两个不同的客户端来与队列服务的各个组件进行交互:

  1. QueueServiceClient - 此客户端表示与 Azure 存储帐户本身的交互,并允许获取预配置的客户端实例以访问其中的队列。 它提供检索和配置帐户属性以及列出、创建和删除帐户中的队列的操作。 若要对特定队列执行操作,请使用 get_queue_client 方法检索客户端。
  2. QueueClient - 此客户端表示与特定队列 (交互,该队列) 尚不存在。 它提供创建、删除或配置队列的操作,并包括在其中发送、接收、速览、删除和更新消息的操作。

消息

  • 发送 - 将消息添加到队列,并选择性地为消息设置可见性超时。
  • 接收 - 从队列中检索消息,并使它对其他使用者不可见。
  • 速览 - 从队列前面检索消息,而不更改消息可见性。
  • 更新 - 汇报消息和/或消息内容的可见性超时。
  • Delete - 从队列中删除指定的消息。
  • 清除 - 清除队列中的所有消息。

示例

以下部分提供了几个代码片段,涵盖了一些最常见的存储队列任务,包括:

创建队列

在存储帐户中创建队列

from azure.storage.queue import QueueClient

queue = QueueClient.from_connection_string(conn_str="<connection_string>", queue_name="myqueue")
queue.create_queue()

使用异步客户端创建队列

from azure.storage.queue.aio import QueueClient

queue = QueueClient.from_connection_string(conn_str="<connection_string>", queue_name="myqueue")
await queue.create_queue()

发送消息

将消息发送到队列

from azure.storage.queue import QueueClient

queue = QueueClient.from_connection_string(conn_str="<connection_string>", queue_name="myqueue")
queue.send_message("I'm using queues!")
queue.send_message("This is my second message")

异步发送消息

import asyncio
from azure.storage.queue.aio import QueueClient

queue = QueueClient.from_connection_string(conn_str="<connection_string>", queue_name="myqueue")
await asyncio.gather(
    queue.send_message("I'm using queues!"),
    queue.send_message("This is my second message")
)

接收消息

从队列接收和处理消息

from azure.storage.queue import QueueClient

queue = QueueClient.from_connection_string(conn_str="<connection_string>", queue_name="myqueue")
response = queue.receive_messages()

for message in response:
    print(message.content)
    queue.delete_message(message)

# Printed messages from the front of the queue:
# >> I'm using queues!
# >> This is my second message

批量接收和处理消息

from azure.storage.queue import QueueClient

queue = QueueClient.from_connection_string(conn_str="<connection_string>", queue_name="myqueue")
response = queue.receive_messages(messages_per_page=10)

for message_batch in response.by_page():
    for message in message_batch:
        print(message.content)
        queue.delete_message(message)

异步接收和处理消息

from azure.storage.queue.aio import QueueClient

queue = QueueClient.from_connection_string(conn_str="<connection_string>", queue_name="myqueue")
response = queue.receive_messages()

async for message in response:
    print(message.content)
    await queue.delete_message(message)

可选配置

可选关键字 (keyword) 可在客户端和每个操作级别传入的参数。

重试策略配置

在实例化客户端时使用以下关键字 (keyword) 参数来配置重试策略:

  • retry_total (int) :允许的重试总数。 优先于其他计数。 retry_total=0如果不想对请求重试,请传入 。 默认值为 10。
  • retry_connect (int) :重试多少个与连接相关的错误。 默认值为 3。
  • retry_read (int) :读取错误时重试的次数。 默认值为 3。
  • retry_status (int) :对错误状态代码重试的次数。 默认值为 3。
  • retry_to_secondary (bool) :是否应将请求重试到辅助数据库(如果可以)。 应仅启用 RA-GRS 帐户,并可以处理可能过时的数据。 默认为 False

其他客户端/每操作配置

其他可选配置关键字 (keyword) 可在客户端或每个操作上指定的参数。

客户端关键字 (keyword) 参数:

  • connection_timeout (int) :客户端将等待与服务器建立连接的秒数。 默认为 20 秒。
  • read_timeout (int) :客户端在连续读取操作之间等待服务器响应的秒数。 这是套接字级别超时,不受整体数据大小的影响。 客户端读取超时将自动重试。 默认值为 60 秒。
  • 传输 (任何) :用户提供的用于发送 HTTP 请求的传输。

每个操作关键字 (keyword) 参数:

  • raw_response_hook (可调用) :给定回调使用从服务返回的响应。
  • raw_request_hook (可调用) :给定回调在发送到服务之前使用请求。
  • client_request_id (str) :请求的可选用户指定标识。
  • user_agent (str) :将自定义值追加到要随请求发送的用户代理标头。
  • logging_enable (bool) :在 DEBUG 级别启用日志记录。 默认为 False。 还可以在客户端级别传入以为所有请求启用它。
  • logging_body (bool) :启用记录请求和响应正文。 默认为 False。 还可以在客户端级别传入以为所有请求启用它。
  • 标头 (dict) :将自定义标头作为键、值对传入。 例如 headers={'CustomValue': value}

疑难解答

常规

存储队列客户端引发 Azure Core 中定义的异常。

此列表可用于引用以捕获引发的异常。 若要获取异常的特定错误代码,请使用 error_code 属性,即 exception.error_code

日志记录

此库使用标准 日志记录 库进行日志记录。 有关 HTTP 会话 (URL、标头等 ) 的基本信息记录在信息级别。

可以使用 参数在客户端 logging_enable 上启用详细的 DEBUG 级别日志记录,包括请求/响应正文和未执行的标头:

import sys
import logging
from azure.storage.queue import QueueServiceClient

# Create a logger for the 'azure.storage.queue' SDK
logger = logging.getLogger('azure.storage.queue')
logger.setLevel(logging.DEBUG)

# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)

# This client will log detailed information about its HTTP sessions, at DEBUG level
service_client = QueueServiceClient.from_connection_string("your_connection_string", logging_enable=True)

同样,即使没有为客户端启用详细日志记录,logging_enable 也可以为单个操作启用:

service_client.get_service_stats(logging_enable=True)

后续步骤

更多示例代码

开始使用我们的 队列示例

SDK 的 GitHub 存储库中提供了多个存储队列 Python SDK 示例。 这些示例为使用存储队列时经常遇到的其他方案提供了示例代码:

其他文档

有关 Azure 队列存储的更多文档,请参阅有关 docs.microsoft.com 的 Azure 队列存储文档

贡献

本项目欢迎贡献和建议。 大多数贡献要求你同意贡献者许可协议 (CLA),并声明你有权(并且确实有权)授予我们使用你的贡献的权利。 有关详细信息,请访问 https://cla.microsoft.com

提交拉取请求时,CLA 机器人将自动确定你是否需要提供 CLA,并相应地修饰 PR(例如标签、注释)。 直接按机器人提供的说明操作。 只需使用 CLA 对所有存储库执行一次这样的操作。

此项目采用了 Microsoft 开放源代码行为准则。 有关详细信息,请参阅行为准则常见问题解答,或如果有任何其他问题或意见,请与 联系。