Programmation asynchrone dans le Kit de développement logiciel (SDK) Azure pour Java

Cet article décrit le modèle de programmation asynchrone dans le SDK Azure pour Java.

Initialement, le SDK Azure ne contenait que des API asynchrones non bloquantes pour interagir avec les services Azure. Ces API vous permettent d’utiliser le Kit de développement logiciel (SDK) Azure pour créer des applications évolutives qui utilisent les ressources système de manière efficace. Cependant, le SDK Azure pour Java contient également des clients synchrones pour satisfaire un public plus large et permettre à des utilisateurs qui ne maîtrisent pas la programmation asynchrone d’utiliser nos bibliothèques de client. (Voir Approcheable dans les instructions de conception du Kit de développement logiciel (SDK) Azure.) Par conséquent, toutes les bibliothèques clientes Java du Kit de développement logiciel (SDK) Azure pour Java offrent des clients asynchrones et synchrones. Toutefois, nous vous recommandons d’utiliser les clients asynchrones pour les systèmes de production afin d’optimiser l’utilisation des ressources système.

Flux réactifs

Si vous examinez la section Clients de service asynchrones des règles de conception du SDK Azure Java, vous remarquez que, au lieu d’utiliser le type CompletableFuture fourni par Java 8, nos API asynchrones utilisent des types réactifs. Pourquoi avons-nous choisi des types réactifs plutôt que des types qui sont disponibles en mode natif dans JDK ?

Java 8 a introduit des fonctionnalités telles que les flux, les expressions lambda et CompletableFuture. Ces fonctionnalités offrent de nombreuses possibilités, mais présentent des limitations.

CompletableFuture fournit des fonctionnalités non bloquantes basées sur le rappel, et l’interface CompletionStage facilite la constitution d’une série d’opérations asynchrones. Les expressions lambda rendent ces API basées sur des transmission de type push plus lisibles. Les flux fournissent des opérations de style fonctionnel pour gérer une collection d’éléments de données. Toutefois, les flux sont synchrones et ne peuvent pas être réutilisés. CompletableFuture vous permet d’effectuer une requête unique, prend en charge le rappel et attend une réponse unique. De nombreux services cloud exigent cependant la possibilité de diffuser des données, à l’instar de Event Hubs.

Les flux réactifs peuvent aider à surmonter ces limitations en diffusant des éléments d’une source vers un abonné. Lorsqu’un abonné demande des données à partir d’une source, la source envoie un nombre quelconque de résultats. Elle n’a pas besoin de les envoyer tous en même temps. Le transfert se produit sur une certaine durée, chaque fois que la source a des données à envoyer.

Dans ce modèle, l’abonné inscrit des gestionnaires d’événements pour traiter les données lorsqu’elles arrivent. Ces interactions basées sur des notifications push informent l’abonné à l’aide de signaux distincts :

  • Un appel onSubscribe() indique que le transfert de données va commencer.
  • Un appel onError() indique qu’une erreur s’est produite, et marque également la fin du transfert de données.
  • Un appel onComplete() indique que le transfert de données a réussi.

Contrairement aux flux Java, les flux réactifs traitent les erreurs comme des événements de première classe. Ils disposent d’un canal dédié permettant à la source de communiquer les erreurs à l’abonné. Ils permettent également à l’abonné de négocier la vitesse de transfert de données pour transformer ces flux en modèle push-pull.

La spécification Flux réactifs fournit une norme indiquant la façon dont le transfert de données doit se produire. À un niveau élevé, la spécification définit les quatre interfaces suivantes et définit des règles concernant leur implémentation.

  • Serveur de publication est la source d’un flux de données.
  • Abonné est le consommateur d’un flux de données.
  • Abonnement gère l’état du transfert de données entre un serveur de publication et un abonné.
  • Processeur désigne à la fois un serveur de publication et un abonné.

Certaines bibliothèques Java connues fournissent des implémentations de cette spécification, notamment RxJava, Akka Streams, Vert.x et Project Reactor.

Le Kit de développement logiciel (SDK) Azure pour Java a adopté Project Reactor pour proposer ses API asynchrones. La raison principale derrière cette décision était la volonté de fournir une intégration fluide à Spring Webflux, qui utilise également Project Reactor. Une autre raison ayant motivé le choix de Project Reactor plutôt que RxJava était que Project Reactor utilisait Java 8 tandis que RxJava, à ce moment-là, utilisait encore Java 7. Project Reactor propose également un ensemble complet d’opérateurs composables qui vous permettent d’écrire du code déclaratif pour créer des pipelines de traitement de données. Autre aspect intéressant, Project Reactor contient des adaptateurs pour convertir des types Project Reactor en d’autres types d’implémentation courants.

Comparer des API d’opérations synchrones et asynchrones

Nous avons abordé les clients synchrones et les options pour les clients asynchrones. Le tableau ci-dessous résume les API qui sont conçues à l’aide de ces options :

Type d’API Aucune valeur Valeur unique Valeurs multiples
API synchrones Java standard void T Iterable<T>
API asynchrones Java standard CompletableFuture<Void> CompletableFuture<T> CompletableFuture<List<T>>
Interfaces de flux réactifs Publisher<Void> Publisher<T> Publisher<T>
Implémentation Project Reactor de flux réactifs Mono<Void> Mono<T> Flux<T>

Par souci d’exhaustivité, il est intéressant de mentionner que Java 9 a introduit la classe Flux, qui comprend les quatre interfaces de flux réactifs. Toutefois, cette classe n’inclut aucune implémentation.

Utiliser des API asynchrones dans le Kit de développement logiciel (SDK) Azure pour Java

La spécification de flux réactifs ne fait pas la différence entre les types de serveurs de publication. Dans la spécification Flux réactifs, les serveurs de publication produisent simplement « zéro ou plus » éléments de données. Dans de nombreux cas, il existe une distinction utile entre un serveur de publication produisant au plus un élément de données et un autre qui en produit « zéro ou plus ». Dans les API basées sur le cloud, cette distinction indique si une requête renvoie une réponse à valeur unique ou une collection. Project Reactor fournit deux types pour faire cette distinction : Mono et Flux. Une API qui renvoie Mono contient une réponse ayant au plus une seule valeur, et une API qui renvoie Flux contient une réponse ayant « zéro ou plus » valeurs.

Par exemple, supposons que vous utilisiez un client ConfigurationAsyncClient pour récupérer une configuration stockée à l’aide du service Azure App Configuration. (Pour plus d’informations, consultez Qu’est-ce qu’Azure App Configuration ?.)

Si vous créez un ConfigurationAsyncClient et appelez getConfigurationSetting() sur le client, la méthode renvoie Mono, ce qui indique que la réponse contient une valeur unique. Toutefois, l’appel de cette méthode seule n’a aucun effet. Le client n’a pas encore effectué de requête auprès du service Azure App Configuration. À ce stade, le paramètre Mono<ConfigurationSetting> renvoyé par cette API est simplement un « assembly » du pipeline de traitement des données. Cela signifie que la configuration requise pour consommer les données est terminée. Pour déclencher le transfert de données (autrement dit, pour envoyer la requête au service et obtenir la réponse), vous devez vous abonner au type Mono renvoyé. Ainsi, lorsque vous traitez ces flux réactifs, vous devez penser à appeler subscribe(), car rien ne se produit tant que vous ne le faites pas.

L’exemple suivant montre comment s’abonner au type Mono et imprimer la valeur de configuration dans la console.

ConfigurationAsyncClient asyncClient = new ConfigurationClientBuilder()
    .connectionString("<your connection string>")
    .buildAsyncClient();

asyncClient.getConfigurationSetting("<your config key>", "<your config value>").subscribe(
    config -> System.out.println("Config value: " + config.getValue()),
    ex -> System.out.println("Error getting configuration: " + ex.getMessage()),
    () -> System.out.println("Successfully retrieved configuration setting"));

System.out.println("Done");

Notez qu’après avoir appelé getConfigurationSetting() sur le client, l’exemple de code s’abonne au résultat et fournit trois expressions lambda distinctes. La première expression lambda utilise des données reçues du service, qui sont déclenchées en cas de réponse correcte. La deuxième expression lambda est déclenchée en cas d’erreur lors de la récupération de la configuration. La troisième expression lambda est appelée lorsque le flux de données est terminé, ce qui signifie qu’aucun autre élément de données n’est attendu de ce flux.

Remarque

Comme pour toutes les programmations asynchrones, une fois l’abonnement créé, l’exécution se poursuit normalement. S’il n’y a rien à faire pour que le programme reste actif et en cours d’exécution, il peut se terminer avant la fin de l’opération asynchrone. Le thread principal qui a appelé subscribe() n’attend pas que vous ayez effectué l’appel réseau vers Azure App Configuration et reçu une réponse. Dans les systèmes de production, vous pouvez continuer à traiter d’autres opérations, mais dans cet exemple, vous pouvez ajouter un bref délai en appelant Thread.sleep() ou utiliser un paramètre CountDownLatch pour permettre à l’opération asynchrone de se terminer.

Comme indiqué dans l’exemple suivant, les API qui renvoient Flux suivent également un modèle similaire. La différence est que le premier rappel fourni à la méthode subscribe() est appelé plusieurs fois pour chaque élément de données dans la réponse. L’erreur ou les rappels d’achèvement sont appelés exactement une fois et sont considérés comme des signaux terminaux. Aucun autre rappel n’est appelé si l’un de ces signaux est reçu de la part du serveur de publication.

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(
    event -> System.out.println("Sequence number of received event: " + event.getData().getSequenceNumber()),
    ex -> System.out.println("Error receiving events: " + ex.getMessage()),
    () -> System.out.println("Successfully completed receiving all events"));

Régulation

Que se passe-t-il si la source produit les données à un rythme plus rapide que ce que l’abonné peut traiter ? L’abonné peut être submergé de données, ce qui peut entraîner des erreurs de mémoire insuffisante. L’abonné doit avoir un moyen de contacter le serveur de publication pour ralentir l’opération lorsqu’il ne peut pas suivre. Par défaut, lorsque vous appelez subscribe() sur un Flux comme indiqué dans l’exemple ci-dessus, l’abonné demande un flux de données illimité, indiquant au serveur de publication d’envoyer les données le plus rapidement possible. Ce comportement n’est pas toujours souhaitable et l’abonné peut avoir à contrôler la vitesse de publication par « régulation ». La régulation permet à l’abonné de prendre le contrôle du flux des éléments de données. Un abonné demande un nombre limité d’éléments de données qu’il peut gérer. Une fois que l’abonné a terminé le traitement de ces éléments, il peut en demander plus. Avec la régulation, vous pouvez transformer un modèle push de transfert de données en modèle push-pull.

L’exemple suivant montre la manière dont vous pouvez contrôler le rythme de réception des événements par le consommateur Event Hubs :

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1); // request 1 data element to begin with
    }

    @Override
    public void onNext(PartitionEvent partitionEvent) {
        System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
        this.subscription.request(1); // request another event when the subscriber is ready
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error receiving events: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Successfully completed receiving all events")
    }
});

Lorsque l’abonné se connecte pour la première fois au serveur de publication, le serveur de publication transmet à l’abonné une instance de Subscription, qui gère l’état du transfert de données. Ce paramètre Subscription est le moyen par lequel l’abonné peut appliquer une régulation en appelant request() pour spécifier le nombre d’éléments de données supplémentaires qu’il peut gérer.

Si l’abonné demande plusieurs éléments de données chaque fois qu’il appelle onNext(), request(10) par exemple, le serveur de publication envoie les 10 éléments suivants immédiatement s’ils sont disponibles ou dès qu’ils le sont. Ces éléments s’accumulent dans une mémoire tampon du côté de l’abonné. Étant donné que chaque appel onNext() demande 10 éléments supplémentaires, le backlog continue de croître jusqu’à ce que le serveur de publication n’ait plus d’éléments de données à envoyer ou que la mémoire tampon de l’abonné soit saturée, entraînant des erreurs de mémoire insuffisante.

Annuler un abonnement

Un abonnement gère l’état du transfert de données entre un serveur de publication et un abonné. L’abonnement est actif jusqu’à ce que le serveur de publication termine le transfert de toutes les données vers l’abonné ou que l’abonné ne souhaite plus recevoir de données. Il existe deux façons d’annuler un abonnement, comme indiqué ci-dessous.

L’exemple suivant annule l’abonnement en supprimant l’abonné :

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

Disposable disposable = asyncClient.receive().subscribe(
    partitionEvent -> {
        Long num = partitionEvent.getData().getSequenceNumber()
        System.out.println("Sequence number of received event: " + num);
    },
    ex -> System.out.println("Error receiving events: " + ex.getMessage()),
    () -> System.out.println("Successfully completed receiving all events"));

// much later on in your code, when you are ready to cancel the subscription,
// you can call the dispose method, as such:
disposable.dispose();

L’exemple suivant annule l’abonnement en appelant la méthode cancel() sur Subscription :

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1); // request 1 data element to begin with
    }

    @Override
    public void onNext(PartitionEvent partitionEvent) {
        System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
        this.subscription.cancel(); // Cancels the subscription. No further event is received.
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error receiving events: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Successfully completed receiving all events")
    }
});

Conclusion

Les threads sont des ressources onéreuses qu’il serait dommage de gaspiller en attendant les réponses des appels de service distant. À mesure que l’adoption d’architectures de microservices augmente, la nécessité de mettre à l’échelle et d’utiliser efficacement les ressources devient vitale. Les API asynchrones sont favorables lorsqu’il existe des opérations liées au réseau. Le Kit de développement logiciel (SDK) Azure pour Java offre un ensemble complet d’API pour les opérations asynchrones afin d’optimiser vos ressources système. Nous vous encourageons vivement à essayer nos clients asynchrones.

Pour plus d’informations sur les opérateurs les plus adaptés à vos tâches, consultez Quel opérateur choisir ? dans le Guide de référence Reactor 3.

Étapes suivantes

Maintenant que vous connaissez mieux les différents concepts de la programmation asynchrone, il est important d’apprendre à itérer sur les résultats. Pour plus d’informations sur les meilleures stratégies d’itération et sur le fonctionnement de la pagination, consultez Pagination et itération dans le Kit de développement logiciel (SDK) Azure pour Java.