Megosztás a következőn keresztül:


Aszinkron programozás a Javához készült Azure SDK-ban

Ez a cikk a Javához készült Azure SDK aszinkron programozási modelljét ismerteti.

Az Azure SDK kezdetben csak nem blokkoló, aszinkron API-kat tartalmazott az Azure-szolgáltatásokkal való interakcióhoz. Ezek az API-k lehetővé teszik az Azure SDK használatát a rendszererőforrásokat hatékonyan használó skálázható alkalmazások létrehozásához. A Java-hoz készült Azure SDK azonban szinkron ügyfeleket is tartalmaz a szélesebb közönség kiszolgálásához, és az ügyfélkódtárakat olyan felhasználók számára is elérhetővé teszi, akik nem ismerik az aszinkron programozást. (Lásd: Megközelítés az Azure SDK tervezési irányelveiben.) Így az Azure SDK for Java összes Java-ügyfélkódtára aszinkron és szinkron ügyfeleket is kínál. Javasoljuk az aszinkron kliensek használatát éles rendszerekhez a rendszererőforrások maximális kihasználása érdekében.

Reaktív streamek

Ha a Java Azure SDK tervezési útmutatójában az Async Service Clients szakaszt tekinti át, láthatja, hogy a Java 8 által biztosítottak CompletableFuture helyett az aszinkron API-k reaktív típusokat használnak. Miért választottunk reaktív típusokat a JDK-ban natívan elérhető típusok közül?

A Java 8 olyan funkciókat vezetett be, mint a Streams, a Lambdas és az CompletableFuture. Ezek a funkciók számos képességet biztosítanak, de bizonyos korlátozásokkal rendelkeznek.

CompletableFuture visszahívás-alapú, nem blokkoló képességeket biztosít, valamint az CompletionStage felület egyszerűvé tette aszinkron műveletek sorozatának összeállítását. A Lambdas olvashatóbbá teszi ezeket a leküldéses alapú API-kat. A streamek funkcionális stílusú műveleteket biztosítanak az adatelemek gyűjteményének kezeléséhez. A streamek azonban szinkronok, és nem használhatók újra. CompletableFuture lehetővé teszi, hogy egyetlen kérést küldjön, támogatást nyújt a visszahíváshoz, és egyetlen választ vár. Számos felhőszolgáltatás azonban megköveteli az adatok streamelésének lehetőségét – például az Event Hubsot.

A reaktív streamek segíthetnek leküzdeni ezeket a korlátozásokat azáltal, hogy az elemeket egy forrásból egy előfizetőbe streamelik. Amikor egy előfizető adatokat kér egy forrástól, a forrás bármilyen számú eredményt küld vissza. Nem kell egyszerre elküldenie őket. Az átvitel egy adott időszakban történik, amikor a forrásnak van elküldendő adata.

Ebben a modellben az előfizető eseménykezelőket regisztrál az adatok beérkezésekor történő feldolgozásához. Ezek a leküldéses interakciók különböző jeleken keresztül értesítik az előfizetőt:

  • A onSubscribe() hívás azt jelzi, hogy az adatátvitel megkezdődött.
  • A onError() hívás azt jelzi, hogy hiba történt, ami az adatátvitel végét is jelzi.
  • A onComplete() hívás az adatátvitel sikeres befejezését jelzi.

A Java Streamekkel ellentétben a reaktív streamek első osztályú eseményként kezelik a hibákat. A reaktív streamek dedikált csatornával rendelkeznek a forrás számára, a hibáknak az előfizetővel való közléséhez. A reaktív adatfolyamok lehetővé teszik, hogy az előfizető tárgyalhassa az adatátviteli sebességet, és ezáltal a streameket egy push-pull modellé alakítsa át.

A Reaktív streamek specifikációja szabványt biztosít az adatok átvitelének módjához. Magas szinten a specifikáció a következő négy interfészt határozza meg, és meghatározza az interfészek implementálásának szabályait.

  • A Publisher egy adatfolyam forrása.
  • Az előfizető egy adatfolyam fogyasztója.
  • Az előfizetés kezeli a közzétevő és az előfizető közötti adatátvitel állapotát.
  • A processzor közzétevőként és előfizetőként egyaránt működik.

Vannak jól ismert Java-kódtárak, amelyek a specifikáció implementációit biztosítják, például az RxJava, az Akka Streams, a Vert.x és a Project Reactor.

A Java-hoz készült Azure SDK elfogadta a Project Reactort, hogy az aszinkron API-kat kínáljon. Ennek a döntésnek a fő mozgatórugója az volt, hogy zökkenőmentes integrációt biztosítson a Project Reactort is használó Spring Webfluxszal. A Project Reactor RxJava-n keresztül történő kiválasztásához az is hozzájárult, hogy a Project Reactor Java 8-at használ, de az RxJava akkor még a Java 7-nél volt. A Project Reactor számos olyan operátort is kínál, amelyek összeállíthatók, és lehetővé teszik deklaratív kód írását adatfeldolgozási folyamatok létrehozásához. A Project Reactor egy másik szép dolog, hogy adapterekkel rendelkezik a Project Reactor-típusok más népszerű implementálási típusokká való átalakításához.

Szinkron és aszinkron műveletek API-jainak összehasonlítása

Megbeszéltük a szinkron ügyfeleket és az aszinkron ügyfelek lehetőségeit. Az alábbi táblázat összefoglalja, hogyan néznek ki az API-k az alábbi lehetőségek használatával:

API-típus Nincs érték Egyetlen érték Több érték
Standard Java – Szinkron API-k void T Iterable<T>
Standard Java – Aszinkron API-k CompletableFuture<Void> CompletableFuture<T> CompletableFuture<List<T>>
Reaktív streamek felületei Publisher<Void> Publisher<T> Publisher<T>
Reaktív streamek projektreaktoros implementálása Mono<Void> Mono<T> Flux<T>

A teljesség kedvéért érdemes megemlíteni, hogy a Java 9 bevezette a Flow osztályt, amely tartalmazza a négy reaktív stream interfészt. Ez az osztály azonban nem tartalmaz implementációt.

Aszinkron API-k használata a Javához készült Azure SDK-ban

A reaktív streamek specifikációja nem tesz különbséget a közzétevők típusai között. A reaktív streamek specifikációjában a közzétevők egyszerűen nulla vagy több adatelemet állítanak elő. Sok esetben hasznos különbséget tenni kétféle közzétevő között: az egyik, amely legfeljebb egy adatelemet állít elő, és a másik, amely nullát vagy többet. A felhőalapú API-kban ez a különbség azt jelzi, hogy egy kérés egyetlen értékű választ vagy gyűjteményt ad vissza. A Project Reactor kétféleképpen teszi ezt a különbséget : Mono és Flux. Az az API, amely egy Mono választ ad vissza, legfeljebb egy értékkel rendelkezik, míg az az API, amely egy Flux választ ad vissza, nulla vagy több értéket tartalmaz.

Tegyük fel például, hogy egy ConfigurationAsyncClient használatával kéri le az Azure App Configuration service-ben tárolt konfigurációt. (További információ: Mi az Azure App Configuration?.)

Ha létrehoz egy ConfigurationAsyncClient az ügyfélen, és meghívja a getConfigurationSetting(), az egy Mono-et ad vissza, amely azt jelzi, hogy a válasz egyetlen értéket tartalmaz. A módszer önmagában való meghívása azonban nem tesz semmit. Az ügyfél még nem küldött kérelmet az Azure App Configuration szolgáltatáshoz. Ebben a szakaszban az Mono<ConfigurationSetting> API által visszaadott adatok csak az adatfeldolgozási folyamat "szerelvényei". Ez azt jelenti, hogy az adatok felhasználásához szükséges beállítás befejeződött. Az adatátvitel tényleges aktiválásához (azaz hogy a kérés eljusson a szolgáltatáshoz és megkapja a választ), elő kell fizetnie a visszaadott Mono-re. Tehát, amikor ezekkel a reaktív streamekkel foglalkozik, emlékeznie kell arra, hogy hívja meg subscribe(), mert semmi sem történik, amíg meg nem teszi.

Az alábbi példa bemutatja, hogyan iratkozhat fel a Mono konzolra, és hogyan nyomtathatja ki a konfigurációs értéket.

ConfigurationAsyncClient asyncClient = new ConfigurationClientBuilder()
    .connectionString("<your connection string>")
    .buildAsyncClient();

asyncClient.getConfigurationSetting("<your config key>", "<your config value>").subscribe(
    config -> System.out.println("Config value: " + config.getValue()),
    ex -> System.out.println("Error getting configuration: " + ex.getMessage()),
    () -> System.out.println("Successfully retrieved configuration setting"));

System.out.println("Done");

Figyelje meg, hogy az ügyfél meghívása getConfigurationSetting() után a példakód feliratkozik az eredményre, és három különálló lambdát biztosít. Az első lambda a szolgáltatástól kapott adatokat használja fel, amelyek sikeres válasz esetén aktiválódnak. A második lambda akkor aktiválódik, ha hiba történik a konfiguráció beolvasása közben. A harmadik lambda az adatfolyam befejezésekor lesz meghívva, ami azt jelenti, hogy nem várható több adatelem ebből a streamből.

Megjegyzés:

Az összes aszinkron programozáshoz hasonlóan az előfizetés létrehozása után a végrehajtás a szokásos módon folytatódik. Ha nincs semmi, ami aktívan tartaná és működtetné a programot, az aszinkron művelet befejezése előtt leállhat. Az a fő szál, amely a subscribe()-t hívta, nem fog várni, amíg Ön hálózati hívást kezdeményez az Azure App Configuration felé, és választ kap. Az éles rendszerekben előfordulhat, hogy valami mást is feldolgoz, de ebben a példában kis késést adhat hozzá, ha meghívja a Thread.sleep() függvényt, vagy használ egy CountDownLatch-et, hogy lehetőséget adjon az aszinkron művelet befejeződésére.

Az alábbi példában látható módon az olyan API-k, amelyek egy Flux típusú értéket adnak vissza, szintén hasonló mintát követnek. A különbség az, hogy a metódushoz subscribe() megadott első visszahívást a rendszer többször hívja meg a válasz minden adateleméhez. A hiba vagy a befejezési visszahívások pontosan egyszer lesznek meghívva, és termináljeleknek minősülnek. A rendszer nem hív meg más visszahívásokat, ha ezek közül bármelyik jel a közzétevőtől érkezik.

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(
    event -> System.out.println("Sequence number of received event: " + event.getData().getSequenceNumber()),
    ex -> System.out.println("Error receiving events: " + ex.getMessage()),
    () -> System.out.println("Successfully completed receiving all events"));

Visszanyomás

Mi történik, ha a forrás gyorsabban készíti el az adatokat, mint amennyit az előfizető képes kezelni? Az előfizetőt túlterhelhetik az adatok, ami memóriahibákhoz vezethet. Az előfizetőnek vissza kell kommunikálnia a közzétevővel, hogy lassítson, ha nem tud lépést tartani. Alapértelmezés szerint, amikor a fenti példában látható módon meghívják a subscribe()Flux, az előfizető korlátlan adatfolyamot kér, jelezve a közzétevőnek, hogy minél gyorsabban küldje el az adatokat. Ez a viselkedés nem mindig kívánatos, és előfordulhat, hogy az előfizetőnek szabályoznia kell a közzététel sebességét a "backpressure" használatával. A backpressure lehetővé teszi az előfizető számára az adatelemek áramlásának szabályozását. Az előfizetők korlátozott számú adatelemet igényelnek, amelyeket kezelni tudnak. Miután az előfizető befejezte ezeknek az elemeknek a feldolgozását, az előfizető további kéréseket kérhet. A backpressure használatával a leküldéses adatátviteli modellt leküldéses-lekéréses modellté alakíthatja.

Az alábbi példa bemutatja, hogyan szabályozhatja, hogy az Eseményközpontok fogyasztója milyen sebességgel fogad eseményeket:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1); // request 1 data element to begin with
    }

    @Override
    public void onNext(PartitionEvent partitionEvent) {
        System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
        this.subscription.request(1); // request another event when the subscriber is ready
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error receiving events: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Successfully completed receiving all events")
    }
});

Amikor az előfizető először "csatlakozik" a közzétevőhöz, a közzétevő átad egy példányt Subscription az előfizetőnek, amely kezeli az adatátvitel állapotát. Ez Subscription az a közeg, amelyen keresztül az előfizető visszanyomást alkalmazhat, ha meghívja request() , hogy adja meg, hány további adatelemet képes kezelni.

Ha az előfizető minden híváskor onNext()több adatelemet kér, request(10) például a közzétevő azonnal elküldi a következő 10 elemet, ha elérhetők, vagy amikor elérhetővé válnak. Ezek az elemek az előfizető végén egy pufferben halmozódnak fel, és mivel minden onNext() hívás további 10-et kér, a lemaradás mindaddig növekszik, amíg a közzétevőnek nincs több adateleme küldeni, vagyis túlcsordul az előfizető pufferje, ami memóriatúlcsordulási hibákat okoz.

Előfizetés lemondása

Az előfizetés kezeli a közzétevő és az előfizető közötti adatátvitel állapotát. Az előfizetés addig aktív, amíg a közzétevő nem végzi el az összes adat átvitelét az előfizetőnek, vagy az előfizető már nem szeretne adatokat kapni. Az alábbiakban látható módon lemondhatja az előfizetést.

Az alábbi példa megszakítja az előfizetést az előfizető kizárásával:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

Disposable disposable = asyncClient.receive().subscribe(
    partitionEvent -> {
        Long num = partitionEvent.getData().getSequenceNumber()
        System.out.println("Sequence number of received event: " + num);
    },
    ex -> System.out.println("Error receiving events: " + ex.getMessage()),
    () -> System.out.println("Successfully completed receiving all events"));

// much later on in your code, when you are ready to cancel the subscription,
// you can call the dispose method, as such:
disposable.dispose();

A következő példa az cancel() metódus meghívásával megszakítja az Subscription előfizetést.

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1); // request 1 data element to begin with
    }

    @Override
    public void onNext(PartitionEvent partitionEvent) {
        System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
        this.subscription.cancel(); // Cancels the subscription. No further event is received.
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error receiving events: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Successfully completed receiving all events")
    }
});

Következtetés

A szálak költséges erőforrások, amelyeket nem szabad pazarolni a távoli szolgáltatáshívások válaszaira való várakozásra. A mikroszolgáltatási architektúrák bevezetésének növekedésével az erőforrások hatékony skálázásának és használatának szükségessége létfontosságúvá válik. Az aszinkron API-k akkor előnyösek, ha hálózathoz kötött műveletek vannak. Az Azure SDK for Java számos API-t kínál az aszinkron műveletekhez a rendszererőforrások maximalizálása érdekében. Javasoljuk, hogy próbálja ki az aszinkron ügyfeleinket.

Az adott feladatoknak leginkább megfelelő operátorokról további információt a Reactor 3 referencia-útmutatójábantalál.

Következő lépések

Most, hogy jobban megismerte a különböző aszinkron programozási fogalmakat, fontos, hogy megtanulja, hogyan lehet iterálni az eredményeket. A legjobb iterációs stratégiákról és a lapozás működésének részleteiről további információt a Java Azure SDK-ban található Pagination és iteráció című témakörben talál.