Sdílet prostřednictvím


Asynchronní programování v sadě Azure SDK pro Javu

Tento článek popisuje asynchronní programovací model v sadě Azure SDK pro Javu.

Sada Azure SDK původně obsahovala pouze neblokující asynchronní rozhraní API pro interakci se službami Azure. Tato rozhraní API umožňují pomocí sady Azure SDK efektivně vytvářet škálovatelné aplikace, které používají systémové prostředky. Sada Azure SDK pro Javu ale také obsahuje synchronní klienty, kteří se starají o širší cílovou skupinu, a také umožňují přístup k klientským knihovnám pro uživatele, kteří nejsou obeznámeni s asynchronním programováním. (Viz Přístup v pokynech k návrhu sady Azure SDK.) Všechny klientské knihovny Java v sadě Azure SDK pro Javu proto nabízejí asynchronní i synchronní klienty. K maximalizaci využití systémových prostředků však doporučujeme používat asynchronní klienty pro produkční systémy.

Reaktivní proudy

Pokud se podíváte na část Klienti asynchronní služby v pokynech pro návrh sady Java Azure SDK, všimnete si, že místo použití CompletableFuture javy 8 naše asynchronní rozhraní API používají reaktivní typy. Proč jsme zvolili reaktivní typy namísto typů, které jsou nativně dostupné v JDK?

Java 8 představila funkce, jako jsou Streams, Lambdas a CompletableFuture. Tyto funkce poskytují mnoho funkcí, ale mají určitá omezení.

CompletableFuture poskytuje neblokující schopnosti založené na zpětném volání a rozhraní CompletionStage umožňuje snadné skládání řady asynchronních operací. Lambdas usnadňují čtení těchto rozhraní API založených na nabízených oznámeních. Streamy poskytují operace funkčního stylu pro zpracování kolekce datových prvků. Streamy jsou ale synchronní a nejde je znovu použít. CompletableFuture umožňuje vytvořit jeden požadavek, poskytuje podporu zpětného volání a očekává jednu odpověď. Mnoho cloudových služeb ale vyžaduje možnost streamovat data – například Event Hubs.

Reaktivní streamy mohou pomoci překonat tato omezení tím, že streamují prvky ze zdroje k odběrateli. Když odběratel požádá o data ze zdroje, odešle zdroj libovolný počet výsledků zpět. Nemusí je posílat všechny najednou. Přenos probíhá po určitou dobu, kdykoli zdroj obsahuje data k odeslání.

V tomto modelu odběratel registruje obslužné rutiny událostí ke zpracování dat při doručení. Tyto interakce založené na nabízených oznámeních upozorňují odběratele prostřednictvím různých signálů:

  • Volání onSubscribe() označuje, že přenos dat začíná.
  • Volání onError() označuje chybu, která označuje také konec přenosu dat.
  • Volání onComplete() označuje úspěšné dokončení přenosu dat.

Na rozdíl od streamů Java se reaktivní datové proudy chovají k chybám jako události první třídy. Reaktivní streamy mají vyhrazený kanál pro sdělení chyb odběrateli ze strany zdroje. Reaktivní datové proudy také umožňují odběrateli vyjednat rychlost přenosu dat, aby tyto datové proudy transformoval na model push-pull.

Specifikace Reactive Streams poskytuje standardní způsob přenosu dat. Na vysoké úrovni specifikace definuje následující čtyři rozhraní a určuje pravidla pro způsob implementace těchto rozhraní.

  • Zdroj datového streamu je publisher.
  • Odběratel je příjemcem datového streamu.
  • Předplatné spravuje stav přenosu dat mezi vydavatelem a odběratelem.
  • Procesor je vydavatel i odběratel.

Existují některé dobře známé knihovny Java, které poskytují implementace této specifikace, jako jsou RxJava, Akka Streams, Vert.x a Project Reactor.

Sada Azure SDK pro Javu přijala projekt Reactor a nabídla její asynchronní rozhraní API. Hlavním faktorem, který toto rozhodnutí řídí, bylo zajistit bezproblémovou integraci se spring webfluxem, který také používá projekt Reactor. Dalším faktorem, který přispěl k výběru projektu Reactor místo RxJava, byl, že Project Reactor používá Javu 8, ale RxJava v té době stále používala Javu 7. Project Reactor také nabízí bohatou sadu operátorů, které jsou kompozovatelné a umožňují psát deklarativní kód pro vytváření kanálů pro zpracování dat. Další pěknou věcí o projektu Reactor je, že obsahuje adaptéry pro převod typů projektových reaktorů na jiné oblíbené typy implementace.

Porovnání rozhraní API synchronních a asynchronních operací

Probrali jsme synchronní klienty a možnosti pro asynchronní klienty. Následující tabulka shrnuje, jak vypadají rozhraní API, která jsou navržená pomocí těchto možností:

Typ rozhraní API Žádná hodnota Jedna hodnota Více hodnot
Standardní Java – synchronní rozhraní API void T Iterable<T>
Standardní Java – asynchronní rozhraní API CompletableFuture<Void> CompletableFuture<T> CompletableFuture<List<T>>
Rozhraní reaktivních proudů Publisher<Void> Publisher<T> Publisher<T>
Implementace Reactive Streams v projektu Reactor Mono<Void> Mono<T> Flux<T>

V zájmu úplnosti stojí za zmínku, že Java 9 zavedla třídu Flow , která zahrnuje čtyři reaktivní rozhraní datových proudů. Tato třída však neobsahuje žádnou implementaci.

Použití asynchronních rozhraní API v sadě Azure SDK pro Javu

Specifikace reaktivních datových proudů nerozlišuje mezi typy vydavatelů. Ve specifikaci reaktivních datových proudů vydavatelé jednoduše produkují nula nebo více datových elementů. V mnoha případech existuje užitečný rozdíl mezi vydavatelem, který vytváří maximálně jeden datový prvek, a vydavatelem, který vytváří nula nebo více. V cloudových rozhraních API tento rozdíl označuje, jestli požadavek vrací odpověď s jednou hodnotou nebo kolekci. Projekt Reactor nabízí dva typy, které tento rozdíl rozlišují – Mono a Flux. Rozhraní API, které vrací Mono odpověď, bude obsahovat odpověď, která má maximálně jednu hodnotu, a rozhraní API, které vrátí Flux odpověď, která bude obsahovat nulovou nebo více hodnot.

Předpokládejme například, že k načtení konfigurace uložené pomocí služby Azure App Configuration použijete ConfigurationAsyncClient . (Další informace najdete v tématu Co je Azure App Configuration?).)

Pokud vytvoříte ConfigurationAsyncClient a zavoláte getConfigurationSetting() na klientovi, vrátí Mono, což naznačuje, že odpověď obsahuje jednu hodnotu. Nic nedělá samotné volání této metody. Klient dosud nepodal žádost ke službě Azure App Configuration. V této fázi je Mono<ConfigurationSetting> vrácené tímto API pouze "sestavení" potrubí zpracování dat. To znamená, že požadované nastavení pro využívání dat je hotové. Chcete-li skutečně aktivovat přenos dat (tj. provést požadavek na službu a získat odpověď), musíte se přihlásit k odběru vrácené Mono. Takže při zpracování těchto reaktivních datových proudů musíte pamatovat na volání subscribe() , protože se nic nestane, dokud to neuděláte.

Následující příklad ukazuje, jak se přihlásit k odběru Mono a vytisknout konfigurační hodnotu konzoly.

ConfigurationAsyncClient asyncClient = new ConfigurationClientBuilder()
    .connectionString("<your connection string>")
    .buildAsyncClient();

asyncClient.getConfigurationSetting("<your config key>", "<your config value>").subscribe(
    config -> System.out.println("Config value: " + config.getValue()),
    ex -> System.out.println("Error getting configuration: " + ex.getMessage()),
    () -> System.out.println("Successfully retrieved configuration setting"));

System.out.println("Done");

Všimněte si, že po volání getConfigurationSetting() klienta se ukázkový kód přihlásí k odběru výsledku a poskytuje tři samostatné lambda. První lambda využívá data přijatá ze služby, která se aktivují po úspěšné odpovědi. Druhá lambda se aktivuje, pokud při načítání konfigurace dojde k chybě. Třetí lambda se vyvolá při dokončení datového streamu, což znamená, že z tohoto datového proudu se neočekávají žádné další datové prvky.

Poznámka:

Stejně jako u veškerého asynchronního programování pokračuje po vytvoření předplatného provádění obvyklým způsobem. Pokud program není udržován aktivní a spuštěný, může se ukončit dříve, než se dokončí asynchronní operace. Hlavní vlákno, které zavolalo subscribe(), nebude čekat, až provedete síťové volání do služby Azure App Configuration a obdržíte odpověď. V produkčních systémech můžete pokračovat v zpracování něčeho jiného, ale v tomto příkladu můžete přidat malé zpoždění voláním Thread.sleep() nebo použitím CountDownLatch, čímž dáte asynchronní operaci šanci dokončit.

Jak je znázorněno v následujícím příkladu, rozhraní API, která vrací Flux, také následují podobný vzorec. Rozdíl je v tom, že první zpětná volání, která jsou poskytována metodě subscribe(), jsou volána vícekrát pro každý datový prvek v odpovědi. Chyba nebo zpětná volání při dokončení se volají přesně jednou a jsou považovány za konečné signály. Žádná další zpětná volání nejsou vyvolána, pokud jsou některé z těchto signálů přijaty od vydavatele.

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(
    event -> System.out.println("Sequence number of received event: " + event.getData().getSequenceNumber()),
    ex -> System.out.println("Error receiving events: " + ex.getMessage()),
    () -> System.out.println("Successfully completed receiving all events"));

Zpětný tlak

Co se stane, když zdroj vytváří data rychleji, než může odběratel zpracovat? Odběratel může být zahlcený daty, což může vést k chybám nedostatku paměti. Odběratel potřebuje způsob, jak komunikovat s vydavatelem, aby zpomalil, když nemůže držet krok. Ve výchozím nastavení, když voláte subscribe() na Flux podle výše uvedeného příkladu, odběratel požaduje nevázaný proud dat, kterým dává najevo vydavateli, aby data odeslal co nejrychleji. Toto chování není vždy žádoucí a odběratel může potřebovat řídit míru publikování pomocí zpětného tlaku, známého jako "backpressure". Backpressure umožňuje odběrateli převzít kontrolu nad tokem datových prvků. Odběratel si vyžádá omezený počet datových prvků, které může zpracovat. Jakmile odběratel dokončí zpracování těchto prvků, může si odběratel vyžádat další. Pomocí backpressure můžete transformovat model push-model pro přenos dat na model push-pull.

Následující příklad ukazuje, jak můžete řídit rychlost přijetí událostí příjemcem služby Event Hubs:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1); // request 1 data element to begin with
    }

    @Override
    public void onNext(PartitionEvent partitionEvent) {
        System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
        this.subscription.request(1); // request another event when the subscriber is ready
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error receiving events: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Successfully completed receiving all events")
    }
});

Když se odběratel poprvé "připojí" k vydavateli, vydavatel předá odběrateli Subscription instanci, která spravuje stav přenosu dat. Toto Subscription je médium, přes které může odběratel použít backpressure tím, že zavolá request() a určí, kolik dalších datových prvků může zpracovat.

Pokud odběratel požaduje více než jeden datový prvek pokaždé, když onNext()volá , request(10) například vydavatel odešle dalších 10 prvků okamžitě, pokud jsou k dispozici nebo jakmile budou k dispozici. Tyto prvky se hromadí ve vyrovnávací paměti u odběratele, a vzhledem k tomu, že každé onNext() volání požaduje dalších 10, backlog se stále zvětšuje, dokud vydavatel nemá k odeslání žádné další datové prvky, nebo dokud vyrovnávací paměť odběratele nepřeteče, což vede k chybám nedostatku paměti.

Zrušení předplatného

Předplatné spravuje stav přenosu dat mezi vydavatelem a odběratelem. Předplatné je aktivní, dokud vydavatel nedokončil přenos všech dat odběrateli nebo odběratele už nemá zájem o příjem dat. Existuje několik způsobů, jak můžete předplatné zrušit, jak je znázorněno níže.

Následující příklad ruší předplatné tím, že ukončí účast odběratele.

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

Disposable disposable = asyncClient.receive().subscribe(
    partitionEvent -> {
        Long num = partitionEvent.getData().getSequenceNumber()
        System.out.println("Sequence number of received event: " + num);
    },
    ex -> System.out.println("Error receiving events: " + ex.getMessage()),
    () -> System.out.println("Successfully completed receiving all events"));

// much later on in your code, when you are ready to cancel the subscription,
// you can call the dispose method, as such:
disposable.dispose();

Následující příklad zruší předplatné voláním metody cancel() na Subscription.

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1); // request 1 data element to begin with
    }

    @Override
    public void onNext(PartitionEvent partitionEvent) {
        System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
        this.subscription.cancel(); // Cancels the subscription. No further event is received.
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error receiving events: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Successfully completed receiving all events")
    }
});

Závěr

Vlákna jsou nákladné prostředky, které byste neměli plýtvat při čekání na odpovědi z volání vzdálených služeb. S rostoucím přijetím architektur mikroslužeb je potřeba efektivně škálovat a využívat prostředky. Asynchronní rozhraní API jsou příznivá, pokud existují operace vázané na síť. Sada Azure SDK pro Javu nabízí bohatou sadu rozhraní API pro asynchronní operace, která vám pomůže maximalizovat systémové prostředky. Důrazně doporučujeme vyzkoušet naše asynchronní klienty.

Další informace o operátorech, které nejlépe vyhovují vašim konkrétním úkolům, najdete v referenční příručce Reactor 3.

Další kroky

Teď, když lépe rozumíte různým konceptům asynchronního programování, je důležité se naučit iterovat výsledky. Další informace o nejlepších strategiích iterace a podrobnostech o tom, jak stránkování funguje, najdete v tématu Stránkování a iterace v sadě Azure SDK pro Javu.