Freigeben über


Asynchrone Programmierung im Azure SDK für Java

In diesem Artikel wird das asynchrone Programmiermodell im Azure SDK für Java beschrieben.

Das Azure SDK enthielt anfänglich nur nicht blockierende, asynchrone APIs für die Interaktion mit Azure-Diensten. Mit diesen APIs können Sie das Azure SDK verwenden, um skalierbare Anwendungen zu erstellen, die Systemressourcen effizient verwenden. Das Azure SDK für Java enthält jedoch auch synchrone Clients, die auf ein breiteres Publikum ausgerichtet sind, und machen unsere Clientbibliotheken auch für Benutzer zugänglich, die mit der asynchronen Programmierung nicht vertraut sind. (Siehe "Ansatzfähig " in den Azure SDK-Entwurfsrichtlinien.) Daher bieten alle Java-Clientbibliotheken im Azure SDK für Java sowohl asynchrone als auch synchrone Clients an. Es wird jedoch empfohlen, die asynchronen Clients für Produktionssysteme zu verwenden, um die Nutzung von Systemressourcen zu maximieren.

Reaktive Datenströme

Wenn Sie sich den Abschnitt Async Service Clients in den Java Azure SDK Design-Richtlinien ansehen, werden Sie feststellen, dass unsere asynchronen APIs reaktive Typen verwenden, anstatt die von Java 8 bereitgestellten CompletableFuture. Warum haben wir reaktive Typen über Typen ausgewählt, die nativ in JDK verfügbar sind?

Java 8 hat Features wie Streams, Lambdas und CompletableFuture eingeführt. Diese Features bieten viele Funktionen, weisen jedoch einige Einschränkungen auf.

CompletableFuture bietet rückrufbasierte, nicht blockierende Funktionen, und die CompletionStage-Schnittstelle ermöglichte eine einfache Komposition einer Reihe asynchroner Vorgänge. Lambdas machen diese pushbasierten APIs besser lesbar. Datenströme stellen Vorgänge im Funktionalen Stil bereit, um eine Sammlung von Datenelementen zu verarbeiten. Datenströme sind jedoch synchron und können nicht wiederverwendet werden. CompletableFuture ermöglicht es Ihnen, eine einzelne Anforderung zu stellen, Bietet Unterstützung für einen Rückruf und erwartet eine einzelne Antwort. Viele Clouddienste erfordern jedoch die Möglichkeit, Daten zu streamen – z. B. Event Hubs.

Reaktive Datenströme können dabei helfen, diese Einschränkungen zu überwinden, indem Elemente von einer Quelle an einen Abonnenten gestreamt werden. Wenn ein Abonnent Daten aus einer Quelle anfordert, sendet die Quelle eine beliebige Anzahl von Ergebnissen zurück. Es muss sie nicht alle gleichzeitig senden. Die Übertragung erfolgt über einen bestimmten Zeitraum, wenn die Quelle Daten sendet.

In diesem Modell registriert der Abonnent Ereignishandler, um Daten bei deren Ankunft zu verarbeiten. Diese pushbasierten Interaktionen benachrichtigen den Abonnenten über unterschiedliche Signale:

  • Ein onSubscribe() Aufruf gibt an, dass die Datenübertragung beginnt.
  • Ein onError() Aufruf gibt an, dass ein Fehler aufgetreten ist, der auch das Ende der Datenübertragung kennzeichnet.
  • Ein onComplete() Aufruf zeigt den erfolgreichen Abschluss der Datenübertragung an.

Im Gegensatz zu Java Streams behandeln reaktive Datenströme Fehler als Erstklassenereignisse. Reaktive Datenströme verfügen über einen dedizierten Kanal für die Quelle, um Fehler an den Abonnenten zu kommunizieren. Außerdem ermöglichen reaktive Datenströme es dem Abonnenten, die Datenübertragungsrate auszuhandeln, um diese Datenströme in ein Push-Pull-Modell zu transformieren.

Die Spezifikation "Reaktive Datenströme " stellt einen Standard für die Übertragung von Daten bereit. Auf hoher Ebene definiert die Spezifikation die folgenden vier Schnittstellen und gibt Regeln für die Implementierung dieser Schnittstellen an.

  • Publisher ist die Quelle eines Datenstroms.
  • Abonnent ist der Verbraucher eines Datenstroms.
  • Das Abonnement verwaltet den Status der Datenübertragung zwischen einem Herausgeber und einem Abonnenten.
  • Processor ist sowohl ein Herausgeber als auch ein Abonnent.

Es gibt einige bekannte Java-Bibliotheken, die Implementierungen dieser Spezifikation bereitstellen, z. B. RxJava, Akka Streams, Vert.x und Project Reactor.

Das Azure SDK für Java hat Project Reactor übernommen, um seine asynchronen APIs anzubieten. Der wichtigste Faktor für diese Entscheidung war die reibungslose Integration mit Spring Webflux, die auch Project Reactor verwendet. Ein weiterer Faktor für die Auswahl von Project Reaktor über RxJava war, dass Project Reactor Java 8 verwendet, aber RxJava war damals noch bei Java 7. Project Reactor bietet auch eine vielzahl von Operatoren, die komponierbar sind und es Ihnen ermöglichen, deklarativen Code für die Erstellung von Datenverarbeitungspipelines zu schreiben. Eine weitere schöne Sache von Project Reactor ist, dass es Adapter für die Konvertierung von Project-Reaktortypen in andere beliebte Implementierungstypen hat.

Vergleichen von APIs synchroner und asynchroner Vorgänge

Wir haben die synchronen Clients und Optionen für asynchrone Clients erörtert. In der folgenden Tabelle wird zusammengefasst, wie APIs aussehen, die mithilfe dieser Optionen entworfen wurden:

API-Typ Kein Wert Einzelner Wert Mehrere Werte
Standard Java – synchrone APIs void T Iterable<T>
Standard Java – asynchrone APIs CompletableFuture<Void> CompletableFuture<T> CompletableFuture<List<T>>
Reaktive Datenströme-Schnittstellen Publisher<Void> Publisher<T> Publisher<T>
Project Reactor-Implementierung von reaktiven Streams Mono<Void> Mono<T> Flux<T>

Aus Gründen der Vollständigkeit ist es erwähnenswert, dass Java 9 die Flow-Klasse eingeführt hat, die die vier reaktiven Datenstromschnittstellen enthält. Diese Klasse enthält jedoch keine Implementierung.

Verwenden asynchroner APIs im Azure SDK für Java

Die Spezifikation für reaktive Datenströme unterscheidet nicht zwischen Herausgebertypen. In der Spezifikation für reaktive Datenströme erzeugen Herausgeber einfach null oder mehr Datenelemente. In vielen Fällen gibt es einen nützlichen Unterschied zwischen einem Herausgeber, der höchstens ein Datenelement erzeugt, im Vergleich zu einem Element, das null oder mehr erzeugt. In cloudbasierten APIs gibt diese Unterscheidung an, ob eine Anforderung eine einzelwertige Antwort oder eine Auflistung zurückgibt. Project Reactor bietet zwei Typen, um diese Unterscheidung zu treffen - Mono und Flux. Eine API, die eine Mono Antwort zurückgibt, enthält eine Antwort, die höchstens einen Wert aufweist, und eine API, die eine Flux Antwort zurückgibt, die null oder mehr Werte enthält.

Angenommen, Sie verwenden einen ConfigurationAsyncClient , um eine konfiguration abzurufen, die mit dem Azure App-Konfigurationsdienst gespeichert ist. (Weitere Informationen finden Sie unter Was ist Die Azure-App-Konfiguration?)

Wenn Sie auf dem Client eine ConfigurationAsyncClient erstellen und getConfigurationSetting() aufrufen, wird ein Mono zurückgegeben, der angibt, dass die Antwort einen einzelnen Wert enthält. Das Aufrufen dieser Methode allein macht jedoch nichts. Der Client hat noch keine Anforderung an den Azure App-Konfigurationsdienst gestellt. In dieser Phase ist die von dieser API zurückgegebene Mono<ConfigurationSetting> nur eine "Zusammenstellung" der Datenverarbeitungspipeline. Dies bedeutet, dass die erforderliche Einrichtung für die Nutzung der Daten abgeschlossen ist. Um die Datenübertragung tatsächlich auszulösen (d. h., um die Anforderung an den Dienst zu senden und die Antwort zu erhalten), müssen Sie den zurückgegebenen Mono abonnieren. Wenn Sie also mit diesen reaktiven Datenströmen arbeiten, müssen Sie daran denken, subscribe() aufzurufen, da nichts passiert, bis Sie dies tun.

Das folgende Beispiel zeigt, wie Sie sich für Mono anmelden und den Konfigurationswert auf der Konsole ausgeben.

ConfigurationAsyncClient asyncClient = new ConfigurationClientBuilder()
    .connectionString("<your connection string>")
    .buildAsyncClient();

asyncClient.getConfigurationSetting("<your config key>", "<your config value>").subscribe(
    config -> System.out.println("Config value: " + config.getValue()),
    ex -> System.out.println("Error getting configuration: " + ex.getMessage()),
    () -> System.out.println("Successfully retrieved configuration setting"));

System.out.println("Done");

Beachten Sie, dass nach dem Aufrufen getConfigurationSetting() des Clients der Beispielcode das Ergebnis abonniert und drei separate Lambdas bereitstellt. Die erste Lambda-Funktion verwendet Daten, die vom Dienst empfangen werden, was bei erfolgreicher Antwort ausgelöst wird. Die zweite Lambda-Funktion wird ausgelöst, wenn beim Abrufen der Konfiguration ein Fehler auftritt. Die dritte Lambda-Funktion wird aufgerufen, wenn der Datenstrom abgeschlossen ist, was bedeutet, dass von diesem Datenstrom keine weiteren Datenelemente erwartet werden.

Hinweis

Wie bei allen asynchronen Programmierungen wird die Ausführung nach der Erstellung des Abonnements wie gewohnt fortgesetzt. Wenn es nichts gibt, das das Programm aktiv hält und ausführt, kann es beendet werden, bevor der asynchrone Vorgang abgeschlossen ist. Der Hauptthread, der subscribe() aufgerufen hat, wartet nicht, bis Sie den Netzwerkanruf an die Azure App-Konfiguration tätigen und eine Antwort erhalten. In Produktionssystemen können Sie möglicherweise weiter etwas anderes verarbeiten, aber in diesem Beispiel können Sie eine kleine Verzögerung hinzufügen, indem Sie Thread.sleep() aufrufen oder CountDownLatch verwenden, um dem asynchronen Vorgang eine Chance zu geben, abgeschlossen zu werden.

Wie im folgenden Beispiel gezeigt, folgen APIs, die ein Flux zurückgeben, einem ähnlichen Muster. Der Unterschied besteht darin, dass der erste an die subscribe() Methode bereitgestellte Rückruf mehrmals für jedes Datenelement in der Antwort aufgerufen wird. Der Fehler oder die Abschlussrückrufe werden genau einmal aufgerufen und als Terminalsignale betrachtet. Es werden keine anderen Rückrufe mehr aufgerufen, wenn eines dieser Signale vom Herausgeber empfangen wird.

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(
    event -> System.out.println("Sequence number of received event: " + event.getData().getSequenceNumber()),
    ex -> System.out.println("Error receiving events: " + ex.getMessage()),
    () -> System.out.println("Successfully completed receiving all events"));

Rückdruck

Was geschieht, wenn die Quelle die Daten schneller erzeugt, als der Abonnent verarbeiten kann? Der Abonnent kann durch die Datenflut überfordert werden, was zu Speicherfehlern führen kann. Der Abonnent benötigt eine Möglichkeit für Rückmeldungen an den Herausgeber, um die Übertragung zu verlangsamen, wenn er nicht Schritt halten kann. Wenn Sie subscribe() für einen Flux aufrufen, wie im obigen Beispiel gezeigt, fordert der Abonnent standardmäßig einen unbegrenzten Datenstrom an, was dem Herausgeber signalisiert, dass die Daten so schnell wie möglich gesendet werden sollen. Dieses Verhalten ist nicht immer wünschenswert, und der Abonnent muss möglicherweise die Veröffentlichungsrate durch "Gegensteuerung" kontrollieren. Backpressure ermöglicht es dem Abonnenten, die Steuerung des Flusses von Datenelementen zu übernehmen. Ein Abonnent fordert eine begrenzte Anzahl von Datenelementen an, die sie verarbeiten können. Nachdem der Abonnent die Verarbeitung dieser Elemente abgeschlossen hat, kann der Abonnent mehr anfordern. Mithilfe von Backpressure können Sie ein Pushmodell für die Datenübertragung in ein Push-Pull-Modell umwandeln.

Das folgende Beispiel zeigt, wie Sie die Rate steuern können, mit der Ereignisse vom Event Hubs-Consumer empfangen werden:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1); // request 1 data element to begin with
    }

    @Override
    public void onNext(PartitionEvent partitionEvent) {
        System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
        this.subscription.request(1); // request another event when the subscriber is ready
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error receiving events: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Successfully completed receiving all events")
    }
});

Wenn der Abonnent zuerst eine Verbindung mit dem Herausgeber herstellt, übergibt der Herausgeber eine Subscription Instanz, die den Status der Datenübertragung verwaltet. Dies Subscription ist das Medium, über das der Abonnent Backpressure anwenden kann, indem request() aufgerufen wird, um anzugeben, wie viele weitere Datenelemente der Abonnent verarbeiten kann.

Wenn der Abonnent jedes Mal, z. B., mehrere Datenelemente bei onNext(), request(10) anfordert, sendet der Publisher die nächsten 10 Elemente sofort, wenn sie verfügbar sind oder sobald sie verfügbar sind. Diese Elemente sammeln sich in einem Puffer auf der Seite des Abonnenten an, und da bei jedem onNext() Aufruf jeweils 10 weitere angefordert werden, wächst der Rückstand weiter, bis der Herausgeber keine weiteren Datenelemente zum Senden hat oder der Puffer des Abonnenten überläuft, was zu Speicherfehlern führt.

Kündigen eines Abonnements

Ein Abonnement verwaltet den Status der Datenübertragung zwischen einem Herausgeber und einem Abonnenten. Das Abonnement ist aktiv, bis der Herausgeber die Übertragung aller Daten an den Abonnenten abgeschlossen hat oder der Abonnent keine Daten mehr empfängt. Es gibt eine Reihe von Möglichkeiten, ein Abonnement wie unten dargestellt zu kündigen.

Im folgenden Beispiel wird das Abonnement durch Beseitigen des Abonnenten gekündigt:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

Disposable disposable = asyncClient.receive().subscribe(
    partitionEvent -> {
        Long num = partitionEvent.getData().getSequenceNumber()
        System.out.println("Sequence number of received event: " + num);
    },
    ex -> System.out.println("Error receiving events: " + ex.getMessage()),
    () -> System.out.println("Successfully completed receiving all events"));

// much later on in your code, when you are ready to cancel the subscription,
// you can call the dispose method, as such:
disposable.dispose();

Im folgenden Beispiel wird das Abonnement durch das Aufrufen der cancel()-Methode für Subscription storniert:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1); // request 1 data element to begin with
    }

    @Override
    public void onNext(PartitionEvent partitionEvent) {
        System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
        this.subscription.cancel(); // Cancels the subscription. No further event is received.
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error receiving events: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Successfully completed receiving all events")
    }
});

Schlussfolgerung

Threads sind teure Ressourcen, die Sie nicht verschwenden sollten, um auf Antworten von Remotedienstaufrufen zu warten. Da sich die Akzeptanz von Microservices-Architekturen erhöht, wird die Notwendigkeit, Ressourcen effizient zu skalieren und zu nutzen, entscheidend. Asynchrone APIs sind günstig, wenn netzwerkgebundene Vorgänge vorhanden sind. Das Azure SDK für Java bietet eine vielzahl von APIs für asynchrone Vorgänge, um Ihre Systemressourcen zu maximieren. Wir empfehlen Ihnen dringend, unsere asynchronen Clients auszuprobieren.

Weitere Informationen zu den Betreibern, die Ihren speziellen Aufgaben am besten entsprechen, finden Sie im Referenzleitfaden für Reaktor 3unter Welchen Operator benötige ich?

Nächste Schritte

Nachdem Sie nun die verschiedenen Konzepte der asynchronen Programmierung besser verstehen, ist es wichtig zu erfahren, wie die Ergebnisse durchlaufen werden. Weitere Informationen zu den besten Iterationsstrategien und Details zur Funktionsweise der Paginierung finden Sie unter Paginierung und Iteration im Azure SDK für Java.