Condividi tramite


Programmazione asincrona in Azure SDK per Java

Questo articolo descrive il modello di programmazione asincrona in Azure SDK per Java.

Azure SDK contiene inizialmente solo API asincrone non bloccanti per l'interazione con i servizi di Azure. Queste API consentono di usare Azure SDK per creare applicazioni scalabili che usano le risorse di sistema in modo efficiente. Tuttavia, Azure SDK per Java contiene anche client sincroni per soddisfare un pubblico più ampio e rendere anche le librerie client raggiungibili per gli utenti che non hanno familiarità con la programmazione asincrona. Vedere Approachable nelle linee guida di progettazione dell'Azure SDK. Di conseguenza, tutte le librerie client Java nell'Azure SDK per Java offrono sia client asincroni che sincroni. È tuttavia consigliabile usare i client asincroni per i sistemi di produzione per ottimizzare l'uso delle risorse di sistema.

Flussi reattivi

Se si esamina la sezione Client del servizio asincrono nelle Linee Guida di Progettazione di Java Azure SDK, si noterà che, anziché usare CompletableFuture fornito da Java 8, le API asincrone usano tipi reattivi. Perché abbiamo scelto tipi reattivi rispetto ai tipi disponibili in modo nativo in JDK?

Java 8 ha introdotto funzionalità come Streams, Lambdas e CompletableFuture. Queste funzionalità offrono molte funzionalità, ma presentano alcune limitazioni.

CompletableFuture fornisce funzionalità basate su callback, non bloccanti, e l'interfaccia CompletionStage consentiva una facile composizione di una serie di operazioni asincrone. Le espressioni lambda rendono queste API basate su push più leggibili. I flussi forniscono operazioni di tipo funzionale per gestire una raccolta di elementi di dati. Tuttavia, i flussi sono sincroni e non possono essere riutilizzati. CompletableFuture consente di effettuare una singola richiesta, di fornire supporto per un callback e di prevedere una singola risposta. Tuttavia, molti servizi cloud richiedono la possibilità di trasmettere i dati, ad esempio Hub eventi.

I flussi reattivi possono aiutare a superare queste limitazioni trasmettendo elementi da un'origine a un sottoscrittore. Quando un sottoscrittore richiede dati da un'origine, l'origine invia un numero qualsiasi di risultati. Non è necessario inviarli tutti contemporaneamente. Il trasferimento avviene in un periodo di tempo, ogni volta che l'origine contiene dati da inviare.

In questo modello, l'abbonato registra i gestori degli eventi per elaborare i dati quando arrivano. Queste interazioni basate su push notificano al sottoscrittore tramite segnali distinti:

  • Una onSubscribe() chiamata indica che il trasferimento dei dati sta per iniziare.
  • Una onError() chiamata indica che si è verificato un errore, che contrassegna anche la fine del trasferimento dei dati.
  • Una onComplete() chiamata indica il completamento corretto del trasferimento dei dati.

A differenza di Java Streams, i flussi reattivi considerano gli errori come eventi di prima classe. I flussi reattivi hanno un canale dedicato per l'origine per comunicare eventuali errori al sottoscrittore. Inoltre, i flussi reattivi consentono al sottoscrittore di negoziare la velocità di trasferimento dei dati per trasformare questi flussi in un modello push-pull.

La specifica Dei flussi reattivi fornisce uno standard per il modo in cui deve verificarsi il trasferimento dei dati. A livello generale, la specifica definisce le quattro interfacce seguenti e specifica le regole su come implementare queste interfacce.

  • Publisher è l'origine di un flusso di dati.
  • Il sottoscrittore è il consumatore di un flusso di dati.
  • La sottoscrizione gestisce lo stato del trasferimento dei dati tra un server di pubblicazione e un sottoscrittore.
  • Il processore è sia un pubblicatore che un sottoscrittore.

Esistono alcune librerie Java note che forniscono implementazioni di questa specifica, ad esempio RxJava, Akka Streams, Vert.x e Project Reactor.

Azure SDK per Java ha adottato Project Reactor per offrire le API asincrone. Il fattore principale che guida questa decisione è stato quello di fornire un'integrazione senza problemi con Spring Webflux, che utilizza anche Project Reactor. Un altro fattore che contribuisce a scegliere Project Reactor su RxJava era che Project Reactor usa Java 8 ma RxJava, al momento, era ancora in Java 7. Project Reactor offre anche un set completo di operatori componibili e che consentono di scrivere codice dichiarativo per la compilazione di pipeline di elaborazione dati. Un altro aspetto interessante di Project Reactor è che dispone di adattatori per la conversione dei tipi Project Reactor in altri tipi di implementazione comuni.

Confronto di API di operazioni sincrone e asincrone

Sono stati illustrati i client e le opzioni sincroni per i client asincroni. La tabella seguente riepiloga l'aspetto delle API progettate usando queste opzioni:

Tipo di API Nessun valore Valore singolo Valori multipli
Java Standard - API sincrone void T Iterable<T>
Java Standard - API asincrone CompletableFuture<Void> CompletableFuture<T> CompletableFuture<List<T>>
Interfacce di flussi reattivi Publisher<Void> Publisher<T> Publisher<T>
Implementazione del reattore di progetto di flussi reattivi Mono<Void> Mono<T> Flux<T>

Per motivi di completezza, vale la pena ricordare che Java 9 ha introdotto la classe Flow che include le quattro interfacce di flussi reattivi. Tuttavia, questa classe non include alcuna implementazione.

Usare le API asincrone dell'Azure SDK per Java

La specifica dei flussi reattivi non distingue tra i tipi di editori. Nella specifica dei flussi reattivi, gli editori producono semplicemente zero o più elementi di dati. In molti casi, esiste una distinzione utile tra un editore che produce al massimo un elemento dati rispetto a uno che produce zero o più. Nelle API basate sul cloud questa distinzione indica se una richiesta restituisce una risposta a valore singolo o una raccolta. Project Reactor fornisce due tipi per fare questa distinzione: Mono e Flux. Un'API che restituisce un Mono oggetto conterrà una risposta con al massimo un valore e un'API che restituisce un Flux conterrà una risposta con zero o più valori.

Si supponga, ad esempio, di usare ConfigurationAsyncClient per recuperare una configurazione archiviata usando il servizio Configurazione app di Azure. Per altre informazioni, vedere Informazioni su Configurazione app di Azure.

Se si crea un oggetto ConfigurationAsyncClient e si chiama getConfigurationSetting() sul client, viene restituito un Monooggetto , che indica che la risposta contiene un singolo valore. Tuttavia, la chiamata a questo metodo da sola non esegue alcuna operazione. Il client non ha ancora effettuato una richiesta al servizio Configurazione app di Azure. In questa fase, l'oggetto Mono<ConfigurationSetting> restituito da questa API è semplicemente un "assembly" della pipeline di elaborazione dati. Ciò significa che la configurazione necessaria per l'utilizzo dei dati è completa. Per attivare effettivamente il trasferimento dei dati, cioè per effettuare la richiesta al servizio e ottenere la risposta, è necessario abbonarsi all'oggetto restituito Mono. Quindi, quando si gestiscono questi flussi reattivi, è necessario ricordare di chiamare subscribe() perché nulla accade fino a quando non si esegue questa operazione.

Nell'esempio seguente viene illustrato come iscriversi a Mono e stampare il valore di configurazione sul terminale.

ConfigurationAsyncClient asyncClient = new ConfigurationClientBuilder()
    .connectionString("<your connection string>")
    .buildAsyncClient();

asyncClient.getConfigurationSetting("<your config key>", "<your config value>").subscribe(
    config -> System.out.println("Config value: " + config.getValue()),
    ex -> System.out.println("Error getting configuration: " + ex.getMessage()),
    () -> System.out.println("Successfully retrieved configuration setting"));

System.out.println("Done");

Si noti che dopo aver chiamato getConfigurationSetting() sul client, il codice di esempio sottoscrive il risultato e fornisce tre espressioni lambda separate. La prima funzione lambda consuma dati ricevuti dal servizio, che viene attivata al ricevimento di una risposta positiva. La seconda espressione lambda viene attivata se si verifica un errore durante il recupero della configurazione. La terza espressione lambda viene richiamata al termine del flusso di dati, ovvero non sono previsti altri elementi dati da questo flusso.

Annotazioni

Come per tutta la programmazione asincrona, dopo la creazione della sottoscrizione, l'esecuzione procede come di consueto. Se non c'è nulla affinché il programma rimanga attivo ed in esecuzione, potrebbe terminare prima del completamento dell'operazione asincrona. Il thread principale chiamato subscribe() non attenderà finché non si effettua la chiamata di rete a Configurazione app di Azure e si riceve una risposta. Nei sistemi di produzione, è possibile continuare a elaborare qualcos'altro, ma in questo esempio è possibile aggiungere un piccolo ritardo chiamando Thread.sleep() o usando un CountDownLatch per consentire il completamento dell'operazione asincrona.

Come illustrato nell'esempio seguente, le API che restituiscono un Flux oggetto seguono anche un modello simile. La differenza è che il primo callback fornito al subscribe() metodo viene chiamato più volte per ogni elemento dati nella risposta. L'errore o i callback di completamento vengono chiamati esattamente una volta e vengono considerati come segnali del terminale. Nessun altro callback viene richiamato se uno di questi segnali viene ricevuto dall'editore.

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(
    event -> System.out.println("Sequence number of received event: " + event.getData().getSequenceNumber()),
    ex -> System.out.println("Error receiving events: " + ex.getMessage()),
    () -> System.out.println("Successfully completed receiving all events"));

Contropressione

Cosa accade quando l'origine produce i dati a una velocità superiore a quella che il sottoscrittore può gestire? L'utente può essere sopraffatto dai dati, che possono causare errori di memoria insufficiente. Il sottoscrittore deve essere in grado di comunicare con l'editore per rallentare quando non riesce a tenere il passo. Per impostazione predefinita, quando si chiama subscribe() su un oggetto Flux come illustrato nell'esempio precedente, il sottoscrittore richiede un flusso non associato di dati, che indica al server di pubblicazione di inviare i dati il più rapidamente possibile. Questo comportamento non è sempre auspicabile e il sottoscrittore potrebbe dover controllare il ritmo di pubblicazione tramite "backpressure". La backpressure consente all'abbonato di controllare il flusso degli elementi di dati. Un sottoscrittore richiederà un numero limitato di elementi dati che possono gestire. Dopo che il sottoscrittore ha completato l'elaborazione di questi elementi, il sottoscrittore può richiedere altro. Usando la backpressure, è possibile trasformare un modello push per il trasferimento dei dati in un modello push-pull.

L'esempio seguente illustra come controllare la frequenza con cui gli eventi vengono ricevuti dal consumer di Hub eventi:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1); // request 1 data element to begin with
    }

    @Override
    public void onNext(PartitionEvent partitionEvent) {
        System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
        this.subscription.request(1); // request another event when the subscriber is ready
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error receiving events: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Successfully completed receiving all events")
    }
});

Quando il sottoscrittore si connette per la prima volta al pubblicatore, il pubblicatore fornisce un'istanza Subscription, che gestisce lo stato del trasferimento dei dati. Si tratta Subscription del supporto attraverso il quale il sottoscrittore può applicare la pressione rovesciata chiamando request() per specificare il numero di elementi dati che può gestire.

Se il sottoscrittore richiede più di un elemento dati ogni volta che chiama onNext(), request(10) ad esempio, il server di pubblicazione invierà immediatamente i 10 elementi successivi se sono disponibili o quando diventano disponibili. Questi elementi si accumulano in un buffer dalla parte del sottoscrittore e, poiché ogni onNext() chiamata richiederà altri 10, il backlog continua a crescere fino a quando l'emittente non ha più elementi di dati da inviare, oppure il buffer del sottoscrittore si riempie e causa errori di memoria insufficiente.

Annullare una sottoscrizione

Una sottoscrizione gestisce lo stato del trasferimento dei dati tra un server di pubblicazione e un sottoscrittore. La sottoscrizione è attiva fino a quando il server di pubblicazione non ha completato il trasferimento di tutti i dati al sottoscrittore o il sottoscrittore non è più interessato a ricevere dati. Esistono due modi per annullare una sottoscrizione, come illustrato di seguito.

Nell'esempio seguente viene annullata la sottoscrizione eliminando il sottoscrittore:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

Disposable disposable = asyncClient.receive().subscribe(
    partitionEvent -> {
        Long num = partitionEvent.getData().getSequenceNumber()
        System.out.println("Sequence number of received event: " + num);
    },
    ex -> System.out.println("Error receiving events: " + ex.getMessage()),
    () -> System.out.println("Successfully completed receiving all events"));

// much later on in your code, when you are ready to cancel the subscription,
// you can call the dispose method, as such:
disposable.dispose();

Nell'esempio seguente viene annullata la sottoscrizione chiamando il cancel() metodo in Subscription:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1); // request 1 data element to begin with
    }

    @Override
    public void onNext(PartitionEvent partitionEvent) {
        System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
        this.subscription.cancel(); // Cancels the subscription. No further event is received.
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error receiving events: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Successfully completed receiving all events")
    }
});

Conclusione

I thread sono risorse costose che non è consigliabile sprecare in attesa di risposte dalle chiamate al servizio remoto. Man mano che l'adozione di architetture di microservizi aumenta, la necessità di ridimensionare e usare le risorse diventa essenziale in modo efficiente. Le API asincrone sono favorevoli quando sono presenti operazioni associate alla rete. Azure SDK per Java offre un set completo di API per le operazioni asincrone per ottimizzare le risorse di sistema. Ti invitiamo vivamente a provare i nostri client asincroni.

Per altre informazioni sugli operatori più adatti alle proprie attività specifiche, vedere Quale operatore è necessario? nella Guida di riferimento di Reactor 3.

Passaggi successivi

Dopo aver compreso meglio i vari concetti di programmazione asincrona, è importante imparare a scorrere i risultati. Per altre informazioni sulle strategie di iterazione migliori e sui dettagli sul funzionamento della paginazione, vedere Paginazione e iterazione in Azure SDK per Java.