订阅事件

小窍门

此内容摘自电子书《适用于容器化 .NET 应用程序的 .NET 微服务体系结构》,可以在 .NET Docs 上获取,也可以下载免费的 PDF 以供离线阅读。

适用于容器化 .NET 应用程序的 .NET 微服务体系结构电子书封面缩略图。

使用事件总线的第一步是为微服务订阅它们想要接收的事件。 应在接收方微服务中完成该功能。

以下简单代码显示了启动服务时每个接收方微服务需要实现的内容(即类中 Startup ),以便订阅它所需的事件。 在这种情况下, basket-api 微服务需要订阅 ProductPriceChangedIntegrationEventOrderStartedIntegrationEvent 消息。

例如,订阅 ProductPriceChangedIntegrationEvent 事件时,使购物篮微服务掌握产品价格的任何变动,并在该产品在用户的购物篮中时能够通知用户价格变更。

var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();

eventBus.Subscribe<ProductPriceChangedIntegrationEvent,
                   ProductPriceChangedIntegrationEventHandler>();

eventBus.Subscribe<OrderStartedIntegrationEvent,
                   OrderStartedIntegrationEventHandler>();

运行此代码后,订阅者微服务将通过 RabbitMQ 通道侦听。 当 ProductPriceChangedIntegrationEvent 类型的任何消息到达时,代码将调用传递给它的事件处理程序并处理事件。

通过事件总线发布事件

最后,消息发送方(源微服务)使用类似于以下示例的代码发布集成事件。 (此方法是一个不考虑原子性的简化示例。每当必须跨多个微服务传播事件时,都可以实现类似的代码,通常在从源微服务提交数据或事务后立即实现。

首先,事件总线实现对象(基于 RabbitMQ 或基于服务总线)将注入控制器构造函数,如以下代码所示:

[Route("api/v1/[controller]")]
public class CatalogController : ControllerBase
{
    private readonly CatalogContext _context;
    private readonly IOptionsSnapshot<Settings> _settings;
    private readonly IEventBus _eventBus;

    public CatalogController(CatalogContext context,
        IOptionsSnapshot<Settings> settings,
        IEventBus eventBus)
    {
        _context = context;
        _settings = settings;
        _eventBus = eventBus;
    }
    // ...
}

然后你从控制器的方法中调用它,例如在 UpdateProduct 方法中:

[Route("items")]
[HttpPost]
public async Task<IActionResult> UpdateProduct([FromBody]CatalogItem product)
{
    var item = await _context.CatalogItems.SingleOrDefaultAsync(
        i => i.Id == product.Id);
    // ...
    if (item.Price != product.Price)
    {
        var oldPrice = item.Price;
        item.Price = product.Price;
        _context.CatalogItems.Update(item);
        var @event = new ProductPriceChangedIntegrationEvent(item.Id,
            item.Price,
            oldPrice);
        // Commit changes in original transaction
        await _context.SaveChangesAsync();
        // Publish integration event to the event bus
        // (RabbitMQ or a service bus underneath)
        _eventBus.Publish(@event);
        // ...
    }
    // ...
}

在这种情况下,由于源微服务是一个简单的 CRUD 微服务,因此该代码将直接放置在 Web API 控制器中。

在更高级的微服务中,例如使用 CQRS 方法时,可以在方法中的CommandHandlerHandle()类中实现它。

设计发布到事件总线时的原子性和复原能力

通过分布式消息传递系统(例如事件总线)发布集成事件时,您面临一个挑战,即确保以原子性方式更新原始数据库并发布事件(要么两个操作都完成,要么两个操作都不完成)。 例如,在前面所示的简化示例中,代码在更改产品价格时向数据库提交数据,然后发布 ProductPriceChangedIntegrationEvent 消息。 最初,这两个操作看起来必须以原子方式执行。 但是,如果使用涉及数据库和消息代理的分布式事务,就像在旧式系统 (如 Microsoft 消息队列(MSMQ)中所做的那样,不建议出于 CAP 定理中所述的原因使用此方法。

基本上,使用微服务生成可缩放且高度可用的系统。 CAP 定理稍微简化一下,表示你无法生成一个(分布式)数据库(或拥有其模型的微服务),该模型持续可用、非常一致 能够容忍任何分区。 必须选择这三个属性中的两个。

在基于微服务的体系结构中,应选择可用性和容忍度,并且应取消强调强一致性。 因此,在大多数基于微服务的应用程序中,通常不希望在消息传递中使用分布式事务,就像在 MSMQ 中基于 Windows 分布式事务协调器 (DTC) 实现分布式事务时一样。

让我们回到初始问题及其示例。 如果服务在更新数据库后崩溃(在本例中,就在代码 _context.SaveChangesAsync()行之后),但在发布集成事件之前,整个系统可能会变得不一致。 此方法可能对业务至关重要,具体取决于您要处理的特定业务操作。

如体系结构部分前面所述,可以使用多种方法来处理此问题:

对于此方案,使用完整的事件溯源(ES)模式是最佳方法之一( 如果不是最佳 方法)。 但是,在许多应用程序方案中,可能无法实现完整的 ES 系统。 ES 表示仅将域事件存储在事务数据库中,而不是存储当前状态数据。 仅存储域事件可以带来很大的好处,例如,系统历史记录可用,并且能够确定系统过去的任何时刻的状态。 但是,实现完整的 ES 系统需要重新构建大部分系统,并引入许多其他复杂性和要求。 例如,建议使用专门为事件溯源设计的数据库,例如 Event Store,或者使用面向文档的数据库,如 Azure Cosmos DB、MongoDB、Cassandra、CouchDB 或 RavenDB。 ES 是此问题的好方法,但不是最简单的解决方案,除非你已经熟悉事件溯源。

使用事务日志挖掘的选项最初看起来是透明的。 但是,若要使用此方法,必须将微服务耦合到 RDBMS 事务日志,例如 SQL Server 事务日志。 这种方法可能不理想。 另一个缺点是事务日志中记录的低级别更新可能与高级集成事件不在同一级别。 如果是这样,反向工程这些事务日志操作的过程会很困难。

平衡方法是事务数据库表和简化的 ES 模式的组合。 可以使用“准备发布事件”等状态,在将事件提交到集成事件表时在原始事件中设置。 然后尝试将事件发布到事件总线。 如果发布事件作成功,则启动源服务中的另一个事务,并将状态从“准备发布事件”移动到“已发布的事件”。

如果事件总线中的发布事件操作失败,源微服务中的数据仍然保持一致,并标记为“准备发布事件”;对于其他服务,数据最终会达到一致性。 你始终可以通过后台作业来检查事务或集成事件的状态。 如果作业找到状态为“准备发布事件”的事件,它可以尝试将该事件重新发布到事件总线。

请注意,使用此方法时,只保留每个源微服务的集成事件,并且只保留要与其他微服务或外部系统通信的事件。 相比之下,在完整的 ES 系统中,你也会存储所有域事件。

因此,这种平衡的方法是简化的 ES 系统。 你需要包含其当前状态的集成事件列表(“准备发布”与“已发布”)。 但只需为集成事件实现这些状态。 在此方法中,无需将所有域数据存储为事务数据库中的事件,就像在完整的 ES 系统中一样。

如果已经在使用关系数据库,则可以使用事务表来存储集成事件。 若要在应用程序中实现原子性,可以使用基于本地事务的过程,该过程包含两个步骤。 基本上,您在包含域实体的同一个数据库中有一个名为 IntegrationEvent 的表。 该表用于保证实现原子性,以便将持久性集成事件添加到提交域数据的相同事务中。

分步,过程如下:

  1. 应用程序开始处理一项本地数据库事务。

  2. 然后,它会更新域实体的状态,并将事件插入集成事件表中。

  3. 最后,它将提交事务,如此即可获得所需的原子性,然后

  4. 你以某种方式发布事件(下一步)。

在实现发布事件的步骤时,可以选择以下选项:

  • 在提交事务后立即发布集成事件,并使用另一个本地事务将表中的事件标记为已发布。 然后,将该表作为一种工具来跟踪集成事件,以便在远程微服务出现问题时,可以根据存储的集成事件执行补偿措施。

  • 将该表用作一种队列。 单独的应用程序线程或进程查询集成事件表,将事件发布到事件总线,然后使用本地事务将事件标记为已发布。

图 6-22 显示了这些方法中的第一个体系结构。

不使用辅助角色微服务发布时的原子性关系图。

图 6-22. 将事件发布到事件总线时的原子性

图 6-22 中所示的方法缺少一个额外的工作微服务,它负责检查和确认已发布集成事件的成功。 如果失败,其他检查器辅助角色微服务可以从表中读取事件并重新发布事件,即重复步骤 2。

关于第二种方法:将 EventLog 表用作队列,并始终使用辅助微服务来发布消息。 在这种情况下,该过程类似于图 6-23 中显示的过程。 这表明了一个新增的微服务,发布事件时使用该表作为唯一来源。

使用辅助角色微服务发布时的原子性关系图。

图 6-23. 使用辅助微服务将事件发布到事件总线时的原子性

为简单起见,eShopOnContainers 示例使用第一种方法(没有附加进程或检查器微服务)和事件总线。 但是,eShopOnContainers 示例未处理所有可能的故障案例。 在部署到云的实际应用程序中,必须接受最终会出现问题的事实,并且必须实现该检查和重新发送逻辑。 如果在通过事件总线发布事件(使用辅助角色)时,该表是单一的事件源,那么,将该表用作队列可能比第一种方法更有效。

通过事件总线发布集成事件时实现原子性

以下代码演示如何创建涉及多个 DbContext 对象的单个事务-一个与要更新的原始数据相关的上下文,以及与 IntegrationEventLog 表相关的第二个上下文。

如果与数据库的连接在代码运行时出现问题,则以下示例代码中的事务将无法复原。 这可能发生在基于云的系统(例如 Azure SQL DB)中,这可能会跨服务器移动数据库。 若要跨多个上下文实现可复原事务,请参阅本指南后面的 “实现可复原 Entity Framework Core SQL 连接 ”部分。

为了清楚起见,以下示例在单个代码片段中显示了整个过程。 但是,eShopOnContainers 实现是重构的,并将此逻辑拆分为多个类,以便更易于维护。

// Update Product from the Catalog microservice
//
public async Task<IActionResult> UpdateProduct([FromBody]CatalogItem productToUpdate)
{
  var catalogItem =
       await _catalogContext.CatalogItems.SingleOrDefaultAsync(i => i.Id ==
                                                               productToUpdate.Id);
  if (catalogItem == null) return NotFound();

  bool raiseProductPriceChangedEvent = false;
  IntegrationEvent priceChangedEvent = null;

  if (catalogItem.Price != productToUpdate.Price)
          raiseProductPriceChangedEvent = true;

  if (raiseProductPriceChangedEvent) // Create event if price has changed
  {
      var oldPrice = catalogItem.Price;
      priceChangedEvent = new ProductPriceChangedIntegrationEvent(catalogItem.Id,
                                                                  productToUpdate.Price,
                                                                  oldPrice);
  }
  // Update current product
  catalogItem = productToUpdate;

  // Just save the updated product if the Product's Price hasn't changed.
  if (!raiseProductPriceChangedEvent)
  {
      await _catalogContext.SaveChangesAsync();
  }
  else  // Publish to event bus only if product price changed
  {
        // Achieving atomicity between original DB and the IntegrationEventLog
        // with a local transaction
        using (var transaction = _catalogContext.Database.BeginTransaction())
        {
           _catalogContext.CatalogItems.Update(catalogItem);
           await _catalogContext.SaveChangesAsync();

           await _integrationEventLogService.SaveEventAsync(priceChangedEvent);

           transaction.Commit();
        }

      // Publish the integration event through the event bus
      _eventBus.Publish(priceChangedEvent);

      _integrationEventLogService.MarkEventAsPublishedAsync(
                                                priceChangedEvent);
  }

  return Ok();
}

创建 ProductPriceChangedIntegrationEvent 集成事件后,存储原始域操作(更新目录项)的事务还将事件持久化到 EventLog 表中。 这使得它成为一个单一的事务,使您始终能够检查事件消息是否已发送。

通过对同一数据库使用本地事务,可使用原始数据库操作以原子方式更新事件日志表。 如果任何操作失败,则会引发异常,其事务会回滚所有已完成的操作,从而保持域操作与保存到表中的事件消息之间的一致性。

从订阅接收消息:接收方微服务中的事件处理程序

除了事件订阅逻辑,还需要实现集成事件处理程序的内部代码(如回调方法)。 事件处理程序指定接收和处理特定类型的事件消息的位置。

事件处理器首先从事件总线接收到事件实例。 然后,系统会定位与该集成事件相关的待处理组件,并将事件作为状态变化传播和持久化到接收方微服务中。 例如,如果 ProductPriceChanged 事件源自目录微服务,则会在购物篮微服务中进行处理,并更改此接收器篮微服务的状态,如以下代码所示。

namespace Microsoft.eShopOnContainers.Services.Basket.API.IntegrationEvents.EventHandling
{
    public class ProductPriceChangedIntegrationEventHandler :
        IIntegrationEventHandler<ProductPriceChangedIntegrationEvent>
    {
        private readonly IBasketRepository _repository;

        public ProductPriceChangedIntegrationEventHandler(
            IBasketRepository repository)
        {
            _repository = repository;
        }

        public async Task Handle(ProductPriceChangedIntegrationEvent @event)
        {
            var userIds = await _repository.GetUsers();
            foreach (var id in userIds)
            {
                var basket = await _repository.GetBasket(id);
                await UpdatePriceInBasketItems(@event.ProductId, @event.NewPrice, basket);
            }
        }

        private async Task UpdatePriceInBasketItems(int productId, decimal newPrice,
            CustomerBasket basket)
        {
            var itemsToUpdate = basket?.Items?.Where(x => int.Parse(x.ProductId) ==
                productId).ToList();
            if (itemsToUpdate != null)
            {
                foreach (var item in itemsToUpdate)
                {
                    if(item.UnitPrice != newPrice)
                    {
                        var originalPrice = item.UnitPrice;
                        item.UnitPrice = newPrice;
                        item.OldUnitPrice = originalPrice;
                    }
                }
                await _repository.UpdateBasket(basket);
            }
        }
    }
}

事件处理程序需要验证产品是否存在于任何篮子实例中。 它还会更新每个相关购物车订单项的商品价格。 最后,它会创建一个警报来向用户显示价格变化,如图 6-24 所示。

浏览器的屏幕截图,其中显示了用户购物车上的价格更改通知。

图 6-24. 根据集成事件传达的信息显示购物车中商品的价格变化

更新消息事件中的幂等性

更新消息事件的一个重要方面是,如果通信在任何时间点发生故障,应重试发送消息。 否则,后台任务可能会尝试发布已发布的事件,从而创建竞争条件。 请确保更新是幂等的,或者它们提供足够的信息来确保你能检测到重复事件、放弃该事件并仅发回一个响应。

如前所述,幂等性意味着一个操作可以多次执行而不改变结果。 在消息传递环境中,比如在传递事件时,如果一个事件可以被传递多次而不改变接收者微服务的结果,则该事件是幂等的。 这可能是必要的,因为事件本身的性质,或因为系统处理事件的方式。 消息幂等性对于任何使用消息传送的应用程序都很重要,而不仅仅是在实现事件总线模式的应用程序中。

幂等运算的一个示例是一个 SQL 语句,仅当该数据不在表中时,才会将数据插入表中。 运行插入 SQL 语句的次数并不重要;结果相同 , 表将包含该数据。 在处理消息时,如果消息可能被多次发送并因此被多次处理,那么这样的幂等性也是必要的。 例如,如果重试逻辑导致发送方多次发送完全相同的消息,则需要确保消息是幂等的。

设计幂等消息是可行的。 例如,可以创建一个事件,该事件显示“将产品价格设置为 $25”,而不是“将 $5 添加到产品价格”。你可以安全地处理第一条消息的次数,结果将相同。 对于第二条消息,情况并非如此。 但即使在第一种情况下,你可能也不想处理第一个事件,因为系统也可能发送了更新的 price-change 事件,你会覆盖掉新价格。

另一个示例可能是传播到多个订阅者的 order-completed 事件。 即使同一订单完成事件存在重复的消息事件,应用也必须确保在其他系统中仅更新一次订单信息。

可以方便地为每个事件提供某种标识,以便可以创建一个逻辑,强制每个接收方只处理一次每个事件。

某些消息处理本质上是幂等的。 例如,如果系统生成图像缩略图,则处理所生成的缩略图的消息数可能无关紧要;结果是生成缩略图,并且每次都相同。 另一方面,诸如调用支付网关来对信用卡收费之类的操作可能完全不幂等。 在这些情况下,需要确保多次处理消息具有预期的效果。

其他资源

删除重复的集成事件消息

可以确保在不同层次上,每个订阅者只发送和处理一次消息事件。 一种方法是使用你正在使用的消息传送基础结构提供的重复数据删除功能。 另一种是在目标微服务中实现自定义逻辑。 在传输级别和应用程序级别进行验证是最佳选择。

在 EventHandler 级别删除重复的消息事件

确保任何接收方只处理一次事件的方法之一是在处理事件处理程序中的消息事件时实现特定逻辑。 例如,在 eShopOnContainers 应用程序中,当收到 集成事件时,您可以在 UserCheckoutAcceptedIntegrationEvent中看到此方法。 (在这种情况下,CreateOrderCommand 会使用 IdentifiedCommand 进行包装,并会先将 eventMsg.RequestId 用作标识符后,再将其发送到命令处理程序)。

使用 RabbitMQ 时删除重复消息

发生间歇性网络故障时,消息可以重复,并且消息接收器必须准备好处理这些重复的消息。 如果可能,接收者应以幂等的方式处理消息,这比使用重复数据删除功能显式处理消息要好。

根据 RabbitMQ 文档,“如果在将某个消息传递给使用者后将其重新排入队列(例如,因为在使用者连接断开之前未确认该消息),RabbitMQ 会在重新传递该消息(无论是传递给相同的使用者还是不同的使用者)时对其设置“已重新传递”标志。

如果设置了“重新传送”标志,则接收方必须考虑到这一点,因为消息可能已经处理过。 但这不能保证:消息在离开消息代理后可能从未到达接收方,可能是因为网络问题。 另一方面,如果未设置“重新传送”标志,则保证消息未多次发送。 因此,只有在消息中设置了“已重新传递”标志时,接收者才需要以幂等方式删除重复消息或处理消息。

其他资源