你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
ServiceBusClientBuilder.ServiceBusProcessorClientBuilder 类
- java.
lang. Object - com.
azure. messaging. servicebus. ServiceBusClientBuilder. ServiceBusProcessorClientBuilder
- com.
public final class ServiceBusClientBuilder.ServiceBusProcessorClientBuilder
用于创建 ServiceBusProcessorClient 以使用来自服务总线实体的消息的生成器。 ServiceBusProcessorClient 提供一种基于推送的机制,该机制在收到消息时通知消息处理回调,或者在观察到错误时通知错误句柄。 因此,若要创建实例,必须配置两个回调 - processMessage(Consumer<ServiceBusReceivedMessageContext> processMessage) 和 processError(Consumer<ServiceBusErrorContext> processError) 。 默认情况下, ServiceBusProcessorClient 配置了具有自动完成和自动锁定续订功能。
实例化处理器客户端并在 PeekLock 模式下接收的示例代码
// Function that gets called whenever a message is received.
Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
final ServiceBusReceivedMessage message = context.getMessage();
// Randomly complete or abandon each message. Ideally, in real-world scenarios, if the business logic
// handling message reaches desired state such that it doesn't require Service Bus to redeliver
// the same message, then context.complete() should be called otherwise context.abandon().
final boolean success = Math.random() < 0.5;
if (success) {
try {
context.complete();
} catch (RuntimeException error) {
System.out.printf("Completion of the message %s failed.%n Error: %s%n",
message.getMessageId(), error);
}
} else {
try {
context.abandon();
} catch (RuntimeException error) {
System.out.printf("Abandoning of the message %s failed.%nError: %s%n",
message.getMessageId(), error);
}
}
};
// Sample code that gets called if there's an error
Consumer<ServiceBusErrorContext> processError = errorContext -> {
if (errorContext.getException() instanceof ServiceBusException) {
ServiceBusException exception = (ServiceBusException) errorContext.getException();
System.out.printf("Error source: %s, reason %s%n", errorContext.getErrorSource(),
exception.getReason());
} else {
System.out.printf("Error occurred: %s%n", errorContext.getException());
}
};
TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
// Create the processor client via the builder and its sub-builder
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, tokenCredential)
.processor()
.queueName(queueName)
.receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
.disableAutoComplete() // Make sure to explicitly opt in to manual settlement (e.g. complete, abandon).
.processMessage(processMessage)
.processError(processError)
.disableAutoComplete()
.buildProcessorClient();
// Starts the processor in the background. Control returns immediately.
processorClient.start();
// Stop processor and dispose when done processing messages.
processorClient.stop();
processorClient.close();
实例化处理器客户端并在 ReceiveAndDelete 模式下接收的示例代码
// Function that gets called whenever a message is received.
Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
final ServiceBusReceivedMessage message = context.getMessage();
System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n",
message.getSessionId(), message.getSequenceNumber(), message.getBody());
};
// Sample code that gets called if there's an error
Consumer<ServiceBusErrorContext> processError = errorContext -> {
if (errorContext.getException() instanceof ServiceBusException) {
ServiceBusException exception = (ServiceBusException) errorContext.getException();
System.out.printf("Error source: %s, reason %s%n", errorContext.getErrorSource(),
exception.getReason());
} else {
System.out.printf("Error occurred: %s%n", errorContext.getException());
}
};
TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
// Create the processor client via the builder and its sub-builder
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
// 'disableAutoComplete()' will opt in to manual settlement (e.g. complete, abandon).
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, tokenCredential)
.processor()
.queueName(queueName)
.receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
.processMessage(processMessage)
.processError(processError)
.disableAutoComplete()
.buildProcessorClient();
// Starts the processor in the background. Control returns immediately.
processorClient.start();
// Stop processor and dispose when done processing messages.
processorClient.stop();
processorClient.close();
方法摘要
方法继承自 java.lang.Object
方法详细信息
buildProcessorClient
public ServiceBusProcessorClient buildProcessorClient()
创建负责从特定队列或订阅读取 ServiceBusReceivedMessage 的服务总线消息处理器。
Returns:
disableAutoComplete
public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder disableAutoComplete()
禁用自动完成和自动放弃收到的消息。 默认情况下,已成功处理的消息为 complete()。 如果在处理消息时发生错误,则为 abandon()。
Returns:
maxAutoLockRenewDuration
public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder maxAutoLockRenewDuration(Duration maxAutoLockRenewDuration)
设置继续自动续订锁的时间量。 设置 Duration#ZERO 或 null
禁用自动续订。 对于 RECEIVE_AND_DELETE 模式,自动续订处于禁用状态。
Parameters:
null
指示已禁用自动续订。
Returns:
maxConcurrentCalls
public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder maxConcurrentCalls(int maxConcurrentCalls)
此处理器应处理的最大并发消息数。 默认情况下,此值设置为 1。
Parameters:
Returns:
prefetchCount
public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder prefetchCount(int prefetchCount)
设置处理器的预提取计数。 PEEK_LOCK对于 和 RECEIVE_AND_DELETE 模式,默认值为 0。 预提取旨在使消息在应用程序启动处理器时和之前随时可供本地检索,从而加快消息流的速度。 设置非零值将预提取该数量的消息。 将值设置为零会关闭预提取。
Parameters:
Returns:
processError
public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder processError(Consumer
处理器的错误处理程序,在接收消息时出错时将调用该处理程序。
Parameters:
Returns:
processMessage
public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder processMessage(Consumer
处理器的消息处理回调,在收到消息时将执行该回调。
Parameters:
Returns:
queueName
public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder queueName(String queueName)
设置要为其创建处理器的队列的名称。
Parameters:
Returns:
receiveMode
public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder receiveMode(ServiceBusReceiveMode receiveMode)
设置处理器的接收模式。
Parameters:
Returns:
subQueue
public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder subQueue(SubQueue subQueue)
设置要连接到的 SubQueue 的类型。 Azure 服务总线队列和订阅提供辅助子队列,称为死信队列 (DLQ) 。
Parameters:
Returns:
subscriptionName
public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder subscriptionName(String subscriptionName)
设置主题中要侦听的订阅的名称。 topicName(String topicName) 还必须进行设置。
Parameters:
Returns:
topicName
public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder topicName(String topicName)
设置主题的名称。 subscriptionName(String subscriptionName) 还必须进行设置。
Parameters:
Returns: