Orleans 流式处理 API
应用程序通过 API 与流进行交互,这些 API 与众所周知的 .NET 中的反应式扩展 (Rx) 非常相似。 主要差别在于,Orleans 流扩展是异步的,使 Orleans 的分布式和可缩放计算结构中的处理更加高效。
异步流
应用程序首先使用流提供程序来获取流的句柄。 可在此处阅读有关流提供程序的详细信息,但暂时可将其视为流工厂,使实现者能够自定义流的行为和语义:
IStreamProvider streamProvider = base.GetStreamProvider("SimpleStreamProvider");
StreamId streamId = StreamId.Create("MyStreamNamespace", Guid);
IAsyncStream<T> stream = streamProvider.GetStream<T>(streamId);
IStreamProvider streamProvider = base.GetStreamProvider("SimpleStreamProvider");
IAsyncStream<T> stream = streamProvider.GetStream<T>(Guid, "MyStreamNamespace");
应用程序可以通过调用 Grain.GetStreamProvider 方法(在 grain 内时)或者调用 GrainClient.GetStreamProvider 方法(在客户端时)来获取对流提供程序的引用。
Orleans.Streams.IAsyncStream<T> 是虚拟流的逻辑强类型句柄。 它在本质上与 Orleans grain 引用相似。 对 GetStreamProvider
和 GetStream
的调用纯粹在本地进行。 GetStream
的参数是一个 GUID 和一个称为流命名空间的附加字符串(可为 null)。 该 GUID 和命名空间字符串共同构成了流标识(本质上与 IGrainFactory.GetGrain 的参数相似)。 该 GUID 和命名空间字符串的组合为确定流标识提供了额外的灵活性。 就像 grain 类型 PlayerGrain
内可以存在 grain 7,而 grain 类型 ChatRoomGrain
内可以存在不同的 grain 7 一样,流命名空间 PlayerEventsStream
内可以存在流 123,而流命名空间 ChatRoomMessagesStream
内可以存在一个不同的流 123。
生成和使用
IAsyncStream<T> 实现 IAsyncObserver<T> 和 IAsyncObservable<T> 接口。 应用程序可以使用流通过 Orleans.Streams.IAsyncObserver<T>
将新事件生成到流中,或者使用 Orleans.Streams.IAsyncObservable<T>
来订阅和使用来自流的事件。
public interface IAsyncObserver<in T>
{
Task OnNextAsync(T item, StreamSequenceToken token = null);
Task OnCompletedAsync();
Task OnErrorAsync(Exception ex);
}
public interface IAsyncObservable<T>
{
Task<StreamSubscriptionHandle<T>> SubscribeAsync(IAsyncObserver<T> observer);
}
若要将事件生成到流中,应用程序只需调用
await stream.OnNextAsync<T>(event)
若要订阅流,应用程序可以调用
StreamSubscriptionHandle<T> subscriptionHandle = await stream.SubscribeAsync(IAsyncObserver)
SubscribeAsync 的参数可以是实现 IAsyncObserver<T> 接口的对象,也可以是用于处理传入事件的 lambda 函数的组合。 可通过 AsyncObservableExtensions 类获取 SubscribeAsync
的更多选项。 SubscribeAsync
返回一个 StreamSubscriptionHandle<T>,即,一个不透明的句柄,可用于取消订阅流(在本质上类似于 IDisposable 的异步版本)。
await subscriptionHandle.UnsubscribeAsync()
请务必注意,订阅适用于 grain,而不适用于激活。 在 grain 代码订阅流后,此订阅就会超过此激活的生命周期并且永久保留,直到 grain 代码(可能在不同的激活中)显式取消订阅。 这是虚拟流抽象的核心所在:不仅所有流在逻辑上始终存在,而且流订阅是持久性的,其生存期超过了创建订阅的特定物理激活。
多重性
一个 Orleans 流可以有多个生成者和多个使用者。 生成者发布的消息将传递给在发布消息之前订阅了该流的所有使用者。
此外,使用者可以多次订阅同一个流。 每次订阅时,它都会获取唯一的 StreamSubscriptionHandle<T>。 如果某个 grain(或客户端)订阅同一个流 X 次,则它将收到相同的事件 X 次,每次订阅会接收一个事件。 使用者还可以取消单个订阅。 它可以通过以下调用查找其所有当前订阅:
IList<StreamSubscriptionHandle<T>> allMyHandles =
await IAsyncStream<T>.GetAllSubscriptionHandles();
从故障中恢复
如果流的生成者消亡(或其 grain 已取消激活),则无需执行任何操作。 下一次当此 grain 需要生成更多事件时,它可以再次获取流句柄,并以相同的方式生成新事件。
使用者逻辑要稍微复杂一些。 如前所述,一旦使用者 grain 订阅了一个流,此订阅在 grain 显式取消订阅之前都是有效的。 如果流的使用者消亡(或其 grain 已取消激活)并且在流上生成了新的事件,则使用者 grain 将自动重新激活(就像任何常规的 Orleans grain 在收到消息时自动激活一样)。 现在,grain 代码只需提供一个 IAsyncObserver<T> 来处理数据。 使用者需要重新附加处理逻辑作为 OnActivateAsync() 方法的一部分。 为此,它可以调用:
StreamSubscriptionHandle<int> newHandle =
await subscriptionHandle.ResumeAsync(IAsyncObserver);
使用者使用先前在首次订阅时获取的句柄来“恢复处理”。 请注意,ResumeAsync 只会使用 IAsyncObserver
逻辑的新实例来更新现有订阅,而不会更改使用者已订阅此流的事实。
使用者如何获取旧的 subscriptionHandle
? 有两个选项。 使用者可能保存了从原始 SubscribeAsync
操作返回的句柄,如果是这样,则现在可以使用该句柄。 或者,如果使用者没有句柄,则它可通过以下调用向 IAsyncStream<T>
询问其所有活动订阅句柄:
IList<StreamSubscriptionHandle<T>> allMyHandles =
await IAsyncStream<T>.GetAllSubscriptionHandles();
使用者现在可以恢复所有这些句柄,或者根据需要取消订阅某些句柄。
提示
如果使用者 grain 直接实现 IAsyncObserver<T> 接口 (public class MyGrain<T> : Grain, IAsyncObserver<T>
),则理论上它不需要重新附加 IAsyncObserver
,因此不需要调用 ResumeAsync
。 流运行时应该能够自动确定 grain 已实现 IAsyncObserver
,并只调用这些 IAsyncObserver
方法。 但是,流运行时目前不支持此功能,即使 grain 直接实现 IAsyncObserver
,grain 代码也仍然需要显式调用 ResumeAsync
。
显式和隐式订阅
默认情况下,流使用者必须显式订阅流。 此订阅通常由 grain(或客户端)接收的、指示它订阅的某些外部消息触发。 例如,在聊天服务中,当用户加入聊天室时,其 grain 会收到一条包含聊天名称的 JoinChatGroup
消息,这会导致用户 grain 订阅此聊天流。
此外,Orleans 流还支持隐式订阅。 在此模式下,grain 不会显式订阅流。 此 grain 是基于其 grain 标识和 ImplicitStreamSubscriptionAttribute 自动隐式订阅的。 隐式订阅的主要价值是使流活动能够自动触发 grain 激活(从而触发订阅)。 例如,使用 SMS 流时,如果一个 grain 想要生成流,而另一个 grain 处理该流,则生成者需要知道使用者 grain 的标识,并对其发出 grain 调用,以告知它要订阅流。 只有在完成此过程后,它才开始发送事件。 而使用隐式订阅时,生成者可以开始向流生成事件,使用者 grain 将自动激活并订阅流。 在这种情况下,生成者根本不关心谁正在读取事件
粒度实现 MyGrainType
可以声明 [ImplicitStreamSubscription("MyStreamNamespace")]
属性。 这会告知流运行时,当在标识为 GUID XXX 和命名空间为 "MyStreamNamespace"
的流上生成事件时,应将其传递给标识为 XXX、类型为 MyGrainType
的 grain。 即,运行时将流 <XXX, MyStreamNamespace>
映射到使用者 grain <XXX, MyGrainType>
。
存在 ImplicitStreamSubscription
会导致流运行时自动向流订阅此 grain 并向其传递流事件。 但是,grain 代码仍然需要告知运行时它希望如何处理事件。 本质上,它需要附加 IAsyncObserver
。 因此,当激活 grain 时,OnActivateAsync
内的 grain 代码需要调用:
IStreamProvider streamProvider =
base.GetStreamProvider("SimpleStreamProvider");
StreamId streamId =
StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
IAsyncStream<T> stream =
streamProvider.GetStream<T>(streamId);
StreamSubscriptionHandle<T> subscription =
await stream.SubscribeAsync(IAsyncObserver<T>);
IStreamProvider streamProvider =
base.GetStreamProvider("SimpleStreamProvider");
IAsyncStream<T> stream =
streamProvider.GetStream<T>(this.GetPrimaryKey(), "MyStreamNamespace");
StreamSubscriptionHandle<T> subscription =
await stream.SubscribeAsync(IAsyncObserver<T>);
编写订阅逻辑
下面是关于如何为各种情况编写订阅逻辑的指导:显式和隐式订阅、可倒退和不可倒退的流。 显式订阅和隐式订阅之间的主要差别在于,对于隐式订阅,grain 对每个流命名空间始终只有一个隐式订阅;无法创建多个订阅(没有订阅多重性),无法取消订阅,并且 grain 逻辑始终只需要附加处理逻辑。 这也意味着,对于隐式订阅,永远不需要恢复订阅。 另一方面,对于显式订阅,需要恢复订阅,否则如果 grain 再次订阅流,将导致 grain 多次订阅。
隐式订阅:
对于隐式订阅,grain 仍需要订阅以附加处理逻辑。 这可以通过实现 IStreamSubscriptionObserver
和 IAsyncObserver<T>
接口在使用者粒度中实现,从而允许粒度与订阅分开激活。 要订阅流,粒度会创建一个句柄,并在其 OnSubscribed(...)
方法中调用 await handle.ResumeAsync(this)
。
为处理信息,IAsyncObserver<T>.OnNextAsync(...)
方法用于实现接收流数据和序列令牌。 或者,ResumeAsync
方法可以接受一组代表 IAsyncObserver<T>
接口、onNextAsync
、onErrorAsync
和 onCompletedAsync
方法的委托。
public Task OnNextAsync(string item, StreamSequenceToken? token = null)
{
_logger.LogInformation($"Received an item from the stream: {item}");
}
public async Task OnSubscribed(IStreamSubscriptionHandleFactory handleFactory)
{
var handle = handleFactory.Create<string>();
await handle.ResumeAsync(this);
}
public override async Task OnActivateAsync()
{
var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
var stream =
streamProvider.GetStream<string>(
this.GetPrimaryKey(), "MyStreamNamespace");
await stream.SubscribeAsync(OnNextAsync);
}
显式订阅:
对于显式订阅,grain 必须调用 SubscribeAsync
来订阅流。 这会创建一个订阅,并附加处理逻辑。 显式订阅将一直存在到 grain 取消订阅为止,因此,如果取消激活再重新激活 grain,则 grain 仍会显式订阅,但不会附加处理逻辑。 在这种情况下,grain 需要重新附加处理逻辑。 为此,在 OnActivateAsync
中,grain 首先需要通过调用 IAsyncStream<T>.GetAllSubscriptionHandles() 来确定它已有的订阅。 grain 必须在它希望继续处理的每个句柄上执行 ResumeAsync
,或者在它完成的任何句柄上执行 UnsubscribeAsync。 grain 还可以选择性地指定 StreamSequenceToken
作为 ResumeAsync
调用的参数,这会导致此显式订阅从该标记使用数据。
public async override Task OnActivateAsync(CancellationToken cancellationToken)
{
var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
var streamId = StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
var stream = streamProvider.GetStream<string>(streamId);
var subscriptionHandles = await stream.GetAllSubscriptionHandles();
foreach (var handle in subscriptionHandles)
{
await handle.ResumeAsync(this);
}
}
public async override Task OnActivateAsync()
{
var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
var stream =
streamProvider.GetStream<string>(this.GetPrimaryKey(), "MyStreamNamespace");
var subscriptionHandles = await stream.GetAllSubscriptionHandles();
if (!subscriptionHandles.IsNullOrEmpty())
{
subscriptionHandles.ForEach(
async x => await x.ResumeAsync(OnNextAsync));
}
}
流顺序和序列标记
单个生成者和单个使用者之间的事件传递顺序取决于流提供程序。
使用 SMS 时,生成者通过控制其发布事件的方式,来显式控制使用者看到的事件的顺序。 默认情况下(如果 SMS 提供程序的 SimpleMessageStreamProviderOptions.FireAndForgetDelivery 选项设置为 false)如果生成者等待每个 OnNextAsync
调用,则事件将按 FIFO 顺序到达。 在 SMS 中,由生成者决定如何处理传递故障,此故障由 OnNextAsync
调用返回的已中断 Task
来指示。
Azure 队列流不保证 FIFO 顺序,因为基础 Azure 队列不保证在发生故障的情况下的顺序。 (但它们确实能够保证无故障执行中的 FIFO 顺序)。当生成者将事件生成到 Azure 队列时,如果队列操作失败,则由生成者再次尝试排队,然后处理可能重复的消息。 在传递端,Orleans 流运行时将事件从 Azure 队列中取消排队,然后尝试将其传递给使用者进行处理。 只有在成功处理后,Orleans 流运行时才会从队列中删除事件。 如果传递或处理失败,则不会从队列中删除该事件,并且该事件稍后将自动重新出现在队列中。 流运行时将尝试再次传递它,从而可能破坏 FIFO 顺序。 上述行为符合 Azure 队列的一般语义。
应用程序定义的顺序:为了处理上述排序问题,应用程序可以选择指定自己的顺序。 这是通过 StreamSequenceToken 实现的,它是一个不透明的 IComparable 对象,可用于对事件进行排序。 生成者可以将可选的 StreamSequenceToken
传递给 OnNext
调用。 此 StreamSequenceToken
将传递给使用者,并与事件一起传递。 这样,应用程序可以独立于流运行时来推理和重新构造其顺序。
可倒退流
某些流只允许应用程序从最新时间点开始订阅它们,而其他某些流则允许“时光倒流”。 后一项功能依赖于基础队列技术和特定的流提供程序。 例如,Azure 队列只允许使用最新的已排队事件,而事件中心则允许从任意时间点(不超过某个过期时间)重放事件。 支持时光倒流的流称为可倒退流。
可倒退流的使用者可将 StreamSequenceToken
传递给 SubscribeAsync
调用。 运行时将从该 StreamSequenceToken
开始向使用者传递事件。 null 标记表示使用者希望从最新时间点开始接收事件。
在恢复方案中,倒退流的功能非常有用。 例如,假设某个 grain 订阅流并定期检查其状态以及最新序列标记。 当从故障中恢复时,该 grain 可以从最新检查点序列标记重新订阅同一个流,从而实现恢复,且不会丢失自上一个检查点以来生成的任何事件。
事件中心提供程序可倒退。 可以在 GitHub: Orleans/Azure/Orleans.Streaming.EventHubs 上找到其代码。 SMS 和 Azure 队列提供程序不可倒退。
无状态自动横向扩展处理
默认情况下,Orleans 流的目标是支持大量相对较小的流,每个流由一个或多个有状态 grain 处理。 总而言之,所有流的处理在大量常规(有状态)grain 之间分片。 应用程序代码通过分配流 ID 和 grain ID 并通过显式订阅来控制这种分片。 目标是实现分片式有状态处理。
但是,还有一种有趣方案是自动横向扩展的无状态处理。 在此方案中,应用程序具有少量的流(甚至只有一个大流),而目标是实现无状态处理。 例如,在全局事件流中,处理涉及到解码每个事件,并可能将它转发到其他流以进一步进行有状态处理。 在 Orleans 中,可以通过 StatelessWorkerAttribute grain 支持无状态横向扩展流处理。
无状态自动横向扩展处理的当前状态:尚未实现。 尝试订阅来自 StatelessWorker
grain 的流会导致未定义的行为。 我们正在考虑支持此选项。
Grain 和 Orleans 客户端
Orleans 流在 grain 和 Orleans 客户端之间以统一的方式工作。 也就是说,可以在 grain 和 Orleans 客户端中使用相同的 API 来生成和使用事件。 这极大地简化了应用程序逻辑,使特殊的客户端 API(例如 Grain 观察程序)变得多余。
完全托管的可靠流订阅-发布
为了跟踪流订阅,Orleans 使用名为“流订阅-发布”的运行时组件,作为流使用者和流生成者的会合点。 订阅-发布跟踪和保存所有流订阅,并将流使用者与流生成者进行匹配。
应用程序可以选择订阅-发布数据的存储位置和方式。 订阅-发布组件本身实现为 grain(称为 PubSubRendezvousGrain
),这些 grain 使用 Orleans 声明持久性。 PubSubRendezvousGrain
使用名为 PubSubStore
的存储提供程序。 与任何 grain 一样,可为存储提供程序指定实现。 对于流订阅-发布,可以使用 silo 主机生成器在构造 silo 时更改 PubSubStore
的实现:
下面将订阅-发布配置为在 Azure 表中存储其状态。
hostBuilder.AddAzureTableGrainStorage("PubSubStore",
options => options.ConfigureTableServiceClient("<Secret>"));
hostBuilder.AddAzureTableGrainStorage("PubSubStore",
options => options.ConnectionString = "<Secret>");
这样,订阅-发布数据将持久存储在 Azure 表中。 对于初始开发,也可以使用内存存储。 除了订阅-发布以外,Orleans 流运行时还会将事件从生成者传递给使用者,管理分配给活跃使用的流的所有运行时资源,并以透明方式从未使用的流中回收运行时资源。
配置
若要使用流,需要通过接收器主机或群集客户端生成器启用流提供程序。 可在此处详细了解流提供程序。 示例流提供程序设置:
hostBuilder.AddMemoryStreams("StreamProvider")
.AddAzureQueueStreams<AzureQueueDataAdapterV2>("AzureQueueProvider",
optionsBuilder => optionsBuilder.Configure(
options => options.ConfigureTableServiceClient("<Secret>")))
.AddAzureTableGrainStorage("PubSubStore",
options => options.ConfigureTableServiceClient("<Secret>"));
hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
.AddAzureQueueStreams<AzureQueueDataAdapterV2>("AzureQueueProvider",
optionsBuilder => optionsBuilder.Configure(
options => options.ConnectionString = "<Secret>"))
.AddAzureTableGrainStorage("PubSubStore",
options => options.ConnectionString = "<Secret>");