Библиотеки служебной шины для PythonService Bus libraries for Python

Служебная шина Microsoft Azure поддерживает набор облачных технологий промежуточного уровня, ориентированных на обработку сообщений. Эти технологии представлены надежными очередями сообщений, а также возможностями публикации и подписки в рамках обмена сообщениями.Microsoft Azure Service Bus supports a set of cloud-based, message-oriented middleware technologies including reliable message queuing and durable publish/subscribe messaging.

Новые возможности версии 0.50.0What's new in v0.50.0?

Начиная с версии 0.50.0, новый API на основе AMQP можно использовать для отправки и получения сообщений.As of version 0.50.0 a new AMQP-based API is available for sending and receiving messages. Это обновление включает в себя критические изменения.This update involves breaking changes.

Чтобы узнать, подходит ли для вас сейчас обновление, см. руководство по переходу с версии 0.21.1 на версию 0.50.0.Please read Migration from v0.21.1 to v0.50.0 to determine if upgrading is right for you at this time.

Новое предложение API на основе AMQP предлагает надежную передачу сообщений, высокую производительность и расширенную поддержку соответствующих возможностей в будущем.The new AMQP-based API offers improved message passing reliability, performance and expanded feature support going forward. Новое предложение API также поддерживает асинхронные операции (на основе asyncio) для отправки, получения и обработки сообщений.The new API also offers support for asynchronous operations (based on asyncio) for sending, receiving and handling messages.

См. дополнительные сведения об операциях на основе HTTP с использованием устаревшего API.For documentation on the legacy HTTP-based operations please see Using HTTP-based operations of the legacy API.

Предварительные требованияPrerequisites

УстановкаInstallation

pip install azure-servicebus

Подключение к служебной шине AzureConnect to Azure Service Bus

Получение учетных данныхGet credentials

Используйте приведенный ниже фрагмент кода Azure CLI для заполнения переменной среды строкой подключения служебной шины (вы можете найти это значение на портале Azure).Use the Azure CLI snippet below to populate an environment variable with the Service Bus connection string (you can also find this value in the Azure portal). Фрагмент кода отформатирован для оболочки Bash.The snippet is formatted for the Bash shell.

RES_GROUP=<resource-group-name>
NAMESPACE=<servicebus-namespace>

export SB_CONN_STR=$(az servicebus namespace authorization-rule keys list \
 --resource-group $RES_GROUP \
 --namespace-name $NAMESPACE \
 --name RootManageSharedAccessKey \
 --query primaryConnectionString \
 --output tsv)

Создание клиентаCreate client

Когда вы заполните переменную среды SB_CONN_STR, создайте ServiceBusClient.Once you've populated the SB_CONN_STR environment variable, you can create the ServiceBusClient.

import os
from azure.servicebus import ServiceBusClient

connection_str = os.environ['SB_CONN_STR']

sb_client = ServiceBusClient.from_connection_string(connection_str)

Если вы хотите выполнять асинхронные операции, используйте пространство имен azure.servicebus.aio.If you wish to use asynchronous operations, please use the azure.servicebus.aio namespace.

import os
from azure.servicebus.aio import ServiceBusClient

connection_str = os.environ['SB_CONN_STR']

sb_client = ServiceBusClient.from_connection_string(connection_str)

Очереди служебной шиныService Bus queues

Очереди служебной шины — это альтернатива очередям хранилища. Они используются, когда требуются более сложные функции обмена сообщениями (обработка сообщений большиих размеров, упорядочение сообщений, чтение сообщений с удалением сообщений, запланированная доставка) с использованием принудительной доставки (с помощью продолжительного опроса).Service Bus queues are an alternative to Storage queues that might be useful in scenarios where more advanced messaging features are needed (larger message sizes, message ordering, single-operation destructive reads, scheduled delivery) using push-style delivery (using long polling).

Создание очередиCreate queue

Вы можете создать очередь в пространстве имен служебной шины.This creates a new queue within the Service Bus namespace. Если очередь с тем же именем уже существует в пространстве имен, возникает ошибка.If a queue of the same name already exists within the namespace an error will be raised.

sb_client.create_queue("MyQueue")

Также можно указать необязательные параметры, чтобы настроить поведение очереди.Optional parameters to configure the queue behavior can also be specified.

sb_client.create_queue(
    "MySessionQueue",
    requires_session=True  # Create a sessionful queue
    max_delivery_count=5  # Max delivery attempts per message
)

Получение клиента очередиGet a queue client

QueueClient можно использовать для обмена сообщениями в очереди, а также других операций.A QueueClient can be used to send and receive messages from the queue, along with other operations.

queue_client = sb_client.get_queue("MyQueue")

Отправка сообщенийSending messages

Клиент очереди может отправлять одно или несколько сообщений одновременно.The queue client can send one or more messages at a time:

from azure.servicebus import Message

message = Message("Hello World")
queue_client.send(message)

message_one = Message("First")
message_two = Message("Second")
queue_client.send([message_one, message_two])

Каждый вызов к QueueClient.send создает новое подключение к службе.Each call to QueueClient.send will create a new service connection. Чтобы повторно использовать одно подключение для нескольких вызовов отправки, можно открыть отправителя.To reuse the same connection for multiple send calls, you can open a sender:

message_one = Message("First")
message_two = Message("Second")

with queue_client.get_sender() as sender:
    sender.send(message_one)
    sender.send(message_two)

При использовании асинхронного клиента, в указанных выше операциях используется синтаксис async.If you are using an asynchronous client, the above operations will use async syntax:

from azure.servicebus.aio import Message

message = Message("Hello World")
await queue_client.send(message)

message_one = Message("First")
message_two = Message("Second")
async with queue_client.get_sender() as sender:
    await sender.send(message_one)
    await sender.send(message_two)

Получение сообщенийReceiving messages

Сообщения можно получать из очереди в рамках цикла непрерывной итерации.Messages can be received from a queue as a continuous iterator. Режим по умолчанию для приема сообщений — PeekLock. Для включения этого режима требуется, чтобы каждое сообщение было явно завершено для его удаления из очереди.The default mode for message receiving is PeekLock, which requires each message to be explicitly completed in order that it be removed from the queue.

messages = queue_client.get_receiver()
for message in messages:
    print(message)
    message.complete()

Подключение службы будет оставаться открытым на протяжении всего цикла итерации.The service connection will remain open for the entirety of the iterator. Если для потока сообщений выполняется только частичная итерация, запустите получатель в операторе with, чтобы закрыть подключение.If you find yourself only partially iterating the message stream, you should run the receiver in a with statement to ensure the connection is closed:

with queue_client.get_receiver() as messages:
    for message in messages:
        print(message)
        message.complete()
        break

При использовании асинхронного клиента, в указанных выше операциях используется синтаксис async.If you are using an asynchronous client, the above operations will use async syntax:

async with queue_client.get_receiver() as messages:
    async for message in messages:
        print(message)
        await message.complete()
        break

Разделы и подписки служебной шиныService Bus topics and subscriptions

Наряду с очередями служебной шины разделы и подписки служебной шины представляют еще один уровень абстракции. Эти сущности обеспечивают разновидность взаимодействия "один ко многим" в рамках схемы публикации и подписки.Service Bus topics and subscriptions are an abstraction on top of Service Bus queues that provide a one-to-many form of communication, in a publish/subscribe pattern. Сообщения отправляются в раздел и доставляются в одну или несколько связанных подписок. Это особенно удобно при масштабировании с учетом большого количества получателей.Messages are sent to a topic and delivered to one or more associated subscriptions, which is useful for scaling to large numbers of recipients.

Создание разделаCreate topic

Вы можете создать раздел в пространстве имен служебной шины.This creates a new topic within the Service Bus namespace. Если раздел с тем же именем уже существует, возникает ошибка.If a topic of the same name already exists an error will be raised.

sb_client.create_topic("MyTopic")

Получение клиента разделаGet a topic client

TopicClient можно использовать для отправки сообщений в раздел, а также других операций.A TopicClient can be used to send messages to the topic, along with other operations.

topic_client = sb_client.get_topic("MyTopic")

Создание подпискиCreate subscription

Вы можете создать подписку для указанного раздела в пространстве имен служебной шины.This creates a new subscription for the specified topic within the Service Bus namespace.

sb_client.create_subscription("MyTopic", "MySubscription")

Получение клиента подпискиGet a subscription client

SubscriptionClient можно использовать для получения сообщений из раздела, а также других операций.A SubscriptionClient can be used to receive messages from the topic, along with other operations.

topic_client = sb_client.get_subscription("MyTopic", "MySubscription")

Переход с версии 0.21.1 на версию 0.50.0Migration from v0.21.1 to v0.50.0

В версии 0.50.0 представлены основные критические изменения.Major breaking changes were introduced in version 0.50.0. Исходный API на основе HTTP по-прежнему доступен в версии 0.50.0, но теперь он включен в новое пространство имен: azure.servicebus.control_client.The original HTTP-based API is still available in v0.50.0 - however it now exists under a new namesapce: azure.servicebus.control_client.

Следует ли выполнять обновление?Should I upgrade?

Новый пакет (версии 0.50.0) не включает улучшения операций на основе HTTP в сравнении с версией 0.21.1.The new package (v0.50.0) offers no improvements in HTTP-based operations over v0.21.1. API на основе HTTP является идентичным, за исключением того, что этот интерфейс включен в новое пространство имен.The HTTP-based API is identical except that it now exists under a new namespace. Поэтому, если вы хотите использовать операции на основе HTTP (create_queue, delete_queue и т.д.), сейчас вы не получите дополнительные преимущества от обновления.For this reason if you only wish to use HTTP-based operations (create_queue, delete_queue etc) - there will be no additional benefit in upgrading at this time.

Как перенести код в новую версию?How do I migrate my code to the new version?

Код, написанный для версии 0.21.0, можно перенести в версию 0.50.0. Для этого нужно просто изменить пространство имен для импорта.Code written against v0.21.0 can be ported to version 0.50.0 by simply changing the import namespace:

# from azure.servicebus import ServiceBusService  <- This will now raise an ImportError
from azure.servicebus.control_client import ServiceBusService

key_name = 'RootManageSharedAccessKey' # SharedAccessKeyName from Azure portal
key_value = '' # SharedAccessKey from Azure portal
sbs = ServiceBusService(service_namespace,
                        shared_access_key_name=key_name,
                        shared_access_key_value=key_value)

Использование операций на основе HTTP в устаревшей версии APIUsing HTTP-based operations of the legacy API

В следующей документации описана устаревшая версия API. Она предназначена для тех, кто хочет перенести существующий код в версию 0.50.0 без внесения дополнительных изменений.The following documentation describes the legacy API and should be used for those wishing to port existing code to v0.50.0 without making any additional changes. Эти справочные материалы также можно использовать как рекомендации для пользователей версии 0.21.1.This reference can also be used as guidance by those using v0.21.1. Для тех, кто создает новый код, мы рекомендуем использовать новый API, описанный выше.For those writing new code, we recommend using the new API described above.

Очереди служебной шиныService Bus queues

Аутентификация SASShared Access Signature (SAS) authentication

Чтобы использовать аутентификацию SAS, создайте служебную шину:To use Shared Access Signature authentication, create the service bus service with:

from azure.servicebus.control_client import ServiceBusService

key_name = 'RootManageSharedAccessKey' # SharedAccessKeyName from Azure portal
key_value = '' # SharedAccessKey from Azure portal
sbs = ServiceBusService(service_namespace,
                        shared_access_key_name=key_name,
                        shared_access_key_value=key_value)

Аутентификация службы контроля доступа (ACS)Access Control Service (ACS) authentication

ACS не поддерживается в новых пространствах имен Служебной шины.ACS is not supported on new Service Bus namespaces. Мы рекомендуем перенести приложения для использования аутентификации SAS.We recommend migrating applications to SAS authentication. Чтобы использовать аутентификацию ACS в предыдущем пространстве имен служебной шины, создайте ServiceBusService:To use ACS authentication within an older Service Bus namesapce, create the ServiceBusService with:

from azure.servicebus.control_client import ServiceBusService

account_key = '' # DEFAULT KEY from Azure portal
issuer = 'owner' # DEFAULT ISSUER from Azure portal
sbs = ServiceBusService(service_namespace,
                        account_key=account_key,
                        issuer=issuer)

Отправка и получение сообщенийSending and receiving messages

Метод create_queue можно использовать, чтобы проверить наличие очереди:The create_queue method can be used to ensure a queue exists:

sbs.create_queue('taskqueue')

Затем можно вызвать метод send_queue_message, чтобы включить сообщение в очередь:The send_queue_message method can then be called to insert the message into the queue:

from azure.servicebus.control_client import Message

msg = Message('Hello World!')
sbs.send_queue_message('taskqueue', msg)

После этого можно вызвать метод send_queue_message_batch, чтобы одновременно отправить несколько сообщений:The send_queue_message_batch method can then be called to send several messages at once:

from azure.servicebus.control_client import Message

msg1 = Message('Hello World!')
msg2 = Message('Hello World again!')
sbs.send_queue_message_batch('taskqueue', [msg1, msg2])

Также можно вызвать метод receive_queue_message, чтобы исключить сообщение из очереди.It is then possible to call the receive_queue_message method to dequeue the message.

msg = sbs.receive_queue_message('taskqueue')

Разделы служебной шиныService Bus topics

Метод create_topic можно использовать, чтобы создать раздел на стороне сервера:The create_topic method can be used to create a server-side topic:

sbs.create_topic('taskdiscussion')

Метод send_topic_message можно использовать, чтобы отправить сообщение в раздел:The send_topic_message method can be used to send a message to a topic:

from azure.servicebus.control_client import Message

msg = Message(b'Hello World!')
sbs.send_topic_message('taskdiscussion', msg)

Метод send_topic_message_batch можно использовать, чтобы отправить несколько сообщений одновременно:The send_topic_message_batch method can be used to send several messages at once:

from azure.servicebus.control_client import Message

msg1 = Message(b'Hello World!')
msg2 = Message(b'Hello World again!')
sbs.send_topic_message_batch('taskdiscussion', [msg1, msg2])

Учтите, что строковые сообщения в Python 3 отправляются в кодировке utf-8. В Python 2 кодировкой управляет пользователь.Please consider that in Python 3 a str message will be utf-8 encoded and you should have to manage your encoding yourself in Python 2.

Клиент затем может создать подписку и начать обрабатывать сообщения, вызывая методы create_subscription и receive_subscription_message.A client can then create a subscription and start consuming messages by calling the create_subscription method followed by the receive_subscription_message method. Обратите внимание на то, что все сообщения, отправленные до создания подписки, не будут получены.Please note that any messages sent before the subscription is created will not be received.

from azure.servicebus.control_client import Message

sbs.create_subscription('taskdiscussion', 'client1')
msg = Message('Hello World!')
sbs.send_topic_message('taskdiscussion', msg)
msg = sbs.receive_subscription_message('taskdiscussion', 'client1')

Концентратор событийEvent Hub

Центры событий собирают потоки событий с высокой пропускной способностью из разнородного набора устройств и служб.Event Hubs enable the collection of event streams at high throughput, from a diverse set of devices and services.

Метод create_event_hub можно использовать, чтобы создать концентратор событий:The create_event_hub method can be used to create an event hub:

sbs.create_event_hub('myhub')

Отправка событияTo send an event:

sbs.send_event('myhub', '{ "DeviceId":"dev-01", "Temperature":"37.0" }')

Содержимое событий — это сообщение о событии или строка в кодировке JSON, содержащая несколько сообщений.The event content is the event message or JSON-encoded string that contains multiple messages.

Дополнительные функцииAdvanced features

Свойства Broker и UserBroker properties and user properties

В этом разделе описано, как использовать свойства Broker и User, определенные здесь:This section describes how to use Broker and User properties defined here:

sent_msg = Message(b'This is the third message',
                   broker_properties={'Label': 'M3'},
                   custom_properties={'Priority': 'Medium',
                                      'Customer': 'ABC'}
            )

Можно использовать значение даты и времени, целое число, число с плавающей запятой или логическое значение.You can use datetime, int, float or boolean

props = {'hello': 'world',
         'number': 42,
         'active': True,
         'deceased': False,
         'large': 8555111000,
         'floating': 3.14,
         'dob': datetime(2011, 12, 14),
         'double_quote_message': 'This "should" work fine',
         'quote_message': "This 'should' work fine"}
sent_msg = Message(b'message with properties', custom_properties=props)

Чтобы обеспечить совместимость со старой версией этой библиотеки, broker_properties также можно определить как строку JSON.For compatibility reason with old version of this library, broker_properties could also be defined as a JSON string. В такой ситуации создавать допустимую строку JSON должен пользователь. Средства Python не будут выполнять проверку до отправки в RestAPI.If this situation, you're responsible to write a valid JSON string, no check will be made by Python before sending to the RestAPI.

broker_properties = '{"ForcePersistence": false, "Label": "My label"}'
sent_msg = Message(b'receive message',
                   broker_properties = broker_properties
)

Дальнейшие действияNext Steps