Azure Service Bus キューとの間でメッセージを送受信する (Python)

このチュートリアルでは、次の手順を実行します。

  1. Azure Portal を使用して Service Bus 名前空間を作成する。
  2. Azure Portal を使用して Service Bus キューを作成する。
  3. 次の目的で、azure-servicebus パッケージを使用する Python コードを記述する。
    1. メッセージのセットをキューに送信する。
    2. キューからそれらのメッセージを受信する。

注意

このクイック スタートでは、メッセージを Service Bus キューに送信し、それらを受信するという簡単なシナリオでのステップ バイ ステップの手順を説明します。 Azure Service Bus の事前構築済みの JavaScript および TypeScript サンプルが、GitHub の Azure SDK for Python リポジトリに用意されています。

前提条件

このサービスを初めて使用する場合は、このクイックスタートを実行する前に、[Service Bus の概要](service-bus-messaging-overview.md)に関する記事を参照してください。

自分の Azure アカウントでこのクイックスタートを使用するには:

  • 開発者マシンにパスワードレス認証を提供する Azure CLI をインストールします。
  • ターミナルまたはコマンド プロンプトで、az login を使って Azure アカウントでサインインします。
  • リソースに適切なデータ ロールを追加するときは、同じアカウントを使います。
  • 同じターミナルまたはコマンド プロンプトでコードを実行します。
  • Service Bus 名前空間のキューの名前を書き留めてください。 コードでそれが必要になります。

注意

このチュートリアルでは、コピーして Python を使用して実行できるサンプルを扱います。 Python アプリケーションを作成する手順については、Python アプリケーションを作成して Azure Web サイトにデプロイする方法に関する記事を参照してください。 このチュートリアルで使用するパッケージのインストールの詳細については、Python インストール ガイドを参照してください。

Azure Portal での名前空間の作成

Azure の Service Bus メッセージング エンティティを使用するには、Azure 全体で一意となる名前を備えた名前空間を最初に作成しておく必要があります。 名前空間により、ご利用のアプリケーション内に Service Bus リソース (キュー、トピックなど) 用のスコープ コンテナーが提供されます。

名前空間を作成するには:

  1. Azure portal にサインインします。

  2. [すべてのサービス] ページに移動します。

  3. 左側のナビゲーション バーで、カテゴリの一覧から [統合] を選択し、[Service Bus] 上にマウス ポインターを置き、[Service Bus] タイルの [+] ボタンを選択します。

    メニューでの [リソースの作成]、[統合]、[Service Bus] の選択を示す画像。

  4. [名前空間の作成] ページの [基本] タブで、こちらの手順を実行します。

    1. [サブスクリプション] で、名前空間を作成する Azure サブスクリプションを選択します。

    2. [リソース グループ] では、既存のリソース グループを選ぶか、新しく作成します。

    3. 名前空間の名前を入力します。 名前空間名は次の名前付け規則に従う必要があります。

      • この名前は Azure 全体で一意である必要があります。 その名前が使用できるかどうかがすぐに自動で確認されます。
      • 名前の長さは 6 ~ 50 文字である。
      • この名前には、文字、数字、ハイフン - のみを含めることができます。
      • 名前の先頭は文字、末尾は文字または数字にする必要があります。
      • 名前の末尾を -sb または -mgmt にすることはできません。
    4. [場所] で、名前空間をホストするリージョンを選択します。

    5. [価格レベル] で、名前空間の価格レベル (Basic、Standard、Premium) を選択します。 このクイック スタートでは、 [Standard] を選択します。

    6. Premium レベルを選んだ場合は、名前空間の geo レプリケーションを有効にできるかどうかを選びます。 geo レプリケーション機能を使用すると、名前空間のメタデータとデータが、プライマリ Azure リージョンから 1 つ以上のセカンダリ Azure リージョンに、継続的にレプリケートされることが保証されます。

      重要

      トピックとサブスクリプションを使用する場合は、Standard または Premium を選択してください。 Basic 価格レベルでは、トピックとサブスクリプションはサポートされていません。

      [Premium] 価格レベルを選択した場合は、メッセージング ユニットの数を指定します。 Premium レベルでは、各ワークロードが分離した状態で実行されるように、CPU とメモリのレベルでリソースが分離されます。 このリソースのコンテナーをメッセージング ユニットと呼びます。 Premium 名前空間には、少なくとも 1 つのメッセージング ユニットがあります。 Service Bus の Premium 名前空間ごとに、1 個、2 個、4 個、8 個、または 16 個のメッセージング ユニットを選択できます。 詳細については、Service Bus の Premium メッセージングに関するページをご覧ください。

    7. ページ下部にある [確認と作成] を選択します。

      [名前空間の作成] ページを示す画像

    8. [確認および作成] ページで、設定を確認し、 [作成] を選択します。

  5. リソースのデプロイが成功したら、デプロイ ページで [リソースに移動] を選択します。

    [リソースに移動] リンクが表示された、デプロイが成功したページを示す画像。

  6. Service Bus 名前空間のホーム ページが表示されます。

    作成された Service Bus 名前空間のホーム ページを示す画像。

Azure portal でキューを作成する

  1. [Service Bus 名前空間] ページで、左側のナビゲーション メニューの [エンティティ] を展開して、[キュー] を選びます。

  2. [キュー] ページで、ツール バーの [+ キュー] を選択します。

  3. キューの名前を入力し、他の値は既定値のままにします。

  4. ここで、 [作成] を選択します。

    [キューの作成] ページを示すスクリーンショット。

Azure に対してアプリを認証する

このクイック スタートでは、Azure Service Bus に接続する 2 つの方法である、パスワードレス接続文字列について説明します。

最初のオプションでは、Microsoft Entra ID とロールベースのアクセス制御 (RBAC) でセキュリティ プリンシパルを使用して Service Bus 名前空間に接続する方法を示します。 コードや構成ファイル、または Azure Key Vault などのセキュリティで保護されたストレージに、ハードコーディングされた接続文字列を含める心配はありません。

2 番目のオプションでは、接続文字列を使用して Service Bus 名前空間に接続する方法を示します。 Azure を初めて使用する場合は、接続文字列オプションの方が理解しやすいかもしれません。 実際のアプリケーションと運用環境では、パスワードレス オプションを使用することをお勧めします。 詳細については、「認証と承認」を参照してください。 パスワードレス認証の詳細については、概要ページを参照してください。

Microsoft Entra ユーザーにロールを割り当てる

ローカルでの開発時には、Azure Service Bus に接続するユーザー アカウントに正しいアクセス許可があることを確認してください。 メッセージを送受信するには、Azure Service Bus データ所有者ロールが必要です。 このロールを自分に割り当てるには、ユーザー アクセス管理者ロール、または Microsoft.Authorization/roleAssignments/write アクションを含む別のロールが必要です。 Azure portal、Azure CLI、または Azure PowerShell を使用して、ユーザーに Azure RBAC ロールを割り当てることができます。 ロールの割り当てに使用できるスコープの詳細は、スコープの概要ページを参照してください。

次の例では、ユーザー アカウントに、Azure Service Bus Data Owner ロールを割り当てます。これにより、Azure Service Bus リソースへのフル アクセスが提供されます。 実際のシナリオでは、より安全な運用環境を実現するため、最小限の特権の原則に従って、必要な最小限のアクセス許可のみをユーザーに付与します。

Azure Service Bus 用の Azure 組み込みロール

Azure Service Bus の場合、名前空間およびそれに関連するすべてのリソースの Azure portal および Azure リソース管理 API による管理は、Azure RBAC モデルを使って既に保護されています。 Azure では、Service Bus 名前空間へのアクセスを承認するための次の Azure 組み込みロールが提供されています。

  • Azure Service Bus データ所有者:Service Bus 名前空間とそのエンティティ (キュー、トピック、サブスクリプション、およびフィルター) へのデータ アクセスが可能です。 このロールのメンバーは、キューまたはトピックやサブスクリプションとの間でメッセージを送受信できます。
  • Azure Service Bus データ送信者: このロールを使用して、Service Bus 名前空間とそのエンティティへの送信アクセスを許可します。
  • Azure Service Bus データ受信者: このロールを使用して、Service Bus 名前空間とそのエンティティへの受信アクセスを許可します。

カスタム ロールを作成する場合は、Service Bus 操作に必要な権限に関するページを参照してください。

Microsoft Entra ユーザーを Azure Service Bus 所有者ロールに追加する

Microsoft Entra ユーザー名を、Service Bus 名前空間レベルの Azure Service Bus データ所有者ロール に追加します。 これにより、ユーザー アカウントのコンテキストで実行されているアプリがキューまたはトピックにメッセージを送信し、キューまたはトピックのサブスクリプションからメッセージを受信できるようになります。

重要

ほとんどの場合、ロールの割り当てが Azure に反映されるまでの時間は 1、2 分です。 まれに、最大 8 分かかる場合があります。 初めてコードを実行したときに認証エラーを受け取る場合は、しばらく待ってから再試行してください。

  1. Azure portal で Service Bus 名前空間ページが開いていない場合は、メイン検索バーまたは左側のナビゲーションを使用して Service Bus 名前空間を見つけます。

  2. 概要ページで、左側のメニューから [アクセス制御 (IAM)] を選択します。

  3. [アクセス制御 (IAM)] ページで、[ロールの割り当て] タブを選びます。

  4. 上部のメニューから [+ 追加] を選択し、次に結果のドロップダウン メニューから [ロールの割り当ての追加] を選択します。

    ロールを割り当てる方法を示すスクリーンショット。

  5. 検索ボックスを使って、結果を目的のロールに絞り込みます。 この例では、Azure Service Bus Data Owner を検索して一致する結果を選択します。 [次へ] を選びます。

  6. [アクセスの割り当て先] で、[ユーザー、グループ、またはサービス プリンシパル] を選び、[+ メンバーの選択] を選びます。

  7. ダイアログで、自分の Microsoft Entra ユーザー名 (通常は user@domain メール アドレス) を検索し、ダイアログの下部にある [選択] を選びます。

  8. [レビューと割り当て] を選んで最終ページに移動し、もう一度 [レビューと割り当て] を行ってプロセスを完了します。

pip を使用してパッケージをインストールする

  1. この Service Bus のチュートリアルに必要な Python パッケージをインストールするには、パスに Python が存在するコマンド プロンプトを開き、サンプルを格納するフォルダーにディレクトリを変更します。

  2. 次のパッケージをインストールします。

    pip install azure-servicebus
    pip install azure-identity
    pip install aiohttp
    

メッセージをキューに送信する

次のサンプル コードは、キューにメッセージを送信する方法を示しています。 Visual Studio Code などのお気に入りのエディターを開き、send.py ファイルを作成し、それに次のコードを追加します。

  1. import ステートメントを追加します。

    import asyncio
    from azure.servicebus.aio import ServiceBusClient
    from azure.servicebus import ServiceBusMessage
    from azure.identity.aio import DefaultAzureCredential
    
  2. 定数を追加し、資格情報を定義します。

    FULLY_QUALIFIED_NAMESPACE = "FULLY_QUALIFIED_NAMESPACE"
    QUEUE_NAME = "QUEUE_NAME"
    
    credential = DefaultAzureCredential()
    

    重要

    • FULLY_QUALIFIED_NAMESPACE を Service Bus 名前空間の完全修飾名前空間に置き換えます。
    • QUEUE_NAME をキューの名前に置き換えます。
  3. 単一のメッセージを送信するためのメソッドを追加します。

    async def send_single_message(sender):
        # Create a Service Bus message and send it to the queue
        message = ServiceBusMessage("Single Message")
        await sender.send_messages(message)
        print("Sent a single message")
    

    sender は、作成したキューのクライアントとしての役割を担うオブジェクトです。 それを後で作成し、引数としてこの関数に送信することになります。

  4. 一連のメッセージを送信するためのメソッドを追加します。

    async def send_a_list_of_messages(sender):
        # Create a list of messages and send it to the queue
        messages = [ServiceBusMessage("Message in list") for _ in range(5)]
        await sender.send_messages(messages)
        print("Sent a list of 5 messages")
    
  5. メッセージのバッチを送信するためのメソッドを追加します。

    async def send_batch_message(sender):
        # Create a batch of messages
        async with sender:
            batch_message = await sender.create_message_batch()
            for _ in range(10):
                try:
                    # Add a message to the batch
                    batch_message.add_message(ServiceBusMessage("Message inside a ServiceBusMessageBatch"))
                except ValueError:
                    # ServiceBusMessageBatch object reaches max_size.
                    # New ServiceBusMessageBatch object can be created here to send more data.
                    break
            # Send the batch of messages to the queue
            await sender.send_messages(batch_message)
        print("Sent a batch of 10 messages")
    
  6. Service Bus クライアントを作成し、メッセージを送信するためのキューの sender オブジェクトを作成します。

    async def run():
        # create a Service Bus client using the credential
        async with ServiceBusClient(
            fully_qualified_namespace=FULLY_QUALIFIED_NAMESPACE,
            credential=credential,
            logging_enable=True) as servicebus_client:
            # get a Queue Sender object to send messages to the queue
            sender = servicebus_client.get_queue_sender(queue_name=QUEUE_NAME)
            async with sender:
                # send one message
                await send_single_message(sender)
                # send a list of messages
                await send_a_list_of_messages(sender)
                # send a batch of messages
                await send_batch_message(sender)
    
            # Close credential when no longer needed.
            await credential.close()
    
  7. run メソッドを呼び出し、メッセージを出力します。

    asyncio.run(run())
    print("Done sending messages")
    print("-----------------------")
    

キューからメッセージを受信する

次のサンプル コードは、キューからメッセージを受信する方法を示しています。 示されているコードは、新しいメッセージを 5 (max_wait_time) 秒間受信しなくなるまで、新しいメッセージを受信します。

Visual Studio Code などのお気に入りのエディターを開き、revc.py ファイルを作成し、それに次のコードを追加します。

  1. 送信サンプルと同様に、import ステートメントを追加し、独自の値に置き換える定数を定義し、資格情報を定義します。

    import asyncio
    
    from azure.servicebus.aio import ServiceBusClient
    from azure.identity.aio import DefaultAzureCredential
    
    FULLY_QUALIFIED_NAMESPACE = "FULLY_QUALIFIED_NAMESPACE"
    QUEUE_NAME = "QUEUE_NAME"
    
    credential = DefaultAzureCredential()
    
  2. Service Bus クライアントを、次にメッセージを受信するためのキューの receiver オブジェクトを作成します。

    async def run():
        # create a Service Bus client using the connection string
        async with ServiceBusClient(
            fully_qualified_namespace=FULLY_QUALIFIED_NAMESPACE,
            credential=credential,
            logging_enable=True) as servicebus_client:
    
            async with servicebus_client:
                # get the Queue Receiver object for the queue
                receiver = servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME)
                async with receiver:
                    received_msgs = await receiver.receive_messages(max_wait_time=5, max_message_count=20)
                    for msg in received_msgs:
                        print("Received: " + str(msg))
                        # complete the message so that the message is removed from the queue
                        await receiver.complete_message(msg)
    
            # Close credential when no longer needed.
            await credential.close()
    
  3. run メソッドを呼び出します。

    asyncio.run(run())
    

アプリを実行する

パスに Python が存在するコマンド プロンプトを開き、コードを実行してキューからメッセージを送受信します。

python send.py; python recv.py

次の出力が表示されます。

Sent a single message
Sent a list of 5 messages
Sent a batch of 10 messages
Done sending messages
-----------------------
Received: Single Message
Received: Message in list
Received: Message in list
Received: Message in list
Received: Message in list
Received: Message in list
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch

Azure portal で、使用する Service Bus 名前空間に移動します。 [概要] ページで、受信メッセージ数と送信メッセージ数が 16 になっていることを確認します。 これらのカウントが表示されない場合は、数分待ってから、ページを最新の情報に更新してください。

受信メッセージ数と送信メッセージ数

この [概要] ページでキューを選択して、 [Service Bus キュー] ページに移動します。 受信メッセージ数と送信メッセージ数は、このページで確認することもできます。 加えて、キューの現在のサイズアクティブなメッセージ数など、他の情報も表示されます。

キューの詳細

次の手順

次のドキュメントおよびサンプルを参照してください。