Sdílet prostřednictvím


Vysílání kanálů v Orleans

Kanály vysílání jsou speciálním typem mechanismu vysílání, který lze použít k odesílání zpráv všem odběratelům. Na rozdíl od poskytovatelů streamování nejsou kanály vysílání trvalé a neukládají zprávy a nejsou náhradou za trvalé streamy. Při vysílání kanálů se zrnka implicitně přihlašují k odběru kanálu vysílání a přijímají zprávy vysílání od producenta. Tím se oddělí odesílatel a příjemce zprávy, což je užitečné ve scénářích, kdy odesílatel a příjemce nejsou předem známy.

Pokud chcete použít kanál vysílání, měli byste nakonfigurovat Orleans streamy a pak povolit vysílání ve vašem kanálu pomocí AddBroadcastChannel konfigurace silo.

siloBuilder.AddBroadcastChannel(OrleansBroadcastChannelNames.ReadmodelChanges);

Ukázkový scénář

Představte si scénář, ve kterém máte zrno, které potřebuje dostávat aktualizace cen akcií od poskytovatele cen akcií. Poskytovatel cen akcií je služba na pozadí, která publikuje aktualizace cen akcií do kanálu vysílání. Zrnka se implicitně přihlašují k vysílání kanálu a přijímají aktualizované ceny akcií. Následující diagram znázorňuje scénář:

Diagram cen akcií znázorňující sila, skladové zrnko a využívání klienta v jednoduché architektuře kanálu vysílání

V předchozím diagramu:

  • Silo publikuje aktualizace cen akcií do kanálu vysílání.
  • Podrobné odběry kanálu vysílání a obdrží aktualizace cen akcií.
  • Klient využívá aktualizace cen akcií z agregačního intervalu.

Kanál vysílání odděluje producenta a spotřebitele aktualizací ceny akcií. Producent publikuje aktualizace cen akcií do kanálu vysílání a příjemce se přihlásí k odběru kanálu vysílání a obdrží aktualizace cen akcií.

Definování agregační intervalu pro spotřebitele

Pokud chcete využívat zprávy kanálu všesměrového vysílání, je potřeba implementovat IOnBroadcastChannelSubscribed rozhraní. Vaše implementace použije metodu IBroadcastChannelSubscription.Attach pro připojení k kanálu vysílání. Metoda Attach přebírá parametr obecného typu pro typ zprávy, který budete přijímat. Následující příklad ukazuje zrno, které se přihlásí k odběru kanálu vysílání typu Stock:

using System.Collections.Concurrent;
using BroadcastChannel.GrainInterfaces;
using Orleans.BroadcastChannel;

namespace BroadcastChannel.Silo;

[ImplicitChannelSubscription]
public sealed class LiveStockGrain :
    Grain,
    ILiveStockGrain,
    IOnBroadcastChannelSubscribed
{
    private readonly IDictionary<StockSymbol, Stock> _stockCache =
        new ConcurrentDictionary<StockSymbol, Stock>();

    public ValueTask<Stock> GetStock(StockSymbol symbol) =>
        _stockCache.TryGetValue(symbol, out Stock? stock) is false
            ? new ValueTask<Stock>(Task.FromException<Stock>(new KeyNotFoundException()))
            : new ValueTask<Stock>(stock);

    public Task OnSubscribed(IBroadcastChannelSubscription subscription) =>
        subscription.Attach<Stock>(OnStockUpdated, OnError);

    private Task OnStockUpdated(Stock stock)
    {
        if (stock is { GlobalQuote: { } })
        {
            _stockCache[stock.GlobalQuote.Symbol] = stock;
        }

        return Task.CompletedTask;
    }

    private static Task OnError(Exception ex)
    {
        Console.Error.WriteLine($"An error occurred: {ex}");

        return Task.CompletedTask;
    }
}

V předchozím kódu:

  • Agregační LiveStockGrain interval implementuje IOnBroadcastChannelSubscribed rozhraní.
  • Metoda OnSubscribed je volána, když se agregační kanál přihlásí k odběru kanálu vysílání.
  • Parametr subscription se používá k volání Attach metody pro připojení k kanálu vysílání.
    • Metoda OnStockUpdated se předá Attach jako zpětné volání, které se aktivuje při Stock přijetí zprávy.
    • Metoda OnError se předá Attach jako zpětné volání, které se aktivuje, když dojde k chybě.

Tento příklad agregační interval bude obsahovat nejnovější ceny akcií, které jsou publikovány v kanálu vysílání. Každý klient, který požádá o toto zrno o nejnovější cenu akcií, získá nejnovější cenu z vysílání kanálu.

Publikování zpráv do kanálu všesměrového vysílání

Pokud chcete publikovat zprávy do kanálu vysílání, musíte získat odkaz na kanál vysílání. Chcete-li to udělat, musíte získat IBroadcastChannelProvider z IClusterClient. S poskytovatelem můžete volat metodu IBroadcastChannelProvider.GetChannelWriter pro získání instance IBroadcastChannelWriter<T>. Zapisovač se používá k publikování zpráv do kanálu vysílání. Následující příklad ukazuje, jak publikovat zprávy do kanálu vysílání:

using System.Diagnostics;
using BroadcastChannel.GrainInterfaces;
using Microsoft.Extensions.Hosting;
using Orleans.BroadcastChannel;

namespace BroadcastChannel.Silo.Services;

internal sealed class StockWorker : BackgroundService
{
    private readonly StockClient _stockClient;
    private readonly IBroadcastChannelProvider _provider;
    private readonly List<StockSymbol> _symbols = Enum.GetValues<StockSymbol>().ToList();

    public StockWorker(
        StockClient stockClient, IClusterClient clusterClient) =>
        (_stockClient, _provider) =
        (stockClient, clusterClient.GetBroadcastChannelProvider(ChannelNames.LiveStockTicker));

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            // Capture the starting timestamp.
            long startingTimestamp = Stopwatch.GetTimestamp();

            // Get all updated stock values.
            Stock[] stocks = await Task.WhenAll(
                tasks: _symbols.Select(selector: _stockClient.GetStockAsync));

            // Get the live stock ticker broadcast channel.
            ChannelId channelId = ChannelId.Create(ChannelNames.LiveStockTicker, Guid.Empty);
            IBroadcastChannelWriter<Stock> channelWriter = _provider.GetChannelWriter<Stock>(channelId);

            // Broadcast all stock updates on this channel.
            await Task.WhenAll(
                stocks.Where(s => s is not null).Select(channelWriter.Publish));

            // Use the elapsed time to calculate a 15 second delay.
            int elapsed = Stopwatch.GetElapsedTime(startingTimestamp).Milliseconds;
            int remaining = Math.Max(0, 15_000 - elapsed);

            await Task.Delay(remaining, stoppingToken);
        }
    }
}

V předchozím kódu:

  • Třída StockWorker je služba na pozadí, která publikuje zprávy do kanálu vysílání.
  • Konstruktor přebírá IStockClient parametry a IClusterClient jako parametry.
  • Z instance klienta clusteru se GetBroadcastChannelProvider metoda používá k získání poskytovatele kanálu všesměrového vysílání.
  • IStockClientPomocí třídy StockWorker získá nejnovější cenu akcií pro symbol akcií.
  • Každých 15 sekund StockWorker třída publikuje Stock zprávu do kanálu vysílání.

Publikování zpráv do kanálu všesměrového vysílání je oddělené od odstupňované od příjemce. Spotřebitel se přihlásí k odběru kanálu všesměrového vysílání a přijímá zprávy z kanálu vysílání. Producent žije v silu a zodpovídá za publikování zpráv do vysílání kanálu a neví nic o spotřebě zrn.

Viz také