次の方法で共有


Orleans ストリーミング API

アプリケーションは、既知の .NET リアクティブ拡張機能 (Rx) と非常によく似た API を介してストリームと対話します。 主な違いは、Orleans ストリーム拡張機能は、Orleans の分散されたスケーラブルなコンピューティング ファブリックでの処理をより効率的にするために非同期であるという点です。

非同期ストリーム

アプリケーションは、"ストリーム プロバイダー" を使用してストリームへのハンドルを取得することから開始します。 ストリーム プロバイダーの詳細については、こちらを参照してください。ただし、ここでは、これは、実装者がストリームの動作とセマンティクスをカスタマイズできるようにするストリーム ファクトリと考えることができます。

IStreamProvider streamProvider = base.GetStreamProvider("SimpleStreamProvider");
StreamId streamId = StreamId.Create("MyStreamNamespace", Guid);
IAsyncStream<T> stream = streamProvider.GetStream<T>(streamId);
IStreamProvider streamProvider = base.GetStreamProvider("SimpleStreamProvider");
IAsyncStream<T> stream = streamProvider.GetStream<T>(Guid, "MyStreamNamespace");

アプリケーションでは、グレイン内で Grain.GetStreamProvider メソッドを呼び出すことによって、またはクライアント上で GrainClient.GetStreamProvider メソッドを呼び出すことによって、ストリーム プロバイダーへの参照を取得できます。

Orleans.Streams.IAsyncStream<T> は、仮想ストリームへの厳密に型指定された論理ハンドルです。 考え方は、Orleans グレイン参照とよく似ています。 GetStreamProvider および GetStream の呼び出しは純粋にローカルです。 GetStream の引数は、GUID と、ストリーム名前空間を呼び出す追加の文字列 (null にすることができます) です。 GUID と名前空間文字列を組み合わせてストリーム ID を構成します (考え方は IGrainFactory.GetGrain の引数と似ています)。 GUID と名前空間文字列を組み合わせることにより、ストリーム ID をより柔軟に決定することができます。 グレイン 7 が Grain 型 PlayerGrain 内に存在し、別のグレイン 7 がグレイン型 ChatRoomGrain 内に存在する可能性があるのと同様、ストリーム 123 がストリーム名前空間 PlayerEventsStream 内に存在し、別のストリーム 123 がストリーム名前空間 ChatRoomMessagesStream 内に存在する可能性があります。

生成と利用

IAsyncStream<T> によって、IAsyncObserver<T>IAsyncObservable<T> の両方のインターフェイスが実装されます。 こうすることで、アプリケーションではストリームを使用して、Orleans.Streams.IAsyncObserver<T> を使用してストリームに新しいイベントを生成するか、Orleans.Streams.IAsyncObservable<T> を使用してイベントをストリームにサブスクライブしてストリームから使用することができます。

public interface IAsyncObserver<in T>
{
    Task OnNextAsync(T item, StreamSequenceToken token = null);
    Task OnCompletedAsync();
    Task OnErrorAsync(Exception ex);
}

public interface IAsyncObservable<T>
{
    Task<StreamSubscriptionHandle<T>> SubscribeAsync(IAsyncObserver<T> observer);
}

ストリームにイベントを生成するには、アプリケーションで次を呼び出すだけです

await stream.OnNextAsync<T>(event)

ストリームにサブスクライブするには、アプリケーションで次を呼び出します

StreamSubscriptionHandle<T> subscriptionHandle = await stream.SubscribeAsync(IAsyncObserver)

SubscribeAsync の引数には、IAsyncObserver<T> インターフェイスを実装するオブジェクト、または受信イベントを処理するラムダ関数の組み合わせを指定することができます。 SubscribeAsync のその他のオプションは、AsyncObservableExtensions クラスを介して使用できます。 SubscribeAsync からは、StreamSubscriptionHandle<T>が返されます。これは、ストリームからサブスクライブを解除するために使用できる不透明なハンドルです (考え方は、IDisposable の非同期バージョンと似ています)。

await subscriptionHandle.UnsubscribeAsync()

サブスクリプションは、アクティブ化用ではなくグレイン用であることに注意することが重要です。 グレイン コードがストリームにサブスクライブされると、このサブスクリプションはこのアクティブ化の有効期間を超えて、グレイン コードのサブスクライブが明示的に解除されるまで (別のアクティブ化の可能性がある)、永続的に持続します。 これは、仮想ストリームの抽象化の核心です。すべてのストリームが論理的に常に存在するだけでなく、ストリームのサブスクリプションは永続的であり、サブスクリプションを作成した特定の物理的なアクティブ化を超えて存続します。

カーディナリティ

Orleans ストリームには、複数のプロデューサーと複数のコンシューマーが含まれる場合があります。 プロデューサーによって発行されたメッセージは、メッセージが発行される前にストリームにサブスクライブされたすべてのコンシューマーに配信されます。

さらに、コンシューマーは同じストリームに複数回サブスクライブできます。 サブスクライブするたびに、一意の StreamSubscriptionHandle<T> が返されます。 グレイン (またはクライアント) が同じストリームに X 回サブスクライブされる場合、同じイベントが X 回 (サブスクリプションごとに 1 回) 受信されます。 コンシューマーは、個々のサブスクリプションからサブスクライブを解除することもできます。 次を呼び出すことで、現在のすべてのサブスクリプションを見つけることができます。

IList<StreamSubscriptionHandle<T>> allMyHandles =
    await IAsyncStream<T>.GetAllSubscriptionHandles();

エラーからの回復

ストリームのプロデューサーが停止した場合 (またはグレインが非アクティブ化された場合)、何も行う必要はありません。 次にこのグレインがさらに多くのイベントを生成するときに、ストリーム ハンドルを再度取得し、同じ方法で新しいイベントを生成できます。

コンシューマー ロジックはもう少し複雑です。 前に説明したとおり、コンシューマー グレインがストリームにサブスクライブされると、グレインが明示的にサブスクライブ解除されるまで、このサブスクリプションは有効です。 ストリームのコンシューマーが停止し (またはそのグレインが非アクティブ化され)、ストリームで新しいイベントが生成された場合、コンシューマー グレインは自動的に再アクティブ化されます (メッセージが送信されると、通常の Orleans グレインが自動的にアクティブ化される場合と同様です)。 この時点でグレイン コードで行う必要があるのは、データを処理するための IAsyncObserver<T> を提供することだけです。 コンシューマーは、OnActivateAsync() メソッドの一部として処理ロジックを再アタッチする必要があります。 これを行うには、次を呼び出すことができます。

StreamSubscriptionHandle<int> newHandle =
    await subscriptionHandle.ResumeAsync(IAsyncObserver);

コンシューマーは、最初に "処理の再開" にサブスクライブしたときに取得した以前のハンドルを使用します。 ResumeAsync は単に既存のサブスクリプションを IAsyncObserver ロジックの新しいインスタンスで更新するだけで、このコンシューマーが既にこのストリームにサブスクライブしているという事実は変わらないことに注意してください。

コンシューマーは古い subscriptionHandle をどのように取得するのでしょうか? 2 つのオプションがあります。 コンシューマーは、元の SubscribeAsync 操作から返されたハンドルを永続化し、今すぐ使用できます。 または、コンシューマーがハンドルを持っていない場合は、次を呼び出すことによって、アクティブなすべてのサブスクリプション ハンドルを IAsyncStream<T> に要求できます。

IList<StreamSubscriptionHandle<T>> allMyHandles =
    await IAsyncStream<T>.GetAllSubscriptionHandles();

コンシューマーは、必要に応じて、それらすべてを再開するか、一部のサブスクリプションを解除することができます。

ヒント

コンシューマー グレインが IAsyncObserver<T> インターフェイスを直接実装する場合 (public class MyGrain<T> : Grain, IAsyncObserver<T>)、理論的には IAsyncObserver を再アタッチする必要がないため、ResumeAsync を呼び出す必要はありません。 ストリーミング ランタイムは、グレインが既に IAsyncObserver を実装しており、これらの IAsyncObserver メソッドを呼び出すだけであることを自動的に把握できる必要があります。 ただし、ストリーミング ランタイムでは現在、これがサポートされていないため、グレインが IAsyncObserver を直接実装している場合でも、グレイン コードでは ResumeAsync を明示的に呼び出す必要があります。

明示的および暗黙的なサブスクリプション

既定では、ストリーム コンシューマーは明示的にストリームにサブスクライブする必要があります。 通常、このサブスクリプションは、グレイン (またはクライアント) が受信する、サブスクライブするように指示する外部メッセージによってトリガーされます。 たとえば、チャット サービスでは、ユーザーがチャット ルームに参加すると、そのグレインは、チャット名を含む JoinChatGroup メッセージを受け取ります。これにより、ユーザー グレインはこのチャット ストリームにサブスクライブします。

さらに、Orleans ストリームでは "暗黙的なサブスクリプション" もサポートされます。 このモデルでは、グレインはストリームに明示的にサブスクライブしません。 このグレインは、グレイン ID と ImplicitStreamSubscriptionAttribute に基づいて、自動的に (暗黙に) サブスクライブされます。 暗黙的なサブスクリプションの主要な値によって、ストリーム アクティビティはグレインのアクティブ化を自動的にトリガーできます (したがって、サブスクリプションをトリガーします)。 たとえば、SMS ストリームを使用すると、あるグレインがストリームを生成する必要があり、別のグレインがこのストリームを処理する場合、プロデューサーはコンシューマー グレインの ID を認識し、ストリームにサブスクライブするよう指示するグレイン呼び出しを行う必要があります。 その後にのみ、イベントの送信を開始できます。 代わりに、暗黙的なサブスクリプションを使用すると、プロデューサーはストリームへのイベントの生成を開始するだけで、コンシューマー グレインが自動的にアクティブ化され、ストリームにサブスクライブされます。 その場合、プロデューサーは誰がイベントを読んでいるかをまったく気にしません

グレイン実装 MyGrainType では、属性 [ImplicitStreamSubscription("MyStreamNamespace")] を宣言できます。 これは、ID が GUID XXX と "MyStreamNamespace" 名前空間であるストリームでイベントが生成された場合、ID が MyGrainType 型の XXX であるグレインに配信される必要があることをストリーミング ランタイムに通知します。 つまり、ランタイムは、ストリーム <XXX, MyStreamNamespace> をコンシューマー グレイン <XXX, MyGrainType> にマップします。

ImplicitStreamSubscription が存在すると、ストリーミング ランタイムは、このグレインをストリームに自動的にサブスクライブし、それにストリーム イベントを配信します。 ただし、その場合でも、グレイン コードでは、イベントの処理方法をランタイムに伝える必要があります。 基本的に、グレイン コードでは IAsyncObserver をアタッチする必要があります。 したがって、グレインがアクティブ化されると、OnActivateAsync 内のグレイン コードで次を呼び出す必要があります。

IStreamProvider streamProvider =
    base.GetStreamProvider("SimpleStreamProvider");

StreamId streamId =
    StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
IAsyncStream<T> stream =
    streamProvider.GetStream<T>(streamId);

StreamSubscriptionHandle<T> subscription =
    await stream.SubscribeAsync(IAsyncObserver<T>);
IStreamProvider streamProvider =
    base.GetStreamProvider("SimpleStreamProvider");

IAsyncStream<T> stream =
    streamProvider.GetStream<T>(this.GetPrimaryKey(), "MyStreamNamespace");

StreamSubscriptionHandle<T> subscription =
    await stream.SubscribeAsync(IAsyncObserver<T>);

サブスクリプション ロジックの記述

さまざまなケース (明示的および暗黙的なサブスクリプション、巻き戻し可能なストリーム、巻き戻し不可能なストリーム) のサブスクリプション ロジックを記述する方法に関するガイドラインを次に示します。 明示的なサブスクリプションと暗黙的なサブスクリプションの主な違いは、暗黙的な場合、グレインはすべてのストリーム名前空間に対して常に 1 つだけの暗黙的なサブスクリプションを持つことです。複数のサブスクリプションを作成する方法はなく (サブスクリプションの重複はありません)、サブスクライブを解除する方法はなく、グレイン ロジックは常に処理ロジックをアタッチするだけで済みます。 つまり、暗黙的なサブスクリプションの場合、サブスクリプションを再開する必要はありません。 一方、明示的なサブスクリプションの場合は、サブスクリプションを再開する必要があります。そうしなければ、グレインが再度サブスクライブすると、グレインは複数回サブスクライブされることになります。

暗黙的なサブスクリプション:

暗黙的なサブスクリプションの場合も、グレインが処理中のロジックをアタッチするにはサブスクライブが必要です。 これをコンシューマー グレインで行うには、サブスクライブとは別にグレインをアクティブ化できるように、IStreamSubscriptionObserver インターフェイスと IAsyncObserver<T> インターフェイスを実装します。 ストリームをサブスクライブするために、グレインは OnSubscribed(...) メソッド内でハンドルを作成し、await handle.ResumeAsync(this) を呼び出します。

メッセージを処理するには、ストリーム データとシーケンス トークンを受信するための IAsyncObserver<T>.OnNextAsync(...) メソッドを実装します。 別の方法として、ResumeAsync メソッドで、IAsyncObserver<T> インターフェイスのメソッドである onNextAsynconErrorAsynconCompletedAsync を表すデリゲートのセットを受け取ることもできます。

public Task OnNextAsync(string item, StreamSequenceToken? token = null)
{
    _logger.LogInformation($"Received an item from the stream: {item}");
}

public async Task OnSubscribed(IStreamSubscriptionHandleFactory handleFactory)
{
    var handle = handleFactory.Create<string>();
    await handle.ResumeAsync(this);
}
public override async Task OnActivateAsync()
{
    var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
    var stream =
        streamProvider.GetStream<string>(
            this.GetPrimaryKey(), "MyStreamNamespace");

    await stream.SubscribeAsync(OnNextAsync);
}

明示的なサブスクリプション:

明示的なサブスクリプションの場合、グレインは、ストリームをサブスクライブする SubscribeAsync を呼び出す必要があります。 これは、サブスクリプションを作成し、処理ロジックをアタッチします。 明示的なサブスクリプションは、グレインのサブスクライブが解除されるまで存在するため、グレインが非アクティブ化されて再アクティブ化された場合でも、グレインは明示的にサブスクライブされます。ただし、処理ロジックはアタッチされません。 この場合、グレインは処理ロジックを再アタッチする必要があります。 これを行うには、その OnActivateAsync で、グレインはまず IAsyncStream<T>.GetAllSubscriptionHandles() を呼び出して、所有しているサブスクリプションを見つける必要があります。 グレインは、処理を続行したいハンドルごとに ResumeAsync を実行するか、処理が完了したすべてのハンドルで UnsubscribeAsync を実行する必要があります。 グレインは必要に応じて、ResumeAsync 呼び出しの引数として StreamSequenceToken を指定できます。これにより、この明示的なサブスクリプションではそのトークンから使用を開始します。

public async override Task OnActivateAsync(CancellationToken cancellationToken)
{
    var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
    var streamId = StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
    var stream = streamProvider.GetStream<string>(streamId);

    var subscriptionHandles = await stream.GetAllSubscriptionHandles();
    foreach (var handle in subscriptionHandles)
    {
       await handle.ResumeAsync(this);
    } 
}
public async override Task OnActivateAsync()
{
    var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
    var stream =
        streamProvider.GetStream<string>(this.GetPrimaryKey(), "MyStreamNamespace");

    var subscriptionHandles = await stream.GetAllSubscriptionHandles();
    if (!subscriptionHandles.IsNullOrEmpty())
    {
        subscriptionHandles.ForEach(
            async x => await x.ResumeAsync(OnNextAsync));
    }
}

ストリームの順序とシーケンス トークン

個々のプロデューサーと個々のコンシューマーの間のイベント配信の順序は、ストリーム プロバイダーによって異なります。

SMS では、プロデューサーが、イベントを公開する方法を制御することにより、コンシューマーに表示されるイベントの順序を明示的に制御します。 既定では (SMS プロバイダーの SimpleMessageStreamProviderOptions.FireAndForgetDelivery オプションが false に設定されている場合)、およびプロデューサーがすべての OnNextAsync 呼び出しを待機する場合、イベントは FIFO 順に到着します。 SMS では、OnNextAsync 呼び出しから返される破損した Task によって示される配信の失敗を処理する方法を決定するのは、プロデューサーの責任です

Azure Queue ストリームでは FIFO の順序が保証されません。基になる Azure キューで障害が発生した場合の順序が保証されないためです。 (エラーのない実行では、FIFO 順が保証されます)。プロデューサーがイベントを Azure Queue に生成するときに、キュー操作が失敗した場合、別のキューを試行し、後で潜在的な重複メッセージに対処するのは、プロデューサーの責任です。 配信側では、Orleans Streaming ランタイムがキューからイベントをデキューし、処理のためにコンシューマーへの配信が試行されます。 Orleans Streaming ランタイムは、処理が成功した場合にのみ、キューからイベントを削除します。 配信または処理が失敗した場合、イベントはキューから削除されず、後でキューに自動的に再表示されます。 Streaming ランタイムは再度配信を試みるため、FIFO 順が壊れる可能性があります。 上記の動作は、Azure キューの通常のセマンティクスと一致します。

アプリケーション定義順: 上記の順序の問題に対処するために、アプリケーションでは必要に応じてその順序を指定できます。 これは、StreamSequenceToken を使用して実現されます。これは、イベントの順序付けに使用できる不透明な IComparable オブジェクトです。 プロデューサーは、オプションの StreamSequenceTokenOnNext 呼び出しに渡すことができます。 この StreamSequenceToken はコンシューマーに渡され、イベントと共に配信されます。 こうすることで、アプリケーションはストリーミング ランタイムとは関係なく、その順序を推論して再構築できます。

巻き戻し可能なストリーム

一部のストリームでは、アプリケーションが最新の時点からサブスクライブすることしか許可されませんが、その他のストリームでは "時間をさかのぼる" ことが許可されます。 後者の機能は、基になるキューイング テクノロジと特定のストリーム プロバイダーによって異なります。 たとえば、Azure キューでは、キューに入れられた最新のイベントの使用のみが許可されますが、EventHub では、任意の時点 (最大で有効期限まで) からのイベントの再生が許可されます。 時間をさかのぼることをサポートするストリームは、"巻き戻し可能なストリーム" と呼ばれます。

巻き戻し可能なストリームのコンシューマーは、StreamSequenceTokenSubscribeAsync 呼び出しに渡すことができます。 ランタイムは、その StreamSequenceToken から始まるイベントを配信します。 null トークンは、コンシューマーが最新のイベントから順に受信することを意味します。

ストリームを巻き戻す機能は、回復シナリオで非常に役立ちます。 たとえば、ストリームにサブスクライブし、最新のシーケンス トークンと共にその状態を定期的にチェックポイントするグレインについて考えてみましょう。 障害から回復する場合、グレインはチェックポイントされた最新のシーケンス トークンから同じストリームに再サブスクライブできるため、最後のチェックポイント以降に生成されたイベントを失うことなく回復できます。

イベント ハブ プロバイダー は巻き戻し可能です。 そのコードは、GitHub: Orleans/Azure/Orleans.Streaming.EventHubs にあります。 SMS および Azure キュー プロバイダーは巻き戻し "できません"。

ステートレス自動スケールアウト処理

既定では、Orleans Streaming は、それぞれが 1 つ以上のステートフル グレインによって処理される多数の比較的小さなストリームをサポートすることを目的としています。 まとめて、すべてのストリームを一緒に処理すると、多数の通常の (ステートフルな) グレイン間でシャード化されます。 アプリケーション コードでは、ストリーム ID とグレイン ID を割り当て、明示的にサブスクライブすることによって、このシャード化を制御します。 目標は、シャード化されたステートフル処理です。

ただし、自動的にスケールアウトされたステートレス処理の興味深いシナリオもあります。 このシナリオでは、アプリケーションに少数のストリーム (または 1 つの大きなストリーム) があり、目標はステートレス処理です。 一例として、イベントのグローバル ストリームがあります。この処理では、各イベントをデコードし、さらにステートフルな処理のために他のストリームに転送する可能性があります。 Orleans では、StatelessWorkerAttribute グレインを介して、ステートレスなスケールアウト ストリーム処理をサポートできます。

ステートレス自動スケールアウト処理の現在の状態: これはまだ実装されていません。 StatelessWorker グレインからストリームにサブスクライブしようとすると、未定義の動作が発生します。 このオプションのサポートは検討中です

グレインと Orleans クライアント

Orleans ストリームは各グレインおよび Orleans クライアント間で均一に動作します。 つまり、同じ API をグレイン内と Orleans クライアントで使用して、イベントを生成および使用できます。 これにより、アプリケーション ロジックが大幅に簡素化され、Grain Observers などの特別なクライアント側 API が不要になります。

フル マネージドで信頼性の高い Streaming Pub-Sub

ストリーム サブスクリプションを追跡するために、Orleans は Streaming Pub-Sub と呼ばれるランタイム コンポーネントを使用します。これは、ストリーム コンシューマーとストリーム プロデューサーのランデブー ポイントとして機能します。 Pub-Sub は、すべてのストリーム サブスクリプションを追跡して保持し、ストリーム コンシューマーとストリーム プロデューサーを一致させます。

アプリケーションでは、Pub-Sub データを保存する場所と方法を選択できます。 Pub-Sub コンポーネント自体は、Orleans の宣言型永続性を使用するグレイン (PubSubRendezvousGrain と呼ばれる) として実装されます。 PubSubRendezvousGrain は、PubSubStore という名前のストレージ プロバイダーを使用します。 グレインと同様に、ストレージ プロバイダーの実装を指定できます。 Streaming Pub-Sub の場合、サイロ ホスト ビルダーを使用して、サイロの構築時に PubSubStore の実装を変更できます。

次のコードでは、その状態を Azure テーブルに格納するように Pub-Sub を構成します。

hostBuilder.AddAzureTableGrainStorage("PubSubStore",
    options => options.ConfigureTableServiceClient("<Secret>"));
hostBuilder.AddAzureTableGrainStorage("PubSubStore",
    options => options.ConnectionString = "<Secret>");

そうすれば、Pub-Sub データは Azure テーブルに永続的に格納されます。 初期開発では、メモリ ストレージも使用できます。 Pub-Sub に加えて、Orleans Streaming ランタイムは、プロデューサーからコンシューマーにイベントを配信し、アクティブに使用されるストリームに割り当てられたすべてのランタイム リソースを管理し、使用されていないストリームからのランタイム リソースの透過的なガベージ コレクションを実行します。

構成

ストリームを使用するには、サイロ ホストまたはクラスター クライアント ビルダーを使用してストリーム プロバイダーを有効にする必要があります。 ストリーム プロバイダーの詳細については、こちらを参照してください。 サンプル ストリーム プロバイダーの設定:

hostBuilder.AddMemoryStreams("StreamProvider")
    .AddAzureQueueStreams<AzureQueueDataAdapterV2>("AzureQueueProvider",
        optionsBuilder => optionsBuilder.Configure(
            options => options.ConfigureTableServiceClient("<Secret>")))
    .AddAzureTableGrainStorage("PubSubStore",
        options => options.ConfigureTableServiceClient("<Secret>"));
hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
    .AddAzureQueueStreams<AzureQueueDataAdapterV2>("AzureQueueProvider",
        optionsBuilder => optionsBuilder.Configure(
            options => options.ConnectionString = "<Secret>"))
    .AddAzureTableGrainStorage("PubSubStore",
        options => options.ConnectionString = "<Secret>");

関連項目

Orleans Stream プロバイダー