Orleans 串流快速入門
本指南將說明設定和使用 Orleans 串流的快速方式。 若要深入了解串流功能的詳細資料,請閱讀本文件的其他部分。
必要的設定
在本指南中,您將使用記憶體型串流,其使用粒紋傳訊將串流資料傳送給訂閱者。 您將使用記憶體內部儲存體提供者,來儲存訂用帳戶清單。 使用記憶體型機制進行串流,且儲存體僅適用於本地開發和測試,不適用於實際執行環境。
在 silo
是 ISiloBuilder的定址接收器上,呼叫 AddMemoryStreams:
silo.AddMemoryStreams("StreamProvider")
.AddMemoryGrainStorage("PubSubStore");
在 client
是 IClientBuilder的叢集用戶端上,呼叫 AddMemoryStreams。
client.AddMemoryStreams("StreamProvider");
在本指南中,我們將使用簡單的訊息型串流,其使用粒紋傳訊將串流資料傳送給訂閱者。 我們會使用記憶體內部儲存體提供者來儲存訂用帳戶清單,因此這不是實際生產應用程式的明智選擇。
在 hostBuilder
是 ISiloHostBuilder
的定址接收器上,呼叫 AddSimpleMessageStreamProvider:
hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
.AddMemoryGrainStorage("PubSubStore");
在 clientBuilder
是 IClientBuilder
的叢集用戶端上,呼叫 AddSimpleMessageStreamProvider。
clientBuilder.AddSimpleMessageStreamProvider("SMSProvider");
注意
根據預設,透過簡單訊息串流傳遞的訊息會被視為不可變,而且可能由參考傳遞至其他粒紋。 若要關閉此行為,您必須設定 SMS 提供者以關閉 SimpleMessageStreamProviderOptions.OptimizeForImmutableData
siloBuilder
.AddSimpleMessageStreamProvider(
"SMSProvider",
options => options.OptimizeForImmutableData = false);
您可以建立串流、以產生者的形式使用串流傳送資料,以及以訂閱者的形式接收資料。
產生事件
產生串流的事件相對容易。 您應該先存取先前在設定 ("StreamProvider"
) 中定義的串流提供者,然後選擇串流並將資料推送至其中。
// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("StreamProvider");
// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);
產生串流的事件相對容易。 您應該先存取先前在設定 ("SMSProvider"
) 中定義的串流提供者,然後選擇串流並將資料推送至其中。
// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("SMSProvider");
// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");
如您所見,串流具有 GUID 和命名空間。 這可讓您輕鬆地識別唯一的串流。 例如,聊天室的命名空間可以是「會議室」,而 GUID 可以擁有 RoomGrain 的 GUID。
在這裡,我們會使用一些已知聊天室的 GUID。 使用我們可以將資料推送至其中的串流 OnNextAsync
方法。 讓我們使用亂數在計時器內執行此動作。 您也可以針對串流使用任何其他資料類型。
RegisterTimer(_ =>
{
return stream.OnNextAsync(Random.Shared.Next());
},
null,
TimeSpan.FromMilliseconds(1_000),
TimeSpan.FromMilliseconds(1_000));
訂閱和接收串流資料
若要接收資料,您可使用隱含和明確訂用帳戶,在隱含和明確訂用帳戶 (部分機器翻譯) 中有更詳細的說明。 此範例會使用較容易的隱含訂閱。 當粒紋型別想要隱含訂閱串流時,其會使用屬性 [ImplicitStreamSubscription(namespace)]。
針對您的案例,請如下定義 ReceiverGrain
:
[ImplicitStreamSubscription("RANDOMDATA")]
public class ReceiverGrain : Grain, IRandomReceiver
每當資料推送至命名空間 RANDOMDATA
的串流時,如同在計時器中一樣,具有相同串流 Guid
的 ReceiverGrain
型別粒紋將會收到訊息。 即使目前不存在粒紋的啟用,執行階段仍會自動建立新的粒紋,並將訊息傳送至其中。
若要讓此作業能夠運作,我們必須藉由設定 OnNextAsync
接收資料的方法來完成訂用帳戶流程。 若要這樣做,OnActivateAsync
應該在 ReceiverGrain
中呼叫類似這樣的項目
// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();
// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("StreamProvider");
// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);
// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
async (data, token) =>
{
Console.WriteLine(data);
await Task.CompletedTask;
});
// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();
// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("SMSProvider");
// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");
// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
async (data, token) =>
{
Console.WriteLine(data);
await Task.CompletedTask;
});
一切準備就緒! 現在唯一的需求是,某些項目會觸發產生者粒紋的建立,然後其會註冊計時器,並開始將隨機 int 傳送給所有有關的各方。
同樣地,本指南會略過許多詳細資料,而且只顯示重點。 閱讀此手冊的其他部分和其他 RX 上的其他資源,以充分了解可用的項目及方式。
回應式程式設計可能會是解決許多問題的高效方法。 例如,您可以在訂閱者中使用 LINQ 來篩選數字,並執行各種有趣的內容。