你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

ServiceBusClientBuilder.ServiceBusProcessorClientBuilder 类

  • java.lang.Object
    • com.azure.messaging.servicebus.ServiceBusClientBuilder.ServiceBusProcessorClientBuilder

public final class ServiceBusClientBuilder.ServiceBusProcessorClientBuilder

用于创建 ServiceBusProcessorClient 以使用来自服务总线实体的消息的生成器。 ServiceBusProcessorClient 提供一种基于推送的机制,该机制在收到消息时通知消息处理回调,或者在观察到错误时通知错误句柄。 因此,若要创建实例,必须配置两个回调 - processMessage(Consumer<ServiceBusReceivedMessageContext> processMessage)processError(Consumer<ServiceBusErrorContext> processError) 。 默认情况下, ServiceBusProcessorClient 配置了具有自动完成和自动锁定续订功能。

实例化处理器客户端并在 PeekLock 模式下接收的示例代码

// Function that gets called whenever a message is received.
 Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
     final ServiceBusReceivedMessage message = context.getMessage();
     // Randomly complete or abandon each message. Ideally, in real-world scenarios, if the business logic
     // handling message reaches desired state such that it doesn't require Service Bus to redeliver
     // the same message, then context.complete() should be called otherwise context.abandon().
     final boolean success = Math.random() < 0.5;
     if (success) {
         try {
             context.complete();
         } catch (RuntimeException error) {
             System.out.printf("Completion of the message %s failed.%n Error: %s%n",
                 message.getMessageId(), error);
         }
     } else {
         try {
             context.abandon();
         } catch (RuntimeException error) {
             System.out.printf("Abandoning of the message %s failed.%nError: %s%n",
                 message.getMessageId(), error);
         }
     }
 };

 // Sample code that gets called if there's an error
 Consumer<ServiceBusErrorContext> processError = errorContext -> {
     if (errorContext.getException() instanceof ServiceBusException) {
         ServiceBusException exception = (ServiceBusException) errorContext.getException();

         System.out.printf("Error source: %s, reason %s%n", errorContext.getErrorSource(),
             exception.getReason());
     } else {
         System.out.printf("Error occurred: %s%n", errorContext.getException());
     }
 };

 TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

 // Create the processor client via the builder and its sub-builder
 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, tokenCredential)
     .processor()
     .queueName(queueName)
     .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
     .disableAutoComplete()  // Make sure to explicitly opt in to manual settlement (e.g. complete, abandon).
     .processMessage(processMessage)
     .processError(processError)
     .disableAutoComplete()
     .buildProcessorClient();

 // Starts the processor in the background. Control returns immediately.
 processorClient.start();

 // Stop processor and dispose when done processing messages.
 processorClient.stop();
 processorClient.close();

实例化处理器客户端并在 ReceiveAndDelete 模式下接收的示例代码

// Function that gets called whenever a message is received.
 Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
     final ServiceBusReceivedMessage message = context.getMessage();
     System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n",
         message.getSessionId(), message.getSequenceNumber(), message.getBody());
 };

 // Sample code that gets called if there's an error
 Consumer<ServiceBusErrorContext> processError = errorContext -> {
     if (errorContext.getException() instanceof ServiceBusException) {
         ServiceBusException exception = (ServiceBusException) errorContext.getException();

         System.out.printf("Error source: %s, reason %s%n", errorContext.getErrorSource(),
             exception.getReason());
     } else {
         System.out.printf("Error occurred: %s%n", errorContext.getException());
     }
 };

 TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

 // Create the processor client via the builder and its sub-builder
 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 // 'disableAutoComplete()' will opt in to manual settlement (e.g. complete, abandon).
 ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, tokenCredential)
     .processor()
     .queueName(queueName)
     .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
     .processMessage(processMessage)
     .processError(processError)
     .disableAutoComplete()
     .buildProcessorClient();

 // Starts the processor in the background. Control returns immediately.
 processorClient.start();

 // Stop processor and dispose when done processing messages.
 processorClient.stop();
 processorClient.close();

方法摘要

修饰符和类型 方法和描述
ServiceBusProcessorClient buildProcessorClient()

创建负责从特定队列或订阅读取 ServiceBusReceivedMessage 的服务总线消息处理器。

ServiceBusProcessorClientBuilder disableAutoComplete()

禁用自动完成和自动放弃收到的消息。

ServiceBusProcessorClientBuilder maxAutoLockRenewDuration(Duration maxAutoLockRenewDuration)

设置继续自动续订锁的时间量。

ServiceBusProcessorClientBuilder maxConcurrentCalls(int maxConcurrentCalls)

此处理器应处理的最大并发消息数。

ServiceBusProcessorClientBuilder prefetchCount(int prefetchCount)

设置处理器的预提取计数。

ServiceBusProcessorClientBuilder processError(Consumer<ServiceBusErrorContext> processError)

处理器的错误处理程序,在接收消息时出错时将调用该处理程序。

ServiceBusProcessorClientBuilder processMessage(Consumer<ServiceBusReceivedMessageContext> processMessage)

处理器的消息处理回调,在收到消息时将执行该回调。

ServiceBusProcessorClientBuilder queueName(String queueName)

设置要为其创建处理器的队列的名称。

ServiceBusProcessorClientBuilder receiveMode(ServiceBusReceiveMode receiveMode)

设置处理器的接收模式。

ServiceBusProcessorClientBuilder subQueue(SubQueue subQueue)

设置要连接到的 SubQueue 的类型。

ServiceBusProcessorClientBuilder subscriptionName(String subscriptionName)

设置主题中要侦听的订阅的名称。

ServiceBusProcessorClientBuilder topicName(String topicName)

设置主题的名称。

方法继承自 java.lang.Object

方法详细信息

buildProcessorClient

public ServiceBusProcessorClient buildProcessorClient()

创建负责从特定队列或订阅读取 ServiceBusReceivedMessage 的服务总线消息处理器。

Returns:

处理来自队列或订阅的消息的新 ServiceBusProcessorClient

disableAutoComplete

public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder disableAutoComplete()

禁用自动完成和自动放弃收到的消息。 默认情况下,已成功处理的消息为 complete()。 如果在处理消息时发生错误,则为 abandon()

Returns:

已修改的 ServiceBusProcessorClientBuilder 对象。

maxAutoLockRenewDuration

public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder maxAutoLockRenewDuration(Duration maxAutoLockRenewDuration)

设置继续自动续订锁的时间量。 设置 Duration#ZEROnull 禁用自动续订。 对于 RECEIVE_AND_DELETE 模式,自动续订处于禁用状态。

Parameters:

maxAutoLockRenewDuration - 继续自动续订锁的时间量。 Duration#ZEROnull 指示已禁用自动续订。

Returns:

已更新的 ServiceBusProcessorClientBuilder 对象。

maxConcurrentCalls

public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder maxConcurrentCalls(int maxConcurrentCalls)

此处理器应处理的最大并发消息数。 默认情况下,此值设置为 1。

Parameters:

maxConcurrentCalls - 此处理器应处理的最大并发消息数。

Returns:

已更新的 ServiceBusProcessorClientBuilder 对象。

prefetchCount

public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder prefetchCount(int prefetchCount)

设置处理器的预提取计数。 PEEK_LOCK对于 和 RECEIVE_AND_DELETE 模式,默认值为 0。 预提取旨在使消息在应用程序启动处理器时和之前随时可供本地检索,从而加快消息流的速度。 设置非零值将预提取该数量的消息。 将值设置为零会关闭预提取。

Parameters:

prefetchCount - 预提取计数。

Returns:

已修改的 ServiceBusProcessorClientBuilder 对象。

processError

public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder processError(Consumer processError)

处理器的错误处理程序,在接收消息时出错时将调用该处理程序。

Parameters:

processError - 发生错误时将执行的错误处理程序。

Returns:

更新 ServiceBusProcessorClientBuilder 的对象

processMessage

public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder processMessage(Consumer processMessage)

处理器的消息处理回调,在收到消息时将执行该回调。

Parameters:

processMessage - 接收消息时将执行的消息处理使用者。

Returns:

已更新的 ServiceBusProcessorClientBuilder 对象。

queueName

public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder queueName(String queueName)

设置要为其创建处理器的队列的名称。

Parameters:

queueName - 队列名称。

Returns:

已修改的 ServiceBusProcessorClientBuilder 对象。

receiveMode

public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder receiveMode(ServiceBusReceiveMode receiveMode)

设置处理器的接收模式。

Parameters:

receiveMode - 接收消息的模式。

Returns:

已修改的 ServiceBusProcessorClientBuilder 对象。

subQueue

public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder subQueue(SubQueue subQueue)

设置要连接到的 SubQueue 的类型。 Azure 服务总线队列和订阅提供辅助子队列,称为死信队列 (DLQ) 。

Parameters:

subQueue - 子队列的类型。

Returns:

已修改的 ServiceBusProcessorClientBuilder 对象。

subscriptionName

public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder subscriptionName(String subscriptionName)

设置主题中要侦听的订阅的名称。 topicName(String topicName) 还必须进行设置。

Parameters:

subscriptionName - 订阅的名称。

Returns:

已修改的 ServiceBusProcessorClientBuilder 对象。

topicName

public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder topicName(String topicName)

设置主题的名称。 subscriptionName(String subscriptionName) 还必须进行设置。

Parameters:

topicName - 主题名称。

Returns:

已修改的 ServiceBusProcessorClientBuilder 对象。

适用于