你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

ServiceBusReceiverAsyncClient 类

  • java.lang.Object
    • com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient

实现

public final class ServiceBusReceiverAsyncClient
implements AutoCloseable

负责从Azure 服务总线队列或主题/订阅接收ServiceBusReceivedMessage异步接收方。

本文档中显示的示例使用名为 DefaultAzureCredential 的凭据对象进行身份验证,该对象适用于大多数方案,包括本地开发和生产环境。 此外,我们建议使用 托管标识 在生产环境中进行身份验证。 可以在 Azure 标识文档中找到有关不同身份验证方式及其相应凭据类型的详细信息。

示例:创建 ServiceBusReceiverAsyncClient

以下代码示例演示如何创建异步客户端 ServiceBusReceiverAsyncClientfullyQualifiedNamespace是服务总线命名空间的主机名。 通过 Azure 门户导航到事件中心命名空间后,它列在“概要”面板下。 使用的凭据是 DefaultAzureCredential 因为它合并了部署和开发中常用的凭据,并根据其运行环境选择要使用的凭据。 PEEK_LOCK (默认接收模式) ,disableAutoComplete()强烈建议用户控制消息解决。

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 // 'disableAutoComplete' indicates that users will explicitly settle their message.
 ServiceBusReceiverAsyncClient asyncReceiver = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, credential)
     .receiver()
     .disableAutoComplete()
     .queueName(queueName)
     .buildAsyncClient();

 // When users are done with the receiver, dispose of the receiver.
 // Clients should be long-lived objects as they require resources
 // and time to establish a connection to the service.
 asyncReceiver.close();

示例:从服务总线资源接收所有消息

这会从服务总线返回无限的消息流。 当释放订阅或其他终端方案时,流结束。 有关详细信息,请参阅receiveMessages()

// Keep a reference to `subscription`. When the program is finished receiving messages, call
 // subscription.dispose(). This will stop fetching messages from the Service Bus.
 // Consider using Flux.usingWhen to scope the creation, usage, and cleanup of the receiver.
 Disposable subscription = asyncReceiver.receiveMessages()
     .flatMap(message -> {
         System.out.printf("Received Seq #: %s%n", message.getSequenceNumber());
         System.out.printf("Contents of message as string: %s%n", message.getBody());

         // Explicitly settle the message using complete, abandon, defer, dead-letter, etc.
         if (isMessageProcessed) {
             return asyncReceiver.complete(message);
         } else {
             return asyncReceiver.abandon(message);
         }
     })
     .subscribe(unused -> {
     }, error -> System.out.println("Error occurred: " + error),
         () -> System.out.println("Receiving complete."));

 // When program ends, or you're done receiving all messages, dispose of the receiver.
 // Clients should be long-lived objects as they
 // require resources and time to establish a connection to the service.
 asyncReceiver.close();

示例:从服务总线实体以模式 RECEIVE_AND_DELETE 接收消息

下面的代码示例演示如何使用 RECEIVE_AND_DELETE创建异步客户端ServiceBusReceiverAsyncClientfullyQualifiedNamespace是服务总线命名空间的主机名。 通过 Azure 门户导航到事件中心命名空间后,它列在“概要”面板下。 使用的凭据是 DefaultAzureCredential 因为它合并了部署和开发中常用的凭据,并根据其运行环境选择要使用的凭据。 有关使用此模式接收消息的详细信息,请参阅 RECEIVE_AND_DELETE 文档。

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // Keep a reference to `subscription`. When the program is finished receiving messages, call
 // subscription.dispose(). This will stop fetching messages from the Service Bus.
 Disposable subscription = Flux.usingWhen(
         Mono.fromCallable(() -> {
             // Setting the receiveMode when creating the receiver enables receive and delete mode. By default,
             // peek lock mode is used. In peek lock mode, users are responsible for settling messages.
             return new ServiceBusClientBuilder()
                 .credential(fullyQualifiedNamespace, credential)
                 .receiver()
                 .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
                 .queueName(queueName)
                 .buildAsyncClient();
         }), receiver -> {
             return receiver.receiveMessages();
         }, receiver -> {
             return Mono.fromRunnable(() -> receiver.close());
         })
     .subscribe(message -> {
             // Messages received in RECEIVE_AND_DELETE mode do not have to be settled because they are automatically
             // removed from the queue.
         System.out.printf("Received Seq #: %s%n", message.getSequenceNumber());
         System.out.printf("Contents of message as string: %s%n", message.getBody());
     },
         error -> System.out.println("Error occurred: " + error),
         () -> System.out.println("Receiving complete."));

示例:从特定会话接收消息

若要从特定会话提取消息,请切换到 ServiceBusSessionReceiverClientBuilder 并生成会话接收方客户端。 使用 acceptSession(String sessionId) 创建会话绑定 ServiceBusReceiverAsyncClient的 。 此示例假定在 创建队列时启用了服务总线会话。

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 // 'disableAutoComplete' indicates that users will explicitly settle their message.
 ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, credential)
     .sessionReceiver()
     .disableAutoComplete()
     .queueName(sessionEnabledQueueName)
     .buildAsyncClient();

 // acceptSession(String) completes successfully with a receiver when "<<my-session-id>>" session is
 // successfully locked.
 // `Flux.usingWhen` is used, so we dispose of the receiver resource after `receiveMessages()` and the settlement
 // operations complete.
 // `Mono.usingWhen` can also be used if the resource closure returns a single item.
 Flux<Void> sessionMessages = Flux.usingWhen(
     sessionReceiver.acceptSession("<<my-session-id>>"),
     receiver -> {
         // Receive messages from <<my-session-id>> session.
         return receiver.receiveMessages().flatMap(message -> {
             System.out.printf("Received Sequence #: %s. Contents: %s%n", message.getSequenceNumber(),
                 message.getBody());

             // Explicitly settle the message using complete, abandon, defer, dead-letter, etc.
             if (isMessageProcessed) {
                 return receiver.complete(message);
             } else {
                 return receiver.abandon(message);
             }
         });
     },
     receiver -> Mono.fromRunnable(() -> {
         // Dispose of resources.
         receiver.close();
         sessionReceiver.close();
     }));

 // When program ends, or you're done receiving all messages, the `subscription` can be disposed of. This code
 // is non-blocking and kicks off the operation.
 Disposable subscription = sessionMessages.subscribe(
     unused -> {
     }, error -> System.err.print("Error receiving message from session: " + error),
     () -> System.out.println("Completed receiving from session."));

示例:从第一个可用会话接收消息

若要处理来自第一个可用会话的消息,请切换到 ServiceBusSessionReceiverClientBuilder 并生成会话接收方客户端。 使用 acceptNextSession() 查找要处理消息的第一个可用会话。

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 // 'disableAutoComplete' indicates that users will explicitly settle their message.
 ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, credential)
     .sessionReceiver()
     .disableAutoComplete()
     .queueName(sessionEnabledQueueName)
     .buildAsyncClient();

 // Creates a client to receive messages from the first available session. It waits until
 // AmqpRetryOptions.getTryTimeout() elapses. If no session is available within that operation timeout, it
 // completes with a retriable error. Otherwise, a receiver is returned when a lock on the session is acquired.
 Mono<ServiceBusReceiverAsyncClient> receiverMono = sessionReceiver.acceptNextSession();

 Flux<Void> receiveMessagesFlux = Flux.usingWhen(receiverMono,
     receiver -> receiver.receiveMessages().flatMap(message -> {
         System.out.println("Received message: " + message.getBody());

         // Explicitly settle the message via complete, abandon, defer, dead-letter, etc.
         if (isMessageProcessed) {
             return receiver.complete(message);
         } else {
             return receiver.abandon(message);
         }
     }),
     receiver -> Mono.fromRunnable(() -> {
         // Dispose of the receiver and sessionReceiver when done receiving messages.
         receiver.close();
         sessionReceiver.close();
     }));

 // This is a non-blocking call that moves onto the next line of code after setting up and starting the receive
 // operation. Customers can keep a reference to `subscription` and dispose of it when they want to stop
 // receiving messages.
 Disposable subscription = receiveMessagesFlux.subscribe(unused -> {
 }, error -> System.out.println("Error occurred: " + error),
     () -> System.out.println("Receiving complete."));

示例:限制来自服务总线实体的消息的速率消耗

对于需要限制在给定时间接收的消息数的消息接收者,可以使用 BaseSubscriber#request(long)

// This is a non-blocking call. The program will move to the next line of code after setting up the operation.
 asyncReceiver.receiveMessages().subscribe(new BaseSubscriber<ServiceBusReceivedMessage>() {
     private static final int NUMBER_OF_MESSAGES = 5;
     private final AtomicInteger currentNumberOfMessages = new AtomicInteger();

     @Override
     protected void hookOnSubscribe(Subscription subscription) {
         // Tell the Publisher we only want 5 message at a time.
         request(NUMBER_OF_MESSAGES);
     }

     @Override
     protected void hookOnNext(ServiceBusReceivedMessage message) {
         // Process the ServiceBusReceivedMessage
         // If the number of messages we have currently received is a multiple of 5, that means we have reached
         // the last message the Subscriber will provide to us. Invoking request(long) here, tells the Publisher
         // that the subscriber is ready to get more messages from upstream.
         if (currentNumberOfMessages.incrementAndGet() % 5 == 0) {
             request(NUMBER_OF_MESSAGES);
         }
     }
 });

方法摘要

修饰符和类型 方法和描述
Mono<Void> abandon(ServiceBusReceivedMessage message)

放弃 ServiceBusReceivedMessage

Mono<Void> abandon(ServiceBusReceivedMessage message, AbandonOptions options)

ServiceBusReceivedMessage放弃更新消息的属性。

void close()

通过关闭指向服务的基础链接释放使用者。

Mono<Void> commitTransaction(ServiceBusTransactionContext transactionContext)

提交事务及其关联的所有操作。

Mono<Void> complete(ServiceBusReceivedMessage message)

完成 ServiceBusReceivedMessage

Mono<Void> complete(ServiceBusReceivedMessage message, CompleteOptions options)

ServiceBusReceivedMessage使用给定选项完成 。

Mono<ServiceBusTransactionContext> createTransaction()

启动新的服务端事务。

Mono<Void> deadLetter(ServiceBusReceivedMessage message)

将 移动到 ServiceBusReceivedMessage 死信子队列。

Mono<Void> deadLetter(ServiceBusReceivedMessage message, DeadLetterOptions options)

使用给定选项将 移动到 ServiceBusReceivedMessage 死信子队列。

Mono<Void> defer(ServiceBusReceivedMessage message)

ServiceBusReceivedMessage延迟 。

Mono<Void> defer(ServiceBusReceivedMessage message, DeferOptions options)

延迟并 ServiceBusReceivedMessage 设置选项。

String getEntityPath()

获取此客户端与之交互的服务总线资源。

String getFullyQualifiedNamespace()

获取与连接关联的完全限定的服务总线命名空间。

String getIdentifier()

获取 实例的 ServiceBusReceiverAsyncClient标识符。

String getSessionId()

如果此接收方是会话接收器,则获取会话的会话 ID。

Mono<byte[]> getSessionState()

如果此接收器是会话接收器,则获取会话的状态。

Mono<ServiceBusReceivedMessage> peekMessage()

在不更改接收方或消息源的状态的情况下读取下一个活动消息。

Mono<ServiceBusReceivedMessage> peekMessage(long sequenceNumber)

从给定序列号开始,在不更改接收方或消息源的状态的情况下,读取活动消息的下一步。

Flux<ServiceBusReceivedMessage> peekMessages(int maxMessages)

在不更改接收方或消息源的状态的情况下读取下一批活动消息。

Flux<ServiceBusReceivedMessage> peekMessages(int maxMessages, long sequenceNumber)

从给定序列号开始,在不更改接收方或消息源的状态的情况下读取下一批活动消息。

Mono<ServiceBusReceivedMessage> receiveDeferredMessage(long sequenceNumber)

接收延迟 ServiceBusReceivedMessage的 。

Flux<ServiceBusReceivedMessage> receiveDeferredMessages(Iterable<Long> sequenceNumbers)

接收一批延迟 ServiceBusReceivedMessage的 。

Flux<ServiceBusReceivedMessage> receiveMessages()

从服务总线实体接收 ServiceBusReceivedMessage无限流。

Mono<OffsetDateTime> renewMessageLock(ServiceBusReceivedMessage message)

异步续订消息的锁。

Mono<Void> renewMessageLock(ServiceBusReceivedMessage message, Duration maxLockRenewalDuration)

启动 的 ServiceBusReceivedMessage自动锁定续订。

Mono<OffsetDateTime> renewSessionLock()

如果此接收方是会话接收器,则续订会话锁。

Mono<Void> renewSessionLock(Duration maxLockRenewalDuration)

为接收方工作的会话启动自动锁定续订。

Mono<Void> rollbackTransaction(ServiceBusTransactionContext transactionContext)

回滚给定的事务及其关联的所有操作。

Mono<Void> setSessionState(byte[] sessionState)

设置此接收方用于的会话的状态。

方法继承自 java.lang.Object

方法详细信息

abandon

public Mono abandon(ServiceBusReceivedMessage message)

放弃 ServiceBusReceivedMessage。 这会使消息再次可供处理。 放弃邮件会增加邮件的传递计数。

Parameters:

message - 要 ServiceBusReceivedMessage 执行此操作的 。

Returns:

Mono 服务总线放弃操作完成时完成的 。

abandon

public Mono abandon(ServiceBusReceivedMessage message, AbandonOptions options)

ServiceBusReceivedMessage放弃更新消息的属性。 这会使消息再次可供处理。 放弃邮件会增加邮件的传递计数。

Parameters:

message - ServiceBusReceivedMessage要执行此操作的 。
options - 放弃消息时要设置的选项。

Returns:

Mono 服务总线操作完成时完成的 。

close

public void close()

通过关闭指向服务的基础链接释放使用者。

commitTransaction

public Mono commitTransaction(ServiceBusTransactionContext transactionContext)

提交事务及其关联的所有操作。

创建和使用事务

// This mono creates a transaction and caches the output value, so we can associate operations with the
 // transaction. It does not cache the value if it is an error or completes with no items, effectively retrying
 // the operation.
 Mono<ServiceBusTransactionContext> transactionContext = asyncReceiver.createTransaction()
     .cache(value -> Duration.ofMillis(Long.MAX_VALUE),
         error -> Duration.ZERO,
         () -> Duration.ZERO);

 // Dispose of the disposable to cancel the operation.
 Disposable disposable = transactionContext.flatMap(transaction -> {
     // Process messages and associate operations with the transaction.
     Mono<Void> operations = Mono.when(
         asyncReceiver.receiveDeferredMessage(sequenceNumber).flatMap(message ->
             asyncReceiver.complete(message, new CompleteOptions().setTransactionContext(transaction))),
         asyncReceiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction)));

     // Finally, either commit or rollback the transaction once all the operations are associated with it.
     return operations.then(asyncReceiver.commitTransaction(transaction));
 }).subscribe(unused -> {
 }, error -> {
     System.err.println("Error occurred processing transaction: " + error);
 }, () -> {
     System.out.println("Completed transaction");
 });

Parameters:

transactionContext - 要提交的事务。

Returns:

Mono 服务总线资源上完成此操作的 。

complete

public Mono complete(ServiceBusReceivedMessage message)

完成 ServiceBusReceivedMessage。 这会从服务中删除消息。

Parameters:

message - ServiceBusReceivedMessage要执行此操作的 。

Returns:

Mono 服务总线上完成消息时完成的 。

complete

public Mono complete(ServiceBusReceivedMessage message, CompleteOptions options)

ServiceBusReceivedMessage使用给定选项完成 。 这会从服务中删除消息。

Parameters:

message - ServiceBusReceivedMessage要执行此操作的 。
options - 用于完成消息的选项。

Returns:

Mono 服务总线上完成消息时完成的 。

createTransaction

public Mono createTransaction()

启动新的服务端事务。 ServiceBusTransactionContext应将 传递给需要在此事务中的所有操作。

创建和使用事务

// This mono creates a transaction and caches the output value, so we can associate operations with the
 // transaction. It does not cache the value if it is an error or completes with no items, effectively retrying
 // the operation.
 Mono<ServiceBusTransactionContext> transactionContext = asyncReceiver.createTransaction()
     .cache(value -> Duration.ofMillis(Long.MAX_VALUE),
         error -> Duration.ZERO,
         () -> Duration.ZERO);

 // Dispose of the disposable to cancel the operation.
 Disposable disposable = transactionContext.flatMap(transaction -> {
     // Process messages and associate operations with the transaction.
     Mono<Void> operations = Mono.when(
         asyncReceiver.receiveDeferredMessage(sequenceNumber).flatMap(message ->
             asyncReceiver.complete(message, new CompleteOptions().setTransactionContext(transaction))),
         asyncReceiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction)));

     // Finally, either commit or rollback the transaction once all the operations are associated with it.
     return operations.then(asyncReceiver.commitTransaction(transaction));
 }).subscribe(unused -> {
 }, error -> {
     System.err.println("Error occurred processing transaction: " + error);
 }, () -> {
     System.out.println("Completed transaction");
 });

Returns:

Mono 服务总线资源上完成此操作的 。

deadLetter

public Mono deadLetter(ServiceBusReceivedMessage message)

将 移动到 ServiceBusReceivedMessage 死信子队列。

Parameters:

message - ServiceBusReceivedMessage要执行此操作的 。

Returns:

Mono 死信操作完成时完成的 。

deadLetter

public Mono deadLetter(ServiceBusReceivedMessage message, DeadLetterOptions options)

使用给定选项将 移动到 ServiceBusReceivedMessage 死信子队列。

Parameters:

message - ServiceBusReceivedMessage要执行此操作的 。
options - 用于对消息进行死信的选项。

Returns:

Mono 死信操作完成时完成的 。

defer

public Mono defer(ServiceBusReceivedMessage message)

ServiceBusReceivedMessage延迟 。 这会将消息移动到延迟的子队列中。

Parameters:

message - ServiceBusReceivedMessage要执行此操作的 。

Returns:

Mono 服务总线延迟操作完成时完成的 。

defer

public Mono defer(ServiceBusReceivedMessage message, DeferOptions options)

延迟并 ServiceBusReceivedMessage 设置选项。 这会将消息移动到延迟的子队列中。

Parameters:

message - ServiceBusReceivedMessage要执行此操作的 。
options - 用于延迟消息的选项。

Returns:

Mono 延迟操作完成时完成的 。

getEntityPath

public String getEntityPath()

获取此客户端与之交互的服务总线资源。

Returns:

此客户端与之交互的服务总线资源。

getFullyQualifiedNamespace

public String getFullyQualifiedNamespace()

获取与连接关联的完全限定的服务总线命名空间。 这可能类似于 {yournamespace}.servicebus.windows.net

Returns:

与连接关联的完全限定的服务总线命名空间。

getIdentifier

public String getIdentifier()

获取 实例的 ServiceBusReceiverAsyncClient标识符。

Returns:

可以标识 实例的 ServiceBusReceiverAsyncClient标识符。

getSessionId

public String getSessionId()

如果此接收方是会话接收器,则获取会话的 SessionId。

Returns:

如果这不是会话接收器,则为 SessionId 或 null。

getSessionState

public Mono getSessionState()

如果此接收器是会话接收器,则获取会话的状态。

Returns:

会话状态或空 Mono(如果会话未设置状态)。

peekMessage

public Mono peekMessage()

在不更改接收方或消息源的状态的情况下读取下一个活动消息。 对 的第一次调用 peek() 提取此接收方的第一条活动消息。 每个后续调用都会提取实体中的后续消息。

Returns:

peekMessage

public Mono peekMessage(long sequenceNumber)

从给定序列号开始,在不更改接收方或消息源的状态的情况下,读取活动消息的下一步。

Parameters:

sequenceNumber - 从何处读取消息的序列号。

Returns:

peekMessages

public Flux peekMessages(int maxMessages)

在不更改接收方或消息源的状态的情况下读取下一批活动消息。

Parameters:

maxMessages - 消息数。

Returns:

FluxServiceBusReceivedMessage的 , 被扫视。

peekMessages

public Flux peekMessages(int maxMessages, long sequenceNumber)

从给定序列号开始,在不更改接收方或消息源的状态的情况下读取下一批活动消息。

Parameters:

maxMessages - 消息数。
sequenceNumber - 从何处开始读取消息的序列号。

Returns:

一个FluxServiceBusReceivedMessage扫视。

receiveDeferredMessage

public Mono receiveDeferredMessage(long sequenceNumber)

接收延迟 ServiceBusReceivedMessage的 。 只能使用序列号接收延迟的消息。

Parameters:

sequenceNumber - 消息的 getSequenceNumber()

Returns:

具有匹配 sequenceNumber的延迟消息。

receiveDeferredMessages

public Flux receiveDeferredMessages(Iterable sequenceNumbers)

接收一批延迟 ServiceBusReceivedMessage的 。 只能使用序列号接收延迟的消息。

Parameters:

sequenceNumbers - 延迟消息的序列号。

Returns:

Flux延迟 ServiceBusReceivedMessage的 。

receiveMessages

public Flux receiveMessages()

从服务总线实体接收 的ServiceBusReceivedMessage无限流。 此 Flux 持续接收来自服务总线实体的消息,直到:

  • 接收器已关闭。
  • 释放了 Flux 的订阅。
  • 来自下游订阅服务器的终端信号上游 (即传播。 Flux#take(long)Flux#take(Duration))。
  • 导致 AmqpException 接收链接停止的 。

客户端使用下面的 AMQP 链接来接收消息;如果当前链接遇到可重试的错误,客户端将以透明方式转换为新的 AMQP 链接。 当客户端遇到不可重试的错误或重试用完时,订阅服务器的 org.reactivestreams.Subscriber#onError(Throwable) 终端处理程序将收到此错误的通知。 终端事件后不会再将消息传递到 org.reactivestreams.Subscriber#onNext(Object) ;应用程序必须创建一个新客户端才能恢复接收。 重新订阅旧客户端的 Flux 将不起作用。

注意:不可重试错误的一些示例包括 - 应用程序尝试连接到不存在的队列,在接收时删除或禁用队列,用户显式启动异地 DR。 这些是服务总线向客户端传达发生不可重试错误的某些事件。

Returns:

来自服务总线实体的 无限 消息流。

renewMessageLock

public Mono renewMessageLock(ServiceBusReceivedMessage message)

异步续订消息的锁。 将根据实体上指定的设置续订锁。 当在 PEEK_LOCK 模式下收到消息时,此接收方实例在服务器上锁定消息,在实体创建期间 (LockDuration) 中指定的持续时间。 如果消息的处理时间超过此持续时间,则需要续订锁。 对于每次续订,锁将重置为实体的 LockDuration 值。

Parameters:

message - 要 ServiceBusReceivedMessage 执行自动锁定续订的 。

Returns:

消息的新过期时间。

renewMessageLock

public Mono renewMessageLock(ServiceBusReceivedMessage message, Duration maxLockRenewalDuration)

启动 的 ServiceBusReceivedMessage自动锁定续订。

Parameters:

message - 要 ServiceBusReceivedMessage 执行此操作的 。
maxLockRenewalDuration - 持续续订锁定令牌的最长持续时间。

Returns:

在消息续订操作完成到 之前 maxLockRenewalDuration完成的 Mono。

renewSessionLock

public Mono renewSessionLock()

如果此接收方是会话接收器,则续订会话锁。

Returns:

会话锁的下一个过期时间。

renewSessionLock

public Mono renewSessionLock(Duration maxLockRenewalDuration)

为接收方工作的会话启动自动锁定续订。

Parameters:

maxLockRenewalDuration - 持续续订会话锁的最长持续时间。

Returns:

消息的锁定续订操作。

rollbackTransaction

public Mono rollbackTransaction(ServiceBusTransactionContext transactionContext)

回滚给定的事务及其关联的所有操作。

创建和使用事务

// This mono creates a transaction and caches the output value, so we can associate operations with the
 // transaction. It does not cache the value if it is an error or completes with no items, effectively retrying
 // the operation.
 Mono<ServiceBusTransactionContext> transactionContext = asyncReceiver.createTransaction()
     .cache(value -> Duration.ofMillis(Long.MAX_VALUE),
         error -> Duration.ZERO,
         () -> Duration.ZERO);

 // Dispose of the disposable to cancel the operation.
 Disposable disposable = transactionContext.flatMap(transaction -> {
     // Process messages and associate operations with the transaction.
     Mono<Void> operations = Mono.when(
         asyncReceiver.receiveDeferredMessage(sequenceNumber).flatMap(message ->
             asyncReceiver.complete(message, new CompleteOptions().setTransactionContext(transaction))),
         asyncReceiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction)));

     // Finally, either commit or rollback the transaction once all the operations are associated with it.
     return operations.then(asyncReceiver.commitTransaction(transaction));
 }).subscribe(unused -> {
 }, error -> {
     System.err.println("Error occurred processing transaction: " + error);
 }, () -> {
     System.out.println("Completed transaction");
 });

Parameters:

transactionContext - 要回滚的事务。

Returns:

Mono 服务总线资源完成此操作的 。

setSessionState

public Mono setSessionState(byte[] sessionState)

设置此接收方用于的会话的状态。

Parameters:

sessionState - 要对会话设置的状态。

Returns:

设置会话时完成的 Mono

适用于