ServiceBusClientBuilder.ServiceBusProcessorClientBuilder クラス

  • java.lang.Object
    • com.azure.messaging.servicebus.ServiceBusClientBuilder.ServiceBusProcessorClientBuilder

public final class ServiceBusClientBuilder.ServiceBusProcessorClientBuilder

Service Bus エンティティからのメッセージを ServiceBusProcessorClient 使用するを作成するためのビルダー。 ServiceBusProcessorClient は、メッセージの受信時にメッセージ処理コールバックを通知するプッシュベースのメカニズム、またはエラーが観察されたときにエラー ハンドルを提供します。 したがって、インスタンスを作成するには、 と processError(Consumer<ServiceBusErrorContext> processError) という 2 つのコールバックをprocessMessage(Consumer<ServiceBusReceivedMessageContext> processMessage)構成する必要があります。 既定では、 ServiceBusProcessorClient は自動補完機能と自動ロック更新機能で構成されています。

プロセッサ クライアントをインスタンス化し、PeekLock モードで受信するサンプル コード

// Function that gets called whenever a message is received.
 Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
     final ServiceBusReceivedMessage message = context.getMessage();
     // Randomly complete or abandon each message. Ideally, in real-world scenarios, if the business logic
     // handling message reaches desired state such that it doesn't require Service Bus to redeliver
     // the same message, then context.complete() should be called otherwise context.abandon().
     final boolean success = Math.random() < 0.5;
     if (success) {
         try {
             context.complete();
         } catch (RuntimeException error) {
             System.out.printf("Completion of the message %s failed.%n Error: %s%n",
                 message.getMessageId(), error);
         }
     } else {
         try {
             context.abandon();
         } catch (RuntimeException error) {
             System.out.printf("Abandoning of the message %s failed.%nError: %s%n",
                 message.getMessageId(), error);
         }
     }
 };

 // Sample code that gets called if there's an error
 Consumer<ServiceBusErrorContext> processError = errorContext -> {
     if (errorContext.getException() instanceof ServiceBusException) {
         ServiceBusException exception = (ServiceBusException) errorContext.getException();

         System.out.printf("Error source: %s, reason %s%n", errorContext.getErrorSource(),
             exception.getReason());
     } else {
         System.out.printf("Error occurred: %s%n", errorContext.getException());
     }
 };

 TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

 // Create the processor client via the builder and its sub-builder
 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, tokenCredential)
     .processor()
     .queueName(queueName)
     .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
     .disableAutoComplete()  // Make sure to explicitly opt in to manual settlement (e.g. complete, abandon).
     .processMessage(processMessage)
     .processError(processError)
     .disableAutoComplete()
     .buildProcessorClient();

 // Starts the processor in the background. Control returns immediately.
 processorClient.start();

 // Stop processor and dispose when done processing messages.
 processorClient.stop();
 processorClient.close();

プロセッサ クライアントをインスタンス化し、ReceiveAndDelete モードで受信するサンプル コード

// Function that gets called whenever a message is received.
 Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
     final ServiceBusReceivedMessage message = context.getMessage();
     System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n",
         message.getSessionId(), message.getSequenceNumber(), message.getBody());
 };

 // Sample code that gets called if there's an error
 Consumer<ServiceBusErrorContext> processError = errorContext -> {
     if (errorContext.getException() instanceof ServiceBusException) {
         ServiceBusException exception = (ServiceBusException) errorContext.getException();

         System.out.printf("Error source: %s, reason %s%n", errorContext.getErrorSource(),
             exception.getReason());
     } else {
         System.out.printf("Error occurred: %s%n", errorContext.getException());
     }
 };

 TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

 // Create the processor client via the builder and its sub-builder
 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 // 'disableAutoComplete()' will opt in to manual settlement (e.g. complete, abandon).
 ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, tokenCredential)
     .processor()
     .queueName(queueName)
     .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
     .processMessage(processMessage)
     .processError(processError)
     .disableAutoComplete()
     .buildProcessorClient();

 // Starts the processor in the background. Control returns immediately.
 processorClient.start();

 // Stop processor and dispose when done processing messages.
 processorClient.stop();
 processorClient.close();

メソッドの概要

修飾子と型 メソッドと説明
ServiceBusProcessorClient buildProcessorClient()

特定のキューまたはサブスクリプションからの読み取 ServiceBusReceivedMessage りを担当する Service Bus メッセージ プロセッサを作成します。

ServiceBusProcessorClientBuilder disableAutoComplete()

受信したメッセージの自動完了と自動破棄を無効にします。

ServiceBusProcessorClientBuilder maxAutoLockRenewDuration(Duration maxAutoLockRenewDuration)

ロックの自動更新を続行する時間を設定します。

ServiceBusProcessorClientBuilder maxConcurrentCalls(int maxConcurrentCalls)

このプロセッサで処理する必要がある同時実行メッセージの最大数。

ServiceBusProcessorClientBuilder prefetchCount(int prefetchCount)

プロセッサのプリフェッチ数を設定します。

ServiceBusProcessorClientBuilder processError(Consumer<ServiceBusErrorContext> processError)

メッセージの受信中にエラーが発生した場合に呼び出されるプロセッサのエラー ハンドラー。

ServiceBusProcessorClientBuilder processMessage(Consumer<ServiceBusReceivedMessageContext> processMessage)

メッセージの受信時に実行されるプロセッサのメッセージ処理コールバック。

ServiceBusProcessorClientBuilder queueName(String queueName)

プロセッサを作成するキューの名前を設定します。

ServiceBusProcessorClientBuilder receiveMode(ServiceBusReceiveMode receiveMode)

プロセッサの受信モードを設定します。

ServiceBusProcessorClientBuilder subQueue(SubQueue subQueue)

接続する の SubQueue 種類を設定します。

ServiceBusProcessorClientBuilder subscriptionName(String subscriptionName)

リッスンするトピック内のサブスクリプションの名前を設定します。

ServiceBusProcessorClientBuilder topicName(String topicName)

トピックの名前を設定します。

メソッドの継承元: java.lang.Object

メソッドの詳細

buildProcessorClient

public ServiceBusProcessorClient buildProcessorClient()

特定のキューまたはサブスクリプションからの読み取 ServiceBusReceivedMessage りを担当する Service Bus メッセージ プロセッサを作成します。

Returns:

キューまたはサブスクリプションからのメッセージを処理する新しい ServiceBusProcessorClient

disableAutoComplete

public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder disableAutoComplete()

受信したメッセージの自動完了と自動破棄を無効にします。 既定では、正常に処理されたメッセージは です complete()。 メッセージの処理中にエラーが発生した場合は、 です abandon()

Returns:

変更された ServiceBusProcessorClientBuilder オブジェクトです。

maxAutoLockRenewDuration

public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder maxAutoLockRenewDuration(Duration maxAutoLockRenewDuration)

ロックの自動更新を続行する時間を設定します。 自動更新を設定 Duration#ZERO または null 無効にします。 モードの場合 RECEIVE_AND_DELETE 、自動更新は無効になっています。

Parameters:

maxAutoLockRenewDuration - ロックの自動更新を続行する時間。 Duration#ZERO または null は、自動更新が無効になっていることを示します。

Returns:

更新後の ServiceBusProcessorClientBuilder オブジェクト。

maxConcurrentCalls

public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder maxConcurrentCalls(int maxConcurrentCalls)

このプロセッサで処理する必要がある同時実行メッセージの最大数。 既定値は 1 です。

Parameters:

maxConcurrentCalls - このプロセッサで処理する必要がある最大同時メッセージ数。

Returns:

更新後の ServiceBusProcessorClientBuilder オブジェクト。

prefetchCount

public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder prefetchCount(int prefetchCount)

プロセッサのプリフェッチ数を設定します。 モードと RECEIVE_AND_DELETE モードの両方PEEK_LOCKの場合、既定値は 0 です。 プリフェッチは、アプリケーションがプロセッサを起動する前に、ローカルの取得にメッセージをすぐに使用できるようにすることを目的として、メッセージ フローを高速化します。 0 以外の値を設定すると、その数のメッセージがプリフェッチされます。 値を 0 に設定すると、プリフェッチがオフになります。

Parameters:

prefetchCount - プリフェッチ数。

Returns:

変更された ServiceBusProcessorClientBuilder オブジェクトです。

processError

public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder processError(Consumer processError)

メッセージの受信中にエラーが発生した場合に呼び出されるプロセッサのエラー ハンドラー。

Parameters:

processError - エラーが発生したときに実行されるエラー ハンドラー。

Returns:

更新された ServiceBusProcessorClientBuilder オブジェクト

processMessage

public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder processMessage(Consumer processMessage)

メッセージの受信時に実行されるプロセッサのメッセージ処理コールバック。

Parameters:

processMessage - メッセージの受信時に実行されるメッセージ処理コンシューマー。

Returns:

更新後の ServiceBusProcessorClientBuilder オブジェクト。

queueName

public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder queueName(String queueName)

プロセッサを作成するキューの名前を設定します。

Parameters:

queueName - キューの名前。

Returns:

変更された ServiceBusProcessorClientBuilder オブジェクトです。

receiveMode

public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder receiveMode(ServiceBusReceiveMode receiveMode)

プロセッサの受信モードを設定します。

Parameters:

receiveMode - メッセージ受信用のモード。

Returns:

変更された ServiceBusProcessorClientBuilder オブジェクトです。

subQueue

public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder subQueue(SubQueue subQueue)

接続する の SubQueue 種類を設定します。 Azure Service Bus キューとサブスクリプションは、配信不能キュー (DLQ) と呼ばれるセカンダリ サブキューを提供します。

Parameters:

subQueue - サブ キューの種類。

Returns:

変更された ServiceBusProcessorClientBuilder オブジェクトです。

subscriptionName

public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder subscriptionName(String subscriptionName)

リッスンするトピック内のサブスクリプションの名前を設定します。 topicName(String topicName) も設定する必要があります。

Parameters:

subscriptionName - サブスクリプションの名前。

Returns:

変更された ServiceBusProcessorClientBuilder オブジェクトです。

topicName

public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder topicName(String topicName)

トピックの名前を設定します。 subscriptionName(String subscriptionName) も設定する必要があります。

Parameters:

topicName - トピックの名前。

Returns:

変更された ServiceBusProcessorClientBuilder オブジェクトです。

適用対象