Azure Service Bus トピックへのメッセージ送信とトピックのサブスクリプションからのメッセージ受信 (Java)

このクイックスタートでは、Azure Service Bus トピックにメッセージを送信してそのトピックのサブスクリプションからメッセージを受信する Java コードを、azure-messaging-servicebus パッケージを使用して作成します。

注意

このクイック スタートでは、メッセージのバッチを Service Bus トピックに送信し、それらのメッセージをトピックのサブスクリプションから受信するという単純なシナリオの手順を説明します。 Azure Service Bus の事前構築済みの Java サンプルが、GitHub の Azure SDK for Java リポジトリに用意されています。

ヒント

Spring アプリケーションで Azure Service Bus リソースを操作している場合は、Spring Cloud Azure を代替手段として検討することをお勧めします。 Spring Cloud Azure は、Spring と Azure サービスのシームレスな統合を実現するオープンソース プロジェクトです。 Spring Cloud Azure の詳細と Service Bus の使用例については、「Azure Service Bus を使用する Spring Cloud Stream with」を参照してください。

前提条件

Azure Portal での名前空間の作成

Azure の Service Bus メッセージング エンティティを使用するには、Azure 全体で一意となる名前を備えた名前空間を最初に作成しておく必要があります。 名前空間により、ご利用のアプリケーション内に Service Bus リソース (キュー、トピックなど) 用のスコープ コンテナーが提供されます。

名前空間を作成するには:

  1. Azure portal にサインインします。

  2. [すべてのサービス] ページに移動します。

  3. 左側のナビゲーション バーで、カテゴリの一覧から [統合] を選択し、[Service Bus] 上にマウス ポインターを置き、[Service Bus] タイルの [+] ボタンを選択します。

    Image showing selection of Create a resource, Integration, and then Service Bus in the menu.

  4. [名前空間の作成] ページの [基本] タブで、こちらの手順を実行します。

    1. [サブスクリプション] で、名前空間を作成する Azure サブスクリプションを選択します。

    2. [リソース グループ] で、名前空間を追加する既存のリソース グループを選択するか、新しいリソース グループを作成します。

    3. 名前空間の名前を入力します。 名前空間名は次の名前付け規則に従う必要があります。

      • この名前は Azure 全体で一意である必要があります。 その名前が使用できるかどうかがすぐに自動で確認されます。
      • 名前の長さは 6 ~ 50 文字である。
      • この名前には、文字、数字、ハイフン "-" のみを含めることができます。
      • 名前の先頭は文字、末尾は文字または数字にする必要があります。
      • 名前の末尾は “-sb“ または “-mgmt“ にはできません。
    4. [場所] で、名前空間をホストするリージョンを選択します。

    5. [価格レベル] で、名前空間の価格レベル (Basic、Standard、Premium) を選択します。 このクイック スタートでは、 [Standard] を選択します。

      重要

      トピックとサブスクリプションを使用する場合は、Standard または Premium を選択してください。 Basic 価格レベルでは、トピックとサブスクリプションはサポートされていません。

      [Premium] 価格レベルを選択した場合は、メッセージング ユニットの数を指定します。 Premium レベルでは、各ワークロードが分離した状態で実行されるように、CPU とメモリのレベルでリソースが分離されます。 このリソースのコンテナーをメッセージング ユニットと呼びます。 Premium 名前空間には、少なくとも 1 つのメッセージング ユニットがあります。 Service Bus の Premium 名前空間ごとに、1 個、2 個、4 個、8 個、または 16 個のメッセージング ユニットを選択できます。 詳細については、Service Bus の Premium メッセージングに関するページをご覧ください。

    6. ページ下部にある [確認と作成] を選択します。

      Image showing the Create a namespace page

    7. [確認および作成] ページで、設定を確認し、 [作成] を選択します。

  5. リソースのデプロイが成功したら、デプロイ ページで [リソースに移動] を選択します。

    Image showing the deployment succeeded page with the Go to resource link.

  6. Service Bus 名前空間のホーム ページが表示されます。

    Image showing the home page of the Service Bus namespace created.

Azure Portal を使用したトピックの作成

  1. [Service Bus 名前空間] ページで、左側のメニューの [トピック] を選択します。

  2. ツール バーの [+ トピック] を選択します。

  3. トピックの名前を入力します。 他のオプションは既定値のままにしてください。

  4. [作成] を選択します

    Image showing the Create topic page.

トピックに対するサブスクリプションの作成

  1. 前のセクションで作成したトピックを選択します。

    Image showing the selection of topic from the list of topics.

  2. [Service Bus トピック] ページで、ツール バーの [+ サブスクリプション] を選択します。

    Image showing the Add subscription button.

  3. [サブスクリプションの作成] ページで、次の手順に従います。

    1. サブスクリプションの名前として「S1」と入力します。

    2. [最大配信数] に「3」と入力します。

    3. 次に、 [作成] を選択してサブスクリプションを作成します。

      Image showing the Create subscription page.

Azure に対してアプリを認証する

このクイック スタートでは、Azure Service Bus に接続する 2 つの方法である、パスワードレス接続文字列について説明します。

最初のオプションでは、Microsoft Entra ID とロールベースのアクセス制御 (RBAC) でセキュリティ プリンシパルを使用して Service Bus 名前空間に接続する方法を示します。 コードや構成ファイル、または Azure Key Vault などのセキュリティで保護されたストレージに、ハードコーディングされた接続文字列を含める心配はありません。

2 番目のオプションでは、接続文字列を使用して Service Bus 名前空間に接続する方法を示します。 Azure を初めて使用する場合は、接続文字列オプションの方が理解しやすいかもしれません。 実際のアプリケーションと運用環境では、パスワードレス オプションを使用することをお勧めします。 詳細については、「認証と承認」を参照してください。 パスワードレス認証の詳細については、概要ページを参照してください。

Microsoft Entra ユーザーにロールを割り当てる

ローカルでの開発時には、Azure Service Bus に接続するユーザー アカウントに正しいアクセス許可があることを確認してください。 メッセージを送受信するには、Azure Service Bus データ所有者ロールが必要です。 このロールを自分に割り当てるには、ユーザー アクセス管理者ロール、または Microsoft.Authorization/roleAssignments/write アクションを含む別のロールが必要です。 Azure portal、Azure CLI、または Azure PowerShell を使用して、ユーザーに Azure RBAC ロールを割り当てることができます。 ロールの割り当てに使用できるスコープの詳細は、スコープの概要ページを参照してください。

次の例では、ユーザー アカウントに、Azure Service Bus Data Owner ロールを割り当てます。これにより、Azure Service Bus リソースへのフル アクセスが提供されます。 実際のシナリオでは、より安全な運用環境を実現するため、最小限の特権の原則に従って、必要な最小限のアクセス許可のみをユーザーに付与します。

Azure Service Bus 用の Azure 組み込みロール

Azure Service Bus の場合、名前空間およびそれに関連するすべてのリソースの Azure portal および Azure リソース管理 API による管理は、Azure RBAC モデルを使って既に保護されています。 Azure では、Service Bus 名前空間へのアクセスを承認するための次の Azure 組み込みロールが提供されています。

  • Azure Service Bus データ所有者:Service Bus 名前空間とそのエンティティ (キュー、トピック、サブスクリプション、およびフィルター) へのデータ アクセスが可能です。 このロールのメンバーは、キューまたはトピックやサブスクリプションとの間でメッセージを送受信できます。
  • Azure Service Bus データ送信者: このロールを使用して、Service Bus 名前空間とそのエンティティへの送信アクセスを許可します。
  • Azure Service Bus データ受信者: このロールを使用して、Service Bus 名前空間とそのエンティティへの受信アクセスを許可します。

カスタム ロールを作成する場合は、Service Bus 操作に必要な権限に関するページを参照してください。

Microsoft Entra ユーザーを Azure Service Bus 所有者ロールに追加する

Microsoft Entra ユーザー名を、Service Bus 名前空間レベルの Azure Service Bus データ所有者ロール に追加します。 これにより、ユーザー アカウントのコンテキストで実行されているアプリがキューまたはトピックにメッセージを送信し、キューまたはトピックのサブスクリプションからメッセージを受信できるようになります。

重要

ほとんどの場合、ロールの割り当てが Azure に反映されるまでの時間は 1、2 分です。 まれに、最大 8 分かかる場合があります。 初めてコードを実行したときに認証エラーを受け取る場合は、しばらく待ってから再試行してください。

  1. Azure portal で Service Bus 名前空間ページが開いていない場合は、メイン検索バーまたは左側のナビゲーションを使用して Service Bus 名前空間を見つけます。

  2. 概要ページで、左側のメニューから [アクセス制御 (IAM)] を選択します。

  3. [アクセス制御 (IAM)] ページで、[ロールの割り当て] タブを選びます。

  4. 上部のメニューから [+ 追加] を選択し、次に結果のドロップダウン メニューから [ロールの割り当ての追加] を選択します。

    A screenshot showing how to assign a role.

  5. 検索ボックスを使って、結果を目的のロールに絞り込みます。 この例では、Azure Service Bus Data Owner を検索して一致する結果を選択します。 [次へ] を選びます。

  6. [アクセスの割り当て先] で、[ユーザー、グループ、またはサービス プリンシパル] を選び、[+ メンバーの選択] を選びます。

  7. ダイアログで、自分の Microsoft Entra ユーザー名 (通常は user@domain メール アドレス) を検索し、ダイアログの下部にある [選択] を選びます。

  8. [レビューと割り当て] を選んで最終ページに移動し、もう一度 [レビューと割り当て] を行ってプロセスを完了します。

メッセージをトピックに送信する

このセクションでは、Java コンソール プロジェクトを作成し、既に作成してあるトピックにメッセージを送信するコードを追加します。

Java コンソール プロジェクトを作成する

Eclipse または任意のツールを使用して Java プロジェクトを作成します。

Service Bus を使用するようにアプリケーションを構成する

Azure Core ライブラリおよび Azure Service Bus ライブラリへの参照を追加します。

Eclipse を使用して Java コンソール アプリケーションを作成した場合は、Java プロジェクトを Maven に変換します。[パッケージ エクスプローラー] ウィンドウでプロジェクトを右クリックし、[構成]->[Maven プロジェクトへの変換] を選択します。 その後、これら 2 つのライブラリへの依存関係を追加します。その例を次に示します。

pom.xml ファイルを更新して、Azure Service Bus と Azure ID パッケージに依存関係を追加します。

    <dependencies>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-messaging-servicebus</artifactId>
            <version>7.13.3</version>
        </dependency>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-identity</artifactId>
            <version>1.8.0</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>

トピックにメッセージを送信するためのコードを追加する

  1. 次の import ステートメントを Java ファイルの冒頭に追加します。

    import com.azure.messaging.servicebus.*;
    import com.azure.identity.*;
    
    import java.util.concurrent.TimeUnit;
    import java.util.Arrays;
    import java.util.List;
    
  2. クラスで、接続文字列を保持する変数 (パスワードレス シナリオでは不要)、トピック名、サブスクリプション名を定義します。

    static String topicName = "<TOPIC NAME>";
    static String subName = "<SUBSCRIPTION NAME>";
    

    重要

    <TOPIC NAME> をトピックの名前に置き換え、<SUBSCRIPTION NAME> をトピックのサブスクリプションの名前に置き換えます。

  3. 1 つのメッセージをトピックに送信するためのメソッドを、sendMessage という名前でこのクラスに追加します。

    重要

    NAMESPACENAME を、実際の Service Bus 名前空間の名前に置き換えます。

    static void sendMessage()
    {
        // create a token using the default Azure credential
        DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
                .build();
    
        ServiceBusSenderClient senderClient = new ServiceBusClientBuilder()
                .fullyQualifiedNamespace("NAMESPACENAME.servicebus.windows.net")
                .credential(credential)
                .sender()
                .topicName(topicName)
                .buildClient();
    
        // send one message to the topic
        senderClient.sendMessage(new ServiceBusMessage("Hello, World!"));
        System.out.println("Sent a single message to the topic: " + topicName);
    }
    
    
  4. 一連のメッセージを作成するためのメソッドを、createMessages という名前でこのクラスに追加します。 通常、これらのメッセージはアプリケーションのさまざまな部分から届きます。 ここでは、次のように一連のサンプル メッセージを作成します。

    static List<ServiceBusMessage> createMessages()
    {
        // create a list of messages and return it to the caller
        ServiceBusMessage[] messages = {
                new ServiceBusMessage("First message"),
                new ServiceBusMessage("Second message"),
                new ServiceBusMessage("Third message")
        };
        return Arrays.asList(messages);
    }
    
  5. 作成したトピックにメッセージを送信するためのメソッドを、sendMessageBatch という名前で追加します。 このメソッドは、トピックの ServiceBusSenderClient を作成し、createMessages メソッドを呼び出して一連のメッセージを取得した後、1 つまたは複数のバッチを用意して、そのバッチをトピックに送信します。

    重要

    NAMESPACENAME を、実際の Service Bus 名前空間の名前に置き換えます。

    static void sendMessageBatch()
    {
        // create a token using the default Azure credential
        DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
                .build();
    
        ServiceBusSenderClient senderClient = new ServiceBusClientBuilder()
                .fullyQualifiedNamespace("NAMESPACENAME.servicebus.windows.net")
                .credential(credential)
                .sender()
                .topicName(topicName)
                .buildClient();
    
        // Creates an ServiceBusMessageBatch where the ServiceBus.
        ServiceBusMessageBatch messageBatch = senderClient.createMessageBatch();
    
        // create a list of messages
        List<ServiceBusMessage> listOfMessages = createMessages();
    
        // We try to add as many messages as a batch can fit based on the maximum size and send to Service Bus when
        // the batch can hold no more messages. Create a new batch for next set of messages and repeat until all
        // messages are sent.
        for (ServiceBusMessage message : listOfMessages) {
            if (messageBatch.tryAddMessage(message)) {
                continue;
            }
    
            // The batch is full, so we create a new batch and send the batch.
            senderClient.sendMessages(messageBatch);
            System.out.println("Sent a batch of messages to the topic: " + topicName);
    
            // create a new batch
            messageBatch = senderClient.createMessageBatch();
    
            // Add that message that we couldn't before.
            if (!messageBatch.tryAddMessage(message)) {
                System.err.printf("Message is too large for an empty batch. Skipping. Max size: %s.", messageBatch.getMaxSizeInBytes());
            }
        }
    
        if (messageBatch.getCount() > 0) {
            senderClient.sendMessages(messageBatch);
            System.out.println("Sent a batch of messages to the topic: " + topicName);
        }
    
        //close the client
        senderClient.close();
    }
    

サブスクリプションからメッセージを受信する

このセクションでは、トピックのサブスクリプションからメッセージを取得するコードを追加します。

  1. サブスクリプションからメッセージを受信するメソッドを、receiveMessages という名前で追加します。 このメソッドは、メッセージを処理するためのハンドラーとエラーを処理するためのハンドラーを指定してサブスクリプションの ServiceBusProcessorClient を作成します。 次に、プロセッサを起動して数秒間待機し、受信したメッセージを出力した後、プロセッサを停止して終了します。

    重要

    • NAMESPACENAME を、実際の Service Bus 名前空間の名前に置き換えます。
    • コードに出現する ServiceBusTopicTest::processMessageServiceBusTopicTest の部分は、実際のクラスの名前に置き換えてください。
    // handles received messages
    static void receiveMessages() throws InterruptedException
    {
        DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
                .build();
    
        // Create an instance of the processor through the ServiceBusClientBuilder
        ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
            .fullyQualifiedNamespace("NAMESPACENAME.servicebus.windows.net")
            .credential(credential)
            .processor()
            .topicName(topicName)
            .subscriptionName(subName)
            .processMessage(context -> processMessage(context))
            .processError(context -> processError(context))
            .buildProcessorClient();
    
        System.out.println("Starting the processor");
        processorClient.start();
    
        TimeUnit.SECONDS.sleep(10);
        System.out.println("Stopping and closing the processor");
        processorClient.close();
    }
    
  2. Service Bus サブスクリプションから受信したメッセージを処理するための processMessage メソッドを追加します。

    private static void processMessage(ServiceBusReceivedMessageContext context) {
        ServiceBusReceivedMessage message = context.getMessage();
        System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n", message.getMessageId(),
            message.getSequenceNumber(), message.getBody());
    }
    
  3. エラー メッセージを処理するための processError メソッドを追加します。

    private static void processError(ServiceBusErrorContext context) {
        System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n",
            context.getFullyQualifiedNamespace(), context.getEntityPath());
    
        if (!(context.getException() instanceof ServiceBusException)) {
            System.out.printf("Non-ServiceBusException occurred: %s%n", context.getException());
            return;
        }
    
        ServiceBusException exception = (ServiceBusException) context.getException();
        ServiceBusFailureReason reason = exception.getReason();
    
        if (reason == ServiceBusFailureReason.MESSAGING_ENTITY_DISABLED
            || reason == ServiceBusFailureReason.MESSAGING_ENTITY_NOT_FOUND
            || reason == ServiceBusFailureReason.UNAUTHORIZED) {
            System.out.printf("An unrecoverable error occurred. Stopping processing with reason %s: %s%n",
                reason, exception.getMessage());
        } else if (reason == ServiceBusFailureReason.MESSAGE_LOCK_LOST) {
            System.out.printf("Message lock lost for message: %s%n", context.getException());
        } else if (reason == ServiceBusFailureReason.SERVICE_BUSY) {
            try {
                // Choosing an arbitrary amount of time to wait until trying again.
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                System.err.println("Unable to sleep for period of time");
            }
        } else {
            System.out.printf("Error source %s, reason %s, message: %s%n", context.getErrorSource(),
                reason, context.getException());
        }
    }
    
  4. sendMessagesendMessageBatchreceiveMessages の各メソッドを呼び出し、InterruptedException をスローするように main メソッドを更新します。

    public static void main(String[] args) throws InterruptedException {
        sendMessage();
        sendMessageBatch();
        receiveMessages();
    }
    

アプリを実行する

プログラムを実行すると、次の出力に似た出力が表示されます。

  1. Eclipse を使用している場合は、プロジェクトを右クリックし、[エクスポート] を選択し、[Java] を展開して、[実行可能 JAR ファイル] を選択して、実行可能 JAR ファイルを作成する手順に従います。

  2. Azure Service Bus データ所有者ロールに追加されたユーザー アカウントとは異なるユーザー アカウントを使ってマシンにサインインしている場合は、以下の手順に従います。 それ以外の場合は、この手順をスキップし、次の手順で Jar ファイルを実行します。

    1. お使いのマシンに Azure CLI をインストールします。

    2. 次の CLI コマンドを使用して、Azure にサインインします。 Azure Service Bus データ所有者ロールに追加したのと同じユーザー アカウントを使用します。

      az login
      
  3. 次のコマンドを使用して Jar ファイルを実行します。

    java -jar <JAR FILE NAME>
    
  4. コンソール ウィンドウに次の出力が表示されます。

    Sent a single message to the topic: mytopic
    Sent a batch of messages to the topic: mytopic
    Starting the processor
    Processing message. Session: e0102f5fbaf646988a2f4b65f7d32385, Sequence #: 1. Contents: Hello, World!
    Processing message. Session: 3e991e232ca248f2bc332caa8034bed9, Sequence #: 2. Contents: First message
    Processing message. Session: 56d3a9ea7df446f8a2944ee72cca4ea0, Sequence #: 3. Contents: Second message
    Processing message. Session: 7bd3bd3e966a40ebbc9b29b082da14bb, Sequence #: 4. Contents: Third message
    

Azure portal の Service Bus 名前空間の [概要] ページで、受信メッセージ数と送信メッセージ数を確認できます。 1 分ほど待ってからページを更新すると、最新の値が表示されます。

Incoming and outgoing message count

下部中央のペインの [トピック] タブに切り替えてトピックを選択すると、そのトピックの [Service Bus トピック] ページが表示されます。 このページの [メッセージ] グラフに 4 つの受信メッセージと 4 つの送信メッセージが確認できると思います。

Incoming and outgoing messages

main メソッドの receiveMessages 呼び出しをコメント アウトして再度アプリを実行した場合、 [Service Bus トピック] ページには、8 つの受信メッセージ (うち 4 つは新規) が表示されますが、送信メッセージは 4 つと表示されます。

Updated topic page

このページでいずれかのサブスクリプションを選択すると、その [Service Bus Subscription](Service Bus サブスクリプション) ページが表示されます。 このページで、アクティブなメッセージ数や配信不能メッセージ数を確認できます。 この例では、受信者がまだ受け取っていないアクティブなメッセージが 4 つ存在します。

Active message count

次のステップ

次のドキュメントおよびサンプルを参照してください。