Orleans 中的广播频道
广播频道是一种特殊类型的广播机制,可用于向所有订阅者发送消息。 与流式传输提供程序不同,广播频道不是永久性的,并且不存储消息,不能替代永久性的流。 在广播频道中,粒度隐式订阅广播频道并接收来自生产者的广播消息。 这可区分消息的发送者与接收者,如果事先不知道发送者和接收者,会很有用。
若要使用广播通道,应配置 Orleans Streams,然后在接收器配置期间使用 AddBroadcastChannel
在频道上启用广播。
siloBuilder.AddBroadcastChannel(OrleansBroadcastChannelNames.ReadmodelChanges);
示例方案
假设你的粒度需要从股价提供程序接收股价最新信息。 股价提供程序是一种后台服务,可将股价最新信息发布到广播频道。 粒度隐式订阅广播频道并接收最新的股价。 下图显示了以下方案:
在上图中:
- 接收器将股价最新信息发布到广播频道。
- 粒度订阅广播频道并接收股价最新信息。
- 客户端使用来自股票粒度的股价最新信息。
广播频道可区分股价最新信息的生产者和使用者。 生产者向广播频道发布股价最新信息,使用者订阅广播频道并接收股价最新信息。
定义使用者粒度
若要使用广播频道消息,你的粒度需要实现 IOnBroadcastChannelSubscribed 接口。 实现时,将使用 IBroadcastChannelSubscription.Attach 方法以附加到广播频道。 Attach
方法为要接收的消息类型采用泛型类型参数。 以下示例显示订阅 Stock
类型的广播频道的粒度:
using System.Collections.Concurrent;
using BroadcastChannel.GrainInterfaces;
using Orleans.BroadcastChannel;
namespace BroadcastChannel.Silo;
[ImplicitChannelSubscription]
public sealed class LiveStockGrain :
Grain,
ILiveStockGrain,
IOnBroadcastChannelSubscribed
{
private readonly IDictionary<StockSymbol, Stock> _stockCache =
new ConcurrentDictionary<StockSymbol, Stock>();
public ValueTask<Stock> GetStock(StockSymbol symbol) =>
_stockCache.TryGetValue(symbol, out Stock? stock) is false
? new ValueTask<Stock>(Task.FromException<Stock>(new KeyNotFoundException()))
: new ValueTask<Stock>(stock);
public Task OnSubscribed(IBroadcastChannelSubscription subscription) =>
subscription.Attach<Stock>(OnStockUpdated, OnError);
private Task OnStockUpdated(Stock stock)
{
if (stock is { GlobalQuote: { } })
{
_stockCache[stock.GlobalQuote.Symbol] = stock;
}
return Task.CompletedTask;
}
private static Task OnError(Exception ex)
{
Console.Error.WriteLine($"An error occurred: {ex}");
return Task.CompletedTask;
}
}
在上述代码中:
LiveStockGrain
粒度实现IOnBroadcastChannelSubscribed
接口。- 粒度订阅广播频道时调用
OnSubscribed
方法。 subscription
参数用于调用Attach
方法以附加到广播频道。OnStockUpdated
方法传递给Attach
,作为收到Stock
消息时触发的回叫。OnError
方法传递给Attach
,作为发生错误时触发的回叫。
此示例粒度将包含广播频道中发布的最新股价。 任何向此粒度询问最新股价的客户端都会从广播频道获得最新价格。
将消息发布到广播频道
若要向广播频道发布消息,需获得对广播频道的引用。 为此,需从 IClusterClient 获取 IBroadcastChannelProvider。 通过提供程序,可以调用 IBroadcastChannelProvider.GetChannelWriter 方法来获取 IBroadcastChannelWriter<T> 的实例。 编写器用于向广播频道发布消息。 以下示例显示如何将消息发布到广播频道:
using System.Diagnostics;
using BroadcastChannel.GrainInterfaces;
using Microsoft.Extensions.Hosting;
using Orleans.BroadcastChannel;
namespace BroadcastChannel.Silo.Services;
internal sealed class StockWorker : BackgroundService
{
private readonly StockClient _stockClient;
private readonly IBroadcastChannelProvider _provider;
private readonly List<StockSymbol> _symbols = Enum.GetValues<StockSymbol>().ToList();
public StockWorker(
StockClient stockClient, IClusterClient clusterClient) =>
(_stockClient, _provider) =
(stockClient, clusterClient.GetBroadcastChannelProvider(ChannelNames.LiveStockTicker));
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
// Capture the starting timestamp.
long startingTimestamp = Stopwatch.GetTimestamp();
// Get all updated stock values.
Stock[] stocks = await Task.WhenAll(
tasks: _symbols.Select(selector: _stockClient.GetStockAsync));
// Get the live stock ticker broadcast channel.
ChannelId channelId = ChannelId.Create(ChannelNames.LiveStockTicker, Guid.Empty);
IBroadcastChannelWriter<Stock> channelWriter = _provider.GetChannelWriter<Stock>(channelId);
// Broadcast all stock updates on this channel.
await Task.WhenAll(
stocks.Where(s => s is not null).Select(channelWriter.Publish));
// Use the elapsed time to calculate a 15 second delay.
int elapsed = Stopwatch.GetElapsedTime(startingTimestamp).Milliseconds;
int remaining = Math.Max(0, 15_000 - elapsed);
await Task.Delay(remaining, stoppingToken);
}
}
}
在上述代码中:
StockWorker
类是将消息发布到广播频道的后台服务。- 构造函数将
IStockClient
和 IClusterClient 作为参数。 - 从群集客户端实例中,GetBroadcastChannelProvider 方法用于获取广播频道提供程序。
- 通过使用
IStockClient
,StockWorker
类可获取某支股票的最新股价。 StockWorker
类每 15 秒向广播频道发布一条Stock
消息。
将消息发布到广播频道与使用者粒度无关。 使用者粒度订阅广播频道并从广播频道接收消息。 生产者位于接收器,负责向广播频道发布消息,与使用者粒度毫不相干。