Orleans 流实现细节
本部分简要概述 Orleans 流的实现。 其中介绍了在应用程序级别不可见的概念和详细信息。 如果你只打算使用流,则不必要阅读本部分。
术语:
我们用“队列”一词来表示任何可以引入流事件,并允许拉取事件或提供基于推送的机制来使用事件的持久存储技术。 通常,为了提供可伸缩性,这些技术提供分片/分区队列。 例如,Azure 队列允许创建多个队列,而事件中心具有多个中心。
持久流
所有 Orleans 持久流提供程序共享一个通用实现 PersistentStreamProvider。 需要使用特定于技术的 IQueueAdapterFactory 来配置这些泛型流提供程序。
例如,对于测试,队列适配器可以生成其测试数据,而无需从队列中读取数据。 以下代码演示如何配置持久流提供程序以使用自定义(生成器)队列适配器。 为此,该代码使用了用于创建适配器的工厂函数来配置持久流提供程序。
hostBuilder.AddPersistentStreams(
StreamProviderName, GeneratorAdapterFactory.Create);
当流生成者生成新的流项并调用 stream.OnNext()
时,Orleans 流运行时将调用该流提供程序的 IQueueAdapter 上的相应方法,从而将项直接排入相应的队列。
拉取代理
持久流提供程序的核心是拉取代理。 拉取代理从一组持久队列中拉取事件,并将这些事件传递给 grain 中使用这些事件的应用程序代码。 可将拉取代理视为一种分布式“微服务”– 分区的、高度可用的弹性分布式组件。 拉取代理在托管应用程序 grain 的相同 silo 中运行,并完全由 Orleans 流运行时管理。
StreamQueueMapper
和 StreamQueueBalancer
使用 IStreamQueueMapper 和 IStreamQueueBalancer 参数化拉取代理。 IStreamQueueMapper
提供所有队列的列表,它还负责将流映射到队列。 这样,持久流提供程序的生成者端就知道要将消息排入哪个队列。
IStreamQueueBalancer
表示队列在 Orleans silo 和代理之间的平衡方式。 目标是以平衡的方式将队列分配到代理,以防止出现瓶颈并支持弹性功能。 将新的 silo 添加到 Orleans 群集时,队列会自动在新旧 silo 之间重新平衡。 StreamQueueBalancer
允许自定义该过程。 Orleans 中有多个内置 StreamQueueBalancers 可以支持不同的平衡方案(大量和少量队列)和不同的环境(Azure、本地、静态)。
沿用前面的测试生成器示例,以下代码演示如何配置队列映射器和队列平衡器。
hostBuilder
.AddPersistentStreams(StreamProviderName, GeneratorAdapterFactory.Create,
providerConfigurator =>
providerConfigurator.Configure<HashRingStreamQueueMapperOptions>(
ob => ob.Configure(options => options.TotalQueueCount = 8))
.UseDynamicClusterConfigDeploymentBalancer());
以上代码将 GeneratorAdapterFactory 配置为使用具有八个队列的队列映射器,并使用 DynamicClusterConfigDeploymentBalancer 平衡整个群集中的队列。
拉取协议
每个 silo 运行一组拉取代理,每个代理从一个队列中拉取。 拉取代理本身由名为 SystemTarget 的内部运行时组件实现。 SystemTargets 本质上是运行时 grain,受单线程并发性的约束,可以使用常规 grain 消息传递,并且与 grain 一样精简。 与 grain 相比,SystemTargets 不是虚拟的:它们是(由运行时)显式创建的,并且位置不透明。 通过将拉取代理实现为 SystemTargets,Orleans 流运行时可以依赖内置 Orleans 功能,并且可以扩展到极大量的队列,因为创建新的拉取代理与创建新的 grain 一样便宜。
每个拉取代理运行一个定期计时器,该计时器通过调用 IQueueAdapterReceiver.GetQueueMessagesAsync 方法从队列中拉取。 返回的消息将放入名为 IQueueCache 的基于代理的内部数据结构中。 检查每条消息即可找出其目标流。 代理使用订阅-发布查找订阅了此流的流使用者列表。 检索使用者列表后,代理会将其存储在本地(存储在其订阅-发布缓存中),因此它不需要向订阅-发布查询每条消息。 代理还会订阅订阅-发布以接收订阅该流的任何新使用者的通知。 代理和订阅-发布之间的这种握手保证了强大的流订阅语义:一旦使用者订阅了流,它就会看到订阅后生成的所有事件。 此外,使用 StreamSequenceToken
允许它在过去的时间订阅。
队列缓存
IQueueCache 是一个基于代理的内部数据结构,它允许将新事件从队列中解耦并传递给使用者。 它还允许将传递给不同流和不同使用者的事件解耦。
假设一个流有 3 个流使用者,其中一个使用者速度缓慢。 如果不小心的话,此速度缓慢的使用者可能会影响代理的进度,减慢该流的其他使用者的使用速度,甚至减慢其他流的取消排队和事件传递速度。 为了防止这种情况并在代理中实现最大并行度,我们使用了 IQueueCache
。
IQueueCache
缓冲流事件,并为代理提供一种按自身步调将事件传递给每个使用者的方式。 基于使用者的传递由名为 IQueueCacheCursor 的内部组件实现,该组件跟踪每个使用者的进度。 这样,每个使用者都能按自身的步调接收事件:速度较快的使用者可以像取消排队时那样快速接收事件,而速度较慢的使用者可以稍后接收事件。 将消息传递给所有使用者后,可以从缓存中删除它。
背压
Orleans 流运行时中的反压在两个位置应用:将流事件从队列引入代理,将事件从代理传递给流使用者。
后者由内置的 Orleans 消息传递机制提供。 每个流事件通过标准的 Orleans grain 消息传递逐个地从代理传递给使用者。 也就是说,代理向每个流使用者发送一个事件(或受限大小的事件批)并等待此调用。 在解决或破坏前一个事件的任务之前,不会开始传递下一个事件。 这样,我们自然而然就会将基于使用者的传递频率限制为每次传递一条消息。
Orleans 流提供新的特殊反压机制用于将流事件从队列引入代理。 由于代理将解耦从队列中取消排队事件的操作并将事件传递给使用者,因此单个速度缓慢的使用者的进度可能落后很多,导致 IQueueCache
被填满。 为了防止 IQueueCache
无限增长,我们限制了其大小(大小限制可配置)。 但是,代理永远不会丢弃未传递的事件。
相反,当缓存开始填满时,代理会减慢从队列中取消排队事件的速率。 这样,我们便可以通过调整从队列中使用事件的速率(“反压”)来“度过”缓慢的传递周期,并在稍后恢复到快速使用速率。 为了检测“缓慢传递”的低谷,IQueueCache
使用缓存桶的内部数据结构来跟踪将事件传递给单个流使用者的进度。 这样就构建了一个响应速度很快且能自我调整的系统。