Modifier

Modèle de boîte d’envoi transactionnelle avec Azure Cosmos DB

Azure Cosmos DB
Azure Service Bus
Azure Functions

L’implémentation de la messagerie fiable dans des systèmes distribués peut être difficile. Cet article explique comment utiliser le modèle de boîte d’envoi transactionnelle pour une messagerie fiable et une remise garantie des événements, un aspect important de la prise en charge du traitement des messages idempotent. Pour ce faire, vous utilisez le flux de modification et des lots transactionnels Azure Cosmos DB en association avec Azure Service Bus.

Vue d’ensemble

Les architectures de microservices sont de plus en plus populaires et semblent prometteuses pour la résolution de problèmes tels que la scalabilité, la maintenabilité et l’agilité, en particulier dans les grandes applications. Toutefois, ce modèle architectural soulève également des défis en termes de gestion des données. Dans les applications distribuées, chaque service gère indépendamment les données nécessaires à son fonctionnement dans un magasin de données dédié lui appartenant. Pour prendre en charge un tel scénario, vous utilisez généralement une solution de messagerie telle que RabbitMQ, Kafka ou Azure Service Bus qui distribue les données (événements) d’un service via un bus de messagerie à d’autres services de l’application. Les consommateurs internes ou externes peuvent alors s’abonner à ces messages et être avertis des modifications dès que les données sont manipulées.

Un exemple bien connu dans ce domaine est un système de commande : quand un utilisateur souhaite créer une commande, un service Ordering reçoit des données d’une application cliente via un point de terminaison REST. Il mappe la charge utile à une représentation interne d’un objet Order pour valider les données. Après une validation réussie dans la base de données, il publie un événement OrderCreated dans un bus de messages. Tout autre service intéressé par les nouvelles commandes (par exemple, un service Inventory ou Invoicing), s’abonne aux messages OrderCreated, les traite et les stocke dans sa propre base de données.

Le pseudo-code suivant montre à quoi ressemble ce processus du point de vue du service Ordering :

CreateNewOrder(CreateOrderDto order){
  // Validate the incoming data.
  ...
  // Apply business logic.
  ...
  // Save the object to the database.
  var result = _orderRespository.Create(order);

  // Publish the respective event.
  _messagingService.Publish(new OrderCreatedEvent(result));

  return Ok();
}

Cette approche fonctionne bien jusqu’à ce qu’une erreur se produise entre l’enregistrement de l’objet de commande et la publication de l’événement correspondant. L’envoi d’un événement peut échouer à ce stade pour de nombreuses raisons :

  • Erreurs réseau
  • Panne du service de messages
  • Défaillance de l’hôte

Quelle que soit l’erreur, le résultat est que l’événement OrderCreated ne peut pas être publié dans le bus de messages. Les autres services ne sont pas informés qu’une commande a été créée. Le service Ordering doit maintenant prendre en charge divers éléments qui ne sont pas liés au processus métier réel. Il doit effectuer le suivi des événements qui doivent toujours être placés sur le bus de messages dès qu’il est de nouveau en ligne. Dans le pire des cas, des incohérences de données peuvent se produire dans l’application en raison de la perte d’événements.

Diagram that shows event handling without the Transactional Outbox pattern.

Solution

Il existe un modèle bien connu appelé boîte d’envoi transactionnelle qui peut vous aider à éviter ces situations. Il permet de s’assurer que les événements sont enregistrés dans un magasin de données (généralement dans une table de boîte d’envoi dans votre base de données) avant d’être envoyés à un répartiteur de messages. Si l’objet métier et les événements correspondants sont enregistrés dans la même transaction de base de données, il est garanti qu’aucune donnée ne sera perdue. Tout est validé ou, en cas d’erreur, tout est restauré. Pour parvenir à publier l’événement, un service ou un processus Worker différent interroge la table de boîte d’envoi pour déterminer les entrées non gérées, publie les événements et les marque comme étant traités. Ce modèle garantit que les événements ne sont pas perdus après la création ou la modification d’un objet métier.

Diagram that shows event handling with the Transactional Outbox pattern and a relay service for publishing events to the message broker.

Téléchargez un fichier Visio de cette architecture.

Dans une base de données relationnelle, l’implémentation du modèle est simple. Si le service utilise Entity Framework Core, par exemple, il se sert d’un contexte Entity Framework pour créer une transaction de base de données, enregistrer l’objet métier et l’événement, puis valider la transaction ou effectuer une restauration. En outre, le service Worker qui traite les événements est facile à implémenter : il interroge régulièrement la table de boîte d’envoi pour déterminer les nouvelles entrées, publie dans le bus de messages les événements récemment insérés et, enfin, marque ces entrées comme traitées.

Dans la pratique, les choses ne sont pas aussi simples qu’elles le paraissent de prime abord. Vous devez tout particulièrement vous assurer que l’ordre des événements est préservé afin qu’un événement OrderUpdated ne soit pas publié avant un événement OrderCreated.

Implémentation dans Azure Cosmos DB

Cette section montre comment implémenter le modèle de boîte d’envoi transactionnelle dans Azure Cosmos DB pour obtenir une messagerie fiable et conservant l’ordre entre différents services à l’aide du flux de modification Azure Cosmos DB et de Service Bus. Elle illustre un exemple de service qui gère des objets Contact (information FirstName, LastName, Email, Company et ainsi de suite). Elle utilise le modèle CQRS (séparation des responsabilités en matière de commande et de requête) et suit les concepts fondamentaux de la conception basée sur le domaine. Vous trouverez l’exemple de code de l’implémentation sur GitHub.

Un objet Contact dans l’exemple de service présente la structure suivante :

{
    "name": {
        "firstName": "John",
        "lastName": "Doe"
    },
    "description": "This is a contact",
    "email": "johndoe@contoso.com",
    "company": {
        "companyName": "Contoso",
        "street": "Street",
        "houseNumber": "1a",
        "postalCode": "092821",
        "city": "Palo Alto",
        "country": "US"
    },
    "createdAt": "2021-09-22T11:07:37.3022907+02:00",
    "deleted": false
}

Dès qu’un Contact est créé ou mis à jour, il émet des événements qui contiennent des informations sur la modification actuelle. Entre autres, les événements de domaine peuvent être :

  • ContactCreated. Déclenché quand un contact est ajouté.
  • ContactNameUpdated. Déclenché quand FirstName ou LastName est changé.
  • ContactEmailUpdated. Déclenché quand l’adresse e-mail est mise à jour.
  • ContactCompanyUpdated. Déclenché quand l’une des propriétés de la société est changée.

Lots transactionnels

Pour implémenter ce modèle, vous devez vous assurer que l’objet métier Contact et les événements correspondants sont enregistrés dans la même transaction de base de données. Dans Azure Cosmos DB, les transactions fonctionnent différemment que dans les systèmes de base de données relationnelle. Les transactions Azure Cosmos DB, appelées lots transactionnels, opèrent sur une seule partition logique, de sorte qu’elles garantissent les propriétés ACID (atomicité, cohérence, isolation et durabilité). Vous ne pouvez pas enregistrer deux documents dans une opération de traitement par lot transactionnel dans des conteneurs ou des partitions logiques différents. Pour l’exemple de service, cela signifie que l’objet métier et l’événement (ou les événements) sont placés dans le même conteneur et la même partition logique.

Contexte, dépôts et UnitOfWork

Le cœur de l’exemple d’implémentation est un contexte de conteneur qui effectue le suivi des objets enregistrés dans le même lot transactionnel. Il gère une liste d’objets créés et modifiés et opère sur un seul conteneur de Azure Cosmos DB. L’interface correspondante ressemble à ceci :

public interface IContainerContext
{
    public Container Container { get; }
    public List<IDataObject<Entity>> DataObjects { get; }
    public void Add(IDataObject<Entity> entity);
    public Task<List<IDataObject<Entity>>> SaveChangesAsync(CancellationToken cancellationToken = default);
    public void Reset();
}

La liste dans le composant de contexte de conteneur effectue le suivi des objets Contact et DomainEvent. Les deux sont placés dans le même conteneur. Cela signifie que plusieurs types d’objets sont stockés dans le même conteneur Azure Cosmos DB et qu’ils utilisent une propriété Type pour faire la distinction entre un objet métier et un événement.

Pour chaque type, il existe un dépôt dédié qui définit et implémente l’accès aux données. L’interface du dépôt Contact fournit les méthodes suivantes :

public interface IContactsRepository
{
    public void Create(Contact contact);
    public Task<(Contact, string)> ReadAsync(Guid id, string etag);
    public Task DeleteAsync(Guid id, string etag);
    public Task<(List<(Contact, string)>, bool, string)> ReadAllAsync(int pageSize, string continuationToken);
    public void Update(Contact contact, string etag);
}

Le dépôt Event est similaire, à ceci près qu’il n’y a qu’une seule méthode, qui crée des événements dans le magasin :

public interface IEventRepository
{
    public void Create(ContactDomainEvent e);
}

Les implémentations des deux interfaces de dépôt obtiennent une référence via l’injection de dépendances à une instance IContainerContext unique pour que les deux fonctionnent sur le même contexte d’Azure Cosmos DB.

Le dernier composant est UnitOfWork, qui valide les modifications contenues dans l’instance IContainerContext dans Azure Cosmos DB :

public class UnitOfWork : IUnitOfWork
{
    private readonly IContainerContext _context;
    public IContactRepository ContactsRepo { get; }

    public UnitOfWork(IContainerContext ctx, IContactRepository cRepo)
    {
        _context = ctx;
        ContactsRepo = cRepo;
    }

    public Task<List<IDataObject<Entity>>> CommitAsync(CancellationToken cancellationToken = default)
    {
        return _context.SaveChangesAsync(cancellationToken);
    }
}

Gestion des événements : création et publication

Chaque fois qu’un objet Contact est créé, modifié ou supprimé (y compris de manière réversible), le service déclenche un événement correspondant. Le cœur de la solution fournie est une combinaison de conception basée sur le domaine (DDD, Domain-Driven Design) et du modèle de médiateur proposé par Jimmy Bogard. Il suggère de conserver une liste des événements qui se sont produits en raison des modifications apportées à l’objet de domaine et de publier ces événements avant d’enregistrer l’objet réel dans la base de données.

La liste des modifications est conservée dans l’objet de domaine lui-même afin qu’aucun autre composant ne puisse modifier la chaîne d’événements. Le comportement de la gestion des événements (instances IEvent) dans l’objet de domaine est défini via une interface IEventEmitter<IEvent> et implémenté dans une classe DomainEntity abstraite :

public abstract class DomainEntity : Entity, IEventEmitter<IEvent>
{
[...]
[...]
    private readonly List<IEvent> _events = new();

    [JsonIgnore] public IReadOnlyList<IEvent> DomainEvents => _events.AsReadOnly();

    public virtual void AddEvent(IEvent domainEvent)
    {
        var i = _events.FindIndex(0, e => e.Action == domainEvent.Action);
        if (i < 0)
        {
            _events.Add(domainEvent);
        }
        else
        {
            _events.RemoveAt(i);
            _events.Insert(i, domainEvent);
        }
    }
[...]
[...]
}

L’objet Contact déclenche des événements de domaine. L’entité Contact suit les concepts DDD de base, en configurant les méthodes setter des propriétés de domaine comme privées. Il n’existe pas de méthodes setter publiques dans la classe. Au lieu de cela, elle offre des méthodes pour manipuler l’état interne. Dans ces méthodes, les événements appropriés pour une certaine modification (par exemple, ContactNameUpdated ou ContactEmailUpdated) peuvent être déclenchés.

Voici un exemple de mise à jour du nom d’un contact. (L’événement est déclenché à la fin de la méthode.)

public void SetName(string firstName, string lastName)
{
    if (string.IsNullOrWhiteSpace(firstName) ||
        string.IsNullOrWhiteSpace(lastName))
    {
        throw new ArgumentException("FirstName or LastName cannot be empty");
    }

    Name = new Name(firstName, lastName);

    if (IsNew) return;

    AddEvent(new ContactNameUpdatedEvent(Id, Name));
    ModifiedAt = DateTimeOffset.UtcNow;
}

Le ContactNameUpdatedEvent correspondant, qui effectue le suivi des modifications, ressemble à ceci :

public class ContactNameUpdatedEvent : ContactDomainEvent
{
    public Name Name { get; }

    public ContactNameUpdatedEvent(Guid contactId, Name contactName) : 
        base(Guid.NewGuid(), contactId, nameof(ContactNameUpdatedEvent))
    {
        Name = contactName;
    }
}

Jusqu’à présent, les événements sont simplement journalisés dans l’objet de domaine et rien n’est enregistré dans la base de données ou même publié dans un répartiteur de messages. À la suite de la recommandation, la liste des événements est traitée juste avant l’enregistrement de l’objet métier dans le magasin de données. En l’occurrence, cela se produit dans la méthode SaveChangesAsync de l’instance IContainerContext, qui est implémentée dans une méthode RaiseDomainEvents privée. (dObjs est la liste des entités suivies du contexte de conteneur.)

private void RaiseDomainEvents(List<IDataObject<Entity>> dObjs)
{
    var eventEmitters = new List<IEventEmitter<IEvent>>();

    // Get all EventEmitters.
    foreach (var o in dObjs)
        if (o.Data is IEventEmitter<IEvent> ee)
            eventEmitters.Add(ee);

    // Raise events.
    if (eventEmitters.Count <= 0) return;
    foreach (var evt in eventEmitters.SelectMany(eventEmitter => eventEmitter.DomainEvents))
        _mediator.Publish(evt);
}

Sur la dernière ligne, le package MediatR, implémentation du modèle de médiateur en C#, est utilisé pour publier un événement au sein de l’application. Cela est possible, car tous les événements comme ContactNameUpdatedEventimplémentent l’interface INotification du package MediatR.

Ces événements doivent être traités par un gestionnaire correspondant. Ici, l’implémentation IEventsRepository entre en jeu. Voici l’exemple du gestionnaire d’événements NameUpdated :

public class ContactNameUpdatedHandler :
    INotificationHandler<ContactNameUpdatedEvent>
{
    private IEventRepository EventRepository { get; }

    public ContactNameUpdatedHandler(IEventRepository eventRepo)
    {
        EventRepository = eventRepo;
    }

    public Task Handle(ContactNameUpdatedEvent notification,
        CancellationToken cancellationToken)
    {
        EventRepository.Create(notification);
        return Task.CompletedTask;
    }
}

Une instance IEventRepository est injectée dans la classe de gestionnaire par le biais du constructeur. Dès qu’un ContactNameUpdatedEvent est publié dans le service, la méthode Handle est appelée et utilise l’instance de dépôt des événements pour créer un objet de notification. Cet objet de notification est ensuite inséré dans la liste des objets suivis dans l’objet IContainerContext et rejoint les objets qui sont enregistrés dans le même lot transactionnel dans Azure Cosmos DB.

Jusque-là, le contexte de conteneur sait quels objets traiter. Pour parvenir à conserver les objets suivis dans Azure Cosmos DB, l’implémentation IContainerContext crée le lot transactionnel, ajoute tous les objets pertinents et exécute l’opération sur la base de données. Le processus décrit est géré dans la méthode SaveInTransactionalBatchAsync, qui est appelée par la méthode SaveChangesAsync.

Voici les parties importantes de l’implémentation dont vous avez besoin pour créer et exécuter le lot transactionnel :

private async Task<List<IDataObject<Entity>>> SaveInTransactionalBatchAsync(
    CancellationToken cancellationToken)
{
    if (DataObjects.Count > 0)
    {
        var pk = new PartitionKey(DataObjects[0].PartitionKey);
        var tb = Container.CreateTransactionalBatch(pk);
        DataObjects.ForEach(o =>
        {
            TransactionalBatchItemRequestOptions tro = null;

            if (!string.IsNullOrWhiteSpace(o.Etag))
                tro = new TransactionalBatchItemRequestOptions { IfMatchEtag = o.Etag };

            switch (o.State)
            {
                case EntityState.Created:
                    tb.CreateItem(o);
                    break;
                case EntityState.Updated or EntityState.Deleted:
                    tb.ReplaceItem(o.Id, o, tro);
                    break;
            }
        });

        var tbResult = await tb.ExecuteAsync(cancellationToken);
...
[Check for return codes, etc.]
...
    }

    // Return copy of current list as result.
    var result = new List<IDataObject<Entity>>(DataObjects); 

    // Work has been successfully done. Reset DataObjects list.
    DataObjects.Clear();
    return result;
}

Voici une vue d’ensemble du fonctionnement du processus jusque-là (pour la mise à jour du nom sur un objet de contact) :

  1. Un client souhaite mettre à jour le nom d’un contact. La méthode SetName est appelée sur l’objet de contact et les propriétés sont mises à jour.
  2. L’événement ContactNameUpdated est ajouté à la liste des événements dans l’objet de domaine.
  3. La méthode Update du dépôt de contacts est appelée, qui ajoute l’objet de domaine au contexte de conteneur. L’objet est maintenant suivi.
  4. CommitAsync est appelé sur l’instance UnitOfWork, qui à son tour appelle SaveChangesAsync sur le contexte de conteneur.
  5. Dans SaveChangesAsync, tous les événements de la liste de l’objet de domaine sont publiés par une instance MediatR et sont ajoutés via le dépôt d’événements au même contexte de conteneur.
  6. Dans SaveChangesAsync, un TransactionalBatch est créé. Il est destiné à contenir à la fois l’objet de contact et l’événement.
  7. Le TransactionalBatch s’exécute et les données sont validées dans Azure Cosmos DB.
  8. SaveChangesAsync et CommitAsync se terminent correctement.

Persistance

Comme vous pouvez le voir dans les extraits de code précédents, tous les objets enregistrés dans Azure Cosmos DB sont wrappés dans une instance DataObject. Cet objet fournit des propriétés communes :

  • ID.
  • PartitionKey.
  • Type.
  • State. Comme Created, Updated n’est pas persistant dans Azure Cosmos DB.
  • Etag. Pour le verrouillage optimiste.
  • TTL. Propriété durée de vie pour le nettoyage automatique des anciens documents.
  • Data. Objet de données générique.

Ces propriétés sont définies dans une interface générique qui est appelée IDataObject et est utilisée par les dépôts et le contexte de conteneur :


public interface IDataObject<out T> where T : Entity
{
    string Id { get; }
    string PartitionKey { get; }
    string Type { get; }
    T Data { get; }
    string Etag { get; set; }
    int Ttl { get; }
    EntityState State { get; set; }
}

Les objets wrappés dans une instance DataObject et enregistrés dans la base de données ressemblent alors à cet exemple (Contact et ContactNameUpdatedEvent):

// Contact document/object. After creation.
{
    "id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "type": "contact",
    "data": {
        "name": {
            "firstName": "John",
            "lastName": "Doe"
        },
        "description": "This is a contact",
        "email": "johndoe@contoso.com",
        "company": {
            "companyName": "Contoso",
            "street": "Street",
            "houseNumber": "1a",
            "postalCode": "092821",
            "city": "Palo Alto",
            "country": "US"
        },
        "createdAt": "2021-09-22T11:07:37.3022907+02:00",
        "deleted": false,
        "id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2"
    },
    "ttl": -1,
    "_etag": "\"180014cc-0000-1500-0000-614455330000\"",
    "_ts": 1632301657
}

// After setting a new name, this is how an event document looks.
{
    "id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
    "partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "type": "domainEvent",
    "data": {
        "name": {
            "firstName": "Jane",
            "lastName": "Doe"
        },
        "contactId": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
        "action": "ContactNameUpdatedEvent",
        "id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
        "createdAt": "2021-09-22T11:37:37.3022907+02:00"
    },
    "ttl": 120,
    "_etag": "\"18005bce-0000-1500-0000-614456b80000\"",
    "_ts": 1632303457
}

Vous pouvez voir que les documents Contact et ContactNameUpdatedEvent (type domainEvent) ont la même clé de partition et que les deux documents sont conservés dans la même partition logique.

Traitement du flux de modification

Pour lire le flux d’événements et les envoyer à un répartiteur de messages, le service utilise le flux de modification Azure Cosmos DB.

Le flux de modification est un journal persistant des modifications apportées à votre conteneur. Il fonctionne en arrière-plan et effectue le suivi des modifications. Dans une partition logique, l’ordre des modifications est garanti. La méthode la plus pratique pour lire le flux de modification consiste à utiliser une fonction Azure avec un déclencheur Azure Cosmos DB. Une autre option consiste à utiliser la bibliothèque du processeur de flux de modification. Elle vous permet d’intégrer le traitement du flux de modification à votre API web en tant que service d’arrière-plan (par le biais de l’interface IHostedService). L’exemple ci-dessous utilise une application console simple qui implémente la classe abstraite BackgroundService pour héberger des tâches en arrière-plan longues dans les applications .NET Core.

Pour recevoir les modifications du flux de modification Azure Cosmos DB, vous devez instancier un objet ChangeFeedProcessor, inscrire une méthode de gestionnaire pour le traitement des messages et commencer à écouter les modifications :

private async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync()
{
    var changeFeedProcessor = _container
        .GetChangeFeedProcessorBuilder<ExpandoObject>(
            _configuration.GetSection("Cosmos")["ProcessorName"],
            HandleChangesAsync)
        .WithInstanceName(Environment.MachineName)
        .WithLeaseContainer(_leaseContainer)
        .WithMaxItems(25)
        .WithStartTime(new DateTime(2000, 1, 1, 0, 0, 0, DateTimeKind.Utc))
        .WithPollInterval(TimeSpan.FromSeconds(3))
        .Build();

    _logger.LogInformation("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    _logger.LogInformation("Change Feed Processor started. Waiting for new messages to arrive.");
    return changeFeedProcessor;
}

Une méthode de gestionnaire (ici, HandleChangesAsync) traite ensuite les messages. Dans cet exemple, les événements sont publiés dans une rubrique Service Bus partitionnée pour à des fins de scalabilité et dont la fonctionnalité de déduplication est activée. Tout service intéressé par les modifications apportées aux objets Contact peut ensuite s’abonner à cette rubrique Service Bus, puis recevoir et traiter les modifications pour son propre contexte.

Les messages Service Bus générés ont une propriété SessionId. Quand vous utilisez des sessions dans Service Bus, vous garantissez que l’ordre des messages est préservé (FIFO). La conservation de l’ordre est nécessaire pour ce cas d’usage.

Voici l’extrait de code qui gère les messages du flux de modification :

private async Task HandleChangesAsync(IReadOnlyCollection<ExpandoObject> changes, CancellationToken cancellationToken)
{
    _logger.LogInformation($"Received {changes.Count} document(s).");
    var eventsCount = 0;

    Dictionary<string, List<ServiceBusMessage>> partitionedMessages = new();

    foreach (var document in changes as dynamic)
    {
        if (!((IDictionary<string, object>)document).ContainsKey("type") ||
            !((IDictionary<string, object>)document).ContainsKey("data")) continue; // Unknown document type.

        if (document.type == EVENT_TYPE) // domainEvent.
        {
            string json = JsonConvert.SerializeObject(document.data);
            var sbMessage = new ServiceBusMessage(json)
            {
                ContentType = "application/json",
                Subject = document.data.action,
                MessageId = document.id,
                PartitionKey = document.partitionKey,
                SessionId = document.partitionKey
            };

            // Create message batch per partitionKey.
            if (partitionedMessages.ContainsKey(document.partitionKey))
            {
                partitionedMessages[sbMessage.PartitionKey].Add(sbMessage);
            }
            else
            {
                partitionedMessages[sbMessage.PartitionKey] = new List<ServiceBusMessage> { sbMessage };
            }

            eventsCount++;
        }
    }

    if (partitionedMessages.Count > 0)
    {
        _logger.LogInformation($"Processing {eventsCount} event(s) in {partitionedMessages.Count} partition(s).");

        // Loop over each partition.
        foreach (var partition in partitionedMessages)
        {
            // Create batch for partition.
            using var messageBatch =
                await _topicSender.CreateMessageBatchAsync(cancellationToken);
            foreach (var msg in partition.Value)
                if (!messageBatch.TryAddMessage(msg))
                    throw new Exception();

            _logger.LogInformation(
                $"Sending {messageBatch.Count} event(s) to Service Bus. PartitionId: {partition.Key}");

            try
            {
                await _topicSender.SendMessagesAsync(messageBatch, cancellationToken);
            }
            catch (Exception e)
            {
                _logger.LogError(e.Message);
                throw;
            }
        }
    }
    else
    {
        _logger.LogInformation("No event documents in change feed batch. Waiting for new messages to arrive.");
    }
}

Gestion des erreurs

En cas d’erreur pendant le traitement des modifications, la bibliothèque de flux de modification redémarre la lecture des messages à la position où elle a correctement traité le dernier lot. Par exemple, si l’application a traité avec succès 10 000 messages, qu’elle travaille maintenant sur le lot 10 001 à 10 025 et qu’une erreur se produit, elle peut redémarrer et reprendre son travail à la position 10 001. La bibliothèque suit automatiquement ce qui a été traité via les informations enregistrées dans un Leasesconteneur dans Azure Cosmos DB.

Il est possible que le service ait déjà envoyé à Service Bus certains des messages qui sont retraités. Normalement, ce scénario entraînerait un doublon du traitement des messages. Comme indiqué plus haut, Service Bus dispose d’une fonctionnalité de détection des messages dupliqués que vous devez activer pour ce scénario. Le service vérifie si un message a déjà été ajouté à une rubrique Service Bus (ou à une file d’attente) en fonction de la propriété MessageId du message contrôlée par l’application. Cette propriété est définie sur l’ID du document d’événement. Si le même message est renvoyé à Service Bus, le service l’ignore et le supprime.

Tâches de nettoyage

Dans une implémentation typique de la boîte d’envoi transactionnelle, le service met à jour les événements gérés et définit une propriété Processed sur true, ce qui indique qu’un message a été publié avec succès. Ce comportement peut être implémenté manuellement dans la méthode du gestionnaire. Dans le scénario actuel, aucun processus de ce type n’est nécessaire. Azure Cosmos DB effectue le suivi des événements qui ont été traités avec le flux de modification (en association avec le conteneur Leases).

En guise de dernière étape, vous devez occasionnellement supprimer les événements du conteneur afin de conserver uniquement les enregistrements/documents les plus récents. Pour effectuer régulièrement un nettoyage, l’implémentation applique une autre fonctionnalité d’Azure Cosmos DB : la durée de vie (TTL) sur les documents. Azure Cosmos DB peut supprimer automatiquement des documents en fonction d’une propriété TTL qui peut être ajoutée à un document : un intervalle de temps en secondes. Le service vérifie constamment si le conteneur contient des documents qui ont une propriété TTL. Dès qu’un document expire, Azure Cosmos DB le supprime de la base de données.

Quand tous les composants fonctionnent comme prévu, les événements sont traités et publiés rapidement, en quelques secondes. En cas d’erreur dans Azure Cosmos DB, les événements ne sont pas envoyés au bus de messages, car l’objet métier et les événements correspondants ne peuvent pas être enregistrés dans la base de données. La seule chose à envisager est de définir une valeur TTL appropriée sur les documents DomainEvent quand le Worker d’arrière-plan (processeur de flux de modification) ou le bus de service ne sont pas disponibles. Dans un environnement de production, il est préférable de sélectionner un intervalle de temps de plusieurs jours. Par exemple, 10 jours. Tous les composants impliqués disposeront alors de suffisamment de temps pour traiter/publier les modifications au sein de l’application.

Résumé

Le modèle de boîte d’envoi transactionnelle résout le problème lié à la publication fiable d’événements de domaine dans des systèmes distribués. En validant l’état de l’objet métier et ses événements dans le même lot transactionnel et en utilisant un processeur d’arrière-plan en tant que relais de message, vous vous assurez que les autres services, internes ou externes, finissent par recevoir les informations dont ils dépendent. Cet exemple n’est pas une implémentation traditionnelle du modèle de boîte d’envoi transactionnelle. Il utilise des fonctionnalités telles que la durée de vie et le flux de modification Azure Cosmos DB, grâce auxquelles les choses demeurent simples et claires.

Voici un récapitulatif des composants Azure utilisés dans ce scénario :

Diagram that shows the Azure components to implement Transactional Outbox with Azure Cosmos DB and Azure Service Bus.

Téléchargez un fichier Visio de cette architecture.

Les avantages de cette solution sont les suivants :

  • Messagerie fiable et livraison garantie des événements.
  • Conservation de l’ordre des événements et déduplication des messages via Service Bus.
  • Il n’est pas nécessaire de conserver une propriété Processed supplémentaire qui indique la réussite du traitement d’un document d’événement.
  • Suppression d’événements d’Azure Cosmos DB via TTL. Le processus ne consomme pas d’unités de requête nécessaires pour gérer les demandes de l’utilisateur et de l’application. Au lieu de cela, il utilise des unités de requête « restantes » dans une tâche en arrière-plan.
  • Traitement sans erreur des messages via ChangeFeedProcessor (ou une fonction Azure).
  • Facultatif : plusieurs processeurs de flux de modification, chacun conservant son propre pointeur dans le flux de modification.

Considérations

L’exemple d’application abordé dans cet article montre comment vous pouvez implémenter le modèle de boîte d’envoi transactionnelle sur Azure avec Azure Cosmos DB et Service Bus. Il existe également d’autres approches qui utilisent des bases de données NoSQL. Pour que l’objet métier et les événements soient enregistrés de manière fiable dans la base de données, vous pouvez incorporer la liste des événements dans le document de l’objet métier. L’inconvénient de cette approche est que le processus de nettoyage doit mettre à jour chaque document qui contient des événements. Ce n’est pas idéal, en particulier en termes de coût d’unité de demande, par rapport à l’utilisation de la durée de vie.

Ne considérez surtout pas que l’exemple de code fourni ici est prêt pour la production. Il présente certaines limitations en matière de multithreading, notamment la façon dont les événements sont gérés dans la classe DomainEntity et la façon dont les objets sont suivis dans les implémentations CosmosContainerContext. Utilisez-le comme point de départ pour vos propres implémentations. Vous pouvez également utiliser des bibliothèques existantes qui ont déjà cette fonctionnalité intégrée, comme NServiceBus ou MassTransit.

Déployer ce scénario

Vous pouvez trouver le code source, les fichiers de déploiement et les instructions permettant de tester ce scénario sur GitHub : https://github.com/mspnp/transactional-outbox-pattern.

Contributeurs

Cet article est géré par Microsoft. Il a été écrit à l’origine par les contributeurs suivants.

Auteur principal :

Pour afficher les profils LinkedIn non publics, connectez-vous à LinkedIn.

Étapes suivantes

Pour en savoir plus, consultez les articles suivants :