本部分简要概述了 Orleans Stream 实现。 它描述在应用程序级别不可见的概念和详细信息。 如果你只打算使用流,则不必要阅读本部分。
术语:
我们用“队列”一词来表示任何可以引入流事件,并允许拉取事件或提供基于推送的机制来使用事件的持久存储技术。 通常,为了提供可伸缩性,这些技术提供分片/分区队列。 例如,Azure 队列允许创建多个队列,事件中心具有多个中心。
持久流
所有 Orleans 持久性流提供程序共享一个通用实现 PersistentStreamProvider。 需要使用特定于技术的 IQueueAdapterFactory 来配置这些泛型流提供程序。
例如,出于测试目的,我们有队列适配器生成其测试数据,而不是从队列中读取数据。 下面的代码演示如何配置持久流提供程序以使用我们自定义的(生成器)队列适配器。 为此,该代码使用了用于创建适配器的工厂函数来配置持久流提供程序。
hostBuilder.AddPersistentStreams(
StreamProviderName, GeneratorAdapterFactory.Create);
当流生成者生成新的流项并调用 stream.OnNext()
时, Orleans 流运行时会在该流提供程序的 IQueueAdapter 上调用相应的方法,将该项直接排入相应的队列。
拉取代理
持久流提供程序的核心是拉取代理。 拉取代理从一组持久队列中拉取事件,并将这些事件传递给 grain 中使用这些事件的应用程序代码。 人们可以将拉取代理程式视为一种分布式“微服务”,即一种具有分区特性、高可用性和弹性的分布式组件。 拉取代理在托管应用程序 grain 的同一 silo 内部运行,并由 Orleans 流式处理运行时完全管理。
StreamQueueMapper
和 StreamQueueBalancer
使用 IStreamQueueMapper 和 IStreamQueueBalancer 参数化拉取代理。
IStreamQueueMapper
提供了所有队列的列表,并负责将流映射到队列。 这样,持久化流提供程序的生产者端就知道要将消息排入哪个队列。
IStreamQueueBalancer
表示队列在 Orleans silo 和代理之间的平衡方式。 目标是以均衡的方式将队列分配给代理,以防止瓶颈和支持弹性。 当往Orleans 集群中添加新的 silo 时,队列会在旧 silo 和新 silo 之间自动重新平衡。 该 StreamQueueBalancer
允许自定义这个过程。
Orleans 有多个内置的流队列平衡器,用于支持不同的均衡方案(大量和少量队列)和不同的环境(Azure、本地部署、静态)。
使用上述测试生成器示例,下面的代码演示了如何配置队列映射器和队列均衡器。
hostBuilder
.AddPersistentStreams(StreamProviderName, GeneratorAdapterFactory.Create,
providerConfigurator =>
providerConfigurator.Configure<HashRingStreamQueueMapperOptions>(
ob => ob.Configure(options => options.TotalQueueCount = 8))
.UseDynamicClusterConfigDeploymentBalancer());
上述代码配置 GeneratorAdapterFactory 使用包含 8 个队列的队列映射器,并使用 DynamicClusterConfigDeploymentBalancer 在集群中平衡队列。
拉取协议
每个 silo 运行一组拉取代理,每个代理从一个队列中拉取。 拉取代理本身由名为 SystemTarget 的内部运行时组件实现。 SystemTargets 本质上是运行时 grain,受单线程并发性的约束,可以使用常规 grain 消息传递,并且与 grain 一样精简。 与微粒相比,SystemTargets 不是虚拟的:它们由运行时显式创建,并且不是位置透明的。 通过将拉取代理实现为 SystemTargets,Orleans 流式处理运行时可以依赖于内置 Orleans 功能,并且可以扩展到大量队列,因为创建新的拉取代理与创建新 grain 一样便宜。
每个拉取代理都运行一个定期计时器,该计时器通过调用 IQueueAdapterReceiver.GetQueueMessagesAsync 该方法从队列中拉取。 返回的消息将放入名为IQueueCache的内部按代理划分的数据结构中。 检查每个消息以找出其目标流。 代理使用订阅-发布查找订阅了此流的流使用者列表。 一旦检索到消费者列表,代理会将其存储在本地(在其 pub-sub 缓存中),因此对于每条消息都无需咨询 Pub-Sub。 代理还会订阅订阅-发布以接收订阅该流的任何新使用者的通知。 代理和订阅-发布之间的这种握手保证了强大的流订阅语义:一旦使用者订阅了流,它就会看到订阅后生成的所有事件。 此外,使用 StreamSequenceToken
允许它在过去的时间订阅。
队列缓存
IQueueCache 是一个基于代理的内部数据结构,它允许将新事件从队列中解耦并传递给使用者。 它还允许将传送分离到不同的流和不同的使用者。
假设一个流有 3 个流使用者,其中一个使用者速度缓慢。 如果不小心的话,此速度缓慢的使用者可能会影响代理的进度,减慢该流的其他使用者的使用速度,甚至减慢其他流的取消排队和事件传递速度。 为了防止这一点并允许在代理中实现最大并行度,我们使用 IQueueCache
。
IQueueCache
缓冲流事件,并为代理提供一种方法,以其自身的速度将事件传送给每个使用者。 每个用户的交付通过被称为IQueueCacheCursor的内部组件实现,该组件跟踪每个用户的进度。 这样,每个使用者都能按自身的步调接收事件:速度较快的使用者可以像取消排队时那样快速接收事件,而速度较慢的使用者可以稍后接收事件。 将消息传递到所有使用者后,可以从缓存中删除该消息。
反压力
在Orleans流运行时中,反压作用于两个方面:从队列将流事件传递给代理,以及将事件从代理交付给流消费者。
后者由内置 Orleans 消息传送机制提供。 每个流事件通过标准 Orleans grain 消息传递从代理一次一个地传递给使用者。 也就是说,代理会向每个流使用者发送一个事件(或有限的大小事件批次),并等待此调用。 在解决或破坏前一个事件的任务之前,不会开始传递下一个事件。 这样,我们自然而然就会将基于使用者的传递频率限制为每次传递一条消息。
将流事件从队列引入代理时,Orleans 流式处理提供了新的特殊反压机制。 由于代理将解耦从队列中取消排队事件的操作并将事件传递给使用者,因此单个速度缓慢的使用者的进度可能落后很多,导致 IQueueCache
被填满。 为了防止 IQueueCache
无限期增长,我们将限制其大小(大小限制是可配置的)。 但是,代理永远不会丢弃未传递的事件。
相反,当缓存开始填满时,代理会减慢从队列中取消排队事件的速率。 这样,我们便可以通过调整从队列中使用事件的速率(“反压”)来“度过”缓慢的传递周期,并在稍后恢复到快速使用速率。 为了检测“缓慢传递”的低谷,IQueueCache
使用缓存桶的内部数据结构来跟踪将事件传递给单个流使用者的进度。 这使得系统响应迅速且能够自我调整。