Modello Outbox transazionale con Azure Cosmos DB

Azure Cosmos DB
Bus di servizio di Azure
Funzioni di Azure

L'implementazione della messaggistica affidabile nei sistemi distribuiti può essere complessa. Questo articolo descrive come usare il modello Outbox transazionale per la messaggistica affidabile e il recapito garantito degli eventi, una parte importante del supporto dell'elaborazione dei messaggi idempotenti. A tale scopo, si useranno batch transazionali di Azure Cosmos DB e il feed di modifiche in combinazione con il bus di servizio di Azure.

Panoramica

Le architetture di microservizi stanno diventando sempre più diffuse con la promessa di risolvere problemi come scalabilità, gestibilità e agilità, soprattutto nelle applicazioni di grandi dimensioni. Questo modello di architettura introduce tuttavia alcune problematiche in merito alla gestione dei dati. Nelle applicazioni distribuite ogni servizio gestisce in modo indipendente i dati necessari per operare in un archivio dati dedicato di proprietà del servizio. Per supportare questo scenario, in genere si usa una soluzione di messaggistica come RabbitMQ, Kafka o il bus di servizio di Azure che distribuisce dati (eventi) da un servizio tramite un bus di messaggistica ad altri servizi dell'applicazione. I consumer interni o esterni possono quindi sottoscrivere tali messaggi e ricevere notifiche delle modifiche non appena i dati vengono modificati.

Un esempio noto è un sistema di gestione degli ordini: quando un utente vuole creare un ordine, un servizio Ordering riceve i dati da un'applicazione client tramite un endpoint REST. Esegue il mapping del payload a una rappresentazione interna di un oggetto Order per convalidare i dati. Dopo il commit nel database, pubblica un evento OrderCreated in un bus di messaggi. Qualsiasi altro servizio interessato a nuovi ordini, ad esempio un servizio Inventoryo Invoicing, sottoscrive i messaggi OrderCreated, li elabora e li archivia nel proprio database.

Lo pseudocodice seguente mostra l'aspetto tipico di questo processo dal punto di vista del servizio Ordering:

CreateNewOrder(CreateOrderDto order){
  // Validate the incoming data.
  ...
  // Apply business logic.
  ...
  // Save the object to the database.
  var result = _orderRespository.Create(order);

  // Publish the respective event.
  _messagingService.Publish(new OrderCreatedEvent(result));

  return Ok();
}

Questo approccio funziona correttamente fino a quando non si verifica un errore tra il salvataggio dell'oggetto ordine e la pubblicazione dell'evento corrispondente. In questa fase, l'invio di un evento potrebbe non riuscire per diversi motivi:

  • errori di rete
  • Interruzione del servizio messaggi
  • Errore dell'host

Indipendentemente dall'errore, il risultato è che l'evento OrderCreated non può essere pubblicato nel bus di messaggi. Ad altri servizi non verrà notificato che è stato creato un ordine. Il servizio Ordering deve ora gestire vari elementi che non sono correlati al processo di business effettivo. Deve tenere traccia degli eventi che devono ancora essere inseriti nel bus di messaggi non appena torna online. Può anche verificarsi lo scenario peggiore: incoerenze dei dati nell'applicazione a causa di eventi persi.

Diagramma che mostra la gestione degli eventi senza il modello Outbox transazionale.

Soluzione

Esiste un modello noto denominato Outbox transazionale che consente di evitare queste situazioni. Garantisce che gli eventi siano salvati in un archivio dati (in genere in una tabella Outbox nel database) prima di essere infine inviati a un broker di messaggi. Se l'oggetto business e gli eventi corrispondenti vengono salvati all'interno della stessa transazione di database, è garantito che non verrà perso alcun dato. Verrà eseguito il commit di tutti gli elementi oppure verrà eseguito il rollback di tutti gli elementi in caso di errore. Per pubblicare l'evento, un altro servizio o processo di lavoro esegue una query sulla tabella Outbox per trovare voci non gestite, pubblica gli eventi e li contrassegna come elaborati. Questo modello garantisce che gli eventi non vadano persi dopo la creazione o la modifica di un oggetto business.

Diagramma che mostra la gestione degli eventi con il modello Outbox transazionale e un servizio di inoltro per la pubblicazione di eventi nel broker di messaggi.

Scaricare un file di Visio di questa architettura.

In un database relazionale l'implementazione del modello è semplice. Se ad esempio il servizio usa Entity Framework Core, userà un contesto Entity Framework per creare una transazione di database, salvare l'oggetto business e l'evento ed eseguire il commit della transazione o eseguire un rollback. Anche il servizio del ruolo di lavoro che elabora gli eventi è facile da implementare: esegue periodicamente query sulla tabella Outbox per trovare nuove voci, pubblica gli eventi appena inseriti nel bus di messaggi e infine contrassegna queste voci come elaborate.

In pratica, le cose non sono così semplici come potrebbero sembrare inizialmente. Soprattutto, è necessario assicurarsi che l'ordine degli eventi venga mantenuto affinché un evento OrderUpdated non venga pubblicato prima di un evento OrderCreated.

Implementazione in Azure Cosmos DB

Questa sezione illustra come implementare il modello Outbox transazionale in Azure Cosmos DB per ottenere un servizio di messaggistica affidabile e in ordine tra servizi diversi con l'aiuto del feed di modifiche di Azure Cosmos DB e del bus di servizio. Viene illustrato un servizio di esempio che gestisce gli oggetti Contact (informazioni su FirstName, LastName, Email, Company e così via). Usa il modello Command and Query Responsibility Segregation (CQRS) e segue i concetti di base relativi alla progettazione basata su dominio. Iil codice di esempio per l'implementazione è disponibile in GitHub.

Un oggetto Contact nel servizio di esempio presenta la struttura seguente:

{
    "name": {
        "firstName": "John",
        "lastName": "Doe"
    },
    "description": "This is a contact",
    "email": "johndoe@contoso.com",
    "company": {
        "companyName": "Contoso",
        "street": "Street",
        "houseNumber": "1a",
        "postalCode": "092821",
        "city": "Palo Alto",
        "country": "US"
    },
    "createdAt": "2021-09-22T11:07:37.3022907+02:00",
    "deleted": false
}

Non appena un oggetto Contact viene creato o aggiornato, genera eventi che contengono informazioni sulla modifica corrente. Tra gli altri, gli eventi di dominio possono essere:

  • ContactCreated. Generati quando viene aggiunto un contatto.
  • ContactNameUpdated. Generati quando viene modificato FirstName o LastName.
  • ContactEmailUpdated. Generati quando l'indirizzo di posta elettronica viene aggiornato.
  • ContactCompanyUpdated. Generati quando viene modificata una delle proprietà relative all'azienda.

Batch transazionali

Per implementare questo modello, è necessario assicurarsi che l'oggetto business Contact e gli eventi corrispondenti verranno salvati nella stessa transazione di database. In Azure Cosmos DB, le transazioni funzionano in modo diverso rispetto ai sistemi di database relazionali. Le transazioni di Azure Cosmos DB, denominate batch transazionali, operano su una singola partizione logica, in modo da garantire le proprietà di atomicità, coerenza, isolamento e durabilità (ACID). Non è possibile salvare due documenti in un'operazione batch transazionale in contenitori o partizioni logiche diverse. Per il servizio di esempio, significa che l'oggetto business e l'evento o gli eventi verranno inseriti nello stesso contenitore e nella stessa partizione logica.

Contesto, repository e unità di lavoro

Il nucleo dell'implementazione di esempio è un contesto del contenitore che tiene traccia degli oggetti salvati nello stesso batch transazionale. Gestisce un elenco di oggetti creati e modificati e opera in un singolo contenitore di Azure Cosmos DB. L'interfaccia è simile alla seguente:

public interface IContainerContext
{
    public Container Container { get; }
    public List<IDataObject<Entity>> DataObjects { get; }
    public void Add(IDataObject<Entity> entity);
    public Task<List<IDataObject<Entity>>> SaveChangesAsync(CancellationToken cancellationToken = default);
    public void Reset();
}

L'elenco nel componente del contesto del contenitore tiene traccia degli oggetti Contact e DomainEvent. Entrambi verranno inseriti nello stesso contenitore. Ciò significa che più tipi di oggetti vengono archiviati nello stesso contenitore di Azure Cosmos DB e usano una proprietà Type per la distinzione tra un oggetto business e un evento.

Per ogni tipo è presente un repository dedicato che definisce e implementa l'accesso ai dati. L'interfaccia del repository Contact fornisce questi metodi:

public interface IContactsRepository
{
    public void Create(Contact contact);
    public Task<(Contact, string)> ReadAsync(Guid id, string etag);
    public Task DeleteAsync(Guid id, string etag);
    public Task<(List<(Contact, string)>, bool, string)> ReadAllAsync(int pageSize, string continuationToken);
    public void Update(Contact contact, string etag);
}

Il repository Event ha un aspetto simile, ad eccezione del fatto che è presente un solo metodo che crea nuovi eventi nell'archivio:

public interface IEventRepository
{
    public void Create(ContactDomainEvent e);
}

Le implementazioni di entrambe le interfacce del repository ottengono un riferimento tramite inserimento delle dipendenze in un'istanza di IContainerContext singola per garantire che entrambe funzionino nello stesso contesto di Azure Cosmos DB.

L'ultimo componente è UnitOfWork, che esegue il commit delle modifiche contenute nell'istanza di IContainerContext in Azure Cosmos DB:

public class UnitOfWork : IUnitOfWork
{
    private readonly IContainerContext _context;
    public IContactRepository ContactsRepo { get; }

    public UnitOfWork(IContainerContext ctx, IContactRepository cRepo)
    {
        _context = ctx;
        ContactsRepo = cRepo;
    }

    public Task<List<IDataObject<Entity>>> CommitAsync(CancellationToken cancellationToken = default)
    {
        return _context.SaveChangesAsync(cancellationToken);
    }
}

Gestione degli eventi: creazione e pubblicazione

Ogni volta che un oggetto Contact viene creato, modificato o eliminato (temporaneamente),il servizio genera un evento corrispondente. Il nucleo della soluzione fornita è una combinazione di progettazione basata su domini e il modello di mediatore proposto da Jimmy Bogard. Suggerisce di mantenere un elenco di eventi che si sono verificati a causa di modifiche dell'oggetto dominio e di pubblicare questi eventi prima di salvare l'oggetto effettivo nel database.

L'elenco delle modifiche viene mantenuto nell'oggetto dominio stesso affinché nessun altro componente possa modificare la catena di eventi. Il comportamento di gestione degli eventi (istanze di IEvent) nell'oggetto dominio viene definito tramite un'interfaccia IEventEmitter<IEvent> e implementato in una classe DomainEntity astratta:

public abstract class DomainEntity : Entity, IEventEmitter<IEvent>
{
[...]
[...]
    private readonly List<IEvent> _events = new();

    [JsonIgnore] public IReadOnlyList<IEvent> DomainEvents => _events.AsReadOnly();

    public virtual void AddEvent(IEvent domainEvent)
    {
        var i = _events.FindIndex(0, e => e.Action == domainEvent.Action);
        if (i < 0)
        {
            _events.Add(domainEvent);
        }
        else
        {
            _events.RemoveAt(i);
            _events.Insert(i, domainEvent);
        }
    }
[...]
[...]
}

L'oggetto Contact genera gli eventi di dominio. L'entità Contact segue i concetti della progettazione basata su domini di base, configurando i setter delle proprietà di dominio come privati. Nella classe non sono presenti setter pubblici. Offre invece metodi per modificare lo stato interno. In questi metodi possono essere generati gli eventi appropriati per una determinata modifica (ad esempio ContactNameUpdated o ContactEmailUpdated).

Di seguito è riportato un esempio per gli aggiornamenti al nome di un contatto. L'evento viene generato alla fine del metodo.

public void SetName(string firstName, string lastName)
{
    if (string.IsNullOrWhiteSpace(firstName) ||
        string.IsNullOrWhiteSpace(lastName))
    {
        throw new ArgumentException("FirstName or LastName cannot be empty");
    }

    Name = new Name(firstName, lastName);

    if (IsNew) return;

    AddEvent(new ContactNameUpdatedEvent(Id, Name));
    ModifiedAt = DateTimeOffset.UtcNow;
}

L'oggetto ContactNameUpdatedEventcorrispondente, che tiene traccia delle modifiche, è simile al seguente:

public class ContactNameUpdatedEvent : ContactDomainEvent
{
    public Name Name { get; }

    public ContactNameUpdatedEvent(Guid contactId, Name contactName) : 
        base(Guid.NewGuid(), contactId, nameof(ContactNameUpdatedEvent))
    {
        Name = contactName;
    }
}

Finora, gli eventi vengono semplicemente registrati nell'oggetto dominio e nulla viene salvato nel database o persino pubblicato in un broker di messaggi. Seguendo la raccomandazione, l'elenco di eventi verrà elaborato subito prima che l'oggetto business venga salvato nell'archivio dati. In questo caso, l'operazione si verifica nel metodo SaveChangesAsync dell'istanza di IContainerContext, implementata in un metodo RaiseDomainEvents privato. dObjs è l'elenco di entità tracciate del contesto del contenitore.

private void RaiseDomainEvents(List<IDataObject<Entity>> dObjs)
{
    var eventEmitters = new List<IEventEmitter<IEvent>>();

    // Get all EventEmitters.
    foreach (var o in dObjs)
        if (o.Data is IEventEmitter<IEvent> ee)
            eventEmitters.Add(ee);

    // Raise events.
    if (eventEmitters.Count <= 0) return;
    foreach (var evt in eventEmitters.SelectMany(eventEmitter => eventEmitter.DomainEvents))
        _mediator.Publish(evt);
}

Nell'ultima riga, il pacchetto MediatR, un'implementazione del modello di mediatore in C#, viene usato per pubblicare un evento all'interno dell'applicazione. Questa operazione è possibile perché tutti gli eventi come ContactNameUpdatedEvent implementano l'interfaccia INotification del pacchetto MediatR.

Questi eventi devono essere elaborati da un gestore corrispondente. In questo caso, entra in gioco l'implementazione di IEventsRepository. Ecco l'esempio del gestore dell'evento NameUpdated:

public class ContactNameUpdatedHandler :
    INotificationHandler<ContactNameUpdatedEvent>
{
    private IEventRepository EventRepository { get; }

    public ContactNameUpdatedHandler(IEventRepository eventRepo)
    {
        EventRepository = eventRepo;
    }

    public Task Handle(ContactNameUpdatedEvent notification,
        CancellationToken cancellationToken)
    {
        EventRepository.Create(notification);
        return Task.CompletedTask;
    }
}

Un'istanza di IEventRepository viene inserita nella classe del gestore tramite il costruttore. Non appena un oggetto ContactNameUpdatedEvent viene pubblicato nel servizio, viene richiamato il metodo Handle e viene utilizzata l'istanza del repository di eventi per creare un oggetto notifica. Tale oggetto notifica viene a sua volta inserito nell'elenco di oggetti tracciati nell'oggetto IContainerContext e aggiunge gli oggetti salvati nello stesso batch transazionale ad Azure Cosmos DB.

Finora, il contesto del contenitore conosce gli oggetti da elaborare. Per rendere persistenti gli oggetti tracciati in Azure Cosmos DB, l'implementazione di IContainerContext crea il batch transazionale, aggiunge tutti gli oggetti pertinenti ed esegue l'operazione sul database. Il processo descritto viene gestito nel metodo SaveInTransactionalBatchAsync, che viene richiamato dal metodo SaveChangesAsync.

Ecco le parti importanti dell'implementazione necessarie per creare ed eseguire il batch transazionale:

private async Task<List<IDataObject<Entity>>> SaveInTransactionalBatchAsync(
    CancellationToken cancellationToken)
{
    if (DataObjects.Count > 0)
    {
        var pk = new PartitionKey(DataObjects[0].PartitionKey);
        var tb = Container.CreateTransactionalBatch(pk);
        DataObjects.ForEach(o =>
        {
            TransactionalBatchItemRequestOptions tro = null;

            if (!string.IsNullOrWhiteSpace(o.Etag))
                tro = new TransactionalBatchItemRequestOptions { IfMatchEtag = o.Etag };

            switch (o.State)
            {
                case EntityState.Created:
                    tb.CreateItem(o);
                    break;
                case EntityState.Updated or EntityState.Deleted:
                    tb.ReplaceItem(o.Id, o, tro);
                    break;
            }
        });

        var tbResult = await tb.ExecuteAsync(cancellationToken);
...
[Check for return codes, etc.]
...
    }

    // Return copy of current list as result.
    var result = new List<IDataObject<Entity>>(DataObjects);

    // Work has been successfully done. Reset DataObjects list.
    DataObjects.Clear();
    return result;
}

Ecco una panoramica del funzionamento del processo fino a questo punto (per aggiornare il nome in un oggetto contatto):

  1. Un client vuole aggiornare il nome di un contatto. Il metodo SetName viene richiamato sull'oggetto contatto e le proprietà vengono aggiornate.
  2. L'evento ContactNameUpdated viene aggiunto all'elenco di eventi nell'oggetto dominio.
  3. Viene richiamato il metodo del repository Update dei contatti, che aggiunge l'oggetto dominio al contesto del contenitore. L'oggetto viene ora tracciato.
  4. CommitAsync viene richiamato sull'istanza di UnitOfWork, che a sua volta chiama SaveChangesAsync sul contesto del contenitore.
  5. All'interno di SaveChangesAsync tutti gli eventi nell'elenco dell'oggetto dominio vengono pubblicati da un'istanza di MediatR e vengono aggiunti tramite il repository di eventi allo stesso contesto del contenitore.
  6. In SaveChangesAsync viene creato un oggetto TransactionalBatch. Includerà sia l'oggetto contatto che l'evento.
  7. Viene eseguito TransactionalBatch e viene eseguito il commit dei dati in Azure Cosmos DB.
  8. Vengono quindi restituiti SaveChangesAsync e CommitAsync.

Persistenza

Come si può osservare nei frammenti di codice precedenti, tutti gli oggetti salvati in Azure Cosmos DB vengono incapsulati in un'istanza di DataObject. Questo oggetto fornisce proprietà comuni:

  • ID.
  • PartitionKey.
  • Type.
  • State. Come Created, Updated non verrà salvato in modo permanente in Azure Cosmos DB.
  • Etag. Per il blocco ottimistico.
  • TTL. Proprietà Durata TTL per la pulizia automatica dei documenti precedenti.
  • Data. Oggetto dati generico.

Queste proprietà sono definite in un'interfaccia generica denominata IDataObject e usata dai repository e dal contesto del contenitore:


public interface IDataObject<out T> where T : Entity
{
    string Id { get; }
    string PartitionKey { get; }
    string Type { get; }
    T Data { get; }
    string Etag { get; set; }
    int Ttl { get; }
    EntityState State { get; set; }
}

Gli oggetti di cui è stato eseguito il wrapping in un'istanza di DataObject e salvati nel database saranno quindi simili a questo esempio (Contact e ContactNameUpdatedEvent):

// Contact document/object. After creation.
{
    "id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "type": "contact",
    "data": {
        "name": {
            "firstName": "John",
            "lastName": "Doe"
        },
        "description": "This is a contact",
        "email": "johndoe@contoso.com",
        "company": {
            "companyName": "Contoso",
            "street": "Street",
            "houseNumber": "1a",
            "postalCode": "092821",
            "city": "Palo Alto",
            "country": "US"
        },
        "createdAt": "2021-09-22T11:07:37.3022907+02:00",
        "deleted": false,
        "id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2"
    },
    "ttl": -1,
    "_etag": "\"180014cc-0000-1500-0000-614455330000\"",
    "_ts": 1632301657
}

// After setting a new name, this is how an event document looks.
{
    "id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
    "partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "type": "domainEvent",
    "data": {
        "name": {
            "firstName": "Jane",
            "lastName": "Doe"
        },
        "contactId": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
        "action": "ContactNameUpdatedEvent",
        "id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
        "createdAt": "2021-09-22T11:37:37.3022907+02:00"
    },
    "ttl": 120,
    "_etag": "\"18005bce-0000-1500-0000-614456b80000\"",
    "_ts": 1632303457
}

È possibile osservare che i documenti Contact e ContactNameUpdatedEvent (tipo domainEvent) hanno la stessa chiave di partizione e che entrambi i documenti verranno salvati in modo permanente nella stessa partizione logica.

Elaborazione del feed di modifiche

Per leggere il flusso di eventi e inviarli a un broker di messaggi, il servizio userà il feed di modifiche di Azure Cosmos DB.

Il feed di modifiche è un log persistente delle modifiche nel contenitore. Opera in background e tiene traccia delle modifiche. All'interno di una partizione logica, l'ordine delle modifiche è garantito. Il modo più pratico per leggere il feed di modifiche è usare una funzione di Azure con un trigger di Azure Cosmos DB. In alternativa, è possibile usare la libreria del processore del feed di modifiche. Consente di integrare l'elaborazione del feed di modifiche nell'API Web come servizio in background (tramite l'interfaccia IHostedService). L'esempio usa una semplice applicazione console che implementa la classe astratta BackgroundService per ospitare attività in background a esecuzione prolungata nelle applicazioni .NET Core.

Per ricevere le modifiche dal feed di modifiche di Azure Cosmos DB, è necessario creare un'istanza di un oggetto ChangeFeedProcessor, registrare un metodo gestore per l'elaborazione dei messaggi e avviare l'ascolto delle modifiche:

private async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync()
{
    var changeFeedProcessor = _container
        .GetChangeFeedProcessorBuilder<ExpandoObject>(
            _configuration.GetSection("Cosmos")["ProcessorName"],
            HandleChangesAsync)
        .WithInstanceName(Environment.MachineName)
        .WithLeaseContainer(_leaseContainer)
        .WithMaxItems(25)
        .WithStartTime(new DateTime(2000, 1, 1, 0, 0, 0, DateTimeKind.Utc))
        .WithPollInterval(TimeSpan.FromSeconds(3))
        .Build();

    _logger.LogInformation("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    _logger.LogInformation("Change Feed Processor started. Waiting for new messages to arrive.");
    return changeFeedProcessor;
}

Un metodo gestore (in questo caso, HandleChangesAsync) elabora quindi i messaggi. In questo esempio gli eventi vengono pubblicati in un argomento del bus di servizio partizionato per la scalabilità e con la funzionalità di deduplicazione abilitata. Qualsiasi servizio interessato alle modifiche agli oggetti Contact può quindi sottoscrivere tale argomento del bus di servizio e ricevere ed elaborare le modifiche per il proprio contesto.

I messaggi del bus di servizio generati hanno una proprietà SessionId. Quando si usano sessioni in bus di servizio, si garantisce che l'ordine dei messaggi venga mantenuto (prima in, first out (FIFO)). Il rispetto dell'ordine è necessario per questo caso d'uso.

Ecco il frammento di codice che gestisce i messaggi dal feed di modifiche:

private async Task HandleChangesAsync(IReadOnlyCollection<ExpandoObject> changes, CancellationToken cancellationToken)
{
    _logger.LogInformation($"Received {changes.Count} document(s).");
    var eventsCount = 0;

    Dictionary<string, List<ServiceBusMessage>> partitionedMessages = new();

    foreach (var document in changes as dynamic)
    {
        if (!((IDictionary<string, object>)document).ContainsKey("type") ||
            !((IDictionary<string, object>)document).ContainsKey("data")) continue; // Unknown document type.

        if (document.type == EVENT_TYPE) // domainEvent.
        {
            string json = JsonConvert.SerializeObject(document.data);
            var sbMessage = new ServiceBusMessage(json)
            {
                ContentType = "application/json",
                Subject = document.data.action,
                MessageId = document.id,
                PartitionKey = document.partitionKey,
                SessionId = document.partitionKey
            };

            // Create message batch per partitionKey.
            if (partitionedMessages.ContainsKey(document.partitionKey))
            {
                partitionedMessages[sbMessage.PartitionKey].Add(sbMessage);
            }
            else
            {
                partitionedMessages[sbMessage.PartitionKey] = new List<ServiceBusMessage> { sbMessage };
            }

            eventsCount++;
        }
    }

    if (partitionedMessages.Count > 0)
    {
        _logger.LogInformation($"Processing {eventsCount} event(s) in {partitionedMessages.Count} partition(s).");

        // Loop over each partition.
        foreach (var partition in partitionedMessages)
        {
            // Create batch for partition.
            using var messageBatch =
                await _topicSender.CreateMessageBatchAsync(cancellationToken);
            foreach (var msg in partition.Value)
                if (!messageBatch.TryAddMessage(msg))
                    throw new Exception();

            _logger.LogInformation(
                $"Sending {messageBatch.Count} event(s) to Service Bus. PartitionId: {partition.Key}");

            try
            {
                await _topicSender.SendMessagesAsync(messageBatch, cancellationToken);
            }
            catch (Exception e)
            {
                _logger.LogError(e.Message);
                throw;
            }
        }
    }
    else
    {
        _logger.LogInformation("No event documents in change feed batch. Waiting for new messages to arrive.");
    }
}

Gestione degli errori

Se si verifica un errore durante l'elaborazione delle modifiche, la libreria del feed di modifiche riavvierà la lettura dei messaggi nella posizione in cui ha elaborato correttamente l'ultimo batch. Ad esempio, se l'applicazione ha elaborato correttamente 10.000 messaggi e mentre sta operando sul batch da 10.001 a 10.025 si verifica un errore, può riavviare e riprendere l'operazione nella posizione 10.001. La libreria tiene automaticamente traccia degli elementi elaborati tramite le informazioni salvate in un contenitore Leases in Azure Cosmos DB.

È possibile che il servizio abbia già inviato alcuni dei messaggi rielaborati al bus di servizio. In genere, questo scenario potrebbe determinare l'elaborazione di messaggi duplicati. Come specificato in precedenza, il bus di servizio dispone di una funzionalità per il rilevamento dei messaggi duplicati che è necessario abilitare per questo scenario. Il servizio controlla se un messaggio è già stato aggiunto a un argomento del bus di servizio (o coda) in base alla proprietà MessageId del messaggio controllata dall'applicazione. Tale proprietà è impostata sull'oggetto ID del documento dell'evento. Se lo stesso messaggio viene inviato di nuovo al bus di servizio, il servizio lo ignora e lo elimina.

Gestione

In una tipica implementazione del modello Outbox transazionale, il servizio aggiorna gli eventi gestiti e imposta una proprietà Processed su true, per indicare che un messaggio è stato pubblicato correttamente. Questo comportamento può essere implementato manualmente nel metodo gestore. Nello scenario corrente non è necessario un processo di questo tipo. Azure Cosmos DB tiene traccia degli eventi elaborati usando il feed di modifiche (in combinazione con il contenitore Leases).

Come ultimo passaggio, in alcuni casi è necessario eliminare gli eventi dal contenitore in modo da mantenere solo i record o i documenti più recenti. Per eseguire una pulizia periodica, l'implementazione applica un'altra funzionalità di Azure Cosmos DB, ovvero Durata (TTL) (TTL) sui documenti. Azure Cosmos DB può eliminare automaticamente i documenti in base a una proprietà TTL che può essere aggiunta a un documento: un intervallo di tempo in secondi. Il servizio controllerà costantemente il contenitore per verificare la presenza di documenti con una proprietà TTL. Non appena un documento scade, Azure Cosmos DB lo rimuoverà dal database.

Quando tutti i componenti funzionano come previsto, gli eventi vengono elaborati e pubblicati rapidamente in pochi secondi. Se si verifica un errore in Azure Cosmos DB, gli eventi non verranno inviati al bus di messaggi, perché sia l'oggetto business che gli eventi corrispondenti non possono essere salvati nel database. L'unico aspetto da considerare è impostare un valore TTL appropriato nei documenti DomainEvent quando il ruolo di lavoro in background (processore del feed di modifiche) o il bus di servizio non sono disponibili. In un ambiente di produzione è consigliabile scegliere un intervallo di tempo di più giorni. Ad esempio: 10 giorni. Tutti i componenti coinvolti avranno quindi tempo sufficiente per elaborare/pubblicare le modifiche all'interno dell'applicazione.

Riepilogo

Il modello Outbox transazionale risolve il problema della pubblicazione affidabile degli eventi di dominio nei sistemi distribuiti. Il commit dello stato dell'oggetto business e dei relativi eventi nello stesso batch transazionale e l'uso di un processore in background come inoltro dei messaggi garantiscono che altri servizi, interni o esterni, ricevano le informazioni da cui dipendono. Questo esempio non è un'implementazione tradizionale del modello Outbox transazionale. Usa funzionalità come il feed di modifiche di Azure Cosmos DB e la Durata (TT)L per garantire semplicità e ordine.

Ecco un riepilogo dei componenti di Azure usati in questo scenario:

Diagramma che mostra i componenti di Azure per implementare La posta in uscita transazionale con Azure Cosmos DB e bus di servizio di Azure.

Scaricare un file di Visio di questa architettura.

I vantaggi di questa soluzione sono:

  • Messaggistica affidabile e recapito garantito di eventi.
  • Ordine degli eventi mantenuto e deduplicazione dei messaggi tramite il bus di servizio.
  • Non è necessario mantenere una proprietà Processed aggiuntiva che indica la corretta elaborazione di un documento di eventi.
  • Eliminazione di eventi da Azure Cosmos DB tramite durata (TTL). Il processo non utilizza le unità richiesta necessarie per la gestione delle richieste utente/applicazione. Usa invece unità richiesta "residue" in un'attività in background.
  • Elaborazione a prova di errore dei messaggi tramite ChangeFeedProcessor (o una funzione di Azure).
  • Facoltativo: più processori di feed di modifiche, ognuno dei quali mantiene il proprio puntatore nel feed di modifiche.

Considerazioni

L'applicazione di esempio illustrata in questo articolo illustra come implementare il modello Outbox transazionale in Azure con Azure Cosmos DB e il bus di servizio. Esistono anche altri approcci che usano database NoSQL. Per garantire che l'oggetto business e gli eventi verranno salvati in modo affidabile nel database, è possibile incorporare l'elenco di eventi nel documento dell'oggetto business. Lo svantaggio di questo approccio è che il processo di pulizia dovrà aggiornare ogni documento che contiene eventi. Non si tratta della soluzione ideale, soprattutto in termini di costo di unità richiesta, rispetto ai vantaggi offerti dalla Durata (TTL).

Tenere presente che il codice di esempio fornito qui non deve essere considerato come codice pronto per l'ambiente di produzione. Presenta alcune limitazioni relative al multithreading, in particolare il modo in cui gli eventi vengono gestiti nella classe DomainEntity e il modo in cui gli oggetti vengono registrati nelle implementazioni di CosmosContainerContext. Usarlo come punto di partenza per le proprie implementazioni personalizzate. In alternativa, prendere in considerazione l'uso di librerie esistenti che dispongono già di questa funzionalità incorporata, ad esempio NServiceBus o MassTransit.

Distribuire lo scenario

Il codice sorgente, i file di distribuzione e le istruzioni per testare questo scenario sono disponibili in GitHub: https://github.com/mspnp/transactional-outbox-pattern.

Collaboratori

Questo articolo viene gestito da Microsoft. Originariamente è stato scritto dai seguenti contributori.

Autore principale:

Per visualizzare i profili LinkedIn non pubblici, accedere a LinkedIn.

Passaggi successivi

Per altre informazioni, vedere questi articoli: