如前所述,使用 基于事件的通信时, 微服务 会在发生值得注意的情况(例如更新业务实体时)发布事件。 其他微服务订阅这些事件。 当微服务收到事件时,它可以更新其自己的业务实体,这可能会导致发布更多事件。 这是最终一致性概念的本质。 此 发布/订阅 系统通常通过使用事件总线的实现来执行。 事件总线可以设计为一个接口,其中包含订阅和取消订阅事件和发布事件所需的 API。 它还可以根据任何进程间或消息通信(例如消息队列或服务总线)具有一个或多个实现,这些实现支持异步通信和发布/订阅模型。
可以使用事件来实现跨多个服务的业务事务,从而在这些服务之间实现最终一致性。 最终一致事务由一系列分布式操作组成。 在每次操作中,微服务更新业务实体,并发布事件以触发下一步操作。 请注意,事务不跨越底层持久性和事件总线,因此需要处理幂等性。 下面的图 6-18 显示了通过事件总线发布了 PriceUpdated 事件,因此价格更新传播到购物篮和其他微服务。
图 6-18. 基于事件总线的事件驱动的通信
本部分介绍如何使用泛型事件总线接口实现此类与 .NET 的通信,如图 6-18 所示。 有多个潜在的实现,每个实现使用不同的技术或基础结构,例如 RabbitMQ、Azure 服务总线或任何其他第三方开源或商业服务总线。
将消息代理和服务总线用于生产系统
如体系结构部分所述,可以从多个消息传递技术中进行选择,以实现抽象事件总线。 但这些技术处于不同的水平。 例如,RabbitMQ 作为消息代理传输系统,处于较低层次,相比于 Azure 服务总线、NServiceBus、MassTransit 或 Brighter 等商业产品。 大多数这些产品都可以在 RabbitMQ 或 Azure 服务总线的基础上工作。 选择产品取决于应用程序需要多少功能以及多少无需额外设置的可扩展性。
为了在开发环境中实现事件总线概念验证,如 eShopOnContainers示例所示, 基于作为容器运行的RabbitMQ之上的简单实现可能就足够了。 但是,对于需要高可伸缩性的任务关键型系统和生产系统,可能需要评估和使用 Azure 服务总线。
如果您需要通过 Sagas 这样的高级抽象和更丰富的功能来简化长时运行流程,提高分布式开发的效率,那么其他商业和开源服务总线(如 NServiceBus、MassTransit 和 Brighter)都值得评估。 在这种情况下,要使用的抽象和 API 通常直接由这些高级服务总线提供,而不是你自己的抽象(例如 eShopOnContainers 中提供的简单事件总线抽象)。 为此,可研究使用 NServiceBus 的分叉 eShopOnContainer(由特定软件实现的其他派生示例)。
当然,你可以在 RabbitMQ 和 Docker 等较低级别的技术上构建自己的服务总线功能,但“重新发明轮子”所需的工作对于自定义企业应用程序来说可能会过于昂贵。
要重申:eShopOnContainers 示例中展示的示例事件总线抽象和实现仅用于概念证明。 确定要进行异步通信和事件驱动的通信后,应选择最适合生产需求的服务总线产品。
集成事件
集成事件用于跨多个微服务或外部系统同步域状态。 此功能是通过在微服务外部发布集成事件来完成的。 当事件发布到多个接收方微服务(与订阅集成事件一样多的微服务)时,每个接收方微服务中的相应事件处理程序将处理该事件。
集成事件基本上是一个数据保存类,如以下示例所示:
public class ProductPriceChangedIntegrationEvent : IntegrationEvent
{
public int ProductId { get; private set; }
public decimal NewPrice { get; private set; }
public decimal OldPrice { get; private set; }
public ProductPriceChangedIntegrationEvent(int productId, decimal newPrice,
decimal oldPrice)
{
ProductId = productId;
NewPrice = newPrice;
OldPrice = oldPrice;
}
}
集成事件可以在每个微服务的应用程序级别定义,因此它们与其他微服务分离,其方式与在服务器和客户端中定义 ViewModel 的方式相当。 不建议在多个微服务之间共享通用集成事件库;这样做是将这些微服务与单个事件定义数据库耦合。 由于不想跨多个微服务共享通用域模型的原因,你不希望这样做:微服务必须完全自主。 有关详细信息,请参阅此博客文章要放入事件的数据量。 请注意不要差太多,因为其他博客文章描述了数据不足的消息可能会产生的问题。 你的活动设计应该准确符合消费者的需求,做到“恰到好处”。
只应跨微服务共享少数类型的库。 其中一种库是最终应用程序块,如 eShopOnContainer 中的事件总线客户端 API。 另一个是构成也可以作为 NuGet 组件共享的工具的库,如 JSON 序列化程序。
事件总线
事件总线允许在微服务之间发布/订阅样式的通信,而无需组件显式了解彼此,如图 6-19 所示。
图 6-19. 事件总线的发布/订阅基础知识
上图显示微服务 A 发布消息到事件总线,该总线将其分发给订阅的微服务 B 和 C,而发布者无需知道订阅者。 事件总线与观察者模式和发布-订阅模式相关。
观察者模式
在 观察者模式中,主要对象(称为可观察对象)会向其他感兴趣的对象(称为观察者)通知相关信息(事件)。
发布/订阅(Pub/Sub)模式
发布/订阅模式的用途与观察者模式相同:希望在发生某些事件时通知其他服务。 但是观察者模式和 Pub/Sub 模式之间存在一个重要区别。 在观察者模式中,信息传播直接从可观察对象传递到观察者们,因此他们彼此“了解”。 但是,使用 Pub/Sub 模式时,有第三个组件称为中转站或消息代理或事件总线,发布者和订阅者都知道该组件。 因此,使用发布/订阅模式时,发布服务器和订阅服务器通过所述的事件总线或消息中转站精确分离。
中转站或事件总线
如何在发布者和订阅者之间实现匿名? 一种简单的方法是让中间人负责所有沟通。 事件总线是一个这样的中转站。
事件总线通常由两部分组成:
抽象或接口。
一个或多个实现。
在图 6-19 中,从应用程序角度看,会发现事件总线实际上是一个发布/订阅通道。 实现此异步通信的方式可能有所不同。 它可以有多个实现,以便可以根据环境要求(例如生产环境与开发环境)在它们之间进行交换。
在图 6-20 中,可以看到基于 RabbitMQ、Azure 服务总线或其他事件/消息代理等基础结构消息传送技术的多个实现的事件总线抽象。
图 6- 20. 事件总线的多个实现
最好通过接口定义事件总线,以便可以使用 RabbitMQ、Azure 服务总线等多种技术实现事件总线。 不过,正如之前所提到的,只有在需要您自己的抽象支持的基本事件总线功能时,使用自己的抽象(事件总线接口)才是好的。 如果需要更丰富的服务总线功能,则应使用首选商业服务总线提供的 API 和抽象,而不是自己的抽象。
定义事件总线接口
让我们从事件总线接口的一些实现代码以及可能的实现方式开始,以供探索之用。 接口应是通用且简单的,如以下接口所示。
public interface IEventBus
{
void Publish(IntegrationEvent @event);
void Subscribe<T, TH>()
where T : IntegrationEvent
where TH : IIntegrationEventHandler<T>;
void SubscribeDynamic<TH>(string eventName)
where TH : IDynamicIntegrationEventHandler;
void UnsubscribeDynamic<TH>(string eventName)
where TH : IDynamicIntegrationEventHandler;
void Unsubscribe<T, TH>()
where TH : IIntegrationEventHandler<T>
where T : IntegrationEvent;
}
该方法 Publish
非常简单。 事件总线会将接收到的集成事件广播到所有订阅该事件的微服务或外部应用程序。 此方法由发布事件的微服务使用。
Subscribe
这些方法(根据参数可以有多个实现)由想要接收事件的微服务使用。 此方法有两个参数。 第一个是要订阅的集成事件 (IntegrationEvent
)。 第二个参数是集成事件处理程序(或回调方法),命名 IIntegrationEventHandler<T>
为在接收方微服务获取该集成事件消息时执行。
其他资源
一些适用于生产的消息传送解决方案:
Azure 服务总线
https://learn.microsoft.com/azure/service-bus-messaging/NServiceBus
https://particular.net/nservicebusMassTransit
https://masstransit-project.com/