Partager via


Abonnement aux événements

Conseil / Astuce

Ce contenu est un extrait du livre électronique 'Architecture des microservices .NET pour les applications .NET conteneurisées', disponible sur .NET Docs ou en tant que PDF téléchargeable gratuitement, lisible hors ligne.

Architecture de microservices .NET pour les applications .NET conteneurisées - vignette de couverture du livre électronique.

La première étape de l’utilisation du bus d’événements consiste à abonner les microservices aux événements qu’ils souhaitent recevoir. Cette fonctionnalité doit être effectuée dans les microservices récepteurs.

Le code simple suivant montre ce que chaque microservice récepteur doit implémenter lors du démarrage du service (autrement dit, dans la Startup classe) afin qu’il s’abonne aux événements dont il a besoin. Dans ce cas, le microservice basket-api doit s’abonner à ProductPriceChangedIntegrationEvent et aux messages OrderStartedIntegrationEvent.

Par exemple, lors de l’abonnement à l’événement ProductPriceChangedIntegrationEvent , qui rend le microservice panier conscient des modifications apportées au prix du produit et lui permet d’avertir l’utilisateur de la modification si ce produit se trouve dans le panier de l’utilisateur.

var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();

eventBus.Subscribe<ProductPriceChangedIntegrationEvent,
                   ProductPriceChangedIntegrationEventHandler>();

eventBus.Subscribe<OrderStartedIntegrationEvent,
                   OrderStartedIntegrationEventHandler>();

Une fois ce code exécuté, le microservice de l’abonné écoute par le biais de canaux RabbitMQ. Quand un message de type ProductPriceChangedIntegrationEvent arrive, le code appelle le gestionnaire d’événements qui lui est transmis et traite l’événement.

Publication d’événements via le bus d’événements

Enfin, l’expéditeur du message (microservice d’origine) publie les événements d’intégration avec du code similaire à l’exemple suivant. (Cette approche est un exemple simplifié qui ne prend pas en compte l’atomicité.) Vous implémentez du code similaire chaque fois qu’un événement doit être propagé sur plusieurs microservices, généralement juste après avoir commité des données ou des transactions à partir du microservice d’origine.

Tout d’abord, l’objet d’implémentation de bus d’événements (basé sur RabbitMQ ou basé sur un bus de service) est injecté au constructeur du contrôleur, comme dans le code suivant :

[Route("api/v1/[controller]")]
public class CatalogController : ControllerBase
{
    private readonly CatalogContext _context;
    private readonly IOptionsSnapshot<Settings> _settings;
    private readonly IEventBus _eventBus;

    public CatalogController(CatalogContext context,
        IOptionsSnapshot<Settings> settings,
        IEventBus eventBus)
    {
        _context = context;
        _settings = settings;
        _eventBus = eventBus;
    }
    // ...
}

Ensuite, vous l’utilisez à partir des méthodes de votre contrôleur, comme dans la méthode UpdateProduct :

[Route("items")]
[HttpPost]
public async Task<IActionResult> UpdateProduct([FromBody]CatalogItem product)
{
    var item = await _context.CatalogItems.SingleOrDefaultAsync(
        i => i.Id == product.Id);
    // ...
    if (item.Price != product.Price)
    {
        var oldPrice = item.Price;
        item.Price = product.Price;
        _context.CatalogItems.Update(item);
        var @event = new ProductPriceChangedIntegrationEvent(item.Id,
            item.Price,
            oldPrice);
        // Commit changes in original transaction
        await _context.SaveChangesAsync();
        // Publish integration event to the event bus
        // (RabbitMQ or a service bus underneath)
        _eventBus.Publish(@event);
        // ...
    }
    // ...
}

Dans ce cas, étant donné que le microservice d’origine est un microservice CRUD simple, ce code est placé directement dans un contrôleur d’API web.

Dans des microservices plus avancés, comme lors de l’utilisation d’approches CQRS, il peut être implémenté dans la CommandHandler classe, dans la Handle() méthode.

Conception de l’atomicité et de la résilience lors de la publication d’événements dans le bus d’événements

Lorsque vous publiez des événements d’intégration via un système de messagerie distribué comme votre bus d’événements, vous avez le problème de mettre à jour atomiquement la base de données d’origine et de publier un événement (autrement dit, les deux opérations se terminent ou aucune d’entre elles). Par exemple, dans l’exemple simplifié présenté précédemment, le code valide les données dans la base de données lorsque le prix du produit est modifié, puis publie un message ProductPriceChangedIntegrationEvent. Au départ, il peut sembler essentiel que ces deux opérations soient effectuées atomiquement. Toutefois, si vous utilisez une transaction distribuée impliquant la base de données et le répartiteur de messages, comme vous le faites dans des systèmes plus anciens comme Microsoft Message Queuing (MSMQ), cette approche n’est pas recommandée pour les raisons décrites par le théorème CAP.

En fait, vous utilisez des microservices pour créer des systèmes évolutifs et hautement disponibles. Simplifier un peu, le théorème CAP indique que vous ne pouvez pas créer une base de données (distribuée) (ou un microservice propriétaire de son modèle) qui est continuellement disponible, fortement cohérente et tolérante à n’importe quelle partition. Vous devez choisir deux de ces trois propriétés.

Dans les architectures basées sur des microservices, vous devez privilégier la disponibilité et la tolérance, et vous devez diminuer l'importance de la cohérence forte. Par conséquent, dans la plupart des applications basées sur des microservices modernes, vous ne souhaitez généralement pas utiliser de transactions distribuées dans la messagerie, comme vous le faites lorsque vous implémentez des transactions distribuées basées sur le coordinateur de transactions distribuées Windows (DTC) avec MSMQ.

Revenons au problème initial et à son exemple. Si le service se bloque après la mise à jour de la base de données (dans ce cas, juste après la ligne de code avec _context.SaveChangesAsync()), mais avant la publication de l’événement d’intégration, le système global peut devenir incohérent. Cette approche peut être critique pour l’entreprise, en fonction de l’opération métier spécifique avec laquelle vous traitez.

Comme mentionné précédemment dans la section architecture, vous pouvez avoir plusieurs approches pour traiter ce problème :

  • Utilisation du modèle complet Event Sourcing.

  • Utilisation de l'analyse des journaux de transactions.

  • Utilisation du modèle Outbox. Il s’agit d’une table transactionnelle pour stocker les événements d’intégration (extension de la transaction locale).

Pour ce scénario, l’utilisation du modèle ES (Event Sourcing) complet est l’une des meilleures approches, si ce n’est pas le meilleur. Toutefois, dans de nombreux scénarios d’application, il se peut que vous ne puissiez pas implémenter un système ES complet. ES signifie stocker uniquement les événements de domaine dans votre base de données transactionnelle, au lieu de stocker les données d’état actuelles. Le stockage d’événements de domaine uniquement peut avoir d’excellents avantages, tels que l’historique de votre système disponible et la possibilité de déterminer l’état de votre système à tout moment dans le passé. Toutefois, l’implémentation d’un système ES complet vous oblige à réarchitecturer la plupart de votre système et introduit de nombreuses autres complexités et exigences. Par exemple, vous souhaitez utiliser une base de données spécifiquement conçue pour l’approvisionnement d’événements, comme Event Store ou une base de données orientée document comme Azure Cosmos DB, MongoDB, Cassandra, CouchDB ou RavenDB. ES est une excellente approche pour ce problème, mais pas la solution la plus simple, sauf si vous êtes déjà familiarisé avec l’approvisionnement en événements.

L’option permettant d’utiliser l’exploration de journaux de transactions semble initialement transparente. Toutefois, pour utiliser cette approche, le microservice doit être couplé à votre journal des transactions SGBDR, tel que le journal des transactions SQL Server. Cette approche n’est probablement pas souhaitable. Un autre inconvénient est que les mises à jour de bas niveau enregistrées dans le journal des transactions peuvent ne pas être au même niveau que vos événements d’intégration de haut niveau. Dans ce cas, le processus d’ingénierie inverse de ces opérations de journal des transactions peut être difficile.

Une approche équilibrée est une combinaison d’une table de base de données transactionnelle et d’un modèle ES simplifié. Vous pouvez utiliser un état tel que « prêt à publier l’événement », que vous définissez dans l’événement d’origine lorsque vous le validez dans la table des événements d’intégration. Ensuite, vous essayez de publier l’événement dans le bus d’événements. Si l’action publish-event réussit, vous démarrez une autre transaction dans le service d’origine et déplacez l’état de « prêt à publier l’événement » vers « événement déjà publié ».

Si l’action de publication-événement dans le bus d’événements échoue, les données ne sont toujours pas incohérentes dans le microservice d’origine , elle est toujours marquée comme « prête à publier l’événement » et, en ce qui concerne le reste des services, elle sera finalement cohérente. Vous pouvez toujours avoir des travaux en arrière-plan qui vérifient l’état des transactions ou des événements d’intégration. Si la tâche détecte un événement avec l’état « prêt à publier l’événement », elle peut tenter de republier cet événement dans le bus d’événements.

Notez qu’avec cette approche, vous conservez uniquement les événements d’intégration pour chaque microservice d’origine et seuls les événements que vous souhaitez communiquer avec d’autres microservices ou systèmes externes. En revanche, dans un système ES complet, vous stockez également tous les événements de domaine.

Par conséquent, cette approche équilibrée est un système ES simplifié. Vous avez besoin d’une liste d’événements d’intégration avec leur état actuel (« prêt à publier » et « publié »). Mais vous devez uniquement implémenter ces états pour les événements d’intégration. Dans cette approche, vous n’avez pas besoin de stocker toutes vos données de domaine en tant qu’événements dans la base de données transactionnelle, comme vous le feriez dans un système ES complet.

Si vous utilisez déjà une base de données relationnelle, vous pouvez utiliser une table transactionnelle pour stocker les événements d’intégration. Pour obtenir l’atomicité dans votre application, vous utilisez un processus en deux étapes en fonction des transactions locales. En fait, vous disposez d’une table IntegrationEvent dans la même base de données que celle où vous avez vos entités de domaine. Cette table fonctionne comme une garantie d’obtention de l’atomicité, et vous permet d’inclure les événements d’intégration persistants dans les transactions qui valident vos données de domaine.

Pas à pas, le processus se présente comme suit :

  1. L’application commence une transaction de base de données locale.

  2. Il met ensuite à jour l’état de vos entités de domaine et insère un événement dans la table d’événements d’intégration.

  3. Enfin, il valide la transaction, de sorte que vous obtenez l’atomicité souhaitée, puis

  4. Vous publiez l'événement d'une certaine manière (suivant).

Lorsque vous implémentez les étapes de publication des événements, vous avez les choix suivants :

  • Publiez l’événement d’intégration juste après avoir commité la transaction et utilisez une autre transaction locale pour marquer les événements dans la table comme étant publiés. Ensuite, utilisez la table comme artefact pour suivre les événements d’intégration en cas de problèmes dans les microservices distants et effectuer des actions correctives basées sur les événements d’intégration stockés.

  • Utilisez la table comme type de file d’attente. Un thread d’application ou un processus distinct interroge la table d’événements d’intégration, publie les événements dans le bus d’événements, puis utilise une transaction locale pour marquer les événements comme publiés.

La figure 6-22 montre l’architecture de la première de ces approches.

Diagramme montrant l’atomicité dans le cadre d’une publication sans microservice de worker.

Figure 6-22. Atomicité lors de la publication d’événements dans le bus d’événements

Il manque à l’approche illustrée à la figure 6-22 un microservice de worker supplémentaire chargé de vérifier et de confirmer la publication des événements d’intégration. En cas d’échec, le microservice de worker de vérification supplémentaire peut lire les événements de la table et les republier (autrement dit, répétez l’étape 2).

À propos de la deuxième approche : vous utilisez la table EventLog comme file d’attente et utilisez toujours un microservice worker pour publier les messages. Dans ce cas, le processus est semblable à celui présenté dans la figure 6-23. On y voit un microservice supplémentaire, ainsi que la table qui est la seule source lors de la publication d’événements.

Diagramme montrant l’atomicité dans le cadre d’une publication avec un microservice de worker.

Figure 6-23. Atomicité lors de la publication d’événements dans le bus d’événements avec un microservice de worker

Par souci de simplicité, l’exemple eShopOnContainers utilise la première approche (sans processus supplémentaires ni microservices de vérification) ainsi que le bus d’événements. Toutefois, l’exemple eShopOnContainers ne gère pas tous les cas d’échec possibles. Dans une application réelle déployée sur le cloud, vous devez adopter le fait que les problèmes se produisent finalement, et vous devez implémenter cette logique de vérification et de réexécriture. L’utilisation de la table comme une file d’attente peut se révéler plus efficace que la première approche si cette table est la seule source d’événements quand vous les publiez (avec le worker) à l’aide du bus d’événements.

Implémentation de l’atomicité lors de la publication d’événements d’intégration via le bus d’événements

Le code suivant montre comment créer une transaction unique impliquant plusieurs objets DbContext , un contexte lié aux données d’origine mises à jour et le deuxième contexte lié à la table IntegrationEventLog.

La transaction dans l’exemple de code ci-dessous ne sera pas résiliente si les connexions à la base de données rencontrent un problème au moment de l’exécution du code. Cela peut se produire dans des systèmes cloud comme Azure SQL DB, qui peuvent déplacer des bases de données entre des serveurs. Pour implémenter des transactions résilientes dans plusieurs contextes, consultez la section Implémentation des connexions SQL Entity Framework Core résilientes plus loin dans ce guide.

Pour plus de clarté, l’exemple suivant montre l’ensemble du processus dans un seul élément de code. Toutefois, l’implémentation eShopOnContainers est refactorisée et fractionne cette logique en plusieurs classes afin qu’elle soit plus facile à gérer.

// Update Product from the Catalog microservice
//
public async Task<IActionResult> UpdateProduct([FromBody]CatalogItem productToUpdate)
{
  var catalogItem =
       await _catalogContext.CatalogItems.SingleOrDefaultAsync(i => i.Id ==
                                                               productToUpdate.Id);
  if (catalogItem == null) return NotFound();

  bool raiseProductPriceChangedEvent = false;
  IntegrationEvent priceChangedEvent = null;

  if (catalogItem.Price != productToUpdate.Price)
          raiseProductPriceChangedEvent = true;

  if (raiseProductPriceChangedEvent) // Create event if price has changed
  {
      var oldPrice = catalogItem.Price;
      priceChangedEvent = new ProductPriceChangedIntegrationEvent(catalogItem.Id,
                                                                  productToUpdate.Price,
                                                                  oldPrice);
  }
  // Update current product
  catalogItem = productToUpdate;

  // Just save the updated product if the Product's Price hasn't changed.
  if (!raiseProductPriceChangedEvent)
  {
      await _catalogContext.SaveChangesAsync();
  }
  else  // Publish to event bus only if product price changed
  {
        // Achieving atomicity between original DB and the IntegrationEventLog
        // with a local transaction
        using (var transaction = _catalogContext.Database.BeginTransaction())
        {
           _catalogContext.CatalogItems.Update(catalogItem);
           await _catalogContext.SaveChangesAsync();

           await _integrationEventLogService.SaveEventAsync(priceChangedEvent);

           transaction.Commit();
        }

      // Publish the integration event through the event bus
      _eventBus.Publish(priceChangedEvent);

      _integrationEventLogService.MarkEventAsPublishedAsync(
                                                priceChangedEvent);
  }

  return Ok();
}

Une fois l’événement d’intégration ProductPriceChangedIntegrationEvent créé, la transaction qui stocke l’opération de domaine d’origine (mettre à jour l’élément de catalogue) inclut également la persistance de l’événement dans la table EventLog. Cela en fait une seule transaction et vous serez toujours en mesure de vérifier si les messages d’événement ont été envoyés.

La table du journal des événements est mise à jour atomiquement avec l’opération de base de données d’origine, à l’aide d’une transaction locale sur la même base de données. Si l’une des opérations échoue, une exception est levée et la transaction restaure toute opération terminée, ce qui permet de maintenir la cohérence entre les opérations de domaine et les messages d’événement enregistrés dans la table.

Réception de messages à partir d’abonnements : gestionnaires d’événements dans les microservices récepteurs

En plus de la logique d’abonnement aux événements, vous devez implémenter le code interne pour les gestionnaires d’événements d’intégration (comme une méthode de rappel). Le gestionnaire d’événements est l’endroit où vous spécifiez où les messages d’événement d’un certain type seront reçus et traités.

Un gestionnaire d’événements reçoit d’abord une instance d’événement à partir du bus d’événements. Ensuite, il localise le composant à traiter en lien avec cet événement d’intégration, en propageant et en persistant l’événement en tant que changement d’état dans le microservice récepteur. Par exemple, si un événement ProductPriceChanged provient du microservice catalogue, il est géré dans le microservice panier et modifie l’état dans ce microservice de panier récepteur, comme indiqué dans le code suivant.

namespace Microsoft.eShopOnContainers.Services.Basket.API.IntegrationEvents.EventHandling
{
    public class ProductPriceChangedIntegrationEventHandler :
        IIntegrationEventHandler<ProductPriceChangedIntegrationEvent>
    {
        private readonly IBasketRepository _repository;

        public ProductPriceChangedIntegrationEventHandler(
            IBasketRepository repository)
        {
            _repository = repository;
        }

        public async Task Handle(ProductPriceChangedIntegrationEvent @event)
        {
            var userIds = await _repository.GetUsers();
            foreach (var id in userIds)
            {
                var basket = await _repository.GetBasket(id);
                await UpdatePriceInBasketItems(@event.ProductId, @event.NewPrice, basket);
            }
        }

        private async Task UpdatePriceInBasketItems(int productId, decimal newPrice,
            CustomerBasket basket)
        {
            var itemsToUpdate = basket?.Items?.Where(x => int.Parse(x.ProductId) ==
                productId).ToList();
            if (itemsToUpdate != null)
            {
                foreach (var item in itemsToUpdate)
                {
                    if(item.UnitPrice != newPrice)
                    {
                        var originalPrice = item.UnitPrice;
                        item.UnitPrice = newPrice;
                        item.OldUnitPrice = originalPrice;
                    }
                }
                await _repository.UpdateBasket(basket);
            }
        }
    }
}

Le gestionnaire d’événements doit vérifier si le produit existe dans l’une des instances de panier. Il met également à jour le prix des articles pour chaque article de ligne de panier associé. Enfin, il crée une alerte à afficher à l’utilisateur sur le changement de prix, comme illustré dans la figure 6-24.

Capture d’écran d’un navigateur montrant la notification de modification de prix sur le panier utilisateur.

Figure 6-24. Affichage d’un changement de prix d’article dans un panier, tel que communiqué par les événements liés à l'intégration.

Idempotence des événements de mise à jour des messages

Un aspect important des événements de message de mise à jour est qu’un échec à un moment quelconque de la communication doit provoquer une nouvelle tentative du message. Sinon, une tâche en arrière-plan pourrait tenter de publier un événement qui a déjà été publié, ce qui aurait pour effet de créer une condition de concurrence. Assurez-vous que les mises à jour sont idempotentes ou qu’elles fournissent suffisamment d’informations pour vous assurer que vous pouvez détecter un doublon, l’ignorer et renvoyer une seule réponse.

Comme indiqué précédemment, l’idempotency signifie qu’une opération peut être effectuée plusieurs fois sans modifier le résultat. Dans un environnement de messagerie, comme lors de la communication d’événements, un événement est idempotent s’il peut être remis plusieurs fois sans modifier le résultat pour le microservice récepteur. Cela peut être nécessaire en raison de la nature de l’événement lui-même, ou de la façon dont le système gère l’événement. L'idempotence des messages est importante dans toute application qui utilise la messagerie, pas seulement dans celles qui implémentent le modèle de bus d'événements.

Un exemple d’opération idempotente est une instruction SQL qui insère des données dans une table uniquement si ces données ne se trouvent pas déjà dans la table. Il n’importe pas combien de fois vous exécutez cette instruction d’insertion SQL ; le résultat sera le même : la table contiendra ces données. L’idempotence comme celle-ci peut également être nécessaire lors du traitement des messages si les messages peuvent potentiellement être envoyés et par conséquent traités plusieurs fois. Par exemple, si la logique de réessai amène un expéditeur à envoyer exactement le même message plusieurs fois, vous devez vous assurer qu'il est idempotent.

Il est possible de concevoir des messages idempotents. Par exemple, vous pouvez créer un événement qui indique « définir le prix du produit sur 25 $ » au lieu de « ajouter 5 $ au prix du produit ». Vous pouvez traiter en toute sécurité le premier message n’importe quel nombre de fois et le résultat sera le même. Ce n’est pas vrai pour le deuxième message. Mais même dans le premier cas, vous ne souhaiterez peut-être pas traiter le premier événement, car le système pourrait aussi avoir envoyé un événement de changement de prix plus récent et ainsi remplacer le nouveau prix.

Un autre exemple peut être un événement de commande terminée propagé vers plusieurs abonnés. L'application doit s'assurer que les informations de commande sont mises à jour dans d'autres systèmes une seule fois, même en cas de messages d'événements dupliqués concernant le même événement de commande terminée.

Il est pratique d’avoir un type d’identité par événement afin de pouvoir créer une logique qui applique que chaque événement n’est traité qu’une seule fois par récepteur.

Certains traitements de messages sont intrinsèquement idempotents. Par exemple, si un système génère des miniatures d’image, cela peut ne pas avoir d’importance sur le nombre de fois où le message sur la miniature générée est traité ; le résultat est que les miniatures sont générées et qu’elles sont identiques à chaque fois. D’autre part, les opérations telles que l’appel d’une passerelle de paiement pour facturer une carte de crédit peuvent ne pas être idempotentes du tout. Dans ces cas, vous devez vous assurer que le traitement d’un message plusieurs fois a l’effet attendu.

Ressources supplémentaires

Déduplication des messages d’événements d’intégration

Vous pouvez vous assurer que les événements de message sont envoyés et traités une seule fois par abonné à différents niveaux. L’une des façons consiste à utiliser une fonctionnalité de déduplication offerte par l’infrastructure de messagerie que vous utilisez. Une autre consiste à implémenter une logique personnalisée dans votre microservice de destination. Avoir des validations au niveau du transport et au niveau de l’application est votre meilleur résultat.

Déduplication d’événements de message au niveau d’EventHandler

Une façon de s’assurer qu’un événement n’est traité qu’une seule fois par n’importe quel récepteur est en implémentant une certaine logique lors du traitement des événements dans les gestionnaires d’événements. Par exemple, il s’agit de l’approche utilisée dans l’application eShopOnContainers, comme vous pouvez le voir dans le code source de la classe UserCheckoutAcceptedIntegrationEventHandler lorsqu’elle reçoit un événement d’intégration UserCheckoutAcceptedIntegrationEvent . (Dans ce cas, le CreateOrderCommand est entouré par un IdentifiedCommand, à l’aide de l’identificateur eventMsg.RequestId, avant de l’envoyer au gestionnaire de commandes).

Déduplication des messages avec RabbitMQ

Lorsque des défaillances réseau intermittentes se produisent, les messages peuvent être dupliqués et le récepteur de messages doit être prêt à gérer ces messages dupliqués. Si possible, les récepteurs doivent gérer les messages de manière idempotente, ce qui est mieux que de les gérer explicitement avec la déduplication.

Conformément à la documentation RabbitMQ, si un message est remis à un utilisateur, puis est replacé dans la file d’attente (parce qu’il n’a pas été accepté avant la perte de la connexion de l’utilisateur, par exemple), RabbitMQ lui associe l’indicateur de redistribution quand il est remis à nouveau (au même utilisateur ou à un autre).

Si l’indicateur « redelivered » est défini, le destinataire doit tenir compte de cela, car le message a peut-être déjà été traité. Mais cela n’est pas garanti ; le message n’a peut-être jamais atteint le destinataire après avoir quitté le répartiteur de messages, peut-être en raison de problèmes réseau. En revanche, si l’indicateur « redeliver » n’est pas défini, il est garanti que le message n’a pas été envoyé plusieurs fois. Ainsi, le récepteur doit dédupliquer les messages ou les traiter de manière idempotente uniquement si l’indicateur de redistribution est défini dans le message.

Ressources supplémentaires