次の方法で共有


オブザーバー

シンプルなメッセージ - 応答のパターンでは足りず、クライアントが非同期通知を受信することが必要になる場合があります。 たとえば、あるユーザーが、友人が新しいメッセージを発行したときに通知を受け取りたいことがあります。

クライアント オブザーバーは、クライアントに非同期的に通知できるようにするメカニズムです。 オブザーバー インターフェイスは IGrainObserver から継承する必要があり、すべてのメソッドは、voidTaskTask<TResult>ValueTask または ValueTask<TResult> のいずれかを返す必要があります。 戻り値の void 型は、実装で async void の使用が推奨される場合があり、メソッドから例外がスローされた場合にアプリケーションがクラッシュする可能性がある危険なパターンであるため、推奨されません。 代わりに、最適な通知シナリオでは、OneWayAttribute をオブザーバーのインターフェイス メソッドに適用することを検討してください。 これにより、受信側はメソッド呼び出しの応答を送信せず、オブザーバーからの応答を待たずに、呼び出しサイトですぐにメソッドを返すようになります。 グレインは、他のグレイン インターフェイス メソッドと同様に、オブザーバーのメソッドを呼び出します。 Orleans ランタイムは、要求と応答の配信を保証します。 オブザーバーの一般的なユース ケースは、Orleans アプリケーションでイベントが発生したときに通知を受信するクライアントを登録することです。 このような通知を発行するグレインは、オブザーバーを追加または削除するための API を提供する必要があります。 さらに、通常は、既存のサブスクリプションを取り消すメソッドを公開すると便利です。

グレイン開発者は、ObserverManager<TObserver> などのユーティリティ クラスを使用して、確認されたグレイン型の開発を簡略化できます。 障害が発生した後に必要に応じて自動的に再アクティブ化されるグレインとは異なり、クライアントはフォールト トレラントではありません。障害が発生したクライアントは復旧しないおそれがあります。 このため、ObserverManager<T> ユーティリティは、構成された期間が経過するとサブスクリプションを削除します。 アクティブなクライアントは、サブスクリプションをアクティブな状態に保つために、タイマーでもう一度登録する必要があります。

通知を登録するには、クライアントが最初にオブザーバー インターフェイスを実装するローカル オブジェクトを作成する必要があります。 次に、オブザーバー ファクトリ CreateObjectReference` のメソッドを呼び出してオブジェクトをグレイン参照に変換し、通知グレインのサブスクリプション メソッドに渡すことができます。

このモデルは、非同期通知を受信するために他のグレインでも使用できます。 グレインは、IGrainObserver インターフェイスを実装することもできます。 クライアント サブスクリプションの場合とは異なり、登録しているグレインでは単にオブザーバー インターフェイスを実装し、それ自体への参照を渡します (例: this.AsReference<IMyGrainObserverInterface>())。 グレインは既にアドレス可能であるため、CreateObjectReference() は必要ありません。

コードの例

クライアントに定期的にメッセージを送信するグレインがあるとします。 わかりやすくするために、この例のメッセージを文字列にします。 最初に、メッセージを受信するクライアントでインターフェイスを定義します。

インターフェイスは次のようになります

public interface IChat : IGrainObserver
{
    Task ReceiveMessage(string message);
}

唯一の特別な点は、インターフェイスが IGrainObserver から継承する必要があるということです。 これらのメッセージを監視するすべてのクライアントは、IChat を実装するクラスを実装する必要があります。

最も簡単なケースは次のようになります。

public class Chat : IChat
{
    public Task ReceiveMessage(string message)
    {
        Console.WriteLine(message);
        return Task.CompletedTask;
    }
}

次に、サーバーでは、これらのチャット メッセージをクライアントに送信するグレインが必要です。 グレインには、クライアントが通知を登録および登録解除するためのメカニズムも必要です。 サブスクリプションの場合、グレインはユーティリティ クラス ObserverManager<TObserver> のインスタンスを使用できます。

注意

ObserverManager<TObserver> は、バージョン 7.0 以降の Orleans の一部です。 以前のバージョンでは、次の実装をコピーできます。

class HelloGrain : Grain, IHello
{
    private readonly ObserverManager<IChat> _subsManager;

    public HelloGrain(ILogger<HelloGrain> logger)
    {
        _subsManager =
            new ObserverManager<IChat>(
                TimeSpan.FromMinutes(5), logger);
    }

    // Clients call this to subscribe.
    public Task Subscribe(IChat observer)
    {
        _subsManager.Subscribe(observer, observer);

        return Task.CompletedTask;
    }

    //Clients use this to unsubscribe and no longer receive messages.
    public Task UnSubscribe(IChat observer)
    {
        _subsManager.Unsubscribe(observer);

        return Task.CompletedTask;
    }
}

クライアントにメッセージを送信するには、ObserverManager<IChat> インスタンスの Notify メソッドを使用できます。 このメソッドは、Action<T> メソッドまたはラムダ式を受け取ります (ここで T は型 IChat)。 インターフェイス上の任意のメソッドを呼び出して、クライアントに送信できます。 ここでは、ReceiveMessage メソッドが 1 つだけあり、サーバー上の送信コードは次のようになります。

public Task SendUpdateMessage(string message)
{
    _subsManager.Notify(s => s.ReceiveMessage(message));

    return Task.CompletedTask;
}

これで、サーバーにはオブザーバー クライアントにメッセージを送信するメソッドと、登録/登録解除のための 2 つのメソッドがあり、クライアントはグレイン メッセージを観察できるクラスを実装しました。 最後の手順では、以前に実装した Chat クラスを使用してクライアントにオブザーバー参照を作成し、登録後にメッセージを受信できるようにします。

コードは、次のようになります。

//First create the grain reference
var friend = _grainFactory.GetGrain<IHello>(0);
Chat c = new Chat();

//Create a reference for chat, usable for subscribing to the observable grain.
var obj = _grainFactory.CreateObjectReference<IChat>(c);

//Subscribe the instance to receive messages.
await friend.Subscribe(obj);

これで、サーバー上のグレインが SendUpdateMessage メソッドを呼び出すたびに、登録されているすべてのクライアントがメッセージを受け取ります。 このクライアント コードでは、変数 cChat インスタンスがメッセージを受信し、コンソールに出力します。

重要

CreateObjectReference に渡されたオブジェクトは WeakReference<T> 経由で保持されるため、他の参照が存在しない場合はガベージ コレクションされます。

ユーザーは、収集されたくない各オブザーバーの参照を保持する必要があります。

注意

オブザーバーをホストするクライアントは失敗する場合があり、回復後に作成されたオブザーバーには異なる (ランダム化された) ID が指定されるため、オブザーバーは本質的に信頼できません。 ObserverManager<TObserver> は、前述のように、オブザーバーによる定期的な再サブスクリプションに依存しているため、非アクティブなオブザーバーを削除できます。

実行モデル

IGrainObserver の実装は、IGrainFactory.CreateObjectReference の呼び出しによって登録され、そのメソッドを呼び出すたびに、その実装を指す新しい参照が作成されます。 これらの参照のそれぞれに送信された要求が Orleans によって 1 つずつ実行され、完了します。 オブザーバーは再入不可であるため、オブザーバーへの同時要求は Orleans によってインターリーブされません。 複数のオブザーバーが同時に要求を受信している場合、それらの要求は並列で実行できます。 オブザーバー メソッドの実行は、AlwaysInterleaveAttributeReentrantAttribute などの属性の影響を受けないため、実行モデルを開発者がカスタマイズすることはできません。