ServiceBusReceiverClient Class

  • java.lang.Object
    • com.azure.messaging.servicebus.ServiceBusReceiverClient

Implements

public final class ServiceBusReceiverClient
implements AutoCloseable

A synchronous receiver responsible for receiving ServiceBusReceivedMessage from a queue or topic/subscription on Azure Service Bus.

The examples shown in this document use a credential object named DefaultAzureCredential for authentication, which is appropriate for most scenarios, including local development and production environments. Additionally, we recommend using managed identity for authentication in production environments. You can find more information on different ways of authenticating and their corresponding credential types in the Azure Identity documentation".

Sample: Create a receiver and receive messages

The following code sample demonstrates the creation and use of the synchronous client ServiceBusReceiverClient to receive messages from a Service Bus subscription. The receive operation returns when either 10 messages are received or 30 seconds has elapsed. By default, messages are received using PEEK_LOCK and customers must settle their messages using one of the settlement methods on the receiver client. " "Settling receive operations" provides additional information about message settlement.

TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 ServiceBusReceiverClient receiver = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, tokenCredential)
     .receiver()
     .topicName(topicName)
     .subscriptionName(subscriptionName)
     .buildClient();

 // Receives a batch of messages when 10 messages are received or until 30 seconds have elapsed, whichever
 // happens first.
 IterableStream<ServiceBusReceivedMessage> messages = receiver.receiveMessages(10, Duration.ofSeconds(30));
 messages.forEach(message -> {
     System.out.printf("Id: %s. Contents: %s%n", message.getMessageId(), message.getBody());

     // If able to process message, complete it. Otherwise, abandon it and allow it to be
     // redelivered.
     if (isMessageProcessed) {
         receiver.complete(message);
     } else {
         receiver.abandon(message);
     }
 });

 // When program ends, or you're done receiving all messages, dispose of the receiver.
 // Clients should be long-lived objects as they
 // require resources and time to establish a connection to the service.
 receiver.close();

Method Summary

Modifier and Type Method and Description
void abandon(ServiceBusReceivedMessage message)

Abandons a ServiceBusReceivedMessage.

void abandon(ServiceBusReceivedMessage message, AbandonOptions options)

Abandons a ServiceBusReceivedMessage and updates the message's properties.

void close()

Disposes of the consumer by closing the underlying links to the service.

void commitTransaction(ServiceBusTransactionContext transactionContext)

Commits the transaction and all the operations associated with it.

void complete(ServiceBusReceivedMessage message)

Completes a ServiceBusReceivedMessage.

void complete(ServiceBusReceivedMessage message, CompleteOptions options)

Completes a ServiceBusReceivedMessage.

ServiceBusTransactionContext createTransaction()

Starts a new transaction on Service Bus.

void deadLetter(ServiceBusReceivedMessage message)

Moves a ServiceBusReceivedMessage to the dead-letter sub-queue.

void deadLetter(ServiceBusReceivedMessage message, DeadLetterOptions options)

Moves a ServiceBusReceivedMessage to the dead-letter sub-queue with dead-letter reason, error description, and/or modified properties.

void defer(ServiceBusReceivedMessage message)

Defers a ServiceBusReceivedMessage.

void defer(ServiceBusReceivedMessage message, DeferOptions options)

Defers a ServiceBusReceivedMessage using its lock token with modified message property.

String getEntityPath()

Gets the Service Bus resource this client interacts with.

String getFullyQualifiedNamespace()

Gets the fully qualified Service Bus namespace that the connection is associated with.

String getIdentifier()

Gets the identifier of the instance of ServiceBusReceiverClient.

String getSessionId()

Gets the SessionId of the session if this receiver is a session receiver.

byte[] getSessionState()

Gets the state of the session if this receiver is a session receiver.

ServiceBusReceivedMessage peekMessage()

Reads the next active message without changing the state of the receiver or the message source.

ServiceBusReceivedMessage peekMessage(long sequenceNumber)

Starting from the given sequence number, reads next the active message without changing the state of the receiver or the message source.

IterableStream<ServiceBusReceivedMessage> peekMessages(int maxMessages)

Reads the next batch of active messages without changing the state of the receiver or the message source.

IterableStream<ServiceBusReceivedMessage> peekMessages(int maxMessages, long sequenceNumber)

Starting from the given sequence number, reads the next batch of active messages without changing the state of the receiver or the message source.

ServiceBusReceivedMessage receiveDeferredMessage(long sequenceNumber)

Receives a deferred ServiceBusReceivedMessage.

IterableStream<ServiceBusReceivedMessage> receiveDeferredMessageBatch(Iterable<Long> sequenceNumbers)

Receives a batch of deferred ServiceBusReceivedMessage.

IterableStream<ServiceBusReceivedMessage> receiveMessages(int maxMessages)

Receives an iterable stream of ServiceBusReceivedMessage from the Service Bus entity.

IterableStream<ServiceBusReceivedMessage> receiveMessages(int maxMessages, Duration maxWaitTime)

Receives an iterable stream of ServiceBusReceivedMessage from the Service Bus entity with a timout.

OffsetDateTime renewMessageLock(ServiceBusReceivedMessage message)

Renews the lock on the specified message.

void renewMessageLock(ServiceBusReceivedMessage message, Duration maxLockRenewalDuration, Consumer<Throwable> onError)

Starts the auto lock renewal for a message with the given lock.

OffsetDateTime renewSessionLock()

Sets the state of the session if this receiver is a session receiver.

void renewSessionLock(Duration maxLockRenewalDuration, Consumer<Throwable> onError)

Starts the auto lock renewal for the session that this receiver works for.

void rollbackTransaction(ServiceBusTransactionContext transactionContext)

Rollbacks the transaction given and all operations associated with it.

void setSessionState(byte[] sessionState)

Sets the state of the session if this receiver is a session receiver.

Methods inherited from java.lang.Object

Method Details

abandon

public void abandon(ServiceBusReceivedMessage message)

Abandons a ServiceBusReceivedMessage. This will make the message available again for processing. Abandoning a message will increase the delivery count on the message.

Parameters:

message - The ServiceBusReceivedMessage to perform this operation.

abandon

public void abandon(ServiceBusReceivedMessage message, AbandonOptions options)

Abandons a ServiceBusReceivedMessage and updates the message's properties. This will make the message available again for processing. Abandoning a message will increase the delivery count on the message.

Parameters:

message - The ServiceBusReceivedMessage to perform this operation.
options - The options to set while abandoning the message.

close

public void close()

Disposes of the consumer by closing the underlying links to the service.

commitTransaction

public void commitTransaction(ServiceBusTransactionContext transactionContext)

Commits the transaction and all the operations associated with it.

Creating and using a transaction

ServiceBusTransactionContext transaction = receiver.createTransaction();

 // Process messages and associate operations with the transaction.
 ServiceBusReceivedMessage deferredMessage = receiver.receiveDeferredMessage(sequenceNumber);
 receiver.complete(deferredMessage, new CompleteOptions().setTransactionContext(transaction));
 receiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction));
 receiver.commitTransaction(transaction);

Parameters:

transactionContext - The transaction to be commit.

complete

public void complete(ServiceBusReceivedMessage message)

Completes a ServiceBusReceivedMessage. This will delete the message from the service.

Parameters:

message - The ServiceBusReceivedMessage to perform this operation.

complete

public void complete(ServiceBusReceivedMessage message, CompleteOptions options)

Completes a ServiceBusReceivedMessage. This will delete the message from the service.

Parameters:

message - The ServiceBusReceivedMessage to perform this operation.
options - Options used to complete the message.

createTransaction

public ServiceBusTransactionContext createTransaction()

Starts a new transaction on Service Bus. The ServiceBusTransactionContext should be passed along to all operations that need to be in this transaction.

Sample: Creating and using a transaction

ServiceBusTransactionContext transaction = receiver.createTransaction();

 // Process messages and associate operations with the transaction.
 ServiceBusReceivedMessage deferredMessage = receiver.receiveDeferredMessage(sequenceNumber);
 receiver.complete(deferredMessage, new CompleteOptions().setTransactionContext(transaction));
 receiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction));
 receiver.commitTransaction(transaction);

Returns:

deadLetter

public void deadLetter(ServiceBusReceivedMessage message)

Moves a ServiceBusReceivedMessage to the dead-letter sub-queue.

Parameters:

message - The ServiceBusReceivedMessage to perform this operation.

deadLetter

public void deadLetter(ServiceBusReceivedMessage message, DeadLetterOptions options)

Moves a ServiceBusReceivedMessage to the dead-letter sub-queue with dead-letter reason, error description, and/or modified properties.

Parameters:

message - The ServiceBusReceivedMessage to perform this operation.
options - Options used to dead-letter the message.

defer

public void defer(ServiceBusReceivedMessage message)

Defers a ServiceBusReceivedMessage. This will move message into the deferred subqueue.

Parameters:

message - The ServiceBusReceivedMessage to perform this operation.

defer

public void defer(ServiceBusReceivedMessage message, DeferOptions options)

Defers a ServiceBusReceivedMessage using its lock token with modified message property. This will move message into the deferred sub-queue.

Parameters:

message - The ServiceBusReceivedMessage to perform this operation.
options - Options used to defer the message.

getEntityPath

public String getEntityPath()

Gets the Service Bus resource this client interacts with.

Returns:

The Service Bus resource this client interacts with.

getFullyQualifiedNamespace

public String getFullyQualifiedNamespace()

Gets the fully qualified Service Bus namespace that the connection is associated with. This is likely similar to {yournamespace}.servicebus.windows.net.

Returns:

The fully qualified Service Bus namespace that the connection is associated with.

getIdentifier

public String getIdentifier()

Gets the identifier of the instance of ServiceBusReceiverClient.

Returns:

The identifier that can identify the instance of ServiceBusReceiverClient.

getSessionId

public String getSessionId()

Gets the SessionId of the session if this receiver is a session receiver.

Returns:

The SessionId or null if this is not a session receiver.

getSessionState

public byte[] getSessionState()

Gets the state of the session if this receiver is a session receiver.

Returns:

The session state or null if there is no state set for the session.

peekMessage

public ServiceBusReceivedMessage peekMessage()

Reads the next active message without changing the state of the receiver or the message source. The first call to peekMessage() fetches the first active message for this receiver. Each subsequent call fetches the subsequent message in the entity.

Returns:

peekMessage

public ServiceBusReceivedMessage peekMessage(long sequenceNumber)

Starting from the given sequence number, reads next the active message without changing the state of the receiver or the message source.

Parameters:

sequenceNumber - The sequence number from where to read the message.

Returns:

peekMessages

public IterableStream peekMessages(int maxMessages)

Reads the next batch of active messages without changing the state of the receiver or the message source.

Parameters:

maxMessages - The maximum number of messages to peek.

Returns:

peekMessages

public IterableStream peekMessages(int maxMessages, long sequenceNumber)

Starting from the given sequence number, reads the next batch of active messages without changing the state of the receiver or the message source.

Parameters:

maxMessages - The number of messages.
sequenceNumber - The sequence number from where to start reading messages.

Returns:

receiveDeferredMessage

public ServiceBusReceivedMessage receiveDeferredMessage(long sequenceNumber)

Receives a deferred ServiceBusReceivedMessage. Deferred messages can only be received by using sequence number.

Parameters:

sequenceNumber - The getSequenceNumber() of the message.

Returns:

A deferred message with the matching sequenceNumber.

receiveDeferredMessageBatch

public IterableStream receiveDeferredMessageBatch(Iterable sequenceNumbers)

Receives a batch of deferred ServiceBusReceivedMessage. Deferred messages can only be received by using sequence number.

Parameters:

sequenceNumbers - The sequence numbers of the deferred messages.

Returns:

receiveMessages

public IterableStream receiveMessages(int maxMessages)

Receives an iterable stream of ServiceBusReceivedMessage from the Service Bus entity. The receive operation will wait for a default 1 minute for receiving a message before it times out. You can override it by using receiveMessages(int maxMessages, Duration maxWaitTime).

The 1-minute timeout is a client-side feature. Each time the application calls receiveMessages, a timer is started on the client that when expires will terminate the IterableStream returned from this method. Timeout being a client-side feature means it is impossible to cancel any message requests that already made it to the broker. The messages can still arrive in the background after the IterableStream is transitioned to terminated state due to the client-side timeout. If there is no active IterableStream, the client will attempt to release any buffered messages back to the broker to avoid messages from going to dead letter. While messages are being released, if a new active IterableStream appears (due to a new receiveMessages call) then client will stop further release, so application may receive some messages from the buffer or already in transit followed by previously released messages when broker redeliver them, which can appear as out of order delivery.

To keep the lock on each message received from a non-session resource (queue, topic subscription), the client will run a background task that will continuously renew the lock before it expires. By default, the lock renew task will run for a duration of 5 minutes, this duration can be adjusted using the ServiceBusReceiverClientBuilder#maxAutoLockRenewDuration(Duration) API or can be turned off by setting it Duration#ZERO. A higher maxMessages value means an equivalent number of lock renewal tasks running in the client, which may put more stress on low CPU environments. Given each lock renewal is a network call to the broker, a high number of lock renewal tasks making multiple lock renew calls also may have an adverse effect in namespace throttling. Additionally, if certain lock renewal tasks fail to renew the lock on time because of low CPU, service throttling or overloaded network, then client may lose the lock on the messages, which will cause the application's attempts to settle (e.g., complete, abandon) those messages to fail. The broker will redeliver those messages, but if the settling attempts fail repeatedly beyond the max delivery count, then the message will be transferred to dead letter queue. Keep this in mind when choosing maxMessages. You may consider disabling the client-side lock renewal using maxAutoLockRenewDuration(Duration.ZERO) if you can configure a lock duration at the resource (queue,topic subscription) level that at least exceeds the cumulative expected processing time for maxMessages messages.

The client uses an AMQP link underneath to receive the messages; the client will transparently transition to a new AMQP link if the current one encounters a retriable error. When the client experiences a non-retriable error or exhausts the retries, the iteration (e.g., forEach) on the IterableStream<T> returned by the further invocations of receiveMessages API will throw the error to the application. Once the application receives this error, the application should reset the client, i.e., close the current ServiceBusReceiverClient and create a new client to continue receiving messages.

Note: A few examples of non-retriable errors are - the application attempting to connect to a queue that does not exist, deleting or disabling the queue in the middle of receiving, the user explicitly initiating Geo-DR. These are certain events where the Service Bus communicates to the client that a non-retriable error occurred.

Parameters:

maxMessages - The maximum number of messages to receive.

Returns:

An IterableStream<T> of at most maxMessages messages from the Service Bus entity.

receiveMessages

public IterableStream receiveMessages(int maxMessages, Duration maxWaitTime)

Receives an iterable stream of ServiceBusReceivedMessage from the Service Bus entity with a timout. The default receive mode is PEEK_LOCK unless it is changed during creation of ServiceBusReceiverClient using ServiceBusReceiverClientBuilder#receiveMode(ServiceBusReceiveMode).

The support for timeout maxWaitTime is a client-side feature. Each time the application calls receiveMessages, a timer is started on the client that when expires will terminate the IterableStream returned from this method. Timeout being a client-side feature means it is impossible to cancel any message requests that already made it to the broker. The messages can still arrive in the background after the IterableStream is transitioned to terminated state due to the client-side timeout. If there is no active IterableStream, the client will attempt to release any buffered messages back to the broker to avoid messages from going to dead letter. While messages are being released, if a new active IterableStream appears (due to a new receiveMessages call) then client will stop further release, so application may receive some messages from the buffer or already in transit followed by previously released messages when broker redeliver them, which can appear as out of order delivery. Consider these when choosing the timeout. For example, a small timeout with a higher maxMessages value while there are a lot of messages in the entity can increase the release network calls to the broker that might have adverse effect in namespace throttling and increases the chances of out of order deliveries. Also, frequent receiveMessages with low timeout means frequent scheduling of timer tasks, which may put more stress on low CPU environments.

To keep the lock on each message received from a non-session resource (queue, topic subscription), the client will run a background task that will continuously renew the lock before it expires. By default, the lock renew task will run for a duration of 5 minutes, this duration can be adjusted using the ServiceBusReceiverClientBuilder#maxAutoLockRenewDuration(Duration) API or can be turned off by setting it Duration#ZERO. A higher maxMessages value means an equivalent number of lock renewal tasks running in the client, which may put more stress on low CPU environments. Given each lock renewal is a network call to the broker, a high number of lock renewal tasks making multiple lock renew calls also may have an adverse effect in namespace throttling. Additionally, if certain lock renewal tasks fail to renew the lock on time because of low CPU, service throttling or overloaded network, then client may lose the lock on the messages, which will cause the application's attempts to settle (e.g., complete, abandon) those messages to fail. The broker will redeliver those messages, but if the settling attempts fail repeatedly beyond the max delivery count, then the message will be transferred to dead letter queue. Keep this in mind when choosing maxMessages. You may consider disabling the client-side lock renewal using maxAutoLockRenewDuration(Duration.ZERO) if you can configure a lock duration at the resource (queue,topic subscription) level that at least exceeds the cumulative expected processing time for maxMessages messages.

The client uses an AMQP link underneath to receive the messages; the client will transparently transition to a new AMQP link if the current one encounters a retriable error. When the client experiences a non-retriable error or exhausts the retries, the iteration (e.g., forEach) on the IterableStream<T> returned by the further invocations of receiveMessages API will throw the error to the application. Once the application receives this error, the application should reset the client, i.e., close the current ServiceBusReceiverClient and create a new client to continue receiving messages.

Note: A few examples of non-retriable errors are - the application attempting to connect to a queue that does not exist, deleting or disabling the queue in the middle of receiving, the user explicitly initiating Geo-DR. These are certain events where the Service Bus communicates to the client that a non-retriable error occurred.

Parameters:

maxMessages - The maximum number of messages to receive.
maxWaitTime - The time the client waits for receiving a message before it times out.

Returns:

An IterableStream<T> of at most maxMessages messages from the Service Bus entity.

renewMessageLock

public OffsetDateTime renewMessageLock(ServiceBusReceivedMessage message)

Renews the lock on the specified message. The lock will be renewed based on the setting specified on the entity. When a message is received in PEEK_LOCK mode, the message is locked on the server for this receiver instance for a duration as specified during the Queue creation (LockDuration). If processing of the message requires longer than this duration, the lock needs to be renewed. For each renewal, the lock is reset to the entity's LockDuration value.

Parameters:

message - The ServiceBusReceivedMessage to perform lock renewal.

Returns:

The new expiration time for the message.

renewMessageLock

public void renewMessageLock(ServiceBusReceivedMessage message, Duration maxLockRenewalDuration, Consumer onError)

Starts the auto lock renewal for a message with the given lock.

Parameters:

message - The ServiceBusReceivedMessage to perform auto-lock renewal.
maxLockRenewalDuration - Maximum duration to keep renewing the lock token.
onError - A function to call when an error occurs during lock renewal.

renewSessionLock

public OffsetDateTime renewSessionLock()

Sets the state of the session if this receiver is a session receiver.

Returns:

The next expiration time for the session lock.

renewSessionLock

public void renewSessionLock(Duration maxLockRenewalDuration, Consumer onError)

Starts the auto lock renewal for the session that this receiver works for.

Parameters:

maxLockRenewalDuration - Maximum duration to keep renewing the session.
onError - A function to call when an error occurs during lock renewal.

rollbackTransaction

public void rollbackTransaction(ServiceBusTransactionContext transactionContext)

Rollbacks the transaction given and all operations associated with it.

Creating and using a transaction

ServiceBusTransactionContext transaction = receiver.createTransaction();

 // Process messages and associate operations with the transaction.
 ServiceBusReceivedMessage deferredMessage = receiver.receiveDeferredMessage(sequenceNumber);
 receiver.complete(deferredMessage, new CompleteOptions().setTransactionContext(transaction));
 receiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction));
 receiver.commitTransaction(transaction);

Parameters:

transactionContext - The transaction to be rollback.

setSessionState

public void setSessionState(byte[] sessionState)

Sets the state of the session if this receiver is a session receiver.

Parameters:

sessionState - State to set on the session.

Applies to