Freigeben über


ServiceBusReceiverAsyncClient Klasse

  • java.lang.Object
    • com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient

Implementiert

public final class ServiceBusReceiverAsyncClient
implements AutoCloseable

Ein asynchroner Empfänger, der für den Empfang ServiceBusReceivedMessage von einer Azure Service Bus Warteschlange oder eines Themas/Abonnements verantwortlich ist.

Die in diesem Dokument gezeigten Beispiele verwenden ein Anmeldeinformationsobjekt namens DefaultAzureCredential für die Authentifizierung, das für die meisten Szenarien geeignet ist, einschließlich lokaler Entwicklungs- und Produktionsumgebungen. Darüber hinaus wird empfohlen, die verwaltete Identität für die Authentifizierung in Produktionsumgebungen zu verwenden. Weitere Informationen zu verschiedenen Authentifizierungsmethoden und den entsprechenden Anmeldeinformationstypen finden Sie in der Azure Identity-Dokumentation.

Beispiel: Erstellen eines ServiceBusReceiverAsyncClient

Im folgenden Codebeispiel wird die Erstellung des asynchronen Clients ServiceBusReceiverAsyncClientveranschaulicht. Der fullyQualifiedNamespace ist der Hostname des Service Bus-Namespaces. Sie wird im Bereich "Essentials" aufgeführt, nachdem Sie über das Azure-Portal zum Event Hubs-Namespace navigieren. Die verwendeten Anmeldeinformationen basieren DefaultAzureCredential darauf, dass sie häufig verwendete Anmeldeinformationen in Bereitstellung und Entwicklung kombiniert und die zu verwendenden Anmeldeinformationen basierend auf der ausgeführten Umgebung auswählen. PEEK_LOCK (der Standard-Empfangsmodus) und disableAutoComplete() werden dringend empfohlen, damit Benutzer die Kontrolle über die Nachrichtenabwicklung haben.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 // 'disableAutoComplete' indicates that users will explicitly settle their message.
 ServiceBusReceiverAsyncClient asyncReceiver = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, credential)
     .receiver()
     .disableAutoComplete()
     .queueName(queueName)
     .buildAsyncClient();

 // When users are done with the receiver, dispose of the receiver.
 // Clients should be long-lived objects as they require resources
 // and time to establish a connection to the service.
 asyncReceiver.close();

Beispiel: Empfangen aller Nachrichten von der Service Bus-Ressource

Dadurch wird ein unendlicher Nachrichtenstrom von Service Bus zurückgegeben. Der Stream endet, wenn das Abonnement oder andere Terminalszenarien verworfen wird. Weitere Informationen finden Sie unter receiveMessages().

// Keep a reference to `subscription`. When the program is finished receiving messages, call
 // subscription.dispose(). This will stop fetching messages from the Service Bus.
 // Consider using Flux.usingWhen to scope the creation, usage, and cleanup of the receiver.
 Disposable subscription = asyncReceiver.receiveMessages()
     .flatMap(message -> {
         System.out.printf("Received Seq #: %s%n", message.getSequenceNumber());
         System.out.printf("Contents of message as string: %s%n", message.getBody());

         // Explicitly settle the message using complete, abandon, defer, dead-letter, etc.
         if (isMessageProcessed) {
             return asyncReceiver.complete(message);
         } else {
             return asyncReceiver.abandon(message);
         }
     })
     .subscribe(unused -> {
     }, error -> System.out.println("Error occurred: " + error),
         () -> System.out.println("Receiving complete."));

 // When program ends, or you're done receiving all messages, dispose of the receiver.
 // Clients should be long-lived objects as they
 // require resources and time to establish a connection to the service.
 asyncReceiver.close();

Beispiel: Empfangen von Nachrichten im RECEIVE_AND_DELETE Modus von einer Service Bus-Entität

Das folgende Codebeispiel veranschaulicht die Erstellung des asynchronen Clients ServiceBusReceiverAsyncClient mithilfe RECEIVE_AND_DELETEvon . Der fullyQualifiedNamespace ist der Hostname des Service Bus-Namespaces. Sie wird im Bereich "Essentials" aufgeführt, nachdem Sie über das Azure-Portal zum Event Hubs-Namespace navigieren. Die verwendeten Anmeldeinformationen basieren DefaultAzureCredential darauf, dass sie häufig verwendete Anmeldeinformationen in Bereitstellung und Entwicklung kombiniert und die zu verwendenden Anmeldeinformationen basierend auf der ausgeführten Umgebung auswählen. Weitere Informationen zum Empfangen von Nachrichten in diesem Modus finden Sie RECEIVE_AND_DELETE in der Dokumentation.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // Keep a reference to `subscription`. When the program is finished receiving messages, call
 // subscription.dispose(). This will stop fetching messages from the Service Bus.
 Disposable subscription = Flux.usingWhen(
         Mono.fromCallable(() -> {
             // Setting the receiveMode when creating the receiver enables receive and delete mode. By default,
             // peek lock mode is used. In peek lock mode, users are responsible for settling messages.
             return new ServiceBusClientBuilder()
                 .credential(fullyQualifiedNamespace, credential)
                 .receiver()
                 .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
                 .queueName(queueName)
                 .buildAsyncClient();
         }), receiver -> {
             return receiver.receiveMessages();
         }, receiver -> {
             return Mono.fromRunnable(() -> receiver.close());
         })
     .subscribe(message -> {
             // Messages received in RECEIVE_AND_DELETE mode do not have to be settled because they are automatically
             // removed from the queue.
         System.out.printf("Received Seq #: %s%n", message.getSequenceNumber());
         System.out.printf("Contents of message as string: %s%n", message.getBody());
     },
         error -> System.out.println("Error occurred: " + error),
         () -> System.out.println("Receiving complete."));

Beispiel: Empfangen von Nachrichten aus einer bestimmten Sitzung

Um Nachrichten aus einer bestimmten Sitzung abzurufen, wechseln Sie zum ServiceBusSessionReceiverClientBuilder Sitzungsempfängerclient, und erstellen Sie diesen. Verwenden Sie acceptSession(String sessionId) zum Erstellen einer sitzungsgebundenen ServiceBusReceiverAsyncClient. Im Beispiel wird davon ausgegangen, dass Service Bus-Sitzungen zum Zeitpunkt der Warteschlangenerstellung aktiviert wurden.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 // 'disableAutoComplete' indicates that users will explicitly settle their message.
 ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, credential)
     .sessionReceiver()
     .disableAutoComplete()
     .queueName(sessionEnabledQueueName)
     .buildAsyncClient();

 // acceptSession(String) completes successfully with a receiver when "<<my-session-id>>" session is
 // successfully locked.
 // `Flux.usingWhen` is used, so we dispose of the receiver resource after `receiveMessages()` and the settlement
 // operations complete.
 // `Mono.usingWhen` can also be used if the resource closure returns a single item.
 Flux<Void> sessionMessages = Flux.usingWhen(
     sessionReceiver.acceptSession("<<my-session-id>>"),
     receiver -> {
         // Receive messages from <<my-session-id>> session.
         return receiver.receiveMessages().flatMap(message -> {
             System.out.printf("Received Sequence #: %s. Contents: %s%n", message.getSequenceNumber(),
                 message.getBody());

             // Explicitly settle the message using complete, abandon, defer, dead-letter, etc.
             if (isMessageProcessed) {
                 return receiver.complete(message);
             } else {
                 return receiver.abandon(message);
             }
         });
     },
     receiver -> Mono.fromRunnable(() -> {
         // Dispose of resources.
         receiver.close();
         sessionReceiver.close();
     }));

 // When program ends, or you're done receiving all messages, the `subscription` can be disposed of. This code
 // is non-blocking and kicks off the operation.
 Disposable subscription = sessionMessages.subscribe(
     unused -> {
     }, error -> System.err.print("Error receiving message from session: " + error),
     () -> System.out.println("Completed receiving from session."));

Beispiel: Empfangen von Nachrichten aus der ersten verfügbaren Sitzung

Um Nachrichten aus der ersten verfügbaren Sitzung zu verarbeiten, wechseln Sie zum ServiceBusSessionReceiverClientBuilder Sitzungsempfängerclient, und erstellen Sie diesen. Verwenden Sie acceptNextSession() , um die erste verfügbare Sitzung zum Verarbeiten von Nachrichten zu finden.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 // 'disableAutoComplete' indicates that users will explicitly settle their message.
 ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, credential)
     .sessionReceiver()
     .disableAutoComplete()
     .queueName(sessionEnabledQueueName)
     .buildAsyncClient();

 // Creates a client to receive messages from the first available session. It waits until
 // AmqpRetryOptions.getTryTimeout() elapses. If no session is available within that operation timeout, it
 // completes with a retriable error. Otherwise, a receiver is returned when a lock on the session is acquired.
 Mono<ServiceBusReceiverAsyncClient> receiverMono = sessionReceiver.acceptNextSession();

 Flux<Void> receiveMessagesFlux = Flux.usingWhen(receiverMono,
     receiver -> receiver.receiveMessages().flatMap(message -> {
         System.out.println("Received message: " + message.getBody());

         // Explicitly settle the message via complete, abandon, defer, dead-letter, etc.
         if (isMessageProcessed) {
             return receiver.complete(message);
         } else {
             return receiver.abandon(message);
         }
     }),
     receiver -> Mono.fromRunnable(() -> {
         // Dispose of the receiver and sessionReceiver when done receiving messages.
         receiver.close();
         sessionReceiver.close();
     }));

 // This is a non-blocking call that moves onto the next line of code after setting up and starting the receive
 // operation. Customers can keep a reference to `subscription` and dispose of it when they want to stop
 // receiving messages.
 Disposable subscription = receiveMessagesFlux.subscribe(unused -> {
 }, error -> System.out.println("Error occurred: " + error),
     () -> System.out.println("Receiving complete."));

Beispiel: Ratenbegrenzung des Verbrauchs von Nachrichten aus einer Service Bus-Entität

Für Nachrichtenempfänger, die die Anzahl der nachrichten einschränken müssen, die sie zu einem bestimmten Zeitpunkt empfangen, können sie verwenden BaseSubscriber#request(long).

// This is a non-blocking call. The program will move to the next line of code after setting up the operation.
 asyncReceiver.receiveMessages().subscribe(new BaseSubscriber<ServiceBusReceivedMessage>() {
     private static final int NUMBER_OF_MESSAGES = 5;
     private final AtomicInteger currentNumberOfMessages = new AtomicInteger();

     @Override
     protected void hookOnSubscribe(Subscription subscription) {
         // Tell the Publisher we only want 5 message at a time.
         request(NUMBER_OF_MESSAGES);
     }

     @Override
     protected void hookOnNext(ServiceBusReceivedMessage message) {
         // Process the ServiceBusReceivedMessage
         // If the number of messages we have currently received is a multiple of 5, that means we have reached
         // the last message the Subscriber will provide to us. Invoking request(long) here, tells the Publisher
         // that the subscriber is ready to get more messages from upstream.
         if (currentNumberOfMessages.incrementAndGet() % 5 == 0) {
             request(NUMBER_OF_MESSAGES);
         }
     }
 });

Methodenzusammenfassung

Modifizierer und Typ Methode und Beschreibung
Mono<Void> abandon(ServiceBusReceivedMessage message)

Gibt ein ServiceBusReceivedMessageauf.

Mono<Void> abandon(ServiceBusReceivedMessage message, AbandonOptions options)

Gibt auf, dass die ServiceBusReceivedMessage Eigenschaften der Nachricht aktualisiert werden.

void close()

Entsorgt den Consumer, indem die zugrunde liegenden Links zum Dienst geschlossen werden.

Mono<Void> commitTransaction(ServiceBusTransactionContext transactionContext)

Committent die Transaktion und alle ihr zugeordneten Vorgänge.

Mono<Void> complete(ServiceBusReceivedMessage message)

Schließt eine ab ServiceBusReceivedMessage.

Mono<Void> complete(ServiceBusReceivedMessage message, CompleteOptions options)

Schließt eine ServiceBusReceivedMessage mit den angegebenen Optionen ab.

Mono<ServiceBusTransactionContext> createTransaction()

Startet eine neue dienstseitige Transaktion.

Mono<Void> deadLetter(ServiceBusReceivedMessage message)

Verschiebt eine ServiceBusReceivedMessage in die Unterwarteschlange für unzustellbare Nachrichten.

Mono<Void> deadLetter(ServiceBusReceivedMessage message, DeadLetterOptions options)

Verschiebt eine ServiceBusReceivedMessage mit den angegebenen Optionen in die Unterwarteschlange für unzustellbare Nachrichten.

Mono<Void> defer(ServiceBusReceivedMessage message)

Verschiebt einen ServiceBusReceivedMessagezurück.

Mono<Void> defer(ServiceBusReceivedMessage message, DeferOptions options)

Verschiebt eine ServiceBusReceivedMessage mit den festgelegten Optionen.

String getEntityPath()

Ruft die Service Bus-Ressource ab, mit der dieser Client interagiert.

String getFullyQualifiedNamespace()

Ruft den vollqualifizierten Service Bus-Namespace ab, dem die Verbindung zugeordnet ist.

String getIdentifier()

Ruft den Bezeichner des instance von abServiceBusReceiverAsyncClient.

String getSessionId()

Ruft die Sitzungs-IDder Sitzung ab, wenn dieser Empfänger ein Sitzungsempfänger ist.

Mono<byte[]> getSessionState()

Ruft den Zustand der Sitzung ab, wenn dieser Empfänger ein Sitzungsempfänger ist.

Mono<ServiceBusReceivedMessage> peekMessage()

Liest die nächste aktive Nachricht, ohne den Status des Empfängers oder der Nachrichtenquelle zu ändern.

Mono<ServiceBusReceivedMessage> peekMessage(long sequenceNumber)

Ab der angegebenen Sequenznummer liest neben der aktiven Nachricht, ohne den Status des Empfängers oder der Nachrichtenquelle zu ändern.

Flux<ServiceBusReceivedMessage> peekMessages(int maxMessages)

Liest den nächsten Batch mit aktiven Nachrichten, ohne den Status des Empfängers oder der Nachrichtenquelle zu ändern.

Flux<ServiceBusReceivedMessage> peekMessages(int maxMessages, long sequenceNumber)

Liest ab der angegebenen Sequenznummer den nächsten Batch aktiver Nachrichten, ohne den Status des Empfängers oder der Nachrichtenquelle zu ändern.

Mono<ServiceBusReceivedMessage> receiveDeferredMessage(long sequenceNumber)

Empfängt eine verzögerte ServiceBusReceivedMessage.

Flux<ServiceBusReceivedMessage> receiveDeferredMessages(Iterable<Long> sequenceNumbers)

Empfängt einen Batch mit verzögertem ServiceBusReceivedMessage.

Flux<ServiceBusReceivedMessage> receiveMessages()

Empfängt einen unendlichen Stream von ServiceBusReceivedMessage von der Service Bus-Entität.

Mono<OffsetDateTime> renewMessageLock(ServiceBusReceivedMessage message)

Erneuert asynchron die Sperre für die Nachricht.

Mono<Void> renewMessageLock(ServiceBusReceivedMessage message, Duration maxLockRenewalDuration)

Startet die verlängerung der automatischen Sperre für ein ServiceBusReceivedMessage.

Mono<OffsetDateTime> renewSessionLock()

Verlängert die Sitzungssperre, wenn dieser Empfänger ein Sitzungsempfänger ist.

Mono<Void> renewSessionLock(Duration maxLockRenewalDuration)

Startet die Verlängerung der automatischen Sperre für die Sitzung, für die dieser Empfänger arbeitet.

Mono<Void> rollbackTransaction(ServiceBusTransactionContext transactionContext)

Rollbacks für die angegebene Transaktion und alle damit verbundenen Vorgänge.

Mono<Void> setSessionState(byte[] sessionState)

Legt den Zustand der Sitzung fest, für die dieser Empfänger arbeitet.

Geerbte Methoden von java.lang.Object

Details zur Methode

abandon

public Mono abandon(ServiceBusReceivedMessage message)

Gibt ein ServiceBusReceivedMessageauf. Dadurch wird die Nachricht wieder für die Verarbeitung verfügbar. Wenn Sie eine Nachricht abbrechen, wird die Anzahl der Zustellungen für die Nachricht erhöht.

Parameters:

message - Der ServiceBusReceivedMessage , um diesen Vorgang auszuführen.

Returns:

Eine Mono , die abgeschlossen wird, wenn der Service Bus-Vorgang beendet wird.

abandon

public Mono abandon(ServiceBusReceivedMessage message, AbandonOptions options)

Beendet eine ServiceBusReceivedMessage Aktualisierung der Nachrichteneigenschaften. Dadurch wird die Nachricht wieder für die Verarbeitung verfügbar. Wenn Sie eine Nachricht abbrechen, wird die Anzahl der Zustellungen für die Nachricht erhöht.

Parameters:

message - Der ServiceBusReceivedMessage , der diesen Vorgang ausführen soll.
options - Die Optionen, die beim Verlassen der Nachricht festgelegt werden sollen.

Returns:

Ein Mono , der nach Abschluss des Service Bus-Vorgangs abgeschlossen wird.

close

public void close()

Entsorgt den Consumer, indem die zugrunde liegenden Links zum Dienst geschlossen werden.

commitTransaction

public Mono commitTransaction(ServiceBusTransactionContext transactionContext)

Committent die Transaktion und alle ihr zugeordneten Vorgänge.

Erstellen und Verwenden einer Transaktion

// This mono creates a transaction and caches the output value, so we can associate operations with the
 // transaction. It does not cache the value if it is an error or completes with no items, effectively retrying
 // the operation.
 Mono<ServiceBusTransactionContext> transactionContext = asyncReceiver.createTransaction()
     .cache(value -> Duration.ofMillis(Long.MAX_VALUE),
         error -> Duration.ZERO,
         () -> Duration.ZERO);

 // Dispose of the disposable to cancel the operation.
 Disposable disposable = transactionContext.flatMap(transaction -> {
     // Process messages and associate operations with the transaction.
     Mono<Void> operations = Mono.when(
         asyncReceiver.receiveDeferredMessage(sequenceNumber).flatMap(message ->
             asyncReceiver.complete(message, new CompleteOptions().setTransactionContext(transaction))),
         asyncReceiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction)));

     // Finally, either commit or rollback the transaction once all the operations are associated with it.
     return operations.then(asyncReceiver.commitTransaction(transaction));
 }).subscribe(unused -> {
 }, error -> {
     System.err.println("Error occurred processing transaction: " + error);
 }, () -> {
     System.out.println("Completed transaction");
 });

Parameters:

transactionContext - Die Transaktion, die committet werden soll.

Returns:

Der Mono , der diesen Vorgang für die Service Bus-Ressource beendet.

complete

public Mono complete(ServiceBusReceivedMessage message)

Schließt eine ab ServiceBusReceivedMessage. Dadurch wird die Nachricht aus dem Dienst gelöscht.

Parameters:

message - Der ServiceBusReceivedMessage , der diesen Vorgang ausführen soll.

Returns:

Ein Mono , der beendet wird, wenn die Nachricht in Service Bus abgeschlossen ist.

complete

public Mono complete(ServiceBusReceivedMessage message, CompleteOptions options)

Schließt eine ServiceBusReceivedMessage mit den angegebenen Optionen ab. Dadurch wird die Nachricht aus dem Dienst gelöscht.

Parameters:

message - Der ServiceBusReceivedMessage , der diesen Vorgang ausführen soll.
options - Optionen, die zum Abschließen der Nachricht verwendet werden.

Returns:

Ein Mono , der beendet wird, wenn die Nachricht in Service Bus abgeschlossen ist.

createTransaction

public Mono createTransaction()

Startet eine neue dienstseitige Transaktion. Sollte ServiceBusTransactionContext an alle Vorgänge übergeben werden, die in dieser Transaktion sein müssen.

Erstellen und Verwenden einer Transaktion

// This mono creates a transaction and caches the output value, so we can associate operations with the
 // transaction. It does not cache the value if it is an error or completes with no items, effectively retrying
 // the operation.
 Mono<ServiceBusTransactionContext> transactionContext = asyncReceiver.createTransaction()
     .cache(value -> Duration.ofMillis(Long.MAX_VALUE),
         error -> Duration.ZERO,
         () -> Duration.ZERO);

 // Dispose of the disposable to cancel the operation.
 Disposable disposable = transactionContext.flatMap(transaction -> {
     // Process messages and associate operations with the transaction.
     Mono<Void> operations = Mono.when(
         asyncReceiver.receiveDeferredMessage(sequenceNumber).flatMap(message ->
             asyncReceiver.complete(message, new CompleteOptions().setTransactionContext(transaction))),
         asyncReceiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction)));

     // Finally, either commit or rollback the transaction once all the operations are associated with it.
     return operations.then(asyncReceiver.commitTransaction(transaction));
 }).subscribe(unused -> {
 }, error -> {
     System.err.println("Error occurred processing transaction: " + error);
 }, () -> {
     System.out.println("Completed transaction");
 });

Returns:

Der Mono , der diesen Vorgang für die Service Bus-Ressource beendet.

deadLetter

public Mono deadLetter(ServiceBusReceivedMessage message)

Verschiebt eine ServiceBusReceivedMessage in die Unterwarteschlange für unzustellbare Nachrichten.

Parameters:

message - Der ServiceBusReceivedMessage , der diesen Vorgang ausführen soll.

Returns:

Ein Mono , der abgeschlossen wird, wenn der Vorgang für unzustellbare Nachrichten abgeschlossen ist.

deadLetter

public Mono deadLetter(ServiceBusReceivedMessage message, DeadLetterOptions options)

Verschiebt eine ServiceBusReceivedMessage mit den angegebenen Optionen in die Unterwarteschlange für unzustellbare Nachrichten.

Parameters:

message - Der ServiceBusReceivedMessage , der diesen Vorgang ausführen soll.
options - Optionen, die verwendet werden, um die Nachricht unzustellbar zu machen.

Returns:

Ein Mono , der abgeschlossen wird, wenn der Vorgang für unzustellbare Nachrichten abgeschlossen ist.

defer

public Mono defer(ServiceBusReceivedMessage message)

Verschiebt einen ServiceBusReceivedMessagezurück. Dadurch wird die Nachricht in die verzögerte Untergeordnete Warteschlange verschoben.

Parameters:

message - Der ServiceBusReceivedMessage , der diesen Vorgang ausführen soll.

Returns:

Ein Mono , der abgeschlossen wird, wenn der Service Bus-Zurückstellungsvorgang abgeschlossen ist.

defer

public Mono defer(ServiceBusReceivedMessage message, DeferOptions options)

Verschiebt eine ServiceBusReceivedMessage mit den festgelegten Optionen. Dadurch wird die Nachricht in die verzögerte Untergeordnete Warteschlange verschoben.

Parameters:

message - Der ServiceBusReceivedMessage , der diesen Vorgang ausführen soll.
options - Optionen, die zum Zurückstellen der Nachricht verwendet werden.

Returns:

Ein Mono , der nach Abschluss des Zurückstellungsvorgangs abgeschlossen wird.

getEntityPath

public String getEntityPath()

Ruft die Service Bus-Ressource ab, mit der dieser Client interagiert.

Returns:

Die Service Bus-Ressource, mit der dieser Client interagiert.

getFullyQualifiedNamespace

public String getFullyQualifiedNamespace()

Ruft den vollqualifizierten Service Bus-Namespace ab, dem die Verbindung zugeordnet ist. Dies ist wahrscheinlich ähnlich wie .{yournamespace}.servicebus.windows.net

Returns:

Der vollqualifizierte Service Bus-Namespace, dem die Verbindung zugeordnet ist.

getIdentifier

public String getIdentifier()

Ruft den Bezeichner des instance von abServiceBusReceiverAsyncClient.

Returns:

Der Bezeichner, der die instance von ServiceBusReceiverAsyncClientidentifizieren kann.

getSessionId

public String getSessionId()

Ruft die SessionId der Sitzung ab, wenn dieser Empfänger ein Sitzungsempfänger ist.

Returns:

Die SessionId oder NULL, wenn dies kein Sitzungsempfänger ist.

getSessionState

public Mono getSessionState()

Ruft den Zustand der Sitzung ab, wenn dieser Empfänger ein Sitzungsempfänger ist.

Returns:

Der Sitzungszustand oder ein leeres Mono, wenn kein Zustand für die Sitzung festgelegt ist.

peekMessage

public Mono peekMessage()

Liest die nächste aktive Nachricht, ohne den Status des Empfängers oder der Nachrichtenquelle zu ändern. Der erste Aufruf von ruft peek() die erste aktive Nachricht für diesen Empfänger ab. Jeder nachfolgende Aufruf ruft die nachfolgende Nachricht in der Entität ab.

Returns:

Ein eingesehenes ServiceBusReceivedMessage.

peekMessage

public Mono peekMessage(long sequenceNumber)

Ab der angegebenen Sequenznummer liest neben der aktiven Nachricht, ohne den Status des Empfängers oder der Nachrichtenquelle zu ändern.

Parameters:

sequenceNumber - Die Sequenznummer, von der die Nachricht gelesen werden soll.

Returns:

Ein eingesehenes ServiceBusReceivedMessage.

peekMessages

public Flux peekMessages(int maxMessages)

Liest den nächsten Batch mit aktiven Nachrichten, ohne den Status des Empfängers oder der Nachrichtenquelle zu ändern.

Parameters:

maxMessages - Die Anzahl der Meldungen.

Returns:

Eine Flux von ServiceBusReceivedMessage , die eingesehen wird.

peekMessages

public Flux peekMessages(int maxMessages, long sequenceNumber)

Liest ab der angegebenen Sequenznummer den nächsten Batch aktiver Nachrichten, ohne den Status des Empfängers oder der Nachrichtenquelle zu ändern.

Parameters:

maxMessages - Die Anzahl der Meldungen.
sequenceNumber - Die Sequenznummer, ab der mit dem Lesen von Nachrichten begonnen werden soll.

Returns:

Ein Flux von ServiceBusReceivedMessage eingesehen.

receiveDeferredMessage

public Mono receiveDeferredMessage(long sequenceNumber)

Empfängt eine verzögerte ServiceBusReceivedMessage. Verzögerte Nachrichten können nur mithilfe der Sequenznummer empfangen werden.

Parameters:

sequenceNumber - Die getSequenceNumber() der Meldung.

Returns:

Eine verzögerte Nachricht mit dem übereinstimmenden sequenceNumber.

receiveDeferredMessages

public Flux receiveDeferredMessages(Iterable sequenceNumbers)

Empfängt einen Batch mit verzögertem ServiceBusReceivedMessage. Verzögerte Nachrichten können nur mithilfe der Sequenznummer empfangen werden.

Parameters:

sequenceNumbers - Die Sequenznummern der verzögerten Nachrichten.

Returns:

Ein Flux von verzögert ServiceBusReceivedMessage.

receiveMessages

public Flux receiveMessages()

Empfängt einen unendlichen Stream von ServiceBusReceivedMessage von der Service Bus-Entität. Dieser Flux empfängt kontinuierlich Nachrichten von einer Service Bus-Entität, bis:

  • Der Empfänger ist geschlossen.
  • Das Abonnement für flux wird verworfen.
  • Ein Terminalsignal von einem Downstreamabonnent wird Upstream (dh. Flux#take(long) oder Flux#take(Duration)).
  • Ein AmqpException tritt auf, der dazu führt, dass der Empfangslink beendet wird.

Der Client verwendet einen AMQP-Link darunter, um die Nachrichten zu empfangen. Der Client wechselt transparent zu einem neuen AMQP-Link, wenn für den aktuellen ein wiederholbarer Fehler auftritt. Wenn auf dem Client ein nicht wiederholbarer Fehler auftritt oder die Wiederholungsversuche erschöpft sind, wird der Terminalhandler des org.reactivestreams.Subscriber#onError(Throwable) Abonnenten mit diesem Fehler benachrichtigt. Nach dem Terminalereignis werden keine weiteren Nachrichten übermittelt org.reactivestreams.Subscriber#onNext(Object) . Die Anwendung muss einen neuen Client erstellen, um den Empfang fortzusetzen. Das erneute Abonnieren des Flux des alten Clients hat keine Auswirkungen.

Hinweis: Einige Beispiele für nicht wiederholbare Fehler sind: Die Anwendung versucht, eine Verbindung mit einer Warteschlange herzustellen, die nicht vorhanden ist, das Löschen oder Deaktivieren der Warteschlange in der Mitte des Empfangs, der Benutzer initiiert explizit geo-DR. Dies sind bestimmte Ereignisse, bei denen Service Bus dem Client mitgeteilt, dass ein nicht wiederholbarer Fehler aufgetreten ist.

Returns:

Ein unendlicher Nachrichtenstrom von der Service Bus-Entität.

renewMessageLock

public Mono renewMessageLock(ServiceBusReceivedMessage message)

Erneuert asynchron die Sperre für die Nachricht. Die Sperre wird basierend auf der für die Entität angegebenen Einstellung verlängert. Wenn eine Nachricht im PEEK_LOCK Modus empfangen wird, wird die Nachricht auf dem Server für diesen Empfänger instance für einen Zeitraum gesperrt, der während der Entitätserstellung angegeben wurde (LockDuration). Wenn die Verarbeitung der Nachricht länger als diese Dauer erfordert, muss die Sperre verlängert werden. Bei jeder Verlängerung wird die Sperre auf den LockDuration-Wert der Entität zurückgesetzt.

Parameters:

message - Die ServiceBusReceivedMessage zum Ausführen der automatischen Sperrverlängerung.

Returns:

Die neue Ablaufzeit für die Nachricht.

renewMessageLock

public Mono renewMessageLock(ServiceBusReceivedMessage message, Duration maxLockRenewalDuration)

Startet die verlängerung der automatischen Sperre für ein ServiceBusReceivedMessage.

Parameters:

message - Der ServiceBusReceivedMessage , der diesen Vorgang ausführen soll.
maxLockRenewalDuration - Maximale Dauer für die weitere Verlängerung des Sperrtokens.

Returns:

Ein Mono-Typ, der abgeschlossen wird, wenn der Nachrichtenerneuerungsvorgang bis maxLockRenewalDurationabgeschlossen ist.

renewSessionLock

public Mono renewSessionLock()

Verlängert die Sitzungssperre, wenn dieser Empfänger ein Sitzungsempfänger ist.

Returns:

Die nächste Ablaufzeit für die Sitzungssperre.

renewSessionLock

public Mono renewSessionLock(Duration maxLockRenewalDuration)

Startet die Verlängerung der automatischen Sperre für die Sitzung, für die dieser Empfänger arbeitet.

Parameters:

maxLockRenewalDuration - Maximale Dauer, um die Sitzungssperre weiterhin zu verlängern.

Returns:

Ein Sperrverlängerungsvorgang für die Nachricht.

rollbackTransaction

public Mono rollbackTransaction(ServiceBusTransactionContext transactionContext)

Rollbacks für die angegebene Transaktion und alle damit verbundenen Vorgänge.

Erstellen und Verwenden einer Transaktion

// This mono creates a transaction and caches the output value, so we can associate operations with the
 // transaction. It does not cache the value if it is an error or completes with no items, effectively retrying
 // the operation.
 Mono<ServiceBusTransactionContext> transactionContext = asyncReceiver.createTransaction()
     .cache(value -> Duration.ofMillis(Long.MAX_VALUE),
         error -> Duration.ZERO,
         () -> Duration.ZERO);

 // Dispose of the disposable to cancel the operation.
 Disposable disposable = transactionContext.flatMap(transaction -> {
     // Process messages and associate operations with the transaction.
     Mono<Void> operations = Mono.when(
         asyncReceiver.receiveDeferredMessage(sequenceNumber).flatMap(message ->
             asyncReceiver.complete(message, new CompleteOptions().setTransactionContext(transaction))),
         asyncReceiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction)));

     // Finally, either commit or rollback the transaction once all the operations are associated with it.
     return operations.then(asyncReceiver.commitTransaction(transaction));
 }).subscribe(unused -> {
 }, error -> {
     System.err.println("Error occurred processing transaction: " + error);
 }, () -> {
     System.out.println("Completed transaction");
 });

Parameters:

transactionContext - Die Transaktion, die rollbacken soll.

Returns:

Der Mono , der diesen Vorgang für die Service Bus-Ressource beendet.

setSessionState

public Mono setSessionState(byte[] sessionState)

Legt den Zustand der Sitzung fest, für die dieser Empfänger arbeitet.

Parameters:

sessionState - Status, der für die Sitzung festgelegt werden soll.

Returns:

Ein Mono-Wert, der abgeschlossen wird, wenn die Sitzung festgelegt ist

Gilt für: