共用方式為


Orleans 串流快速入門

本指南將說明設定和使用 Orleans 串流的快速方式。 若要深入了解串流功能的詳細資料,請閱讀本文件的其他部分。

必要的設定

在本指南中,您將使用記憶體型串流,其使用粒紋傳訊將串流資料傳送給訂閱者。 您將使用記憶體內部儲存體提供者,來儲存訂用帳戶清單。 使用記憶體型機制進行串流,且儲存體僅適用於本地開發和測試,不適用於實際執行環境。

siloISiloBuilder的定址接收器上,呼叫 AddMemoryStreams

silo.AddMemoryStreams("StreamProvider")
    .AddMemoryGrainStorage("PubSubStore");

clientIClientBuilder的叢集用戶端上,呼叫 AddMemoryStreams

client.AddMemoryStreams("StreamProvider");

在本指南中,我們將使用簡單的訊息型串流,其使用粒紋傳訊將串流資料傳送給訂閱者。 我們會使用記憶體內部儲存體提供者來儲存訂用帳戶清單,因此這不是實際生產應用程式的明智選擇。

hostBuilderISiloHostBuilder的定址接收器上,呼叫 AddSimpleMessageStreamProvider

hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
           .AddMemoryGrainStorage("PubSubStore");

clientBuilderIClientBuilder的叢集用戶端上,呼叫 AddSimpleMessageStreamProvider

clientBuilder.AddSimpleMessageStreamProvider("SMSProvider");

注意

根據預設,透過簡單訊息串流傳遞的訊息會被視為不可變,而且可能由參考傳遞至其他粒紋。 若要關閉此行為,您必須設定 SMS 提供者以關閉 SimpleMessageStreamProviderOptions.OptimizeForImmutableData

siloBuilder
    .AddSimpleMessageStreamProvider(
        "SMSProvider",
        options => options.OptimizeForImmutableData = false);

您可以建立串流、以產生者的形式使用串流傳送資料,以及以訂閱者的形式接收資料。

產生事件

產生串流的事件相對容易。 您應該先存取先前在設定 ("StreamProvider") 中定義的串流提供者,然後選擇串流並將資料推送至其中。

// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("StreamProvider");
// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);

產生串流的事件相對容易。 您應該先存取先前在設定 ("SMSProvider") 中定義的串流提供者,然後選擇串流並將資料推送至其中。

// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("SMSProvider");
// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");

如您所見,串流具有 GUID 和命名空間。 這可讓您輕鬆地識別唯一的串流。 例如,聊天室的命名空間可以是「會議室」,而 GUID 可以擁有 RoomGrain 的 GUID。

在這裡,我們會使用一些已知聊天室的 GUID。 使用我們可以將資料推送至其中的串流 OnNextAsync 方法。 讓我們使用亂數在計時器內執行此動作。 您也可以針對串流使用任何其他資料類型。

RegisterTimer(_ =>
{
    return stream.OnNextAsync(Random.Shared.Next());
},
null,
TimeSpan.FromMilliseconds(1_000),
TimeSpan.FromMilliseconds(1_000));

訂閱和接收串流資料

若要接收資料,您可使用隱含和明確訂用帳戶,在隱含和明確訂用帳戶 (部分機器翻譯) 中有更詳細的說明。 此範例會使用較容易的隱含訂閱。 當粒紋型別想要隱含訂閱串流時,其會使用屬性 [ImplicitStreamSubscription(namespace)]

針對您的案例,請如下定義 ReceiverGrain

[ImplicitStreamSubscription("RANDOMDATA")]
public class ReceiverGrain : Grain, IRandomReceiver

每當資料推送至命名空間 RANDOMDATA 的串流時,如同在計時器中一樣,具有相同串流 GuidReceiverGrain 型別粒紋將會收到訊息。 即使目前不存在粒紋的啟用,執行階段仍會自動建立新的粒紋,並將訊息傳送至其中。

若要讓此作業能夠運作,我們必須藉由設定 OnNextAsync 接收資料的方法來完成訂用帳戶流程。 若要這樣做,OnActivateAsync 應該在 ReceiverGrain 中呼叫類似這樣的項目

// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();

// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("StreamProvider");

// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);

// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
    async (data, token) =>
    {
        Console.WriteLine(data);
        await Task.CompletedTask;
    });
// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();

// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("SMSProvider");

// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");

// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
    async (data, token) =>
    {
        Console.WriteLine(data);
        await Task.CompletedTask;
    });

一切準備就緒! 現在唯一的需求是,某些項目會觸發產生者粒紋的建立,然後其會註冊計時器,並開始將隨機 int 傳送給所有有關的各方。

同樣地,本指南會略過許多詳細資料,而且只顯示重點。 閱讀此手冊的其他部分和其他 RX 上的其他資源,以充分了解可用的項目及方式。

回應式程式設計可能會是解決許多問題的高效方法。 例如,您可以在訂閱者中使用 LINQ 來篩選數字,並執行各種有趣的內容。

另請參閱

OrleansStreams 程式設計 API