Freigeben über


ServiceBusSessionReceiverAsyncClient Klasse

  • java.lang.Object
    • com.azure.messaging.servicebus.ServiceBusSessionReceiverAsyncClient

Implementiert

public final class ServiceBusSessionReceiverAsyncClient
implements AutoCloseable

Dieser asynchrone Sitzungsempfängerclient wird verwendet, um Sitzungssperren aus einer Warteschlange oder einem Thema abzurufen und Instanzen zu erstellen ServiceBusReceiverAsyncClient , die an die gesperrten Sitzungen gebunden sind. Sitzungen können als FIFO-Verarbeitung (First In, First Out) von Nachrichten verwendet werden. Warteschlangen und Themen/Abonnements unterstützen Service Bus-Sitzungen, sie müssen jedoch zum Zeitpunkt der Entitätserstellung aktiviert werden.

Die in diesem Dokument gezeigten Beispiele verwenden ein Anmeldeinformationsobjekt namens DefaultAzureCredential für die Authentifizierung, das für die meisten Szenarien geeignet ist, einschließlich lokaler Entwicklungs- und Produktionsumgebungen. Darüber hinaus wird die Verwendung einer verwalteten Identität für die Authentifizierung in Produktionsumgebungen empfohlen. Weitere Informationen zu verschiedenen Authentifizierungsmethoden und den entsprechenden Anmeldeinformationstypen finden Sie in der Dokumentation zu Azure Identity."

Beispiel: Empfangen von Nachrichten aus einer bestimmten Sitzung

Verwenden Sie acceptSession(String sessionId) , um die Sperre einer Sitzung zu erhalten, wenn Sie die Sitzungs-ID kennen. PEEK_LOCK und disableAutoComplete() wird dringend empfohlen, damit Benutzer die Kontrolle über die Nachrichtenabwicklung haben.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 // 'disableAutoComplete' indicates that users will explicitly settle their message.
 ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, credential)
     .sessionReceiver()
     .disableAutoComplete()
     .queueName(sessionEnabledQueueName)
     .buildAsyncClient();

 // acceptSession(String) completes successfully with a receiver when "<<my-session-id>>" session is
 // successfully locked.
 // `Flux.usingWhen` is used, so we dispose of the receiver resource after `receiveMessages()` and the settlement
 // operations complete.
 // `Mono.usingWhen` can also be used if the resource closure returns a single item.
 Flux<Void> sessionMessages = Flux.usingWhen(
     sessionReceiver.acceptSession("<<my-session-id>>"),
     receiver -> {
         // Receive messages from <<my-session-id>> session.
         return receiver.receiveMessages().flatMap(message -> {
             System.out.printf("Received Sequence #: %s. Contents: %s%n", message.getSequenceNumber(),
                 message.getBody());

             // Explicitly settle the message using complete, abandon, defer, dead-letter, etc.
             if (isMessageProcessed) {
                 return receiver.complete(message);
             } else {
                 return receiver.abandon(message);
             }
         });
     },
     receiver -> Mono.fromRunnable(() -> {
         // Dispose of resources.
         receiver.close();
         sessionReceiver.close();
     }));

 // When program ends, or you're done receiving all messages, the `subscription` can be disposed of. This code
 // is non-blocking and kicks off the operation.
 Disposable subscription = sessionMessages.subscribe(
     unused -> {
     }, error -> System.err.print("Error receiving message from session: " + error),
     () -> System.out.println("Completed receiving from session."));

Beispiel: Empfangen von Nachrichten aus der ersten verfügbaren Sitzung

Verwenden Sie acceptNextSession() , um die Sperre der nächsten verfügbaren Sitzung abzurufen, ohne die Sitzungs-ID anzugeben.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 // 'disableAutoComplete' indicates that users will explicitly settle their message.
 ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, credential)
     .sessionReceiver()
     .disableAutoComplete()
     .queueName(sessionEnabledQueueName)
     .buildAsyncClient();

 // Creates a client to receive messages from the first available session. It waits until
 // AmqpRetryOptions.getTryTimeout() elapses. If no session is available within that operation timeout, it
 // completes with a retriable error. Otherwise, a receiver is returned when a lock on the session is acquired.
 Mono<ServiceBusReceiverAsyncClient> receiverMono = sessionReceiver.acceptNextSession();

 Flux<Void> receiveMessagesFlux = Flux.usingWhen(receiverMono,
     receiver -> receiver.receiveMessages().flatMap(message -> {
         System.out.println("Received message: " + message.getBody());

         // Explicitly settle the message via complete, abandon, defer, dead-letter, etc.
         if (isMessageProcessed) {
             return receiver.complete(message);
         } else {
             return receiver.abandon(message);
         }
     }),
     receiver -> Mono.fromRunnable(() -> {
         // Dispose of the receiver and sessionReceiver when done receiving messages.
         receiver.close();
         sessionReceiver.close();
     }));

 // This is a non-blocking call that moves onto the next line of code after setting up and starting the receive
 // operation. Customers can keep a reference to `subscription` and dispose of it when they want to stop
 // receiving messages.
 Disposable subscription = receiveMessagesFlux.subscribe(unused -> {
 }, error -> System.out.println("Error occurred: " + error),
     () -> System.out.println("Receiving complete."));

Methodenzusammenfassung

Modifizierer und Typ Methode und Beschreibung
Mono<ServiceBusReceiverAsyncClient> acceptNextSession()

Ruft eine Sitzungssperre für die nächste verfügbare Sitzung ab und erstellt eine ServiceBusReceiverAsyncClient , um Nachrichten von der Sitzung zu empfangen.

Mono<ServiceBusReceiverAsyncClient> acceptSession(String sessionId)

Ruft eine Sitzungssperre für sessionId ab und erstellt eine ServiceBusReceiverAsyncClient , um Nachrichten von der Sitzung zu empfangen.

void close()

Geerbte Methoden von java.lang.Object

Details zur Methode

acceptNextSession

public Mono acceptNextSession()

Ruft eine Sitzungssperre für die nächste verfügbare Sitzung ab und erstellt eine ServiceBusReceiverAsyncClient , um Nachrichten von der Sitzung zu empfangen. Es wird gewartet, bis eine Sitzung verfügbar ist, wenn keine sofort verfügbar ist.

Returns:

Ein ServiceBusReceiverAsyncClient , der an die verfügbare Sitzung gebunden ist.

acceptSession

public Mono acceptSession(String sessionId)

Ruft eine Sitzungssperre für sessionId ab und erstellt eine ServiceBusReceiverAsyncClient , um Nachrichten von der Sitzung zu empfangen. Wenn die Sitzung bereits von einem anderen Client gesperrt ist, wird eine AmqpException ausgelöst.

Parameters:

sessionId - Die Sitzungs-ID.

Returns:

Ein ServiceBusReceiverAsyncClient , der an die angegebene Sitzung gebunden ist.

close

public void close()

Gilt für: