共用方式為


Orleans 中的廣播頻道

廣播頻道是一種特殊的廣播機制,可用來將訊息傳送給所有訂閱者。 不同於串流提供者,廣播頻道不是持續性的,也不會儲存訊息,而且不是持續性串流的替代方案。 透過廣播頻道,Grain 會隱含地訂閱廣播通道,並接收來自產生者的廣播訊息。 這會將訊息的寄件者和接收者分離,適用於事先不知道寄件者和接收者是誰的情況。

若要使用廣播通道,您應該設定 Orleans Streams,然後在定址接收器設定期間,使用 AddBroadcastChannel 在通道上啟用廣播。

siloBuilder.AddBroadcastChannel(OrleansBroadcastChannelNames.ReadmodelChanges);

範例案例

假設您有需要從股票價格提供者接收股票價格更新的 Grain。 股票價格提供者是將股票價格更新發佈至廣播頻道的背景服務。 Grain 會隱含訂閱廣播頻道,並接收更新的股票價格。 下圖顯示各個狀況:

股票價格圖表,描述簡單廣播頻道架構中的定址接收器、股票 Grain 和取用客戶。

在上圖中:

  • 定址接收器會將股票價格更新發佈至廣播頻道。
  • Grain 會訂閱廣播頻道,並接收股票價格更新。
  • 用戶端會從股票 Grain 取用股票價格更新。

廣播頻道會將股票價格更新的產生者和取用者分離。 產生者會將股票價格更新發佈至廣播頻道,而取用者會訂閱廣播頻道並接收股票價格更新。

定義取用者 Grain

若要取用廣播頻道訊息,您的 Grain 必須實作 IOnBroadcastChannelSubscribed 介面。 您的實作會使用 IBroadcastChannelSubscription.Attach 方法附加至廣播頻道。 Attach 方法會針對您要接收的訊息型別採用泛型型別參數。 下列範例顯示訂閱 Stock 型別廣播頻道的 Grain:

using System.Collections.Concurrent;
using BroadcastChannel.GrainInterfaces;
using Orleans.BroadcastChannel;

namespace BroadcastChannel.Silo;

[ImplicitChannelSubscription]
public sealed class LiveStockGrain :
    Grain,
    ILiveStockGrain,
    IOnBroadcastChannelSubscribed
{
    private readonly IDictionary<StockSymbol, Stock> _stockCache =
        new ConcurrentDictionary<StockSymbol, Stock>();

    public ValueTask<Stock> GetStock(StockSymbol symbol) =>
        _stockCache.TryGetValue(symbol, out Stock? stock) is false
            ? new ValueTask<Stock>(Task.FromException<Stock>(new KeyNotFoundException()))
            : new ValueTask<Stock>(stock);

    public Task OnSubscribed(IBroadcastChannelSubscription subscription) =>
        subscription.Attach<Stock>(OnStockUpdated, OnError);

    private Task OnStockUpdated(Stock stock)
    {
        if (stock is { GlobalQuote: { } })
        {
            _stockCache[stock.GlobalQuote.Symbol] = stock;
        }

        return Task.CompletedTask;
    }

    private static Task OnError(Exception ex)
    {
        Console.Error.WriteLine($"An error occurred: {ex}");

        return Task.CompletedTask;
    }
}

在上述程式碼中:

  • LiveStockGrain Grain 會實作 IOnBroadcastChannelSubscribed 介面。
  • 當 Grain 訂閱廣播頻道時,就會呼叫 OnSubscribed 方法。
  • subscription 參數會用來呼叫 Attach 方法,以附加至廣播頻道。
    • OnStockUpdated 方法會當做收到 Stock 訊息時引發的回呼傳遞至 Attach
    • OnError 方法會當做發生錯誤時引發的回呼傳遞至 Attach

此範例 Grain 會包含廣播頻道上發佈的最新股票價格。 任何向此 Grain 詢問最新股票價格的用戶端,都會從廣播頻道取得最新的價格。

將訊息發佈至廣播頻道

若要將訊息發佈至廣播頻道,您需要取得廣播頻道的參考。 若要這樣做,您必須從 IClusterClient 取得 IBroadcastChannelProvider。 透過提供者,您可以呼叫 IBroadcastChannelProvider.GetChannelWriter 方法來取得 IBroadcastChannelWriter<T> 執行個體。 寫入器可用來將訊息發佈至廣播頻道。 下列範例示範如何將訊息發佈至廣播頻道:

using System.Diagnostics;
using BroadcastChannel.GrainInterfaces;
using Microsoft.Extensions.Hosting;
using Orleans.BroadcastChannel;

namespace BroadcastChannel.Silo.Services;

internal sealed class StockWorker : BackgroundService
{
    private readonly StockClient _stockClient;
    private readonly IBroadcastChannelProvider _provider;
    private readonly List<StockSymbol> _symbols = Enum.GetValues<StockSymbol>().ToList();

    public StockWorker(
        StockClient stockClient, IClusterClient clusterClient) =>
        (_stockClient, _provider) =
        (stockClient, clusterClient.GetBroadcastChannelProvider(ChannelNames.LiveStockTicker));

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            // Capture the starting timestamp.
            long startingTimestamp = Stopwatch.GetTimestamp();

            // Get all updated stock values.
            Stock[] stocks = await Task.WhenAll(
                tasks: _symbols.Select(selector: _stockClient.GetStockAsync));

            // Get the live stock ticker broadcast channel.
            ChannelId channelId = ChannelId.Create(ChannelNames.LiveStockTicker, Guid.Empty);
            IBroadcastChannelWriter<Stock> channelWriter = _provider.GetChannelWriter<Stock>(channelId);

            // Broadcast all stock updates on this channel.
            await Task.WhenAll(
                stocks.Where(s => s is not null).Select(channelWriter.Publish));

            // Use the elapsed time to calculate a 15 second delay.
            int elapsed = Stopwatch.GetElapsedTime(startingTimestamp).Milliseconds;
            int remaining = Math.Max(0, 15_000 - elapsed);

            await Task.Delay(remaining, stoppingToken);
        }
    }
}

在上述程式碼中:

  • StockWorker 類別是將訊息發佈至廣播頻道的背景服務。
  • 建構函式會採用 IStockClientIClusterClient 作為參數。
  • 在叢集用戶端執行個體中,會使用 GetBroadcastChannelProvider 方法來取得廣播頻道提供者。
  • 使用 IStockClientStockWorker 類別可取得股票代號的最新股票價格。
  • StockWorker 類別每隔 15 秒就會將 Stock 訊息發佈至廣播頻道。

將訊息發佈至廣播頻道會與取用者 Grain 分離。 取用者 Grain 會訂閱廣播頻道,並從廣播頻道接收訊息。 產生者位於定址接收器中,負責將訊息發佈至廣播頻道,而且不知道有關 Grain 的任何事項。

另請參閱