Vysílání kanálů v Orleans
Kanály vysílání jsou speciálním typem mechanismu vysílání, který lze použít k odesílání zpráv všem odběratelům. Na rozdíl od poskytovatelů streamování nejsou kanály vysílání trvalé a neukládají zprávy a nejsou náhradou za trvalé streamy. Při vysílání kanálů se zrnka implicitně přihlašují k odběru kanálu vysílání a přijímají zprávy vysílání od producenta. Tím se oddělí odesílatel a příjemce zprávy, což je užitečné ve scénářích, kdy odesílatel a příjemce nejsou předem známy.
Pokud chcete použít kanál vysílání, měli byste nakonfigurovat Orleans streamy a pak povolit vysílání ve vašem kanálu pomocí AddBroadcastChannel
konfigurace silo.
siloBuilder.AddBroadcastChannel(OrleansBroadcastChannelNames.ReadmodelChanges);
Ukázkový scénář
Představte si scénář, ve kterém máte zrno, které potřebuje dostávat aktualizace cen akcií od poskytovatele cen akcií. Poskytovatel cen akcií je služba na pozadí, která publikuje aktualizace cen akcií do kanálu vysílání. Zrnka se implicitně přihlašují k vysílání kanálu a přijímají aktualizované ceny akcií. Následující diagram znázorňuje scénář:
V předchozím diagramu:
- Silo publikuje aktualizace cen akcií do kanálu vysílání.
- Podrobné odběry kanálu vysílání a obdrží aktualizace cen akcií.
- Klient využívá aktualizace cen akcií z agregačního intervalu.
Kanál vysílání odděluje producenta a spotřebitele aktualizací ceny akcií. Producent publikuje aktualizace cen akcií do kanálu vysílání a příjemce se přihlásí k odběru kanálu vysílání a obdrží aktualizace cen akcií.
Definování agregační intervalu pro spotřebitele
Pokud chcete využívat zprávy kanálu všesměrového vysílání, je potřeba implementovat IOnBroadcastChannelSubscribed rozhraní. Vaše implementace použije metodu IBroadcastChannelSubscription.Attach pro připojení k kanálu vysílání. Metoda Attach
přebírá parametr obecného typu pro typ zprávy, který budete přijímat. Následující příklad ukazuje zrno, které se přihlásí k odběru kanálu vysílání typu Stock
:
using System.Collections.Concurrent;
using BroadcastChannel.GrainInterfaces;
using Orleans.BroadcastChannel;
namespace BroadcastChannel.Silo;
[ImplicitChannelSubscription]
public sealed class LiveStockGrain :
Grain,
ILiveStockGrain,
IOnBroadcastChannelSubscribed
{
private readonly IDictionary<StockSymbol, Stock> _stockCache =
new ConcurrentDictionary<StockSymbol, Stock>();
public ValueTask<Stock> GetStock(StockSymbol symbol) =>
_stockCache.TryGetValue(symbol, out Stock? stock) is false
? new ValueTask<Stock>(Task.FromException<Stock>(new KeyNotFoundException()))
: new ValueTask<Stock>(stock);
public Task OnSubscribed(IBroadcastChannelSubscription subscription) =>
subscription.Attach<Stock>(OnStockUpdated, OnError);
private Task OnStockUpdated(Stock stock)
{
if (stock is { GlobalQuote: { } })
{
_stockCache[stock.GlobalQuote.Symbol] = stock;
}
return Task.CompletedTask;
}
private static Task OnError(Exception ex)
{
Console.Error.WriteLine($"An error occurred: {ex}");
return Task.CompletedTask;
}
}
V předchozím kódu:
- Agregační
LiveStockGrain
interval implementujeIOnBroadcastChannelSubscribed
rozhraní. - Metoda
OnSubscribed
je volána, když se agregační kanál přihlásí k odběru kanálu vysílání. - Parametr
subscription
se používá k voláníAttach
metody pro připojení k kanálu vysílání.- Metoda
OnStockUpdated
se předáAttach
jako zpětné volání, které se aktivuje přiStock
přijetí zprávy. - Metoda
OnError
se předáAttach
jako zpětné volání, které se aktivuje, když dojde k chybě.
- Metoda
Tento příklad agregační interval bude obsahovat nejnovější ceny akcií, které jsou publikovány v kanálu vysílání. Každý klient, který požádá o toto zrno o nejnovější cenu akcií, získá nejnovější cenu z vysílání kanálu.
Publikování zpráv do kanálu všesměrového vysílání
Pokud chcete publikovat zprávy do kanálu vysílání, musíte získat odkaz na kanál vysílání. Chcete-li to udělat, musíte získat IBroadcastChannelProvider z IClusterClient. S poskytovatelem můžete volat metodu IBroadcastChannelProvider.GetChannelWriter pro získání instance IBroadcastChannelWriter<T>. Zapisovač se používá k publikování zpráv do kanálu vysílání. Následující příklad ukazuje, jak publikovat zprávy do kanálu vysílání:
using System.Diagnostics;
using BroadcastChannel.GrainInterfaces;
using Microsoft.Extensions.Hosting;
using Orleans.BroadcastChannel;
namespace BroadcastChannel.Silo.Services;
internal sealed class StockWorker : BackgroundService
{
private readonly StockClient _stockClient;
private readonly IBroadcastChannelProvider _provider;
private readonly List<StockSymbol> _symbols = Enum.GetValues<StockSymbol>().ToList();
public StockWorker(
StockClient stockClient, IClusterClient clusterClient) =>
(_stockClient, _provider) =
(stockClient, clusterClient.GetBroadcastChannelProvider(ChannelNames.LiveStockTicker));
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
// Capture the starting timestamp.
long startingTimestamp = Stopwatch.GetTimestamp();
// Get all updated stock values.
Stock[] stocks = await Task.WhenAll(
tasks: _symbols.Select(selector: _stockClient.GetStockAsync));
// Get the live stock ticker broadcast channel.
ChannelId channelId = ChannelId.Create(ChannelNames.LiveStockTicker, Guid.Empty);
IBroadcastChannelWriter<Stock> channelWriter = _provider.GetChannelWriter<Stock>(channelId);
// Broadcast all stock updates on this channel.
await Task.WhenAll(
stocks.Where(s => s is not null).Select(channelWriter.Publish));
// Use the elapsed time to calculate a 15 second delay.
int elapsed = Stopwatch.GetElapsedTime(startingTimestamp).Milliseconds;
int remaining = Math.Max(0, 15_000 - elapsed);
await Task.Delay(remaining, stoppingToken);
}
}
}
V předchozím kódu:
- Třída
StockWorker
je služba na pozadí, která publikuje zprávy do kanálu vysílání. - Konstruktor přebírá
IStockClient
parametry a IClusterClient jako parametry. - Z instance klienta clusteru se GetBroadcastChannelProvider metoda používá k získání poskytovatele kanálu všesměrového vysílání.
IStockClient
Pomocí třídyStockWorker
získá nejnovější cenu akcií pro symbol akcií.- Každých 15 sekund
StockWorker
třída publikujeStock
zprávu do kanálu vysílání.
Publikování zpráv do kanálu všesměrového vysílání je oddělené od odstupňované od příjemce. Spotřebitel se přihlásí k odběru kanálu všesměrového vysílání a přijímá zprávy z kanálu vysílání. Producent žije v silu a zodpovídá za publikování zpráv do vysílání kanálu a neví nic o spotřebě zrn.