Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
W tym artykule opisano asynchroniczny model programowania w zestawie Azure SDK dla języka Java.
Zestaw Azure SDK początkowo zawierał tylko nieblokujące, asynchroniczne interfejsy API do interakcji z usługami platformy Azure. Te interfejsy API umożliwiają wydajne tworzenie skalowalnych aplikacji korzystających z zasobów systemowych przy użyciu zestawu Azure SDK. Jednak zestaw Azure SDK dla języka Java zawiera również klientów synchronicznych, aby zaspokoić szerszą publiczność, a także sprawić, że nasze biblioteki klienckie będą dostępne dla użytkowników, którzy nie znają programowania asynchronicznego. (Zapoznaj się z tematem Przystępności w wytycznych dotyczących projektowania zestawu Azure SDK). W związku z tym wszystkie biblioteki klienckie Javy w zestawie Azure SDK dla Javy oferują zarówno klientów asynchronicznych, jak i synchronicznych. Zalecamy jednak używanie klientów asynchronicznych dla systemów produkcyjnych w celu zmaksymalizowania wykorzystania zasobów systemowych.
Strumienie reaktywne
Jeśli zapoznasz się z sekcją Async Service Clients (Klienci usług asynchronicznych) w wytycznych Java Azure SDK Design Guidelines, zauważysz, że zamiast korzystać z CompletableFuture interfejsów API asynchronicznych używanych przez Java 8, nasze asynchroniczne interfejsy API używają typów reaktywnych. Dlaczego wybraliśmy typy reaktywne zamiast typów natywnie dostępnych w JDK?
Język Java 8 wprowadził funkcje, takie jak Streams, Lambdas i CompletableFuture. Te funkcje zapewniają wiele możliwości, ale mają pewne ograniczenia.
CompletableFuture zapewnia nieblokujące możliwości oparte na wywołaniu zwrotnym, a interfejs CompletionStage umożliwiał łatwe tworzenie serii operacji asynchronicznych. Lambdy sprawiają, że te interfejsy API oparte na "push-based" są bardziej czytelne. Strumienie oferują operacje funkcjonalne umożliwiające zarządzanie zbiorem elementów danych. Jednak strumienie są synchroniczne i nie można ich używać ponownie.
CompletableFuture umożliwia wykonanie jednego żądania, zapewnia obsługę wywołania zwrotnego i oczekuje pojedynczej odpowiedzi. Jednak wiele usług w chmurze wymaga możliwości przesyłania strumieniowego danych — na przykład usługi Event Hubs.
Reaktywne strumienie mogą pomóc w przezwyciężeniu tych ograniczeń, przesyłając elementy strumieniowo ze źródła do subskrybenta. Gdy subskrybent żąda danych ze źródła, źródło wysyła dowolną liczbę wyników. Nie musi wysyłać ich wszystkich jednocześnie. Transfer odbywa się w danym okresie, za każdym razem, gdy źródło ma dane do wysłania.
W tym modelu subskrybent rejestruje programy obsługi zdarzeń w celu przetwarzania danych po ich nadejściu. Te interakcje oparte na wypychaniach powiadamiają subskrybenta za pomocą odrębnych sygnałów:
- Wywołanie
onSubscribe()wskazuje, że transfer danych ma się rozpocząć. - Wywołanie
onError()wskazuje, że wystąpił błąd, który oznacza również koniec transferu danych. - Wywołanie
onComplete()wskazuje pomyślne zakończenie transferu danych.
W przeciwieństwie do strumieni Java strumieni reaktywne traktują błędy jako zdarzenia pierwszej klasy. Strumienie reaktywne mają specjalny kanał, za pomocą którego źródło przekazuje wszelkie błędy subskrybentowi. Ponadto reaktywne strumienie umożliwiają subskrybentowi negocjowanie szybkości transferu danych w celu przekształcenia tych strumieni w model przepływowy.
Specyfikacja reaktywnych strumieni udostępnia standard sposobu przesyłania danych. Na wysokim poziomie specyfikacja definiuje następujące cztery interfejsy i określa reguły dotyczące sposobu implementacji tych interfejsów.
- Wydawca jest źródłem strumienia danych.
- Subskrybent jest odbiorcą strumienia danych.
- Subskrypcja zarządza stanem transferu danych między wydawcą a subskrybentem.
- Procesor jest zarówno wydawcą, jak i subskrybentem.
Istnieją znane biblioteki Języka Java, które zapewniają implementacje tej specyfikacji, takie jak RxJava, Akka Streams, Vert.x i Project Reactor.
Zestaw Azure SDK dla języka Java przyjął usługę Project Reactor, aby oferować swoje asynchroniczne interfejsy API. Głównym czynnikiem wpływającym na tę decyzję było zapewnienie bezproblemowej integracji z platformą Spring Webflux, która korzysta również z biblioteki Project Reactor. Innym czynnikiem przyczyniającym się do wybrania projektu Reactor nad RxJava było to, że program Project Reactor używa języka Java 8, ale RxJava, w tym czasie, był nadal w środowisku Java 7. Project Reactor oferuje również bogaty zestaw operatorów, które są komponowalne i umożliwia pisanie deklaratywnego kodu do tworzenia potoków przetwarzania danych. Kolejną miłą rzeczą w projekcie Reactor jest to, że ma adaptery do konwertowania typów Project Reactor na inne popularne typy implementacji.
Porównywanie interfejsów API operacji synchronicznych i asynchronicznych
Omówiliśmy synchronicznych klientów i opcje dla klientów asynchronicznych. W poniższej tabeli przedstawiono podsumowanie wyglądu interfejsów API, które zostały zaprojektowane przy użyciu następujących opcji:
| Typ interfejsu API | Brak wartości | Pojedyncza wartość | Wiele wartości |
|---|---|---|---|
| Standardowa Java — synchroniczne interfejsy API | void |
T |
Iterable<T> |
| Standardowa Java — asynchroniczne interfejsy API | CompletableFuture<Void> |
CompletableFuture<T> |
CompletableFuture<List<T>> |
| Interfejsy reaktywnych strumieni | Publisher<Void> |
Publisher<T> |
Publisher<T> |
| Implementacja Reactive Streams w Project Reactor | Mono<Void> |
Mono<T> |
Flux<T> |
Ze względu na kompletność warto wspomnieć, że środowisko Java 9 wprowadziło klasę Flow obejmującą cztery interfejsy reaktywnych strumieni. Jednak ta klasa nie zawiera żadnej implementacji.
Użyj asynchronicznych interfejsów API w zestawie Azure SDK dla Java.
Specyfikacja strumieni reaktywnych nie rozróżnia typów emitentów. W specyfikacji reaktywnych strumieni wydawcy po prostu tworzą zero lub więcej elementów danych. W wielu przypadkach istnieje przydatne rozróżnienie między wydawcą tworzącym co najwyżej jeden element danych a jednym, który generuje zero lub więcej. W interfejsach API opartych na chmurze to rozróżnienie wskazuje, czy żądanie zwraca jednowartościową odpowiedź, czy kolekcję. Project Reactor udostępnia dwa typy, aby je odróżnić — Mono i Flux. Interfejs API zwracający Mono element będzie zawierać odpowiedź zawierającą co najwyżej jedną wartość, a interfejs API, który zwraca Flux wartość, będzie zawierać odpowiedź, która zawiera zero lub więcej wartości.
Załóżmy na przykład, że używasz klasy ConfigurationAsyncClient do pobierania konfiguracji przechowywanej przy użyciu usługi Azure App Configuration. (Aby uzyskać więcej informacji, zobacz Co to jest usługa Azure App Configuration?).
Jeśli tworzysz element ConfigurationAsyncClient i wywołujesz getConfigurationSetting() klienta, zwraca wartość Mono, która wskazuje, że odpowiedź zawiera jedną wartość. Jednak samo wywołanie tej metody nie powoduje żadnych efektów. Klient nie złożył jeszcze żądania do usługi Azure App Configuration. Na tym etapie dane zwracane przez ten interfejs API w postaci Mono<ConfigurationSetting> są tylko "zestawem" dla potoku przetwarzania danych. Oznacza to, że wymagana konfiguracja do korzystania z danych została ukończona. Aby rzeczywiście wyzwolić transfer danych (czyli wysłać żądanie do usługi i uzyskać odpowiedź), musisz zasubskrybować zwrócony Mono. Dlatego podczas pracy z tymi strumieniami reaktywnymi należy pamiętać, aby wywołać subscribe(), ponieważ nic się nie dzieje, dopóki nie zrobisz tego.
W poniższym przykładzie pokazano, jak zasubskrybować Mono i wyświetlić wartość konfiguracji na konsoli.
ConfigurationAsyncClient asyncClient = new ConfigurationClientBuilder()
.connectionString("<your connection string>")
.buildAsyncClient();
asyncClient.getConfigurationSetting("<your config key>", "<your config value>").subscribe(
config -> System.out.println("Config value: " + config.getValue()),
ex -> System.out.println("Error getting configuration: " + ex.getMessage()),
() -> System.out.println("Successfully retrieved configuration setting"));
System.out.println("Done");
Zwróć uwagę, że po wywołaniu getConfigurationSetting() klienta przykładowy kod subskrybuje wynik i udostępnia trzy oddzielne wyrażenia lambd. Pierwsza funkcja lambda używa danych odebranych z usługi, co jest wyzwalane po pomyślnym wykonaniu odpowiedzi. Druga funkcja lambda jest wyzwalana, jeśli podczas pobierania konfiguracji wystąpi błąd. Trzeci element lambda jest wywoływany po zakończeniu strumienia danych, co oznacza, że nie oczekuje się więcej elementów danych z tego strumienia.
Uwaga / Notatka
Podobnie jak w przypadku całego programowania asynchronicznego, po utworzeniu subskrypcji, wykonywanie przebiegać będzie jak zwykle. Jeśli nie ma nic, co utrzyma aktywność programu i jego wykonywanie, może zakończyć się przed ukończeniem operacji asynchronicznej. Główny wątek, który wywołał subscribe(), nie będzie czekać, aż zrealizujesz połączenie sieciowe z usługą Azure App Configuration i otrzymasz odpowiedź. W systemach produkcyjnych możesz nadal przetwarzać coś innego, ale w tym przykładzie można dodać małe opóźnienie przez wywołanie Thread.sleep() metody lub użyć elementu , CountDownLatch aby umożliwić wykonanie operacji asynchronicznych.
Jak pokazano w poniższym przykładzie, interfejsy API zwracające Flux również stosują podobny wzorzec. Różnica polega na tym, że pierwsze wywołanie zwrotne dostarczone do subscribe() metody jest wywoływane wiele razy dla każdego elementu danych w odpowiedzi. Wywołania zwrotne błędu lub ukończenia zostają wywoływane dokładnie jeden raz i są traktowane jako sygnały końcowe. Żadne inne wywołania zwrotne nie są wywoływane, jeśli którykolwiek z tych sygnałów zostanie odebrany od wydawcy.
EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
.connectionString("<your connection string>")
.consumerGroup("<your consumer group>")
.buildAsyncConsumerClient();
asyncClient.receive().subscribe(
event -> System.out.println("Sequence number of received event: " + event.getData().getSequenceNumber()),
ex -> System.out.println("Error receiving events: " + ex.getMessage()),
() -> System.out.println("Successfully completed receiving all events"));
Przeciwnacisk
Co się stanie, gdy źródło generuje dane w szybszym tempie niż subskrybent może obsłużyć? Subskrybent może być przeciążony danymi, co może prowadzić do błędów związanych z brakiem pamięci. Subskrybent potrzebuje sposobu komunikowania się z powrotem do wydawcy, aby spowolnić, gdy nie może nadążyć. Domyślnie podczas wywoływania subscribe() na Flux, jak pokazano w powyższym przykładzie, subskrybent żąda nieograniczonego strumienia danych, wskazując wydawcy, aby wysyłał dane tak szybko, jak to możliwe. Takie zachowanie nie zawsze jest pożądane, a subskrybent może kontrolować tempo publikowania poprzez "przeciążenie zwrotne". Backpressure umożliwia subskrybentowi przejęcie kontroli nad przepływem elementów danych. Subskrybent zażąda ograniczonej liczby elementów danych, które mogą obsłużyć. Po zakończeniu przetwarzania tych elementów subskrybent może zażądać więcej. Korzystając z funkcji backpressure, można przekształcić model wypychania na potrzeby transferu danych do modelu ściągania wypychanego.
W poniższym przykładzie pokazano, jak można kontrolować szybkość odbierania zdarzeń przez użytkownika usługi Event Hubs:
EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
.connectionString("<your connection string>")
.consumerGroup("<your consumer group>")
.buildAsyncConsumerClient();
asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1); // request 1 data element to begin with
}
@Override
public void onNext(PartitionEvent partitionEvent) {
System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
this.subscription.request(1); // request another event when the subscriber is ready
}
@Override
public void onError(Throwable throwable) {
System.out.println("Error receiving events: " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("Successfully completed receiving all events")
}
});
Gdy subskrybent najpierw łączy się z wydawcą, wydawca przekazuje subskrybentowi Subscription wystąpienie, które zarządza stanem transferu danych. Jest to Subscription kanał, za pomocą którego subskrybent może zastosować kontrolę przepływu, wywołując request() aby określić, ile dodatkowych elementów danych może obsłużyć.
Jeśli subskrybent żąda więcej niż jednego elementu danych za każdym razem, gdy wywołuje onNext(), request(10), wydawca natychmiast wyśle 10 kolejnych elementów, jeśli są dostępne lub gdy staną się dostępne. Te elementy gromadzą się w buforze po stronie subskrybenta, a ponieważ każde onNext() wywołanie zażąda kolejnych 10, zaległości rosną, dopóki wydawca nie ma więcej elementów danych do wysłania lub bufor subskrybenta przepełnia się, co powoduje błędy związane z brakiem pamięci.
Anulowanie subskrypcji
Subskrypcja zarządza stanem transferu danych między wydawcą a subskrybentem. Subskrypcja jest aktywna, dopóki wydawca nie zakończy przenoszenia wszystkich danych do subskrybenta lub subskrybent nie jest już zainteresowany odbieraniem danych. Istnieje kilka sposobów anulowania subskrypcji, jak pokazano poniżej.
Poniższy przykład anuluje subskrypcję, usuwając subskrybenta.
EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
.connectionString("<your connection string>")
.consumerGroup("<your consumer group>")
.buildAsyncConsumerClient();
Disposable disposable = asyncClient.receive().subscribe(
partitionEvent -> {
Long num = partitionEvent.getData().getSequenceNumber()
System.out.println("Sequence number of received event: " + num);
},
ex -> System.out.println("Error receiving events: " + ex.getMessage()),
() -> System.out.println("Successfully completed receiving all events"));
// much later on in your code, when you are ready to cancel the subscription,
// you can call the dispose method, as such:
disposable.dispose();
Poniższy przykład anuluje subskrypcję, wywołując metodę cancel() w pliku Subscription:
EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
.connectionString("<your connection string>")
.consumerGroup("<your consumer group>")
.buildAsyncConsumerClient();
asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1); // request 1 data element to begin with
}
@Override
public void onNext(PartitionEvent partitionEvent) {
System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
this.subscription.cancel(); // Cancels the subscription. No further event is received.
}
@Override
public void onError(Throwable throwable) {
System.out.println("Error receiving events: " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("Successfully completed receiving all events")
}
});
Podsumowanie
Wątki są kosztownymi zasobami, których nie należy marnować na oczekiwanie na odpowiedzi z zdalnych wywołań usługi. Wraz ze wzrostem wdrażania architektur mikrousług potrzeba wydajnego skalowania i używania zasobów staje się niezbędna. Asynchroniczne interfejsy API są korzystne, gdy istnieją operacje związane z siecią. Zestaw Azure SDK dla języka Java oferuje bogaty zestaw interfejsów API dla operacji asynchronicznych, co pomaga zmaksymalizować zasoby systemowe. Zdecydowanie zachęcamy do wypróbowania naszych klientów asynchronicznych.
Aby uzyskać więcej informacji na temat operatorów, które najlepiej pasują do konkretnych zadań, zobacz Który operator potrzebuję? w przewodniku referencyjnym Reactor 3.
Dalsze kroki
Teraz, gdy lepiej zrozumiesz różne koncepcje programowania asynchronicznego, ważne jest, aby dowiedzieć się, jak iterować wyniki. Aby uzyskać więcej informacji na temat najlepszych strategii iteracji i szczegółów dotyczących działania stronicowania, zobacz Pagination and iteration in the Azure SDK for Java (Stronicowanie i iteracja w zestawie Azure SDK dla języka Java).