次の方法で共有


Orleans ストリーミングのクイックスタート

このガイドでは、Orleans ストリームを設定して使用する簡単な方法を示します。 ストリーミング機能について詳しくは、このドキュメントの他の部分をご覧ください。

必要構成

このガイドでは、グレイン メッセージングを使ってストリーム データをサブスクライバーに送信するメモリベースのストリームを使います。 インメモリ ストレージ プロバイダーを使用し、サブスクリプションの一覧を格納します。 ストリーミングとストレージでのメモリベースのメカニズムの使用は、ローカルでの開発とテストのみを目的としており、運用環境向けではありません。

siloISiloBuilder であるサイロで、AddMemoryStreams を呼び出します。

silo.AddMemoryStreams("StreamProvider")
    .AddMemoryGrainStorage("PubSubStore");

clientIClientBuilder であるクラスター クライアントで、AddMemoryStreams を呼び出します。

client.AddMemoryStreams("StreamProvider");

このガイドでは、グレイン メッセージングを使ってストリーム データをサブスクライバーに送信する簡単なメッセージ ベースのストリームを使います。 メモリ内ストレージ プロバイダーを使ってサブスクリプションの一覧を格納するので、実際の運用アプリケーションの場合はこれは賢明な選択ではありません。

hostBuilderISiloHostBuilder であるサイロで、AddSimpleMessageStreamProvider を呼び出します。

hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
           .AddMemoryGrainStorage("PubSubStore");

clientBuilderIClientBuilder であるクラスター クライアントで、AddSimpleMessageStreamProvider を呼び出します。

clientBuilder.AddSimpleMessageStreamProvider("SMSProvider");

Note

既定では、Simple Message Stream 経由で渡されるメッセージは不変と見なされ、他のグレインに参照で渡すことができます。 この動作をオフにするには、SimpleMessageStreamProviderOptions.OptimizeForImmutableData をオフにするように SMS プロバイダーを構成する必要があります

siloBuilder
    .AddSimpleMessageStreamProvider(
        "SMSProvider",
        options => options.OptimizeForImmutableData = false);

ストリームを作成し、プロデューサーとしてそれらを使ってデータを送信し、サブスクライバーとしてデータを受信することができます。

イベントを生成する

ストリームに合わせてイベントを生成するのは比較的簡単です。 まず上の構成で定義したストリーム プロバイダー ("StreamProvider") にアクセスしてから、ストリームを選んで、それにデータをプッシュする必要があります。

// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("StreamProvider");
// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);

ストリームに合わせてイベントを生成するのは比較的簡単です。 まず上の構成で定義したストリーム プロバイダー ("SMSProvider") にアクセスしてから、ストリームを選んで、それにデータをプッシュする必要があります。

// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("SMSProvider");
// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");

ご覧のように、ストリームには GUID と名前空間があります。 これにより、一意のストリームを簡単に識別できます。 たとえば、チャット ルームの名前空間を "Rooms" とし、GUID は所有している RoomGrain の GUID にすることができます。

ここでは、既知のチャット ルームの GUID を使います。 ストリームの OnNextAsync メソッドを使って、データをそれにプッシュできます。 乱数を使って、タイマー内でそれを行ってみましょう。 ストリームには他のデータ型も使用できます。

RegisterTimer(_ =>
{
    return stream.OnNextAsync(Random.Shared.Next());
},
null,
TimeSpan.FromMilliseconds(1_000),
TimeSpan.FromMilliseconds(1_000));

ストリーミング データをサブスクライブして受信する

データを受信する場合は、暗黙的なサブスクリプションと明示的なサブスクリプションを使用できます。詳細については、「明示的サブスクリプションと暗黙的サブスクリプション」で詳しく説明されています。 この例では、より簡単な暗黙的なサブスクリプションを使用します。 グレイン型でストリームを暗黙的にサブスクライブする必要がある場合は、属性 [ImplicitStreamSubscription (namespace)] を使用します。

ここでは、ReceiverGrain を次のように定義します。

[ImplicitStreamSubscription("RANDOMDATA")]
public class ReceiverGrain : Grain, IRandomReceiver

タイマーで行っているように、名前空間 RANDOMDATA のストリームにデータがプッシュされるたびに、同じ Guid のストリームを持つ ReceiverGrain 型のグレインがメッセージを受信します。 現在、アクティブなグレインが存在しない場合でも、ランタイムは自動的に新しく作成して、それにメッセージを送信します。

これが機能するには、データを受信するための OnNextAsync メソッドを設定することで、サブスクリプション プロセスを完了する必要があります。 これを行うには、ReceiverGrainOnActivateAsync で次のようなものを呼び出す必要があります

// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();

// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("StreamProvider");

// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);

// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
    async (data, token) =>
    {
        Console.WriteLine(data);
        await Task.CompletedTask;
    });
// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();

// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("SMSProvider");

// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");

// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
    async (data, token) =>
    {
        Console.WriteLine(data);
        await Task.CompletedTask;
    });

すべての設定が完了しました。 ここで唯一必要なのは、何かがプロデューサー グレインの作成をトリガーすることであり、その後はタイマーが登録されて、すべての関係パーティーへのランダムな整数の送信が開始されます。

やはり、このガイドは多くの詳細が省かれており、全体像を知る場合にのみ適しています。 機能と使い方について詳しくは、このマニュアルの他の部分や RX に関する他のリソースをご覧ください。

リアクティブ プログラミングは、多くの問題を解決するための非常に強力なアプローチです。 たとえば、サブスクライバーで LINQ を使って数値をフィルター処理し、あらゆる種類の興味深い操作を行うことができます。

こちらもご覧ください

Orleansストリーム プログラミング API