Python 用 Service Bus ライブラリService Bus libraries for Python

Microsoft Azure Service Bus は、信頼性の高いメッセージ キュー機能や永続的なパブリッシュ/サブスクライブ メッセージング機能など、クラウドベースのメッセージ指向ミドルウェアの一連のテクノロジをサポートしています。Microsoft Azure Service Bus supports a set of cloud-based, message-oriented middleware technologies including reliable message queuing and durable publish/subscribe messaging.

v0.50.0 の新機能What's new in v0.50.0?

バージョン 0.50.0 では、メッセージの送受信に、新しい AMQP ベースの API を使用できるようになりました。As of version 0.50.0 a new AMQP-based API is available for sending and receiving messages. この更新には、破壊的変更が含まれています。This update involves breaking changes.

現時点でのアップグレードが適切であるかどうかを判断するには、「v0.21.1 から v0.50.0 への移行」をお読みください。Please read Migration from v0.21.1 to v0.50.0 to determine if upgrading is right for you at this time.

新しい AMQP ベースの API では、メッセージの受け渡しの信頼性やパフォーマンスが向上し、今後の機能サポートも強化されます。The new AMQP-based API offers improved message passing reliability, performance and expanded feature support going forward. また、新しい API では、メッセージの送受信や処理に対する非同期操作 (asyncio に基づく) もサポートされます。The new API also offers support for asynchronous operations (based on asyncio) for sending, receiving and handling messages.

従来の HTTP ベースの操作については、「HTTP ベースの操作による従来の API の使用」を参照してください。For documentation on the legacy HTTP-based operations please see Using HTTP-based operations of the legacy API.

前提条件Prerequisites

インストールInstallation

pip install azure-servicebus

Azure Service Bus に接続するConnect to Azure Service Bus

資格情報の取得Get credentials

次の Azure CLI スニペットを使用して、環境変数に Service Bus 接続文字列を設定します (この値は、Azure portal で見つけることもできます)。Use the Azure CLI snippet below to populate an environment variable with the Service Bus connection string (you can also find this value in the Azure portal). このスニペットは Bash シェル用にフォーマットされています。The snippet is formatted for the Bash shell.

RES_GROUP=<resource-group-name>
NAMESPACE=<servicebus-namespace>

export SB_CONN_STR=$(az servicebus namespace authorization-rule keys list \
 --resource-group $RES_GROUP \
 --namespace-name $NAMESPACE \
 --name RootManageSharedAccessKey \
 --query primaryConnectionString \
 --output tsv)

クライアントの作成Create client

SB_CONN_STR 環境変数を設定したら、ServiceBusClient を作成できます。Once you've populated the SB_CONN_STR environment variable, you can create the ServiceBusClient.

import os
from azure.servicebus import ServiceBusClient

connection_str = os.environ['SB_CONN_STR']

sb_client = ServiceBusClient.from_connection_string(connection_str)

非同期操作を使用する場合は、azure.servicebus.aio 名前空間を使用してください。If you wish to use asynchronous operations, please use the azure.servicebus.aio namespace.

import os
from azure.servicebus.aio import ServiceBusClient

connection_str = os.environ['SB_CONN_STR']

sb_client = ServiceBusClient.from_connection_string(connection_str)

Service Bus キューService Bus queues

プッシュ型配信 (ロング ポーリング) を使用した高度なメッセージング機能が要求されるシナリオでは、Storage キューよりも Service Bus キューの方が適している場合があります。より大きなメッセージ サイズへの対応、メッセージの順序付け、単一操作での破壊的な読み取り、スケジュール指定配信は、そのような機能の例です。Service Bus queues are an alternative to Storage queues that might be useful in scenarios where more advanced messaging features are needed (larger message sizes, message ordering, single-operation destructive reads, scheduled delivery) using push-style delivery (using long polling).

キューの作成Create queue

これにより、Service Bus 名前空間内に新しいキューが作成されます。This creates a new queue within the Service Bus namespace. 同じ名前のキューが既に名前空間内に存在する場合は、エラーが発生します。If a queue of the same name already exists within the namespace an error will be raised.

sb_client.create_queue("MyQueue")

省略可能なパラメーターを指定して、キューの動作を構成することもできます。Optional parameters to configure the queue behavior can also be specified.

sb_client.create_queue(
    "MySessionQueue",
    requires_session=True  # Create a sessionful queue
    max_delivery_count=5  # Max delivery attempts per message
)

キュー クライアントの取得Get a queue client

キューとの間でのメッセージの送受信や、その他の操作を実行するには、QueueClient を使用できます。A QueueClient can be used to send and receive messages from the queue, along with other operations.

queue_client = sb_client.get_queue("MyQueue")

メッセージの送信Sending messages

キュー クライアントでは、一度に 1 つ以上のメッセージを送信できます。The queue client can send one or more messages at a time:

from azure.servicebus import Message

message = Message("Hello World")
queue_client.send(message)

message_one = Message("First")
message_two = Message("Second")
queue_client.send([message_one, message_two])

QueueClient.send に対する呼び出しごとに、新しいサービス接続が作成されます。Each call to QueueClient.send will create a new service connection. 複数の送信呼び出しに同じ接続を再利用するために、送信側を開くことができます。To reuse the same connection for multiple send calls, you can open a sender:

message_one = Message("First")
message_two = Message("Second")

with queue_client.get_sender() as sender:
    sender.send(message_one)
    sender.send(message_two)

非同期クライアントを使用している場合は、上述の操作で非同期の構文を使用します。If you are using an asynchronous client, the above operations will use async syntax:

from azure.servicebus.aio import Message

message = Message("Hello World")
await queue_client.send(message)

message_one = Message("First")
message_two = Message("Second")
async with queue_client.get_sender() as sender:
    await sender.send(message_one)
    await sender.send(message_two)

メッセージの受信Receiving messages

メッセージは、継続的な反復子としてキューから受信できます。Messages can be received from a queue as a continuous iterator. メッセージ受信の既定のモードは PeekLock です。この場合、メッセージがキューから削除されるように、各メッセージを明示的に完了する必要があります。The default mode for message receiving is PeekLock, which requires each message to be explicitly completed in order that it be removed from the queue.

messages = queue_client.get_receiver()
for message in messages:
    print(message)
    message.complete()

サービス接続は、反復子全体にわたって開かれたままになります。The service connection will remain open for the entirety of the iterator. メッセージ ストリームが部分的に反復処理されている場合は、with ステートメントで受信側を実行して、接続を確実に閉じる必要があります。If you find yourself only partially iterating the message stream, you should run the receiver in a with statement to ensure the connection is closed:

with queue_client.get_receiver() as messages:
    for message in messages:
        print(message)
        message.complete()
        break

非同期クライアントを使用している場合は、上述の操作で非同期の構文を使用します。If you are using an asynchronous client, the above operations will use async syntax:

async with queue_client.get_receiver() as messages:
    async for message in messages:
        print(message)
        await message.complete()
        break

Service Bus トピックとサブスクリプションService Bus topics and subscriptions

Service Bus のトピックとサブスクリプションは、発行とサブスクライブのパターンで一対多形式の通信を提供する Service Bus キュー上の抽象化です。Service Bus topics and subscriptions are an abstraction on top of Service Bus queues that provide a one-to-many form of communication, in a publish/subscribe pattern. メッセージはトピックに送信された後、関連付けられている 1 つ以上のサブスクリプションに配信されますが、この方法は多数の受信者にスケーリングする場合に便利です。Messages are sent to a topic and delivered to one or more associated subscriptions, which is useful for scaling to large numbers of recipients.

トピックの作成Create topic

これにより、Service Bus 名前空間内に新しいトピックが作成されます。This creates a new topic within the Service Bus namespace. 同じ名前のトピックが既に存在する場合は、エラーが発生します。If a topic of the same name already exists an error will be raised.

sb_client.create_topic("MyTopic")

トピック クライアントの取得Get a topic client

トピックへのメッセージの送信や、その他の操作を実行するには、TopicClient を使用できます。A TopicClient can be used to send messages to the topic, along with other operations.

topic_client = sb_client.get_topic("MyTopic")

サブスクリプションの作成Create subscription

これにより、Service Bus 名前空間内に、指定したトピックに対する新しいサブスクリプションが作成されます。This creates a new subscription for the specified topic within the Service Bus namespace.

sb_client.create_subscription("MyTopic", "MySubscription")

サブスクリプション クライアントの取得Get a subscription client

トピックからのメッセージの受信や、その他の操作を実行するには、SubscriptionClient を使用できます。A SubscriptionClient can be used to receive messages from the topic, along with other operations.

topic_client = sb_client.get_subscription("MyTopic", "MySubscription")

v0.21.1 から v0.50.0 への移行Migration from v0.21.1 to v0.50.0

バージョン 0.50.0 には、重要な破壊的変更が導入されました。Major breaking changes were introduced in version 0.50.0. 元の HTTP ベースの API はv0.50.0 でも引き続き使用できますが、新しい名前空間 azure.servicebus.control_client に存在します。The original HTTP-based API is still available in v0.50.0 - however it now exists under a new namesapce: azure.servicebus.control_client.

アップグレードの必要性についてShould I upgrade?

新しいパッケージ (v0.50.0) を v0.21.1 と比較した場合、HTTP ベースの操作に関しては改善点はありません。The new package (v0.50.0) offers no improvements in HTTP-based operations over v0.21.1. HTTP ベースの API は、新しい名前空間に存在するという点を除いては同じです。The HTTP-based API is identical except that it now exists under a new namespace. このため、HTTP ベースの操作 (create_queuedelete_queue など) のみを使用する場合は、現時点でアップグレードを行うメリットはありません。For this reason if you only wish to use HTTP-based operations (create_queue, delete_queue etc) - there will be no additional benefit in upgrading at this time.

コードを新しいバージョンに移行する方法How do I migrate my code to the new version?

v0.21.0 に対して作成したコードは、次のようにインポートの名前空間を変更するだけで、バージョン 0.50.0 に移植できます。Code written against v0.21.0 can be ported to version 0.50.0 by simply changing the import namespace:

# from azure.servicebus import ServiceBusService  <- This will now raise an ImportError
from azure.servicebus.control_client import ServiceBusService

key_name = 'RootManageSharedAccessKey' # SharedAccessKeyName from Azure portal
key_value = '' # SharedAccessKey from Azure portal
sbs = ServiceBusService(service_namespace,
                        shared_access_key_name=key_name,
                        shared_access_key_value=key_value)

HTTP ベースの操作による従来の API の使用Using HTTP-based operations of the legacy API

以下の説明は従来の API についてのもので、追加変更を加えずに既存のコードを v0.50.0 に移植することを希望するユーザーを対象としています。The following documentation describes the legacy API and should be used for those wishing to port existing code to v0.50.0 without making any additional changes. このリファレンスは、v0.21.1 を使用しているユーザーもガイダンスとしても使用できます。This reference can also be used as guidance by those using v0.21.1. 新しいコードを作成するユーザーについては、上で説明した新しい API の使用をお勧めします。For those writing new code, we recommend using the new API described above.

Service Bus キューService Bus queues

Shared Access Signature (SAS) 認証Shared Access Signature (SAS) authentication

Shared Access Signature 認証を使用するには、次のコードで Service Bus サービスを作成します。To use Shared Access Signature authentication, create the service bus service with:

from azure.servicebus.control_client import ServiceBusService

key_name = 'RootManageSharedAccessKey' # SharedAccessKeyName from Azure portal
key_value = '' # SharedAccessKey from Azure portal
sbs = ServiceBusService(service_namespace,
                        shared_access_key_name=key_name,
                        shared_access_key_value=key_value)

Azure Access Control Service (ACS) 認証Access Control Service (ACS) authentication

新しい Service Bus 名前空間では、ACS はサポートされていません。ACS is not supported on new Service Bus namespaces. アプリケーションを SAS 認証に移行することをお勧めします。We recommend migrating applications to SAS authentication. 以前の Service Bus 名前空間内で ACS 認証を使用するには、以下を使用して ServiceBusService を作成します。To use ACS authentication within an older Service Bus namesapce, create the ServiceBusService with:

from azure.servicebus.control_client import ServiceBusService

account_key = '' # DEFAULT KEY from Azure portal
issuer = 'owner' # DEFAULT ISSUER from Azure portal
sbs = ServiceBusService(service_namespace,
                        account_key=account_key,
                        issuer=issuer)

メッセージの送受信Sending and receiving messages

キューは、create_queue メソッドを使用して作成できます。The create_queue method can be used to ensure a queue exists:

sbs.create_queue('taskqueue')

そのキューにメッセージを挿入するには、send_queue_message メソッドを呼び出します。The send_queue_message method can then be called to insert the message into the queue:

from azure.servicebus.control_client import Message

msg = Message('Hello World!')
sbs.send_queue_message('taskqueue', msg)

send_queue_message_batch メソッドを使用すると、複数のメッセージを一度に送信することができます。The send_queue_message_batch method can then be called to send several messages at once:

from azure.servicebus.control_client import Message

msg1 = Message('Hello World!')
msg2 = Message('Hello World again!')
sbs.send_queue_message_batch('taskqueue', [msg1, msg2])

その後、receive_queue_message メソッドを呼び出すことで、メッセージをデキューすることができます。It is then possible to call the receive_queue_message method to dequeue the message.

msg = sbs.receive_queue_message('taskqueue')

Service Bus トピックService Bus topics

サーバー側のトピックは、create_topic メソッドを使用して作成できます。The create_topic method can be used to create a server-side topic:

sbs.create_topic('taskdiscussion')

トピックには、send_topic_message メソッドを使用してメッセージを送信できます。The send_topic_message method can be used to send a message to a topic:

from azure.servicebus.control_client import Message

msg = Message(b'Hello World!')
sbs.send_topic_message('taskdiscussion', msg)

send_topic_message_batch メソッドを使用すると、複数のメッセージを一度に送信することができます。The send_topic_message_batch method can be used to send several messages at once:

from azure.servicebus.control_client import Message

msg1 = Message(b'Hello World!')
msg2 = Message(b'Hello World again!')
sbs.send_topic_message_batch('taskdiscussion', [msg1, msg2])

Python 3 では文字列型のメッセージが UTF-8 でエンコーディングされますが、Python 2 ではエンコーディングを独自に行う必要があることに注意してください。Please consider that in Python 3 a str message will be utf-8 encoded and you should have to manage your encoding yourself in Python 2.

その後クライアントは、create_subscription メソッドを呼び出した後、receive_subscription_message メソッドを呼び出すことで、サブスクリプションを作成し、メッセージを取り出すことができます。A client can then create a subscription and start consuming messages by calling the create_subscription method followed by the receive_subscription_message method. サブスクリプションが作成される前に送信されたメッセージは受信されないので注意してください。Please note that any messages sent before the subscription is created will not be received.

from azure.servicebus.control_client import Message

sbs.create_subscription('taskdiscussion', 'client1')
msg = Message('Hello World!')
sbs.send_topic_message('taskdiscussion', msg)
msg = sbs.receive_subscription_message('taskdiscussion', 'client1')

イベント ハブEvent Hub

Event Hubs を使用すると、デバイスとサービスの多様なセットから高速でイベント ストリームを収集できます。Event Hubs enable the collection of event streams at high throughput, from a diverse set of devices and services.

イベント ハブは、create_event_hub メソッドを使用して作成できます。The create_event_hub method can be used to create an event hub:

sbs.create_event_hub('myhub')

イベントを送信するには、次のようにします。To send an event:

sbs.send_event('myhub', '{ "DeviceId":"dev-01", "Temperature":"37.0" }')

イベントの内容は、複数のメッセージを含んだ JSON エンコード文字列またはイベント メッセージです。The event content is the event message or JSON-encoded string that contains multiple messages.

高度な機能Advanced features

ブローカーのプロパティとユーザーのプロパティBroker properties and user properties

ここでは、こちらに定義されているブローカーのプロパティとユーザーのプロパティの使い方について説明します。This section describes how to use Broker and User properties defined here:

sent_msg = Message(b'This is the third message',
                   broker_properties={'Label': 'M3'},
                   custom_properties={'Priority': 'Medium',
                                      'Customer': 'ABC'}
            )

datetime、int、float、boolean を使用することができます。You can use datetime, int, float or boolean

props = {'hello': 'world',
         'number': 42,
         'active': True,
         'deceased': False,
         'large': 8555111000,
         'floating': 3.14,
         'dob': datetime(2011, 12, 14),
         'double_quote_message': 'This "should" work fine',
         'quote_message': "This 'should' work fine"}
sent_msg = Message(b'message with properties', custom_properties=props)

このライブラリの旧バージョンとの互換性を確保するために、broker_properties を JSON 文字列として定義してもかまいません。For compatibility reason with old version of this library, broker_properties could also be defined as a JSON string. その場合は、有効な JSON 文字列をご自身で確実に記述してください。RestAPI を送信する前に、Python によるチェックは実行されません。If this situation, you're responsible to write a valid JSON string, no check will be made by Python before sending to the RestAPI.

broker_properties = '{"ForcePersistence": false, "Label": "My label"}'
sent_msg = Message(b'receive message',
                   broker_properties = broker_properties
)

次の手順Next Steps