Uitzendkanalen in Orleans
Broadcast-kanalen zijn een speciaal type uitzendmechanisme dat kan worden gebruikt om berichten naar alle abonnees te verzenden. In tegenstelling tot streamingproviders zijn broadcastkanalen niet permanent en worden berichten niet opgeslagen en zijn ze geen vervanging voor permanente streams. Bij broadcastkanalen worden korrels impliciet geabonneerd op het broadcast-kanaal en ontvangen broadcastberichten van een producent. Hierdoor worden de afzender en ontvanger van het bericht losgekoppeld, wat handig is voor scenario's waarin de afzender en ontvanger niet van tevoren bekend zijn.
Als u broadcast-kanaal wilt gebruiken, moet u Streams configureren Orleans en vervolgens broadcast op uw kanaal inschakelen met behulp van de AddBroadcastChannel
siloconfiguratie.
siloBuilder.AddBroadcastChannel(OrleansBroadcastChannelNames.ReadmodelChanges);
Voorbeeldscenario
Overweeg een scenario waarin u een graan hebt dat updates van aandelenkoersen van een aanbieder van aandelenprijzen moet ontvangen. De aanbieder van aandelenprijzen is een achtergrondservice waarmee aandelenkoersupdates worden gepubliceerd naar een broadcast-kanaal. Korrels worden impliciet geabonneerd op het broadcast-kanaal en ontvangen bijgewerkte aandelenkoersen. In het volgende diagram ziet u het scenario:
In het bovenstaande diagram:
- De silo publiceert aandelenkoersupdates naar het broadcast-kanaal.
- Het graan abonneert zich op het broadcast-kanaal en ontvangt updates van de aandelenkoers.
- De klant verbruikt de aandelenkoersupdates van het voorraadgranaat.
Het broadcastkanaal ontkoppelt de producent en consument van de updates van de aandelenkoers. De producent publiceert aandelenkoersupdates naar het broadcast-kanaal en de consument abonneert zich op het broadcast-kanaal en ontvangt updates van de aandelenkoers.
Een consumenteninterval definiƫren
Als u broadcast-kanaalberichten wilt gebruiken, moet uw korrel de IOnBroadcastChannelSubscribed interface implementeren. Uw implementatie gebruikt de IBroadcastChannelSubscription.Attach methode om te koppelen aan het broadcast-kanaal. De Attach
methode gebruikt een algemene parameter voor het berichttype dat u gaat ontvangen. In het volgende voorbeeld ziet u een korrel die is geabonneerd op een uitzendkanaal van het type Stock
:
using System.Collections.Concurrent;
using BroadcastChannel.GrainInterfaces;
using Orleans.BroadcastChannel;
namespace BroadcastChannel.Silo;
[ImplicitChannelSubscription]
public sealed class LiveStockGrain :
Grain,
ILiveStockGrain,
IOnBroadcastChannelSubscribed
{
private readonly IDictionary<StockSymbol, Stock> _stockCache =
new ConcurrentDictionary<StockSymbol, Stock>();
public ValueTask<Stock> GetStock(StockSymbol symbol) =>
_stockCache.TryGetValue(symbol, out Stock? stock) is false
? new ValueTask<Stock>(Task.FromException<Stock>(new KeyNotFoundException()))
: new ValueTask<Stock>(stock);
public Task OnSubscribed(IBroadcastChannelSubscription subscription) =>
subscription.Attach<Stock>(OnStockUpdated, OnError);
private Task OnStockUpdated(Stock stock)
{
if (stock is { GlobalQuote: { } })
{
_stockCache[stock.GlobalQuote.Symbol] = stock;
}
return Task.CompletedTask;
}
private static Task OnError(Exception ex)
{
Console.Error.WriteLine($"An error occurred: {ex}");
return Task.CompletedTask;
}
}
In de voorgaande code:
- De
LiveStockGrain
korrel implementeert deIOnBroadcastChannelSubscribed
interface. - De
OnSubscribed
methode wordt aangeroepen wanneer de grain zich abonneert op het broadcast-kanaal. - De
subscription
parameter wordt gebruikt om deAttach
methode aan te roepen die aan het broadcast-kanaal moet worden gekoppeld.- De
OnStockUpdated
methode wordt doorgegeven als een callback die wordt geactiveerdAttach
wanneer hetStock
bericht wordt ontvangen. - De
OnError
methode wordt doorgegeven als een callback die wordt geactiveerdAttach
wanneer er een fout optreedt.
- De
Dit voorbeeld bevat de meest recente aandelenkoersen zoals gepubliceerd op het broadcast-kanaal. Elke klant die dit graan vraagt voor de meest recente aandelenkoers, krijgt de meest recente prijs van het broadcast-kanaal.
Berichten publiceren naar een broadcast-kanaal
Als u berichten wilt publiceren naar het broadcast-kanaal, moet u een verwijzing naar het broadcast-kanaal ophalen. Om dit te doen, moet u de IBroadcastChannelProvider .IClusterClient Met de provider kunt u de IBroadcastChannelProvider.GetChannelWriter methode aanroepen om een exemplaar van IBroadcastChannelWriter<T>. De schrijver wordt gebruikt om berichten te publiceren naar het broadcast-kanaal. In het volgende voorbeeld ziet u hoe u berichten publiceert naar het broadcast-kanaal:
using System.Diagnostics;
using BroadcastChannel.GrainInterfaces;
using Microsoft.Extensions.Hosting;
using Orleans.BroadcastChannel;
namespace BroadcastChannel.Silo.Services;
internal sealed class StockWorker : BackgroundService
{
private readonly StockClient _stockClient;
private readonly IBroadcastChannelProvider _provider;
private readonly List<StockSymbol> _symbols = Enum.GetValues<StockSymbol>().ToList();
public StockWorker(
StockClient stockClient, IClusterClient clusterClient) =>
(_stockClient, _provider) =
(stockClient, clusterClient.GetBroadcastChannelProvider(ChannelNames.LiveStockTicker));
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
// Capture the starting timestamp.
long startingTimestamp = Stopwatch.GetTimestamp();
// Get all updated stock values.
Stock[] stocks = await Task.WhenAll(
tasks: _symbols.Select(selector: _stockClient.GetStockAsync));
// Get the live stock ticker broadcast channel.
ChannelId channelId = ChannelId.Create(ChannelNames.LiveStockTicker, Guid.Empty);
IBroadcastChannelWriter<Stock> channelWriter = _provider.GetChannelWriter<Stock>(channelId);
// Broadcast all stock updates on this channel.
await Task.WhenAll(
stocks.Where(s => s is not null).Select(channelWriter.Publish));
// Use the elapsed time to calculate a 15 second delay.
int elapsed = Stopwatch.GetElapsedTime(startingTimestamp).Milliseconds;
int remaining = Math.Max(0, 15_000 - elapsed);
await Task.Delay(remaining, stoppingToken);
}
}
}
In de voorgaande code:
- De
StockWorker
klasse is een achtergrondservice waarmee berichten worden gepubliceerd naar het broadcast-kanaal. - De constructor neemt een
IStockClient
en IClusterClient als parameters. - Vanuit het clusterclientexemplaren wordt de GetBroadcastChannelProvider methode gebruikt om de broadcastkanaalprovider op te halen.
- Met behulp van de
IStockClient
klasse krijgt deStockWorker
meest recente aandelenkoers voor een aandelensymbool. - Elke 15 seconden publiceert de
StockWorker
klasse eenStock
bericht naar het broadcast-kanaal.
Het publiceren van berichten naar een broadcast-kanaal wordt losgekoppeld van het verbruiksinterval. Het consumenteninterval abonneert zich op het broadcast-kanaal en ontvangt berichten van het broadcast-kanaal. De producent woont in een silo en is verantwoordelijk voor het publiceren van berichten naar het broadcast-kanaal en weet niets over het verbruik van korrels.