Delen via


Uitzendkanalen in Orleans

Broadcast-kanalen zijn een speciaal type uitzendmechanisme dat kan worden gebruikt om berichten naar alle abonnees te verzenden. In tegenstelling tot streamingproviders zijn broadcastkanalen niet permanent en worden berichten niet opgeslagen en zijn ze geen vervanging voor permanente streams. Bij broadcastkanalen worden korrels impliciet geabonneerd op het broadcast-kanaal en ontvangen broadcastberichten van een producent. Hierdoor worden de afzender en ontvanger van het bericht losgekoppeld, wat handig is voor scenario's waarin de afzender en ontvanger niet van tevoren bekend zijn.

Als u broadcast-kanaal wilt gebruiken, moet u Streams configureren Orleans en vervolgens broadcast op uw kanaal inschakelen met behulp van de AddBroadcastChannel siloconfiguratie.

siloBuilder.AddBroadcastChannel(OrleansBroadcastChannelNames.ReadmodelChanges);

Voorbeeldscenario

Overweeg een scenario waarin u een graan hebt dat updates van aandelenkoersen van een aanbieder van aandelenprijzen moet ontvangen. De aanbieder van aandelenprijzen is een achtergrondservice waarmee aandelenkoersupdates worden gepubliceerd naar een broadcast-kanaal. Korrels worden impliciet geabonneerd op het broadcast-kanaal en ontvangen bijgewerkte aandelenkoersen. In het volgende diagram ziet u het scenario:

Diagram met aandelenprijzen met een silo, een voorraadkorrel en verbruikende client in een eenvoudige broadcast-kanaalarchitectuur.

In het bovenstaande diagram:

  • De silo publiceert aandelenkoersupdates naar het broadcast-kanaal.
  • Het graan abonneert zich op het broadcast-kanaal en ontvangt updates van de aandelenkoers.
  • De klant verbruikt de aandelenkoersupdates van het voorraadgranaat.

Het broadcastkanaal ontkoppelt de producent en consument van de updates van de aandelenkoers. De producent publiceert aandelenkoersupdates naar het broadcast-kanaal en de consument abonneert zich op het broadcast-kanaal en ontvangt updates van de aandelenkoers.

Een consumenteninterval definiƫren

Als u broadcast-kanaalberichten wilt gebruiken, moet uw korrel de IOnBroadcastChannelSubscribed interface implementeren. Uw implementatie gebruikt de IBroadcastChannelSubscription.Attach methode om te koppelen aan het broadcast-kanaal. De Attach methode gebruikt een algemene parameter voor het berichttype dat u gaat ontvangen. In het volgende voorbeeld ziet u een korrel die is geabonneerd op een uitzendkanaal van het type Stock:

using System.Collections.Concurrent;
using BroadcastChannel.GrainInterfaces;
using Orleans.BroadcastChannel;

namespace BroadcastChannel.Silo;

[ImplicitChannelSubscription]
public sealed class LiveStockGrain :
    Grain,
    ILiveStockGrain,
    IOnBroadcastChannelSubscribed
{
    private readonly IDictionary<StockSymbol, Stock> _stockCache =
        new ConcurrentDictionary<StockSymbol, Stock>();

    public ValueTask<Stock> GetStock(StockSymbol symbol) =>
        _stockCache.TryGetValue(symbol, out Stock? stock) is false
            ? new ValueTask<Stock>(Task.FromException<Stock>(new KeyNotFoundException()))
            : new ValueTask<Stock>(stock);

    public Task OnSubscribed(IBroadcastChannelSubscription subscription) =>
        subscription.Attach<Stock>(OnStockUpdated, OnError);

    private Task OnStockUpdated(Stock stock)
    {
        if (stock is { GlobalQuote: { } })
        {
            _stockCache[stock.GlobalQuote.Symbol] = stock;
        }

        return Task.CompletedTask;
    }

    private static Task OnError(Exception ex)
    {
        Console.Error.WriteLine($"An error occurred: {ex}");

        return Task.CompletedTask;
    }
}

In de voorgaande code:

  • De LiveStockGrain korrel implementeert de IOnBroadcastChannelSubscribed interface.
  • De OnSubscribed methode wordt aangeroepen wanneer de grain zich abonneert op het broadcast-kanaal.
  • De subscription parameter wordt gebruikt om de Attach methode aan te roepen die aan het broadcast-kanaal moet worden gekoppeld.
    • De OnStockUpdated methode wordt doorgegeven als een callback die wordt geactiveerd Attach wanneer het Stock bericht wordt ontvangen.
    • De OnError methode wordt doorgegeven als een callback die wordt geactiveerd Attach wanneer er een fout optreedt.

Dit voorbeeld bevat de meest recente aandelenkoersen zoals gepubliceerd op het broadcast-kanaal. Elke klant die dit graan vraagt voor de meest recente aandelenkoers, krijgt de meest recente prijs van het broadcast-kanaal.

Berichten publiceren naar een broadcast-kanaal

Als u berichten wilt publiceren naar het broadcast-kanaal, moet u een verwijzing naar het broadcast-kanaal ophalen. Om dit te doen, moet u de IBroadcastChannelProvider .IClusterClient Met de provider kunt u de IBroadcastChannelProvider.GetChannelWriter methode aanroepen om een exemplaar van IBroadcastChannelWriter<T>. De schrijver wordt gebruikt om berichten te publiceren naar het broadcast-kanaal. In het volgende voorbeeld ziet u hoe u berichten publiceert naar het broadcast-kanaal:

using System.Diagnostics;
using BroadcastChannel.GrainInterfaces;
using Microsoft.Extensions.Hosting;
using Orleans.BroadcastChannel;

namespace BroadcastChannel.Silo.Services;

internal sealed class StockWorker : BackgroundService
{
    private readonly StockClient _stockClient;
    private readonly IBroadcastChannelProvider _provider;
    private readonly List<StockSymbol> _symbols = Enum.GetValues<StockSymbol>().ToList();

    public StockWorker(
        StockClient stockClient, IClusterClient clusterClient) =>
        (_stockClient, _provider) =
        (stockClient, clusterClient.GetBroadcastChannelProvider(ChannelNames.LiveStockTicker));

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            // Capture the starting timestamp.
            long startingTimestamp = Stopwatch.GetTimestamp();

            // Get all updated stock values.
            Stock[] stocks = await Task.WhenAll(
                tasks: _symbols.Select(selector: _stockClient.GetStockAsync));

            // Get the live stock ticker broadcast channel.
            ChannelId channelId = ChannelId.Create(ChannelNames.LiveStockTicker, Guid.Empty);
            IBroadcastChannelWriter<Stock> channelWriter = _provider.GetChannelWriter<Stock>(channelId);

            // Broadcast all stock updates on this channel.
            await Task.WhenAll(
                stocks.Where(s => s is not null).Select(channelWriter.Publish));

            // Use the elapsed time to calculate a 15 second delay.
            int elapsed = Stopwatch.GetElapsedTime(startingTimestamp).Milliseconds;
            int remaining = Math.Max(0, 15_000 - elapsed);

            await Task.Delay(remaining, stoppingToken);
        }
    }
}

In de voorgaande code:

  • De StockWorker klasse is een achtergrondservice waarmee berichten worden gepubliceerd naar het broadcast-kanaal.
  • De constructor neemt een IStockClient en IClusterClient als parameters.
  • Vanuit het clusterclientexemplaren wordt de GetBroadcastChannelProvider methode gebruikt om de broadcastkanaalprovider op te halen.
  • Met behulp van de IStockClientklasse krijgt de StockWorker meest recente aandelenkoers voor een aandelensymbool.
  • Elke 15 seconden publiceert de StockWorker klasse een Stock bericht naar het broadcast-kanaal.

Het publiceren van berichten naar een broadcast-kanaal wordt losgekoppeld van het verbruiksinterval. Het consumenteninterval abonneert zich op het broadcast-kanaal en ontvangt berichten van het broadcast-kanaal. De producent woont in een silo en is verantwoordelijk voor het publiceren van berichten naar het broadcast-kanaal en weet niets over het verbruik van korrels.

Zie ook