Partager via


Programmation asynchrone dans le Kit de développement logiciel (SDK) Azure pour Java

Cet article décrit le modèle de programmation asynchrone dans le Kit de développement logiciel (SDK) Azure pour Java.

Le Kit de développement logiciel (SDK) Azure contenait initialement uniquement des API asynchrones non bloquantes pour interagir avec les services Azure. Ces API vous permettent d’utiliser le Kit de développement logiciel (SDK) Azure pour créer des applications évolutives qui utilisent efficacement les ressources système. Toutefois, le Kit de développement logiciel (SDK) Azure pour Java contient également des clients synchrones pour répondre à un public plus large et rendre nos bibliothèques clientes accessibles aux utilisateurs qui ne connaissent pas la programmation asynchrone. (Consultez la section Accessible dans les directives de conception du SDK Azure.) Ainsi, toutes les bibliothèques clientes Java du SDK Azure pour Java offrent des clients asynchrones et synchrones. Toutefois, nous vous recommandons d’utiliser les clients asynchrones pour les systèmes de production afin d’optimiser l’utilisation des ressources système.

Flux réactifs

Si vous examinez la section Clients de service asynchrones dans les instructions de conception du Kit de développement logiciel (SDK) Azure Java, vous remarquerez que, au lieu d’utiliser CompletableFuture java 8, nos API asynchrones utilisent des types réactifs. Pourquoi avons-nous choisi des types réactifs sur des types qui sont disponibles en mode natif dans JDK ?

Java 8 a introduit des fonctionnalités telles que Streams, Lambdas et CompletableFuture. Ces fonctionnalités offrent de nombreuses fonctionnalités, mais présentent certaines limitations.

CompletableFuture fournit des fonctionnalités de rappel non bloquantes, et l’interface CompletionStage permet une composition facile d’une série d’opérations asynchrones. Les lambda rendent ces API basées sur push plus lisibles. Les flux fournissent des opérations de style fonctionnel pour gérer une collection d’éléments de données. Toutefois, les flux sont synchrones et ne peuvent pas être réutilisés. CompletableFuture vous permet d’effectuer une requête unique, fournit la prise en charge d’un rappel et attend une seule réponse. Toutefois, de nombreux services cloud nécessitent la possibilité de diffuser en continu des données ( Event Hubs par exemple).

Les flux réactifs peuvent aider à surmonter ces limitations en continuant des éléments d’une source vers un abonné. Lorsqu’un abonné demande des données à partir d’une source, la source renvoie un nombre quelconque de résultats. Il n’est pas nécessaire de les envoyer en même temps. Le transfert se produit sur une période donnée, chaque fois que la source a des données à envoyer.

Dans ce modèle, l’abonné enregistre des gestionnaires d’événements pour traiter les données lorsqu'elles arrivent. Ces interactions push informent l’abonné par le biais de signaux distincts :

  • Un onSubscribe() appel indique que le transfert de données est sur le point de commencer.
  • Un onError() appel indique qu’une erreur s’est produite, qui marque également la fin du transfert de données.
  • Un onComplete() appel indique la réussite du transfert de données.

Contrairement aux flux Java, les flux réactifs traitent les erreurs comme des événements de première classe. Les flux réactifs ont un canal dédié pour que la source communique toutes les erreurs à l’abonné. En outre, les flux réactifs permettent à l’abonné de négocier le taux de transfert de données pour transformer ces flux en modèle push-pull.

La spécification De flux réactifs fournit une norme pour la façon dont le transfert de données doit se produire. À un niveau élevé, la spécification définit les quatre interfaces suivantes et spécifie des règles sur la façon dont ces interfaces doivent être implémentées.

  • Publisher est la source d’un flux de données.
  • L’abonné est le consommateur d’un flux de données.
  • L’abonnement gère l’état du transfert de données entre un éditeur et un abonné.
  • Le processeur est à la fois un éditeur et un abonné.

Certaines bibliothèques Java connues fournissent des implémentations de cette spécification, telles que RxJava, Akka Streams, Vert.x et Project Reactor.

Le Kit de développement logiciel (SDK) Azure pour Java a adopté Project Reactor pour offrir ses API asynchrones. Le principal facteur qui a conduit à cette décision était de fournir une intégration fluide à Spring Webflux, qui utilise également Project Reactor. Un autre facteur contribuant à choisir Project Reactor sur RxJava était que Project Reactor utilise Java 8, mais RxJava, à l’époque, était toujours à Java 7. Project Reactor offre également un ensemble complet d’opérateurs composables et vous permet d’écrire du code déclaratif pour la création de pipelines de traitement des données. Une autre chose intéressante sur Project Reactor est qu’il a des adaptateurs pour convertir les types Project Reactor en d’autres types d’implémentation populaires.

Comparaison des API des opérations synchrones et asynchrones

Nous avons discuté des clients synchrones et des options pour les clients asynchrones. Le tableau ci-dessous récapitule à quoi ressemblent les API qui sont conçues à l’aide de ces options :

Type d’API Aucune valeur Valeur unique Valeurs multiples
Java Standard – API synchrones void T Iterable<T>
API asynchrones Java standard CompletableFuture<Void> CompletableFuture<T> CompletableFuture<List<T>>
Interfaces de flux réactifs Publisher<Void> Publisher<T> Publisher<T>
Implémentation Project Reactor de flux réactifs Mono<Void> Mono<T> Flux<T>

Par souci d’exhaustivité, il est important de mentionner que Java 9 a introduit la classe Flow qui inclut les quatre interfaces de flux réactifs. Toutefois, cette classe n’inclut aucune implémentation.

Utiliser des API asynchrones dans le Kit de développement logiciel (SDK) Azure pour Java

La spécification des flux réactifs ne différencie pas les types d’éditeurs. Dans la spécification des flux réactifs, les éditeurs produisent simplement zéro ou plusieurs éléments de données. Dans de nombreux cas, il existe une distinction utile entre un éditeur produisant au plus un élément de données par rapport à un élément de données qui produit zéro ou plus. Dans les API basées sur le cloud, cette distinction indique si une requête retourne une réponse à valeur unique ou une collection. Project Reactor fournit deux types pour faire cette distinction - Mono et Flux. Une API qui retourne un Mono contiendra une réponse qui a au maximum une valeur, et une API qui retourne un Flux contiendra une réponse qui a zéro ou plusieurs valeurs.

Par exemple, supposons que vous utilisez un ConfigurationAsyncClient pour récupérer une configuration stockée à l’aide du service Azure App Configuration. (Pour plus d’informations, consultez Qu’est-ce qu’Azure App Configuration ?))

Si vous créez un ConfigurationAsyncClient et appelez getConfigurationSetting() sur le client, cela retourne un Mono, ce qui indique que la réponse contient une valeur unique. Toutefois, l’appel de cette méthode seul ne fait rien. Le client n’a pas encore effectué de demande au service Azure App Configuration. À ce stade, le résultat renvoyé par cette API n’est qu’un « assemblage » du pipeline de traitement des données. Cela signifie que la configuration requise pour consommer les données est terminée. Pour déclencher le transfert de données (autrement dit, pour envoyer la requête au service et obtenir la réponse), vous devez vous abonner au type Mono renvoyé. Ainsi, lorsque vous traitez de ces flux réactifs, vous devez vous rappeler d’appeler subscribe() parce que rien ne se passe jusqu’à ce que vous le fassiez.

L’exemple suivant montre comment s’abonner à Mono et imprimer la valeur de configuration sur la console.

ConfigurationAsyncClient asyncClient = new ConfigurationClientBuilder()
    .connectionString("<your connection string>")
    .buildAsyncClient();

asyncClient.getConfigurationSetting("<your config key>", "<your config value>").subscribe(
    config -> System.out.println("Config value: " + config.getValue()),
    ex -> System.out.println("Error getting configuration: " + ex.getMessage()),
    () -> System.out.println("Successfully retrieved configuration setting"));

System.out.println("Done");

Notez qu’après avoir appelé getConfigurationSetting() sur le client, l’exemple de code s’abonne au résultat et fournit trois expressions lambda distinctes. Le premier lambda consomme les données reçues du service, qui sont déclenchées lors d’une réponse réussie. La deuxième lambda est déclenchée en cas d’erreur lors de la récupération de la configuration. Le troisième lambda est appelé lorsque le flux de données est terminé, ce qui signifie qu’aucun autre élément de données n’est attendu à partir de ce flux.

Remarque

Comme avec toute la programmation asynchrone, une fois l’abonnement créé, l’exécution se poursuit comme d’habitude. S'il n'y a rien pour maintenir le programme actif et en cours d'exécution, il pourrait se terminer avant que l'opération asynchrone ne soit achevée. Le thread principal qui a appelé subscribe() n’attend pas que vous ayez effectué l’appel réseau vers Azure App Configuration et reçu une réponse. Dans les systèmes de production, vous pouvez continuer à traiter un autre élément, mais dans cet exemple, vous pouvez ajouter un petit délai en appelant Thread.sleep() ou en utilisant un CountDownLatch pour donner à l’opération asynchrone une chance de se terminer.

Comme illustré dans l'exemple suivant, les API qui retournent un Flux suivent un modèle similaire. La différence est que le premier rappel fourni à la subscribe() méthode est appelé plusieurs fois pour chaque élément de données dans la réponse. L’erreur ou les rappels d’achèvement sont appelés exactement une fois et sont considérés comme des signaux terminaux. Aucun autre rappel n’est appelé si l’un de ces signaux est reçu de la part du serveur de publication.

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(
    event -> System.out.println("Sequence number of received event: " + event.getData().getSequenceNumber()),
    ex -> System.out.println("Error receiving events: " + ex.getMessage()),
    () -> System.out.println("Successfully completed receiving all events"));

Contre-pression

Que se passe-t-il lorsque la source produit les données à un rythme plus rapide que l’abonné peut gérer ? L’abonné peut être submergé de données, ce qui peut entraîner des erreurs de mémoire insuffisante. L’abonné doit avoir un moyen de contacter le serveur de publication pour ralentir l’opération lorsqu’il ne peut pas suivre. Par défaut, lorsque vous appelez subscribe() sur un Flux, comme indiqué dans l'exemple ci-dessus, l'abonné demande un flux de données illimité, indiquant à l'éditeur d'envoyer les données le plus rapidement possible. Ce comportement n’est pas toujours souhaitable et l’abonné peut avoir à contrôler le taux de publication par le biais de « backpressure ». La rétropression permet à l’abonné de prendre le contrôle du flux d’éléments de données. Un abonné demande un nombre limité d’éléments de données qu’il peut gérer. Une fois que l’abonné a terminé le traitement de ces éléments, l’abonné peut demander plus d’informations. En utilisant la rétropression, vous pouvez transformer un modèle push pour le transfert de données en modèle push-pull.

L’exemple suivant montre comment contrôler la fréquence à laquelle les événements sont reçus par le consommateur Event Hubs :

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1); // request 1 data element to begin with
    }

    @Override
    public void onNext(PartitionEvent partitionEvent) {
        System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
        this.subscription.request(1); // request another event when the subscriber is ready
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error receiving events: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Successfully completed receiving all events")
    }
});

Lorsque l’abonné se connecte pour la première fois au serveur de publication, l’éditeur transfère à l’abonné une Subscription instance qui gère l’état du transfert de données. Ce paramètre Subscription est le moyen par lequel l’abonné peut appliquer une régulation en appelant request() pour spécifier le nombre d’éléments de données supplémentaires qu’il peut gérer.

Si l’abonné demande plusieurs éléments de données chaque fois qu’il appelle onNext(), request(10) par exemple, l’éditeur envoie immédiatement les 10 éléments suivants s’ils sont disponibles ou lorsqu’ils sont disponibles. Ces éléments s’accumulent dans une mémoire tampon du côté de l’abonné, et étant donné que chaque onNext() appel demande 10 de plus, le backlog continue de croître jusqu’à ce que l’éditeur n’ait plus d’éléments de données à envoyer, ou que la mémoire tampon de l’abonné déborde, entraînant ainsi des erreurs de mémoire insuffisante.

Annuler un abonnement

Un abonnement gère l’état du transfert de données entre un éditeur et un abonné. L’abonnement est actif jusqu’à ce que l’éditeur ait terminé de transférer toutes les données à l’abonné ou que l’abonné n’est plus intéressé par la réception de données. Vous pouvez annuler un abonnement de deux façons, comme indiqué ci-dessous.

L’exemple suivant annule l’abonnement en supprimant l’abonné :

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

Disposable disposable = asyncClient.receive().subscribe(
    partitionEvent -> {
        Long num = partitionEvent.getData().getSequenceNumber()
        System.out.println("Sequence number of received event: " + num);
    },
    ex -> System.out.println("Error receiving events: " + ex.getMessage()),
    () -> System.out.println("Successfully completed receiving all events"));

// much later on in your code, when you are ready to cancel the subscription,
// you can call the dispose method, as such:
disposable.dispose();

L’exemple suivant annule l’abonnement en appelant la cancel() méthode sur Subscription:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1); // request 1 data element to begin with
    }

    @Override
    public void onNext(PartitionEvent partitionEvent) {
        System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
        this.subscription.cancel(); // Cancels the subscription. No further event is received.
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error receiving events: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Successfully completed receiving all events")
    }
});

Conclusion

Les threads sont des ressources coûteuses que vous ne devez pas gaspiller en attendant les réponses des appels de service distant. À mesure que l’adoption des architectures de microservices augmente, la nécessité de mettre à l’échelle et d’utiliser efficacement les ressources devient essentielle. Les API asynchrones sont favorables lorsqu’il existe des opérations liées au réseau. Le Kit de développement logiciel (SDK) Azure pour Java offre un ensemble complet d’API pour les opérations asynchrones afin d’optimiser vos ressources système. Nous vous encourageons vivement à essayer nos clients asynchrones.

Pour plus d’informations sur les opérateurs qui répondent le mieux à vos tâches particulières, consultez Quel opérateur ai-je besoin ? dans le Guide de référence du réacteur 3.

Étapes suivantes

Maintenant que vous comprenez mieux les différents concepts de programmation asynchrone, il est important d’apprendre à effectuer une itération sur les résultats. Pour plus d’informations sur les meilleures stratégies d’itération et sur le fonctionnement de la pagination, consultez Pagination et itération dans le Kit de développement logiciel (SDK) Azure pour Java.