ServiceBusClientBuilder クラス
- java.
lang. Object - com.
azure. messaging. servicebus. ServiceBusClientBuilder
- com.
実装
public final class ServiceBusClientBuilder
implements TokenCredentialTrait<ServiceBusClientBuilder>, AzureNamedKeyCredentialTrait<ServiceBusClientBuilder>, ConnectionStringTrait<ServiceBusClientBuilder>, AzureSasCredentialTrait<ServiceBusClientBuilder>, AmqpTrait<ServiceBusClientBuilder>, ConfigurationTrait<ServiceBusClientBuilder>
このクラスは、Service Bus エンティティとの間でメッセージを送受信するためのクライアントのインスタンス化を支援する fluent Builder API を提供します。
Azure Service Busに対して操作を実行するには、資格情報が必要です。 これらは、次のいずれかの方法を使用して設定できます。
- connectionString(String connectionString)Service Bus 名前空間への接続文字列を使用します。
- credential(String fullyQualifiedNamespace, TokenCredential credential)、 credential(String fullyQualifiedNamespace, AzureSasCredential credential)、および credential(String fullyQualifiedNamespace, AzureNamedKeyCredential credential) オーバーロードは、完全修飾 Service Bus 名前空間にアクセスできるそれぞれの資格情報と共に使用できます。
- credential(TokenCredential credential)、 credential(AzureSasCredential credential)、および credential(AzureNamedKeyCredential credential) オーバーロードは、それぞれの資格情報と共に使用できます。 fullyQualifiedNamespace(String fullyQualifiedNamespace)を設定する必要があります 。
次のサンプルで使用される資格情報は、認証用です DefaultAzureCredential
。 ローカルの開発環境や運用環境など、ほとんどのシナリオに適しています。 さらに、運用環境での認証には マネージド ID を 使用することをお勧めします。 さまざまな認証方法と、それに対応する資格情報の種類の詳細については、 Azure Identity のドキュメントを参照してください。
クライアントとサブビルダー
ServiceBusClientBuilder では、複数のクライアントをインスタンス化できます。 インスタンス化するクライアントは、ユーザーがメッセージを発行または受信しているかどうか、およびエンティティで Service Bus セッション が有効になっているかどうかによって異なります。
- メッセージの送信: サブビルダーを sender() 使用して と を作成 ServiceBusSenderAsyncClient します ServiceBusSenderClient。
- メッセージの受信: サブビルダーを receiver() 使用して と を作成 ServiceBusReceiverAsyncClient します ServiceBusReceiverAsyncClient。
- セッションが有効な Service Bus エンティティからのメッセージの受信: サブビルダーを sessionReceiver() 使用して と を作成 ServiceBusSessionReceiverAsyncClient します ServiceBusSessionReceiverClient。
- コールバック ベースのプロセッサを使用してメッセージを受信する: サブビルダーを processor() 使用して を作成 ServiceBusProcessorClientします。
- コールバック ベースのプロセッサを使用してセッション対応 Service Bus エンティティからメッセージを受信 する : サブビルダーを sessionProcessor() 使用して を作成 ServiceBusProcessorClientします。
メッセージを送信する
サンプル: 同期送信者をインスタンス化してメッセージを送信する
次のコード サンプルは、同期クライアント ServiceBusSenderClient の作成とメッセージの送信を示しています。 は fullyQualifiedNamespace
Service Bus 名前空間のホスト名です。 Azure Portal を使用して Service Bus 名前空間に移動した後、"Essentials" パネルの下に一覧表示されます。 使用される資格情報は、 DefaultAzureCredential
デプロイと開発でよく使用される資格情報を組み合わせ、実行環境に基づいて使用する資格情報を選択するためです。 パフォーマンスが重要な場合は、 を使用して ServiceBusMessageBatch 複数のメッセージを一度に発行することを検討してください。
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
ServiceBusSenderClient sender = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, credential)
.sender()
.queueName(queueName)
.buildClient();
sender.sendMessage(new ServiceBusMessage("Foo bar"));
メッセージの使用
Service Bus エンティティからのメッセージを使用するためのクライアントは複数あります ( Service Bus セッション が有効になっていません)。
サンプル: 非同期レシーバーをインスタンス化する
次のコード例は、非同期レシーバーの作成を示しています。 使用される資格情報は認証用です DefaultAzureCredential
。 ローカルの開発環境や運用環境など、ほとんどのシナリオに適しています。 PEEK_LOCK と disableAutoComplete() は、ユーザーがメッセージ決済を制御できるように 強くお 勧めします。
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
// 'disableAutoComplete' indicates that users will explicitly settle their message.
ServiceBusReceiverAsyncClient asyncReceiver = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, credential)
.receiver()
.disableAutoComplete()
.queueName(queueName)
.buildAsyncClient();
// When users are done with the receiver, dispose of the receiver.
// Clients should be long-lived objects as they require resources
// and time to establish a connection to the service.
asyncReceiver.close();
サンプル: インスタンス化 ServiceBusProcessorClient
次のコード例は、プロセッサ クライアントの作成を示しています。 プロセッサ クライアントは、接続回復を提供するため、ほとんどの運用シナリオに推奨されます。 使用される資格情報は認証用です DefaultAzureCredential
。 ローカルの開発環境や運用環境など、ほとんどのシナリオに適しています。 PEEK_LOCK と disableAutoComplete() は、ユーザーがメッセージ決済を制御できるように 強くお 勧めします。
// Function that gets called whenever a message is received.
Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
final ServiceBusReceivedMessage message = context.getMessage();
// Randomly complete or abandon each message. Ideally, in real-world scenarios, if the business logic
// handling message reaches desired state such that it doesn't require Service Bus to redeliver
// the same message, then context.complete() should be called otherwise context.abandon().
final boolean success = Math.random() < 0.5;
if (success) {
try {
context.complete();
} catch (RuntimeException error) {
System.out.printf("Completion of the message %s failed.%n Error: %s%n",
message.getMessageId(), error);
}
} else {
try {
context.abandon();
} catch (RuntimeException error) {
System.out.printf("Abandoning of the message %s failed.%nError: %s%n",
message.getMessageId(), error);
}
}
};
// Sample code that gets called if there's an error
Consumer<ServiceBusErrorContext> processError = errorContext -> {
if (errorContext.getException() instanceof ServiceBusException) {
ServiceBusException exception = (ServiceBusException) errorContext.getException();
System.out.printf("Error source: %s, reason %s%n", errorContext.getErrorSource(),
exception.getReason());
} else {
System.out.printf("Error occurred: %s%n", errorContext.getException());
}
};
TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
// Create the processor client via the builder and its sub-builder
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, tokenCredential)
.processor()
.queueName(queueName)
.receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
.disableAutoComplete() // Make sure to explicitly opt in to manual settlement (e.g. complete, abandon).
.processMessage(processMessage)
.processError(processError)
.disableAutoComplete()
.buildProcessorClient();
// Starts the processor in the background. Control returns immediately.
processorClient.start();
// Stop processor and dispose when done processing messages.
processorClient.stop();
processorClient.close();
セッションが有効な Service Bus エンティティからのメッセージの使用
Service Bus では、 Service Bus セッションを介した無制限のメッセージ シーケンスの共同および順序付けされた処理がサポートされています。 セッションは、メッセージの先入れ先出し (FIFO) 処理として使用できます。 キューとトピック/サブスクリプションは Service Bus セッションをサポートしていますが、 エンティティの作成時に有効にする必要があります。
サンプル: セッションが有効なキューへのメッセージの送信
次のスニペットは、 Service Bus セッション が有効なキューにメッセージを送信する方法を示しています。 プロパティを "greetings" に設定 setMessageId(String messageId) すると、ID が "greetings" の Service Bus セッションにメッセージが送信されます。
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
ServiceBusSenderClient sender = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, new DefaultAzureCredentialBuilder().build())
.sender()
.queueName(sessionEnabledQueueName)
.buildClient();
// Setting sessionId publishes that message to a specific session, in this case, "greeting".
ServiceBusMessage message = new ServiceBusMessage("Hello world")
.setSessionId("greetings");
sender.sendMessage(message);
// Dispose of the sender.
sender.close();
サンプル: 最初に使用可能なセッションからメッセージを受信する
最初に使用可能なセッションからのメッセージを処理するには、 に ServiceBusSessionReceiverClientBuilder 切り替えて、セッション レシーバー クライアントをビルドします。 を使用して acceptNextSession() 、メッセージを処理する最初の使用可能なセッションを見つけます。
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
// 'disableAutoComplete' indicates that users will explicitly settle their message.
ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, credential)
.sessionReceiver()
.disableAutoComplete()
.queueName(sessionEnabledQueueName)
.buildAsyncClient();
// Creates a client to receive messages from the first available session. It waits until
// AmqpRetryOptions.getTryTimeout() elapses. If no session is available within that operation timeout, it
// completes with a retriable error. Otherwise, a receiver is returned when a lock on the session is acquired.
Mono<ServiceBusReceiverAsyncClient> receiverMono = sessionReceiver.acceptNextSession();
Flux<Void> receiveMessagesFlux = Flux.usingWhen(receiverMono,
receiver -> receiver.receiveMessages().flatMap(message -> {
System.out.println("Received message: " + message.getBody());
// Explicitly settle the message via complete, abandon, defer, dead-letter, etc.
if (isMessageProcessed) {
return receiver.complete(message);
} else {
return receiver.abandon(message);
}
}),
receiver -> Mono.fromRunnable(() -> {
// Dispose of the receiver and sessionReceiver when done receiving messages.
receiver.close();
sessionReceiver.close();
}));
// This is a non-blocking call that moves onto the next line of code after setting up and starting the receive
// operation. Customers can keep a reference to `subscription` and dispose of it when they want to stop
// receiving messages.
Disposable subscription = receiveMessagesFlux.subscribe(unused -> {
}, error -> System.out.println("Error occurred: " + error),
() -> System.out.println("Receiving complete."));
サンプル: すべてのセッションからのメッセージを処理する
次のコード サンプルは、キューで使用可能なすべてのセッションを ServiceBusProcessorClient 処理する を作成する方法を示しています。 ServiceBusSessionProcessorClientBuilder#maxConcurrentSessions(int) は、プロセッサが同時に処理するセッションの数を示します。 使用される資格情報は認証用です DefaultAzureCredential
。 ローカルの開発環境や運用環境など、ほとんどのシナリオに適しています。 PEEK_LOCK と disableAutoComplete() は、ユーザーがメッセージ決済を制御できるように 強くお 勧めします。
// Function that gets called whenever a message is received.
Consumer<ServiceBusReceivedMessageContext> onMessage = context -> {
ServiceBusReceivedMessage message = context.getMessage();
System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n",
message.getSessionId(), message.getSequenceNumber(), message.getBody());
};
Consumer<ServiceBusErrorContext> onError = context -> {
System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n",
context.getFullyQualifiedNamespace(), context.getEntityPath());
if (context.getException() instanceof ServiceBusException) {
ServiceBusException exception = (ServiceBusException) context.getException();
System.out.printf("Error source: %s, reason %s%n", context.getErrorSource(),
exception.getReason());
} else {
System.out.printf("Error occurred: %s%n", context.getException());
}
};
TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
// Create the processor client via the builder and its sub-builder
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
ServiceBusProcessorClient sessionProcessor = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, tokenCredential)
.sessionProcessor()
.queueName(sessionEnabledQueueName)
.receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
.disableAutoComplete()
.maxConcurrentSessions(2)
.processMessage(onMessage)
.processError(onError)
.buildProcessorClient();
// Starts the processor in the background. Control returns immediately.
sessionProcessor.start();
// Stop processor and dispose when done processing messages.
sessionProcessor.stop();
sessionProcessor.close();
接続共有
Service Bus への接続を作成するには、リソースが必要です。 アーキテクチャで許可されている場合、アプリケーションはクライアント間の接続を共有する必要があります。これは、次に示すように、最上位のビルダーを共有することで実現できます。
クライアント間の接続の共有
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
// Any clients created from this builder will share the underlying connection.
ServiceBusClientBuilder sharedConnectionBuilder = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, credential);
// Create receiver and sender which will share the connection.
ServiceBusReceiverClient receiver = sharedConnectionBuilder
.receiver()
.receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
.queueName(queueName)
.buildClient();
ServiceBusSenderClient sender = sharedConnectionBuilder
.sender()
.queueName(queueName)
.buildClient();
// Use the clients and finally close them.
try {
sender.sendMessage(new ServiceBusMessage("payload"));
receiver.receiveMessages(1);
} finally {
// Clients should be long-lived objects as they require resources
// and time to establish a connection to the service.
sender.close();
receiver.close();
}
コンストラクターの概要
コンストラクター | 説明 |
---|---|
ServiceBusClientBuilder() |
既定のトランスポート AMQPを使用して新しいインスタンスを作成します。 |
メソッドの概要
メソッドの継承元: java.lang.Object
コンストラクターの詳細
ServiceBusClientBuilder
public ServiceBusClientBuilder()
既定のトランスポート AMQPを使用して新しいインスタンスを作成します。
メソッドの詳細
clientOptions
public ServiceBusClientBuilder clientOptions(ClientOptions clientOptions)
ClientOptionsこのビルダーからビルドされたクライアントから送信する を設定し、特定のプロパティのカスタマイズを有効にし、カスタム ヘッダー情報の追加をサポートします。 詳細については、 ClientOptions ドキュメントを参照してください。
Parameters:
Returns:
configuration
public ServiceBusClientBuilder configuration(Configuration configuration)
サービス クライアントの構築中に使用される構成ストアを設定します。 指定しない場合は、既定の構成ストアを使用して Service Bus クライアントを構成します。 構築中に構成設定を使用してバイパスするには、 を使用 NONE します。
Parameters:
Returns:
connectionString
public ServiceBusClientBuilder connectionString(String connectionString)
Service Bus 名前空間または特定の Service Bus リソースの接続文字列を設定します。
Parameters:
Returns:
credential
public ServiceBusClientBuilder credential(AzureNamedKeyCredential credential)
Service Bus リソースの共有アクセス ポリシーを使用して資格情報を設定します。 共有アクセス ポリシーは、Azure portal または Azure CLI で確認できます。 たとえば、ポータルの "Shared Access policies" には "policy" があり、その "主キー" と "セカンダリ キー" があります。 の 'name' 属性 AzureNamedKeyCredential はポータルの 'policy' であり、'key' 属性には '主キー' または 'セカンダリ キー' を指定できます。 このメソッドは、 connectionString(String connectionString) 異なる形式で同じ情報を取得します。 ただし、名前とキーを更新できます。
Parameters:
Returns:
credential
public ServiceBusClientBuilder credential(AzureSasCredential credential)
Service Bus リソースの Shared Access Signature を使用して資格情報を設定します。 「Shared Access Signature を使用した Service Bus アクセス制御」を参照してください。
Parameters:
Returns:
credential
public ServiceBusClientBuilder credential(TokenCredential credential)
サービスに TokenCredential 送信された要求を承認するために使用される を設定します。 型の適切な使用方法の詳細については、Azure SDK for Java の ID と認証 に関するドキュメントを TokenCredential 参照してください。
Parameters:
Returns:
credential
public ServiceBusClientBuilder credential(String fullyQualifiedNamespace, AzureNamedKeyCredential credential)
Service Bus リソースの共有アクセス ポリシーを使用して資格情報を設定します。 共有アクセス ポリシーは、Azure portal または Azure CLI で確認できます。 たとえば、ポータルの "Shared Access policies" には "policy" があり、その "主キー" と "セカンダリ キー" があります。 の 'name' 属性 AzureNamedKeyCredential はポータルの 'policy' であり、'key' 属性には '主キー' または 'セカンダリ キー' を指定できます。 このメソッドは、 connectionString(String connectionString) 異なる形式で同じ情報を取得します。 ただし、名前とキーを更新できます。
Parameters:
Returns:
credential
public ServiceBusClientBuilder credential(String fullyQualifiedNamespace, AzureSasCredential credential)
Service Bus リソースの Shared Access Signature を使用して資格情報を設定します。 「Shared Access Signature を使用した Service Bus アクセス制御」を参照してください。
Parameters:
Returns:
credential
public ServiceBusClientBuilder credential(String fullyQualifiedNamespace, TokenCredential credential)
Service Bus リソースの を TokenCredential 使用して資格情報を設定します。 azure-identity には、Service Bus リソースへのアクセスを認証するために使用できる複数 TokenCredential の実装があります。
Parameters:
Returns:
customEndpointAddress
public ServiceBusClientBuilder customEndpointAddress(String customEndpointAddress)
Service Bus サービスに接続するときのカスタム エンドポイント アドレスを設定します。 これは、ネットワークが標準のAzure Service Busエンドポイント アドレスへの接続を許可せず、中継局経由での接続を許可している場合に便利です。 (例: https://my.custom.endpoint.com:55300)。
ポートが指定されていない場合は、 の既定の transportType(AmqpTransportType transportType) ポートが使用されます。
Parameters:
Returns:
enableCrossEntityTransactions
public ServiceBusClientBuilder enableCrossEntityTransactions()
Service Bus への接続でクロス エンティティ トランザクションを有効にします。 この機能は、トランザクション スコープが異なる Service Bus エンティティにまたがる場合にのみ使用します。 この機能は、次に説明するように、サーバー側の 1 つの "send-via" エンティティを介してすべてのメッセージをルーティングすることによって実現されます。 複数のエンティティに対してクライアントが作成されると、操作が行われる最初のエンティティは、後続のすべての送信がルーティングされるエンティティになります ('send-via' エンティティ)。 これにより、サービスは複数のエンティティにまたがるトランザクションを実行できます。 つまり、最初の操作を実行する後続のエンティティは送信者である必要があります。受信者である場合は、すべての送信がルーティングされる最初のエンティティと同じエンティティ上に存在する必要があります (それ以外の場合、サービスは別のエンティティを介して受信操作をルーティングできないため、トランザクションがコミットされることを保証できません)。 たとえば、エンティティ間トランザクションが有効になっているクライアントから作成された SenderA (エンティティ A の場合) と ReceiverB (エンティティ B の場合) がある場合、これを機能させるには、まず ReceiverB で受信する必要があります。 最初にエンティティ A に送信した後、エンティティ B から受信しようとすると、例外がスローされます。
このクライアントでトランザクション以外の API を使用しないようにする
この機能は、この機能を有効にするために最適化された Service Bus への接続を設定するためです。 すべてのクライアントがセットアップされると、最初に使用された受信者または送信者は、"send-via" キューを 1 つのメッセージ転送エンティティとして初期化します。 すべてのメッセージは、このキューを介して流れます。 したがって、このクライアントは、トランザクション以外の API には適していません。
この機能を有効にしない場合
トランザクションが 1 つの Service Bus エンティティにのみ関係している場合。 たとえば、1 つのキュー/サブスクリプションから受信していて、1 つのトランザクションの一部である独自のメッセージを決済する必要があります。
Returns:
fullyQualifiedNamespace
public ServiceBusClientBuilder fullyQualifiedNamespace(String fullyQualifiedNamespace)
Service Bus の完全修飾名前空間を設定します。
Parameters:
Returns:
processor
public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder processor()
インスタンスの構成ServiceBusProcessorClientに使用される のServiceBusProcessorClientBuilder新しいインスタンス。
Returns:
proxyOptions
public ServiceBusClientBuilder proxyOptions(ProxyOptions proxyOptions)
に使用するプロキシ構成を設定します ServiceBusSenderAsyncClient。 プロキシが構成されている場合は、 AMQP_WEB_SOCKETS トランスポートの種類に を使用する必要があります。
Parameters:
Returns:
receiver
public ServiceBusClientBuilder.ServiceBusReceiverClientBuilder receiver()
Service Bus メッセージ レシーバーの構成に使用される の ServiceBusReceiverClientBuilder 新しいインスタンス。
Returns:
retryOptions
public ServiceBusClientBuilder retryOptions(AmqpRetryOptions retryOptions)
Service Bus クライアントの再試行オプションを設定します。 指定しない場合は、既定の再試行オプションが使用されます。
Parameters:
Returns:
ruleManager
public ServiceBusClientBuilder.ServiceBusRuleManagerBuilder ruleManager()
Service Bus ルール マネージャー インスタンスの構成に使用される の ServiceBusRuleManagerBuilder 新しいインスタンス。
Returns:
sender
public ServiceBusClientBuilder.ServiceBusSenderClientBuilder sender()
Service Bus メッセージ送信者の構成に使用される の ServiceBusSenderClientBuilder 新しいインスタンス。
Returns:
sessionProcessor
public ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder sessionProcessor()
セッションを処理する Service Bus プロセッサ インスタンスの構成に使用される の新しい ServiceBusSessionProcessorClientBuilder インスタンス。
Returns:
sessionReceiver
public ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder sessionReceiver()
セッション対応 Service Bus メッセージ レシーバーの構成に使用される のServiceBusSessionReceiverClientBuilder新しいインスタンス。
Returns:
transportType
public ServiceBusClientBuilder transportType(AmqpTransportType transportType)
Azure Service Busとの通信がすべて行われるトランスポートの種類を設定します。 既定値は AMQP です。
Parameters:
Returns:
適用対象
Azure SDK for Java