你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
ServiceBusProcessorClient 类
- java.
lang. Object - com.
azure. messaging. servicebus. ServiceBusProcessorClient
- com.
实现
public final class ServiceBusProcessorClient
implements AutoCloseable
用于处理服务总线消息的处理器客户端。 ServiceBusProcessorClient 提供了一种基于推送的机制,该机制在收到消息时调用消息处理回调,或者在接收消息时出错时调用错误处理程序。 可以创建 , ServiceBusProcessorClient 以处理已启用会话或未启用会话的服务总线实体的消息。 默认情况下,它支持消息的自动结算。
实例化处理器客户端并在 PeekLock 模式下接收的示例代码
// Function that gets called whenever a message is received.
Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
final ServiceBusReceivedMessage message = context.getMessage();
// Randomly complete or abandon each message. Ideally, in real-world scenarios, if the business logic
// handling message reaches desired state such that it doesn't require Service Bus to redeliver
// the same message, then context.complete() should be called otherwise context.abandon().
final boolean success = Math.random() < 0.5;
if (success) {
try {
context.complete();
} catch (RuntimeException error) {
System.out.printf("Completion of the message %s failed.%n Error: %s%n",
message.getMessageId(), error);
}
} else {
try {
context.abandon();
} catch (RuntimeException error) {
System.out.printf("Abandoning of the message %s failed.%nError: %s%n",
message.getMessageId(), error);
}
}
};
// Sample code that gets called if there's an error
Consumer<ServiceBusErrorContext> processError = errorContext -> {
if (errorContext.getException() instanceof ServiceBusException) {
ServiceBusException exception = (ServiceBusException) errorContext.getException();
System.out.printf("Error source: %s, reason %s%n", errorContext.getErrorSource(),
exception.getReason());
} else {
System.out.printf("Error occurred: %s%n", errorContext.getException());
}
};
TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
// Create the processor client via the builder and its sub-builder
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, tokenCredential)
.processor()
.queueName(queueName)
.receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
.disableAutoComplete() // Make sure to explicitly opt in to manual settlement (e.g. complete, abandon).
.processMessage(processMessage)
.processError(processError)
.disableAutoComplete()
.buildProcessorClient();
// Starts the processor in the background. Control returns immediately.
processorClient.start();
// Stop processor and dispose when done processing messages.
processorClient.stop();
processorClient.close();
实例化处理器客户端并在 ReceiveAndDelete 模式下接收的示例代码
// Function that gets called whenever a message is received.
Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
final ServiceBusReceivedMessage message = context.getMessage();
System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n",
message.getSessionId(), message.getSequenceNumber(), message.getBody());
};
// Sample code that gets called if there's an error
Consumer<ServiceBusErrorContext> processError = errorContext -> {
if (errorContext.getException() instanceof ServiceBusException) {
ServiceBusException exception = (ServiceBusException) errorContext.getException();
System.out.printf("Error source: %s, reason %s%n", errorContext.getErrorSource(),
exception.getReason());
} else {
System.out.printf("Error occurred: %s%n", errorContext.getException());
}
};
TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
// Create the processor client via the builder and its sub-builder
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
// 'disableAutoComplete()' will opt in to manual settlement (e.g. complete, abandon).
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, tokenCredential)
.processor()
.queueName(queueName)
.receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
.processMessage(processMessage)
.processError(processError)
.disableAutoComplete()
.buildProcessorClient();
// Starts the processor in the background. Control returns immediately.
processorClient.start();
// Stop processor and dispose when done processing messages.
processorClient.stop();
processorClient.close();
创建并运行已启用会话的处理器
// Function that gets called whenever a message is received.
Consumer<ServiceBusReceivedMessageContext> onMessage = context -> {
ServiceBusReceivedMessage message = context.getMessage();
System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n",
message.getSessionId(), message.getSequenceNumber(), message.getBody());
};
Consumer<ServiceBusErrorContext> onError = context -> {
System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n",
context.getFullyQualifiedNamespace(), context.getEntityPath());
if (context.getException() instanceof ServiceBusException) {
ServiceBusException exception = (ServiceBusException) context.getException();
System.out.printf("Error source: %s, reason %s%n", context.getErrorSource(),
exception.getReason());
} else {
System.out.printf("Error occurred: %s%n", context.getException());
}
};
TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
// Create the processor client via the builder and its sub-builder
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
ServiceBusProcessorClient sessionProcessor = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, tokenCredential)
.sessionProcessor()
.queueName(sessionEnabledQueueName)
.receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
.disableAutoComplete()
.maxConcurrentSessions(2)
.processMessage(onMessage)
.processError(onError)
.buildProcessorClient();
// Starts the processor in the background. Control returns immediately.
sessionProcessor.start();
// Stop processor and dispose when done processing messages.
sessionProcessor.stop();
sessionProcessor.close();
方法摘要
修饰符和类型 | 方法和描述 |
---|---|
synchronized void |
close()
停止消息处理并关闭处理器。 |
synchronized String |
getIdentifier()
获取 实例的 ServiceBusProcessorClient标识符。 |
String |
getQueueName()
返回与此 实例 ServiceBusProcessorClient关联的队列名称。 |
String |
getSubscriptionName()
返回与此 实例 ServiceBusProcessorClient关联的订阅名称。 |
String |
getTopicName()
返回与此 实例 ServiceBusProcessorClient关联的主题名称。 |
synchronized boolean |
isRunning()
|
synchronized void |
start()
在后台启动处理器。 |
synchronized void |
stop()
停止此处理器的消息处理。 |
方法继承自 java.lang.Object
方法详细信息
close
public synchronized void close()
停止消息处理并关闭处理器。 接收链接和会话已关闭,调用 start() 将创建包含新链接和新会话的新处理周期。
getIdentifier
public synchronized String getIdentifier()
获取 实例的 ServiceBusProcessorClient标识符。
Returns:
getQueueName
public String getQueueName()
返回与此 实例 ServiceBusProcessorClient关联的队列名称。
Returns:
null
处理器实例用于主题和订阅,则为 。getSubscriptionName
public String getSubscriptionName()
返回与此 实例 ServiceBusProcessorClient关联的订阅名称。
Returns:
null
处理器实例用于队列,则为 。getTopicName
public String getTopicName()
返回与此 实例 ServiceBusProcessorClient关联的主题名称。
Returns:
null
处理器实例用于队列,则为 。isRunning
public synchronized boolean isRunning()
true
如果处理器正在运行,则返回 。 如果处理器已停止或关闭,此方法将 false
返回 。
Returns:
true
如果处理器正在运行,则为 ; false
否则。start
stop
public synchronized void stop()
停止此处理器的消息处理。 接收链接和会话保持活动状态,此处理器可以通过再次调用 start() 来恢复处理消息。