Share via


ServiceBusReceiverAsyncClient Clase

  • java.lang.Object
    • com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient

Implementaciones

public final class ServiceBusReceiverAsyncClient
implements AutoCloseable

Receptor asincrónico responsable de recibir ServiceBusReceivedMessage de una cola de Azure Service Bus o tema o suscripción.

Los ejemplos que se muestran en este documento usan un objeto de credencial denominado DefaultAzureCredential para la autenticación, que es adecuado para la mayoría de los escenarios, incluidos los entornos de desarrollo y producción locales. Además, se recomienda usar la identidad administrada para la autenticación en entornos de producción. Puede encontrar más información sobre las distintas formas de autenticación y sus tipos de credenciales correspondientes en la documentación de Azure Identity.

Ejemplo: Creación de un ServiceBusReceiverAsyncClient

En el ejemplo de código siguiente se muestra la creación del cliente ServiceBusReceiverAsyncClientasincrónico . fullyQualifiedNamespace es el nombre de host del espacio de nombres de Service Bus. Aparece en el panel "Essentials" después de navegar al espacio de nombres de Event Hubs a través de Azure Portal. La credencial usada se debe DefaultAzureCredential a que combina credenciales usadas habitualmente en la implementación y el desarrollo y elige la credencial que se usará en función de su entorno en ejecución. PEEK_LOCK (el modo de recepción predeterminado) y disableAutoComplete() se recomienda encarecidamente para que los usuarios tengan control sobre la liquidación de mensajes.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 // 'disableAutoComplete' indicates that users will explicitly settle their message.
 ServiceBusReceiverAsyncClient asyncReceiver = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, credential)
     .receiver()
     .disableAutoComplete()
     .queueName(queueName)
     .buildAsyncClient();

 // When users are done with the receiver, dispose of the receiver.
 // Clients should be long-lived objects as they require resources
 // and time to establish a connection to the service.
 asyncReceiver.close();

Ejemplo: Recepción de todos los mensajes del recurso de Service Bus

Esto devuelve un flujo infinito de mensajes de Service Bus. La secuencia finaliza cuando se elimina la suscripción u otros escenarios de terminal. Consulte receiveMessages() para obtener más información.

// Keep a reference to `subscription`. When the program is finished receiving messages, call
 // subscription.dispose(). This will stop fetching messages from the Service Bus.
 // Consider using Flux.usingWhen to scope the creation, usage, and cleanup of the receiver.
 Disposable subscription = asyncReceiver.receiveMessages()
     .flatMap(message -> {
         System.out.printf("Received Seq #: %s%n", message.getSequenceNumber());
         System.out.printf("Contents of message as string: %s%n", message.getBody());

         // Explicitly settle the message using complete, abandon, defer, dead-letter, etc.
         if (isMessageProcessed) {
             return asyncReceiver.complete(message);
         } else {
             return asyncReceiver.abandon(message);
         }
     })
     .subscribe(unused -> {
     }, error -> System.out.println("Error occurred: " + error),
         () -> System.out.println("Receiving complete."));

 // When program ends, or you're done receiving all messages, dispose of the receiver.
 // Clients should be long-lived objects as they
 // require resources and time to establish a connection to the service.
 asyncReceiver.close();

Ejemplo: Recepción de mensajes en RECEIVE_AND_DELETE modo desde una entidad de Service Bus

En el ejemplo de código siguiente se muestra la creación del cliente ServiceBusReceiverAsyncClient asincrónico mediante RECEIVE_AND_DELETE. fullyQualifiedNamespace es el nombre de host del espacio de nombres de Service Bus. Aparece en el panel "Essentials" después de navegar al espacio de nombres de Event Hubs a través de Azure Portal. La credencial usada se debe DefaultAzureCredential a que combina credenciales usadas habitualmente en la implementación y el desarrollo y elige la credencial que se usará en función de su entorno en ejecución. Consulte RECEIVE_AND_DELETE la documentación para obtener más información sobre cómo recibir mensajes mediante este modo.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // Keep a reference to `subscription`. When the program is finished receiving messages, call
 // subscription.dispose(). This will stop fetching messages from the Service Bus.
 Disposable subscription = Flux.usingWhen(
         Mono.fromCallable(() -> {
             // Setting the receiveMode when creating the receiver enables receive and delete mode. By default,
             // peek lock mode is used. In peek lock mode, users are responsible for settling messages.
             return new ServiceBusClientBuilder()
                 .credential(fullyQualifiedNamespace, credential)
                 .receiver()
                 .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
                 .queueName(queueName)
                 .buildAsyncClient();
         }), receiver -> {
             return receiver.receiveMessages();
         }, receiver -> {
             return Mono.fromRunnable(() -> receiver.close());
         })
     .subscribe(message -> {
             // Messages received in RECEIVE_AND_DELETE mode do not have to be settled because they are automatically
             // removed from the queue.
         System.out.printf("Received Seq #: %s%n", message.getSequenceNumber());
         System.out.printf("Contents of message as string: %s%n", message.getBody());
     },
         error -> System.out.println("Error occurred: " + error),
         () -> System.out.println("Receiving complete."));

Ejemplo: Recepción de mensajes de una sesión específica

Para capturar mensajes de una sesión específica, cambie a ServiceBusSessionReceiverClientBuilder y compile el cliente receptor de sesión. Use acceptSession(String sessionId) para crear un enlazado a ServiceBusReceiverAsyncClientla sesión. En el ejemplo se supone que las sesiones de Service Bus se habilitaron en el momento de la creación de la cola.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 // 'disableAutoComplete' indicates that users will explicitly settle their message.
 ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, credential)
     .sessionReceiver()
     .disableAutoComplete()
     .queueName(sessionEnabledQueueName)
     .buildAsyncClient();

 // acceptSession(String) completes successfully with a receiver when "<<my-session-id>>" session is
 // successfully locked.
 // `Flux.usingWhen` is used, so we dispose of the receiver resource after `receiveMessages()` and the settlement
 // operations complete.
 // `Mono.usingWhen` can also be used if the resource closure returns a single item.
 Flux<Void> sessionMessages = Flux.usingWhen(
     sessionReceiver.acceptSession("<<my-session-id>>"),
     receiver -> {
         // Receive messages from <<my-session-id>> session.
         return receiver.receiveMessages().flatMap(message -> {
             System.out.printf("Received Sequence #: %s. Contents: %s%n", message.getSequenceNumber(),
                 message.getBody());

             // Explicitly settle the message using complete, abandon, defer, dead-letter, etc.
             if (isMessageProcessed) {
                 return receiver.complete(message);
             } else {
                 return receiver.abandon(message);
             }
         });
     },
     receiver -> Mono.fromRunnable(() -> {
         // Dispose of resources.
         receiver.close();
         sessionReceiver.close();
     }));

 // When program ends, or you're done receiving all messages, the `subscription` can be disposed of. This code
 // is non-blocking and kicks off the operation.
 Disposable subscription = sessionMessages.subscribe(
     unused -> {
     }, error -> System.err.print("Error receiving message from session: " + error),
     () -> System.out.println("Completed receiving from session."));

Ejemplo: Recepción de mensajes de la primera sesión disponible

Para procesar mensajes de la primera sesión disponible, cambie a ServiceBusSessionReceiverClientBuilder y compile el cliente receptor de sesión. Use acceptNextSession() para buscar la primera sesión disponible desde la que procesar los mensajes.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 // 'disableAutoComplete' indicates that users will explicitly settle their message.
 ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, credential)
     .sessionReceiver()
     .disableAutoComplete()
     .queueName(sessionEnabledQueueName)
     .buildAsyncClient();

 // Creates a client to receive messages from the first available session. It waits until
 // AmqpRetryOptions.getTryTimeout() elapses. If no session is available within that operation timeout, it
 // completes with a retriable error. Otherwise, a receiver is returned when a lock on the session is acquired.
 Mono<ServiceBusReceiverAsyncClient> receiverMono = sessionReceiver.acceptNextSession();

 Flux<Void> receiveMessagesFlux = Flux.usingWhen(receiverMono,
     receiver -> receiver.receiveMessages().flatMap(message -> {
         System.out.println("Received message: " + message.getBody());

         // Explicitly settle the message via complete, abandon, defer, dead-letter, etc.
         if (isMessageProcessed) {
             return receiver.complete(message);
         } else {
             return receiver.abandon(message);
         }
     }),
     receiver -> Mono.fromRunnable(() -> {
         // Dispose of the receiver and sessionReceiver when done receiving messages.
         receiver.close();
         sessionReceiver.close();
     }));

 // This is a non-blocking call that moves onto the next line of code after setting up and starting the receive
 // operation. Customers can keep a reference to `subscription` and dispose of it when they want to stop
 // receiving messages.
 Disposable subscription = receiveMessagesFlux.subscribe(unused -> {
 }, error -> System.out.println("Error occurred: " + error),
     () -> System.out.println("Receiving complete."));

Ejemplo: Limitación de velocidad del consumo de mensajes de una entidad de Service Bus

En el caso de los receptores de mensajes que necesitan limitar el número de mensajes que reciben en un momento dado, pueden usar BaseSubscriber#request(long).

// This is a non-blocking call. The program will move to the next line of code after setting up the operation.
 asyncReceiver.receiveMessages().subscribe(new BaseSubscriber<ServiceBusReceivedMessage>() {
     private static final int NUMBER_OF_MESSAGES = 5;
     private final AtomicInteger currentNumberOfMessages = new AtomicInteger();

     @Override
     protected void hookOnSubscribe(Subscription subscription) {
         // Tell the Publisher we only want 5 message at a time.
         request(NUMBER_OF_MESSAGES);
     }

     @Override
     protected void hookOnNext(ServiceBusReceivedMessage message) {
         // Process the ServiceBusReceivedMessage
         // If the number of messages we have currently received is a multiple of 5, that means we have reached
         // the last message the Subscriber will provide to us. Invoking request(long) here, tells the Publisher
         // that the subscriber is ready to get more messages from upstream.
         if (currentNumberOfMessages.incrementAndGet() % 5 == 0) {
             request(NUMBER_OF_MESSAGES);
         }
     }
 });

Resumen del método

Modificador y tipo Método y descripción
Mono<Void> abandon(ServiceBusReceivedMessage message)

Abandona un ServiceBusReceivedMessageobjeto .

Mono<Void> abandon(ServiceBusReceivedMessage message, AbandonOptions options)

Abandona una ServiceBusReceivedMessage actualización de las propiedades del mensaje.

void close()

Elimina el consumidor cerrando los vínculos subyacentes al servicio.

Mono<Void> commitTransaction(ServiceBusTransactionContext transactionContext)

Confirma la transacción y todas las operaciones asociadas a ella.

Mono<Void> complete(ServiceBusReceivedMessage message)

Completa un ServiceBusReceivedMessageobjeto .

Mono<Void> complete(ServiceBusReceivedMessage message, CompleteOptions options)

Completa un ServiceBusReceivedMessage objeto con las opciones especificadas.

Mono<ServiceBusTransactionContext> createTransaction()

Inicia una nueva transacción del lado del servicio.

Mono<Void> deadLetter(ServiceBusReceivedMessage message)

Mueve un ServiceBusReceivedMessage objeto a la sub cola de mensajes fallidos.

Mono<Void> deadLetter(ServiceBusReceivedMessage message, DeadLetterOptions options)

Mueve un ServiceBusReceivedMessage objeto a la sub cola de mensajes fallidos con las opciones especificadas.

Mono<Void> defer(ServiceBusReceivedMessage message)

Aplaza un ServiceBusReceivedMessageobjeto .

Mono<Void> defer(ServiceBusReceivedMessage message, DeferOptions options)

Aplaza un ServiceBusReceivedMessage objeto con las opciones establecidas.

String getEntityPath()

Obtiene el recurso de Service Bus con el que interactúa este cliente.

String getFullyQualifiedNamespace()

Obtiene el espacio de nombres completo de Service Bus al que está asociada la conexión.

String getIdentifier()

Obtiene el identificador de la instancia de ServiceBusReceiverAsyncClient.

String getSessionId()

Obtiene elidentificador de sesión de la sesión si este receptor es un receptor de sesión.

Mono<byte[]> getSessionState()

Obtiene el estado de la sesión si este receptor es un receptor de sesión.

Mono<ServiceBusReceivedMessage> peekMessage()

Lee el siguiente mensaje activo sin cambiar el estado del receptor o el origen del mensaje.

Mono<ServiceBusReceivedMessage> peekMessage(long sequenceNumber)

A partir del número de secuencia especificado, lee el siguiente mensaje activo sin cambiar el estado del receptor o el origen del mensaje.

Flux<ServiceBusReceivedMessage> peekMessages(int maxMessages)

Lee el siguiente lote de mensajes activos sin cambiar el estado del receptor o el origen del mensaje.

Flux<ServiceBusReceivedMessage> peekMessages(int maxMessages, long sequenceNumber)

A partir del número de secuencia especificado, lee el siguiente lote de mensajes activos sin cambiar el estado del receptor o el origen del mensaje.

Mono<ServiceBusReceivedMessage> receiveDeferredMessage(long sequenceNumber)

Recibe un aplazado ServiceBusReceivedMessage.

Flux<ServiceBusReceivedMessage> receiveDeferredMessages(Iterable<Long> sequenceNumbers)

Recibe un lote de aplazado ServiceBusReceivedMessage.

Flux<ServiceBusReceivedMessage> receiveMessages()

Recibe un flujo infinito de ServiceBusReceivedMessage de la entidad de Service Bus.

Mono<OffsetDateTime> renewMessageLock(ServiceBusReceivedMessage message)

Renueva de forma asincrónica el bloqueo en el mensaje.

Mono<Void> renewMessageLock(ServiceBusReceivedMessage message, Duration maxLockRenewalDuration)

Inicia la renovación de bloqueo automático para .ServiceBusReceivedMessage

Mono<OffsetDateTime> renewSessionLock()

Renueva el bloqueo de sesión si este receptor es un receptor de sesión.

Mono<Void> renewSessionLock(Duration maxLockRenewalDuration)

Inicia la renovación de bloqueo automático para la sesión para la que funciona este receptor.

Mono<Void> rollbackTransaction(ServiceBusTransactionContext transactionContext)

Revierte la transacción dada y todas las operaciones asociadas a ella.

Mono<Void> setSessionState(byte[] sessionState)

Establece el estado de la sesión para la que funciona este receptor.

Métodos heredados de java.lang.Object

Detalles del método

abandon

public Mono abandon(ServiceBusReceivedMessage message)

Abandona un ServiceBusReceivedMessageobjeto . Esto hará que el mensaje vuelva a estar disponible para su procesamiento. Abandonar un mensaje aumentará el número de entregas en el mensaje.

Parameters:

message - que ServiceBusReceivedMessage se va a realizar esta operación.

Returns:

que Mono se completa cuando se completa la operación de abandono de Service Bus.

abandon

public Mono abandon(ServiceBusReceivedMessage message, AbandonOptions options)

Abandona una ServiceBusReceivedMessage actualización de las propiedades del mensaje. Esto hará que el mensaje vuelva a estar disponible para su procesamiento. Abandonar un mensaje aumentará el número de entregas en el mensaje.

Parameters:

message - ServiceBusReceivedMessage que se va a realizar esta operación.
options - Opciones que se van a establecer al abandonar el mensaje.

Returns:

que Mono se completa cuando finaliza la operación de Service Bus.

close

public void close()

Elimina el consumidor cerrando los vínculos subyacentes al servicio.

commitTransaction

public Mono commitTransaction(ServiceBusTransactionContext transactionContext)

Confirma la transacción y todas las operaciones asociadas a ella.

Creación y uso de una transacción

// This mono creates a transaction and caches the output value, so we can associate operations with the
 // transaction. It does not cache the value if it is an error or completes with no items, effectively retrying
 // the operation.
 Mono<ServiceBusTransactionContext> transactionContext = asyncReceiver.createTransaction()
     .cache(value -> Duration.ofMillis(Long.MAX_VALUE),
         error -> Duration.ZERO,
         () -> Duration.ZERO);

 // Dispose of the disposable to cancel the operation.
 Disposable disposable = transactionContext.flatMap(transaction -> {
     // Process messages and associate operations with the transaction.
     Mono<Void> operations = Mono.when(
         asyncReceiver.receiveDeferredMessage(sequenceNumber).flatMap(message ->
             asyncReceiver.complete(message, new CompleteOptions().setTransactionContext(transaction))),
         asyncReceiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction)));

     // Finally, either commit or rollback the transaction once all the operations are associated with it.
     return operations.then(asyncReceiver.commitTransaction(transaction));
 }).subscribe(unused -> {
 }, error -> {
     System.err.println("Error occurred processing transaction: " + error);
 }, () -> {
     System.out.println("Completed transaction");
 });

Parameters:

transactionContext - Transacción que se va a confirmar.

Returns:

que Mono finaliza esta operación en el recurso de Service Bus.

complete

public Mono complete(ServiceBusReceivedMessage message)

Completa un ServiceBusReceivedMessageobjeto . Esto eliminará el mensaje del servicio.

Parameters:

message - ServiceBusReceivedMessage que se va a realizar esta operación.

Returns:

que Mono finaliza cuando se completa el mensaje en Service Bus.

complete

public Mono complete(ServiceBusReceivedMessage message, CompleteOptions options)

Completa un ServiceBusReceivedMessage objeto con las opciones especificadas. Esto eliminará el mensaje del servicio.

Parameters:

message - ServiceBusReceivedMessage que se va a realizar esta operación.
options - Opciones usadas para completar el mensaje.

Returns:

que Mono finaliza cuando se completa el mensaje en Service Bus.

createTransaction

public Mono createTransaction()

Inicia una nueva transacción del lado del servicio. ServiceBusTransactionContext Debe pasarse a todas las operaciones que deben estar en esta transacción.

Creación y uso de una transacción

// This mono creates a transaction and caches the output value, so we can associate operations with the
 // transaction. It does not cache the value if it is an error or completes with no items, effectively retrying
 // the operation.
 Mono<ServiceBusTransactionContext> transactionContext = asyncReceiver.createTransaction()
     .cache(value -> Duration.ofMillis(Long.MAX_VALUE),
         error -> Duration.ZERO,
         () -> Duration.ZERO);

 // Dispose of the disposable to cancel the operation.
 Disposable disposable = transactionContext.flatMap(transaction -> {
     // Process messages and associate operations with the transaction.
     Mono<Void> operations = Mono.when(
         asyncReceiver.receiveDeferredMessage(sequenceNumber).flatMap(message ->
             asyncReceiver.complete(message, new CompleteOptions().setTransactionContext(transaction))),
         asyncReceiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction)));

     // Finally, either commit or rollback the transaction once all the operations are associated with it.
     return operations.then(asyncReceiver.commitTransaction(transaction));
 }).subscribe(unused -> {
 }, error -> {
     System.err.println("Error occurred processing transaction: " + error);
 }, () -> {
     System.out.println("Completed transaction");
 });

Returns:

que Mono finaliza esta operación en el recurso de Service Bus.

deadLetter

public Mono deadLetter(ServiceBusReceivedMessage message)

Mueve un ServiceBusReceivedMessage objeto a la sub cola de mensajes fallidos.

Parameters:

message - ServiceBusReceivedMessage que se va a realizar esta operación.

Returns:

que Mono se completa cuando finaliza la operación de mensajes fallidos.

deadLetter

public Mono deadLetter(ServiceBusReceivedMessage message, DeadLetterOptions options)

Mueve un ServiceBusReceivedMessage objeto a la sub cola de mensajes fallidos con las opciones especificadas.

Parameters:

message - ServiceBusReceivedMessage que se va a realizar esta operación.
options - Opciones usadas para enviar mensajes fallidos al mensaje.

Returns:

que Mono se completa cuando finaliza la operación de mensajes fallidos.

defer

public Mono defer(ServiceBusReceivedMessage message)

Aplaza un ServiceBusReceivedMessageobjeto . Esto moverá el mensaje a la sub cola diferida.

Parameters:

message - ServiceBusReceivedMessage que se va a realizar esta operación.

Returns:

que Mono se completa cuando finaliza la operación de aplazamiento de Service Bus.

defer

public Mono defer(ServiceBusReceivedMessage message, DeferOptions options)

Aplaza un ServiceBusReceivedMessage objeto con las opciones establecidas. Esto moverá el mensaje a la sub cola diferida.

Parameters:

message - ServiceBusReceivedMessage que se va a realizar esta operación.
options - Opciones usadas para aplazar el mensaje.

Returns:

que Mono se completa cuando finaliza la operación de aplazamiento.

getEntityPath

public String getEntityPath()

Obtiene el recurso de Service Bus con el que interactúa este cliente.

Returns:

El recurso de Service Bus con el que interactúa este cliente.

getFullyQualifiedNamespace

public String getFullyQualifiedNamespace()

Obtiene el espacio de nombres completo de Service Bus al que está asociada la conexión. Es probable que sea similar a {yournamespace}.servicebus.windows.net.

Returns:

Espacio de nombres completo de Service Bus al que está asociada la conexión.

getIdentifier

public String getIdentifier()

Obtiene el identificador de la instancia de ServiceBusReceiverAsyncClient.

Returns:

Identificador que puede identificar la instancia de ServiceBusReceiverAsyncClient.

getSessionId

public String getSessionId()

Obtiene el SessionId de la sesión si este receptor es un receptor de sesión.

Returns:

SessionId o null si no es un receptor de sesión.

getSessionState

public Mono getSessionState()

Obtiene el estado de la sesión si este receptor es un receptor de sesión.

Returns:

Estado de sesión o mono vacío si no hay ningún estado establecido para la sesión.

peekMessage

public Mono peekMessage()

Lee el siguiente mensaje activo sin cambiar el estado del receptor o el origen del mensaje. La primera llamada para peek() capturar el primer mensaje activo para este receptor. Cada llamada subsiguiente captura el mensaje subsiguiente en la entidad.

Returns:

Un objeto que se ha inspeccionado ServiceBusReceivedMessage.

peekMessage

public Mono peekMessage(long sequenceNumber)

A partir del número de secuencia especificado, lee el siguiente mensaje activo sin cambiar el estado del receptor o el origen del mensaje.

Parameters:

sequenceNumber - Número de secuencia desde donde se va a leer el mensaje.

Returns:

Un objeto que se ha inspeccionado ServiceBusReceivedMessage.

peekMessages

public Flux peekMessages(int maxMessages)

Lee el siguiente lote de mensajes activos sin cambiar el estado del receptor o el origen del mensaje.

Parameters:

maxMessages - El número de mensajes.

Returns:

Un Flux de ServiceBusReceivedMessage que se ven.

peekMessages

public Flux peekMessages(int maxMessages, long sequenceNumber)

A partir del número de secuencia especificado, lee el siguiente lote de mensajes activos sin cambiar el estado del receptor o el origen del mensaje.

Parameters:

maxMessages - El número de mensajes.
sequenceNumber - Número de secuencia desde dónde empezar a leer los mensajes.

Returns:

receiveDeferredMessage

public Mono receiveDeferredMessage(long sequenceNumber)

Recibe un aplazado ServiceBusReceivedMessage. Los mensajes diferidos solo se pueden recibir mediante el número de secuencia.

Parameters:

sequenceNumber - getSequenceNumber() del mensaje.

Returns:

Mensaje diferido con la coincidencia sequenceNumber.

receiveDeferredMessages

public Flux receiveDeferredMessages(Iterable sequenceNumbers)

Recibe un lote de aplazado ServiceBusReceivedMessage. Los mensajes diferidos solo se pueden recibir mediante el número de secuencia.

Parameters:

sequenceNumbers - Números de secuencia de los mensajes aplazados.

Returns:

Flux de diferido ServiceBusReceivedMessage.

receiveMessages

public Flux receiveMessages()

Recibe un flujo infinito de ServiceBusReceivedMessage de la entidad de Service Bus. Flux recibe continuamente mensajes de una entidad de Service Bus hasta que:

  • El receptor está cerrado.
  • La suscripción a Flux se elimina.
  • Una señal de terminal de un suscriptor de bajada se propaga hacia arriba (es decir, Flux#take(long) o Flux#take(Duration)).
  • Se AmqpException produce que hace que el vínculo de recepción se detenga.

El cliente usa un vínculo AMQP debajo para recibir los mensajes; el cliente pasará de forma transparente a un nuevo vínculo de AMQP si el actual encuentra un error retriable. Cuando el cliente experimenta un error que no se puede reintentar o agota los reintentos, se notificará al controlador de terminal del org.reactivestreams.Subscriber#onError(Throwable) suscriptor con este error. No se entregará ningún mensaje adicional a org.reactivestreams.Subscriber#onNext(Object) después del evento de terminal; la aplicación debe crear un nuevo cliente para reanudar la recepción. Volver a suscribirse al flux del cliente antiguo no tendrá ningún efecto.

Nota: Algunos ejemplos de errores no reintenibles son: la aplicación que intenta conectarse a una cola que no existe, elimina o deshabilita la cola en medio de la recepción, el usuario inicia explícitamente la recuperación ante desastres geográfica. Estos son ciertos eventos en los que Service Bus se comunica con el cliente de que se produjo un error no reintenible.

Returns:

Flujo infinito de mensajes de la entidad de Service Bus.

renewMessageLock

public Mono renewMessageLock(ServiceBusReceivedMessage message)

Renueva de forma asincrónica el bloqueo en el mensaje. El bloqueo se renovará en función de la configuración especificada en la entidad. Cuando se recibe un mensaje en PEEK_LOCK modo , el mensaje se bloquea en el servidor para esta instancia de receptor durante una duración especificada durante la creación de la entidad (LockDuration). Si el procesamiento del mensaje requiere más tiempo que esta duración, el bloqueo debe renovarse. Para cada renovación, el bloqueo se restablece al valor LockDuration de la entidad.

Parameters:

message - que ServiceBusReceivedMessage se va a realizar la renovación de bloqueo automático.

Returns:

Nueva hora de expiración del mensaje.

renewMessageLock

public Mono renewMessageLock(ServiceBusReceivedMessage message, Duration maxLockRenewalDuration)

Inicia la renovación de bloqueo automático para .ServiceBusReceivedMessage

Parameters:

message - que ServiceBusReceivedMessage se va a realizar esta operación.
maxLockRenewalDuration - Duración máxima para mantener la renovación del token de bloqueo.

Returns:

Mono que se completa cuando la operación de renovación de mensajes se ha completado hasta maxLockRenewalDuration.

renewSessionLock

public Mono renewSessionLock()

Renueva el bloqueo de sesión si este receptor es un receptor de sesión.

Returns:

La próxima hora de expiración para el bloqueo de sesión.

renewSessionLock

public Mono renewSessionLock(Duration maxLockRenewalDuration)

Inicia la renovación de bloqueo automático para la sesión para la que funciona este receptor.

Parameters:

maxLockRenewalDuration - Duración máxima para mantener la renovación del bloqueo de sesión.

Returns:

Una operación de renovación de bloqueo para el mensaje.

rollbackTransaction

public Mono rollbackTransaction(ServiceBusTransactionContext transactionContext)

Revierte la transacción dada y todas las operaciones asociadas a ella.

Creación y uso de una transacción

// This mono creates a transaction and caches the output value, so we can associate operations with the
 // transaction. It does not cache the value if it is an error or completes with no items, effectively retrying
 // the operation.
 Mono<ServiceBusTransactionContext> transactionContext = asyncReceiver.createTransaction()
     .cache(value -> Duration.ofMillis(Long.MAX_VALUE),
         error -> Duration.ZERO,
         () -> Duration.ZERO);

 // Dispose of the disposable to cancel the operation.
 Disposable disposable = transactionContext.flatMap(transaction -> {
     // Process messages and associate operations with the transaction.
     Mono<Void> operations = Mono.when(
         asyncReceiver.receiveDeferredMessage(sequenceNumber).flatMap(message ->
             asyncReceiver.complete(message, new CompleteOptions().setTransactionContext(transaction))),
         asyncReceiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction)));

     // Finally, either commit or rollback the transaction once all the operations are associated with it.
     return operations.then(asyncReceiver.commitTransaction(transaction));
 }).subscribe(unused -> {
 }, error -> {
     System.err.println("Error occurred processing transaction: " + error);
 }, () -> {
     System.out.println("Completed transaction");
 });

Parameters:

transactionContext - Transacción que se va a revertir.

Returns:

que Mono finaliza esta operación en el recurso de Service Bus.

setSessionState

public Mono setSessionState(byte[] sessionState)

Establece el estado de la sesión para la que funciona este receptor.

Parameters:

sessionState - Estado que se va a establecer en la sesión.

Returns:

Mono que se completa cuando se establece la sesión

Se aplica a