Orleans 中的广播频道

广播频道是一种特殊类型的广播机制,可用于向所有订阅者发送消息。 与流式传输提供程序不同,广播频道不是永久性的,并且不存储消息,不能替代永久性的流。 在广播频道中,粒度隐式订阅广播频道并接收来自生产者的广播消息。 这可区分消息的发送者与接收者,如果事先不知道发送者和接收者,会很有用。

若要使用广播通道,应配置 Orleans Streams,然后在接收器配置期间使用 AddBroadcastChannel 在频道上启用广播。

siloBuilder.AddBroadcastChannel(OrleansBroadcastChannelNames.ReadmodelChanges);

示例方案

假设你的粒度需要从股价提供程序接收股价最新信息。 股价提供程序是一种后台服务,可将股价最新信息发布到广播频道。 粒度隐式订阅广播频道并接收最新的股价。 下图显示了以下方案:

股价图,其中显示了简单广播频道体系结构中的接收器、股票粒度和使用的客户端。

在上图中:

  • 接收器将股价最新信息发布到广播频道。
  • 粒度订阅广播频道并接收股价最新信息。
  • 客户端使用来自股票粒度的股价最新信息。

广播频道可区分股价最新信息的生产者和使用者。 生产者向广播频道发布股价最新信息,使用者订阅广播频道并接收股价最新信息。

定义使用者粒度

若要使用广播频道消息,你的粒度需要实现 IOnBroadcastChannelSubscribed 接口。 实现时,将使用 IBroadcastChannelSubscription.Attach 方法以附加到广播频道。 Attach 方法为要接收的消息类型采用泛型类型参数。 以下示例显示订阅 Stock 类型的广播频道的粒度:

using System.Collections.Concurrent;
using BroadcastChannel.GrainInterfaces;
using Orleans.BroadcastChannel;

namespace BroadcastChannel.Silo;

[ImplicitChannelSubscription]
public sealed class LiveStockGrain :
    Grain,
    ILiveStockGrain,
    IOnBroadcastChannelSubscribed
{
    private readonly IDictionary<StockSymbol, Stock> _stockCache =
        new ConcurrentDictionary<StockSymbol, Stock>();

    public ValueTask<Stock> GetStock(StockSymbol symbol) =>
        _stockCache.TryGetValue(symbol, out Stock? stock) is false
            ? new ValueTask<Stock>(Task.FromException<Stock>(new KeyNotFoundException()))
            : new ValueTask<Stock>(stock);

    public Task OnSubscribed(IBroadcastChannelSubscription subscription) =>
        subscription.Attach<Stock>(OnStockUpdated, OnError);

    private Task OnStockUpdated(Stock stock)
    {
        if (stock is { GlobalQuote: { } })
        {
            _stockCache[stock.GlobalQuote.Symbol] = stock;
        }

        return Task.CompletedTask;
    }

    private static Task OnError(Exception ex)
    {
        Console.Error.WriteLine($"An error occurred: {ex}");

        return Task.CompletedTask;
    }
}

在上述代码中:

  • LiveStockGrain 粒度实现 IOnBroadcastChannelSubscribed 接口。
  • 粒度订阅广播频道时调用 OnSubscribed 方法。
  • subscription 参数用于调用 Attach 方法以附加到广播频道。
    • OnStockUpdated 方法传递给 Attach,作为收到 Stock 消息时触发的回叫。
    • OnError 方法传递给 Attach,作为发生错误时触发的回叫。

此示例粒度将包含广播频道中发布的最新股价。 任何向此粒度询问最新股价的客户端都会从广播频道获得最新价格。

将消息发布到广播频道

若要向广播频道发布消息,需获得对广播频道的引用。 为此,需从 IClusterClient 获取 IBroadcastChannelProvider。 通过提供程序,可以调用 IBroadcastChannelProvider.GetChannelWriter 方法来获取 IBroadcastChannelWriter<T> 的实例。 编写器用于向广播频道发布消息。 以下示例显示如何将消息发布到广播频道:

using System.Diagnostics;
using BroadcastChannel.GrainInterfaces;
using Microsoft.Extensions.Hosting;
using Orleans.BroadcastChannel;

namespace BroadcastChannel.Silo.Services;

internal sealed class StockWorker : BackgroundService
{
    private readonly StockClient _stockClient;
    private readonly IBroadcastChannelProvider _provider;
    private readonly List<StockSymbol> _symbols = Enum.GetValues<StockSymbol>().ToList();

    public StockWorker(
        StockClient stockClient, IClusterClient clusterClient) =>
        (_stockClient, _provider) =
        (stockClient, clusterClient.GetBroadcastChannelProvider(ChannelNames.LiveStockTicker));

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            // Capture the starting timestamp.
            long startingTimestamp = Stopwatch.GetTimestamp();

            // Get all updated stock values.
            Stock[] stocks = await Task.WhenAll(
                tasks: _symbols.Select(selector: _stockClient.GetStockAsync));

            // Get the live stock ticker broadcast channel.
            ChannelId channelId = ChannelId.Create(ChannelNames.LiveStockTicker, Guid.Empty);
            IBroadcastChannelWriter<Stock> channelWriter = _provider.GetChannelWriter<Stock>(channelId);

            // Broadcast all stock updates on this channel.
            await Task.WhenAll(
                stocks.Where(s => s is not null).Select(channelWriter.Publish));

            // Use the elapsed time to calculate a 15 second delay.
            int elapsed = Stopwatch.GetElapsedTime(startingTimestamp).Milliseconds;
            int remaining = Math.Max(0, 15_000 - elapsed);

            await Task.Delay(remaining, stoppingToken);
        }
    }
}

在上述代码中:

  • StockWorker 类是将消息发布到广播频道的后台服务。
  • 构造函数将 IStockClientIClusterClient 作为参数。
  • 从群集客户端实例中,GetBroadcastChannelProvider 方法用于获取广播频道提供程序。
  • 通过使用 IStockClientStockWorker 类可获取某支股票的最新股价。
  • StockWorker 类每 15 秒向广播频道发布一条 Stock 消息。

将消息发布到广播频道与使用者粒度无关。 使用者粒度订阅广播频道并从广播频道接收消息。 生产者位于接收器,负责向广播频道发布消息,与使用者粒度毫不相干。

另请参阅