你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
适用于 Python 的 Azure 存储队列客户端库 - 版本 12.9.0
Azure 队列存储是一项可存储大量消息的服务,用户可以通过经验证的呼叫,使用 HTTP 或 HTTPS 从世界任何地方访问这些消息。 单个队列消息的大小最多可为 64 KiB,一个队列可以包含数百万条消息,最高可达到存储帐户的总容量限制。
队列存储的常见用途包括:
- 创建积压工作以进行异步处理
- 在分布式应用程序的不同部分之间传递消息
源代码 | 包 (PyPI) | 包 (Conda) | API 参考文档 | 产品文档 | 样品
入门
先决条件
- 使用此包需要 Python 3.7 或更高版本。 有关更多详细信息,请阅读有关 Azure SDK for Python 版本支持策略的页面。
- 必须具有 Azure 订阅 和 Azure 存储帐户 才能使用此包。
安装包
使用 pip 安装适用于 Python 的 Azure 存储队列客户端库:
pip install azure-storage-queue
创建存储帐户
如果要创建新的存储帐户,可以使用 Azure 门户、Azure PowerShell或 Azure CLI:
# Create a new resource group to hold the storage account -
# if using an existing resource group, skip this step
az group create --name my-resource-group --location westus2
# Create the storage account
az storage account create -n my-storage-account-name -g my-resource-group
创建客户端
使用适用于 Python 的 Azure 存储队列客户端库,可以与三种类型的资源进行交互:存储帐户本身、队列和消息。 与这些资源的交互从 客户端的实例开始。 若要创建客户端对象,需要存储帐户的队列服务终结点 URL 和可用于访问存储帐户的凭据:
from azure.storage.queue import QueueServiceClient
service = QueueServiceClient(account_url="https://<my-storage-account-name>.queue.core.windows.net/", credential=credential)
查找帐户 URL
可以使用 Azure 门户、Azure PowerShell或 Azure CLI 查找存储帐户的队列服务 URL:
# Get the queue service URL for the storage account
az storage account show -n my-storage-account-name -g my-resource-group --query "primaryEndpoints.queue"
凭据类型
参数 credential
可能以多种不同的形式提供,具体取决于要使用的 授权 类型:
若要使用 共享访问签名 (SAS) 令牌,请以字符串的形式提供令牌。 如果帐户 URL 包含 SAS 令牌,请省略凭据参数。 可以从 Azure 门户的“共享访问签名”下生成 SAS 令牌,或使用以下
generate_sas()
函数之一为存储帐户或队列创建 sas 令牌:from datetime import datetime, timedelta from azure.storage.queue import QueueServiceClient, generate_account_sas, ResourceTypes, AccountSasPermissions sas_token = generate_account_sas( account_name="<storage-account-name>", account_key="<account-access-key>", resource_types=ResourceTypes(service=True), permission=AccountSasPermissions(read=True), start=datetime.utcnow(), expiry=datetime.utcnow() + timedelta(hours=1) ) queue_service_client = QueueServiceClient(account_url="https://<my_account_name>.queue.core.windows.net", credential=sas_token)
若要使用存储帐户 共享密钥 (又名帐户密钥或访问密钥) ,请以字符串的形式提供密钥。 这可以在 Azure 门户的“访问密钥”部分下找到,也可以通过运行以下 Azure CLI 命令找到:
az storage account keys list -g MyResourceGroup -n MyStorageAccount
使用密钥作为凭据参数对客户端进行身份验证:
from azure.storage.queue import QueueServiceClient service = QueueServiceClient(account_url="https://<my_account_name>.queue.core.windows.net", credential="<account_access_key>")
若要使用 Azure Active Directory (AAD) 令牌凭据,请提供从 azure 标识 库获取的所需凭据类型的实例。 例如, DefaultAzureCredential 可用于对客户端进行身份验证。
这需要一些初始设置:
- 安装 azure-identity
- 注册新的 AAD 应用程序 并授予访问 Azure 存储的权限
- 在 Azure 门户中使用 RBAC 授予对 Azure 队列数据的访问权限
- 将 AAD 应用程序的客户端 ID、租户 ID 和客户端密码的值设置为环境变量:AZURE_TENANT_ID、AZURE_CLIENT_ID、AZURE_CLIENT_SECRET
使用返回的令牌凭据对客户端进行身份验证:
from azure.identity import DefaultAzureCredential from azure.storage.queue import QueueServiceClient token_credential = DefaultAzureCredential() queue_service_client = QueueServiceClient( account_url="https://<my_account_name>.queue.core.windows.net", credential=token_credential )
从连接字符串创建客户端
根据用例和授权方法,你可能更喜欢使用存储连接字符串初始化客户端实例,而不是单独提供帐户 URL 和凭据。 为此,请将存储连接字符串传递给客户端的 from_connection_string
类方法:
from azure.storage.queue import QueueServiceClient
connection_string = "DefaultEndpointsProtocol=https;AccountName=xxxx;AccountKey=xxxx;EndpointSuffix=core.windows.net"
service = QueueServiceClient.from_connection_string(conn_str=connection_string)
存储帐户的连接字符串可以在 Azure 门户的“访问密钥”部分下找到,也可以通过运行以下 CLI 命令找到:
az storage account show-connection-string -g MyResourceGroup -n MyStorageAccount
关键概念
以下组件构成 Azure 队列服务:
- 存储帐户本身
- 存储帐户中的队列,其中包含一组消息
- 队列中任何格式的消息,最大为 64 KiB
使用适用于 Python 的 Azure 存储队列客户端库,可以使用专用客户端对象来与其中每个组件进行交互。
异步客户端
此库包含 Python 3.5+ 上支持的完整异步 API。 若要使用它,必须先安装异步传输,例如 aiohttp。 有关详细信息,请参阅 azure-core 文档 。
不再需要异步客户端和凭据时,应将其关闭。 这些对象是异步上下文管理器并定义异步 close
方法。
客户端
提供了两个不同的客户端来与队列服务的各个组件进行交互:
- QueueServiceClient - 此客户端表示与 Azure 存储帐户本身的交互,并允许获取预配置的客户端实例以访问其中的队列。 它提供检索和配置帐户属性以及列出、创建和删除帐户中的队列的操作。 若要对特定队列执行操作,请使用
get_queue_client
方法检索客户端。 - QueueClient - 此客户端表示与特定队列 (交互,该队列) 尚不存在。 它提供创建、删除或配置队列的操作,并包括在其中发送、接收、速览、删除和更新消息的操作。
消息
- 发送 - 将消息添加到队列,并选择性地为消息设置可见性超时。
- 接收 - 从队列中检索消息,并使它对其他使用者不可见。
- 速览 - 从队列前面检索消息,而不更改消息可见性。
- 更新 - 汇报消息和/或消息内容的可见性超时。
- Delete - 从队列中删除指定的消息。
- 清除 - 清除队列中的所有消息。
示例
以下部分提供了几个代码片段,涵盖了一些最常见的存储队列任务,包括:
创建队列
在存储帐户中创建队列
from azure.storage.queue import QueueClient
queue = QueueClient.from_connection_string(conn_str="<connection_string>", queue_name="myqueue")
queue.create_queue()
使用异步客户端创建队列
from azure.storage.queue.aio import QueueClient
queue = QueueClient.from_connection_string(conn_str="<connection_string>", queue_name="myqueue")
await queue.create_queue()
发送消息
将消息发送到队列
from azure.storage.queue import QueueClient
queue = QueueClient.from_connection_string(conn_str="<connection_string>", queue_name="myqueue")
queue.send_message("I'm using queues!")
queue.send_message("This is my second message")
异步发送消息
import asyncio
from azure.storage.queue.aio import QueueClient
queue = QueueClient.from_connection_string(conn_str="<connection_string>", queue_name="myqueue")
await asyncio.gather(
queue.send_message("I'm using queues!"),
queue.send_message("This is my second message")
)
接收消息
从队列接收和处理消息
from azure.storage.queue import QueueClient
queue = QueueClient.from_connection_string(conn_str="<connection_string>", queue_name="myqueue")
response = queue.receive_messages()
for message in response:
print(message.content)
queue.delete_message(message)
# Printed messages from the front of the queue:
# >> I'm using queues!
# >> This is my second message
批量接收和处理消息
from azure.storage.queue import QueueClient
queue = QueueClient.from_connection_string(conn_str="<connection_string>", queue_name="myqueue")
response = queue.receive_messages(messages_per_page=10)
for message_batch in response.by_page():
for message in message_batch:
print(message.content)
queue.delete_message(message)
异步接收和处理消息
from azure.storage.queue.aio import QueueClient
queue = QueueClient.from_connection_string(conn_str="<connection_string>", queue_name="myqueue")
response = queue.receive_messages()
async for message in response:
print(message.content)
await queue.delete_message(message)
可选配置
可选关键字 (keyword) 可在客户端和每个操作级别传入的参数。
重试策略配置
在实例化客户端时使用以下关键字 (keyword) 参数来配置重试策略:
- retry_total (int) :允许的重试总数。 优先于其他计数。
retry_total=0
如果不想对请求重试,请传入 。 默认值为 10。 - retry_connect (int) :重试多少个与连接相关的错误。 默认值为 3。
- retry_read (int) :读取错误时重试的次数。 默认值为 3。
- retry_status (int) :对错误状态代码重试的次数。 默认值为 3。
- retry_to_secondary (bool) :是否应将请求重试到辅助数据库(如果可以)。
应仅启用 RA-GRS 帐户,并可以处理可能过时的数据。
默认为
False
。
其他客户端/每操作配置
其他可选配置关键字 (keyword) 可在客户端或每个操作上指定的参数。
客户端关键字 (keyword) 参数:
- connection_timeout (int) :客户端将等待与服务器建立连接的秒数。 默认为 20 秒。
- read_timeout (int) :客户端在连续读取操作之间等待服务器响应的秒数。 这是套接字级别超时,不受整体数据大小的影响。 客户端读取超时将自动重试。 默认值为 60 秒。
- 传输 (任何) :用户提供的用于发送 HTTP 请求的传输。
每个操作关键字 (keyword) 参数:
- raw_response_hook (可调用) :给定回调使用从服务返回的响应。
- raw_request_hook (可调用) :给定回调在发送到服务之前使用请求。
- client_request_id (str) :请求的可选用户指定标识。
- user_agent (str) :将自定义值追加到要随请求发送的用户代理标头。
- logging_enable (bool) :在 DEBUG 级别启用日志记录。 默认为 False。 还可以在客户端级别传入以为所有请求启用它。
- logging_body (bool) :启用记录请求和响应正文。 默认为 False。 还可以在客户端级别传入以为所有请求启用它。
- 标头 (dict) :将自定义标头作为键、值对传入。 例如
headers={'CustomValue': value}
疑难解答
常规
存储队列客户端引发 Azure Core 中定义的异常。
此列表可用于引用以捕获引发的异常。 若要获取异常的特定错误代码,请使用 error_code
属性,即 exception.error_code
。
日志记录
此库使用标准 日志记录 库进行日志记录。 有关 HTTP 会话 (URL、标头等 ) 的基本信息记录在信息级别。
可以使用 参数在客户端 logging_enable
上启用详细的 DEBUG 级别日志记录,包括请求/响应正文和未执行的标头:
import sys
import logging
from azure.storage.queue import QueueServiceClient
# Create a logger for the 'azure.storage.queue' SDK
logger = logging.getLogger('azure.storage.queue')
logger.setLevel(logging.DEBUG)
# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)
# This client will log detailed information about its HTTP sessions, at DEBUG level
service_client = QueueServiceClient.from_connection_string("your_connection_string", logging_enable=True)
同样,即使没有为客户端启用详细日志记录,logging_enable
也可以为单个操作启用:
service_client.get_service_stats(logging_enable=True)
后续步骤
更多示例代码
开始使用我们的 队列示例。
SDK 的 GitHub 存储库中提供了多个存储队列 Python SDK 示例。 这些示例为使用存储队列时经常遇到的其他方案提供了示例代码:
queue_samples_hello_world.py (异步版本) - 本文中的示例:
- 客户端创建
- 创建队列
- 发送消息
- 接收消息
queue_samples_authentication.py (异步版本) - 身份验证和创建客户端的示例:
- 从连接字符串
- 从共享访问密钥
- 从共享访问签名令牌
- 从 Azure Active Directory
queue_samples_service.py (异步版本) - 与队列服务交互的示例:
- 获取和设置服务属性
- 列出存储帐户中的队列
- 从服务创建和删除队列
- 获取 QueueClient
queue_samples_message.py (异步版本) - 处理队列和消息的示例:
- 设置访问策略
- 获取和设置队列元数据
- 发送和接收消息
- 删除指定邮件并清除所有邮件
- 速览和更新消息
其他文档
有关 Azure 队列存储的更多文档,请参阅有关 docs.microsoft.com 的 Azure 队列存储文档 。
贡献
本项目欢迎贡献和建议。 大多数贡献要求你同意贡献者许可协议 (CLA),并声明你有权(并且确实有权)授予我们使用你的贡献的权利。 有关详细信息,请访问 https://cla.microsoft.com 。
提交拉取请求时,CLA 机器人将自动确定你是否需要提供 CLA,并相应地修饰 PR(例如标签、注释)。 直接按机器人提供的说明操作。 只需使用 CLA 对所有存储库执行一次这样的操作。
此项目采用了 Microsoft 开放源代码行为准则。 有关详细信息,请参阅行为准则常见问题解答,或如果有任何其他问题或意见,请与 联系。