Udostępnij za pośrednictwem


Programowanie asynchroniczne w zestawie Azure SDK dla języka Java

W tym artykule opisano asynchroniczny model programowania w zestawie Azure SDK dla języka Java.

Zestaw Azure SDK początkowo zawierał tylko nieblokujące, asynchroniczne interfejsy API do interakcji z usługami platformy Azure. Te interfejsy API umożliwiają wydajne tworzenie skalowalnych aplikacji korzystających z zasobów systemowych przy użyciu zestawu Azure SDK. Jednak zestaw Azure SDK dla języka Java zawiera również klientów synchronicznych, aby zaspokoić szerszą publiczność, a także sprawić, że nasze biblioteki klienckie będą dostępne dla użytkowników, którzy nie znają programowania asynchronicznego. (Zobacz Podejście do wytycznych dotyczących projektowania zestawu Azure SDK). W związku z tym wszystkie biblioteki klienckie Języka Java w zestawie Azure SDK dla języka Java oferują zarówno klientów asynchronicznych, jak i synchronicznych. Zalecamy jednak używanie klientów asynchronicznych dla systemów produkcyjnych w celu zmaksymalizowania wykorzystania zasobów systemowych.

Strumienie reaktywne

Jeśli zapoznasz się z sekcją Async Service Clients (Klienci usług asynchronicznych ) w wytycznych dotyczących projektowania zestawu Azure SDK platformy Azure, zauważysz, że zamiast korzystać z CompletableFuture interfejsów API asynchronicznych używanych przez język Java 8, nasze asynchroniczne interfejsy API używają typów reaktywnych. Dlaczego wybieraliśmy typy reaktywne dla typów, które są natywnie dostępne w zestawie JDK?

Język Java 8 wprowadził funkcje, takie jak Strumienie, Lambdas i CompletableFuture. Te funkcje zapewniają wiele możliwości, ale mają pewne ograniczenia.

CompletableFuture Zapewnia możliwości oparte na wywołaniu zwrotnym, nieblokujących oraz CompletionStage interfejs dozwolony do łatwego tworzenia serii operacji asynchronicznych. Lambdy sprawiają, że te interfejsy API oparte na wypychaniach są bardziej czytelne. Strumienie zapewnić operacje w stylu funkcjonalnym do obsługi kolekcji elementów danych. Jednak strumienie są synchroniczne i nie można ich używać ponownie. CompletableFutureumożliwia wykonanie jednego żądania, zapewnia obsługę wywołania zwrotnego i oczekuje pojedynczej odpowiedzi. Jednak wiele usług w chmurze wymaga możliwości przesyłania strumieniowego danych — na przykład usługi Event Hubs.

Reaktywne strumienie mogą pomóc w przezwyciężeniu tych ograniczeń przez elementy przesyłania strumieniowego ze źródła do subskrybenta. Gdy subskrybent żąda danych ze źródła, źródło wysyła dowolną liczbę wyników. Nie musi wysyłać ich wszystkich jednocześnie. Transfer odbywa się w danym okresie, za każdym razem, gdy źródło ma dane do wysłania.

W tym modelu subskrybent rejestruje programy obsługi zdarzeń w celu przetwarzania danych po ich nadejściu. Te interakcje oparte na wypychaniach powiadamiają subskrybenta za pomocą odrębnych sygnałów:

  • Wywołanie onSubscribe() wskazuje, że transfer danych ma się rozpocząć.
  • Wywołanie onError() wskazuje, że wystąpił błąd, który oznacza również koniec transferu danych.
  • Wywołanie onComplete() wskazuje pomyślne zakończenie transferu danych.

W przeciwieństwie do Strumienie Java strumienie reaktywne traktują błędy jako zdarzenia pierwszej klasy. Strumienie reaktywne mają dedykowany kanał dla źródła, aby przekazać wszelkie błędy subskrybentowi. Ponadto reaktywne strumienie umożliwiają subskrybentowi negocjowanie szybkości transferu danych w celu przekształcenia tych strumieni w model ściągania wypychanego.

Specyfikacja reaktywnej Strumienie zapewnia standard sposobu przesyłania danych. Na wysokim poziomie specyfikacja definiuje następujące cztery interfejsy i określa reguły dotyczące sposobu implementacji tych interfejsów.

  • Wydawca jest źródłem strumienia danych.
  • Subskrybent jest odbiorcą strumienia danych.
  • Subskrypcja zarządza stanem transferu danych między wydawcą a subskrybentem.
  • Procesor jest zarówno wydawcą, jak i subskrybentem.

Istnieją znane biblioteki Języka Java, które zapewniają implementacje tej specyfikacji, takie jak RxJava, Akka Strumienie, Vert.x i Project Reactor.

Zestaw Azure SDK dla języka Java przyjął usługę Project Reactor, aby oferować swoje asynchroniczne interfejsy API. Głównym czynnikiem wpływającym na tę decyzję było zapewnienie bezproblemowej integracji z platformą Spring Webflux, która korzysta również z biblioteki Project Reactor. Innym czynnikiem przyczyniającym się do wybrania projektu Reactor nad RxJava było to, że program Project Reactor używa języka Java 8, ale RxJava, w tym czasie, był nadal w środowisku Java 7. Project Reactor oferuje również bogaty zestaw operatorów, które są komponowalne i umożliwia pisanie deklaratywnego kodu do tworzenia potoków przetwarzania danych. Kolejną miłą rzeczą w projekcie Reactor jest to, że ma adaptery do konwertowania typów Project Reactor na inne popularne typy implementacji.

Porównywanie interfejsów API operacji synchronicznych i asynchronicznych

Omówiliśmy synchronicznych klientów i opcje dla klientów asynchronicznych. W poniższej tabeli przedstawiono podsumowanie wyglądu interfejsów API, które zostały zaprojektowane przy użyciu następujących opcji:

Typ interfejsu API Brak wartości Pojedyncza wartość Wiele wartości
Standardowa Java — synchroniczne interfejsy API void T Iterable<T>
Standardowa Java — asynchroniczne interfejsy API CompletableFuture<Void> CompletableFuture<T> CompletableFuture<List<T>>
Reaktywne interfejsy Strumienie Publisher<Void> Publisher<T> Publisher<T>
Implementacja reaktywnego Strumienie reaktora projektu Mono<Void> Mono<T> Flux<T>

Ze względu na kompletność warto wspomnieć, że środowisko Java 9 wprowadziło klasę Flow obejmującą cztery interfejsy reaktywnych strumieni. Jednak ta klasa nie zawiera żadnej implementacji.

Używanie asynchronicznych interfejsów API w zestawie Azure SDK dla języka Java

Specyfikacja reaktywnych strumieni nie rozróżnia typów wydawców. W specyfikacji reaktywnych strumieni wydawcy po prostu tworzą zero lub więcej elementów danych. W wielu przypadkach istnieje przydatne rozróżnienie między wydawcą tworzącym co najwyżej jeden element danych a jednym, który generuje zero lub więcej. W interfejsach API opartych na chmurze to rozróżnienie wskazuje, czy żądanie zwraca jednowartościową odpowiedź, czy kolekcję. Project Reactor udostępnia dwa typy, aby to rozróżnienie było rozróżniane — Mono i Flux. Interfejs API zwracający Mono element będzie zawierać odpowiedź zawierającą co najwyżej jedną wartość, a interfejs API, który zwraca Flux wartość, będzie zawierać odpowiedź, która zawiera zero lub więcej wartości.

Załóżmy na przykład, że używasz klasy ConfigurationAsyncClient do pobierania konfiguracji przechowywanej przy użyciu usługi aplikacja systemu Azure Configuration. (Aby uzyskać więcej informacji, zobacz Co to jest konfiguracja aplikacja systemu Azure?).

Jeśli tworzysz element ConfigurationAsyncClient i wywołujesz getConfigurationSetting() klienta, zwraca wartość Mono, która wskazuje, że odpowiedź zawiera jedną wartość. Jednak samo wywołanie tej metody nie wykonuje żadnych czynności. Klient nie złożył jeszcze żądania do usługi konfiguracji aplikacja systemu Azure. Na tym etapie Mono<ConfigurationSetting> zwracany przez ten interfejs API jest tylko "zestawem" potoku przetwarzania danych. Oznacza to, że wymagana konfiguracja do korzystania z danych została ukończona. Aby rzeczywiście wyzwolić transfer danych (czyli wysłać żądanie do usługi i uzyskać odpowiedź), musisz zasubskrybować zwrócony Monoelement . Dlatego podczas pracy z tymi strumieniami reaktywnymi należy pamiętać, aby wywołać wywołanie subscribe() , ponieważ nic się nie dzieje, dopóki nie zrobisz tego.

W poniższym przykładzie pokazano, jak zasubskrybować Mono element i wydrukować wartość konfiguracji w konsoli programu .

ConfigurationAsyncClient asyncClient = new ConfigurationClientBuilder()
    .connectionString("<your connection string>")
    .buildAsyncClient();

asyncClient.getConfigurationSetting("<your config key>", "<your config value>").subscribe(
    config -> System.out.println("Config value: " + config.getValue()),
    ex -> System.out.println("Error getting configuration: " + ex.getMessage()),
    () -> System.out.println("Successfully retrieved configuration setting"));

System.out.println("Done");

Zwróć uwagę, że po wywołaniu getConfigurationSetting() klienta przykładowy kod subskrybuje wynik i udostępnia trzy oddzielne wyrażenia lambd. Pierwsza funkcja lambda używa danych odebranych z usługi, co jest wyzwalane po pomyślnym wykonaniu odpowiedzi. Drugi element lambda jest wyzwalany, jeśli podczas pobierania konfiguracji występuje błąd. Trzeci element lambda jest wywoływany po zakończeniu strumienia danych, co oznacza, że nie oczekuje się więcej elementów danych z tego strumienia.

Uwaga

Podobnie jak w przypadku wszystkich programowania asynchronicznego, po utworzeniu subskrypcji wykonywanie przebiega jak zwykle. Jeśli nie ma nic, aby program był aktywny i wykonywany, może zakończyć się przed ukończeniem operacji asynchronicznych. Główny wątek, który jest wywoływanysubscribe(), nie będzie czekać, dopóki nie zostanie nawiązane wywołanie sieciowe, aby aplikacja systemu Azure Konfiguracja i otrzymać odpowiedź. W systemach produkcyjnych możesz nadal przetwarzać coś innego, ale w tym przykładzie można dodać małe opóźnienie przez wywołanie Thread.sleep() metody lub użyć elementu , CountDownLatch aby umożliwić wykonanie operacji asynchronicznych.

Jak pokazano w poniższym przykładzie, interfejsy API zwracające Flux również podobny wzorzec. Różnica polega na tym, że pierwsze wywołanie zwrotne dostarczone do subscribe() metody jest wywoływane wiele razy dla każdego elementu danych w odpowiedzi. Błąd lub wywołania zwrotne ukończenia są wywoływane dokładnie raz i są traktowane jako sygnały terminalowe. Żadne inne wywołania zwrotne nie są wywoływane, jeśli jeden z tych sygnałów zostanie odebrany od wydawcy.

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(
    event -> System.out.println("Sequence number of received event: " + event.getData().getSequenceNumber()),
    ex -> System.out.println("Error receiving events: " + ex.getMessage()),
    () -> System.out.println("Successfully completed receiving all events"));

Backpressure

Co się stanie, gdy źródło generuje dane w szybszym tempie niż subskrybent może obsłużyć? Subskrybent może być przeciążony danymi, co może prowadzić do błędów braku pamięci. Subskrybent potrzebuje sposobu komunikowania się z powrotem do wydawcy, aby spowolnić, gdy nie może nadążyć. Domyślnie podczas wywoływania subscribe() elementu , Flux jak pokazano w powyższym przykładzie, subskrybent żąda niezwiązanego strumienia danych, wskazując wydawcy, aby wysyłał dane tak szybko, jak to możliwe. Takie zachowanie nie zawsze jest pożądane, a subskrybent może mieć kontrolę nad szybkością publikowania za pośrednictwem "backpressure". Backpressure umożliwia subskrybentowi przejęcie kontroli nad przepływem elementów danych. Subskrybent zażąda ograniczonej liczby elementów danych, które mogą obsłużyć. Po zakończeniu przetwarzania tych elementów subskrybent może zażądać więcej. Korzystając z funkcji backpressure, można przekształcić model wypychania na potrzeby transferu danych do modelu ściągania wypychanego.

W poniższym przykładzie pokazano, jak można kontrolować szybkość odbierania zdarzeń przez użytkownika usługi Event Hubs:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1); // request 1 data element to begin with
    }

    @Override
    public void onNext(PartitionEvent partitionEvent) {
        System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
        this.subscription.request(1); // request another event when the subscriber is ready
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error receiving events: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Successfully completed receiving all events")
    }
});

Gdy subskrybent najpierw łączy się z wydawcą, wydawca przekazuje subskrybentowi Subscription wystąpienie, które zarządza stanem transferu danych. Jest to Subscription medium, za pomocą którego subskrybent może zastosować backpressure, wywołując, request() aby określić, ile więcej elementów danych może obsłużyć.

Jeśli subskrybent żąda więcej niż jednego elementu danych za każdym razem, gdy wywołuje onNext()element , request(10) na przykład wydawca wyśle natychmiast 10 kolejnych elementów, jeśli są dostępne lub gdy staną się dostępne. Te elementy gromadzą się w buforze na końcu subskrybenta, a ponieważ każde onNext() wywołanie będzie żądać 10 więcej, zaległości rosną, dopóki wydawca nie ma więcej elementów danych do wysłania lub przepełnienia buforu subskrybenta, co powoduje błędy braku pamięci.

Anulowanie subskrypcji

Subskrypcja zarządza stanem transferu danych między wydawcą a subskrybentem. Subskrypcja jest aktywna, dopóki wydawca nie zakończy przenoszenia wszystkich danych do subskrybenta lub subskrybent nie jest już zainteresowany odbieraniem danych. Istnieje kilka sposobów anulowania subskrypcji, jak pokazano poniżej.

Poniższy przykład anuluje subskrypcję, dysponując subskrybenta:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

Disposable disposable = asyncClient.receive().subscribe(
    partitionEvent -> {
        Long num = partitionEvent.getData().getSequenceNumber()
        System.out.println("Sequence number of received event: " + num);
    },
    ex -> System.out.println("Error receiving events: " + ex.getMessage()),
    () -> System.out.println("Successfully completed receiving all events"));

// much later on in your code, when you are ready to cancel the subscription,
// you can call the dispose method, as such:
disposable.dispose();

Poniższy przykład anuluje subskrypcję, wywołując metodę cancel() w pliku Subscription:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1); // request 1 data element to begin with
    }

    @Override
    public void onNext(PartitionEvent partitionEvent) {
        System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
        this.subscription.cancel(); // Cancels the subscription. No further event is received.
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error receiving events: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Successfully completed receiving all events")
    }
});

Podsumowanie

Wątki są kosztownymi zasobami, których nie należy marnować na oczekiwanie na odpowiedzi z zdalnych wywołań usługi. Wraz ze wzrostem wdrażania architektur mikrousług potrzeba wydajnego skalowania i używania zasobów staje się niezbędna. Asynchroniczne interfejsy API są korzystne, gdy istnieją operacje związane z siecią. Zestaw Azure SDK dla języka Java oferuje bogaty zestaw interfejsów API dla operacji asynchronicznych, co pomaga zmaksymalizować zasoby systemowe. Zdecydowanie zachęcamy do wypróbowania naszych klientów asynchronicznych.

Aby uzyskać więcej informacji na temat operatorów, które najlepiej pasują do konkretnych zadań, zobacz Który operator potrzebuję? w przewodniku referencyjnym Reactor 3.

Następne kroki

Teraz, gdy lepiej zrozumiesz różne koncepcje programowania asynchronicznego, ważne jest, aby dowiedzieć się, jak iterować wyniki. Aby uzyskać więcej informacji na temat najlepszych strategii iteracji i szczegółów dotyczących działania stronicowania, zobacz Pagination and iteration in the Azure SDK for Java (Stronicowanie i iteracja w zestawie Azure SDK dla języka Java).