ServiceBusClientBuilder.ServiceBusProcessorClientBuilder Class
- java.
lang. Object - com.
azure. messaging. servicebus. ServiceBusClientBuilder. ServiceBusProcessorClientBuilder
- com.
public final class ServiceBusClientBuilder.ServiceBusProcessorClientBuilder
Builder for creating ServiceBusProcessorClient to consume messages from a Service Bus entity. ServiceBusProcessorClient provides a push-based mechanism that notifies the message processing callback when a message is received or the error handle when an error is observed. To create an instance, therefore, configuring the two callbacks - processMessage(Consumer<ServiceBusReceivedMessageContext> processMessage) and processError(Consumer<ServiceBusErrorContext> processError) are necessary. By default, a ServiceBusProcessorClient is configured with auto-completion and auto-lock renewal capabilities.
Sample code to instantiate a processor client and receive in PeekLock mode
// Function that gets called whenever a message is received.
Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
final ServiceBusReceivedMessage message = context.getMessage();
// Randomly complete or abandon each message. Ideally, in real-world scenarios, if the business logic
// handling message reaches desired state such that it doesn't require Service Bus to redeliver
// the same message, then context.complete() should be called otherwise context.abandon().
final boolean success = Math.random() < 0.5;
if (success) {
try {
context.complete();
} catch (RuntimeException error) {
System.out.printf("Completion of the message %s failed.%n Error: %s%n",
message.getMessageId(), error);
}
} else {
try {
context.abandon();
} catch (RuntimeException error) {
System.out.printf("Abandoning of the message %s failed.%nError: %s%n",
message.getMessageId(), error);
}
}
};
// Sample code that gets called if there's an error
Consumer<ServiceBusErrorContext> processError = errorContext -> {
if (errorContext.getException() instanceof ServiceBusException) {
ServiceBusException exception = (ServiceBusException) errorContext.getException();
System.out.printf("Error source: %s, reason %s%n", errorContext.getErrorSource(),
exception.getReason());
} else {
System.out.printf("Error occurred: %s%n", errorContext.getException());
}
};
TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
// Create the processor client via the builder and its sub-builder
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, tokenCredential)
.processor()
.queueName(queueName)
.receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
.disableAutoComplete() // Make sure to explicitly opt in to manual settlement (e.g. complete, abandon).
.processMessage(processMessage)
.processError(processError)
.disableAutoComplete()
.buildProcessorClient();
// Starts the processor in the background. Control returns immediately.
processorClient.start();
// Stop processor and dispose when done processing messages.
processorClient.stop();
processorClient.close();
Sample code to instantiate a processor client and receive in ReceiveAndDelete mode
// Function that gets called whenever a message is received.
Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
final ServiceBusReceivedMessage message = context.getMessage();
System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n",
message.getSessionId(), message.getSequenceNumber(), message.getBody());
};
// Sample code that gets called if there's an error
Consumer<ServiceBusErrorContext> processError = errorContext -> {
if (errorContext.getException() instanceof ServiceBusException) {
ServiceBusException exception = (ServiceBusException) errorContext.getException();
System.out.printf("Error source: %s, reason %s%n", errorContext.getErrorSource(),
exception.getReason());
} else {
System.out.printf("Error occurred: %s%n", errorContext.getException());
}
};
TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
// Create the processor client via the builder and its sub-builder
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
// 'disableAutoComplete()' will opt in to manual settlement (e.g. complete, abandon).
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, tokenCredential)
.processor()
.queueName(queueName)
.receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
.processMessage(processMessage)
.processError(processError)
.disableAutoComplete()
.buildProcessorClient();
// Starts the processor in the background. Control returns immediately.
processorClient.start();
// Stop processor and dispose when done processing messages.
processorClient.stop();
processorClient.close();
Method Summary
Methods inherited from java.lang.Object
Method Details
buildProcessorClient
public ServiceBusProcessorClient buildProcessorClient()
Creates Service Bus message processor responsible for reading ServiceBusReceivedMessage from a specific queue or subscription.
Returns:
disableAutoComplete
public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder disableAutoComplete()
Disables auto-complete and auto-abandon of received messages. By default, a successfully processed message is complete(). If an error happens when the message is processed, it is abandon().
Returns:
maxAutoLockRenewDuration
public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder maxAutoLockRenewDuration(Duration maxAutoLockRenewDuration)
Sets the amount of time to continue auto-renewing the lock. Setting Duration#ZERO or null
disables auto-renewal. For RECEIVE_AND_DELETE mode, auto-renewal is disabled.
Parameters:
null
indicates that auto-renewal is disabled.
Returns:
maxConcurrentCalls
public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder maxConcurrentCalls(int maxConcurrentCalls)
Max concurrent messages that this processor should process. By default, this is set to 1.
Parameters:
Returns:
prefetchCount
public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder prefetchCount(int prefetchCount)
Sets the prefetch count of the processor. For both PEEK_LOCK and RECEIVE_AND_DELETE modes the default value is 0. Prefetch speeds up the message flow by aiming to have a message readily available for local retrieval when and before the application starts the processor. Setting a non-zero value will prefetch that number of messages. Setting the value to zero turns prefetch off.
Parameters:
Returns:
processError
public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder processError(Consumer
The error handler for the processor which will be invoked in the event of an error while receiving messages.
Parameters:
Returns:
processMessage
public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder processMessage(Consumer
The message processing callback for the processor which will be executed when a message is received.
Parameters:
Returns:
queueName
public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder queueName(String queueName)
Sets the name of the queue to create a processor for.
Parameters:
Returns:
receiveMode
public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder receiveMode(ServiceBusReceiveMode receiveMode)
Sets the receive mode for the processor.
Parameters:
Returns:
subQueue
public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder subQueue(SubQueue subQueue)
Sets the type of the SubQueue to connect to. Azure Service Bus queues and subscriptions provide a secondary sub-queue, called a dead-letter queue (DLQ).
Parameters:
Returns:
subscriptionName
public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder subscriptionName(String subscriptionName)
Sets the name of the subscription in the topic to listen to. topicName(String topicName) must also be set.
Parameters:
Returns:
topicName
public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder topicName(String topicName)
Sets the name of the topic. subscriptionName(String subscriptionName) must also be set.
Parameters:
Returns:
Applies to
Azure SDK for Java
Váš názor
https://aka.ms/ContentUserFeedback.
Připravujeme: V průběhu roku 2024 budeme postupně vyřazovat problémy z GitHub coby mechanismus zpětné vazby pro obsah a nahrazovat ho novým systémem zpětné vazby. Další informace naleznete v tématu:Odeslat a zobrazit názory pro