ServiceBusReceiverAsyncClient Classe
- java.
lang. Object - com.
azure. messaging. servicebus. ServiceBusReceiverAsyncClient
- com.
Implementações
public final class ServiceBusReceiverAsyncClient
implements AutoCloseable
Um receptor assíncrono responsável por receber ServiceBusReceivedMessage de uma fila de Barramento de Serviço do Azure ou tópico/assinatura.
Os exemplos mostrados neste documento usam um objeto de credencial chamado DefaultAzureCredential para autenticação, que é apropriado para a maioria dos cenários, incluindo ambientes locais de desenvolvimento e produção. Além disso, recomendamos usar a identidade gerenciada para autenticação em ambientes de produção. Você pode encontrar mais informações sobre diferentes maneiras de autenticação e seus tipos de credencial correspondentes na documentação da Identidade do Azure".
Exemplo: criando um ServiceBusReceiverAsyncClient
O exemplo de código a seguir demonstra a criação do cliente ServiceBusReceiverAsyncClientassíncrono . O fullyQualifiedNamespace
é o nome do host do namespace do Barramento de Serviço. Ele é listado no painel "Essentials" depois de navegar até o Namespace dos Hubs de Eventos por meio do Portal do Azure. A credencial usada é DefaultAzureCredential
porque combina credenciais comumente usadas na implantação e desenvolvimento e escolhe a credencial a ser usada com base em seu ambiente de execução. PEEK_LOCK (o modo de recebimento padrão) e disableAutoComplete() são altamente recomendados para que os usuários tenham controle sobre a liquidação de mensagens.
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
// 'disableAutoComplete' indicates that users will explicitly settle their message.
ServiceBusReceiverAsyncClient asyncReceiver = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, credential)
.receiver()
.disableAutoComplete()
.queueName(queueName)
.buildAsyncClient();
// When users are done with the receiver, dispose of the receiver.
// Clients should be long-lived objects as they require resources
// and time to establish a connection to the service.
asyncReceiver.close();
Exemplo: receber todas as mensagens do recurso do Barramento de Serviço
Isso retorna um fluxo infinito de mensagens do Barramento de Serviço. O fluxo termina quando a assinatura é descartada ou outros cenários de terminal. Consulte receiveMessages() para obter mais informações.
// Keep a reference to `subscription`. When the program is finished receiving messages, call
// subscription.dispose(). This will stop fetching messages from the Service Bus.
// Consider using Flux.usingWhen to scope the creation, usage, and cleanup of the receiver.
Disposable subscription = asyncReceiver.receiveMessages()
.flatMap(message -> {
System.out.printf("Received Seq #: %s%n", message.getSequenceNumber());
System.out.printf("Contents of message as string: %s%n", message.getBody());
// Explicitly settle the message using complete, abandon, defer, dead-letter, etc.
if (isMessageProcessed) {
return asyncReceiver.complete(message);
} else {
return asyncReceiver.abandon(message);
}
})
.subscribe(unused -> {
}, error -> System.out.println("Error occurred: " + error),
() -> System.out.println("Receiving complete."));
// When program ends, or you're done receiving all messages, dispose of the receiver.
// Clients should be long-lived objects as they
// require resources and time to establish a connection to the service.
asyncReceiver.close();
Exemplo: receber mensagens no RECEIVE_AND_DELETE modo de uma entidade do Barramento de Serviço
O exemplo de código a seguir demonstra a criação do cliente ServiceBusReceiverAsyncClient assíncrono usando RECEIVE_AND_DELETE. O fullyQualifiedNamespace
é o nome do host do namespace do Barramento de Serviço. Ele é listado no painel "Essentials" depois de navegar até o Namespace dos Hubs de Eventos por meio do Portal do Azure. A credencial usada é DefaultAzureCredential
porque combina credenciais comumente usadas na implantação e desenvolvimento e escolhe a credencial a ser usada com base em seu ambiente de execução. Consulte RECEIVE_AND_DELETE documentos para obter mais informações sobre como receber mensagens usando esse modo.
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// Keep a reference to `subscription`. When the program is finished receiving messages, call
// subscription.dispose(). This will stop fetching messages from the Service Bus.
Disposable subscription = Flux.usingWhen(
Mono.fromCallable(() -> {
// Setting the receiveMode when creating the receiver enables receive and delete mode. By default,
// peek lock mode is used. In peek lock mode, users are responsible for settling messages.
return new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, credential)
.receiver()
.receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
.queueName(queueName)
.buildAsyncClient();
}), receiver -> {
return receiver.receiveMessages();
}, receiver -> {
return Mono.fromRunnable(() -> receiver.close());
})
.subscribe(message -> {
// Messages received in RECEIVE_AND_DELETE mode do not have to be settled because they are automatically
// removed from the queue.
System.out.printf("Received Seq #: %s%n", message.getSequenceNumber());
System.out.printf("Contents of message as string: %s%n", message.getBody());
},
error -> System.out.println("Error occurred: " + error),
() -> System.out.println("Receiving complete."));
Exemplo: receber mensagens de uma sessão específica
Para buscar mensagens de uma sessão específica, alterne para ServiceBusSessionReceiverClientBuilder e crie o cliente receptor da sessão. Use acceptSession(String sessionId) para criar uma sessão associada a ServiceBusReceiverAsyncClient. O exemplo pressupõe que as sessões do Barramento de Serviço foram habilitadas no momento da criação da fila.
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
// 'disableAutoComplete' indicates that users will explicitly settle their message.
ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, credential)
.sessionReceiver()
.disableAutoComplete()
.queueName(sessionEnabledQueueName)
.buildAsyncClient();
// acceptSession(String) completes successfully with a receiver when "<<my-session-id>>" session is
// successfully locked.
// `Flux.usingWhen` is used, so we dispose of the receiver resource after `receiveMessages()` and the settlement
// operations complete.
// `Mono.usingWhen` can also be used if the resource closure returns a single item.
Flux<Void> sessionMessages = Flux.usingWhen(
sessionReceiver.acceptSession("<<my-session-id>>"),
receiver -> {
// Receive messages from <<my-session-id>> session.
return receiver.receiveMessages().flatMap(message -> {
System.out.printf("Received Sequence #: %s. Contents: %s%n", message.getSequenceNumber(),
message.getBody());
// Explicitly settle the message using complete, abandon, defer, dead-letter, etc.
if (isMessageProcessed) {
return receiver.complete(message);
} else {
return receiver.abandon(message);
}
});
},
receiver -> Mono.fromRunnable(() -> {
// Dispose of resources.
receiver.close();
sessionReceiver.close();
}));
// When program ends, or you're done receiving all messages, the `subscription` can be disposed of. This code
// is non-blocking and kicks off the operation.
Disposable subscription = sessionMessages.subscribe(
unused -> {
}, error -> System.err.print("Error receiving message from session: " + error),
() -> System.out.println("Completed receiving from session."));
Exemplo: receber mensagens da primeira sessão disponível
Para processar mensagens da primeira sessão disponível, alterne para ServiceBusSessionReceiverClientBuilder e crie o cliente receptor da sessão. Use acceptNextSession() para localizar a primeira sessão disponível para processar mensagens.
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
// 'disableAutoComplete' indicates that users will explicitly settle their message.
ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, credential)
.sessionReceiver()
.disableAutoComplete()
.queueName(sessionEnabledQueueName)
.buildAsyncClient();
// Creates a client to receive messages from the first available session. It waits until
// AmqpRetryOptions.getTryTimeout() elapses. If no session is available within that operation timeout, it
// completes with a retriable error. Otherwise, a receiver is returned when a lock on the session is acquired.
Mono<ServiceBusReceiverAsyncClient> receiverMono = sessionReceiver.acceptNextSession();
Flux<Void> receiveMessagesFlux = Flux.usingWhen(receiverMono,
receiver -> receiver.receiveMessages().flatMap(message -> {
System.out.println("Received message: " + message.getBody());
// Explicitly settle the message via complete, abandon, defer, dead-letter, etc.
if (isMessageProcessed) {
return receiver.complete(message);
} else {
return receiver.abandon(message);
}
}),
receiver -> Mono.fromRunnable(() -> {
// Dispose of the receiver and sessionReceiver when done receiving messages.
receiver.close();
sessionReceiver.close();
}));
// This is a non-blocking call that moves onto the next line of code after setting up and starting the receive
// operation. Customers can keep a reference to `subscription` and dispose of it when they want to stop
// receiving messages.
Disposable subscription = receiveMessagesFlux.subscribe(unused -> {
}, error -> System.out.println("Error occurred: " + error),
() -> System.out.println("Receiving complete."));
Exemplo: taxa que limita o consumo de mensagens de uma entidade do Barramento de Serviço
Para receptores de mensagens que precisam limitar o número de mensagens recebidas em um determinado momento, eles podem usar BaseSubscriber#request(long).
// This is a non-blocking call. The program will move to the next line of code after setting up the operation.
asyncReceiver.receiveMessages().subscribe(new BaseSubscriber<ServiceBusReceivedMessage>() {
private static final int NUMBER_OF_MESSAGES = 5;
private final AtomicInteger currentNumberOfMessages = new AtomicInteger();
@Override
protected void hookOnSubscribe(Subscription subscription) {
// Tell the Publisher we only want 5 message at a time.
request(NUMBER_OF_MESSAGES);
}
@Override
protected void hookOnNext(ServiceBusReceivedMessage message) {
// Process the ServiceBusReceivedMessage
// If the number of messages we have currently received is a multiple of 5, that means we have reached
// the last message the Subscriber will provide to us. Invoking request(long) here, tells the Publisher
// that the subscriber is ready to get more messages from upstream.
if (currentNumberOfMessages.incrementAndGet() % 5 == 0) {
request(NUMBER_OF_MESSAGES);
}
}
});
Resumo do método
Modificador e tipo | Método e descrição |
---|---|
Mono<Void> |
abandon(ServiceBusReceivedMessage message)
Abandona um ServiceBusReceivedMessage. |
Mono<Void> |
abandon(ServiceBusReceivedMessage message, AbandonOptions options)
Abandona uma ServiceBusReceivedMessage atualização das propriedades da mensagem. |
void |
close()
Descarta o consumidor fechando os links subjacentes para o serviço. |
Mono<Void> |
commitTransaction(ServiceBusTransactionContext transactionContext)
Confirma a transação e todas as operações associadas a ela. |
Mono<Void> |
complete(ServiceBusReceivedMessage message)
Conclui um ServiceBusReceivedMessage. |
Mono<Void> |
complete(ServiceBusReceivedMessage message, CompleteOptions options)
Conclui um ServiceBusReceivedMessage com as opções fornecidas. |
Mono<Service |
createTransaction()
Inicia uma nova transação do lado do serviço. |
Mono<Void> |
deadLetter(ServiceBusReceivedMessage message)
Move um ServiceBusReceivedMessage para a sub-fila de mensagens mortas. |
Mono<Void> |
deadLetter(ServiceBusReceivedMessage message, DeadLetterOptions options)
Move um ServiceBusReceivedMessage para a sub-fila de mensagens mortas com as opções fornecidas. |
Mono<Void> |
defer(ServiceBusReceivedMessage message)
Adia um ServiceBusReceivedMessage. |
Mono<Void> |
defer(ServiceBusReceivedMessage message, DeferOptions options)
Adia um ServiceBusReceivedMessage com o conjunto de opções. |
String |
getEntityPath()
Obtém o recurso do Barramento de Serviço com o qual esse cliente interage. |
String |
getFullyQualifiedNamespace()
Obtém o namespace do Barramento de Serviço totalmente qualificado ao qual a conexão está associada. |
String |
getIdentifier()
Obtém o identificador da instância de ServiceBusReceiverAsyncClient. |
String |
getSessionId()
Obtém a ID de Sessão |
Mono<byte[]> |
getSessionState()
Obtém o estado da sessão se esse receptor for um receptor de sessão. |
Mono<Service |
peekMessage()
Lê a próxima mensagem ativa sem alterar o estado do receptor ou a origem da mensagem. |
Mono<Service |
peekMessage(long sequenceNumber)
A partir do número de sequência especificado, lê em seguida a mensagem ativa sem alterar o estado do receptor ou da origem da mensagem. |
Flux<Service |
peekMessages(int maxMessages)
Lê o próximo lote de mensagens ativas sem alterar o estado do receptor ou a origem da mensagem. |
Flux<Service |
peekMessages(int maxMessages, long sequenceNumber)
A partir do número de sequência especificado, lê o próximo lote de mensagens ativas sem alterar o estado do receptor ou a origem da mensagem. |
Mono<Service |
receiveDeferredMessage(long sequenceNumber)
Recebe um adiado ServiceBusReceivedMessage. |
Flux<Service |
receiveDeferredMessages(Iterable<Long> sequenceNumbers)
Recebe um lote de adiado ServiceBusReceivedMessage. |
Flux<Service |
receiveMessages()
Recebe um fluxo infinito de ServiceBusReceivedMessage da entidade do Barramento de Serviço. |
Mono<Offset |
renewMessageLock(ServiceBusReceivedMessage message)
Renova de forma assíncrona o bloqueio na mensagem. |
Mono<Void> |
renewMessageLock(ServiceBusReceivedMessage message, Duration maxLockRenewalDuration)
Inicia a renovação de bloqueio automático para um ServiceBusReceivedMessage. |
Mono<Offset |
renewSessionLock()
Renova o bloqueio de sessão se esse receptor for um receptor de sessão. |
Mono<Void> |
renewSessionLock(Duration maxLockRenewalDuration)
Inicia a renovação de bloqueio automático para a sessão para a qual esse receptor funciona. |
Mono<Void> |
rollbackTransaction(ServiceBusTransactionContext transactionContext)
Reverte a transação fornecida e todas as operações associadas a ela. |
Mono<Void> |
setSessionState(byte[] sessionState)
Define o estado da sessão para o qual esse receptor funciona. |
Métodos herdados de java.lang.Object
Detalhes do método
abandon
public Mono
Abandona um ServiceBusReceivedMessage. Isso disponibilizará a mensagem novamente para processamento. Abandonar uma mensagem aumentará a contagem de entrega na mensagem.
Parameters:
Returns:
abandon
public Mono
Abandona uma ServiceBusReceivedMessage atualização das propriedades da mensagem. Isso disponibilizará a mensagem novamente para processamento. Abandonar uma mensagem aumentará a contagem de entrega na mensagem.
Parameters:
Returns:
close
public void close()
Descarta o consumidor fechando os links subjacentes para o serviço.
commitTransaction
public Mono
Confirma a transação e todas as operações associadas a ela.
Criando e usando uma transação
// This mono creates a transaction and caches the output value, so we can associate operations with the
// transaction. It does not cache the value if it is an error or completes with no items, effectively retrying
// the operation.
Mono<ServiceBusTransactionContext> transactionContext = asyncReceiver.createTransaction()
.cache(value -> Duration.ofMillis(Long.MAX_VALUE),
error -> Duration.ZERO,
() -> Duration.ZERO);
// Dispose of the disposable to cancel the operation.
Disposable disposable = transactionContext.flatMap(transaction -> {
// Process messages and associate operations with the transaction.
Mono<Void> operations = Mono.when(
asyncReceiver.receiveDeferredMessage(sequenceNumber).flatMap(message ->
asyncReceiver.complete(message, new CompleteOptions().setTransactionContext(transaction))),
asyncReceiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction)));
// Finally, either commit or rollback the transaction once all the operations are associated with it.
return operations.then(asyncReceiver.commitTransaction(transaction));
}).subscribe(unused -> {
}, error -> {
System.err.println("Error occurred processing transaction: " + error);
}, () -> {
System.out.println("Completed transaction");
});
Parameters:
Returns:
complete
public Mono
Conclui um ServiceBusReceivedMessage. Isso excluirá a mensagem do serviço.
Parameters:
Returns:
complete
public Mono
Conclui um ServiceBusReceivedMessage com as opções fornecidas. Isso excluirá a mensagem do serviço.
Parameters:
Returns:
createTransaction
public Mono
Inicia uma nova transação do lado do serviço. O ServiceBusTransactionContext deve ser passado para todas as operações que precisam estar nessa transação.
Criando e usando uma transação
// This mono creates a transaction and caches the output value, so we can associate operations with the
// transaction. It does not cache the value if it is an error or completes with no items, effectively retrying
// the operation.
Mono<ServiceBusTransactionContext> transactionContext = asyncReceiver.createTransaction()
.cache(value -> Duration.ofMillis(Long.MAX_VALUE),
error -> Duration.ZERO,
() -> Duration.ZERO);
// Dispose of the disposable to cancel the operation.
Disposable disposable = transactionContext.flatMap(transaction -> {
// Process messages and associate operations with the transaction.
Mono<Void> operations = Mono.when(
asyncReceiver.receiveDeferredMessage(sequenceNumber).flatMap(message ->
asyncReceiver.complete(message, new CompleteOptions().setTransactionContext(transaction))),
asyncReceiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction)));
// Finally, either commit or rollback the transaction once all the operations are associated with it.
return operations.then(asyncReceiver.commitTransaction(transaction));
}).subscribe(unused -> {
}, error -> {
System.err.println("Error occurred processing transaction: " + error);
}, () -> {
System.out.println("Completed transaction");
});
Returns:
deadLetter
public Mono
Move um ServiceBusReceivedMessage para a sub-fila de mensagens mortas.
Parameters:
Returns:
deadLetter
public Mono
Move um ServiceBusReceivedMessage para a sub-fila de mensagens mortas com as opções fornecidas.
Parameters:
Returns:
defer
public Mono
Adia um ServiceBusReceivedMessage. Isso moverá a mensagem para a sub-fila adiada.
Parameters:
Returns:
defer
public Mono
Adia um ServiceBusReceivedMessage com o conjunto de opções. Isso moverá a mensagem para a sub-fila adiada.
Parameters:
Returns:
getEntityPath
public String getEntityPath()
Obtém o recurso do Barramento de Serviço com o qual esse cliente interage.
Returns:
getFullyQualifiedNamespace
public String getFullyQualifiedNamespace()
Obtém o namespace do Barramento de Serviço totalmente qualificado ao qual a conexão está associada. Isso provavelmente é semelhante a {yournamespace}.servicebus.windows.net
.
Returns:
getIdentifier
public String getIdentifier()
Obtém o identificador da instância de ServiceBusReceiverAsyncClient.
Returns:
getSessionId
public String getSessionId()
Obtém a SessionId da sessão se esse receptor for um receptor de sessão.
Returns:
getSessionState
public Mono
Obtém o estado da sessão se esse receptor for um receptor de sessão.
Returns:
peekMessage
public Mono
Lê a próxima mensagem ativa sem alterar o estado do receptor ou a origem da mensagem. A primeira chamada para peek()
buscar a primeira mensagem ativa para esse receptor. Cada chamada subsequente busca a mensagem subsequente na entidade.
Returns:
peekMessage
public Mono
A partir do número de sequência especificado, lê em seguida a mensagem ativa sem alterar o estado do receptor ou da origem da mensagem.
Parameters:
Returns:
peekMessages
public Flux
Lê o próximo lote de mensagens ativas sem alterar o estado do receptor ou a origem da mensagem.
Parameters:
Returns:
peekMessages
public Flux
A partir do número de sequência especificado, lê o próximo lote de mensagens ativas sem alterar o estado do receptor ou a origem da mensagem.
Parameters:
Returns:
receiveDeferredMessage
public Mono
Recebe um adiado ServiceBusReceivedMessage. Mensagens adiadas só podem ser recebidas usando o número de sequência.
Parameters:
Returns:
sequenceNumber
.receiveDeferredMessages
public Flux
Recebe um lote de adiado ServiceBusReceivedMessage. Mensagens adiadas só podem ser recebidas usando o número de sequência.
Parameters:
Returns:
receiveMessages
public Flux
Recebe um fluxo infinito de ServiceBusReceivedMessage da entidade do Barramento de Serviço. Esse Flux recebe continuamente mensagens de uma entidade do Barramento de Serviço até:
- O receptor está fechado.
- A assinatura do Flux é descartada.
- Um sinal de terminal de um assinante downstream é propagado upstream (ou seja, Flux#take(long) ou Flux#take(Duration)).
- Ocorre um AmqpException que faz com que o link de recebimento pare.
O cliente usa um link AMQP abaixo para receber as mensagens; o cliente fará a transição transparente para um novo link AMQP se o atual encontrar um erro retriável. Quando o cliente apresentar um erro não retriável ou esgotar as tentativas, o manipulador de terminal do org.reactivestreams.Subscriber#onError(Throwable) Assinante será notificado com esse erro. Nenhuma outra mensagem será entregue org.reactivestreams.Subscriber#onNext(Object) após o evento de terminal; o aplicativo deve criar um novo cliente para retomar o recebimento. A reinscrever o Flux do cliente antigo não terá efeito.
Observação: alguns exemplos de erros não retriáveis são : o aplicativo que tenta se conectar a uma fila que não existe, excluindo ou desabilitando a fila no meio do recebimento, o usuário iniciando explicitamente a Geo-DR. Esses são determinados eventos em que o Barramento de Serviço comunica ao cliente que ocorreu um erro não retriável.
Returns:
renewMessageLock
public Mono
Renova de forma assíncrona o bloqueio na mensagem. O bloqueio será renovado com base na configuração especificada na entidade . Quando uma mensagem é recebida no PEEK_LOCK modo , a mensagem é bloqueada no servidor para essa instância de receptor por uma duração conforme especificado durante a criação da entidade (LockDuration). Se o processamento da mensagem exigir mais tempo do que essa duração, o bloqueio precisará ser renovado. Para cada renovação, o bloqueio é redefinido para o valor LockDuration da entidade.
Parameters:
Returns:
renewMessageLock
public Mono
Inicia a renovação de bloqueio automático para um ServiceBusReceivedMessage.
Parameters:
Returns:
maxLockRenewalDuration
.renewSessionLock
public Mono
Renova o bloqueio de sessão se esse receptor for um receptor de sessão.
Returns:
renewSessionLock
public Mono
Inicia a renovação de bloqueio automático para a sessão para a qual esse receptor funciona.
Parameters:
Returns:
rollbackTransaction
public Mono
Reverte a transação fornecida e todas as operações associadas a ela.
Criando e usando uma transação
// This mono creates a transaction and caches the output value, so we can associate operations with the
// transaction. It does not cache the value if it is an error or completes with no items, effectively retrying
// the operation.
Mono<ServiceBusTransactionContext> transactionContext = asyncReceiver.createTransaction()
.cache(value -> Duration.ofMillis(Long.MAX_VALUE),
error -> Duration.ZERO,
() -> Duration.ZERO);
// Dispose of the disposable to cancel the operation.
Disposable disposable = transactionContext.flatMap(transaction -> {
// Process messages and associate operations with the transaction.
Mono<Void> operations = Mono.when(
asyncReceiver.receiveDeferredMessage(sequenceNumber).flatMap(message ->
asyncReceiver.complete(message, new CompleteOptions().setTransactionContext(transaction))),
asyncReceiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction)));
// Finally, either commit or rollback the transaction once all the operations are associated with it.
return operations.then(asyncReceiver.commitTransaction(transaction));
}).subscribe(unused -> {
}, error -> {
System.err.println("Error occurred processing transaction: " + error);
}, () -> {
System.out.println("Completed transaction");
});
Parameters:
Returns:
setSessionState
public Mono
Define o estado da sessão para o qual esse receptor funciona.
Parameters:
Returns:
Aplica-se a
Azure SDK for Java