Java 用Azure Service Bus クライアント ライブラリ - バージョン 7.14.4

Microsoft Azure Service Bus は、完全なマネージド エンタープライズ統合メッセージ ブローカーです。 Service Bus は、アプリケーションとサービスを分離できます。 Service Bus は、データと状態の非同期転送のための信頼性が高く安全なプラットフォームを提供します。 データは、メッセージを使用してさまざまなアプリとサービス間で転送されます。 Azure Service Busの詳細については、「Service Bus とは」を参照してください。

Azure Service Bus クライアント ライブラリでは、Azure Service Bus メッセージの送受信が可能であり、次の目的で使用できます。

  • 販売または購入の注文、仕訳帳、在庫移動などのビジネス データを転送します。
  • アプリケーションを分離して、アプリケーションとサービスの信頼性とスケーラビリティを向上させます。 クライアントとサービスを同時にオンラインにする必要はありません。
  • 公開元とサブスクライバーの間で 1:n の関係が可能になります。
  • メッセージの順序付けやメッセージの遅延が必要なワークフローを実装します。

ソースコード | API リファレンス ドキュメント | 製品ドキュメント | サンプル | パッケージ (Maven)

作業の開始

前提条件

Azure で必要な Service Bus リソースをすばやく作成し、それらの接続文字列を受け取るために、次をクリックしてサンプル テンプレートをデプロイできます。

パッケージを組み込む

BOM ファイルを含める

ライブラリの一般提供 (GA) バージョンに依存するには、azure-sdk-bom をプロジェクトに含めてください。 次のスニペットでは、{bom_version_to_target} プレースホルダーをバージョン番号に置き換えます。 BOM の詳細については、 AZURE SDK BOM README に関するページを参照してください。

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-sdk-bom</artifactId>
            <version>{bom_version_to_target}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

次に、バージョン タグのない依存関係セクションに直接依存関係を含めます。

<dependencies>
  <dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-servicebus</artifactId>
  </dependency>
</dependencies>

直接依存関係を含める

BOM に存在しないライブラリの特定のバージョンに依存する場合は、次のように直接依存関係をプロジェクトに追加します。

<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-servicebus</artifactId>
    <version>7.14.4</version>
</dependency>

クライアントを認証する

Service Bus クライアント ライブラリが Service Bus と対話するには、それに接続して承認する方法を理解する必要があります。

接続文字列を使用して Service Bus クライアントを作成する

認証の最も簡単な方法は、Service Bus 名前空間の作成時に自動的に作成される接続文字列を使用することです。 Azure の共有アクセス ポリシーに慣れていない場合は、ステップ バイ ステップ ガイドに従って Service Bus 接続文字列を取得できます。

非同期および同期 Service Bus の送信側クライアントと受信側クライアントの両方が を使用して ServiceBusClientBuilderインスタンス化されます。 次のスニペットでは、それぞれ同期 Service Bus 送信者と非同期レシーバーが作成されます。

ServiceBusSenderClient sender = new ServiceBusClientBuilder()
    .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
    .sender()
    .queueName("<< QUEUE NAME >>")
    .buildClient();
ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
    .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
    .receiver()
    .topicName("<< TOPIC NAME >>")
    .subscriptionName("<< SUBSCRIPTION NAME >>")
    .buildAsyncClient();

Microsoft Id プラットフォーム (旧称 Azure Active Directory) を使用して Service Bus クライアントを作成する

Azure SDK for Java では Azure Identity パッケージがサポートされているため、Microsoft ID プラットフォームから資格情報を簡単に取得できます。 まず、パッケージを追加します。

<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-identity</artifactId>
    <version>1.10.1</version>
</dependency>
  • 既知の問題: クライアント ライブラリの前azure-identityに、pom.xml ファイルが一覧表示azure-messaging-servicebusされます。 この問題は、 で azure-identity:1.2.1解決されています。 詳細については、こちらを参照してください。

資格情報を要求するために実装された方法は、パッケージの com.azure.identity.credential 下にあります。 次のサンプルは、Azure Active Directory (AAD) アプリケーション クライアント シークレットを使用して、Azure Service Busで承認する方法を示しています。

DefaultAzureCredential を使用した承認

承認は、 DefaultAzureCredential を使用するのが最も簡単です。 実行中の環境で使用するのに最適な資格情報が見つかります。 Service Bus での Azure Active Directory 承認の使用の詳細については、 関連するドキュメントを参照してください。

返されたトークン資格情報を使用して、クライアントを認証します。

TokenCredential credential = new DefaultAzureCredentialBuilder()
    .build();
ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
    .credential("<<fully-qualified-namespace>>", credential)
    .receiver()
    .queueName("<<queue-name>>")
    .buildAsyncClient();

主要な概念

Service Bus 名前空間内のプライマリ リソースの種類とやり取りできます。その中には、複数のリソースが存在し、実際のメッセージ送信が行われます。 名前空間は、多くの場合、アプリケーション コンテナーとして機能します。

  • キューを使用すると、先入れ先出しで並べ替えられたメッセージの送受信を行うことができます。多くの場合、ポイントツーポイント通信に使用されます。
  • トピックは、パブリッシャーとサブスクライバーのシナリオに適しています。 トピックはメッセージをサブスクリプションに発行します。その中で、複数のメッセージが同時に存在する可能性があります。
  • サブスクリプションはトピックからメッセージを受信します。 各サブスクリプションは独立しており、トピックに送信されたメッセージのコピーを受け取ります。

Service Bus クライアント

ビルダー ServiceBusClientBuilder は、すべての Service Bus クライアントを作成するために使用されます。

メッセージを送信する

メッセージを送信するには、非同期 ServiceBusSenderAsyncClient または同期 ServiceBusSenderClient を作成する必要があります。 各送信者は、キューまたはトピックにメッセージを送信できます。

次のスニペットでは、メッセージをキューに発行するための同期 ServiceBusSenderClient が作成されます。

ServiceBusSenderClient sender = new ServiceBusClientBuilder()
    .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
    .sender()
    .queueName("<< QUEUE NAME >>")
    .buildClient();
List<ServiceBusMessage> messages = Arrays.asList(
    new ServiceBusMessage("Hello world").setMessageId("1"),
    new ServiceBusMessage("Bonjour").setMessageId("2"));

sender.sendMessages(messages);

// When you are done using the sender, dispose of it.
sender.close();

メッセージを受信する

メッセージを受信するには、受信メッセージのコールバックと、プロセスで発生したエラーを含む を作成 ServiceBusProcessorClient する必要があります。 その後、必要に応じてクライアントを開始および停止できます。

PeekLock モードでメッセージを受信すると、アプリケーション ロジックが受信したメッセージを明示的に解決 (完了、破棄など) することをブローカーに通知します。

// Sample code that processes a single message which is received in PeekLock mode.
Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
    final ServiceBusReceivedMessage message = context.getMessage();
    // Randomly complete or abandon each message. Ideally, in real-world scenarios, if the business logic
    // handling message reaches desired state such that it doesn't require Service Bus to redeliver
    // the same message, then context.complete() should be called otherwise context.abandon().
    final boolean success = Math.random() < 0.5;
    if (success) {
        try {
            context.complete();
        } catch (Exception completionError) {
            System.out.printf("Completion of the message %s failed\n", message.getMessageId());
            completionError.printStackTrace();
        }
    } else {
        try {
            context.abandon();
        } catch (Exception abandonError) {
            System.out.printf("Abandoning of the message %s failed\n", message.getMessageId());
            abandonError.printStackTrace();
        }
    }
};

// Sample code that gets called if there's an error
Consumer<ServiceBusErrorContext> processError = errorContext -> {
    System.err.println("Error occurred while receiving message: " + errorContext.getException());
};

// create the processor client via the builder and its sub-builder
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
                                .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
                                .processor()
                                .queueName("<< QUEUE NAME >>")
                                .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
                                .disableAutoComplete() // Make sure to explicitly opt in to manual settlement (e.g. complete, abandon).
                                .processMessage(processMessage)
                                .processError(processError)
                                .disableAutoComplete()
                                .buildProcessorClient();

// Starts the processor in the background and returns immediately
processorClient.start();

ReceiveAndDelete モードでメッセージを受信する場合は、受信クライアントに送信するすべてのメッセージを、送信時に解決済みと見なすようにブローカーに指示します。

// Sample code that processes a single message which is received in ReceiveAndDelete mode.
Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
    final ServiceBusReceivedMessage message = context.getMessage();
    System.out.printf("handler processing message. Session: %s, Sequence #: %s. Contents: %s%n", message.getMessageId(),
        message.getSequenceNumber(), message.getBody());
};

// Sample code that gets called if there's an error
Consumer<ServiceBusErrorContext> processError = errorContext -> {
    System.err.println("Error occurred while receiving message: " + errorContext.getException());
};

// create the processor client via the builder and its sub-builder
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
    .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
    .processor()
    .queueName("<< QUEUE NAME >>")
    .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
    .processMessage(processMessage)
    .processError(processError)
    .disableAutoComplete()
    .buildProcessorClient();

// Starts the processor in the background and returns immediately
processorClient.start();

コールバックに渡されるメッセージ コンテキストで メソッドを使用してメッセージをセトリングするには、4 つの方法があります。

  • 完了 - キューまたはトピックからメッセージが削除されます。
  • 破棄 - メッセージに対する受信者のロックを解除し、他の受信者がメッセージを受信できるようにします。
  • Defer - 通常の方法でメッセージが受信されないようにします。 遅延メッセージを受信するには、メッセージのシーケンス番号を保持する必要があります。
  • 配信不能 - メッセージを 配信不能キューに移動します。 これにより、メッセージが再び受信されなくなります。 配信不能キューからメッセージを受信するには、配信不能キューをスコープとする受信者が必要です。

セッションが有効なキューまたはトピックからの送受信

セッションを使用するには、セッションが有効なキューまたはサブスクリプションを作成する必要があります。 これを構成する方法の詳細については、「メッセージ セッション」を参照してください。

Azure Service Bus セッションでは、関連メッセージのバインドなしシーケンスの結合および順序指定処理が可能です。 セッションは、先入れ先出し (FIFO) および要求 - 応答のパターンで使用できます。 プロパティをセッションに固有のアプリケーション定義識別子に設定 ServiceBusMessage.setSessionId(String) することで、トピックまたはキューにメッセージを送信するときに、すべての送信者がセッションを作成できます。

セッションが有効でないキューやサブスクリプションとは異なり、セッションから読み取ることができる受信者は 1 つだけです。 受信側がセッションをフェッチすると、Service Bus はそのレシーバーのセッションをロックし、そのセッション内のメッセージに排他的にアクセスできます。

セッションにメッセージを送信する

ServiceBusSenderClientセッションが有効なキューまたはトピック サブスクリプションの を作成します。 に をServiceBusMessage設定ServiceBusMessage.setSessionId(String)すると、そのセッションにメッセージが発行されます。 セッションが存在しない場合は、作成されます。

// Setting sessionId publishes that message to a specific session, in this case, "greeting".
ServiceBusMessage message = new ServiceBusMessage("Hello world")
    .setSessionId("greetings");

sender.sendMessage(message);

セッションからメッセージを受信する

セッションからのメッセージの受信は、セッションが有効になっていないキューまたはサブスクリプションからのメッセージの受信と似ています。 違いは、ビルダーと使用するクラスにあります。

非セッションの場合は、サブ ビルダー を使用します processor()。 セッションの場合は、サブビルダー sessionProcessor()を使用します。 どちらのサブビルダーも、セッションまたはセッション以外の Service Bus エンティティで動作するように構成された の ServiceBusProcessorClient インスタンスを作成します。 セッション プロセッサの場合は、プロセッサが同時に処理するセッションの最大数を渡すことができます。

配信不能キュー レシーバーを作成する

Azure Service Bus キューとトピック サブスクリプションは、配信不能キュー (DLQ) と呼ばれるセカンダリ サブキューを提供します。 配信不能キューを明示的に作成する必要はなく、削除したり、メイン エンティティとは別に管理したりすることはできません。 セッションが有効になっているか、セッション以外のキューまたはトピック サブスクリプションの場合、配信不能レシーバーは、次に示すのと同じ方法で作成できます。 配信不能キューの詳細については 、こちらを参照してください

ServiceBusReceiverClient receiver = new ServiceBusClientBuilder()
    .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
    .receiver() // Use this for session or non-session enabled queue or topic/subscriptions
    .topicName("<< TOPIC NAME >>")
    .subscriptionName("<< SUBSCRIPTION NAME >>")
    .subQueue(SubQueue.DEAD_LETTER_QUEUE)
    .buildClient();

クライアント間の接続の共有

Service Bus への物理接続を作成するには、リソースが必要です。 アプリケーションで接続を共有する必要がある
次に示すように、最上位レベルのビルダーを共有することで実現できるクライアント間。

// Create shared builder.
ServiceBusClientBuilder sharedConnectionBuilder = new ServiceBusClientBuilder()
    .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>");
// Create receiver and sender which will share the connection.
ServiceBusReceiverClient receiver = sharedConnectionBuilder
    .receiver()
    .queueName("<< QUEUE NAME >>")
    .buildClient();
ServiceBusSenderClient sender = sharedConnectionBuilder
    .sender()
    .queueName("<< QUEUE NAME >>")
    .buildClient();

'ServiceBusProcessorClient' を使用する場合。

'ServiceBusProcessorClient'、'ServiceBusReceiverClient'、または ServiceBusReceiverAsyncClient を使用する場合 プロセッサは 'ServiceBusReceiverAsyncClient' を使用して構築されており、'PEEK_LOCK' モードのメッセージ ロックの既定の自動完了と自動更新を使用してメッセージを受信する便利な方法を提供します。 プロセッサは、アプリケーションが非同期受信側クライアントに完全に移動せず、同期モードでメッセージを処理する場合に適しています。 プロセッサは、ネットワーク エラーから内部的に回復するため、メッセージを永遠に受信します。 メッセージごとに 'ServiceBusProcessorClient:processMessage()' 関数呼び出しが行われます。 または、'ServiceBusReceiverClient' を使用することもできます。これは下位レベルのクライアントであり、より広い範囲の API を提供します。 非同期処理が
アプリケーションに適しています。'ServiceBusReceiverAsyncClient' を使用できます。

トラブルシューティング

クライアントのログ記録を有効にする

Azure SDK for Java には、アプリケーション エラーのトラブルシューティングと解決の迅速化に役立つ一貫したログ記録のストーリーが用意されています。 生成されたログでは、最終状態に達する前のアプリケーションのフローがキャプチャされ、根本原因を特定するのに役立ちます。 ログ記録の有効化に関するガイダンスについては、ログ Wiki を参照してください。

AMQP トランスポート ログを有効にする

クライアント ログを有効にするだけでは問題を診断できない場合。 基になる AMQP ライブラリ Qpid Proton-J 内のファイルへのログ記録を有効にすることができます。 Qpid Proton-J では を使用します java.util.logging。 ログ記録を有効にするには、次の内容を含む構成ファイルを作成します。 または、 と、実装に必要な構成オプションを設定 proton.trace.level=ALL します java.util.logging.Handler 。 実装クラスとそのオプションは、 Java 8 SDK javadoc で確認できます。

AMQP トランスポート フレームをトレースするには、環境変数 を設定します。 PN_TRACE_FRM=1

サンプル "logging.properties" ファイル

以下の構成ファイルは、proton-j から "proton-trace.log" ファイルへのトレース出力をログに記録します。

handlers=java.util.logging.FileHandler
.level=OFF
proton.trace.level=ALL
java.util.logging.FileHandler.level=ALL
java.util.logging.FileHandler.pattern=proton-trace.log
java.util.logging.FileHandler.formatter=java.util.logging.SimpleFormatter
java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %3$s %4$s: %5$s %n

一般的な例外

AMQP 例外

これは、AMQP 関連のエラーの一般的な例外です。これには、 としての ErrorCondition AMQP エラーと、この例外の原因となったコンテキストが 含 AmqpErrorContextまれます。 isTransient は、例外が一時的なエラーであるかどうかを示すブール値です。 一時的な AMQP 例外が発生した場合、クライアント ライブラリは AmqpRetryOptions で許可されている回数だけ操作を再試行します。 その後、操作は失敗し、例外がユーザーに反映されます。

AmqpErrorCondition には、AMQP プロトコルに共通し、Azure サービスで使用されるエラー条件が含まれています。 AMQP 例外がスローされると、エラー条件フィールドを調べることで、AMQP 例外が発生した理由と可能であれば、この例外を軽減する方法を開発者に通知できます。 すべての AMQP 例外の一覧は 、OASIS AMQP バージョン 1.0 トランスポート エラーに関するページで確認できます。

AMQP 例外が表す特定の例外を解決するには、 Service Bus メッセージング 例外のガイダンスに従うことをお勧めします。

API の動作について

このドキュメントでは、同期 API を使用して複数のメッセージ (暗黙的なプリフェッチ) を取得する際の予想receiveMessagesされる動作に関する分析情報を提供します。

次の手順

説明した以外にも、Azure Service Bus クライアント ライブラリでは、Azure Service Bus サービスの完全な機能セットを活用するために役立つ多くの追加シナリオのサポートが提供されています。 これらのシナリオの一部を調べるのに役立つサンプルのセットは、 ここで入手できます。

共同作成

このプロジェクトの積極的なコントリビューターになりたい場合は、投稿 ガイドライン を参照してください。

インプレッション数