你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

ServiceBusProcessorClient 类

  • java.lang.Object
    • com.azure.messaging.servicebus.ServiceBusProcessorClient

实现

public final class ServiceBusProcessorClient
implements AutoCloseable

用于处理服务总线消息的处理器客户端。 ServiceBusProcessorClient 提供了一种基于推送的机制,该机制在收到消息时调用消息处理回调,或者在接收消息时出错时调用错误处理程序。 可以创建 , ServiceBusProcessorClient 以处理已启用会话或未启用会话的服务总线实体的消息。 默认情况下,它支持消息的自动结算。

实例化处理器客户端并在 PeekLock 模式下接收的示例代码

// Function that gets called whenever a message is received.
 Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
     final ServiceBusReceivedMessage message = context.getMessage();
     // Randomly complete or abandon each message. Ideally, in real-world scenarios, if the business logic
     // handling message reaches desired state such that it doesn't require Service Bus to redeliver
     // the same message, then context.complete() should be called otherwise context.abandon().
     final boolean success = Math.random() < 0.5;
     if (success) {
         try {
             context.complete();
         } catch (RuntimeException error) {
             System.out.printf("Completion of the message %s failed.%n Error: %s%n",
                 message.getMessageId(), error);
         }
     } else {
         try {
             context.abandon();
         } catch (RuntimeException error) {
             System.out.printf("Abandoning of the message %s failed.%nError: %s%n",
                 message.getMessageId(), error);
         }
     }
 };

 // Sample code that gets called if there's an error
 Consumer<ServiceBusErrorContext> processError = errorContext -> {
     if (errorContext.getException() instanceof ServiceBusException) {
         ServiceBusException exception = (ServiceBusException) errorContext.getException();

         System.out.printf("Error source: %s, reason %s%n", errorContext.getErrorSource(),
             exception.getReason());
     } else {
         System.out.printf("Error occurred: %s%n", errorContext.getException());
     }
 };

 TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

 // Create the processor client via the builder and its sub-builder
 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, tokenCredential)
     .processor()
     .queueName(queueName)
     .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
     .disableAutoComplete()  // Make sure to explicitly opt in to manual settlement (e.g. complete, abandon).
     .processMessage(processMessage)
     .processError(processError)
     .disableAutoComplete()
     .buildProcessorClient();

 // Starts the processor in the background. Control returns immediately.
 processorClient.start();

 // Stop processor and dispose when done processing messages.
 processorClient.stop();
 processorClient.close();

实例化处理器客户端并在 ReceiveAndDelete 模式下接收的示例代码

// Function that gets called whenever a message is received.
 Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
     final ServiceBusReceivedMessage message = context.getMessage();
     System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n",
         message.getSessionId(), message.getSequenceNumber(), message.getBody());
 };

 // Sample code that gets called if there's an error
 Consumer<ServiceBusErrorContext> processError = errorContext -> {
     if (errorContext.getException() instanceof ServiceBusException) {
         ServiceBusException exception = (ServiceBusException) errorContext.getException();

         System.out.printf("Error source: %s, reason %s%n", errorContext.getErrorSource(),
             exception.getReason());
     } else {
         System.out.printf("Error occurred: %s%n", errorContext.getException());
     }
 };

 TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

 // Create the processor client via the builder and its sub-builder
 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 // 'disableAutoComplete()' will opt in to manual settlement (e.g. complete, abandon).
 ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, tokenCredential)
     .processor()
     .queueName(queueName)
     .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
     .processMessage(processMessage)
     .processError(processError)
     .disableAutoComplete()
     .buildProcessorClient();

 // Starts the processor in the background. Control returns immediately.
 processorClient.start();

 // Stop processor and dispose when done processing messages.
 processorClient.stop();
 processorClient.close();

创建并运行已启用会话的处理器

// Function that gets called whenever a message is received.
 Consumer<ServiceBusReceivedMessageContext> onMessage = context -> {
     ServiceBusReceivedMessage message = context.getMessage();
     System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n",
         message.getSessionId(), message.getSequenceNumber(), message.getBody());
 };

 Consumer<ServiceBusErrorContext> onError = context -> {
     System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n",
         context.getFullyQualifiedNamespace(), context.getEntityPath());

     if (context.getException() instanceof ServiceBusException) {
         ServiceBusException exception = (ServiceBusException) context.getException();

         System.out.printf("Error source: %s, reason %s%n", context.getErrorSource(),
             exception.getReason());
     } else {
         System.out.printf("Error occurred: %s%n", context.getException());
     }
 };

 TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

 // Create the processor client via the builder and its sub-builder
 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 ServiceBusProcessorClient sessionProcessor = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, tokenCredential)
     .sessionProcessor()
     .queueName(sessionEnabledQueueName)
     .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
     .disableAutoComplete()
     .maxConcurrentSessions(2)
     .processMessage(onMessage)
     .processError(onError)
     .buildProcessorClient();

 // Starts the processor in the background. Control returns immediately.
 sessionProcessor.start();

 // Stop processor and dispose when done processing messages.
 sessionProcessor.stop();
 sessionProcessor.close();

方法摘要

修饰符和类型 方法和描述
synchronized void close()

停止消息处理并关闭处理器。

synchronized String getIdentifier()

获取 实例的 ServiceBusProcessorClient标识符。

String getQueueName()

返回与此 实例 ServiceBusProcessorClient关联的队列名称。

String getSubscriptionName()

返回与此 实例 ServiceBusProcessorClient关联的订阅名称。

String getTopicName()

返回与此 实例 ServiceBusProcessorClient关联的主题名称。

synchronized boolean isRunning()

true如果处理器正在运行,则返回 。

synchronized void start()

在后台启动处理器。

synchronized void stop()

停止此处理器的消息处理。

方法继承自 java.lang.Object

方法详细信息

close

public synchronized void close()

停止消息处理并关闭处理器。 接收链接和会话已关闭,调用 start() 将创建包含新链接和新会话的新处理周期。

getIdentifier

public synchronized String getIdentifier()

获取 实例的 ServiceBusProcessorClient标识符。

Returns:

可以标识 实例的 ServiceBusProcessorClient标识符。

getQueueName

public String getQueueName()

返回与此 实例 ServiceBusProcessorClient关联的队列名称。

Returns:

与此 实例 ServiceBusProcessorClient 关联的队列名称,如果 null 处理器实例用于主题和订阅,则为 。

getSubscriptionName

public String getSubscriptionName()

返回与此 实例 ServiceBusProcessorClient关联的订阅名称。

Returns:

与此实例 ServiceBusProcessorClient 关联的订阅名称,如果 null 处理器实例用于队列,则为 。

getTopicName

public String getTopicName()

返回与此 实例 ServiceBusProcessorClient关联的主题名称。

Returns:

与此 实例 ServiceBusProcessorClient 关联的主题名称,如果 null 处理器实例用于队列,则为 。

isRunning

public synchronized boolean isRunning()

true如果处理器正在运行,则返回 。 如果处理器已停止或关闭,此方法将 false返回 。

Returns:

true 如果处理器正在运行,则为 ; false 否则。

start

public synchronized void start()

在后台启动处理器。 调用此方法时,处理器将启动消息接收器,该消息接收器将在新消息可用时调用消息处理程序。 此方法是幂等 (即。在处理器已运行后再次调用 start() 是无操作) 。

调用后调用start()stop()将使用相同的基础连接继续处理消息。

调用后调用start()close()将使用新连接启动处理器。

stop

public synchronized void stop()

停止此处理器的消息处理。 接收链接和会话保持活动状态,此处理器可以通过再次调用 start() 来恢复处理消息。

适用于