Orleans 中的廣播頻道
廣播頻道是一種特殊的廣播機制,可用來將訊息傳送給所有訂閱者。 不同於串流提供者,廣播頻道不是持續性的,也不會儲存訊息,而且不是持續性串流的替代方案。 透過廣播頻道,Grain 會隱含地訂閱廣播通道,並接收來自產生者的廣播訊息。 這會將訊息的寄件者和接收者分離,適用於事先不知道寄件者和接收者是誰的情況。
若要使用廣播通道,您應該設定 Orleans Streams,然後在定址接收器設定期間,使用 AddBroadcastChannel
在通道上啟用廣播。
siloBuilder.AddBroadcastChannel(OrleansBroadcastChannelNames.ReadmodelChanges);
範例案例
假設您有需要從股票價格提供者接收股票價格更新的 Grain。 股票價格提供者是將股票價格更新發佈至廣播頻道的背景服務。 Grain 會隱含訂閱廣播頻道,並接收更新的股票價格。 下圖顯示各個狀況:
在上圖中:
- 定址接收器會將股票價格更新發佈至廣播頻道。
- Grain 會訂閱廣播頻道,並接收股票價格更新。
- 用戶端會從股票 Grain 取用股票價格更新。
廣播頻道會將股票價格更新的產生者和取用者分離。 產生者會將股票價格更新發佈至廣播頻道,而取用者會訂閱廣播頻道並接收股票價格更新。
定義取用者 Grain
若要取用廣播頻道訊息,您的 Grain 必須實作 IOnBroadcastChannelSubscribed 介面。 您的實作會使用 IBroadcastChannelSubscription.Attach 方法附加至廣播頻道。 Attach
方法會針對您要接收的訊息型別採用泛型型別參數。 下列範例顯示訂閱 Stock
型別廣播頻道的 Grain:
using System.Collections.Concurrent;
using BroadcastChannel.GrainInterfaces;
using Orleans.BroadcastChannel;
namespace BroadcastChannel.Silo;
[ImplicitChannelSubscription]
public sealed class LiveStockGrain :
Grain,
ILiveStockGrain,
IOnBroadcastChannelSubscribed
{
private readonly IDictionary<StockSymbol, Stock> _stockCache =
new ConcurrentDictionary<StockSymbol, Stock>();
public ValueTask<Stock> GetStock(StockSymbol symbol) =>
_stockCache.TryGetValue(symbol, out Stock? stock) is false
? new ValueTask<Stock>(Task.FromException<Stock>(new KeyNotFoundException()))
: new ValueTask<Stock>(stock);
public Task OnSubscribed(IBroadcastChannelSubscription subscription) =>
subscription.Attach<Stock>(OnStockUpdated, OnError);
private Task OnStockUpdated(Stock stock)
{
if (stock is { GlobalQuote: { } })
{
_stockCache[stock.GlobalQuote.Symbol] = stock;
}
return Task.CompletedTask;
}
private static Task OnError(Exception ex)
{
Console.Error.WriteLine($"An error occurred: {ex}");
return Task.CompletedTask;
}
}
在上述程式碼中:
LiveStockGrain
Grain 會實作IOnBroadcastChannelSubscribed
介面。- 當 Grain 訂閱廣播頻道時,就會呼叫
OnSubscribed
方法。 subscription
參數會用來呼叫Attach
方法,以附加至廣播頻道。OnStockUpdated
方法會當做收到Stock
訊息時引發的回呼傳遞至Attach
。OnError
方法會當做發生錯誤時引發的回呼傳遞至Attach
。
此範例 Grain 會包含廣播頻道上發佈的最新股票價格。 任何向此 Grain 詢問最新股票價格的用戶端,都會從廣播頻道取得最新的價格。
將訊息發佈至廣播頻道
若要將訊息發佈至廣播頻道,您需要取得廣播頻道的參考。 若要這樣做,您必須從 IClusterClient 取得 IBroadcastChannelProvider。 透過提供者,您可以呼叫 IBroadcastChannelProvider.GetChannelWriter 方法來取得 IBroadcastChannelWriter<T> 執行個體。 寫入器可用來將訊息發佈至廣播頻道。 下列範例示範如何將訊息發佈至廣播頻道:
using System.Diagnostics;
using BroadcastChannel.GrainInterfaces;
using Microsoft.Extensions.Hosting;
using Orleans.BroadcastChannel;
namespace BroadcastChannel.Silo.Services;
internal sealed class StockWorker : BackgroundService
{
private readonly StockClient _stockClient;
private readonly IBroadcastChannelProvider _provider;
private readonly List<StockSymbol> _symbols = Enum.GetValues<StockSymbol>().ToList();
public StockWorker(
StockClient stockClient, IClusterClient clusterClient) =>
(_stockClient, _provider) =
(stockClient, clusterClient.GetBroadcastChannelProvider(ChannelNames.LiveStockTicker));
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
// Capture the starting timestamp.
long startingTimestamp = Stopwatch.GetTimestamp();
// Get all updated stock values.
Stock[] stocks = await Task.WhenAll(
tasks: _symbols.Select(selector: _stockClient.GetStockAsync));
// Get the live stock ticker broadcast channel.
ChannelId channelId = ChannelId.Create(ChannelNames.LiveStockTicker, Guid.Empty);
IBroadcastChannelWriter<Stock> channelWriter = _provider.GetChannelWriter<Stock>(channelId);
// Broadcast all stock updates on this channel.
await Task.WhenAll(
stocks.Where(s => s is not null).Select(channelWriter.Publish));
// Use the elapsed time to calculate a 15 second delay.
int elapsed = Stopwatch.GetElapsedTime(startingTimestamp).Milliseconds;
int remaining = Math.Max(0, 15_000 - elapsed);
await Task.Delay(remaining, stoppingToken);
}
}
}
在上述程式碼中:
StockWorker
類別是將訊息發佈至廣播頻道的背景服務。- 建構函式會採用
IStockClient
和 IClusterClient 作為參數。 - 在叢集用戶端執行個體中,會使用 GetBroadcastChannelProvider 方法來取得廣播頻道提供者。
- 使用
IStockClient
,StockWorker
類別可取得股票代號的最新股票價格。 StockWorker
類別每隔 15 秒就會將Stock
訊息發佈至廣播頻道。
將訊息發佈至廣播頻道會與取用者 Grain 分離。 取用者 Grain 會訂閱廣播頻道,並從廣播頻道接收訊息。 產生者位於定址接收器中,負責將訊息發佈至廣播頻道,而且不知道有關 Grain 的任何事項。