你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

用于 Python 的服务总线库Service Bus libraries for Python

Microsoft Azure 服务总线支持一组基于云的、面向消息的中间件技术,包括可靠的消息队列和持久的发布/订阅消息。Microsoft Azure Service Bus supports a set of cloud-based, message-oriented middleware technologies including reliable message queuing and durable publish/subscribe messaging.

v0.50.0 中有哪些新增功能?What's new in v0.50.0?

从版本 0.50.0 开始,可以通过新的基于 AMQP 的 API 来发送和接收消息。As of version 0.50.0 a new AMQP-based API is available for sending and receiving messages. 此更新涉及中断性变更This update involves breaking changes.

请阅读从 v0.21.1 迁移到 v0.50.0,确定目前是否适合升级。Please read Migration from v0.21.1 to v0.50.0 to determine if upgrading is right for you at this time.

新的基于 AMQP 的 API 提供改进的消息传递可靠性、性能以及将来的扩展功能支持。The new AMQP-based API offers improved message passing reliability, performance and expanded feature support going forward. 新 API 还提供基于 asyncio 的异步操作支持,可以通过异步操作发送、接收和处理消息。The new API also offers support for asynchronous operations (based on asyncio) for sending, receiving and handling messages.

有关基于 HTTP 的旧操作的文档,请参阅使用旧 API 的基于 HTTP 的操作For documentation on the legacy HTTP-based operations please see Using HTTP-based operations of the legacy API.

先决条件Prerequisites

安装Installation

pip install azure-servicebus

连接到 Azure 服务总线Connect to Azure Service Bus

获取凭据Get credentials

使用下面的 Azure CLI 代码片段为环境变量填充服务总线连接字符串(也可在 Azure 门户中找到此值)。Use the Azure CLI snippet below to populate an environment variable with the Service Bus connection string (you can also find this value in the Azure portal). 此代码片段已针对 Bash shell 格式化。The snippet is formatted for the Bash shell.

RES_GROUP=<resource-group-name>
NAMESPACE=<servicebus-namespace>

export SB_CONN_STR=$(az servicebus namespace authorization-rule keys list \
 --resource-group $RES_GROUP \
 --namespace-name $NAMESPACE \
 --name RootManageSharedAccessKey \
 --query primaryConnectionString \
 --output tsv)

创建客户端Create client

填充 SB_CONN_STR 环境变量后,即可创建 ServiceBusClient。Once you've populated the SB_CONN_STR environment variable, you can create the ServiceBusClient.

import os
from azure.servicebus import ServiceBusClient

connection_str = os.environ['SB_CONN_STR']

sb_client = ServiceBusClient.from_connection_string(connection_str)

如果希望使用异步操作,请使用 azure.servicebus.aio 命名空间。If you wish to use asynchronous operations, please use the azure.servicebus.aio namespace.

import os
from azure.servicebus.aio import ServiceBusClient

connection_str = os.environ['SB_CONN_STR']

sb_client = ServiceBusClient.from_connection_string(connection_str)

服务总线队列Service Bus queues

服务总线队列是存储队列的替代方法,可能在需要使用推送型传递(使用长轮询)执行更高级的消息功能(较大的消息大小、消息排序、单操作破坏性读取、按计划执行的传送)时十分有用。Service Bus queues are an alternative to Storage queues that might be useful in scenarios where more advanced messaging features are needed (larger message sizes, message ordering, single-operation destructive reads, scheduled delivery) using push-style delivery (using long polling).

创建队列Create queue

这样会在服务总线命名空间中创建新的队列。This creates a new queue within the Service Bus namespace. 如果命名空间中已经存在相同名称的队列,则会引发错误。If a queue of the same name already exists within the namespace an error will be raised.

sb_client.create_queue("MyQueue")

也可指定用于配置队列行为的可选参数。Optional parameters to configure the queue behavior can also be specified.

sb_client.create_queue(
    "MySessionQueue",
    requires_session=True  # Create a sessionful queue
    max_delivery_count=5  # Max delivery attempts per message
)

获取队列客户端Get a queue client

可以使用 QueueClient 通过队列发送和接收消息,以及执行其他操作。A QueueClient can be used to send and receive messages from the queue, along with other operations.

queue_client = sb_client.get_queue("MyQueue")

发送消息Sending messages

此队列客户端可以一次发送一条或多条消息:The queue client can send one or more messages at a time:

from azure.servicebus import Message

message = Message("Hello World")
queue_client.send(message)

message_one = Message("First")
message_two = Message("Second")
queue_client.send([message_one, message_two])

每次调用 QueueClient.send 都会创建新的服务连接。Each call to QueueClient.send will create a new service connection. 若要将同一连接重复用于多个发送调用,可以打开一个发送程序:To reuse the same connection for multiple send calls, you can open a sender:

message_one = Message("First")
message_two = Message("Second")

with queue_client.get_sender() as sender:
    sender.send(message_one)
    sender.send(message_two)

如果使用异步客户端,则上述操作会使用异步语法:If you are using an asynchronous client, the above operations will use async syntax:

from azure.servicebus.aio import Message

message = Message("Hello World")
await queue_client.send(message)

message_one = Message("First")
message_two = Message("Second")
async with queue_client.get_sender() as sender:
    await sender.send(message_one)
    await sender.send(message_two)

接收消息Receiving messages

可以从队列以连续迭代器的方式接收消息。Messages can be received from a queue as a continuous iterator. 进行消息接收的默认模式是 PeekLock,此模式要求显式完成每条消息,以便将其从队列中删除。The default mode for message receiving is PeekLock, which requires each message to be explicitly completed in order that it be removed from the queue.

messages = queue_client.get_receiver()
for message in messages:
    print(message)
    message.complete()

在迭代器运行期间,服务连接将始终保持打开状态。The service connection will remain open for the entirety of the iterator. 如果发现自己只对消息流进行了部分迭代,则应在 with 语句中运行接收器,确保连接关闭:If you find yourself only partially iterating the message stream, you should run the receiver in a with statement to ensure the connection is closed:

with queue_client.get_receiver() as messages:
    for message in messages:
        print(message)
        message.complete()
        break

如果使用异步客户端,则上述操作会使用异步语法:If you are using an asynchronous client, the above operations will use async syntax:

async with queue_client.get_receiver() as messages:
    async for message in messages:
        print(message)
        await message.complete()
        break

服务总线主题和订阅Service Bus topics and subscriptions

服务总线主题和订阅是基于服务总线队列的抽象,该队列以“发布/订阅”模式提供一对多的通信形式。Service Bus topics and subscriptions are an abstraction on top of Service Bus queues that provide a one-to-many form of communication, in a publish/subscribe pattern. 消息发送到一个主题并传送到一个或多个关联订阅,这可以用来扩大接收方的数目。Messages are sent to a topic and delivered to one or more associated subscriptions, which is useful for scaling to large numbers of recipients.

创建主题Create topic

这样会在服务总线命名空间中创建新的主题。This creates a new topic within the Service Bus namespace. 如果已经存在相同名称的主题,则会引发错误。If a topic of the same name already exists an error will be raised.

sb_client.create_topic("MyTopic")

获取主题客户端Get a topic client

可以使用 TopicClient 将消息发送到主题,以及执行其他操作。A TopicClient can be used to send messages to the topic, along with other operations.

topic_client = sb_client.get_topic("MyTopic")

创建订阅Create subscription

这样会在服务总线命名空间中为指定主题创建新的订阅。This creates a new subscription for the specified topic within the Service Bus namespace.

sb_client.create_subscription("MyTopic", "MySubscription")

获取订阅客户端Get a subscription client

可以使用 SubscriptionClient 通过主题接收消息,以及执行其他操作。A SubscriptionClient can be used to receive messages from the topic, along with other operations.

topic_client = sb_client.get_subscription("MyTopic", "MySubscription")

从 v0.21.1 迁移到 v0.50.0Migration from v0.21.1 to v0.50.0

版本 0.50.0 中引入了主要的中断性变更。Major breaking changes were introduced in version 0.50.0. 原始的基于 HTTP 的 API 在 v0.50.0 中仍可用 - 不过,它现在存在于新的命名空间:azure.servicebus.control_clientThe original HTTP-based API is still available in v0.50.0 - however it now exists under a new namesapce: azure.servicebus.control_client.

是否应该升级?Should I upgrade?

与 v0.21.1 相比,新包 (v0.50.0) 没有在基于 HTTP 的操作方面提供任何改进。The new package (v0.50.0) offers no improvements in HTTP-based operations over v0.21.1. 基于 HTTP 的 API 未做更改,只是现在存在于新的命名空间。The HTTP-based API is identical except that it now exists under a new namespace. 因此,如果只是希望使用基于 HTTP 的操作(create_queuedelete_queue 等),则现在升级没有额外的好处。For this reason if you only wish to use HTTP-based operations (create_queue, delete_queue etc) - there will be no additional benefit in upgrading at this time.

如何将代码迁移到新版本?How do I migrate my code to the new version?

针对 v0.21.0 编写的代码可以移植到版本 0.50.0,只需更改导入命名空间即可:Code written against v0.21.0 can be ported to version 0.50.0 by simply changing the import namespace:

# from azure.servicebus import ServiceBusService  <- This will now raise an ImportError
from azure.servicebus.control_client import ServiceBusService

key_name = 'RootManageSharedAccessKey' # SharedAccessKeyName from Azure portal
key_value = '' # SharedAccessKey from Azure portal
sbs = ServiceBusService(service_namespace,
                        shared_access_key_name=key_name,
                        shared_access_key_value=key_value)

使用旧 API 的基于 HTTP 的操作Using HTTP-based operations of the legacy API

以下文档介绍旧版 API,应该适用于那些希望将现有代码移植到 v0.50.0 但不做任何其他更改的用户。The following documentation describes the legacy API and should be used for those wishing to port existing code to v0.50.0 without making any additional changes. 此参考也可用作那些使用 v0.21.1 的用户的指南。This reference can also be used as guidance by those using v0.21.1. 对于那些编写新代码的用户,建议使用上面介绍的新 API。For those writing new code, we recommend using the new API described above.

服务总线队列Service Bus queues

共享访问签名 (SAS) 身份验证Shared Access Signature (SAS) authentication

若要使用共享访问签名身份验证,请使用以下代码创建服务总线服务:To use Shared Access Signature authentication, create the service bus service with:

from azure.servicebus.control_client import ServiceBusService

key_name = 'RootManageSharedAccessKey' # SharedAccessKeyName from Azure portal
key_value = '' # SharedAccessKey from Azure portal
sbs = ServiceBusService(service_namespace,
                        shared_access_key_name=key_name,
                        shared_access_key_value=key_value)

访问控制服务 (ACS) 身份验证Access Control Service (ACS) authentication

新的服务总线命名空间不支持 ACS。ACS is not supported on new Service Bus namespaces. 建议将应用程序迁移到 SAS 身份验证We recommend migrating applications to SAS authentication. 若要在旧版服务总线命名空间中使用 ACS 身份验证,请使用以下命令创建 ServiceBusService:To use ACS authentication within an older Service Bus namesapce, create the ServiceBusService with:

from azure.servicebus.control_client import ServiceBusService

account_key = '' # DEFAULT KEY from Azure portal
issuer = 'owner' # DEFAULT ISSUER from Azure portal
sbs = ServiceBusService(service_namespace,
                        account_key=account_key,
                        issuer=issuer)

发送和接收消息Sending and receiving messages

create_queue 方法可用于确保队列存在:The create_queue method can be used to ensure a queue exists:

sbs.create_queue('taskqueue')

然后可以调用 send_queue_message 方法将消息插入到队列:The send_queue_message method can then be called to insert the message into the queue:

from azure.servicebus.control_client import Message

msg = Message('Hello World!')
sbs.send_queue_message('taskqueue', msg)

然后可以调用 send_queue_message_batch 方法来一次发送多条消息:The send_queue_message_batch method can then be called to send several messages at once:

from azure.servicebus.control_client import Message

msg1 = Message('Hello World!')
msg2 = Message('Hello World again!')
sbs.send_queue_message_batch('taskqueue', [msg1, msg2])

然后可以调用 receive_queue_message 方法将消息取消排队。It is then possible to call the receive_queue_message method to dequeue the message.

msg = sbs.receive_queue_message('taskqueue')

服务总线主题Service Bus topics

create_topic 方法可用于创建服务器端主题:The create_topic method can be used to create a server-side topic:

sbs.create_topic('taskdiscussion')

send_topic_message 方法可用于向主题发送一个消息:The send_topic_message method can be used to send a message to a topic:

from azure.servicebus.control_client import Message

msg = Message(b'Hello World!')
sbs.send_topic_message('taskdiscussion', msg)

send_topic_message_batch 方法可用于一次发送多条消息:The send_topic_message_batch method can be used to send several messages at once:

from azure.servicebus.control_client import Message

msg1 = Message(b'Hello World!')
msg2 = Message(b'Hello World again!')
sbs.send_topic_message_batch('taskdiscussion', [msg1, msg2])

请考虑,在 Python 3 中,字符串消息将进行 utf-8 编码,并且你必须在 Python 2 中自行管理你的编码。Please consider that in Python 3 a str message will be utf-8 encoded and you should have to manage your encoding yourself in Python 2.

然后,客户端可以创建订阅并开始通过调用 create_subscription 方法(后跟 receive_subscription_message 方法)使用消息。A client can then create a subscription and start consuming messages by calling the create_subscription method followed by the receive_subscription_message method. 请注意,创建订阅之前发送的任何消息均不会被收到。Please note that any messages sent before the subscription is created will not be received.

from azure.servicebus.control_client import Message

sbs.create_subscription('taskdiscussion', 'client1')
msg = Message('Hello World!')
sbs.send_topic_message('taskdiscussion', msg)
msg = sbs.receive_subscription_message('taskdiscussion', 'client1')

事件中心Event Hub

使用事件中心能够以高呑吐量从一组不同的设备和服务收集事件流。Event Hubs enable the collection of event streams at high throughput, from a diverse set of devices and services.

create_event_hub 方法可用于创建事件中心:The create_event_hub method can be used to create an event hub:

sbs.create_event_hub('myhub')

若要发送事件,请执行以下操作:To send an event:

sbs.send_event('myhub', '{ "DeviceId":"dev-01", "Temperature":"37.0" }')

事件内容是事件消息或包含多个消息的 JSON 编码字符串。The event content is the event message or JSON-encoded string that contains multiple messages.

高级功能Advanced features

Broker 属性和 User 属性Broker properties and user properties

本部分介绍如何使用此处定义的 Broker 和 User 属性:This section describes how to use Broker and User properties defined here:

sent_msg = Message(b'This is the third message',
                   broker_properties={'Label': 'M3'},
                   custom_properties={'Priority': 'Medium',
                                      'Customer': 'ABC'}
            )

可以使用 datetime、int、float 或 booleanYou can use datetime, int, float or boolean

props = {'hello': 'world',
         'number': 42,
         'active': True,
         'deceased': False,
         'large': 8555111000,
         'floating': 3.14,
         'dob': datetime(2011, 12, 14),
         'double_quote_message': 'This "should" work fine',
         'quote_message': "This 'should' work fine"}
sent_msg = Message(b'message with properties', custom_properties=props)

出于与此库的旧版本的兼容性原因,broker_properties 也可以定义为 JSON 字符串。For compatibility reason with old version of this library, broker_properties could also be defined as a JSON string. 如果是这种情况,那么你要负责编写有效的 JSON 字符串,因为在发送到 RestAPI 之前,Python 不会执行任何检查。If this situation, you're responsible to write a valid JSON string, no check will be made by Python before sending to the RestAPI.

broker_properties = '{"ForcePersistence": false, "Label": "My label"}'
sent_msg = Message(b'receive message',
                   broker_properties = broker_properties
)

后续步骤Next Steps