Delen via


Asynchrone programmering in de Azure SDK voor Java

In dit artikel wordt het asynchrone programmeermodel in de Azure SDK voor Java beschreven.

De Azure SDK bevatte aanvankelijk alleen niet-blokkerende, asynchrone API's voor interactie met Azure-services. Met deze API's kunt u de Azure SDK gebruiken om schaalbare toepassingen te bouwen die efficiënt systeembronnen gebruiken. De Azure SDK voor Java bevat echter ook synchrone clients voor een breder publiek en maken onze clientbibliotheken ook geschikt voor gebruikers die niet bekend zijn met asynchrone programmering. (Zie Benaderbaar in de ontwerprichtlijnen voor De Azure SDK.) Als zodanig bieden alle Java-clientbibliotheken in de Azure SDK voor Java zowel asynchrone als synchrone clients. We raden u echter aan de asynchrone clients voor productiesystemen te gebruiken om het gebruik van systeembronnen te maximaliseren.

Reactieve streams

Als u de sectie Async Service Clients in de Java Azure SDK-ontwerprichtlijnen bekijkt, ziet u dat onze asynchrone API's reactieve typen gebruiken in plaats van CompletableFuture dat ze worden gebruikt door Java 8. Waarom hebben we reactieve typen gekozen voor typen die systeemeigen beschikbaar zijn in JDK?

Java 8 heeft functies geïntroduceerd zoals Streams, Lambdas en CompletableFuture. Deze functies bieden veel mogelijkheden, maar hebben enkele beperkingen.

CompletableFuture biedt op callback gebaseerde, niet-blokkerende mogelijkheden en de CompletionStage interface die is toegestaan voor een eenvoudige samenstelling van een reeks asynchrone bewerkingen. Lambdas maakt deze push-API's beter leesbaar. Streams bieden functionele bewerkingen voor het verwerken van een verzameling gegevenselementen. Streams zijn echter synchroon en kunnen niet opnieuw worden gebruikt. CompletableFuture stelt u in staat om één aanvraag te doen, biedt ondersteuning voor een callback en verwacht één antwoord. Veel cloudservices vereisen echter de mogelijkheid om gegevens te streamen- Event Hubs bijvoorbeeld.

Reactieve streams kunnen helpen deze beperkingen te overwinnen door streaming-elementen van een bron naar een abonnee te verwijderen. Wanneer een abonnee gegevens van een bron aanvraagt, verzendt de bron een willekeurig aantal resultaten terug. Het hoeft ze niet allemaal tegelijk te verzenden. De overdracht vindt plaats gedurende een bepaalde periode, wanneer de bron gegevens heeft die moeten worden verzonden.

In dit model registreert de abonnee gebeurtenis-handlers om gegevens te verwerken wanneer deze binnenkomen. Deze op push gebaseerde interacties stellen de abonnee op de hoogte via afzonderlijke signalen:

  • Een onSubscribe() aanroep geeft aan dat de gegevensoverdracht op het punt staat te beginnen.
  • Een onError() aanroep geeft aan dat er een fout is opgetreden, die ook het einde van de gegevensoverdracht markeert.
  • Een onComplete() aanroep geeft aan dat de gegevensoverdracht is voltooid.

In tegenstelling tot Java Streams behandelen reactieve streams fouten als eersteklas gebeurtenissen. Reactieve streams hebben een toegewezen kanaal voor de bron om eventuele fouten aan de abonnee te communiceren. Met reactieve streams kan de abonnee ook onderhandelen over de snelheid van gegevensoverdracht om deze stromen te transformeren in een push-pull-model.

De specificatie reactieve streams biedt een standaard voor de wijze waarop de gegevensoverdracht moet plaatsvinden. Op hoog niveau definieert de specificatie de volgende vier interfaces en geeft deze regels op voor de wijze waarop deze interfaces moeten worden geïmplementeerd.

  • Publisher is de bron van een gegevensstroom.
  • Abonnee is de consument van een gegevensstroom.
  • Het abonnement beheert de status van gegevensoverdracht tussen een uitgever en een abonnee.
  • Processor is zowel een uitgever als een abonnee.

Er zijn enkele bekende Java-bibliotheken die implementaties van deze specificatie bieden, zoals RxJava, Akka Streams, Vert.x en Project Reactor.

De Azure SDK voor Java heeft Project Reactor aangenomen om de asynchrone API's aan te bieden. De belangrijkste factor voor deze beslissing was het bieden van een soepele integratie met Spring Webflux, die ook gebruikmaakt van Project Reactor. Een andere bijdragefactor voor het kiezen van Project Reactor over RxJava was dat Project Reactor Java 8 gebruikt, maar RxJava was op dat moment nog steeds op Java 7. Project Reactor biedt ook een uitgebreide set operators die kunnen worden opgesteld en waarmee u declaratieve code kunt schrijven voor het bouwen van pijplijnen voor gegevensverwerking. Een ander leuk ding over Project Reactor is dat het adapters heeft voor het converteren van Project Reactor-typen naar andere populaire implementatietypen.

Vergelijking van API's van synchrone en asynchrone bewerkingen

We hebben de synchrone clients en opties voor asynchrone clients besproken. In de onderstaande tabel ziet u een overzicht van hoe API's eruitzien die zijn ontworpen met behulp van deze opties:

Type API Geen waarde Enkele waarde Meerdere waarden
Standaard Java - Synchrone API's void T Iterable<T>
Standaard Java - Asynchrone API's CompletableFuture<Void> CompletableFuture<T> CompletableFuture<List<T>>
Reactieve streams-interfaces Publisher<Void> Publisher<T> Publisher<T>
Project Reactor-implementatie van reactieve streams Mono<Void> Mono<T> Flux<T>

Voor de volledigheid is het de moeite waard om te vermelden dat Java 9 de Flow-klasse heeft geïntroduceerd die de vier reactieve streams-interfaces bevat. Deze klasse bevat echter geen implementatie.

asynchrone API's gebruiken in de Azure SDK voor Java

De specificatie voor reactieve streams maakt geen onderscheid tussen typen uitgevers. In de specificatie voor reactieve streams produceren uitgevers gewoon nul of meer gegevenselementen. In veel gevallen is er een nuttig onderscheid tussen een uitgever die maximaal één gegevenselement produceert versus één dat nul of meer produceert. In cloud-API's geeft dit onderscheid aan of een aanvraag een antwoord met één waarde of een verzameling retourneert. Project Reactor biedt twee soorten om dit onderscheid te maken: Mono en Flux. Een API die een Mono retourneert, bevat een antwoord met maximaal één waarde en een API die een Flux antwoord retourneert dat nul of meer waarden bevat.

Stel dat u een ConfigurationAsyncClient gebruikt om een configuratie op te halen die is opgeslagen met behulp van de Azure-app Configuration-service. (Zie voor meer informatie Wat is Azure-app-configuratie?))

Als u een ConfigurationAsyncClient client maakt en aanroept getConfigurationSetting() , wordt een Mono, die aangeeft dat het antwoord één waarde bevat. Het aanroepen van deze methode doet echter niets. De client heeft nog geen aanvraag ingediend bij de Azure-app Configuration-service. In deze fase is de Mono<ConfigurationSetting> geretourneerde door deze API slechts een 'assembly' van de pijplijn voor gegevensverwerking. Dit betekent dat de vereiste installatie voor het verbruik van de gegevens is voltooid. Als u de gegevensoverdracht daadwerkelijk wilt activeren (dat wil gezegd, om de aanvraag naar de service te verzenden en het antwoord te krijgen), moet u zich abonneren op de geretourneerde Monogegevens. Bij het omgaan met deze reactieve streams moet u er dus rekening mee houden dat subscribe() er niets gebeurt totdat u dit doet.

In het volgende voorbeeld ziet u hoe u zich abonneert op de Mono configuratiewaarde en de configuratiewaarde afdrukt op de console.

ConfigurationAsyncClient asyncClient = new ConfigurationClientBuilder()
    .connectionString("<your connection string>")
    .buildAsyncClient();

asyncClient.getConfigurationSetting("<your config key>", "<your config value>").subscribe(
    config -> System.out.println("Config value: " + config.getValue()),
    ex -> System.out.println("Error getting configuration: " + ex.getMessage()),
    () -> System.out.println("Successfully retrieved configuration setting"));

System.out.println("Done");

U ziet dat na het aanroepen getConfigurationSetting() van de client de voorbeeldcode zich abonneert op het resultaat en drie afzonderlijke lambdas biedt. De eerste lambda verbruikt gegevens die zijn ontvangen van de service, die wordt geactiveerd na een geslaagde reactie. De tweede lambda wordt geactiveerd als er een fout optreedt tijdens het ophalen van de configuratie. De derde lambda wordt aangeroepen wanneer de gegevensstroom is voltooid, wat betekent dat er geen gegevenselementen meer worden verwacht uit deze stroom.

Notitie

Net als bij alle asynchrone programmering, gaat de uitvoering, nadat het abonnement is gemaakt, zoals gebruikelijk. Als er niets is om het programma actief te houden en uit te voeren, kan het worden beëindigd voordat de asynchrone bewerking is voltooid. De hoofdthread die wordt aangeroepensubscribe(), wacht niet totdat u de netwerkoproep naar Azure-app Configuratie maakt en een antwoord ontvangt. In productiesystemen kunt u iets anders blijven verwerken, maar in dit voorbeeld kunt u een kleine vertraging toevoegen door aan te roepen Thread.sleep() of een CountDownLatch te gebruiken om de asynchrone bewerking te voltooien.

Zoals wordt weergegeven in het volgende voorbeeld, volgen API's die eenzelfde Flux patroon retourneren. Het verschil is dat de eerste callback die aan de subscribe() methode is verstrekt, meerdere keren wordt aangeroepen voor elk gegevenselement in het antwoord. De fout of de callbacks voor voltooiing worden exact één keer aangeroepen en worden beschouwd als terminalsignalen. Er worden geen andere callbacks aangeroepen als een van deze signalen van de uitgever wordt ontvangen.

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(
    event -> System.out.println("Sequence number of received event: " + event.getData().getSequenceNumber()),
    ex -> System.out.println("Error receiving events: " + ex.getMessage()),
    () -> System.out.println("Successfully completed receiving all events"));

Tegendruk

Wat gebeurt er wanneer de bron de gegevens sneller produceert dan de abonnee kan verwerken? De abonnee kan overweldigd raken met gegevens, wat kan leiden tot fouten met onvoldoende geheugen. De abonnee heeft een manier nodig om terug te communiceren met de uitgever om te vertragen wanneer deze niet kan bijhouden. Wanneer u een Flux aanroept subscribe() zoals in het bovenstaande voorbeeld, vraagt de abonnee standaard een niet-gebonden gegevensstroom aan, die aangeeft dat de uitgever de gegevens zo snel mogelijk moet verzenden. Dit gedrag is niet altijd wenselijk en de abonnee moet mogelijk de frequentie van publiceren beheren via 'backpressure'. Met backpressie kan de abonnee de controle over de stroom van gegevenselementen overnemen. Een abonnee vraagt een beperkt aantal gegevenselementen aan die ze kunnen verwerken. Nadat de abonnee de verwerking van deze elementen heeft voltooid, kan de abonnee meer aanvragen. Met backpressure kunt u een push-model voor gegevensoverdracht transformeren naar een push-pull-model.

In het volgende voorbeeld ziet u hoe u de snelheid kunt bepalen waarmee gebeurtenissen worden ontvangen door de Event Hubs-consument:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1); // request 1 data element to begin with
    }

    @Override
    public void onNext(PartitionEvent partitionEvent) {
        System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
        this.subscription.request(1); // request another event when the subscriber is ready
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error receiving events: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Successfully completed receiving all events")
    }
});

Wanneer de abonnee voor het eerst verbinding maakt met de uitgever, geeft de uitgever de abonnee een Subscription exemplaar, waarmee de status van de gegevensoverdracht wordt beheerd. Dit Subscription is het medium waarmee de abonnee tegendruk kan toepassen door aan te roepen request() om op te geven hoeveel meer gegevenselementen het kan verwerken.

Als de abonnee meer dan één gegevenselement aanvraagt telkens wanneer het wordt aanroepen onNext(), request(10) verzendt de uitgever bijvoorbeeld de volgende 10 elementen onmiddellijk als ze beschikbaar zijn of wanneer ze beschikbaar zijn. Deze elementen verzamelen zich in een buffer aan het einde van de abonnee en aangezien elke onNext() aanroep meer dan 10 aanvragen, blijft de achterstand toenemen totdat de uitgever geen gegevenselementen meer heeft om te verzenden, of de bufferoverloop van de abonnee, wat resulteert in fouten met onvoldoende geheugen.

Een abonnement opzeggen

Een abonnement beheert de status van gegevensoverdracht tussen een uitgever en een abonnee. Het abonnement is actief totdat de uitgever de overdracht van alle gegevens naar de abonnee heeft voltooid of de abonnee niet langer geïnteresseerd is in het ontvangen van gegevens. Er zijn een aantal manieren waarop u een abonnement kunt opzeggen, zoals hieronder wordt weergegeven.

In het volgende voorbeeld wordt het abonnement geannuleerd door de abonnee op te heffen:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

Disposable disposable = asyncClient.receive().subscribe(
    partitionEvent -> {
        Long num = partitionEvent.getData().getSequenceNumber()
        System.out.println("Sequence number of received event: " + num);
    },
    ex -> System.out.println("Error receiving events: " + ex.getMessage()),
    () -> System.out.println("Successfully completed receiving all events"));

// much later on in your code, when you are ready to cancel the subscription,
// you can call the dispose method, as such:
disposable.dispose();

In het volgende voorbeeld wordt het abonnement geannuleerd door de cancel() methode aan te roepen op Subscription:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1); // request 1 data element to begin with
    }

    @Override
    public void onNext(PartitionEvent partitionEvent) {
        System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
        this.subscription.cancel(); // Cancels the subscription. No further event is received.
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error receiving events: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Successfully completed receiving all events")
    }
});

Conclusie

Threads zijn dure resources die u niet mag verspillen aan het wachten op antwoorden van externe serviceaanroepen. Naarmate de acceptatie van microservicesarchitecturen toeneemt, wordt de noodzaak om resources efficiënt te schalen en te gebruiken essentieel. Asynchrone API's zijn gunstig wanneer er netwerkgebonden bewerkingen zijn. De Azure SDK voor Java biedt een uitgebreide set API's voor asynchrone bewerkingen om uw systeemresources te maximaliseren. We raden u ten zeerste aan om onze asynchrone klanten uit te proberen.

Zie welke operator heb ik nodig in de referentiegids reactor 3 voor meer informatie over de operators die het beste bij uw specifieke taken passen.

Volgende stappen

Nu u de verschillende asynchrone programmeerconcepten beter begrijpt, is het belangrijk om te leren hoe u de resultaten kunt herhalen. Zie Paginering en iteratie in de Azure SDK voor Java voor meer informatie over de beste iteratiestrategieën en details over de werking van paginering.