你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

适用于 Python 的Azure 服务总线客户端库 - 版本 7.11.4

Azure 服务总线 是一种高性能的云托管消息传递服务,用于在分布式发送方和接收方之间提供实时和容错通信。

服务总线为异步高度可靠的通信提供了多种机制,例如结构化的先入先出消息传送、发布/订阅功能,以及随需求增长轻松缩放的能力。

使用适用于 Python 的服务总线客户端库在应用程序和服务之间进行通信,并实现异步消息传递模式。

  • 创建服务总线命名空间、队列、主题和订阅,并修改其设置。
  • 在服务总线通道内发送和接收消息。
  • 利用消息锁、会话和死信功能来实现复杂的消息传送模式。

源代码 | 包 (PyPi) | 包 (Conda) | API 参考文档 | 产品文档 | 样品 | 更改日志

注意:如果使用的是版本 0.50 或更低版本,并且想要迁移到此包的最新版本,请查看我们的 迁移指南,从服务总线 V0.50 迁移到服务总线 V7

入门

安装包

使用 pip 安装适用于 Python 的 Azure 服务总线 客户端库:

pip install azure-servicebus

先决条件:

若要使用此包,必须具有:

如果需要 Azure 服务总线命名空间,可以通过 Azure 门户创建它。 如果不想使用图形门户 UI,可以通过Cloud Shell使用 Azure CLI,或者在本地运行 Azure CLI,使用此 Azure CLI 命令创建一个:

az servicebus namespace create --resource-group <resource-group-name> --name <servicebus-namespace-name> --location <servicebus-namespace-location>

验证客户端

与服务总线的交互从 类的 ServiceBusClient 实例开始。 需要一个包含 SAS 密钥的连接字符串,或者需要一个命名空间及其其中一个帐户密钥来实例化客户端对象。 请找到下面链接的示例,以演示如何通过任一方法进行身份验证。

从 连接字符串 创建客户端

使用 azure-identity 库创建客户端

  • 此构造函数采用服务总线实例的完全限定命名空间和实现 TokenCredential 协议的凭据。 azure-identity 包中提供了协议的实现TokenCredential。 完全限定的命名空间的格式为 <yournamespace.servicebus.windows.net>
  • 若要使用 提供的 azure-identity凭据类型,请安装包: pip install azure-identity
  • 此外,若要使用异步 API,必须先安装异步传输,例如 aiohttppip install aiohttp
  • 使用 Azure Active Directory 时,必须为主体分配一个允许访问服务总线的角色,例如Azure 服务总线数据所有者角色。 有关将 Azure Active Directory 授权与服务总线配合使用的详细信息,请参阅 相关文档

注意: 客户端可以在没有上下文管理器的情况下进行初始化,但必须通过 client.close () 手动关闭,才能不泄漏资源。

关键概念

初始化 后, ServiceBusClient可以与服务总线命名空间中的主资源类型进行交互,其中可以存在多个资源类型,并在其中发生实际消息传输,命名空间通常充当应用程序容器:

  • 队列:允许发送和接收消息。 通常用于点到点通信。

  • 主题:与队列不同,主题更适合发布/订阅方案。 主题可以发送到,但需要订阅,其中可以有多个并行使用。

  • 订阅:从主题使用的机制。 每个订阅都是独立的,并接收发送到主题的每条消息的副本。 规则和筛选器可用于定制特定订阅接收的消息。

有关这些资源的详细信息,请参阅什么是Azure 服务总线?

若要与这些资源交互,应熟悉以下 SDK 概念:

  • ServiceBusClient:这是用户应首先初始化以连接到服务总线命名空间的对象。 若要与队列、主题或订阅交互,会从此客户端生成发送方或接收方。

  • ServiceBusSender:若要将消息发送到队列或主题,将使用get_queue_sender实例对应的 ServiceBusClientget_topic_sender 方法,如此所示。

  • ServiceBusReceiver:若要从队列或订阅接收消息,将使用实例对应的 get_queue_receiverServiceBusClientget_subscription_receiver 方法,如下所示。

  • ServiceBusMessage:发送时,这是要构造为包含有效负载的类型。 接收时,你将在这里访问有效负载。

线程安全

我们不保证 ServiceBusClient、ServiceBusSender 和 ServiceBusReceiver 是线程安全的。 不建议跨线程重用这些实例。 由正在运行的应用程序以线程安全的方式使用这些类。

示例

以下部分提供了几个代码片段,涵盖了一些最常见的服务总线任务,包括:

若要执行创建和删除队列/主题/订阅等管理任务,请使用 此处提供的 azure-mgmt-servicebus 库。

请在演示常见服务总线方案 例如发送、接收、会话管理和消息处理)的示例目录中查找更多示例。

向队列发送消息

注意: 请参阅 此处的参考文档。

此示例将单个消息和消息数组发送到假定已存在的队列,该队列通过 Azure 门户 或 az 命令创建。

from azure.servicebus import ServiceBusClient, ServiceBusMessage

import os
connstr = os.environ['SERVICE_BUS_CONNECTION_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']

with ServiceBusClient.from_connection_string(connstr) as client:
    with client.get_queue_sender(queue_name) as sender:
        # Sending a single message
        single_message = ServiceBusMessage("Single message")
        sender.send_messages(single_message)

        # Sending a list of messages
        messages = [ServiceBusMessage("First message"), ServiceBusMessage("Second message")]
        sender.send_messages(messages)

注意:可以使用 方法或通过在调用之前指定ServiceBusMessage.scheduled_enqueue_time_utc来计划延迟传递ServiceBusSender.schedule_messages()消息ServiceBusSender.send_messages()

有关计划和计划取消的更多详细信息,请参阅 此处的示例。

从队列接收消息

若要从队列接收,可以通过 执行即席接收 receiver.receive_messages() ,也可以通过接收方本身永久接收。

通过循环访问 ServiceBusReceiver 从队列接收消息

from azure.servicebus import ServiceBusClient

import os
connstr = os.environ['SERVICE_BUS_CONNECTION_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']

with ServiceBusClient.from_connection_string(connstr) as client:
    # max_wait_time specifies how long the receiver should wait with no incoming messages before stopping receipt.
    # Default is None; to receive forever.
    with client.get_queue_receiver(queue_name, max_wait_time=30) as receiver:
        for msg in receiver:  # ServiceBusReceiver instance is a generator.
            print(str(msg))
            # If it is desired to halt receiving early, one can break out of the loop here safely.

注意: 使用 receive_mode=PEEK_LOCK (接收的任何消息都是默认值,替代RECEIVE_AND_DELETE在收到消息时立即从队列中删除消息,) 如果处理时间超过锁定持续时间,则必须通过该锁在过期之前续订 receiver.renew_message_lock 该锁。 有关在后台自动执行此操作的帮助程序,请参阅 自动 更新。 锁定持续时间在 Azure 中针对队列或主题本身设置。

通过 ServiceBusReceiver.receive_messages () 从队列接收消息

注意:ServiceBusReceiver.receive_messages() 通过即席方法调用接收单个或受约束的消息列表,而不是从生成器永久接收消息。 它始终返回列表。

from azure.servicebus import ServiceBusClient

import os
connstr = os.environ['SERVICE_BUS_CONNECTION_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']

with ServiceBusClient.from_connection_string(connstr) as client:
    with client.get_queue_receiver(queue_name) as receiver:
        received_message_array = receiver.receive_messages(max_wait_time=10)  # try to receive a single message within 10 seconds
        if received_message_array:
            print(str(received_message_array[0]))

    with client.get_queue_receiver(queue_name) as receiver:
        received_message_array = receiver.receive_messages(max_message_count=5, max_wait_time=10)  # try to receive maximum 5 messages in a batch within 10 seconds
        for message in received_message_array:
            print(str(message))

在此示例中,max_message_count声明在命中max_wait_time之前尝试接收的最大消息数(以秒为单位)。

注意: 还应注意, ServiceBusReceiver.peek_messages() 它与接收略有不同,因为它不会锁定被偷看的消息,因此无法解决。

从已启用会话的队列发送和接收消息

注意:请参阅会话发送和接收的参考文档。

会话在队列或订阅的基础上提供先入先出和单接收方语义。 虽然实际接收语法相同,但初始化略有不同。

from azure.servicebus import ServiceBusClient, ServiceBusMessage

import os
connstr = os.environ['SERVICE_BUS_CONNECTION_STR']
queue_name = os.environ['SERVICE_BUS_SESSION_QUEUE_NAME']
session_id = os.environ['SERVICE_BUS_SESSION_ID']

with ServiceBusClient.from_connection_string(connstr) as client:
    with client.get_queue_sender(queue_name) as sender:
        sender.send_messages(ServiceBusMessage("Session Enabled Message", session_id=session_id))

    # If session_id is null here, will receive from the first available session.
    with client.get_queue_receiver(queue_name, session_id=session_id) as receiver:
        for msg in receiver:
            print(str(msg))

注意:从会话接收的消息不需要像非会话接收方一样续订锁;相反,锁管理发生在会话级别,其会话锁可能使用 续订 receiver.session.renew_lock()

使用 主题订阅

注意:有关主题和订阅请参阅参考文档。

主题和订阅提供了用于发送和接收消息的队列的替代方法。 请参阅 此处 的文档,了解更多总体详细信息,以及它们与队列有何不同。

from azure.servicebus import ServiceBusClient, ServiceBusMessage

import os
connstr = os.environ['SERVICE_BUS_CONNECTION_STR']
topic_name = os.environ['SERVICE_BUS_TOPIC_NAME']
subscription_name = os.environ['SERVICE_BUS_SUBSCRIPTION_NAME']

with ServiceBusClient.from_connection_string(connstr) as client:
    with client.get_topic_sender(topic_name) as sender:
        sender.send_messages(ServiceBusMessage("Data"))

    # If session_id is null here, will receive from the first available session.
    with client.get_subscription_receiver(topic_name, subscription_name) as receiver:
        for msg in receiver:
            print(str(msg))

在收到邮件后结算邮件

从队列接收时,可以对收到的消息执行多个操作。

注意:只能确定 ServiceBusReceivedMessageServiceBusReceiveMode.PEEK_LOCK 模式下接收的对象, (这是默认) 。 ServiceBusReceiveMode.RECEIVE_AND_DELETE mode 在收到时从队列中删除消息。 ServiceBusReceivedMessage 无法解决从 peek_messages() 返回的消息,因为消息锁不像在上述接收方法中那样。

如果消息具有如上所述的锁,则如果消息锁已过期,则解决将失败。 如果处理所需的时间超过锁定持续时间,则必须在过期之前通过 receiver.renew_message_lock 进行维护。 锁定持续时间在 Azure 中针对队列或主题本身设置。 请参阅 AutoLockRenewer ,获取帮助程序在后台自动执行此操作。

完成

将消息处理声明为成功完成,从队列中删除消息。

from azure.servicebus import ServiceBusClient

import os
connstr = os.environ['SERVICE_BUS_CONNECTION_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']

with ServiceBusClient.from_connection_string(connstr) as client:
    with client.get_queue_receiver(queue_name) as receiver:
        for msg in receiver:
            print(str(msg))
            receiver.complete_message(msg)

放弃

暂时放弃对消息的处理,立即将消息返回队列,由另一个 (或同一) 接收方选取。

from azure.servicebus import ServiceBusClient

import os
connstr = os.environ['SERVICE_BUS_CONNECTION_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']

with ServiceBusClient.from_connection_string(connstr) as client:
    with client.get_queue_receiver(queue_name) as receiver:
        for msg in receiver:
            print(str(msg))
            receiver.abandon_message(msg)

DeadLetter

将消息从主队列传输到一个特殊的“死信子队列”中,可以使用带 参数sub_queue=ServiceBusSubQueue.DEAD_LETTERServiceBusClient.get_<queue|subscription>_receiver函数访问该消息,并像从任何其他接收方一样使用。 (请参阅此处 的示例)

from azure.servicebus import ServiceBusClient

import os
connstr = os.environ['SERVICE_BUS_CONNECTION_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']

with ServiceBusClient.from_connection_string(connstr) as client:
    with client.get_queue_receiver(queue_name) as receiver:
        for msg in receiver:
            print(str(msg))
            receiver.dead_letter_message(msg)

延迟

延迟与之前的结算方法略有不同。 它通过将消息放在一边,防止消息从队列中直接接收,以便必须在调用 ServiceBusReceiver.receive_deferred_messages (按序列号接收该消息,请参阅 此处 的示例)

from azure.servicebus import ServiceBusClient

import os
connstr = os.environ['SERVICE_BUS_CONNECTION_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']

with ServiceBusClient.from_connection_string(connstr) as client:
    with client.get_queue_receiver(queue_name) as receiver:
        for msg in receiver:
            print(str(msg))
            receiver.defer_message(msg)

自动续订消息或会话锁

注意: 请参阅 自动锁定续订的参考文档。

AutoLockRenewer 是一种简单的方法,用于确保消息或会话在长时间内保持锁定状态(如果调用 receiver.renew_message_lock/receiver.session.renew_lock 不切实际或不需要)。 在内部,如果对象即将过期,创建并发监视器以执行锁续订,这并不多大。 它应按如下所示使用:

  • 消息锁定自动续订
from azure.servicebus import ServiceBusClient, AutoLockRenewer

import os
connstr = os.environ['SERVICE_BUS_CONNECTION_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']

# Can also be called via "with AutoLockRenewer() as renewer" to automate closing.
renewer = AutoLockRenewer()
with ServiceBusClient.from_connection_string(connstr) as client:
    with client.get_queue_receiver(queue_name) as receiver:
        for msg in receiver.receive_messages():
            renewer.register(receiver, msg, max_lock_renewal_duration=60)
            # Do your application logic here
            receiver.complete_message(msg)
renewer.close()
  • 会话锁自动续订
from azure.servicebus import ServiceBusClient, AutoLockRenewer

import os
connstr = os.environ['SERVICE_BUS_CONNECTION_STR']
session_queue_name = os.environ['SERVICE_BUS_SESSION_QUEUE_NAME']
session_id = os.environ['SERVICE_BUS_SESSION_ID']

# Can also be called via "with AutoLockRenewer() as renewer" to automate closing.
renewer = AutoLockRenewer()
with ServiceBusClient.from_connection_string(connstr) as client:
    with client.get_queue_receiver(session_queue_name, session_id=session_id) as receiver:
        renewer.register(receiver, receiver.session, max_lock_renewal_duration=300) # Duration for how long to maintain the lock for, in seconds.

        for msg in receiver.receive_messages():
            # Do your application logic here
            receiver.complete_message(msg)
renewer.close()

如果由于任何原因自动续订中断或失败,可以通过正在续订的对象上的 属性或通过在续订程序初始化时传递对 on_lock_renew_failure 参数的回调来观察auto_renew_error这一点。 尝试执行操作 ((例如在指定对象上完成消息) )时,也会显示该消息。

疑难解答

日志记录

  • 启用 azure.servicebus 记录器以从库收集跟踪。
  • 启用 uamqp 记录器以从基础 uAMQP 库收集跟踪。
  • 在创建客户端时通过设置 logging_enable=True 来启用 AMQP 帧级跟踪。
  • 在某些情况下,你认为 uamqp 日志记录过于详细。 若要禁止不必要的日志记录,请将以下代码片段添加到代码顶部:
import logging

# The logging levels below may need to be changed based on the logging that you want to suppress.
uamqp_logger = logging.getLogger('uamqp')
uamqp_logger.setLevel(logging.ERROR)

# or even further fine-grained control, suppressing the warnings in uamqp.connection module
uamqp_connection_logger = logging.getLogger('uamqp.connection')
uamqp_connection_logger.setLevel(logging.ERROR)

超时

库中用户应注意各种超时。

  • 10 分钟服务端链接关闭:链接一旦打开,将在空闲 10 分钟后关闭,以保护服务免受资源泄漏。 这在很大程度上应该对用户是透明的,但如果你注意到在这样一段时间后发生了重新连接,这就是原因。 对链接执行任何操作(包括管理操作)将延长此超时时间。
  • max_wait_time:在创建接收方或调用 receive_messages()时提供,该时间之后接收消息将在无流量后停止。 这既适用于命令性 receive_messages() 函数,也适用于没有消息时生成器样式的接收在退出前运行的长度。 传递 None (默认) 将永久等待,如果未执行其他操作,则直到 10 分钟的阈值。

注意:如果消息或会话的处理时间足够长,导致超时,则作为手动调用receiver.renew_message_lock/receiver.session.renew_lock的替代方法,可以利用上面详述的AutoLockRenewer的功能。

常见异常

服务总线 API 在 azure.servicebus.exceptions 中生成以下异常:

  • ServiceBusConnectionError: 连接到服务时出错。 这可能是由暂时性网络问题或服务问题引起的。 建议重试。
  • ServiceBusAuthorizationError: 授权与服务的连接时出错。 这可能是由于凭据没有执行操作的适当权限导致的。 建议检查凭据的权限。
  • ServiceBusAuthenticationError: 对服务的连接进行身份验证时出错。 这可能是由于凭据不正确导致的。 建议检查凭据。
  • OperationTimeoutError: 这表示服务未在预期时间内响应操作。 这可能是由暂时性网络问题或服务问题引起的。 服务可能已成功完成请求,也可能未成功完成请求;状态未知。 建议尝试验证当前状态,并在必要时重试。
  • MessageSizeExceededError: 这表示消息内容大于服务总线帧大小。 如果批处理中发送的服务总线消息过多,或者传递到 主体 Message 中的内容太大,则可能会发生这种情况。 建议减少在批处理中发送的消息数或减少传入单个 ServiceBusMessage的内容的大小。
  • MessageAlreadySettled: 这表示未能解决消息。 尝试解决已解决的消息时,可能会发生这种情况。
  • MessageLockLostError: 消息的锁已过期,并且已释放回队列。 需要再次收到它才能解决它。 应注意消息的锁定持续时间,并在处理时间过长的情况下,在过期之前继续续订锁。 AutoLockRenewer 有助于自动续订消息的锁。
  • SessionLockLostError: 会话上的锁已过期。 无法再解决已收到的所有未解决消息。 如有必要,如果再次收到消息,建议重新连接到会话。 应注意会话的锁定持续时间,并在处理时间过长的情况下,在过期之前继续续订锁。 AutoLockRenewer 有助于自动续订会话的锁。
  • MessageNotFoundError: 尝试接收具有特定序列号的消息。 找不到此消息。 确保该消息尚未接收。 检查死信队列,以确定该消息是否被视为死信。
  • MessagingEntityNotFoundError: 与操作关联的实体不存在或已被删除。 请确保实体存在。
  • MessagingEntityDisabledError: 请求对已禁用实体执行运行时操作。 请激活实体。
  • ServiceBusQuotaExceededError: 消息传送实体已达到其允许的最大大小,或者已超出命名空间的最大连接数。 通过从实体或其子队列接收消息在该实体中创建空间。
  • ServiceBusServerBusyError: 服务目前无法处理请求。 客户端可以等待一段时间,并重试操作。
  • ServiceBusCommunicationError: 客户端无法与服务总线建立连接。 确保提供的主机名正确并且主机可访问。 如果你的代码在使用防火墙/代理的环境中运行,请确保到服务总线域/IP 地址和端口的流量未被阻止。
  • SessionCannotBeLockedError: 尝试连接到具有特定会话 ID 的会话,但会话当前已被另一个客户端锁定。 确保该会话未由其他客户端锁定。
  • AutoLockRenewFailed: 尝试在后台续订消息或会话的锁定失败。 当 使用的 AutoLockRenewer 接收方关闭或续订的锁已过期时,可能会发生这种情况。 建议通过接收该消息或再次连接到会话实体来重新注册可续订的消息或会话。
  • AutoLockRenewTimeout: 分配的续订消息或会话锁的时间已过。 可以重新注册想要自动锁定续订的对象,或提前延长超时时间。
  • ServiceBusError: 所有其他与服务总线相关的错误。 它是上述所有错误的根错误类。

有关常见异常类型的详细说明,请查看 异常参考文档

后续步骤

更多示例代码

请在演示常见服务总线方案 例如发送、接收、会话管理和消息处理)的示例目录中查找更多示例。

其他文档

有关服务总线服务的更多文档,请参阅有关 docs.microsoft.com 的服务总线文档

管理功能和文档

对于寻求对 ServiceBus 执行管理操作 (创建队列/主题/等、更改筛选规则、枚举实体) 的用户,请参阅 azure-mgmt-servicebus 文档 以获取 API 文档。 也可以 在此处 找到 Terse 用法示例。

纯 Python AMQP 传输和向后兼容性支持

Azure 服务总线客户端库现在基于纯 Python AMQP 实现。 uAMQP 已删除为必需的依赖项。

使用 uAMQP 作为基础传输:

  1. 使用 pip 进行安装 uamqp
$ pip install uamqp
  1. 在客户端构造过程中传递 uamqp_transport=True
from azure.servicebus import ServiceBusClient
connection_str = '<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>'
queue_name = '<< NAME OF THE QUEUE >>'
client = ServiceBusClient.from_connection_string(
    connection_str, uamqp_transport=True
)

注意:message之前公开 的 uamqp.Message上的ServiceBusReceivedMessageServiceBusMessage/ServiceBusMessageBatch/ 属性已弃用。 引入了属性返回的 message “旧版”对象,以帮助促进转换。

从源生成 uAMQP 滚轮

azure-servicebus 依赖于 AMQP 协议实现的 uAMQP。 uAMQP 滚轮适用于大多数主要操作系统,并在安装 时自动安装 azure-servicebus。 如果 uAMQP 打算用作的基础 AMQP 协议实现, azure-servicebus则可以为大多数主要操作系统找到 uAMQP 轮。

如果在未提供 uAMQP 滚轮的平台上运行,请按照如果你打算使用 uAMQP ,并且你运行的平台上没有提供 uAMQP 滚轮,请按照 uAMQP 安装 指南从源安装。

贡献

本项目欢迎贡献和建议。 大多数贡献要求你同意贡献者许可协议 (CLA),并声明你有权(并且确实有权)授予我们使用你的贡献的权利。 有关详细信息,请访问 https://cla.microsoft.com

提交拉取请求时,CLA 机器人将自动确定你是否需要提供 CLA,并相应地修饰 PR(例如标签、注释)。 直接按机器人提供的说明操作。 只需使用 CLA 对所有存储库执行一次这样的操作。

此项目采用了 Microsoft 开放源代码行为准则。 有关详细信息,请参阅行为准则常见问题解答,或如果有任何其他问题或意见,请与 联系。