Asynkron programmering i Azure SDK för Java

Den här artikeln beskriver den asynkrona programmeringsmodellen i Azure SDK för Java.

Azure SDK innehöll inledningsvis endast icke-blockerande, asynkrona API:er för interaktion med Azure-tjänster. Med dessa API:er kan du använda Azure SDK för att skapa skalbara program som använder systemresurser effektivt. Azure SDK för Java innehåller dock även synkrona klienter för att tillgodose en bredare målgrupp och även göra våra klientbibliotek lättillgängliga för användare som inte är bekanta med asynkron programmering. (Se Kan användas i Riktlinjerna för Azure SDK-design.) Därför erbjuder alla Java-klientbibliotek i Azure SDK för Java både asynkrona och synkrona klienter. Vi rekommenderar dock att du använder asynkrona klienter för produktionssystem för att maximera användningen av systemresurser.

Reaktiva strömmar

Om du tittar på avsnittet Async-tjänstklienter i designriktlinjerna för Java Azure SDK ser du att våra asynkrona API:er använder reaktiva typer i stället för att använda CompletableFuture tillhandahållna av Java 8. Varför valde vi reaktiva typer framför typer som är internt tillgängliga i JDK?

Java 8 introducerade funktioner som Flöden, Lambdas och CompletableFuture. De här funktionerna har många funktioner, men har vissa begränsningar.

CompletableFuture tillhandahåller återanropsbaserade, icke-blockerande funktioner och det CompletionStage gränssnitt som tillåts för enkel sammansättning av en serie asynkrona åtgärder. Lambdas gör dessa push-baserade API:er mer läsbara. Flöden tillhandahålla funktionella åtgärder för att hantera en samling dataelement. Strömmar är dock synkrona och kan inte återanvändas. CompletableFuture gör att du kan göra en enda begäran, ger stöd för ett återanrop och förväntar dig ett enda svar. Många molntjänster kräver dock möjligheten att strömma data – till exempel Event Hubs.

Reaktiva strömmar kan hjälpa dig att övervinna dessa begränsningar genom att strömma element från en källa till en prenumerant. När en prenumerant begär data från en källa skickar källan tillbaka valfritt antal resultat. Det behöver inte skicka alla på en gång. Överföringen sker under en tidsperiod, när källan har data att skicka.

I den här modellen registrerar prenumeranten händelsehanterare för att bearbeta data när de tas emot. Dessa push-baserade interaktioner meddelar prenumeranten via distinkta signaler:

  • Ett onSubscribe() anrop anger att dataöverföringen är på väg att påbörjas.
  • Ett onError() anrop anger att det fanns ett fel som också markerar slutet på dataöverföringen.
  • Ett onComplete() anrop anger att dataöverföringen har slutförts.

Till skillnad från Java Flöden behandlar reaktiva strömmar fel som förstklassiga händelser. Reaktiva strömmar har en dedikerad kanal där källan kan meddela eventuella fel till prenumeranten. Dessutom kan reaktiva strömmar göra det möjligt för prenumeranten att förhandla om dataöverföringshastigheten för att omvandla dessa strömmar till en push-pull-modell.

Specifikationen Reactive Flöden tillhandahåller en standard för hur dataöverföring ska ske. På hög nivå definierar specifikationen följande fyra gränssnitt och anger regler för hur dessa gränssnitt ska implementeras.

  • Publisher är källan till en dataström.
  • Prenumerant är konsument av en dataström.
  • Prenumerationen hanterar tillståndet för dataöverföring mellan en utgivare och en prenumerant.
  • Processorn är både utgivare och prenumerant.

Det finns några välkända Java-bibliotek som tillhandahåller implementeringar av den här specifikationen, till exempel RxJava, Akka Flöden, Vert.x och Project Reactor.

Azure SDK för Java antog Project Reactor för att erbjuda sina asynkrona API:er. Den viktigaste faktorn som drev detta beslut var att tillhandahålla en smidig integrering med Spring Webflux, som också använder Project Reactor. En annan bidragande faktor att välja Project Reactor framför RxJava var att Project Reactor använder Java 8 men RxJava, vid den tiden, var fortfarande på Java 7. Project Reactor erbjuder också en omfattande uppsättning operatorer som är komposterbara och gör att du kan skriva deklarativ kod för att skapa databearbetningspipelines. En annan trevlig sak med Project Reactor är att den har adaptrar för att konvertera Project Reactor-typer till andra populära implementeringstyper.

Jämföra API:er för synkrona och asynkrona åtgärder

Vi diskuterade synkrona klienter och alternativ för asynkrona klienter. Tabellen nedan sammanfattar hur API:er ser ut som är utformade med hjälp av följande alternativ:

API-typ Inget värde Enstaka värde Flera värden
Standard Java – synkrona API:er void T Iterable<T>
Standard Java – Asynkrona API:er CompletableFuture<Void> CompletableFuture<T> CompletableFuture<List<T>>
Reaktiva Flöden gränssnitt Publisher<Void> Publisher<T> Publisher<T>
Projektreaktorimplementering av reaktiv Flöden Mono<Void> Mono<T> Flux<T>

För fullständighetens skull är det värt att nämna att Java 9 introducerade klassen Flow som innehåller de fyra reaktiva strömmarnas gränssnitt. Den här klassen innehåller dock ingen implementering.

Använda asynkrona API:er i Azure SDK för Java

Specifikationen för reaktiva strömmar skiljer inte mellan olika typer av utgivare. I specifikationen för reaktiva strömmar skapar utgivare helt enkelt noll eller fler dataelement. I många fall finns det en användbar skillnad mellan en utgivare som producerar högst ett dataelement jämfört med ett som producerar noll eller mer. I molnbaserade API:er anger den här skillnaden om en begäran returnerar ett svar med en enda värde eller en samling. Project Reactor tillhandahåller två typer för att göra denna skillnad – Mono och Flux. Ett API som returnerar ett Mono innehåller ett svar som har högst ett värde, och ett API som returnerar ett Flux innehåller ett svar som har noll eller fler värden.

Anta till exempel att du använder en ConfigurationAsyncClient för att hämta en konfiguration som lagras med azure appkonfigurationstjänsten. (Mer information finns i Vad är Azure App Configuration?.)

Om du skapar ett ConfigurationAsyncClient och anropar getConfigurationSetting() klienten returneras ett Mono, som anger att svaret innehåller ett enda värde. Men att anropa den här metoden ensam gör ingenting. Klienten har ännu inte gjort någon begäran till Azure App Configuration-tjänsten. I det här skedet är det som returneras av det här API:et Mono<ConfigurationSetting> bara en "sammansättning" av databearbetningspipelinen. Det innebär att den nödvändiga konfigurationen för användning av data är klar. Om du vill utlösa dataöverföringen (dvs. för att göra begäran till tjänsten och få svaret) måste du prenumerera på den returnerade Mono. Så när du hanterar dessa reaktiva strömmar måste du komma ihåg att ringa subscribe() eftersom ingenting händer förrän du gör det.

I följande exempel visas hur du prenumererar på Mono och skriver ut konfigurationsvärdet till konsolen.

ConfigurationAsyncClient asyncClient = new ConfigurationClientBuilder()
    .connectionString("<your connection string>")
    .buildAsyncClient();

asyncClient.getConfigurationSetting("<your config key>", "<your config value>").subscribe(
    config -> System.out.println("Config value: " + config.getValue()),
    ex -> System.out.println("Error getting configuration: " + ex.getMessage()),
    () -> System.out.println("Successfully retrieved configuration setting"));

System.out.println("Done");

Observera att efter att ha anropat getConfigurationSetting() klienten prenumererar exempelkoden på resultatet och tillhandahåller tre separata lambdas. Den första lambda använder data som tagits emot från tjänsten, vilket utlöses vid lyckat svar. Den andra lambda utlöses om det uppstår ett fel när konfigurationen hämtas. Den tredje lambda anropas när dataströmmen är klar, vilket innebär att inga fler dataelement förväntas från den här dataströmmen.

Kommentar

Precis som med all asynkron programmering fortsätter körningen som vanligt när prenumerationen har skapats. Om det inte finns något som gör att programmet är aktivt och körs kan det avslutas innan asynkroniseringsåtgärden har slutförts. Huvudtråden som anropade subscribe() väntar inte tills du gör nätverksanropet till Azure App Configuration och får ett svar. I produktionssystem kan du fortsätta att bearbeta något annat, men i det här exemplet kan du lägga till en liten fördröjning genom att anropa Thread.sleep() eller använda en CountDownLatch för att ge asynkron åtgärden en chans att slutföras.

Som du ser i följande exempel följer ÄVEN API:er som returnerar ett Flux liknande mönster. Skillnaden är att den första motringningen som tillhandahålls subscribe() till metoden anropas flera gånger för varje dataelement i svaret. Felet eller återanropet anropas exakt en gång och betraktas som terminalsignaler. Inga andra återanrop anropas om någon av dessa signaler tas emot från utgivaren.

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(
    event -> System.out.println("Sequence number of received event: " + event.getData().getSequenceNumber()),
    ex -> System.out.println("Error receiving events: " + ex.getMessage()),
    () -> System.out.println("Successfully completed receiving all events"));

Ryggtryck

Vad händer när källan producerar data snabbare än vad prenumeranten kan hantera? Prenumeranten kan bli överbelastad med data, vilket kan leda till minnesfel. Prenumeranten behöver ett sätt att kommunicera tillbaka till utgivaren för att sakta ner när den inte kan hänga med. När du anropar subscribe() en Flux som visas i exemplet ovan begär prenumeranten som standard en obundna dataström, vilket anger för utgivaren att skicka data så snabbt som möjligt. Det här beteendet är inte alltid önskvärt och prenumeranten kan behöva kontrollera publiceringshastigheten via "backpressure". Med backpressure kan prenumeranten ta kontroll över flödet av dataelement. En prenumerant begär ett begränsat antal dataelement som de kan hantera. När prenumeranten har slutfört bearbetningen av dessa element kan prenumeranten begära mer. Genom att använda backpressure kan du omvandla en push-modell för dataöverföring till en push-pull-modell.

I följande exempel visas hur du kan styra hur snabbt händelser tas emot av Event Hubs-konsumenten:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1); // request 1 data element to begin with
    }

    @Override
    public void onNext(PartitionEvent partitionEvent) {
        System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
        this.subscription.request(1); // request another event when the subscriber is ready
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error receiving events: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Successfully completed receiving all events")
    }
});

När prenumeranten först "ansluter" till utgivaren ger utgivaren prenumeranten en Subscription instans som hanterar dataöverföringens tillstånd. Det här Subscription är mediet genom vilket prenumeranten kan använda backpressure genom att anropa request() för att ange hur många fler dataelement den kan hantera.

Om prenumeranten begär mer än ett dataelement varje gång den anropar onNext(), request(10) skickar utgivaren till exempel nästa 10 element omedelbart om de är tillgängliga eller när de blir tillgängliga. Dessa element ackumuleras i en buffert på prenumerantens slut, och eftersom varje onNext() anrop begär ytterligare 10 fortsätter kvarvarande uppgifter att växa tills utgivaren antingen inte har några fler dataelement att skicka eller prenumerantens buffertspill, vilket resulterar i minnesfel.

Avsluta en prenumeration

En prenumeration hanterar tillståndet för dataöverföring mellan en utgivare och en prenumerant. Prenumerationen är aktiv tills utgivaren har slutfört överföringen av alla data till prenumeranten eller så är prenumeranten inte längre intresserad av att ta emot data. Det finns ett par sätt att avbryta en prenumeration enligt nedan.

I följande exempel avbryts prenumerationen genom att prenumeranten exponeras:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

Disposable disposable = asyncClient.receive().subscribe(
    partitionEvent -> {
        Long num = partitionEvent.getData().getSequenceNumber()
        System.out.println("Sequence number of received event: " + num);
    },
    ex -> System.out.println("Error receiving events: " + ex.getMessage()),
    () -> System.out.println("Successfully completed receiving all events"));

// much later on in your code, when you are ready to cancel the subscription,
// you can call the dispose method, as such:
disposable.dispose();

Följande exempel avbryter prenumerationen genom att anropa metoden på cancel()Subscription:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1); // request 1 data element to begin with
    }

    @Override
    public void onNext(PartitionEvent partitionEvent) {
        System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
        this.subscription.cancel(); // Cancels the subscription. No further event is received.
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error receiving events: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Successfully completed receiving all events")
    }
});

Slutsats

Trådar är dyra resurser som du inte bör slösa på att vänta på svar från fjärrtjänstsamtal. I takt med att införandet av mikrotjänstarkitekturer ökar blir behovet av att skala och använda resurser effektivt avgörande. Asynkrona API:er är gynnsamma när det finns nätverksbundna åtgärder. Azure SDK för Java erbjuder en omfattande uppsättning API:er för asynkrona åtgärder för att maximera systemresurserna. Vi rekommenderar starkt att du provar våra asynkrona klienter.

Mer information om de operatorer som bäst passar just dina uppgifter finns i Vilken operatör behöver jag? i referensguiden för Reactor 3.

Nästa steg

Nu när du bättre förstår de olika asynkrona programmeringsbegreppen är det viktigt att lära dig att iterera över resultaten. Mer information om de bästa iterationsstrategierna och information om hur sidnumrering fungerar finns i Sidnumrering och iteration i Azure SDK för Java.