Compartir a través de


ServiceBusSessionReceiverAsyncClient Clase

  • java.lang.Object
    • com.azure.messaging.servicebus.ServiceBusSessionReceiverAsyncClient

Implementaciones

public final class ServiceBusSessionReceiverAsyncClient
implements AutoCloseable

Este cliente receptor de sesión asincrónico se usa para adquirir bloqueos de sesión de una cola o tema y crear ServiceBusReceiverAsyncClient instancias vinculadas a las sesiones bloqueadas. Las sesiones se pueden usar como una primera entrada, primero en salir (FIFO) procesamiento de mensajes. Las colas y temas o suscripciones admiten sesiones de Service Bus; sin embargo, se debe habilitar en el momento de la creación de la entidad.

Los ejemplos que se muestran en este documento usan un objeto de credencial denominado DefaultAzureCredential para la autenticación, que es adecuado para la mayoría de los escenarios, incluidos los entornos de desarrollo y producción locales. Además, se recomienda usar la identidad administrada para la autenticación en entornos de producción. Puede encontrar más información sobre las distintas formas de autenticación y sus tipos de credenciales correspondientes en la documentación de Identidad de Azure".

Ejemplo: Recibir mensajes de una sesión específica

Use acceptSession(String sessionId) para adquirir el bloqueo de una sesión si conoce el identificador de sesión y PEEK_LOCKdisableAutoComplete() se recomienda encarecidamente para que los usuarios tengan control sobre la liquidación de mensajes.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 // 'disableAutoComplete' indicates that users will explicitly settle their message.
 ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, credential)
     .sessionReceiver()
     .disableAutoComplete()
     .queueName(sessionEnabledQueueName)
     .buildAsyncClient();

 // acceptSession(String) completes successfully with a receiver when "<<my-session-id>>" session is
 // successfully locked.
 // `Flux.usingWhen` is used, so we dispose of the receiver resource after `receiveMessages()` and the settlement
 // operations complete.
 // `Mono.usingWhen` can also be used if the resource closure returns a single item.
 Flux<Void> sessionMessages = Flux.usingWhen(
     sessionReceiver.acceptSession("<<my-session-id>>"),
     receiver -> {
         // Receive messages from <<my-session-id>> session.
         return receiver.receiveMessages().flatMap(message -> {
             System.out.printf("Received Sequence #: %s. Contents: %s%n", message.getSequenceNumber(),
                 message.getBody());

             // Explicitly settle the message using complete, abandon, defer, dead-letter, etc.
             if (isMessageProcessed) {
                 return receiver.complete(message);
             } else {
                 return receiver.abandon(message);
             }
         });
     },
     receiver -> Mono.fromRunnable(() -> {
         // Dispose of resources.
         receiver.close();
         sessionReceiver.close();
     }));

 // When program ends, or you're done receiving all messages, the `subscription` can be disposed of. This code
 // is non-blocking and kicks off the operation.
 Disposable subscription = sessionMessages.subscribe(
     unused -> {
     }, error -> System.err.print("Error receiving message from session: " + error),
     () -> System.out.println("Completed receiving from session."));

Ejemplo: Recibir mensajes de la primera sesión disponible

Use acceptNextSession() para adquirir el bloqueo de la siguiente sesión disponible sin especificar el identificador de sesión.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 // 'disableAutoComplete' indicates that users will explicitly settle their message.
 ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, credential)
     .sessionReceiver()
     .disableAutoComplete()
     .queueName(sessionEnabledQueueName)
     .buildAsyncClient();

 // Creates a client to receive messages from the first available session. It waits until
 // AmqpRetryOptions.getTryTimeout() elapses. If no session is available within that operation timeout, it
 // completes with a retriable error. Otherwise, a receiver is returned when a lock on the session is acquired.
 Mono<ServiceBusReceiverAsyncClient> receiverMono = sessionReceiver.acceptNextSession();

 Flux<Void> receiveMessagesFlux = Flux.usingWhen(receiverMono,
     receiver -> receiver.receiveMessages().flatMap(message -> {
         System.out.println("Received message: " + message.getBody());

         // Explicitly settle the message via complete, abandon, defer, dead-letter, etc.
         if (isMessageProcessed) {
             return receiver.complete(message);
         } else {
             return receiver.abandon(message);
         }
     }),
     receiver -> Mono.fromRunnable(() -> {
         // Dispose of the receiver and sessionReceiver when done receiving messages.
         receiver.close();
         sessionReceiver.close();
     }));

 // This is a non-blocking call that moves onto the next line of code after setting up and starting the receive
 // operation. Customers can keep a reference to `subscription` and dispose of it when they want to stop
 // receiving messages.
 Disposable subscription = receiveMessagesFlux.subscribe(unused -> {
 }, error -> System.out.println("Error occurred: " + error),
     () -> System.out.println("Receiving complete."));

Resumen del método

Modificador y tipo Método y descripción
Mono<ServiceBusReceiverAsyncClient> acceptNextSession()

Adquiere un bloqueo de sesión para la siguiente sesión disponible y crea un ServiceBusReceiverAsyncClient para recibir mensajes de la sesión.

Mono<ServiceBusReceiverAsyncClient> acceptSession(String sessionId)

Adquiere un bloqueo de sesión para sessionId y crea un ServiceBusReceiverAsyncClient para recibir mensajes de la sesión.

void close()

Métodos heredados de java.lang.Object

Detalles del método

acceptNextSession

public Mono acceptNextSession()

Adquiere un bloqueo de sesión para la siguiente sesión disponible y crea un ServiceBusReceiverAsyncClient para recibir mensajes de la sesión. Esperará hasta que una sesión esté disponible si no hay ninguna disponible inmediatamente.

Returns:

que ServiceBusReceiverAsyncClient está vinculado a la sesión disponible.

acceptSession

public Mono acceptSession(String sessionId)

Adquiere un bloqueo de sesión para sessionId y crea un ServiceBusReceiverAsyncClient para recibir mensajes de la sesión. Si la sesión ya está bloqueada por otro cliente, se produce una AmqpException excepción .

Parameters:

sessionId - Identificador de sesión.

Returns:

que ServiceBusReceiverAsyncClient está vinculado a la sesión especificada.

close

public void close()

Se aplica a