观察程序
在某些情况下,简单的消息/响应模式是不够的,客户端需要接收异步通知。 例如,用户可能希望在好友发布新的即时消息时收到通知。
客户端观察程序是可以异步通知客户端的机制。 观察者接口必须从 IGrainObserver 继承,并且所有方法都必须返回 void
、Task、Task<TResult>、ValueTask 或 ValueTask<TResult>。 不建议使用返回类型 void
,因为它可能会鼓励在实现上使用 async void
,这是一种危险的模式,因为如果从方法引发异常,则可能会导致应用程序崩溃。 相反,对于尽力通知方案,请考虑将 OneWayAttribute 应用于观察者的接口方法。 这将导致接收方不发送方法调用的响应,并将导致方法在调用站点立即返回,而无需等待观察者的响应。 粒度通过像调用任何粒度接口方法一样来调用观察者的方法。 Orleans 运行时将确保传递请求和响应。 观察程序的一个常见用例是,在 Orleans 应用程序中发生事件时登记客户端以接收通知。 发布此类通知的 grain 应提供一个 API 用于添加或删除观察程序。 此外,公开一个允许取消现有订阅的方法通常会很方便。
grain 开发人员可以使用实用工具类(例如 ObserverManager<TObserver>)来简化观察到的 grain 类型的开发。 与失败后根据需要自动重新激活的 grain 不同,客户端不可容错:失败的客户端可能永远无法恢复。
因此,ObserverManager<T>
实用工具会在配置的持续时间后删除订阅。 处于活动状态的客户端应在计时器上重新订阅,使其订阅保持活动状态。
若要订阅通知,客户端必须首先创建一个实现观察程序接口的本地对象。 然后它调用观察程序工厂上的方法 CreateObjectReference` 以将对象转换为 grain 引用,然后可将其传递给通知方 grain 上的订阅方法。
此模型也可由其他 grain 用来接收异步通知。 grain 还可以实现 IGrainObserver 接口。 与客户端订阅情况不同,订阅方 grain 只会实现观察程序接口并传入对其自身的引用(例如 this.AsReference<IMyGrainObserverInterface>()
)。 不需要 CreateObjectReference()
,因为 grain 已经可寻址。
代码示例
假设有一个定期向客户端发送消息的 grain。 为简单起见,本示例中的消息是一个字符串。 我们首先在客户端上定义用于接收消息的接口。
该接口类似于
public interface IChat : IGrainObserver
{
Task ReceiveMessage(string message);
}
唯一的特殊之处是该接口应继承自 IGrainObserver
。
现在,任何想要观察这些消息的客户端都应该实现一个类来实现 IChat
。
最简单的案例如下所示:
public class Chat : IChat
{
public Task ReceiveMessage(string message)
{
Console.WriteLine(message);
return Task.CompletedTask;
}
}
在服务器上,接下来应有一个将这些聊天消息发送到客户端的 grain。 该 grain 还应为客户端提供一个机制,让客户端自行订阅和取消订阅通知。 对于订阅,粒度可以使用实用工具类 ObserverManager<TObserver> 的实例。
注意
ObserverManager<TObserver> 在 7.0 版本后是 Orleans 的一部分。 对于低版本,可以复制以下实现。
class HelloGrain : Grain, IHello
{
private readonly ObserverManager<IChat> _subsManager;
public HelloGrain(ILogger<HelloGrain> logger)
{
_subsManager =
new ObserverManager<IChat>(
TimeSpan.FromMinutes(5), logger);
}
// Clients call this to subscribe.
public Task Subscribe(IChat observer)
{
_subsManager.Subscribe(observer, observer);
return Task.CompletedTask;
}
//Clients use this to unsubscribe and no longer receive messages.
public Task UnSubscribe(IChat observer)
{
_subsManager.Unsubscribe(observer);
return Task.CompletedTask;
}
}
若要向客户端发送消息,可以使用 ObserverManager<IChat>
实例的 Notify
方法。 该方法采用 Action<T>
方法或 lambda 表达式(此处 T
的类型为 IChat
)。 可以调用接口上的任何方法将消息发送到客户端。 本例中只有一个方法,即 ReceiveMessage
,服务器上的发送方代码如下所示:
public Task SendUpdateMessage(string message)
{
_subsManager.Notify(s => s.ReceiveMessage(message));
return Task.CompletedTask;
}
现在我们的服务器有一个向观察程序客户端发送消息的方法,两个订阅/取消订阅方法,客户端已实现一个能够观察 grain 消息的类。 最后一步是使用先前实现的 Chat
类在客户端上创建一个观察程序引用,并让它在订阅后接收消息。
代码如下所示:
//First create the grain reference
var friend = _grainFactory.GetGrain<IHello>(0);
Chat c = new Chat();
//Create a reference for chat, usable for subscribing to the observable grain.
var obj = _grainFactory.CreateObjectReference<IChat>(c);
//Subscribe the instance to receive messages.
await friend.Subscribe(obj);
现在,每当服务器上的 grain 调用 SendUpdateMessage
方法时,所有已订阅客户端都会收到该消息。 在客户端代码中,变量 c
中的 Chat
实例将接收消息并将其输出到控制台。
重要
传递给 CreateObjectReference
的对象通过 WeakReference<T> 保存,因此如果不存在其他引用,这些对象将被回收。
用户应该为他们不希望回收的每个观察程序保留一个引用。
注意
观察程序在本质上是不可靠的,因为托管观察程序的客户端可能会失败,而恢复后创建的观察程序具有不同的(随机)标识。 如上所述,ObserverManager<TObserver> 依赖于观察程序定期重新订阅,以便可以移除非活动观察程序。
执行模型
IGrainObserver 的实现是通过调用 IGrainFactory.CreateObjectReference 注册的,每次调用该方法都会创建一个指向该实现的新引用。 Orleans 将逐一执行发送到其中每个引用的请求,直至完成。 观察程序是不可重入的,因此对观察程序的并发请求不会由 Orleans 交错处理。 如果有多个观察程序在并发接收请求,这些请求可以并行执行。 观察程序方法的执行不受 AlwaysInterleaveAttribute 或 ReentrantAttribute 等属性的影响:开发人员无法自定义执行模型。