ServiceBusSessionReceiverAsyncClient Klasse
- java.
lang. Object - com.
azure. messaging. servicebus. ServiceBusSessionReceiverAsyncClient
- com.
Implementiert
public final class ServiceBusSessionReceiverAsyncClient
implements AutoCloseable
Dieser asynchrone Sitzungsempfängerclient wird verwendet, um Sitzungssperren aus einer Warteschlange oder einem Thema abzurufen und Instanzen zu erstellen ServiceBusReceiverAsyncClient , die an die gesperrten Sitzungen gebunden sind. Sitzungen können als FIFO-Verarbeitung (First In, First Out) von Nachrichten verwendet werden. Warteschlangen und Themen/Abonnements unterstützen Service Bus-Sitzungen, sie müssen jedoch zum Zeitpunkt der Entitätserstellung aktiviert werden.
Die in diesem Dokument gezeigten Beispiele verwenden ein Anmeldeinformationsobjekt namens DefaultAzureCredential für die Authentifizierung, das für die meisten Szenarien geeignet ist, einschließlich lokaler Entwicklungs- und Produktionsumgebungen. Darüber hinaus wird die Verwendung einer verwalteten Identität für die Authentifizierung in Produktionsumgebungen empfohlen. Weitere Informationen zu verschiedenen Authentifizierungsmethoden und den entsprechenden Anmeldeinformationstypen finden Sie in der Dokumentation zu Azure Identity."
Beispiel: Empfangen von Nachrichten aus einer bestimmten Sitzung
Verwenden Sie acceptSession(String sessionId) , um die Sperre einer Sitzung zu erhalten, wenn Sie die Sitzungs-ID kennen. PEEK_LOCK und disableAutoComplete() wird dringend empfohlen, damit Benutzer die Kontrolle über die Nachrichtenabwicklung haben.
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
// 'disableAutoComplete' indicates that users will explicitly settle their message.
ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, credential)
.sessionReceiver()
.disableAutoComplete()
.queueName(sessionEnabledQueueName)
.buildAsyncClient();
// acceptSession(String) completes successfully with a receiver when "<<my-session-id>>" session is
// successfully locked.
// `Flux.usingWhen` is used, so we dispose of the receiver resource after `receiveMessages()` and the settlement
// operations complete.
// `Mono.usingWhen` can also be used if the resource closure returns a single item.
Flux<Void> sessionMessages = Flux.usingWhen(
sessionReceiver.acceptSession("<<my-session-id>>"),
receiver -> {
// Receive messages from <<my-session-id>> session.
return receiver.receiveMessages().flatMap(message -> {
System.out.printf("Received Sequence #: %s. Contents: %s%n", message.getSequenceNumber(),
message.getBody());
// Explicitly settle the message using complete, abandon, defer, dead-letter, etc.
if (isMessageProcessed) {
return receiver.complete(message);
} else {
return receiver.abandon(message);
}
});
},
receiver -> Mono.fromRunnable(() -> {
// Dispose of resources.
receiver.close();
sessionReceiver.close();
}));
// When program ends, or you're done receiving all messages, the `subscription` can be disposed of. This code
// is non-blocking and kicks off the operation.
Disposable subscription = sessionMessages.subscribe(
unused -> {
}, error -> System.err.print("Error receiving message from session: " + error),
() -> System.out.println("Completed receiving from session."));
Beispiel: Empfangen von Nachrichten aus der ersten verfügbaren Sitzung
Verwenden Sie acceptNextSession() , um die Sperre der nächsten verfügbaren Sitzung abzurufen, ohne die Sitzungs-ID anzugeben.
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
// 'disableAutoComplete' indicates that users will explicitly settle their message.
ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, credential)
.sessionReceiver()
.disableAutoComplete()
.queueName(sessionEnabledQueueName)
.buildAsyncClient();
// Creates a client to receive messages from the first available session. It waits until
// AmqpRetryOptions.getTryTimeout() elapses. If no session is available within that operation timeout, it
// completes with a retriable error. Otherwise, a receiver is returned when a lock on the session is acquired.
Mono<ServiceBusReceiverAsyncClient> receiverMono = sessionReceiver.acceptNextSession();
Flux<Void> receiveMessagesFlux = Flux.usingWhen(receiverMono,
receiver -> receiver.receiveMessages().flatMap(message -> {
System.out.println("Received message: " + message.getBody());
// Explicitly settle the message via complete, abandon, defer, dead-letter, etc.
if (isMessageProcessed) {
return receiver.complete(message);
} else {
return receiver.abandon(message);
}
}),
receiver -> Mono.fromRunnable(() -> {
// Dispose of the receiver and sessionReceiver when done receiving messages.
receiver.close();
sessionReceiver.close();
}));
// This is a non-blocking call that moves onto the next line of code after setting up and starting the receive
// operation. Customers can keep a reference to `subscription` and dispose of it when they want to stop
// receiving messages.
Disposable subscription = receiveMessagesFlux.subscribe(unused -> {
}, error -> System.out.println("Error occurred: " + error),
() -> System.out.println("Receiving complete."));
Methodenzusammenfassung
Modifizierer und Typ | Methode und Beschreibung |
---|---|
Mono<Service |
acceptNextSession()
Ruft eine Sitzungssperre für die nächste verfügbare Sitzung ab und erstellt eine ServiceBusReceiverAsyncClient , um Nachrichten von der Sitzung zu empfangen. |
Mono<Service |
acceptSession(String sessionId)
Ruft eine Sitzungssperre für |
void | close() |
Geerbte Methoden von java.lang.Object
Details zur Methode
acceptNextSession
public Mono
Ruft eine Sitzungssperre für die nächste verfügbare Sitzung ab und erstellt eine ServiceBusReceiverAsyncClient , um Nachrichten von der Sitzung zu empfangen. Es wird gewartet, bis eine Sitzung verfügbar ist, wenn keine sofort verfügbar ist.
Returns:
acceptSession
public Mono
Ruft eine Sitzungssperre für sessionId
ab und erstellt eine ServiceBusReceiverAsyncClient , um Nachrichten von der Sitzung zu empfangen. Wenn die Sitzung bereits von einem anderen Client gesperrt ist, wird eine AmqpException ausgelöst.
Parameters:
Returns:
close
public void close()
Gilt für:
Azure SDK for Java