你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

ServiceBusSessionReceiverAsyncClient 类

  • java.lang.Object
    • com.azure.messaging.servicebus.ServiceBusSessionReceiverAsyncClient

实现

public final class ServiceBusSessionReceiverAsyncClient
implements AutoCloseable

异步 会话接收方客户端用于从队列或主题获取会话锁,并创建 ServiceBusReceiverAsyncClient 绑定到锁定会话的实例。 会话可用作先入先出 (FIFO) 消息处理。 队列和主题/订阅支持服务总线会话,但是,必须在 创建实体时启用它。

本文档中显示的示例使用名为 DefaultAzureCredential 的凭据对象进行身份验证,该对象适用于大多数方案,包括本地开发和生产环境。 此外,我们建议使用 托管标识 在生产环境中进行身份验证。 可以在 Azure 标识文档中找到有关不同身份验证方式及其相应凭据类型的详细信息。

示例:从特定会话接收消息

如果知道会话 ID,则使用 acceptSession(String sessionId) 获取会话的锁。 PEEK_LOCK强烈建议使用 和 disableAutoComplete() ,以便用户可以控制消息解决。

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 // 'disableAutoComplete' indicates that users will explicitly settle their message.
 ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, credential)
     .sessionReceiver()
     .disableAutoComplete()
     .queueName(sessionEnabledQueueName)
     .buildAsyncClient();

 // acceptSession(String) completes successfully with a receiver when "<<my-session-id>>" session is
 // successfully locked.
 // `Flux.usingWhen` is used, so we dispose of the receiver resource after `receiveMessages()` and the settlement
 // operations complete.
 // `Mono.usingWhen` can also be used if the resource closure returns a single item.
 Flux<Void> sessionMessages = Flux.usingWhen(
     sessionReceiver.acceptSession("<<my-session-id>>"),
     receiver -> {
         // Receive messages from <<my-session-id>> session.
         return receiver.receiveMessages().flatMap(message -> {
             System.out.printf("Received Sequence #: %s. Contents: %s%n", message.getSequenceNumber(),
                 message.getBody());

             // Explicitly settle the message using complete, abandon, defer, dead-letter, etc.
             if (isMessageProcessed) {
                 return receiver.complete(message);
             } else {
                 return receiver.abandon(message);
             }
         });
     },
     receiver -> Mono.fromRunnable(() -> {
         // Dispose of resources.
         receiver.close();
         sessionReceiver.close();
     }));

 // When program ends, or you're done receiving all messages, the `subscription` can be disposed of. This code
 // is non-blocking and kicks off the operation.
 Disposable subscription = sessionMessages.subscribe(
     unused -> {
     }, error -> System.err.print("Error receiving message from session: " + error),
     () -> System.out.println("Completed receiving from session."));

示例:从第一个可用会话接收消息

使用 acceptNextSession() 获取下一个可用会话的锁,而不指定会话 ID。

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 // 'disableAutoComplete' indicates that users will explicitly settle their message.
 ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, credential)
     .sessionReceiver()
     .disableAutoComplete()
     .queueName(sessionEnabledQueueName)
     .buildAsyncClient();

 // Creates a client to receive messages from the first available session. It waits until
 // AmqpRetryOptions.getTryTimeout() elapses. If no session is available within that operation timeout, it
 // completes with a retriable error. Otherwise, a receiver is returned when a lock on the session is acquired.
 Mono<ServiceBusReceiverAsyncClient> receiverMono = sessionReceiver.acceptNextSession();

 Flux<Void> receiveMessagesFlux = Flux.usingWhen(receiverMono,
     receiver -> receiver.receiveMessages().flatMap(message -> {
         System.out.println("Received message: " + message.getBody());

         // Explicitly settle the message via complete, abandon, defer, dead-letter, etc.
         if (isMessageProcessed) {
             return receiver.complete(message);
         } else {
             return receiver.abandon(message);
         }
     }),
     receiver -> Mono.fromRunnable(() -> {
         // Dispose of the receiver and sessionReceiver when done receiving messages.
         receiver.close();
         sessionReceiver.close();
     }));

 // This is a non-blocking call that moves onto the next line of code after setting up and starting the receive
 // operation. Customers can keep a reference to `subscription` and dispose of it when they want to stop
 // receiving messages.
 Disposable subscription = receiveMessagesFlux.subscribe(unused -> {
 }, error -> System.out.println("Error occurred: " + error),
     () -> System.out.println("Receiving complete."));

方法摘要

修饰符和类型 方法和描述
Mono<ServiceBusReceiverAsyncClient> acceptNextSession()

获取下一个可用会话的会话锁,并创建 ServiceBusReceiverAsyncClient 用于从会话接收消息的 。

Mono<ServiceBusReceiverAsyncClient> acceptSession(String sessionId)

获取 的 sessionId 会话锁,并创建 以 ServiceBusReceiverAsyncClient 从会话接收消息。

void close()

方法继承自 java.lang.Object

方法详细信息

acceptNextSession

public Mono acceptNextSession()

获取下一个可用会话的会话锁,并创建 ServiceBusReceiverAsyncClient 用于从会话接收消息的 。 如果没有立即可用,它将等待会话可用。

Returns:

绑定到 ServiceBusReceiverAsyncClient 可用会话的 。

acceptSession

public Mono acceptSession(String sessionId)

获取 的 sessionId 会话锁,并创建 以 ServiceBusReceiverAsyncClient 从会话接收消息。 如果会话已被另一个客户端锁定, AmqpException 则会引发 。

Parameters:

sessionId - 会话 ID。

Returns:

绑定到 ServiceBusReceiverAsyncClient 指定会话的 。

close

public void close()

适用于