Padrão de Caixa de Saída Transacional com o Azure Cosmos DB

Azure Cosmos DB
Barramento de Serviço do Azure
Funções do Azure

Implementar mensagens confiáveis em sistemas distribuídos pode ser um desafio. Este artigo descreve como usar o padrão de caixa de saída transacional para mensagens confiáveis e entrega garantida de eventos, uma parte importante do suporte ao processamento de mensagens idempotentes. Para fazer isso, você usará os lotes transacionais e o feed de alterações do Azure Cosmos DB juntamente com Barramento de Serviço do Azure.

Visão geral

As arquiteturas de microsserviços estão se popularizando cada vez mais e prometem resolver problemas como escalabilidade, capacidade de manutenção e agilidade, especialmente em aplicativos grandes. No entanto, esse padrão de arquitetura também apresenta desafios em relação ao tratamento de dados. Em aplicativos distribuídos, cada serviço mantém de maneira independente os dados necessários para operar em um armazenamento de dados pertencente a um serviço dedicado. Para dar suporte a esse cenário, normalmente você usa uma solução de mensagens, como o RabbitMQ, o Kafka ou o Barramento de Serviço do Azure, que distribui dados (eventos) de um serviço por meio de um barramento de mensagens para outros serviços do aplicativo. Os consumidores internos ou externos podem assinar essas mensagens e ser notificados de alterações assim que os dados são manipulados.

Um exemplo conhecido nessa área é um sistema de ordenação: quando um usuário deseja criar um pedido, um serviço Ordering recebe dados de um aplicativo cliente por meio de um ponto de extremidade REST. Ele mapeia o conteúdo para uma representação interna de um objeto Order para validar os dados. Após fazer commit com êxito no banco de dados, ele publica um evento OrderCreated em um barramento de mensagens. Qualquer outro serviço interessado em novos pedidos (por exemplo, um serviço Inventory ou Invoicing), assinaria mensagens OrderCreated, as processaria e as armazenaria no próprio banco de dados.

O seguinte pseudocódigo mostra a aparência típica desse processo da perspectiva do serviço Ordering:

CreateNewOrder(CreateOrderDto order){
  // Validate the incoming data.
  ...
  // Apply business logic.
  ...
  // Save the object to the database.
  var result = _orderRespository.Create(order);

  // Publish the respective event.
  _messagingService.Publish(new OrderCreatedEvent(result));

  return Ok();
}

Essa abordagem funciona bem até que ocorra um erro entre salvar o objeto de pedido (Order) e publicar o evento correspondente. Pode ocorrer falha ao enviar um evento neste momento por vários motivos:

  • Erros de rede
  • Interrupção do serviço de mensagem
  • Falha do host

Seja qual for o erro, o resultado é que o evento OrderCreated não pode ser publicado no barramento de mensagens. Outros serviços não serão notificados de que um pedido foi criado. O serviço Ordering agora precisa cuidar de várias coisas que não estão relacionadas ao processo empresarial real. Ele precisa manter o controle dos eventos que ainda precisam ser colocados no barramento de mensagens assim que ele ficar online novamente. Até mesmo o pior caso pode acontecer: inconsistências de dados no aplicativo devido a eventos perdidos.

Diagram that shows event handling without the Transactional Outbox pattern.

Solução

Há um padrão conhecido chamado Caixa de Saída Transacional que pode ajudar você a evitar essas situações. Ele garante que os eventos sejam salvos em um armazenamento de dados (normalmente em uma tabela da Caixa de Saída em seu banco de dados) antes que eles sejam, por fim, enviados por push para um agente de mensagem. Se o objeto comercial e os eventos correspondentes são salvos na mesma transação de banco de dados, é garantido que nenhum dado será perdido. Tudo será confirmado ou tudo será revertido se houver um erro. Para publicar o evento, um serviço ou processo de trabalho diferente consulta a tabela da Caixa de saída para entradas sem tratamento, publica os eventos e os marca como processados. Esse padrão garante que os eventos não sejam perdidos depois da criação ou modificação de um objeto comercial.

Diagram that shows event handling with the Transactional Outbox pattern and a relay service for publishing events to the message broker.

Baixe um Arquivo Visio dessa arquitetura.

Em um banco de dados relacional, a implementação do padrão é simples. Se o serviço usar o Entity Framework Core, por exemplo, ele usará um contexto do Entity Framework para criar uma transação de banco de dados, salvar o objeto comercial e o evento e fazer commit da transação (ou uma reversão). Além disso, o serviço de trabalho que está processando eventos é fácil de implementar: ele consulta periodicamente a tabela da Caixa de Saída para novas entradas, publica eventos recém-inseridos no barramento de mensagens e, por fim, marca essas entradas como processadas.

Na prática, as coisas não são tão fáceis quanto parecem. O mais importante é que você precisa garantir que a ordem dos eventos seja preservada para que um evento OrderUpdated não seja publicado antes de um evento OrderCreated.

Implementação no Azure Cosmos DB

Esta seção mostra como implementar o padrão de Caixa de Saída Transacional no Azure Cosmos DB para obter mensagens confiáveis e em ordem entre diferentes serviços com a ajuda do feed de alterações do Azure Cosmos DB e do Barramento de Serviço. Ela demonstra um serviço de exemplo que gerencia objetos Contact (informações FirstName, LastName, Email, Company e assim por diante). Ele usa o padrão CQRS (Separação das Operações de Comando e de Consulta) e segue os conceitos básicos de design baseado em domínio. Você pode encontrar o código de exemplo para a implementação no GitHub.

Um objeto Contact no serviço de exemplo tem a seguinte estrutura:

{
    "name": {
        "firstName": "John",
        "lastName": "Doe"
    },
    "description": "This is a contact",
    "email": "johndoe@contoso.com",
    "company": {
        "companyName": "Contoso",
        "street": "Street",
        "houseNumber": "1a",
        "postalCode": "092821",
        "city": "Palo Alto",
        "country": "US"
    },
    "createdAt": "2021-09-22T11:07:37.3022907+02:00",
    "deleted": false
}

Assim que um Contact é criado ou atualizado, ele emite eventos que contêm informações sobre a alteração atual. Entre outros tipos, os eventos de domínio podem ser:

  • ContactCreated. Gerados quando um contato é adicionado.
  • ContactNameUpdated. Gerados quando FirstName ou LastName é alterado.
  • ContactEmailUpdated. Gerados quando o endereço de email é atualizado.
  • ContactCompanyUpdated. Gerados quando qualquer uma das propriedades da empresa é alterada.

Lotes transacionais

Para implementar esse padrão, você precisa garantir que o objeto comercial Contact e os eventos correspondentes sejam salvos na mesma transação de banco de dados. No Azure Cosmos DB, as transações funcionam de maneira diferente de sistemas de banco de dados relacional. As transações do Azure Cosmos DB, chamadas de lotes transacionais, operam em uma partição lógica, portanto, garantem propriedades ACID (atomicidade, consistência, isolamento e durabilidade). Você não pode salvar dois documentos em uma operação de lote transacional em diferentes contêineres ou partições lógicas. Para o serviço de exemplo, isso significa que o objeto comercial e o evento ou eventos serão colocados no mesmo contêiner e na mesma partição lógica.

Contexto, repositórios e UnitOfWork

O núcleo da implementação de exemplo é um contexto de contêiner que mantém o controle de objetos que salvos no mesmo lote transacional. Ele mantém uma lista de objetos criados e modificados e opera em um contêiner do Azure Cosmos DB. A interface dele tem esta aparência:

public interface IContainerContext
{
    public Container Container { get; }
    public List<IDataObject<Entity>> DataObjects { get; }
    public void Add(IDataObject<Entity> entity);
    public Task<List<IDataObject<Entity>>> SaveChangesAsync(CancellationToken cancellationToken = default);
    public void Reset();
}

A lista no componente de contexto do contêiner rastreia Contact e objetos DomainEvent. Os dois serão colocados no mesmo contêiner. Isso significa que vários tipos de objetos são armazenados no mesmo contêiner do Azure Cosmos DB e usam uma propriedade Type para distinguir entre um objeto comercial e um evento.

Para cada tipo, há um repositório dedicado que define e implementa o acesso a dados. A interface do repositório Contact fornece estes métodos:

public interface IContactsRepository
{
    public void Create(Contact contact);
    public Task<(Contact, string)> ReadAsync(Guid id, string etag);
    public Task DeleteAsync(Guid id, string etag);
    public Task<(List<(Contact, string)>, bool, string)> ReadAllAsync(int pageSize, string continuationToken);
    public void Update(Contact contact, string etag);
}

O repositório Event é semelhante, exceto que há apenas um método, que cria eventos no repositório:

public interface IEventRepository
{
    public void Create(ContactDomainEvent e);
}

As implementações de ambas as interfaces do repositório obterão uma referência por meio da injeção de dependência em uma instância IContainerContext individual para garantir que ambas operem no mesmo contexto do Azure Cosmos DB.

O último componente é UnitOfWork, que faz commit das alterações realizadas na instância IContainerContext para o Azure Cosmos DB:

public class UnitOfWork : IUnitOfWork
{
    private readonly IContainerContext _context;
    public IContactRepository ContactsRepo { get; }

    public UnitOfWork(IContainerContext ctx, IContactRepository cRepo)
    {
        _context = ctx;
        ContactsRepo = cRepo;
    }

    public Task<List<IDataObject<Entity>>> CommitAsync(CancellationToken cancellationToken = default)
    {
        return _context.SaveChangesAsync(cancellationToken);
    }
}

Tratamento de eventos: criação e publicação

Sempre que um objeto Contact é criado, modificado ou sofre uma exclusão temporária, o serviço gera um evento correspondente. O núcleo da solução fornecida é uma combinação de DDD (design baseado em domínio) e o padrão mediador proposto por Jimmy Bogard. Ele sugere manter uma lista de eventos que aconteceram devido a modificações do objeto de domínio e publicar esses eventos antes de salvar o objeto real no banco de dados.

A lista de alterações é mantida no próprio objeto de domínio para que nenhum outro componente possa modificar a cadeia de eventos. O comportamento de manutenção de eventos (instâncias IEvent) no objeto de domínio é definido por meio de uma interface IEventEmitter<IEvent> e implementado em uma classe abstrata DomainEntity:

public abstract class DomainEntity : Entity, IEventEmitter<IEvent>
{
[...]
[...]
    private readonly List<IEvent> _events = new();

    [JsonIgnore] public IReadOnlyList<IEvent> DomainEvents => _events.AsReadOnly();

    public virtual void AddEvent(IEvent domainEvent)
    {
        var i = _events.FindIndex(0, e => e.Action == domainEvent.Action);
        if (i < 0)
        {
            _events.Add(domainEvent);
        }
        else
        {
            _events.RemoveAt(i);
            _events.Insert(i, domainEvent);
        }
    }
[...]
[...]
}

O objeto Contact gera eventos de domínio. A entidade Contact segue os conceitos básicos do DDD, configurando os setters das propriedades do domínio como privados. Não existem setters públicos na classe. Em vez disso, ele oferece métodos para manipular o estado interno. Nesses métodos, eventos apropriados para uma determinada modificação (por exemplo, ContactNameUpdated ou ContactEmailUpdated) podem ser gerados.

Veja um exemplo de atualizações para o nome de um contato. (O evento é gerado no final do método.)

public void SetName(string firstName, string lastName)
{
    if (string.IsNullOrWhiteSpace(firstName) ||
        string.IsNullOrWhiteSpace(lastName))
    {
        throw new ArgumentException("FirstName or LastName cannot be empty");
    }

    Name = new Name(firstName, lastName);

    if (IsNew) return;

    AddEvent(new ContactNameUpdatedEvent(Id, Name));
    ModifiedAt = DateTimeOffset.UtcNow;
}

O ContactNameUpdatedEvent correspondente, que controla as alterações, tem esta aparência:

public class ContactNameUpdatedEvent : ContactDomainEvent
{
    public Name Name { get; }

    public ContactNameUpdatedEvent(Guid contactId, Name contactName) : 
        base(Guid.NewGuid(), contactId, nameof(ContactNameUpdatedEvent))
    {
        Name = contactName;
    }
}

Até agora, os eventos são registrados apenas no objeto de domínio e nada é salvo no banco de dados nem mesmo publicado em um agente de mensagem. Seguindo a recomendação, a lista de eventos será processada logo antes de o objeto comercial ser salvo no armazenamento de dados. Nesse caso, isso acontece no método SaveChangesAsync da instância IContainerContext, que é implementado em um método RaiseDomainEvents privado. (dObjs é a lista de entidades rastreadas do contexto do contêiner.)

private void RaiseDomainEvents(List<IDataObject<Entity>> dObjs)
{
    var eventEmitters = new List<IEventEmitter<IEvent>>();

    // Get all EventEmitters.
    foreach (var o in dObjs)
        if (o.Data is IEventEmitter<IEvent> ee)
            eventEmitters.Add(ee);

    // Raise events.
    if (eventEmitters.Count <= 0) return;
    foreach (var evt in eventEmitters.SelectMany(eventEmitter => eventEmitter.DomainEvents))
        _mediator.Publish(evt);
}

Na última linha, o pacote MediatR, uma implementação do padrão mediador em C#, é usado para publicar um evento no aplicativo. Isso é possível porque todos os eventos como ContactNameUpdatedEvent implementam a interface INotification do pacote MediatR.

Esses eventos precisam ser processados por um manipulador correspondente. Aqui, a implementação IEventsRepository entra em vigor. Confira o exemplo do manipulador de eventos NameUpdated:

public class ContactNameUpdatedHandler :
    INotificationHandler<ContactNameUpdatedEvent>
{
    private IEventRepository EventRepository { get; }

    public ContactNameUpdatedHandler(IEventRepository eventRepo)
    {
        EventRepository = eventRepo;
    }

    public Task Handle(ContactNameUpdatedEvent notification,
        CancellationToken cancellationToken)
    {
        EventRepository.Create(notification);
        return Task.CompletedTask;
    }
}

Uma instância IEventRepository é injetada na classe do manipulador por meio do construtor. Assim que um ContactNameUpdatedEvent é publicado no serviço, o método Handle é invocado e usa a instância do repositório de eventos para criar um objeto de notificação. Esse objeto de notificação, por sua vez, é inserido na lista de objetos rastreados no objeto IContainerContext e une os objetos salvos no mesmo lote transacional no Azure Cosmos DB.

Até agora, o contexto do contêiner sabe quais objetos processar. Para persistir os objetos rastreados no Azure Cosmos DB, a implementação IContainerContext cria o lote transacional, adiciona todos os objetos relevantes e executa a operação no banco de dados. O processo descrito é tratado no método SaveInTransactionalBatchAsync, que é invocado pelo método SaveChangesAsync.

Veja as partes importantes da implementação necessárias para criar e executar o lote transacional:

private async Task<List<IDataObject<Entity>>> SaveInTransactionalBatchAsync(
    CancellationToken cancellationToken)
{
    if (DataObjects.Count > 0)
    {
        var pk = new PartitionKey(DataObjects[0].PartitionKey);
        var tb = Container.CreateTransactionalBatch(pk);
        DataObjects.ForEach(o =>
        {
            TransactionalBatchItemRequestOptions tro = null;

            if (!string.IsNullOrWhiteSpace(o.Etag))
                tro = new TransactionalBatchItemRequestOptions { IfMatchEtag = o.Etag };

            switch (o.State)
            {
                case EntityState.Created:
                    tb.CreateItem(o);
                    break;
                case EntityState.Updated or EntityState.Deleted:
                    tb.ReplaceItem(o.Id, o, tro);
                    break;
            }
        });

        var tbResult = await tb.ExecuteAsync(cancellationToken);
...
[Check for return codes, etc.]
...
    }

    // Return copy of current list as result.
    var result = new List<IDataObject<Entity>>(DataObjects); 

    // Work has been successfully done. Reset DataObjects list.
    DataObjects.Clear();
    return result;
}

Veja abaixo uma visão geral de como o processo funciona até agora (para atualizar o nome em um objeto de contato):

  1. Um cliente deseja atualizar o nome de um contato. O método SetName é invocado no objeto de contato e as propriedades são atualizadas.
  2. O evento ContactNameUpdated é adicionado à lista de eventos no objeto de domínio.
  3. O método Update do repositório de contatos é invocado, o que adiciona o objeto de domínio ao contexto do contêiner. O objeto agora é rastreado.
  4. CommitAsync é invocado na instância UnitOfWork, que, por sua vez, chama SaveChangesAsync o contexto do contêiner.
  5. Em SaveChangesAsync, todos os eventos na lista do objeto de domínio são publicados por uma instância MediatR e adicionados por meio do repositório de eventos ao mesmo contexto de contêiner.
  6. Em SaveChangesAsync, um TransactionalBatch é criado. Ele conterá o objeto de contato e o evento.
  7. As execuções TransactionalBatch e os dados são confirmados no Azure Cosmos DB.
  8. SaveChangesAsync e CommitAsync retornam com êxito.

Persistência

Como você pode ver nos snippets de código anteriores, todos os objetos salvos no Azure Cosmos DB são encapsulados em uma instância DataObject. Este objeto fornece propriedades comuns:

  • ID.
  • PartitionKey.
  • Type.
  • State. Como Created, Updated não será persistido no Azure Cosmos DB.
  • Etag. Para bloqueio otimista.
  • TTL. Propriedade de vida útil para uma limpeza automática de documentos antigos.
  • Data. Objeto de dados genéricos.

Essas propriedades são definidas em uma interface genérica chamada IDataObject e usadas pelos repositórios e pelo contexto do contêiner:


public interface IDataObject<out T> where T : Entity
{
    string Id { get; }
    string PartitionKey { get; }
    string Type { get; }
    T Data { get; }
    string Etag { get; set; }
    int Ttl { get; }
    EntityState State { get; set; }
}

Os objetos encapsulados em uma instância DataObject e salvos no banco de dados se parecerão com este exemplo (Contact e ContactNameUpdatedEvent):

// Contact document/object. After creation.
{
    "id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "type": "contact",
    "data": {
        "name": {
            "firstName": "John",
            "lastName": "Doe"
        },
        "description": "This is a contact",
        "email": "johndoe@contoso.com",
        "company": {
            "companyName": "Contoso",
            "street": "Street",
            "houseNumber": "1a",
            "postalCode": "092821",
            "city": "Palo Alto",
            "country": "US"
        },
        "createdAt": "2021-09-22T11:07:37.3022907+02:00",
        "deleted": false,
        "id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2"
    },
    "ttl": -1,
    "_etag": "\"180014cc-0000-1500-0000-614455330000\"",
    "_ts": 1632301657
}

// After setting a new name, this is how an event document looks.
{
    "id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
    "partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "type": "domainEvent",
    "data": {
        "name": {
            "firstName": "Jane",
            "lastName": "Doe"
        },
        "contactId": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
        "action": "ContactNameUpdatedEvent",
        "id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
        "createdAt": "2021-09-22T11:37:37.3022907+02:00"
    },
    "ttl": 120,
    "_etag": "\"18005bce-0000-1500-0000-614456b80000\"",
    "_ts": 1632303457
}

Você pode ver que os documentos Contact e ContactNameUpdatedEvent (tipo domainEvent) têm a mesma chave de partição e que os dois documentos serão persistidos na mesma partição lógica.

Processamento do feed de alterações

Para ler o fluxo de eventos e enviá-los a um agente de mensagem, o serviço usará o feed de alterações do Azure Cosmos DB.

O feed de alterações é um log persistente de alterações em seu contêiner. Ele funciona em segundo plano e controla as modificações. Em uma partição lógica, a ordem das alterações é garantida. A maneira mais conveniente de ler o feed de alterações é usar uma função do Azure com um gatilho do Azure Cosmos DB. Outra opção é usar a biblioteca de processadores do feed de alterações. Ela permite integrar o processamento do feed de alterações em sua API Web como um serviço em segundo plano (por meio da interface IHostedService). O exemplo a seguir usa um aplicativo de console simples que implementa a classe abstrata BackgroundService para hospedar tarefas em segundo plano de execução prolongada em aplicativos .NET Core.

Para receber as alterações do feed de alterações do Azure Cosmos DB, você precisa criar uma instância de um objeto ChangeFeedProcessor, registrar um método de manipulador para processamento de mensagens e começar a escutar as alterações:

private async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync()
{
    var changeFeedProcessor = _container
        .GetChangeFeedProcessorBuilder<ExpandoObject>(
            _configuration.GetSection("Cosmos")["ProcessorName"],
            HandleChangesAsync)
        .WithInstanceName(Environment.MachineName)
        .WithLeaseContainer(_leaseContainer)
        .WithMaxItems(25)
        .WithStartTime(new DateTime(2000, 1, 1, 0, 0, 0, DateTimeKind.Utc))
        .WithPollInterval(TimeSpan.FromSeconds(3))
        .Build();

    _logger.LogInformation("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    _logger.LogInformation("Change Feed Processor started. Waiting for new messages to arrive.");
    return changeFeedProcessor;
}

Um método de manipulador (HandleChangesAsync aqui) processa as mensagens. Neste exemplo, os eventos são publicados em um tópico do Barramento de Serviço que é particionado em relação à escalabilidade e tem o recurso de eliminação de duplicação habilitado. Qualquer serviço interessado em alterações em objetos Contact pode então assinar esse tópico do Barramento de Serviço e receber e processar as alterações para um contexto próprio.

As mensagens do Barramento de Serviço produzidas têm uma propriedade SessionId. Ao usar sessões no Barramento de Serviço, você garante que a ordem das mensagens seja preservada (PEPS). A preservação do pedido é necessária para esse caso de uso.

Este é o snippet de código que manipula as mensagens do feed de alterações:

private async Task HandleChangesAsync(IReadOnlyCollection<ExpandoObject> changes, CancellationToken cancellationToken)
{
    _logger.LogInformation($"Received {changes.Count} document(s).");
    var eventsCount = 0;

    Dictionary<string, List<ServiceBusMessage>> partitionedMessages = new();

    foreach (var document in changes as dynamic)
    {
        if (!((IDictionary<string, object>)document).ContainsKey("type") ||
            !((IDictionary<string, object>)document).ContainsKey("data")) continue; // Unknown document type.

        if (document.type == EVENT_TYPE) // domainEvent.
        {
            string json = JsonConvert.SerializeObject(document.data);
            var sbMessage = new ServiceBusMessage(json)
            {
                ContentType = "application/json",
                Subject = document.data.action,
                MessageId = document.id,
                PartitionKey = document.partitionKey,
                SessionId = document.partitionKey
            };

            // Create message batch per partitionKey.
            if (partitionedMessages.ContainsKey(document.partitionKey))
            {
                partitionedMessages[sbMessage.PartitionKey].Add(sbMessage);
            }
            else
            {
                partitionedMessages[sbMessage.PartitionKey] = new List<ServiceBusMessage> { sbMessage };
            }

            eventsCount++;
        }
    }

    if (partitionedMessages.Count > 0)
    {
        _logger.LogInformation($"Processing {eventsCount} event(s) in {partitionedMessages.Count} partition(s).");

        // Loop over each partition.
        foreach (var partition in partitionedMessages)
        {
            // Create batch for partition.
            using var messageBatch =
                await _topicSender.CreateMessageBatchAsync(cancellationToken);
            foreach (var msg in partition.Value)
                if (!messageBatch.TryAddMessage(msg))
                    throw new Exception();

            _logger.LogInformation(
                $"Sending {messageBatch.Count} event(s) to Service Bus. PartitionId: {partition.Key}");

            try
            {
                await _topicSender.SendMessagesAsync(messageBatch, cancellationToken);
            }
            catch (Exception e)
            {
                _logger.LogError(e.Message);
                throw;
            }
        }
    }
    else
    {
        _logger.LogInformation("No event documents in change feed batch. Waiting for new messages to arrive.");
    }
}

Tratamento de erros

Se houver um erro enquanto as alterações estiverem sendo processadas, a biblioteca de feeds de alterações reiniciará a leitura de mensagens na posição em que processou com êxito o último lote. Por exemplo, se o aplicativo tiver processado com êxito dez mil mensagens e agora estiver trabalhando no lote de 10.001 a 10.025 e ocorrer um erro, ele poderá reiniciar e selecionar o trabalho na posição 10.001. A biblioteca rastreia automaticamente o que foi processado por meio de informações salvas em um contêiner Leases no Azure Cosmos DB.

É possível que o serviço já tenha enviado algumas das mensagens reprocessadas para o Barramento de Serviço. Normalmente, esse cenário levaria a um processamento de mensagens duplicado. Conforme observado anteriormente, o Barramento de Serviço tem um recurso para detecção de mensagens duplicadas que você precisa habilitar para esse cenário. O serviço verifica se uma mensagem já foi adicionada a um tópico (ou a uma fila) do Barramento de Serviço com base na propriedade MessageId da mensagem controlada pelo aplicativo. Essa propriedade é definida como a ID do documento de evento. Se a mesma mensagem for enviada novamente para o Barramento de Serviço, o serviço a ignorará e a removerá.

Manutenção

Em uma implementação de Caixa de Saída Transacional típica, o serviço atualiza os eventos tratados e define uma propriedade Processed como true, indicando que uma mensagem foi publicada com êxito. Esse comportamento pode ser implementado manualmente no método do manipulador. No cenário atual, não há necessidade para tal processo. O Azure Cosmos DB rastreia eventos processados usando o feed de alterações (em combinação com o contêiner Leases).

Como uma última etapa, ocasionalmente você precisa excluir os eventos do contêiner para manter somente os registros/documentos mais recentes. Para fazer uma limpeza periódica, a implementação aplica outro recurso do Azure Cosmos DB: vida útil (TTL) em documentos. O Azure Cosmos DB pode excluir documentos automaticamente com base em uma propriedade TTL que pode ser adicionada a um documento: um período de tempo em segundos. O serviço verificará constantemente o contêiner em busca de documentos que tenham uma propriedade TTL. Assim que um documento expirar, o Azure Cosmos DB o removerá do banco de dados.

Quando todos os componentes funcionam conforme o esperado, os eventos são processados e publicados em questão de segundos. Se houver um erro na Azure Cosmos DB, os eventos não serão enviados para o barramento de mensagens, porque o objeto comercial e os eventos correspondentes não poderão ser salvos no banco de dados. O único ponto a considerar é definir um valor TTL apropriado nos documentos DomainEvent quando o trabalho em segundo plano (processador do feed de alterações) ou o barramento de serviço não estiver disponível. Em um ambiente de produção, é melhor escolher um período de tempo de vários dias. Por exemplo, dez dias. Todos os componentes envolvidos terão tempo suficiente para processar/publicar alterações no aplicativo.

Resumo

O padrão de Caixa de Saída Transacional resolve o problema de publicar confiavelmente eventos de domínio em sistemas distribuídos. Ao confirmar o estado do objeto comercial e dos respectivos eventos no mesmo lote transacional e usar um processador em segundo plano como uma retransmissão de mensagens, você garante que outros serviços, internos ou externos recebam as informações de que dependem. Este exemplo não é uma implementação tradicional do padrão de Caixa de Saída Transacional. Ele usa recursos como o feed de alterações do e a vida útil do Azure Cosmos DB que mantêm tudo bem simples e limpo.

Veja um resumo dos componentes do Azure usados neste cenário:

Diagram that shows the Azure components to implement Transactional Outbox with Azure Cosmos DB and Azure Service Bus.

Baixe um Arquivo Visio dessa arquitetura.

As vantagens dessa solução são:

  • Mensagens confiáveis e entrega garantida de eventos.
  • Ordem preservada de eventos e eliminação de duplicação de mensagens por meio do Barramento de Serviço.
  • Não é necessário manter uma propriedade Processed extra que indica o processamento bem-sucedido de um documento de evento.
  • Exclusão de eventos do Azure Cosmos DB por meio de TTL. O processo não consome unidades de solicitação necessárias para lidar com solicitações de usuário/aplicativo. Em vez disso, ele usa unidades de solicitação "restantes" em uma tarefa em segundo plano.
  • Processamento de mensagens à prova de erros por meio de ChangeFeedProcessor (ou uma função do Azure).
  • Opcional: vários processadores de feed de alterações, cada um mantendo o próprio ponteiro no feed de alterações.

Considerações

O aplicativo de exemplo discutido neste artigo demonstra como você pode implementar o padrão de Caixa de Saída Transacional no Azure com o Azure Cosmos DB e o Barramento de Serviço. Também há outras abordagens que usam bancos de dados NoSQL. Para garantir que o objeto comercial e os eventos sejam salvos de maneira confiável no banco de dados, você pode inserir a lista de eventos no documento de objeto comercial. A desvantagem dessa abordagem é que o processo de limpeza precisará atualizar cada documento que contém eventos. Isso não é o ideal, especialmente em termos de custo de unidade de solicitação em comparação com o uso de TTL.

Tenha em mente que você não deve considerar o código de exemplo fornecido aqui como um código pronto para produção. Ele tem algumas limitações em relação a multithreading, especialmente na maneira como os eventos são manipulados na classe DomainEntity e como os objetos CosmosContainerContext são controlados nas implementações. Use-o como ponto de partida para suas implementações. Como alternativa, considere usar bibliotecas existentes que já tenham essa funcionalidade incorporada a elas, como NServiceBus ou MassTransit.

Implantar este cenário

Você pode encontrar o código-fonte, os arquivos de implantação e as instruções para testar esse cenário no GitHub: https://github.com/mspnp/transactional-outbox-pattern.

Colaboradores

Esse artigo é mantido pela Microsoft. Ele foi originalmente escrito pelos colaboradores a seguir.

Autor principal:

Para ver perfis não públicos do LinkedIn, entre no LinkedIn.

Próximas etapas

Leia estes artigos para saber mais: