Orleans 流式处理快速入门

本指南介绍如何快速设置和使用 Orleans 流。 若要了解流式处理功能的详细信息,请阅读本文档的其他部分。

所需的配置

在本指南中,我们将使用基于内存的流,该流使用粒度消息传递将流数据发送给订阅者。 你将使用内存中存储提供程序来存储订阅列表。 使用基于内存的机制进行流式传输和存储仅适用于本地开发和测试,不适用于生产环境。

siloISiloBuilder 的接收器中,调用 AddMemoryStreams

silo.AddMemoryStreams("StreamProvider")
    .AddMemoryGrainStorage("PubSubStore");

clientIClientBuilder 的群集客户端上,调用 AddMemoryStreams

client.AddMemoryStreams("StreamProvider");

在本指南中,我们将使用一个简单的基于消息的流,该流使用 grain 消息传递将流数据发送给订阅者。 我们将使用内存中存储提供程序来存储订阅列表,因此对于实际生产应用程序而言这不是一个明智的选择。

hostBuilderISiloHostBuilder 的接收器中,调用 AddSimpleMessageStreamProvider

hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
           .AddMemoryGrainStorage("PubSubStore");

clientBuilderIClientBuilder 的群集客户端上,调用 AddSimpleMessageStreamProvider

clientBuilder.AddSimpleMessageStreamProvider("SMSProvider");

注意

默认情况下,通过简单消息流传递的消息被视为不可变,可以通过对其他 grain 的引用来传递。 若要关闭此行为,必须将 SMS 提供程序配置为关闭 SimpleMessageStreamProviderOptions.OptimizeForImmutableData

siloBuilder
    .AddSimpleMessageStreamProvider(
        "SMSProvider",
        options => options.OptimizeForImmutableData = false);

你可以创建流,使用这些流作为生成者来发送数据,并将其用作订阅者来接收数据。

生成事件

为流生成事件是相对比较简单的。 首先应该访问在之前的配置中定义的流提供程序 ("StreamProvider"),然后选择一个流并将数据推送到其中。

// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("StreamProvider");
// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);

为流生成事件是相对比较简单的。 首先应该访问在之前的配置中定义的流提供程序 ("SMSProvider"),然后选择一个流并将数据推送到其中。

// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("SMSProvider");
// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");

可以看到,我们的流有一个 GUID 和一个命名空间。 这样可以轻松识别独特的流。 例如,聊天室的命名空间可以是“Rooms”,而 GUID 可以是拥有者 RoomGrain 的 GUID。

此处我们使用了某个已知聊天室的 GUID。 可以使用流的 OnNextAsync 方法将数据推送到该流。 让我们使用随机数在计时器中执行此操作。 也可以为流使用任何其他数据类型。

RegisterTimer(_ =>
{
    return stream.OnNextAsync(Random.Shared.Next());
},
null,
TimeSpan.FromMilliseconds(1_000),
TimeSpan.FromMilliseconds(1_000));

订阅和接收流数据

要接收数据,可以使用隐式订阅和显式订阅,请参阅显式订阅和隐式订阅以了解更多详细信息。 此示例使用隐式订阅,这样更容易操作。 当 grain 类型想要隐式订阅流时,它将使用属性 [ImplicitStreamSubscription (namespace)]

本例将定义一个 ReceiverGrain,如下所示:

[ImplicitStreamSubscription("RANDOMDATA")]
public class ReceiverGrain : Grain, IRandomReceiver

每当向命名空间 RANDOMDATA 的流推送数据时(就像我们在计时器中所做的那样),ReceiverGrain 类型的、具有相同流 Guid 的 grain 将接收消息。 即使当前不存在 grain 的激活,运行时也会自动创建一个新的激活并向其发送消息。

要实现此目的,我们需要通过设置用于接收数据的 OnNextAsync 方法来完成订阅过程。 为此,ReceiverGrain 应调用 OnActivateAsync 中所示的代码

// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();

// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("StreamProvider");

// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);

// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
    async (data, token) =>
    {
        Console.WriteLine(data);
        await Task.CompletedTask;
    });
// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();

// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("SMSProvider");

// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");

// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
    async (data, token) =>
    {
        Console.WriteLine(data);
        await Task.CompletedTask;
    });

你已完成所有设置! 现在,唯一的要求就是通过某种行为触发生成者 grain 的创建,然后该 grain 将注册计时器并开始向所有相关方发送随机整数。

同样,本指南跳过了很多细节,而只介绍了大致的操作。 请阅读本手册的其他部分和有关 RX 的其他资源,以详细了解可用的功能及其用法。

反应式编程可能是解决许多问题的有效方法。 例如,可以在订阅者中使用 LINQ 来筛选数字和执行各种有趣的操作。

另请参阅

Orleans 流编程 API