Partilhar via


ServiceBusReceiverAsyncClient Classe

  • java.lang.Object
    • com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient

Implementações

public final class ServiceBusReceiverAsyncClient
implements AutoCloseable

Um receptor assíncrono responsável por receber ServiceBusReceivedMessage de uma fila de Barramento de Serviço do Azure ou tópico/assinatura.

Os exemplos mostrados neste documento usam um objeto de credencial chamado DefaultAzureCredential para autenticação, que é apropriado para a maioria dos cenários, incluindo ambientes locais de desenvolvimento e produção. Além disso, recomendamos usar a identidade gerenciada para autenticação em ambientes de produção. Você pode encontrar mais informações sobre diferentes maneiras de autenticação e seus tipos de credencial correspondentes na documentação da Identidade do Azure".

Exemplo: criando um ServiceBusReceiverAsyncClient

O exemplo de código a seguir demonstra a criação do cliente ServiceBusReceiverAsyncClientassíncrono . O fullyQualifiedNamespace é o nome do host do namespace do Barramento de Serviço. Ele é listado no painel "Essentials" depois de navegar até o Namespace dos Hubs de Eventos por meio do Portal do Azure. A credencial usada é DefaultAzureCredential porque combina credenciais comumente usadas na implantação e desenvolvimento e escolhe a credencial a ser usada com base em seu ambiente de execução. PEEK_LOCK (o modo de recebimento padrão) e disableAutoComplete() são altamente recomendados para que os usuários tenham controle sobre a liquidação de mensagens.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 // 'disableAutoComplete' indicates that users will explicitly settle their message.
 ServiceBusReceiverAsyncClient asyncReceiver = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, credential)
     .receiver()
     .disableAutoComplete()
     .queueName(queueName)
     .buildAsyncClient();

 // When users are done with the receiver, dispose of the receiver.
 // Clients should be long-lived objects as they require resources
 // and time to establish a connection to the service.
 asyncReceiver.close();

Exemplo: receber todas as mensagens do recurso do Barramento de Serviço

Isso retorna um fluxo infinito de mensagens do Barramento de Serviço. O fluxo termina quando a assinatura é descartada ou outros cenários de terminal. Consulte receiveMessages() para obter mais informações.

// Keep a reference to `subscription`. When the program is finished receiving messages, call
 // subscription.dispose(). This will stop fetching messages from the Service Bus.
 // Consider using Flux.usingWhen to scope the creation, usage, and cleanup of the receiver.
 Disposable subscription = asyncReceiver.receiveMessages()
     .flatMap(message -> {
         System.out.printf("Received Seq #: %s%n", message.getSequenceNumber());
         System.out.printf("Contents of message as string: %s%n", message.getBody());

         // Explicitly settle the message using complete, abandon, defer, dead-letter, etc.
         if (isMessageProcessed) {
             return asyncReceiver.complete(message);
         } else {
             return asyncReceiver.abandon(message);
         }
     })
     .subscribe(unused -> {
     }, error -> System.out.println("Error occurred: " + error),
         () -> System.out.println("Receiving complete."));

 // When program ends, or you're done receiving all messages, dispose of the receiver.
 // Clients should be long-lived objects as they
 // require resources and time to establish a connection to the service.
 asyncReceiver.close();

Exemplo: receber mensagens no RECEIVE_AND_DELETE modo de uma entidade do Barramento de Serviço

O exemplo de código a seguir demonstra a criação do cliente ServiceBusReceiverAsyncClient assíncrono usando RECEIVE_AND_DELETE. O fullyQualifiedNamespace é o nome do host do namespace do Barramento de Serviço. Ele é listado no painel "Essentials" depois de navegar até o Namespace dos Hubs de Eventos por meio do Portal do Azure. A credencial usada é DefaultAzureCredential porque combina credenciais comumente usadas na implantação e desenvolvimento e escolhe a credencial a ser usada com base em seu ambiente de execução. Consulte RECEIVE_AND_DELETE documentos para obter mais informações sobre como receber mensagens usando esse modo.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // Keep a reference to `subscription`. When the program is finished receiving messages, call
 // subscription.dispose(). This will stop fetching messages from the Service Bus.
 Disposable subscription = Flux.usingWhen(
         Mono.fromCallable(() -> {
             // Setting the receiveMode when creating the receiver enables receive and delete mode. By default,
             // peek lock mode is used. In peek lock mode, users are responsible for settling messages.
             return new ServiceBusClientBuilder()
                 .credential(fullyQualifiedNamespace, credential)
                 .receiver()
                 .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
                 .queueName(queueName)
                 .buildAsyncClient();
         }), receiver -> {
             return receiver.receiveMessages();
         }, receiver -> {
             return Mono.fromRunnable(() -> receiver.close());
         })
     .subscribe(message -> {
             // Messages received in RECEIVE_AND_DELETE mode do not have to be settled because they are automatically
             // removed from the queue.
         System.out.printf("Received Seq #: %s%n", message.getSequenceNumber());
         System.out.printf("Contents of message as string: %s%n", message.getBody());
     },
         error -> System.out.println("Error occurred: " + error),
         () -> System.out.println("Receiving complete."));

Exemplo: receber mensagens de uma sessão específica

Para buscar mensagens de uma sessão específica, alterne para ServiceBusSessionReceiverClientBuilder e crie o cliente receptor da sessão. Use acceptSession(String sessionId) para criar uma sessão associada a ServiceBusReceiverAsyncClient. O exemplo pressupõe que as sessões do Barramento de Serviço foram habilitadas no momento da criação da fila.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 // 'disableAutoComplete' indicates that users will explicitly settle their message.
 ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, credential)
     .sessionReceiver()
     .disableAutoComplete()
     .queueName(sessionEnabledQueueName)
     .buildAsyncClient();

 // acceptSession(String) completes successfully with a receiver when "<<my-session-id>>" session is
 // successfully locked.
 // `Flux.usingWhen` is used, so we dispose of the receiver resource after `receiveMessages()` and the settlement
 // operations complete.
 // `Mono.usingWhen` can also be used if the resource closure returns a single item.
 Flux<Void> sessionMessages = Flux.usingWhen(
     sessionReceiver.acceptSession("<<my-session-id>>"),
     receiver -> {
         // Receive messages from <<my-session-id>> session.
         return receiver.receiveMessages().flatMap(message -> {
             System.out.printf("Received Sequence #: %s. Contents: %s%n", message.getSequenceNumber(),
                 message.getBody());

             // Explicitly settle the message using complete, abandon, defer, dead-letter, etc.
             if (isMessageProcessed) {
                 return receiver.complete(message);
             } else {
                 return receiver.abandon(message);
             }
         });
     },
     receiver -> Mono.fromRunnable(() -> {
         // Dispose of resources.
         receiver.close();
         sessionReceiver.close();
     }));

 // When program ends, or you're done receiving all messages, the `subscription` can be disposed of. This code
 // is non-blocking and kicks off the operation.
 Disposable subscription = sessionMessages.subscribe(
     unused -> {
     }, error -> System.err.print("Error receiving message from session: " + error),
     () -> System.out.println("Completed receiving from session."));

Exemplo: receber mensagens da primeira sessão disponível

Para processar mensagens da primeira sessão disponível, alterne para ServiceBusSessionReceiverClientBuilder e crie o cliente receptor da sessão. Use acceptNextSession() para localizar a primeira sessão disponível para processar mensagens.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 // 'disableAutoComplete' indicates that users will explicitly settle their message.
 ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, credential)
     .sessionReceiver()
     .disableAutoComplete()
     .queueName(sessionEnabledQueueName)
     .buildAsyncClient();

 // Creates a client to receive messages from the first available session. It waits until
 // AmqpRetryOptions.getTryTimeout() elapses. If no session is available within that operation timeout, it
 // completes with a retriable error. Otherwise, a receiver is returned when a lock on the session is acquired.
 Mono<ServiceBusReceiverAsyncClient> receiverMono = sessionReceiver.acceptNextSession();

 Flux<Void> receiveMessagesFlux = Flux.usingWhen(receiverMono,
     receiver -> receiver.receiveMessages().flatMap(message -> {
         System.out.println("Received message: " + message.getBody());

         // Explicitly settle the message via complete, abandon, defer, dead-letter, etc.
         if (isMessageProcessed) {
             return receiver.complete(message);
         } else {
             return receiver.abandon(message);
         }
     }),
     receiver -> Mono.fromRunnable(() -> {
         // Dispose of the receiver and sessionReceiver when done receiving messages.
         receiver.close();
         sessionReceiver.close();
     }));

 // This is a non-blocking call that moves onto the next line of code after setting up and starting the receive
 // operation. Customers can keep a reference to `subscription` and dispose of it when they want to stop
 // receiving messages.
 Disposable subscription = receiveMessagesFlux.subscribe(unused -> {
 }, error -> System.out.println("Error occurred: " + error),
     () -> System.out.println("Receiving complete."));

Exemplo: taxa que limita o consumo de mensagens de uma entidade do Barramento de Serviço

Para receptores de mensagens que precisam limitar o número de mensagens recebidas em um determinado momento, eles podem usar BaseSubscriber#request(long).

// This is a non-blocking call. The program will move to the next line of code after setting up the operation.
 asyncReceiver.receiveMessages().subscribe(new BaseSubscriber<ServiceBusReceivedMessage>() {
     private static final int NUMBER_OF_MESSAGES = 5;
     private final AtomicInteger currentNumberOfMessages = new AtomicInteger();

     @Override
     protected void hookOnSubscribe(Subscription subscription) {
         // Tell the Publisher we only want 5 message at a time.
         request(NUMBER_OF_MESSAGES);
     }

     @Override
     protected void hookOnNext(ServiceBusReceivedMessage message) {
         // Process the ServiceBusReceivedMessage
         // If the number of messages we have currently received is a multiple of 5, that means we have reached
         // the last message the Subscriber will provide to us. Invoking request(long) here, tells the Publisher
         // that the subscriber is ready to get more messages from upstream.
         if (currentNumberOfMessages.incrementAndGet() % 5 == 0) {
             request(NUMBER_OF_MESSAGES);
         }
     }
 });

Resumo do método

Modificador e tipo Método e descrição
Mono<Void> abandon(ServiceBusReceivedMessage message)

Abandona um ServiceBusReceivedMessage.

Mono<Void> abandon(ServiceBusReceivedMessage message, AbandonOptions options)

Abandona uma ServiceBusReceivedMessage atualização das propriedades da mensagem.

void close()

Descarta o consumidor fechando os links subjacentes para o serviço.

Mono<Void> commitTransaction(ServiceBusTransactionContext transactionContext)

Confirma a transação e todas as operações associadas a ela.

Mono<Void> complete(ServiceBusReceivedMessage message)

Conclui um ServiceBusReceivedMessage.

Mono<Void> complete(ServiceBusReceivedMessage message, CompleteOptions options)

Conclui um ServiceBusReceivedMessage com as opções fornecidas.

Mono<ServiceBusTransactionContext> createTransaction()

Inicia uma nova transação do lado do serviço.

Mono<Void> deadLetter(ServiceBusReceivedMessage message)

Move um ServiceBusReceivedMessage para a sub-fila de mensagens mortas.

Mono<Void> deadLetter(ServiceBusReceivedMessage message, DeadLetterOptions options)

Move um ServiceBusReceivedMessage para a sub-fila de mensagens mortas com as opções fornecidas.

Mono<Void> defer(ServiceBusReceivedMessage message)

Adia um ServiceBusReceivedMessage.

Mono<Void> defer(ServiceBusReceivedMessage message, DeferOptions options)

Adia um ServiceBusReceivedMessage com o conjunto de opções.

String getEntityPath()

Obtém o recurso do Barramento de Serviço com o qual esse cliente interage.

String getFullyQualifiedNamespace()

Obtém o namespace do Barramento de Serviço totalmente qualificado ao qual a conexão está associada.

String getIdentifier()

Obtém o identificador da instância de ServiceBusReceiverAsyncClient.

String getSessionId()

Obtém a ID de Sessãoda sessão se esse receptor for um receptor de sessão.

Mono<byte[]> getSessionState()

Obtém o estado da sessão se esse receptor for um receptor de sessão.

Mono<ServiceBusReceivedMessage> peekMessage()

Lê a próxima mensagem ativa sem alterar o estado do receptor ou a origem da mensagem.

Mono<ServiceBusReceivedMessage> peekMessage(long sequenceNumber)

A partir do número de sequência especificado, lê em seguida a mensagem ativa sem alterar o estado do receptor ou da origem da mensagem.

Flux<ServiceBusReceivedMessage> peekMessages(int maxMessages)

Lê o próximo lote de mensagens ativas sem alterar o estado do receptor ou a origem da mensagem.

Flux<ServiceBusReceivedMessage> peekMessages(int maxMessages, long sequenceNumber)

A partir do número de sequência especificado, lê o próximo lote de mensagens ativas sem alterar o estado do receptor ou a origem da mensagem.

Mono<ServiceBusReceivedMessage> receiveDeferredMessage(long sequenceNumber)

Recebe um adiado ServiceBusReceivedMessage.

Flux<ServiceBusReceivedMessage> receiveDeferredMessages(Iterable<Long> sequenceNumbers)

Recebe um lote de adiado ServiceBusReceivedMessage.

Flux<ServiceBusReceivedMessage> receiveMessages()

Recebe um fluxo infinito de ServiceBusReceivedMessage da entidade do Barramento de Serviço.

Mono<OffsetDateTime> renewMessageLock(ServiceBusReceivedMessage message)

Renova de forma assíncrona o bloqueio na mensagem.

Mono<Void> renewMessageLock(ServiceBusReceivedMessage message, Duration maxLockRenewalDuration)

Inicia a renovação de bloqueio automático para um ServiceBusReceivedMessage.

Mono<OffsetDateTime> renewSessionLock()

Renova o bloqueio de sessão se esse receptor for um receptor de sessão.

Mono<Void> renewSessionLock(Duration maxLockRenewalDuration)

Inicia a renovação de bloqueio automático para a sessão para a qual esse receptor funciona.

Mono<Void> rollbackTransaction(ServiceBusTransactionContext transactionContext)

Reverte a transação fornecida e todas as operações associadas a ela.

Mono<Void> setSessionState(byte[] sessionState)

Define o estado da sessão para o qual esse receptor funciona.

Métodos herdados de java.lang.Object

Detalhes do método

abandon

public Mono abandon(ServiceBusReceivedMessage message)

Abandona um ServiceBusReceivedMessage. Isso disponibilizará a mensagem novamente para processamento. Abandonar uma mensagem aumentará a contagem de entrega na mensagem.

Parameters:

message - O ServiceBusReceivedMessage para executar essa operação.

Returns:

Um Mono que é concluído quando a operação de abandono do Barramento de Serviço é concluída.

abandon

public Mono abandon(ServiceBusReceivedMessage message, AbandonOptions options)

Abandona uma ServiceBusReceivedMessage atualização das propriedades da mensagem. Isso disponibilizará a mensagem novamente para processamento. Abandonar uma mensagem aumentará a contagem de entrega na mensagem.

Parameters:

message - O ServiceBusReceivedMessage para executar essa operação.
options - As opções a serem definidas ao abandonar a mensagem.

Returns:

Um Mono que é concluído quando a operação do Barramento de Serviço é concluída.

close

public void close()

Descarta o consumidor fechando os links subjacentes para o serviço.

commitTransaction

public Mono commitTransaction(ServiceBusTransactionContext transactionContext)

Confirma a transação e todas as operações associadas a ela.

Criando e usando uma transação

// This mono creates a transaction and caches the output value, so we can associate operations with the
 // transaction. It does not cache the value if it is an error or completes with no items, effectively retrying
 // the operation.
 Mono<ServiceBusTransactionContext> transactionContext = asyncReceiver.createTransaction()
     .cache(value -> Duration.ofMillis(Long.MAX_VALUE),
         error -> Duration.ZERO,
         () -> Duration.ZERO);

 // Dispose of the disposable to cancel the operation.
 Disposable disposable = transactionContext.flatMap(transaction -> {
     // Process messages and associate operations with the transaction.
     Mono<Void> operations = Mono.when(
         asyncReceiver.receiveDeferredMessage(sequenceNumber).flatMap(message ->
             asyncReceiver.complete(message, new CompleteOptions().setTransactionContext(transaction))),
         asyncReceiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction)));

     // Finally, either commit or rollback the transaction once all the operations are associated with it.
     return operations.then(asyncReceiver.commitTransaction(transaction));
 }).subscribe(unused -> {
 }, error -> {
     System.err.println("Error occurred processing transaction: " + error);
 }, () -> {
     System.out.println("Completed transaction");
 });

Parameters:

transactionContext - A transação a ser confirmada.

Returns:

O Mono que conclui essa operação no recurso do barramento de serviço.

complete

public Mono complete(ServiceBusReceivedMessage message)

Conclui um ServiceBusReceivedMessage. Isso excluirá a mensagem do serviço.

Parameters:

message - O ServiceBusReceivedMessage para executar essa operação.

Returns:

Um Mono que termina quando a mensagem é concluída no Barramento de Serviço.

complete

public Mono complete(ServiceBusReceivedMessage message, CompleteOptions options)

Conclui um ServiceBusReceivedMessage com as opções fornecidas. Isso excluirá a mensagem do serviço.

Parameters:

message - O ServiceBusReceivedMessage para executar essa operação.
options - Opções usadas para concluir a mensagem.

Returns:

Um Mono que termina quando a mensagem é concluída no Barramento de Serviço.

createTransaction

public Mono createTransaction()

Inicia uma nova transação do lado do serviço. O ServiceBusTransactionContext deve ser passado para todas as operações que precisam estar nessa transação.

Criando e usando uma transação

// This mono creates a transaction and caches the output value, so we can associate operations with the
 // transaction. It does not cache the value if it is an error or completes with no items, effectively retrying
 // the operation.
 Mono<ServiceBusTransactionContext> transactionContext = asyncReceiver.createTransaction()
     .cache(value -> Duration.ofMillis(Long.MAX_VALUE),
         error -> Duration.ZERO,
         () -> Duration.ZERO);

 // Dispose of the disposable to cancel the operation.
 Disposable disposable = transactionContext.flatMap(transaction -> {
     // Process messages and associate operations with the transaction.
     Mono<Void> operations = Mono.when(
         asyncReceiver.receiveDeferredMessage(sequenceNumber).flatMap(message ->
             asyncReceiver.complete(message, new CompleteOptions().setTransactionContext(transaction))),
         asyncReceiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction)));

     // Finally, either commit or rollback the transaction once all the operations are associated with it.
     return operations.then(asyncReceiver.commitTransaction(transaction));
 }).subscribe(unused -> {
 }, error -> {
     System.err.println("Error occurred processing transaction: " + error);
 }, () -> {
     System.out.println("Completed transaction");
 });

Returns:

O Mono que conclui essa operação no recurso do barramento de serviço.

deadLetter

public Mono deadLetter(ServiceBusReceivedMessage message)

Move um ServiceBusReceivedMessage para a sub-fila de mensagens mortas.

Parameters:

message - O ServiceBusReceivedMessage para executar essa operação.

Returns:

Um Mono que é concluído quando a operação de mensagens mortas é concluída.

deadLetter

public Mono deadLetter(ServiceBusReceivedMessage message, DeadLetterOptions options)

Move um ServiceBusReceivedMessage para a sub-fila de mensagens mortas com as opções fornecidas.

Parameters:

message - O ServiceBusReceivedMessage para executar essa operação.
options - Opções usadas para enviar a mensagem com mensagens mortas.

Returns:

Um Mono que é concluído quando a operação de mensagens mortas é concluída.

defer

public Mono defer(ServiceBusReceivedMessage message)

Adia um ServiceBusReceivedMessage. Isso moverá a mensagem para a sub-fila adiada.

Parameters:

message - O ServiceBusReceivedMessage para executar essa operação.

Returns:

Um Mono que é concluído quando a operação de adiamento do Barramento de Serviço é concluída.

defer

public Mono defer(ServiceBusReceivedMessage message, DeferOptions options)

Adia um ServiceBusReceivedMessage com o conjunto de opções. Isso moverá a mensagem para a sub-fila adiada.

Parameters:

message - O ServiceBusReceivedMessage para executar essa operação.
options - Opções usadas para adiar a mensagem.

Returns:

Um Mono que é concluído quando a operação de adiamento é concluída.

getEntityPath

public String getEntityPath()

Obtém o recurso do Barramento de Serviço com o qual esse cliente interage.

Returns:

O recurso do Barramento de Serviço com o qual esse cliente interage.

getFullyQualifiedNamespace

public String getFullyQualifiedNamespace()

Obtém o namespace do Barramento de Serviço totalmente qualificado ao qual a conexão está associada. Isso provavelmente é semelhante a {yournamespace}.servicebus.windows.net.

Returns:

O namespace do Barramento de Serviço totalmente qualificado ao qual a conexão está associada.

getIdentifier

public String getIdentifier()

Obtém o identificador da instância de ServiceBusReceiverAsyncClient.

Returns:

O identificador que pode identificar a instância do ServiceBusReceiverAsyncClient.

getSessionId

public String getSessionId()

Obtém a SessionId da sessão se esse receptor for um receptor de sessão.

Returns:

SessionId ou null se este não for um receptor de sessão.

getSessionState

public Mono getSessionState()

Obtém o estado da sessão se esse receptor for um receptor de sessão.

Returns:

O estado da sessão ou um Mono vazio se não houver nenhum estado definido para a sessão.

peekMessage

public Mono peekMessage()

Lê a próxima mensagem ativa sem alterar o estado do receptor ou a origem da mensagem. A primeira chamada para peek() buscar a primeira mensagem ativa para esse receptor. Cada chamada subsequente busca a mensagem subsequente na entidade.

Returns:

peekMessage

public Mono peekMessage(long sequenceNumber)

A partir do número de sequência especificado, lê em seguida a mensagem ativa sem alterar o estado do receptor ou da origem da mensagem.

Parameters:

sequenceNumber - O número de sequência de onde ler a mensagem.

Returns:

peekMessages

public Flux peekMessages(int maxMessages)

Lê o próximo lote de mensagens ativas sem alterar o estado do receptor ou a origem da mensagem.

Parameters:

maxMessages - O número de mensagens.

Returns:

Um Flux de ServiceBusReceivedMessage que são espiados.

peekMessages

public Flux peekMessages(int maxMessages, long sequenceNumber)

A partir do número de sequência especificado, lê o próximo lote de mensagens ativas sem alterar o estado do receptor ou a origem da mensagem.

Parameters:

maxMessages - O número de mensagens.
sequenceNumber - O número de sequência de onde começar a ler mensagens.

Returns:

Um Flux de ServiceBusReceivedMessage espiado.

receiveDeferredMessage

public Mono receiveDeferredMessage(long sequenceNumber)

Recebe um adiado ServiceBusReceivedMessage. Mensagens adiadas só podem ser recebidas usando o número de sequência.

Parameters:

sequenceNumber - O getSequenceNumber() da mensagem.

Returns:

Uma mensagem adiada com o correspondente sequenceNumber.

receiveDeferredMessages

public Flux receiveDeferredMessages(Iterable sequenceNumbers)

Recebe um lote de adiado ServiceBusReceivedMessage. Mensagens adiadas só podem ser recebidas usando o número de sequência.

Parameters:

sequenceNumbers - Os números de sequência das mensagens adiadas.

Returns:

Um Flux de adiado ServiceBusReceivedMessage.

receiveMessages

public Flux receiveMessages()

Recebe um fluxo infinito de ServiceBusReceivedMessage da entidade do Barramento de Serviço. Esse Flux recebe continuamente mensagens de uma entidade do Barramento de Serviço até:

  • O receptor está fechado.
  • A assinatura do Flux é descartada.
  • Um sinal de terminal de um assinante downstream é propagado upstream (ou seja, Flux#take(long) ou Flux#take(Duration)).
  • Ocorre um AmqpException que faz com que o link de recebimento pare.

O cliente usa um link AMQP abaixo para receber as mensagens; o cliente fará a transição transparente para um novo link AMQP se o atual encontrar um erro retriável. Quando o cliente apresentar um erro não retriável ou esgotar as tentativas, o manipulador de terminal do org.reactivestreams.Subscriber#onError(Throwable) Assinante será notificado com esse erro. Nenhuma outra mensagem será entregue org.reactivestreams.Subscriber#onNext(Object) após o evento de terminal; o aplicativo deve criar um novo cliente para retomar o recebimento. A reinscrever o Flux do cliente antigo não terá efeito.

Observação: alguns exemplos de erros não retriáveis são : o aplicativo que tenta se conectar a uma fila que não existe, excluindo ou desabilitando a fila no meio do recebimento, o usuário iniciando explicitamente a Geo-DR. Esses são determinados eventos em que o Barramento de Serviço comunica ao cliente que ocorreu um erro não retriável.

Returns:

Um fluxo infinito de mensagens da entidade do Barramento de Serviço.

renewMessageLock

public Mono renewMessageLock(ServiceBusReceivedMessage message)

Renova de forma assíncrona o bloqueio na mensagem. O bloqueio será renovado com base na configuração especificada na entidade . Quando uma mensagem é recebida no PEEK_LOCK modo , a mensagem é bloqueada no servidor para essa instância de receptor por uma duração conforme especificado durante a criação da entidade (LockDuration). Se o processamento da mensagem exigir mais tempo do que essa duração, o bloqueio precisará ser renovado. Para cada renovação, o bloqueio é redefinido para o valor LockDuration da entidade.

Parameters:

message - O ServiceBusReceivedMessage para executar a renovação de bloqueio automático.

Returns:

O novo tempo de expiração para a mensagem.

renewMessageLock

public Mono renewMessageLock(ServiceBusReceivedMessage message, Duration maxLockRenewalDuration)

Inicia a renovação de bloqueio automático para um ServiceBusReceivedMessage.

Parameters:

message - O ServiceBusReceivedMessage para executar essa operação.
maxLockRenewalDuration - Duração máxima para continuar renovando o token de bloqueio.

Returns:

Um Mono que é concluído quando a operação de renovação de mensagem é concluída até maxLockRenewalDuration.

renewSessionLock

public Mono renewSessionLock()

Renova o bloqueio de sessão se esse receptor for um receptor de sessão.

Returns:

O próximo tempo de expiração para o bloqueio de sessão.

renewSessionLock

public Mono renewSessionLock(Duration maxLockRenewalDuration)

Inicia a renovação de bloqueio automático para a sessão para a qual esse receptor funciona.

Parameters:

maxLockRenewalDuration - Duração máxima para continuar renovando o bloqueio de sessão.

Returns:

Uma operação de renovação de bloqueio para a mensagem.

rollbackTransaction

public Mono rollbackTransaction(ServiceBusTransactionContext transactionContext)

Reverte a transação fornecida e todas as operações associadas a ela.

Criando e usando uma transação

// This mono creates a transaction and caches the output value, so we can associate operations with the
 // transaction. It does not cache the value if it is an error or completes with no items, effectively retrying
 // the operation.
 Mono<ServiceBusTransactionContext> transactionContext = asyncReceiver.createTransaction()
     .cache(value -> Duration.ofMillis(Long.MAX_VALUE),
         error -> Duration.ZERO,
         () -> Duration.ZERO);

 // Dispose of the disposable to cancel the operation.
 Disposable disposable = transactionContext.flatMap(transaction -> {
     // Process messages and associate operations with the transaction.
     Mono<Void> operations = Mono.when(
         asyncReceiver.receiveDeferredMessage(sequenceNumber).flatMap(message ->
             asyncReceiver.complete(message, new CompleteOptions().setTransactionContext(transaction))),
         asyncReceiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction)));

     // Finally, either commit or rollback the transaction once all the operations are associated with it.
     return operations.then(asyncReceiver.commitTransaction(transaction));
 }).subscribe(unused -> {
 }, error -> {
     System.err.println("Error occurred processing transaction: " + error);
 }, () -> {
     System.out.println("Completed transaction");
 });

Parameters:

transactionContext - A transação a ser revertida.

Returns:

O Mono que conclui essa operação no recurso do barramento de serviço.

setSessionState

public Mono setSessionState(byte[] sessionState)

Define o estado da sessão para o qual esse receptor funciona.

Parameters:

sessionState - Estado a ser definido na sessão.

Returns:

Um Mono que é concluído quando a sessão é definida

Aplica-se a