Patrón de bandeja de salida transaccional con Azure Cosmos DB

Azure Cosmos DB
Azure Service Bus
Azure Functions

La implementación de mensajería confiable en sistemas distribuidos puede resultar compleja. En este artículo se describe cómo usar el patrón de bandeja de salida de transacciones para una mensajería confiable y la entrega garantizada de eventos, una parte importante de la compatibilidad con el procesamiento de mensajes idempotentes. Para ello, usará los lotes transaccionales de Azure Cosmos DB y la fuente de cambios en combinación con Azure Service Bus.

Información general

Las arquitecturas de microservicios son cada vez más populares y muestran la promesa de resolver problemas como la escalabilidad, el mantenimiento y la agilidad, especialmente en aplicaciones de gran tamaño. Pero este patrón arquitectónico también presenta desafíos en lo que respecta al control de datos. En las aplicaciones distribuidas, cada servicio mantiene de forma independiente los datos que necesita para funcionar en un almacén de datos propiedad del servicio dedicado. Para admitir este escenario, normalmente se usa una solución de mensajería como RabbitMQ, Kafka o Azure Service Bus, que distribuye datos (eventos) desde un servicio a través de un bus de mensajería a otros servicios de la aplicación. Los consumidores internos o externos pueden suscribirse a esos mensajes y recibir notificaciones de los cambios en cuanto se manipulan los datos.

Un ejemplo conocido de esa área es un sistema de ordenación: cuando un usuario quiere crear un pedido, un servicio Ordering recibe datos de una aplicación cliente a través de un punto de conexión REST. Asigna la carga a una representación interna de un objeto Order para validar los datos. Después de una confirmación correcta en la base de datos, publica un evento OrderCreated en un bus de mensajes. Cualquier otro servicio interesado en nuevos pedidos (por ejemplo, un servicio Inventory o Invoicing) se suscribiría a los mensajes OrderCreated, los procesaría y los almacenaría en su propia base de datos.

El pseudocódigo siguiente muestra el aspecto habitual de este proceso desde la perspectiva del servicio Ordering:

CreateNewOrder(CreateOrderDto order){
  // Validate the incoming data.
  ...
  // Apply business logic.
  ...
  // Save the object to the database.
  var result = _orderRespository.Create(order);

  // Publish the respective event.
  _messagingService.Publish(new OrderCreatedEvent(result));

  return Ok();
}

Este enfoque funciona bien hasta que se produce un error entre guardar el objeto de pedido y publicar el evento correspondiente. El envío de un evento puede producir un error en este momento por muchas razones:

  • Errores de red
  • Interrupción del servicio de mensajes
  • Error del host

Sea cual sea el error, el resultado es que el evento OrderCreated no se puede publicar en el bus de mensajes. No se notificará a otros servicios que se ha creado un pedido. El servicio Ordering ahora tiene que ocuparse de varias cosas que no están relacionadas con el proceso empresarial real. Debe realizar un seguimiento de los eventos que todavía deben colocarse en el bus de mensajes en cuanto vuelva a estar en línea. Incluso puede darse el peor de los casos: incoherencias en los datos de la aplicación debido a la pérdida de eventos.

Diagram that shows event handling without the Transactional Outbox pattern.

Solución

Hay un patrón conocido denominado Bandeja de salida transaccional que puede ayudarle a evitar estas situaciones. Garantiza que los eventos se guardan en un almacén de datos (normalmente en una tabla de bandeja de salida de la base de datos) antes de insertarse en última instancia en un agente de mensajes. Si el objeto empresarial y los eventos correspondientes se guardan en la misma transacción de base de datos, se garantiza que no se perderá ningún dato. Todo se confirmará, o todo se revertirá si se produce un error. Para publicar el evento, un proceso de trabajo o servicio diferente consulta la tabla de bandeja de salida en busca de entradas no controladas, publica los eventos y los marca como procesados. Este patrón garantiza que los eventos no se pierdan después de crear o modificar un objeto empresarial.

Diagram that shows event handling with the Transactional Outbox pattern and a relay service for publishing events to the message broker.

Descargue un archivo Visio de esta arquitectura.

En una base de datos relacional, la implementación del patrón es sencilla. Si el servicio usa Entity Framework Core, por ejemplo, usará un contexto de Entity Framework para crear una transacción de base de datos, guardar el objeto empresarial y el evento y confirmar la transacción —o realizar una reversión—. Además, el servicio de trabajo que procesa eventos es fácil de implementar: consulta periódicamente en la tabla Bandeja de salida nuevas entradas, publica los eventos recién insertados en el bus de mensajes y, por último, marca estas entradas como procesadas.

En la práctica, las cosas no son tan fáciles como podrían parecer a primera vista. Lo más importante es que debe asegurarse de que el orden de los eventos se conserva para que un evento OrderUpdated no se publique antes de un evento OrderCreated.

Implementación en Azure Cosmos DB

En esta sección se muestra cómo implementar el patrón Bandeja de salida transaccional en Azure Cosmos DB para lograr una mensajería confiable y en orden entre distintos servicios con la ayuda de la fuente de cambios de Azure Cosmos DB y Service Bus. Muestra un servicio de ejemplo que administra objetos Contact (información de FirstName, LastName, Email, Company, etc). Usa el patrón de segregación de responsabilidades de comandos y consultas (CQRS) y sigue los conceptos básicos de diseño basado en dominios. Puede encontrar el código de ejemplo de nuestra implementación en GitHub.

Un objeto Contact del servicio de ejemplo tiene la estructura siguiente:

{
    "name": {
        "firstName": "John",
        "lastName": "Doe"
    },
    "description": "This is a contact",
    "email": "johndoe@contoso.com",
    "company": {
        "companyName": "Contoso",
        "street": "Street",
        "houseNumber": "1a",
        "postalCode": "092821",
        "city": "Palo Alto",
        "country": "US"
    },
    "createdAt": "2021-09-22T11:07:37.3022907+02:00",
    "deleted": false
}

En cuanto se crea o actualiza Contact, emite eventos que contienen información sobre el cambio actual. Entre otros, los eventos de dominio pueden ser:

  • ContactCreated. Se genera cuando se agrega un contacto.
  • ContactNameUpdated. Se genera cuando se cambia FirstName o LastName.
  • ContactEmailUpdated. Se genera cuando se actualiza la dirección de correo electrónico.
  • ContactCompanyUpdated. Se genera cuando se cambia cualquiera de las propiedades de la empresa.

Lotes transaccionales

Para implementar este patrón, debe asegurarse de que el objeto empresarial Contact y los eventos correspondientes se guarden en la misma transacción de base de datos. En Azure Cosmos DB, las transacciones funcionan de forma diferente que en los sistemas de bases de datos relacionales. Las transacciones de Azure Cosmos DB, denominadas lotes transaccionales, funcionan en una sola partición lógica, por lo que garantizan propiedades de atomicidad, coherencia, aislamiento y durabilidad (ACID). No se pueden guardar dos documentos de una operación de lote transaccional en diferentes contenedores o particiones lógicas. Para el servicio de ejemplo, esto significa que tanto el objeto empresarial como el evento o eventos se colocarán en el mismo contenedor y partición lógica.

Contexto, repositorios y UnitOfWork

El núcleo de la implementación de ejemplo es un contexto de contenedor que realiza un seguimiento de los objetos guardados en el mismo lote transaccional. Mantiene una lista de objetos creados y modificados y funciona en un único contenedor de Azure Cosmos DB. La interfaz para ello tiene este aspecto:

public interface IContainerContext
{
    public Container Container { get; }
    public List<IDataObject<Entity>> DataObjects { get; }
    public void Add(IDataObject<Entity> entity);
    public Task<List<IDataObject<Entity>>> SaveChangesAsync(CancellationToken cancellationToken = default);
    public void Reset();
}

La lista del componente de contexto de contenedor realiza un seguimiento de los objetos Contact y DomainEvent. Ambos se colocarán en el mismo contenedor. Esto significa que varios tipos de objetos se almacenan en el mismo contenedor de Azure Cosmos DB y usan una propiedad Type para distinguir entre un objeto empresarial y un evento.

Para cada tipo, hay un repositorio dedicado que define e implementa el acceso a datos. La interfaz del repositorio Contact proporciona estos métodos:

public interface IContactsRepository
{
    public void Create(Contact contact);
    public Task<(Contact, string)> ReadAsync(Guid id, string etag);
    public Task DeleteAsync(Guid id, string etag);
    public Task<(List<(Contact, string)>, bool, string)> ReadAllAsync(int pageSize, string continuationToken);
    public void Update(Contact contact, string etag);
}

El repositorio Event tiene un aspecto similar, salvo que solo hay un método, que crea nuevos eventos en el almacén:

public interface IEventRepository
{
    public void Create(ContactDomainEvent e);
}

Las implementaciones de ambas interfaces de repositorio obtienen una referencia a través de la inserción de dependencias en una sola instancia de IContainerContext para asegurarse de que ambas funcionan en el mismo contexto de Azure Cosmos DB.

El último componente es UnitOfWork, que confirma los cambios contenidos en la instancia de IContainerContext en Azure Cosmos DB:

public class UnitOfWork : IUnitOfWork
{
    private readonly IContainerContext _context;
    public IContactRepository ContactsRepo { get; }

    public UnitOfWork(IContainerContext ctx, IContactRepository cRepo)
    {
        _context = ctx;
        ContactsRepo = cRepo;
    }

    public Task<List<IDataObject<Entity>>> CommitAsync(CancellationToken cancellationToken = default)
    {
        return _context.SaveChangesAsync(cancellationToken);
    }
}

Control de eventos: creación y publicación

Cada vez que se crea, modifica o elimina (temporalmente) un objeto Contact, el servicio genera un evento correspondiente. El núcleo de la solución proporcionada es una combinación de diseño basado en dominios (DDD) y el patrón de mediador propuesto por Jimmy Bogard. Sugiere mantener una lista de eventos que se han producido debido a modificaciones del objeto de dominio y publicar estos eventos antes de guardar el objeto real en la base de datos.

La lista de cambios se mantiene en el propio objeto de dominio para que ningún otro componente pueda modificar la cadena de eventos. El comportamiento de mantener eventos (instancias de IEvent) en el objeto de dominio se define a través de una interfaz IEventEmitter<IEvent> y se implementa en una clase DomainEntity abstracta:

public abstract class DomainEntity : Entity, IEventEmitter<IEvent>
{
[...]
[...]
    private readonly List<IEvent> _events = new();

    [JsonIgnore] public IReadOnlyList<IEvent> DomainEvents => _events.AsReadOnly();

    public virtual void AddEvent(IEvent domainEvent)
    {
        var i = _events.FindIndex(0, e => e.Action == domainEvent.Action);
        if (i < 0)
        {
            _events.Add(domainEvent);
        }
        else
        {
            _events.RemoveAt(i);
            _events.Insert(i, domainEvent);
        }
    }
[...]
[...]
}

El objeto Contact genera eventos de dominio. La entidad Contact sigue los conceptos básicos de DDD, configurando los establecedores de las propiedades de dominio como privados. No existe ningún establecedor público en la clase. En su lugar, ofrece métodos para manipular el estado interno. En estos métodos, se pueden generar los eventos adecuados para una determinada modificación (por ejemplo, ContactNameUpdated o ContactEmailUpdated).

Este es un ejemplo de las actualizaciones del nombre de un contacto. (El evento se genera al final del método).

public void SetName(string firstName, string lastName)
{
    if (string.IsNullOrWhiteSpace(firstName) ||
        string.IsNullOrWhiteSpace(lastName))
    {
        throw new ArgumentException("FirstName or LastName cannot be empty");
    }

    Name = new Name(firstName, lastName);

    if (IsNew) return;

    AddEvent(new ContactNameUpdatedEvent(Id, Name));
    ModifiedAt = DateTimeOffset.UtcNow;
}

El ContactNameUpdatedEvent correspondiente, que realiza el seguimiento de los cambios, tiene el siguiente aspecto:

public class ContactNameUpdatedEvent : ContactDomainEvent
{
    public Name Name { get; }

    public ContactNameUpdatedEvent(Guid contactId, Name contactName) : 
        base(Guid.NewGuid(), contactId, nameof(ContactNameUpdatedEvent))
    {
        Name = contactName;
    }
}

Hasta ahora, los eventos solo se registran en el objeto de dominio y no se guarda nada en la base de datos ni se publica en un agente de mensajes. Siguiendo la recomendación, la lista de eventos se procesará justo antes de que el objeto empresarial se guarde en el almacén de datos. En este caso, sucede en el método SaveChangesAsync de la instancia de IContainerContext, que se implementa en un método RaiseDomainEvents privado. (dObjs es la lista de entidades con seguimiento del contexto del contenedor).

private void RaiseDomainEvents(List<IDataObject<Entity>> dObjs)
{
    var eventEmitters = new List<IEventEmitter<IEvent>>();

    // Get all EventEmitters.
    foreach (var o in dObjs)
        if (o.Data is IEventEmitter<IEvent> ee)
            eventEmitters.Add(ee);

    // Raise events.
    if (eventEmitters.Count <= 0) return;
    foreach (var evt in eventEmitters.SelectMany(eventEmitter => eventEmitter.DomainEvents))
        _mediator.Publish(evt);
}

En la última línea, se usa el paquete MediatR, una implementación del patrón de mediador en C#, para publicar un evento dentro de la aplicación. Esto es posible porque todos los eventos como ContactNameUpdatedEvent implementan la interfaz INotification del paquete MediatR.

Estos eventos deben procesarse por un controlador correspondiente. En este caso, la implementación IEventsRepository entra en juego. Este es el ejemplo del controlador de eventos NameUpdated:

public class ContactNameUpdatedHandler :
    INotificationHandler<ContactNameUpdatedEvent>
{
    private IEventRepository EventRepository { get; }

    public ContactNameUpdatedHandler(IEventRepository eventRepo)
    {
        EventRepository = eventRepo;
    }

    public Task Handle(ContactNameUpdatedEvent notification,
        CancellationToken cancellationToken)
    {
        EventRepository.Create(notification);
        return Task.CompletedTask;
    }
}

Una instancia de IEventRepository se inserta en la clase de controlador a través del constructor. En cuanto se publica ContactNameUpdatedEvent en el servicio, se invoca el método Handle y usa la instancia del repositorio de eventos para crear un objeto de notificación. Ese objeto de notificación, a su vez, se inserta en la lista de objetos con seguimiento del objeto IContainerContext y une los objetos guardados en el mismo lote transaccional en Azure Cosmos DB.

Hasta ahora, el contexto del contenedor sabe qué objetos procesar. Para conservar finalmente los objetos con seguimiento en Azure Cosmos DB, la implementación IContainerContext crea el lote transaccional, agrega todos los objetos pertinentes y ejecuta la operación en la base de datos. El proceso descrito se controla en el método SaveInTransactionalBatchAsync, invocado por el método SaveChangesAsync.

Estas son las partes importantes de la implementación que necesita para crear y ejecutar el lote transaccional:

private async Task<List<IDataObject<Entity>>> SaveInTransactionalBatchAsync(
    CancellationToken cancellationToken)
{
    if (DataObjects.Count > 0)
    {
        var pk = new PartitionKey(DataObjects[0].PartitionKey);
        var tb = Container.CreateTransactionalBatch(pk);
        DataObjects.ForEach(o =>
        {
            TransactionalBatchItemRequestOptions tro = null;

            if (!string.IsNullOrWhiteSpace(o.Etag))
                tro = new TransactionalBatchItemRequestOptions { IfMatchEtag = o.Etag };

            switch (o.State)
            {
                case EntityState.Created:
                    tb.CreateItem(o);
                    break;
                case EntityState.Updated or EntityState.Deleted:
                    tb.ReplaceItem(o.Id, o, tro);
                    break;
            }
        });

        var tbResult = await tb.ExecuteAsync(cancellationToken);
...
[Check for return codes, etc.]
...
    }

    // Return copy of current list as result.
    var result = new List<IDataObject<Entity>>(DataObjects); 

    // Work has been successfully done. Reset DataObjects list.
    DataObjects.Clear();
    return result;
}

Esta es una introducción al funcionamiento del proceso hasta ahora (para actualizar el nombre en un objeto de contacto):

  1. Un cliente quiere actualizar el nombre de un contacto. Se invoca el método SetName en el objeto de contacto y se actualizan las propiedades.
  2. Se agrega el evento ContactNameUpdated a la lista de eventos del objeto de dominio.
  3. Se invoca el método Update del repositorio de contactos, que agrega el objeto de dominio al contexto del contenedor. Ahora se realiza el seguimiento del objeto.
  4. CommitAsync se invoca en la instancia de UnitOfWork, que a su vez llama a SaveChangesAsync en el contexto del contenedor.
  5. Dentro de SaveChangesAsync, todos los eventos de la lista del objeto de dominio se publican por una instancia de MediatR y se agregan a través del repositorio de eventos al mismo contexto de contenedor.
  6. En SaveChangesAsync, se crea un objeto TransactionalBatch. Contendrá el objeto de contacto y el evento.
  7. Se ejecuta TransactionalBatch y los datos se confirman en Azure Cosmos DB.
  8. SaveChangesAsync y CommitAsync se devuelven correctamente.

Persistencia

Como puede ver en los fragmentos de código anteriores, todos los objetos guardados en Azure Cosmos DB se encapsulan en una instancia de DataObject. Este objeto proporciona propiedades comunes:

  • ID.
  • PartitionKey.
  • Type.
  • State. Al igual que Created, Updated no se conservará en Azure Cosmos DB.
  • Etag. Para el bloqueo optimista.
  • TTL. Propiedad Período de vida para la limpieza automática de documentos antiguos.
  • Data. Objeto de datos genérico.

Estas propiedades se definen en una interfaz genérica a la que se llama IDataObject y que usan los repositorios y el contexto del contenedor:


public interface IDataObject<out T> where T : Entity
{
    string Id { get; }
    string PartitionKey { get; }
    string Type { get; }
    T Data { get; }
    string Etag { get; set; }
    int Ttl { get; }
    EntityState State { get; set; }
}

Los objetos encapsulados en una instancia de DataObject y guardados en la base de datos tendrán un aspecto similar a este ejemplo (Contact y ContactNameUpdatedEvent):

// Contact document/object. After creation.
{
    "id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "type": "contact",
    "data": {
        "name": {
            "firstName": "John",
            "lastName": "Doe"
        },
        "description": "This is a contact",
        "email": "johndoe@contoso.com",
        "company": {
            "companyName": "Contoso",
            "street": "Street",
            "houseNumber": "1a",
            "postalCode": "092821",
            "city": "Palo Alto",
            "country": "US"
        },
        "createdAt": "2021-09-22T11:07:37.3022907+02:00",
        "deleted": false,
        "id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2"
    },
    "ttl": -1,
    "_etag": "\"180014cc-0000-1500-0000-614455330000\"",
    "_ts": 1632301657
}

// After setting a new name, this is how an event document looks.
{
    "id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
    "partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "type": "domainEvent",
    "data": {
        "name": {
            "firstName": "Jane",
            "lastName": "Doe"
        },
        "contactId": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
        "action": "ContactNameUpdatedEvent",
        "id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
        "createdAt": "2021-09-22T11:37:37.3022907+02:00"
    },
    "ttl": 120,
    "_etag": "\"18005bce-0000-1500-0000-614456b80000\"",
    "_ts": 1632303457
}

Puede ver que los documentos Contact y ContactNameUpdatedEvent (tipo domainEvent) tienen la misma clave de partición y que ambos documentos se conservarán en la misma partición lógica.

Procesamiento de fuente de cambios

Para leer el flujo de eventos y enviarlos a un agente de mensajes, el servicio usará la fuente de cambios de Azure Cosmos DB.

La fuente de cambios es un registro persistente de cambios en el contenedor. Funciona en segundo plano y realiza un seguimiento de las modificaciones. Dentro de una partición lógica, se garantiza el orden de los cambios. La manera más cómoda de leer la fuente de cambios es usar una función de Azure con un desencadenador de Azure Cosmos DB. Otra opción es usar la biblioteca de procesadores de fuente de cambios. Le permite integrar el procesamiento de la fuente de cambios en la API web como un servicio en segundo plano (a través de la interfaz IHostedService). En el ejemplo siguiente se usa una sencilla aplicación de consola que implementa la clase abstracta BackgroundService para hospedar tareas en segundo plano de ejecución prolongada en aplicaciones de .NET Core.

Para recibir los cambios de la fuente de cambios de Azure Cosmos DB, debe crear una instancia de un objeto ChangeFeedProcessor, registrar un método de controlador para el procesamiento de mensajes y empezar a escuchar los cambios:

private async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync()
{
    var changeFeedProcessor = _container
        .GetChangeFeedProcessorBuilder<ExpandoObject>(
            _configuration.GetSection("Cosmos")["ProcessorName"],
            HandleChangesAsync)
        .WithInstanceName(Environment.MachineName)
        .WithLeaseContainer(_leaseContainer)
        .WithMaxItems(25)
        .WithStartTime(new DateTime(2000, 1, 1, 0, 0, 0, DateTimeKind.Utc))
        .WithPollInterval(TimeSpan.FromSeconds(3))
        .Build();

    _logger.LogInformation("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    _logger.LogInformation("Change Feed Processor started. Waiting for new messages to arrive.");
    return changeFeedProcessor;
}

A continuación, un método de controlador (HandleChangesAsync aquí) procesa los mensajes. En este ejemplo, los eventos se publican en un tema de Service Bus que está particionado por escalabilidad y tiene habilitada la característica de desduplicación. Cualquier servicio interesado en los cambios en los objetos Contact puede suscribirse a ese tema de Service Bus y recibir y procesar los cambios para su propio contexto.

Los mensajes del Service Bus generados tienen una propiedad SessionId. Al usar sesiones en Service Bus, se garantiza que se conserva el orden de los mensajes (el primero en entrar es el primero en salir). La conservación del orden es necesaria para este caso de uso.

Este es el fragmento de código que controla los mensajes de la fuente de cambios:

private async Task HandleChangesAsync(IReadOnlyCollection<ExpandoObject> changes, CancellationToken cancellationToken)
{
    _logger.LogInformation($"Received {changes.Count} document(s).");
    var eventsCount = 0;

    Dictionary<string, List<ServiceBusMessage>> partitionedMessages = new();

    foreach (var document in changes as dynamic)
    {
        if (!((IDictionary<string, object>)document).ContainsKey("type") ||
            !((IDictionary<string, object>)document).ContainsKey("data")) continue; // Unknown document type.

        if (document.type == EVENT_TYPE) // domainEvent.
        {
            string json = JsonConvert.SerializeObject(document.data);
            var sbMessage = new ServiceBusMessage(json)
            {
                ContentType = "application/json",
                Subject = document.data.action,
                MessageId = document.id,
                PartitionKey = document.partitionKey,
                SessionId = document.partitionKey
            };

            // Create message batch per partitionKey.
            if (partitionedMessages.ContainsKey(document.partitionKey))
            {
                partitionedMessages[sbMessage.PartitionKey].Add(sbMessage);
            }
            else
            {
                partitionedMessages[sbMessage.PartitionKey] = new List<ServiceBusMessage> { sbMessage };
            }

            eventsCount++;
        }
    }

    if (partitionedMessages.Count > 0)
    {
        _logger.LogInformation($"Processing {eventsCount} event(s) in {partitionedMessages.Count} partition(s).");

        // Loop over each partition.
        foreach (var partition in partitionedMessages)
        {
            // Create batch for partition.
            using var messageBatch =
                await _topicSender.CreateMessageBatchAsync(cancellationToken);
            foreach (var msg in partition.Value)
                if (!messageBatch.TryAddMessage(msg))
                    throw new Exception();

            _logger.LogInformation(
                $"Sending {messageBatch.Count} event(s) to Service Bus. PartitionId: {partition.Key}");

            try
            {
                await _topicSender.SendMessagesAsync(messageBatch, cancellationToken);
            }
            catch (Exception e)
            {
                _logger.LogError(e.Message);
                throw;
            }
        }
    }
    else
    {
        _logger.LogInformation("No event documents in change feed batch. Waiting for new messages to arrive.");
    }
}

Control de errores

Si se produce un error mientras se procesan los cambios, la biblioteca de la fuente de cambios reiniciará la lectura de mensajes en la posición en la que procesó correctamente el último lote. Por ejemplo, si la aplicación procesó correctamente 10 000 mensajes, ahora está trabajando en el lote 10 001 a 10 025 y se produce un error, puede reiniciar y retomar su trabajo en la posición 10 001. La biblioteca realiza un seguimiento automático de lo que se ha procesado a través de la información guardada en un contenedor Leases en Azure Cosmos DB.

Es posible que el servicio ya haya enviado algunos de los mensajes que se han vuelto a procesar a Service Bus. Normalmente, ese escenario daría lugar a un procesamiento de mensajes duplicado. Como se indicó anteriormente, Service Bus tiene una característica para la detección de mensajes duplicados que debe habilitar para este escenario. El servicio comprueba si ya se ha agregado un mensaje a un tema (o cola) de Service Bus en función de la propiedad MessageId controlada por la aplicación del mensaje. Esa propiedad se establece en el valor ID del documento de evento. Si el mismo mensaje se envía de nuevo a Service Bus, el servicio lo omitirá y lo quitará.

Mantenimiento

En una implementación típica de bandeja de salida transaccional, el servicio actualiza los eventos manipulados y establece una propiedad Processed en true, lo que indica que un mensaje se ha publicado correctamente. Este comportamiento podría implementarse manualmente en el método de controlador. En el escenario actual, no es necesario realizar este proceso. Azure Cosmos DB realiza un seguimiento de los eventos procesados mediante la fuente de cambios (en combinación con el contenedor Leases).

Como último paso, en ocasiones debe eliminar los eventos del contenedor para conservar solo los registros o documentos más recientes. Para realizar una limpieza periódicamente, la implementación aplica otra característica de Azure Cosmos DB: Período de vida (TTL) en los documentos. Azure Cosmos DB puede eliminar automáticamente documentos basados en una propiedad TTL que se puede agregar a un documento: un intervalo de tiempo en segundos. El servicio comprobará constantemente el contenedor en busca de documentos que tengan una propiedad TTL. En cuanto expire un documento, Azure Cosmos DB lo quitará de la base de datos.

Cuando todos los componentes funcionan según lo previsto, los eventos se procesan y publican rápidamente: en cuestión de segundos. Si se produce un error en Azure Cosmos DB, los eventos no se enviarán al bus de mensajes, ya que el objeto empresarial y los eventos correspondientes no se pueden guardar en la base de datos. Lo único que hay que tener en cuenta es establecer un valor TTL adecuado en los documentos DomainEvent cuando el trabajo en segundo plano (procesador de fuente de cambios) o el bus de servicio no están disponibles. En un entorno de producción, es mejor elegir un intervalo de tiempo de varios días. Por ejemplo, 10 días. Todos los componentes implicados tendrán tiempo suficiente para procesar o publicar cambios dentro de la aplicación.

Resumen

El patrón de bandeja de salida transaccional resuelve el problema de la publicación confiable de eventos de dominio en sistemas distribuidos. Al confirmar el estado del objeto empresarial y sus eventos en el mismo lote transaccional y usar un procesador en segundo plano como retransmisión de mensajes, se asegura de que otros servicios, internos o externos, recibirán finalmente la información de la que dependen. Este ejemplo no es una implementación tradicional del patrón de bandeja de salida transaccional. Usa características como la fuente de cambios de Azure Cosmos DB y Período de vida que mantienen las cosas sencillas y ordenadas.

Este es un resumen de los componentes de Azure que se usan en este escenario:

Diagram that shows the Azure components to implement Transactional Outbox with Azure Cosmos DB and Azure Service Bus.

Descargue un archivo Visio de esta arquitectura.

Las ventajas de esta solución son:

  • Mensajería confiable y entrega garantizada de eventos.
  • Orden conservado de eventos y desduplicación de mensajes a través de Service Bus.
  • No es necesario mantener una propiedad Processed adicional que indique el procesamiento correcto de un documento de eventos.
  • Eliminación de eventos de Azure Cosmos DB mediante Período de vida (TTL). El proceso no consume las unidades de solicitud necesarias para controlar las solicitudes de usuario o aplicación. En su lugar, usa unidades de solicitud "sobrantes" en una tarea en segundo plano.
  • Procesamiento a prueba de errores de mensajes a través de ChangeFeedProcessor (o una función de Azure).
  • Opcional: varios procesadores de fuente de cambios, cada uno manteniendo su propio puntero en la fuente de cambios.

Consideraciones

La aplicación de ejemplo que se describe en este artículo muestra cómo puede implementar el patrón Bandeja de salida transaccional en Azure con Azure Cosmos DB y Service Bus. También hay otros enfoques que usan bases de datos NoSQL. Para garantizar que el objeto empresarial y los eventos se guardarán de forma confiable en la base de datos, puede insertar la lista de eventos en el documento de objeto empresarial. El inconveniente de este enfoque es que el proceso de limpieza tendrá que actualizar cada documento que contiene eventos. Esto no es ideal, especialmente en términos de costo de unidad de solicitud, en comparación con el uso de TTL.

Tenga en cuenta que no debe considerar el código de ejemplo que se proporciona aquí como código listo para la producción. Tiene algunas limitaciones con respecto al multithreading, especialmente la forma en que se controlan los eventos en la clase DomainEntity y cómo se realiza el seguimiento de los objetos en las implementaciones de CosmosContainerContext. Úselo como punto de partida para sus propias implementaciones. Como alternativa, considere la posibilidad de usar bibliotecas existentes que ya tienen esta funcionalidad integrada, como NServiceBus o MassTransit.

Implementación de este escenario

Puede encontrar el código fuente, los archivos de implementación y las instrucciones para probar este escenario en GitHub: https://github.com/mspnp/transactional-outbox-pattern

Colaboradores

Microsoft mantiene este artículo. Originalmente lo escribieron los siguientes colaboradores.

Autor principal:

Para ver los perfiles no públicos de LinkedIn, inicie sesión en LinkedIn.

Pasos siguientes

Para más información, consulte estos artículos: