你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

Azure Cosmos DB 的事务性发件箱模式

Azure Cosmos DB
Azure 服务总线
Azure Functions

在分布式系统中实现可靠的消息传递可能会很困难。 本文介绍如何使用事务发件箱模式可靠地传送消息和确保事件的正常送达,这对于幂等消息处理支持很重要。 为实现此目的,你需要将 Azure Cosmos DB 的事务性批处理、更改源与 Azure 服务总线结合使用。

概述

微服务体系结构日益普及,并彰显了解决可伸缩性、可维护性和敏捷性(尤其对于大型应用程序)等问题的承诺。 但此体系结构模式也在数据处理方面带来了挑战。 在分布式应用程序中,每项服务独立维护在专用服务拥有的数据存储中操作所需的数据。 要支持此类方案,通常使用 RabbitMQ、Kafka 或 Azure 服务总线等消息传递解决方案,该解决方案通过消息传递总线将数据(事件)从一项服务传递到应用程序的其他服务。 然后,内部和外部使用者可以订阅这些消息,在操作数据后立即获得更改通知。

订购系统是该领域的一个众所周知的示例:当用户想要创建订单时,Ordering 服务通过 REST 终结点从客户端应用程序接收数据。 它将有效负载映射到 Order 对象的内部表示形式以验证数据。 成功提交到数据库后,它会将 OrderCreated 事件发布到消息总线。 对新订单感兴趣的任何其他服务(例如 InventoryInvoicing 服务)会订阅 OrderCreated 消息、处理消息,然后将它们存储在其自己的数据库中。

以下伪代码演示通常如何从 Ordering 角度看待此过程:

CreateNewOrder(CreateOrderDto order){
  // Validate the incoming data.
  ...
  // Apply business logic.
  ...
  // Save the object to the database.
  var result = _orderRespository.Create(order);

  // Publish the respective event.
  _messagingService.Publish(new OrderCreatedEvent(result));

  return Ok();
}

此方法在保存订单对象并发布相应事件之间出现错误之前有效。 出于多种原因,发送事件可能失败:

  • 网络错误
  • 消息服务中断
  • 主机故障

无论出现哪种错误,都会导致无法将 OrderCreated 事件发布到消息总线。 其他服务不会收到订单已创建的通知。 现在,该 Ordering 服务必须处理与实际业务流程无关的各种问题。 它需要跟踪事件,这些事件在消息总线恢复联机后仍需要放在消息总线上。 即使最坏的情况也可能发生:由于事件丢失,导致应用程序中的数据不一致。

显示不带事务性发件箱模式的事件处理的关系图。

解决方案

有一种称为“事务性发件箱” 的已知模式,可帮助你避免这些情况。 它确保将事件保存在数据存储(通常在数据库的发件箱表中),然后再最终推送到消息代理。 如果业务对象和相应的事件保存在同一数据库事务中,则保证不会丢失任何数据。 所有内容都将被提交,或在出错时所有内容回滚。 为了最终发布事件,其他服务或工作进程会在发件箱表中查询未处理项,发布这些事件,并将其标记为已处理。 此模式可确保创建或修改业务对象后不会丢失事件。

该图显示了事务性发件箱模式的事件处理和用于将事件发布到消息代理的中继服务。

下载此体系结构的 Visio 文件

在关系数据库中,模式的实现非常简单。 例如,如果服务使用 Entity Framework Core,它将使用实体框架上下文来创建数据库事务、保存业务对象和事件,并提交事务或执行回滚。 此外,处理事件的辅助角色服务易于实现:它会定期查询发件箱表中的新条目,将新插入的事件发布到消息总线,最后将这些项标记为已处理。

实际上,事情并不像它们一眼看到的那么简单。 最重要的是,你需要确保事件的顺序保持不变,以便 OrderUpdated 事件不会在 OrderCreated 事件之前发布。

Azure Cosmos DB 中的实现

本部分说明如何在 Azure Cosmos DB 中实现事务性发件箱模式,以便在不同服务之间实现可靠的按顺序消息传递,以及 Azure Cosmos DB 更改源和服务总线。 它演示了管理 Contact 对象(FirstNameLastNameEmailCompany 信息等)的示例服务。 它使用命令和查询责任分离模式 (CQRS),并遵循基本的域驱动设计 (DDD) 概念。 可以在 GitHub 上找到实现的示例代码。

示例服务中的 Contact 对象采用以下结构:

{
    "name": {
        "firstName": "John",
        "lastName": "Doe"
    },
    "description": "This is a contact",
    "email": "johndoe@contoso.com",
    "company": {
        "companyName": "Contoso",
        "street": "Street",
        "houseNumber": "1a",
        "postalCode": "092821",
        "city": "Palo Alto",
        "country": "US"
    },
    "createdAt": "2021-09-22T11:07:37.3022907+02:00",
    "deleted": false
}

Contact 一旦创建或更新,它就会发出包含有关当前更改的信息的事件。 除此之外,域事件可以是:

  • ContactCreated。 添加联系人时引发。
  • ContactNameUpdated。 当 FirstNameLastName 发生更改时引发。
  • ContactEmailUpdated。 在更新电子邮件地址时引发。
  • ContactCompanyUpdated。 任何公司属性更改时引发。

事务性批处理

要实现此模式,需要确保将 Contact 业务对象和相应的事件保存在同一数据库事务中。 在 Azure Cosmos DB 中,事务的工作方式与关系数据库系统中不同。 Azure Cosmos DB 事务(称为“事务性批处理”)对单个逻辑分区执行操作,因此它们保证 ACID 属性的原子性、一致性、隔离性、持久性。 不能将两个文档保存在不同容器或逻辑分区中的事务性批处理操作中。 对于示例服务,这意味着业务对象和事件将放入同一容器和逻辑分区中。

上下文、存储库和 UnitOfWork

示例实现的核心是一个容器上下文,用于跟踪保存在同一事务性批处理中的对象。 它维护已创建和已修改对象的列表,并且对单个 Azure Cosmos DB 容器执行操作。 它的接口如下所示:

public interface IContainerContext
{
    public Container Container { get; }
    public List<IDataObject<Entity>> DataObjects { get; }
    public void Add(IDataObject<Entity> entity);
    public Task<List<IDataObject<Entity>>> SaveChangesAsync(CancellationToken cancellationToken = default);
    public void Reset();
}

容器上下文组件中的列表跟踪 ContactDomainEvent 对象。 两者将放入同一容器中。 这意味着,在同一 Azure Cosmos DB 容器中存储了多个类型的对象,并使用 Type 属性来区分业务对象和事件。

对于每个类型,都有一个用于定义和实现数据访问的专用存储库。 Contact 存储库接口提供以下方法:

public interface IContactsRepository
{
    public void Create(Contact contact);
    public Task<(Contact, string)> ReadAsync(Guid id, string etag);
    public Task DeleteAsync(Guid id, string etag);
    public Task<(List<(Contact, string)>, bool, string)> ReadAllAsync(int pageSize, string continuationToken);
    public void Update(Contact contact, string etag);
}

Event 存储库看起来类似,只不过只有一个方法在存储中创建新事件:

public interface IEventRepository
{
    public void Create(ContactDomainEvent e);
}

这两个存储库接口的实现通过依赖关系注入到单个 IContainerContext 实例获取一个引用,以确保这两个操作在同一 Azure Cosmos DB 上下文中运行。

最后一个组件是 UnitOfWork,该组件将 IContainerContext 实例中保存的更改提交到 Azure Cosmos DB:

public class UnitOfWork : IUnitOfWork
{
    private readonly IContainerContext _context;
    public IContactRepository ContactsRepo { get; }

    public UnitOfWork(IContainerContext ctx, IContactRepository cRepo)
    {
        _context = ctx;
        ContactsRepo = cRepo;
    }

    public Task<List<IDataObject<Entity>>> CommitAsync(CancellationToken cancellationToken = default)
    {
        return _context.SaveChangesAsync(cancellationToken);
    }
}

事件处理:创建和发布

每次创建、修改或(软)删除 Contact 对象时,服务都会引发相应的事件。 提供的解决方案的核心是域驱动设计 (DDD) 和由 Jimmy Bogard 建议的中介者模式的组合。 如果要在将实际对象保存到数据库之前修改域对象并发布这些事件,则建议维护一系列事件。

更改列表保存在域对象本身中,以便其他组件无法修改事件链。 维护域对象中事件(IEvent 实例)的行为由接口 IEventEmitter<IEvent> 定义,并在抽象 DomainEntity 类中实现:

public abstract class DomainEntity : Entity, IEventEmitter<IEvent>
{
[...]
[...]
    private readonly List<IEvent> _events = new();

    [JsonIgnore] public IReadOnlyList<IEvent> DomainEvents => _events.AsReadOnly();

    public virtual void AddEvent(IEvent domainEvent)
    {
        var i = _events.FindIndex(0, e => e.Action == domainEvent.Action);
        if (i < 0)
        {
            _events.Add(domainEvent);
        }
        else
        {
            _events.RemoveAt(i);
            _events.Insert(i, domainEvent);
        }
    }
[...]
[...]
}

Contact 对象引发域事件。 Contact 实体遵循基本的 DDD 概念,将域属性的 setter 配置为专用。 类中不存在公共 setter。 相反,它提供用于操作内部状态的方法。 在这些方法中,可以引发特定修改的事件(例如 ContactNameUpdatedContactEmailUpdated)。

下面是联系人姓名更新的示例。 (在方法末尾引发此事件。)

public void SetName(string firstName, string lastName)
{
    if (string.IsNullOrWhiteSpace(firstName) ||
        string.IsNullOrWhiteSpace(lastName))
    {
        throw new ArgumentException("FirstName or LastName cannot be empty");
    }

    Name = new Name(firstName, lastName);

    if (IsNew) return;

    AddEvent(new ContactNameUpdatedEvent(Id, Name));
    ModifiedAt = DateTimeOffset.UtcNow;
}

跟踪更改的相应 ContactNameUpdatedEvent 如下所示:

public class ContactNameUpdatedEvent : ContactDomainEvent
{
    public Name Name { get; }

    public ContactNameUpdatedEvent(Guid contactId, Name contactName) : 
        base(Guid.NewGuid(), contactId, nameof(ContactNameUpdatedEvent))
    {
        Name = contactName;
    }
}

到目前为止,事件仅记录在域对象中,而不会将任何内容保存到数据库,甚至不会发布到消息代理。 按照此建议,在将业务对象保存到数据存储区之前,将立即处理事件列表。 在这种情况下,它发生在 IContainerContext 实例的 SaveChangesAsync 方法中,该实例在专用 RaiseDomainEvents 方法中实现。 (dObjs 是容器上下文的被跟踪实体的列表。)

private void RaiseDomainEvents(List<IDataObject<Entity>> dObjs)
{
    var eventEmitters = new List<IEventEmitter<IEvent>>();

    // Get all EventEmitters.
    foreach (var o in dObjs)
        if (o.Data is IEventEmitter<IEvent> ee)
            eventEmitters.Add(ee);

    // Raise events.
    if (eventEmitters.Count <= 0) return;
    foreach (var evt in eventEmitters.SelectMany(eventEmitter => eventEmitter.DomainEvents))
        _mediator.Publish(evt);
}

在最后一行中,MediatR 包是 C# 中介者模式的实现,用于在应用程序中发布事件。 这样做是有可行的,因为所有事件(如 ContactNameUpdatedEvent )都实现 MediatR 包的 INotification 接口。

这些事件需要由相应的处理程序处理。 在这里,IEventsRepository 实现开始发挥作用。 下面是 NameUpdated 事件处理程序示例:

public class ContactNameUpdatedHandler :
    INotificationHandler<ContactNameUpdatedEvent>
{
    private IEventRepository EventRepository { get; }

    public ContactNameUpdatedHandler(IEventRepository eventRepo)
    {
        EventRepository = eventRepo;
    }

    public Task Handle(ContactNameUpdatedEvent notification,
        CancellationToken cancellationToken)
    {
        EventRepository.Create(notification);
        return Task.CompletedTask;
    }
}

IEventRepository 实例通过构造函数注入到处理程序类中。 一旦在服务中发布 ContactNameUpdatedEvent,将调用 Handle 方法,并使用事件存储库实例创建通知对象。 反过来,该通知对象会插入 IContainerContext 对象的跟踪对象列表中,并将保存在同一事务性批处理中的对象联接到 Azure Cosmos DB。

到目前为止,容器上下文知道要处理的对象。 要最终将跟踪对象保留到 Azure Cosmos DB 中,IContainerContext 实现将创建事性务批处理、添加所有相关对象,并对数据库运行操作。 描述的过程在 SaveInTransactionalBatchAsync 方法中进行处理,该方法由 SaveChangesAsync 方法调用。

下面是创建和运行事务性批处理所需的实现的重要部分:

private async Task<List<IDataObject<Entity>>> SaveInTransactionalBatchAsync(
    CancellationToken cancellationToken)
{
    if (DataObjects.Count > 0)
    {
        var pk = new PartitionKey(DataObjects[0].PartitionKey);
        var tb = Container.CreateTransactionalBatch(pk);
        DataObjects.ForEach(o =>
        {
            TransactionalBatchItemRequestOptions tro = null;

            if (!string.IsNullOrWhiteSpace(o.Etag))
                tro = new TransactionalBatchItemRequestOptions { IfMatchEtag = o.Etag };

            switch (o.State)
            {
                case EntityState.Created:
                    tb.CreateItem(o);
                    break;
                case EntityState.Updated or EntityState.Deleted:
                    tb.ReplaceItem(o.Id, o, tro);
                    break;
            }
        });

        var tbResult = await tb.ExecuteAsync(cancellationToken);
...
[Check for return codes, etc.]
...
    }

    // Return copy of current list as result.
    var result = new List<IDataObject<Entity>>(DataObjects);

    // Work has been successfully done. Reset DataObjects list.
    DataObjects.Clear();
    return result;
}

下面概述了此过程的工作原理(更新联系人对象上的姓名):

  1. 客户想要更新联系人的姓名。 在联系人对象上调用 SetName 方法,并更新属性。
  2. ContactNameUpdated 事件将添加到域对象的事件列表中。
  3. 调用联系人存储库的 Update 方法,这会将域对象添加到容器上下文中。 此时将跟踪该对象。
  4. UnitOfWork 实例调用 CommitAsync 后,后者又在容器上下文中调用 SaveChangesAsync
  5. SaveChangesAsync 中,域对象列表中的所有事件都由 MediatR 实例发布,并通过事件存储库添加到相同的容器上下文中。
  6. SaveChangesAsync 中创建了 TransactionalBatch。 它将同时包含联系人对象和事件。
  7. TransactionalBatch 运行并将数据提交到 Azure Cosmos DB。
  8. 成功返回 SaveChangesAsyncCommitAsync

持久性

如前面的代码片段中所示,保存到 Azure Cosmos DB 的所有对象都包装在一个 DataObject 实例中。 此对象提供通用属性:

  • ID
  • PartitionKey
  • Type
  • State。 与 Created 一样,Updated 不会保留在 Azure Cosmos DB 中。
  • Etag。 用于乐观锁定
  • TTL。 自动清理旧文档的生存时间属性。
  • Data。 泛型数据对象。

这些属性在名为 IDataObject 的泛型接口中定义,由存储库和容器上下文使用:


public interface IDataObject<out T> where T : Entity
{
    string Id { get; }
    string PartitionKey { get; }
    string Type { get; }
    T Data { get; }
    string Etag { get; set; }
    int Ttl { get; }
    EntityState State { get; set; }
}

包装在 DataObject 实例中并保存到数据库中的对象将如以下示例所示(ContactContactNameUpdatedEvent):

// Contact document/object. After creation.
{
    "id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "type": "contact",
    "data": {
        "name": {
            "firstName": "John",
            "lastName": "Doe"
        },
        "description": "This is a contact",
        "email": "johndoe@contoso.com",
        "company": {
            "companyName": "Contoso",
            "street": "Street",
            "houseNumber": "1a",
            "postalCode": "092821",
            "city": "Palo Alto",
            "country": "US"
        },
        "createdAt": "2021-09-22T11:07:37.3022907+02:00",
        "deleted": false,
        "id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2"
    },
    "ttl": -1,
    "_etag": "\"180014cc-0000-1500-0000-614455330000\"",
    "_ts": 1632301657
}

// After setting a new name, this is how an event document looks.
{
    "id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
    "partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "type": "domainEvent",
    "data": {
        "name": {
            "firstName": "Jane",
            "lastName": "Doe"
        },
        "contactId": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
        "action": "ContactNameUpdatedEvent",
        "id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
        "createdAt": "2021-09-22T11:37:37.3022907+02:00"
    },
    "ttl": 120,
    "_etag": "\"18005bce-0000-1500-0000-614456b80000\"",
    "_ts": 1632303457
}

你可以看到 ContactContactNameUpdatedEvent(类型 domainEvent)文档具有相同的分区键,并且这两个文档将保留在同一逻辑分区中。

更改源处理

要读取事件流并将其发送到消息代理,该服务将使用 Azure Cosmos DB 更改源

更改源是容器中更改的持久日志。 它在后台运行并跟踪修改。 在一个逻辑分区中,可保证更改顺序。 读取更改源的最便捷方法是将 Azure 函数与 Azure Cosmos DB 触发器一起使用。 另一种方法是使用更改源处理器库。 允许你将 Web API 中的更改源处理集成为(通过 IHostedService 接口)的后台服务。 此处的示例使用了一个简单的控制台应用程序,该应用程序实现抽象类 BackgroundService 以托管 .NET Core 应用程序中长期运行的后台任务。

要接收来自 Azure Cosmos DB 更改源的更改,需要实例化 ChangeFeedProcessor 对象,为消息处理注册处理程序方法,并开始侦听更改:

private async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync()
{
    var changeFeedProcessor = _container
        .GetChangeFeedProcessorBuilder<ExpandoObject>(
            _configuration.GetSection("Cosmos")["ProcessorName"],
            HandleChangesAsync)
        .WithInstanceName(Environment.MachineName)
        .WithLeaseContainer(_leaseContainer)
        .WithMaxItems(25)
        .WithStartTime(new DateTime(2000, 1, 1, 0, 0, 0, DateTimeKind.Utc))
        .WithPollInterval(TimeSpan.FromSeconds(3))
        .Build();

    _logger.LogInformation("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    _logger.LogInformation("Change Feed Processor started. Waiting for new messages to arrive.");
    return changeFeedProcessor;
}

处理程序方法(此处的 HandleChangesAsync)然后会处理消息。 在此示例中,事件发布到已分区以实现可伸缩性并已启用重复数据删除功能的服务总线主题。 任何对 Contact 对象更改感兴趣的服务都可以订阅该服务总线主题,接收并处理其自身上下文的更改。

生成的服务总线消息具有 SessionId 属性。 在服务总线中使用会话时,可保证保留消息顺序(先进先出 (FIFO))。 对于此用例,必须保留顺序。

以下代码片段将处理来自更改源的消息:

private async Task HandleChangesAsync(IReadOnlyCollection<ExpandoObject> changes, CancellationToken cancellationToken)
{
    _logger.LogInformation($"Received {changes.Count} document(s).");
    var eventsCount = 0;

    Dictionary<string, List<ServiceBusMessage>> partitionedMessages = new();

    foreach (var document in changes as dynamic)
    {
        if (!((IDictionary<string, object>)document).ContainsKey("type") ||
            !((IDictionary<string, object>)document).ContainsKey("data")) continue; // Unknown document type.

        if (document.type == EVENT_TYPE) // domainEvent.
        {
            string json = JsonConvert.SerializeObject(document.data);
            var sbMessage = new ServiceBusMessage(json)
            {
                ContentType = "application/json",
                Subject = document.data.action,
                MessageId = document.id,
                PartitionKey = document.partitionKey,
                SessionId = document.partitionKey
            };

            // Create message batch per partitionKey.
            if (partitionedMessages.ContainsKey(document.partitionKey))
            {
                partitionedMessages[sbMessage.PartitionKey].Add(sbMessage);
            }
            else
            {
                partitionedMessages[sbMessage.PartitionKey] = new List<ServiceBusMessage> { sbMessage };
            }

            eventsCount++;
        }
    }

    if (partitionedMessages.Count > 0)
    {
        _logger.LogInformation($"Processing {eventsCount} event(s) in {partitionedMessages.Count} partition(s).");

        // Loop over each partition.
        foreach (var partition in partitionedMessages)
        {
            // Create batch for partition.
            using var messageBatch =
                await _topicSender.CreateMessageBatchAsync(cancellationToken);
            foreach (var msg in partition.Value)
                if (!messageBatch.TryAddMessage(msg))
                    throw new Exception();

            _logger.LogInformation(
                $"Sending {messageBatch.Count} event(s) to Service Bus. PartitionId: {partition.Key}");

            try
            {
                await _topicSender.SendMessagesAsync(messageBatch, cancellationToken);
            }
            catch (Exception e)
            {
                _logger.LogError(e.Message);
                throw;
            }
        }
    }
    else
    {
        _logger.LogInformation("No event documents in change feed batch. Waiting for new messages to arrive.");
    }
}

错误处理

如果在处理更改时出现错误,更改源库将在成功处理最后一个批的位置重新开始读取消息。 例如,如果应用程序已成功处理 10,000 条消息,现在正在处理 10,001 到 10,025 批,其中发生了错误,它可以重新启动并在 10,001 位置处继续处理。 通过保存在 Azure Cosmos DB 中 Leases 容器内的信息,该库可以自动跟踪已处理的内容。

服务可能将已重新处理的某些消息发送到服务总线。 通常情况下,这种情况会导致重复处理消息。 如前文所述,服务总线具有重复消息检测功能,你可能需要为此方案启用该功能。 服务将检查是否已基于该消息的应用程序控制 MessageId 属性将消息添加到服务总线主题(或队列)中。 该属性设置为事件文档的 ID。 如果再次将同一消息发送到服务总线,则该服务将忽略并删除该消息。

保养工作

在典型的事务性发件箱实现中,服务更新已处理事件,并将 Processed 属性设置为 true,指示消息已成功发布。 可以在处理程序方法中手动实现此行为。 在当前情况下,无需执行此类流程。 Azure Cosmos DB 通过使用更改源(与 Leases 容器结合使用)来跟踪已处理的事件。

在最后一步中,有时需要删除容器中的事件,以便只保留最新记录/文档。 为定期执行清理,实现将应用 Azure Cosmos DB 的另一项功能:文档生存时间 (TTL)。 Azure Cosmos DB 可以根据可添加到文档的 TTL 属性自动删除文档:时间跨度(以秒为单位)。 服务将不断检查容器中是否存在具有 TTL 属性的文档。 文档过期后,Azure Cosmos DB 将从数据库中删除该文档。

当所有组件按预期运行时,系统将快速处理并发布事件:数秒之内。 如果 Azure Cosmos DB 中存在错误,则不会将事件发送到消息总线,因为不能将业务对象和相应事件保存到数据库中。 唯一需要注意一点,当后台辅助角色(更改源处理器)或服务总线不可用时,应在 TTL 文档上设置适当的 DomainEvent 值。 在生产环境中,最好选择多天的时间跨度。 例如,10 天。 所有相关组件都将有充分的时间来处理/发布应用程序中的更改。

总结

事务性发件箱模式解决了在分布式系统中可靠地发布域事件的问题。 通过在同一事务性批处理中提交业务对象的状态及其事件,并使用后台处理器作为消息中继,你可以确保其他服务(内部或外部)最终会收到它们所依赖的信息。 此示例并非传统的事务性发件箱模式实现。 它使用 Azure Cosmos DB 更改源和生存时间等功能,让一切简单明了。

下面是此方案中使用的 Azure 组件摘要:

该图显示了用于使用 Azure Cosmos DB 和 Azure 服务总线实现事务性发件箱的 Azure 组件。

下载此体系结构的 Visio 文件

此解决方案的优点是:

  • 可靠的消息传送,且保证传递事件。
  • 通过服务总线保留事件顺序,消除重复消息。
  • 无需保留额外的 Processed 属性来指示已成功处理事件文档。
  • 通过生存时间 (TTL) 从 Azure Cosmos DB 中删除事件。 此过程不会使用处理用户/应用程序请求所需的请求单位。 相反,它会使用后台任务中的“剩余”请求单位。
  • 通过 ChangeFeedProcessor(或 Azure 函数)对消息进行防错验证处理。
  • 可选:多个更改源处理器,每个处理器都在更改源中维护自己的指针。

注意事项

本文讨论的示例应用程序演示了如何在 Azure 上利用 Azure Cosmos DB 和服务总线实现事务性发件箱模式。 还有其他使用 NoSQL 数据库的方法。 为了保证业务对象和事件能够可靠地保存在数据库中,你可以在业务对象文档中嵌入事件列表。 此方法的缺点在于,清理过程需要更新每个包含事件的文档。 这与使用 TTL 相比并不理想,尤其表现在请求单位成本方面。

请记住,你不应将此处提供的示例代码作为生产就绪代码。 它对于多线程处理存在一些限制,尤其是在 DomainEntity 类中处理事件的方式,以及如何在 CosmosContainerContext 实现中跟踪对象。 将其用作自己实现的起点。 或者,请考虑使用已内置此功能的现有库,例如 NServiceBusMassTransit

部署此方案

你可以在 GitHub 上查找源代码、部署文件和测试此方案的说明:https://github.com/mspnp/transactional-outbox-pattern

作者

本文由 Microsoft 维护, 它最初是由以下贡献者撰写的。

主要作者:

要查看非公开的 LinkedIn 个人资料,请登录到 LinkedIn。

后续步骤

请查看以下文章了解详细信息: