Orleans 流式处理快速入门
本指南介绍如何快速设置和使用 Orleans 流。 若要了解流式处理功能的详细信息,请阅读本文档的其他部分。
所需的配置
在本指南中,我们将使用基于内存的流,该流使用粒度消息传递将流数据发送给订阅者。 你将使用内存中存储提供程序来存储订阅列表。 使用基于内存的机制进行流式传输和存储仅适用于本地开发和测试,不适用于生产环境。
在 silo
为 ISiloBuilder 的接收器中,调用 AddMemoryStreams:
silo.AddMemoryStreams("StreamProvider")
.AddMemoryGrainStorage("PubSubStore");
在 client
为 IClientBuilder 的群集客户端上,调用 AddMemoryStreams。
client.AddMemoryStreams("StreamProvider");
在本指南中,我们将使用一个简单的基于消息的流,该流使用 grain 消息传递将流数据发送给订阅者。 我们将使用内存中存储提供程序来存储订阅列表,因此对于实际生产应用程序而言这不是一个明智的选择。
在 hostBuilder
为 ISiloHostBuilder
的接收器中,调用 AddSimpleMessageStreamProvider:
hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
.AddMemoryGrainStorage("PubSubStore");
在 clientBuilder
为 IClientBuilder
的群集客户端上,调用 AddSimpleMessageStreamProvider。
clientBuilder.AddSimpleMessageStreamProvider("SMSProvider");
注意
默认情况下,通过简单消息流传递的消息被视为不可变,可以通过对其他 grain 的引用来传递。 若要关闭此行为,必须将 SMS 提供程序配置为关闭 SimpleMessageStreamProviderOptions.OptimizeForImmutableData
siloBuilder
.AddSimpleMessageStreamProvider(
"SMSProvider",
options => options.OptimizeForImmutableData = false);
你可以创建流,使用这些流作为生成者来发送数据,并将其用作订阅者来接收数据。
生成事件
为流生成事件是相对比较简单的。 首先应该访问在之前的配置中定义的流提供程序 ("StreamProvider"
),然后选择一个流并将数据推送到其中。
// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("StreamProvider");
// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);
为流生成事件是相对比较简单的。 首先应该访问在之前的配置中定义的流提供程序 ("SMSProvider"
),然后选择一个流并将数据推送到其中。
// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("SMSProvider");
// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");
可以看到,我们的流有一个 GUID 和一个命名空间。 这样可以轻松识别独特的流。 例如,聊天室的命名空间可以是“Rooms”,而 GUID 可以是拥有者 RoomGrain 的 GUID。
此处我们使用了某个已知聊天室的 GUID。 可以使用流的 OnNextAsync
方法将数据推送到该流。 让我们使用随机数在计时器中执行此操作。 也可以为流使用任何其他数据类型。
RegisterTimer(_ =>
{
return stream.OnNextAsync(Random.Shared.Next());
},
null,
TimeSpan.FromMilliseconds(1_000),
TimeSpan.FromMilliseconds(1_000));
订阅和接收流数据
要接收数据,可以使用隐式订阅和显式订阅,请参阅显式订阅和隐式订阅以了解更多详细信息。 此示例使用隐式订阅,这样更容易操作。 当 grain 类型想要隐式订阅流时,它将使用属性 [ImplicitStreamSubscription (namespace)]。
本例将定义一个 ReceiverGrain
,如下所示:
[ImplicitStreamSubscription("RANDOMDATA")]
public class ReceiverGrain : Grain, IRandomReceiver
每当向命名空间 RANDOMDATA
的流推送数据时(就像我们在计时器中所做的那样),ReceiverGrain
类型的、具有相同流 Guid
的 grain 将接收消息。 即使当前不存在 grain 的激活,运行时也会自动创建一个新的激活并向其发送消息。
要实现此目的,我们需要通过设置用于接收数据的 OnNextAsync
方法来完成订阅过程。 为此,ReceiverGrain
应调用 OnActivateAsync
中所示的代码
// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();
// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("StreamProvider");
// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);
// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
async (data, token) =>
{
Console.WriteLine(data);
await Task.CompletedTask;
});
// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();
// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("SMSProvider");
// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");
// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
async (data, token) =>
{
Console.WriteLine(data);
await Task.CompletedTask;
});
你已完成所有设置! 现在,唯一的要求就是通过某种行为触发生成者 grain 的创建,然后该 grain 将注册计时器并开始向所有相关方发送随机整数。
同样,本指南跳过了很多细节,而只介绍了大致的操作。 请阅读本手册的其他部分和有关 RX 的其他资源,以详细了解可用的功能及其用法。
反应式编程可能是解决许多问题的有效方法。 例如,可以在订阅者中使用 LINQ 来筛选数字和执行各种有趣的操作。