你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder 类

  • java.lang.Object
    • com.azure.messaging.servicebus.ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder

public final class ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder

用于创建 ServiceBusProcessorClient 以使用基于会话的服务总线实体的消息的生成器。 ServiceBusProcessorClient 通过 processMessage(Consumer<ServiceBusReceivedMessageContext> processMessage) 和 处理消息和 processError(Consumer<ServiceBusErrorContext> processError)错误。 当处理器处理完会话时,它会尝试提取下一个要处理的会话。

默认情况下,处理器:

实例化启用了会话的处理器客户端

// Function that gets called whenever a message is received.
 Consumer<ServiceBusReceivedMessageContext> onMessage = context -> {
     ServiceBusReceivedMessage message = context.getMessage();
     System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n",
         message.getSessionId(), message.getSequenceNumber(), message.getBody());
 };

 Consumer<ServiceBusErrorContext> onError = context -> {
     System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n",
         context.getFullyQualifiedNamespace(), context.getEntityPath());

     if (context.getException() instanceof ServiceBusException) {
         ServiceBusException exception = (ServiceBusException) context.getException();

         System.out.printf("Error source: %s, reason %s%n", context.getErrorSource(),
             exception.getReason());
     } else {
         System.out.printf("Error occurred: %s%n", context.getException());
     }
 };

 TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

 // Create the processor client via the builder and its sub-builder
 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 ServiceBusProcessorClient sessionProcessor = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, tokenCredential)
     .sessionProcessor()
     .queueName(sessionEnabledQueueName)
     .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
     .disableAutoComplete()
     .maxConcurrentSessions(2)
     .processMessage(onMessage)
     .processError(onError)
     .buildProcessorClient();

 // Starts the processor in the background. Control returns immediately.
 sessionProcessor.start();

 // Stop processor and dispose when done processing messages.
 sessionProcessor.stop();
 sessionProcessor.close();

方法摘要

修饰符和类型 方法和描述
ServiceBusProcessorClient buildProcessorClient()

创建一个会话 感知 服务总线处理器,负责从特定队列或订阅读取 ServiceBusReceivedMessage 数据。

ServiceBusSessionProcessorClientBuilder disableAutoComplete()

禁用自动完成和自动放弃收到的消息。

ServiceBusSessionProcessorClientBuilder maxAutoLockRenewDuration(Duration maxAutoLockRenewDuration)

设置继续自动续订锁的时间量。

ServiceBusSessionProcessorClientBuilder maxConcurrentCalls(int maxConcurrentCalls)

此处理器应处理的最大并发消息数。

ServiceBusSessionProcessorClientBuilder maxConcurrentSessions(int maxConcurrentSessions)

通过最多 maxConcurrentSessions处理 来启用会话处理滚动更新。

ServiceBusSessionProcessorClientBuilder prefetchCount(int prefetchCount)

设置处理器的预提取计数。

ServiceBusSessionProcessorClientBuilder processError(Consumer<ServiceBusErrorContext> processError)

处理器的错误处理程序,在接收消息时出错时将调用该处理程序。

ServiceBusSessionProcessorClientBuilder processMessage(Consumer<ServiceBusReceivedMessageContext> processMessage)

接收消息时将执行的处理器的消息处理回调。

ServiceBusSessionProcessorClientBuilder queueName(String queueName)

设置要为其创建处理器的队列的名称。

ServiceBusSessionProcessorClientBuilder receiveMode(ServiceBusReceiveMode receiveMode)

设置处理器的接收模式。

ServiceBusSessionProcessorClientBuilder sessionIdleTimeout(Duration sessionIdleTimeout)

设置等待当前活动会话收到消息的最长时间。

ServiceBusSessionProcessorClientBuilder subQueue(SubQueue subQueue)

设置要连接到的 SubQueue 的类型。

ServiceBusSessionProcessorClientBuilder subscriptionName(String subscriptionName)

设置主题中要侦听的订阅的名称。

ServiceBusSessionProcessorClientBuilder topicName(String topicName)

设置主题的名称。

方法继承自 java.lang.Object

方法详细信息

buildProcessorClient

public ServiceBusProcessorClient buildProcessorClient()

创建一个会话 感知 服务总线处理器,负责从特定队列或订阅读取 ServiceBusReceivedMessage 数据。

Returns:

从队列或订阅接收消息的新 ServiceBusProcessorClient

disableAutoComplete

public ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder disableAutoComplete()

禁用自动完成和自动放弃收到的消息。 默认情况下,已成功处理的消息为 complete()。 如果在处理消息时发生错误,则为 abandon()

Returns:

已修改的 ServiceBusSessionProcessorClientBuilder 对象。

maxAutoLockRenewDuration

public ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder maxAutoLockRenewDuration(Duration maxAutoLockRenewDuration)

设置继续自动续订锁的时间量。 设置 Duration#ZEROnull 禁用自动续订。 对于 RECEIVE_AND_DELETE 模式,自动续订处于禁用状态。

Parameters:

maxAutoLockRenewDuration - 继续自动续订锁的时间量。 Duration#ZEROnull 指示已禁用自动续订。

Returns:

已更新的 ServiceBusSessionProcessorClientBuilder 对象。

maxConcurrentCalls

public ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder maxConcurrentCalls(int maxConcurrentCalls)

此处理器应处理的最大并发消息数。

Parameters:

maxConcurrentCalls - 此处理器应处理的最大并发消息数。

Returns:

已更新的 ServiceBusSessionProcessorClientBuilder 对象。

maxConcurrentSessions

public ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder maxConcurrentSessions(int maxConcurrentSessions)

通过最多 maxConcurrentSessions处理 来启用会话处理滚动更新。

Parameters:

maxConcurrentSessions - 在任何给定时间处理的最大并发会话数。

Returns:

已修改的 ServiceBusSessionProcessorClientBuilder 对象。

prefetchCount

public ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder prefetchCount(int prefetchCount)

设置处理器的预提取计数。 PEEK_LOCK对于 和 RECEIVE_AND_DELETE 模式,默认值为 0。 预提取旨在使消息在应用程序启动处理器时和之前随时可供本地检索,从而加快消息流的速度。 设置非零值将预提取该数量的消息。 将值设置为零会关闭预提取。 使用非零预提取可能会丢失消息,即使具有更好的性能。

Parameters:

prefetchCount - 预提取计数。

Returns:

已修改的 ServiceBusProcessorClientBuilder 对象。

processError

public ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder processError(Consumer processError)

处理器的错误处理程序,在接收消息时出错时将调用该处理程序。

Parameters:

processError - 发生错误时将执行的错误处理程序。

Returns:

更新 ServiceBusProcessorClientBuilder 的对象

processMessage

public ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder processMessage(Consumer processMessage)

接收消息时将执行的处理器的消息处理回调。

Parameters:

processMessage - 接收消息时将执行的消息处理使用者。

Returns:

已更新的 ServiceBusProcessorClientBuilder 对象。

queueName

public ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder queueName(String queueName)

设置要为其创建处理器的队列的名称。

Parameters:

queueName - 队列名称。

Returns:

已修改的 ServiceBusSessionProcessorClientBuilder 对象。

receiveMode

public ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder receiveMode(ServiceBusReceiveMode receiveMode)

设置处理器的接收模式。

Parameters:

receiveMode - 接收消息的模式。

Returns:

已修改的 ServiceBusSessionProcessorClientBuilder 对象。

sessionIdleTimeout

public ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder sessionIdleTimeout(Duration sessionIdleTimeout)

设置等待当前活动会话收到消息的最长时间。 经过该时间后,处理器将关闭此会话并尝试处理另一个会话。

在处理器将消息传递给 processMessage(Consumer<ServiceBusReceivedMessageContext> processMessage) 处理程序后,如果处理器无法从会话接收下一条消息,因为会话中没有下一条消息或处理当前消息所花费的时间超过该 sessionIdleTimeout 消息,则会话将超时。若要避免意外丢失会话,请选择 sessionIdleTimeout 大于消息处理时间的 。

如果未指定, AmqpRetryOptions#getTryTimeout() 将使用 。

Parameters:

sessionIdleTimeout - 会话空闲超时。

Returns:

已更新的 ServiceBusSessionProcessorClientBuilder 对象。

subQueue

public ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder subQueue(SubQueue subQueue)

设置要连接到的 SubQueue 的类型。 Azure 服务总线队列和订阅提供辅助子队列,称为死信队列 (DLQ) 。

Parameters:

subQueue - 子队列的类型。

Returns:

已修改的 ServiceBusSessionProcessorClientBuilder 对象。

subscriptionName

public ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder subscriptionName(String subscriptionName)

设置主题中要侦听的订阅的名称。 topicName(String topicName) 还必须进行设置。

Parameters:

subscriptionName - 订阅的名称。

Returns:

已修改的 ServiceBusSessionProcessorClientBuilder 对象。

topicName

public ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder topicName(String topicName)

设置主题的名称。 subscriptionName(String subscriptionName) 还必须进行设置。

Parameters:

topicName - 主题名称。

Returns:

已修改的 ServiceBusSessionProcessorClientBuilder 对象。

适用于