Asynchronní programování v sadě Azure SDK pro Javu
Tento článek popisuje asynchronní programovací model v sadě Azure SDK pro Javu.
Sada Azure SDK původně obsahovala pouze neblokující asynchronní rozhraní API pro interakci se službami Azure. Tato rozhraní API umožňují pomocí sady Azure SDK efektivně vytvářet škálovatelné aplikace, které používají systémové prostředky. Sada Azure SDK pro Javu ale také obsahuje synchronní klienty, kteří se starají o širší cílovou skupinu, a také umožňují přístup k klientským knihovnám pro uživatele, kteří nejsou obeznámeni s asynchronním programováním. (Viz Přístupný v pokynech k návrhu sady Azure SDK.) Všechny klientské knihovny Java v sadě Azure SDK pro Javu proto nabízejí asynchronní i synchronní klienty. K maximalizaci využití systémových prostředků však doporučujeme používat asynchronní klienty pro produkční systémy.
Reaktivní streamy
Pokud se podíváte na část Klienti asynchronní služby v pokynech pro návrh sady Java Azure SDK, všimnete si, že místo použití CompletableFuture
javy 8 naše asynchronní rozhraní API používají reaktivní typy. Proč jsme u typů, které jsou nativně dostupné v JDK, zvolili jsme reaktivní typy?
Java 8 zavedla funkce, jako jsou Toky, Lambdas a CompletableFuture. Tyto funkce poskytují mnoho funkcí, ale mají určitá omezení.
CompletableFuture
poskytuje funkce založené na zpětném volání, neblokující funkce a CompletionStage
rozhraní povolené pro snadné složení řady asynchronních operací. Lambdas usnadňují čtení těchto rozhraní API založených na nabízených oznámeních. Toky poskytují operace funkčního stylu pro zpracování kolekce datových prvků. Streamy jsou ale synchronní a nejde je znovu použít. CompletableFuture
umožňuje vytvořit jeden požadavek, poskytuje podporu zpětného volání a očekává jednu odpověď. Mnoho cloudových služeb ale vyžaduje možnost streamovat data – například Event Hubs.
Reaktivní streamy můžou pomoct tyto omezení překonat prvky streamování ze zdroje do odběratele. Když odběratel požádá o data ze zdroje, odešle zdroj libovolný počet výsledků zpět. Nemusí je posílat všechny najednou. Přenos probíhá po určitou dobu, kdykoli zdroj obsahuje data k odeslání.
V tomto modelu odběratel registruje obslužné rutiny událostí ke zpracování dat při doručení. Tyto interakce založené na nabízených oznámeních upozorňují odběratele prostřednictvím různých signálů:
- Volání
onSubscribe()
označuje, že přenos dat začíná. - Volání
onError()
označuje chybu, která označuje také konec přenosu dat. - Volání
onComplete()
označuje úspěšné dokončení přenosu dat.
Na rozdíl od javy Toky zachází reaktivní streamy s chybami jako s událostmi první třídy. Reaktivní streamy mají vyhrazený kanál, který zdroji umožňuje sdělit všem chybám odběrateli. Reaktivní datové proudy také umožňují odběrateli vyjednat rychlost přenosu dat, aby tyto datové proudy transformoval na model push-pull.
Specifikace reaktivního Toky poskytuje standard pro způsob přenosu dat. Na vysoké úrovni specifikace definuje následující čtyři rozhraní a určuje pravidla pro způsob implementace těchto rozhraní.
- Zdroj datového streamu je publisher .
- Odběratel je příjemcem datového streamu.
- Předplatné spravuje stav přenosu dat mezi vydavatelem a odběratelem.
- Procesor je vydavatel i odběratel.
Existují některé dobře známé knihovny Java, které poskytují implementace této specifikace, jako jsou RxJava, Akka Toky, Vert.x a Projekt Reactor.
Sada Azure SDK pro Javu přijala projekt Reactor a nabídla její asynchronní rozhraní API. Hlavním faktorem, který toto rozhodnutí řídí, bylo zajistit bezproblémovou integraci se spring webfluxem, který také používá projekt Reactor. Dalším faktorem, který přispívá k výběru projektu Reactor přes RxJava, byl, že Project Reactor používá Javu 8, ale RxJava v té době byl stále v Javě 7. Project Reactor také nabízí bohatou sadu operátorů, které jsou kompozovatelné a umožňují psát deklarativní kód pro vytváření kanálů pro zpracování dat. Další pěknou věcí o projektu Reactor je, že obsahuje adaptéry pro převod typů projektových reaktorů na jiné oblíbené typy implementace.
Porovnání rozhraní API synchronních a asynchronních operací
Probrali jsme synchronní klienty a možnosti pro asynchronní klienty. Následující tabulka shrnuje, jak vypadají rozhraní API, která jsou navržená pomocí těchto možností:
Typ rozhraní API | Žádná hodnota | Jedna hodnota | Více hodnot |
---|---|---|---|
Standardní Java – synchronní rozhraní API | void |
T |
Iterable<T> |
Standardní Java – asynchronní rozhraní API | CompletableFuture<Void> |
CompletableFuture<T> |
CompletableFuture<List<T>> |
Reaktivní rozhraní Toky | Publisher<Void> |
Publisher<T> |
Publisher<T> |
Implementace reaktivního Toky projektu Reactor | Mono<Void> |
Mono<T> |
Flux<T> |
V zájmu úplnosti stojí za zmínku, že Java 9 zavedla třídu Flow , která zahrnuje čtyři reaktivní rozhraní datových proudů. Tato třída však neobsahuje žádnou implementaci.
Použití asynchronních rozhraní API v sadě Azure SDK pro Javu
Specifikace reaktivních datových proudů nerozlišuje mezi typy vydavatelů. Ve specifikaci reaktivních datových proudů vydavatelé jednoduše vytvářejí nula nebo více datových prvků. V mnoha případech existuje užitečný rozdíl mezi vydavatelem, který vytváří maximálně jeden datový prvek, než jeden, který vytváří nula nebo více. V cloudových rozhraních API tento rozdíl označuje, jestli požadavek vrací odpověď s jednou hodnotou nebo kolekci. Projekt Reactor nabízí dva typy, které tento rozdíl rozlišují – Mono a Flux. Rozhraní API, které vrací Mono
odpověď, bude obsahovat odpověď, která má maximálně jednu hodnotu, a rozhraní API, které vrátí Flux
odpověď, která bude obsahovat nulovou nebo více hodnot.
Předpokládejme například, že používáte ConfigurationAsyncClient k načtení konfigurace uložené pomocí služby konfigurace Aplikace Azure. (Další informace najdete v tématu Co je konfigurace Aplikace Azure?.)
Pokud vytvoříte ConfigurationAsyncClient
klienta a zavoláte getConfigurationSetting()
ho, vrátí hodnotu Mono
označující, že odpověď obsahuje jednu hodnotu. Volání této metody ale nic nedělá. Klient dosud neudělil žádost službě Aplikace Azure Configuration. V této fázi Mono<ConfigurationSetting>
je vrácené tímto rozhraním API pouze "sestavení" kanálu zpracování dat. To znamená, že požadované nastavení pro využívání dat je hotové. Chcete-li skutečně aktivovat přenos dat (tj. provést požadavek na službu a získat odpověď), musíte se přihlásit k odběru vrácené Mono
. Takže při zpracování těchto reaktivních datových proudů musíte pamatovat na volání subscribe()
, protože se nic nestane, dokud to neuděláte.
Následující příklad ukazuje, jak se přihlásit k odběru Mono
a vytisknout konfigurační hodnotu konzoly.
ConfigurationAsyncClient asyncClient = new ConfigurationClientBuilder()
.connectionString("<your connection string>")
.buildAsyncClient();
asyncClient.getConfigurationSetting("<your config key>", "<your config value>").subscribe(
config -> System.out.println("Config value: " + config.getValue()),
ex -> System.out.println("Error getting configuration: " + ex.getMessage()),
() -> System.out.println("Successfully retrieved configuration setting"));
System.out.println("Done");
Všimněte si, že po volání getConfigurationSetting()
klienta se ukázkový kód přihlásí k odběru výsledku a poskytuje tři samostatné lambda. První lambda využívá data přijatá ze služby, která se aktivují po úspěšné odpovědi. Druhá lambda se aktivuje, pokud při načítání konfigurace dojde k chybě. Třetí lambda se vyvolá při dokončení datového streamu, což znamená, že z tohoto datového proudu se neočekávají žádné další datové prvky.
Poznámka:
Stejně jako u veškerého asynchronního programování pokračuje po vytvoření předplatného provádění obvyklým způsobem. Pokud program není aktivní a spuštěný, může se ukončit před dokončením asynchronní operace. Hlavní vlákno, které volalsubscribe()
, nebude čekat, až provedete síťové volání Aplikace Azure Konfigurace a obdrží odpověď. V produkčních systémech můžete pokračovat v zpracování něčeho jiného, ale v tomto příkladu můžete přidat malé zpoždění voláním Thread.sleep()
nebo použitím funkce CountDownLatch
async operation šanci dokončit.
Jak je znázorněno v následujícím příkladu, rozhraní API, která vrací Flux
podobný vzor. Rozdíl je v tom, že první zpětné volání poskytnuté metodě subscribe()
je volána vícekrát pro každý datový prvek v odpovědi. Chyba nebo zpětná volání dokončení se volají přesně jednou a jsou považovány za terminálové signály. Žádné další zpětná volání se nevyvolají, pokud některý z těchto signálů obdrží od vydavatele.
EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
.connectionString("<your connection string>")
.consumerGroup("<your consumer group>")
.buildAsyncConsumerClient();
asyncClient.receive().subscribe(
event -> System.out.println("Sequence number of received event: " + event.getData().getSequenceNumber()),
ex -> System.out.println("Error receiving events: " + ex.getMessage()),
() -> System.out.println("Successfully completed receiving all events"));
Zpětný tlak
Co se stane, když zdroj vytváří data rychleji, než může odběratel zpracovat? Odběratel může být zahlcený daty, což může vést k chybám nedostatku paměti. Odběratel potřebuje způsob, jak komunikovat s vydavatelem, aby zpomalil, když nemůže držet krok. Když ve výchozím nastavení voláte subscribe()
na základě Flux
výše uvedeného příkladu, odběratel požaduje nevázaný datový proud dat, který vydavateli říká, aby data co nejrychleji odeslal. Toto chování není vždy žádoucí a odběratel může muset řídit míru publikování prostřednictvím "backpressure". Backpressure umožňuje odběrateli převzít kontrolu nad tokem datových prvků. Odběratel si vyžádá omezený počet datových prvků, které může zpracovat. Jakmile odběratel dokončí zpracování těchto prvků, může si odběratel vyžádat další. Pomocí backpressure můžete transformovat model push-model pro přenos dat na model push-pull.
Následující příklad ukazuje, jak můžete řídit rychlost přijetí událostí příjemcem služby Event Hubs:
EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
.connectionString("<your connection string>")
.consumerGroup("<your consumer group>")
.buildAsyncConsumerClient();
asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1); // request 1 data element to begin with
}
@Override
public void onNext(PartitionEvent partitionEvent) {
System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
this.subscription.request(1); // request another event when the subscriber is ready
}
@Override
public void onError(Throwable throwable) {
System.out.println("Error receiving events: " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("Successfully completed receiving all events")
}
});
Když se odběratel poprvé "připojí" k vydavateli, vydavatel předá odběrateli Subscription
instanci, která spravuje stav přenosu dat. Toto Subscription
je médium, přes které může odběratel použít backpressure voláním request()
určit, kolik dalších datových prvků může zpracovat.
Pokud odběratel požaduje více než jeden datový prvek pokaždé, když onNext()
volá , request(10)
například vydavatel odešle dalších 10 prvků okamžitě, pokud jsou k dispozici nebo jakmile budou k dispozici. Tyto prvky se hromadí ve vyrovnávací paměti na konci odběratele a vzhledem k tomu, že každé onNext()
volání bude požadovat více než 10, backlog se stále zvětšuje, dokud vydavatel nemá k odeslání žádné další datové prvky, nebo přetečení vyrovnávací paměti odběratele, což vede k chybám nedostatku paměti.
Zrušení předplatného:
Předplatné spravuje stav přenosu dat mezi vydavatelem a odběratelem. Předplatné je aktivní, dokud vydavatel nedokončil přenos všech dat odběrateli nebo odběratele už nemá zájem o příjem dat. Existuje několik způsobů, jak můžete předplatné zrušit, jak je znázorněno níže.
Následující příklad zruší předplatné tím, že zruší předplatné tím, že zruší předplatné:
EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
.connectionString("<your connection string>")
.consumerGroup("<your consumer group>")
.buildAsyncConsumerClient();
Disposable disposable = asyncClient.receive().subscribe(
partitionEvent -> {
Long num = partitionEvent.getData().getSequenceNumber()
System.out.println("Sequence number of received event: " + num);
},
ex -> System.out.println("Error receiving events: " + ex.getMessage()),
() -> System.out.println("Successfully completed receiving all events"));
// much later on in your code, when you are ready to cancel the subscription,
// you can call the dispose method, as such:
disposable.dispose();
Následující příklad zruší předplatné voláním metody :cancel()
Subscription
EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
.connectionString("<your connection string>")
.consumerGroup("<your consumer group>")
.buildAsyncConsumerClient();
asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1); // request 1 data element to begin with
}
@Override
public void onNext(PartitionEvent partitionEvent) {
System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
this.subscription.cancel(); // Cancels the subscription. No further event is received.
}
@Override
public void onError(Throwable throwable) {
System.out.println("Error receiving events: " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("Successfully completed receiving all events")
}
});
Závěr
Vlákna jsou nákladné prostředky, které byste neměli plýtvat při čekání na odpovědi ze vzdálených volání služeb. S rostoucím přijetím architektur mikroslužeb je potřeba efektivně škálovat a využívat prostředky. Asynchronní rozhraní API jsou příznivá, pokud existují operace vázané na síť. Sada Azure SDK pro Javu nabízí bohatou sadu rozhraní API pro asynchronní operace, která vám pomůže maximalizovat systémové prostředky. Důrazně doporučujeme vyzkoušet naše asynchronní klienty.
Další informace o operátorech, které nejlépe vyhovují vašim konkrétním úkolům, najdete v referenční příručce Reactor 3.
Další kroky
Teď, když lépe rozumíte různým konceptům asynchronního programování, je důležité se naučit iterovat výsledky. Další informace o nejlepších strategiích iterace a podrobnostech o tom, jak stránkování funguje, najdete v tématu Stránkování a iterace v sadě Azure SDK pro Javu.