观察程序

在某些情况下,简单的消息/响应模式是不够的,客户端需要接收异步通知。 例如,用户可能希望在好友发布新的即时消息时收到通知。

客户端观察程序是可以异步通知客户端的机制。 观察者接口必须从 IGrainObserver 继承,并且所有方法都必须返回 voidTaskTask<TResult>ValueTaskValueTask<TResult>。 不建议使用返回类型 void,因为它可能会鼓励在实现上使用 async void,这是一种危险的模式,因为如果从方法引发异常,则可能会导致应用程序崩溃。 相反,对于尽力通知方案,请考虑将 OneWayAttribute 应用于观察者的接口方法。 这将导致接收方不发送方法调用的响应,并将导致方法在调用站点立即返回,而无需等待观察者的响应。 粒度通过像调用任何粒度接口方法一样来调用观察者的方法。 Orleans 运行时将确保传递请求和响应。 观察程序的一个常见用例是,在 Orleans 应用程序中发生事件时登记客户端以接收通知。 发布此类通知的 grain 应提供一个 API 用于添加或删除观察程序。 此外,公开一个允许取消现有订阅的方法通常会很方便。

grain 开发人员可以使用实用工具类(例如 ObserverManager<TObserver>)来简化观察到的 grain 类型的开发。 与失败后根据需要自动重新激活的 grain 不同,客户端不可容错:失败的客户端可能永远无法恢复。 因此,ObserverManager<T> 实用工具会在配置的持续时间后删除订阅。 处于活动状态的客户端应在计时器上重新订阅,使其订阅保持活动状态。

若要订阅通知,客户端必须首先创建一个实现观察程序接口的本地对象。 然后它调用观察程序工厂上的方法 CreateObjectReference` 以将对象转换为 grain 引用,然后可将其传递给通知方 grain 上的订阅方法。

此模型也可由其他 grain 用来接收异步通知。 grain 还可以实现 IGrainObserver 接口。 与客户端订阅情况不同,订阅方 grain 只会实现观察程序接口并传入对其自身的引用(例如 this.AsReference<IMyGrainObserverInterface>())。 不需要 CreateObjectReference(),因为 grain 已经可寻址。

代码示例

假设有一个定期向客户端发送消息的 grain。 为简单起见,本示例中的消息是一个字符串。 我们首先在客户端上定义用于接收消息的接口。

该接口类似于

public interface IChat : IGrainObserver
{
    Task ReceiveMessage(string message);
}

唯一的特殊之处是该接口应继承自 IGrainObserver。 现在,任何想要观察这些消息的客户端都应该实现一个类来实现 IChat

最简单的案例如下所示:

public class Chat : IChat
{
    public Task ReceiveMessage(string message)
    {
        Console.WriteLine(message);
        return Task.CompletedTask;
    }
}

在服务器上,接下来应有一个将这些聊天消息发送到客户端的 grain。 该 grain 还应为客户端提供一个机制,让客户端自行订阅和取消订阅通知。 对于订阅,粒度可以使用实用工具类 ObserverManager<TObserver> 的实例。

注意

ObserverManager<TObserver> 在 7.0 版本后是 Orleans 的一部分。 对于低版本,可以复制以下实现

class HelloGrain : Grain, IHello
{
    private readonly ObserverManager<IChat> _subsManager;

    public HelloGrain(ILogger<HelloGrain> logger)
    {
        _subsManager =
            new ObserverManager<IChat>(
                TimeSpan.FromMinutes(5), logger);
    }

    // Clients call this to subscribe.
    public Task Subscribe(IChat observer)
    {
        _subsManager.Subscribe(observer, observer);

        return Task.CompletedTask;
    }

    //Clients use this to unsubscribe and no longer receive messages.
    public Task UnSubscribe(IChat observer)
    {
        _subsManager.Unsubscribe(observer);

        return Task.CompletedTask;
    }
}

若要向客户端发送消息,可以使用 ObserverManager<IChat> 实例的 Notify 方法。 该方法采用 Action<T> 方法或 lambda 表达式(此处 T 的类型为 IChat)。 可以调用接口上的任何方法将消息发送到客户端。 本例中只有一个方法,即 ReceiveMessage,服务器上的发送方代码如下所示:

public Task SendUpdateMessage(string message)
{
    _subsManager.Notify(s => s.ReceiveMessage(message));

    return Task.CompletedTask;
}

现在我们的服务器有一个向观察程序客户端发送消息的方法,两个订阅/取消订阅方法,客户端已实现一个能够观察 grain 消息的类。 最后一步是使用先前实现的 Chat 类在客户端上创建一个观察程序引用,并让它在订阅后接收消息。

代码如下所示:

//First create the grain reference
var friend = _grainFactory.GetGrain<IHello>(0);
Chat c = new Chat();

//Create a reference for chat, usable for subscribing to the observable grain.
var obj = _grainFactory.CreateObjectReference<IChat>(c);

//Subscribe the instance to receive messages.
await friend.Subscribe(obj);

现在,每当服务器上的 grain 调用 SendUpdateMessage 方法时,所有已订阅客户端都会收到该消息。 在客户端代码中,变量 c 中的 Chat 实例将接收消息并将其输出到控制台。

重要

传递给 CreateObjectReference 的对象通过 WeakReference<T> 保存,因此如果不存在其他引用,这些对象将被回收。

用户应该为他们不希望回收的每个观察程序保留一个引用。

注意

观察程序在本质上是不可靠的,因为托管观察程序的客户端可能会失败,而恢复后创建的观察程序具有不同的(随机)标识。 如上所述,ObserverManager<TObserver> 依赖于观察程序定期重新订阅,以便可以移除非活动观察程序。

执行模型

IGrainObserver 的实现是通过调用 IGrainFactory.CreateObjectReference 注册的,每次调用该方法都会创建一个指向该实现的新引用。 Orleans 将逐一执行发送到其中每个引用的请求,直至完成。 观察程序是不可重入的,因此对观察程序的并发请求不会由 Orleans 交错处理。 如果有多个观察程序在并发接收请求,这些请求可以并行执行。 观察程序方法的执行不受 AlwaysInterleaveAttributeReentrantAttribute 等属性的影响:开发人员无法自定义执行模型。