Dela via


Sända kanaler i Orleans

Sändningskanaler är en särskild typ av sändningsmekanism som kan användas för att skicka meddelanden till alla prenumeranter. Till skillnad från strömningsleverantörer är sändningskanaler inte beständiga och lagrar inte meddelanden, och de ersätter inte beständiga strömmar. Med sändningskanaler prenumererar korn implicit på sändningskanalen och tar emot sändningsmeddelanden från en producent. Detta frikopplar avsändaren och mottagaren av meddelandet, vilket är användbart för scenarier där avsändaren och mottagaren inte är kända i förväg.

Om du vill använda broadcast-kanalen bör du konfigurera Orleans Strömmar och sedan aktivera sändning på kanalen med hjälp av AddBroadcastChannel under silokonfigurationen.

siloBuilder.AddBroadcastChannel(OrleansBroadcastChannelNames.ReadmodelChanges);

Exempelscenario

Tänk dig ett scenario där du har ett korn som behöver få aktiekursuppdateringar från en aktiekursleverantör. Aktiekursleverantören är en bakgrundstjänst som publicerar aktiekursuppdateringar till en sändningskanal. Korn prenumererar implicit på sändningskanalen och får uppdaterade aktiekurser. Följande diagram visar scenariot:

Aktiekursdiagram som visar en silo, en lagerkorns- och konsumtionsklient i en enkel broadcast-kanalarkitektur.

I diagrammet ovan händer följande:

  • Silon publicerar aktiekursuppdateringar till sändningskanalen.
  • Kornet prenumererar på sändningskanalen och får aktiekursuppdateringar.
  • Klienten förbrukar aktiekursuppdateringarna från börsen.

Sändningskanalen frikopplar producenten och konsumenten av aktiekursuppdateringarna. Producenten publicerar aktiekursuppdateringar till sändningskanalen och konsumenten prenumererar på sändningskanalen och får aktiekursuppdateringar.

Definiera ett konsumentintervall

Om du vill använda broadcast channel-meddelanden måste ditt korn implementera IOnBroadcastChannelSubscribed gränssnittet. Implementeringen använder metoden IBroadcastChannelSubscription.Attach för att ansluta till sändningskanalen. Metoden Attach tar en parameter av allmän typ för den meddelandetyp som du ska ta emot. I följande exempel visas ett korn som prenumererar på en sändningskanal av typen Stock:

using System.Collections.Concurrent;
using BroadcastChannel.GrainInterfaces;
using Orleans.BroadcastChannel;

namespace BroadcastChannel.Silo;

[ImplicitChannelSubscription]
public sealed class LiveStockGrain :
    Grain,
    ILiveStockGrain,
    IOnBroadcastChannelSubscribed
{
    private readonly IDictionary<StockSymbol, Stock> _stockCache =
        new ConcurrentDictionary<StockSymbol, Stock>();

    public ValueTask<Stock> GetStock(StockSymbol symbol) =>
        _stockCache.TryGetValue(symbol, out Stock? stock) is false
            ? new ValueTask<Stock>(Task.FromException<Stock>(new KeyNotFoundException()))
            : new ValueTask<Stock>(stock);

    public Task OnSubscribed(IBroadcastChannelSubscription subscription) =>
        subscription.Attach<Stock>(OnStockUpdated, OnError);

    private Task OnStockUpdated(Stock stock)
    {
        if (stock is { GlobalQuote: { } })
        {
            _stockCache[stock.GlobalQuote.Symbol] = stock;
        }

        return Task.CompletedTask;
    }

    private static Task OnError(Exception ex)
    {
        Console.Error.WriteLine($"An error occurred: {ex}");

        return Task.CompletedTask;
    }
}

I koden ovan:

  • Kornet LiveStockGrain implementerar IOnBroadcastChannelSubscribed gränssnittet.
  • Metoden OnSubscribed anropas när kornet prenumererar på sändningskanalen.
  • Parametern subscription används för att anropa metoden Attach som ska kopplas till sändningskanalen.
    • Metoden OnStockUpdated skickas till Attach som ett återanrop som utlöses när meddelandet Stock tas emot.
    • Metoden OnError skickas till Attach som ett återanrop som utlöses när ett fel inträffar.

Det här exemplet innehåller de senaste aktiekurserna som publicerats på sändningskanalen. Alla klienter som frågar detta korn för den senaste aktiekursen får det senaste priset från sändningskanalen.

Publicera meddelanden till en sändningskanal

Om du vill publicera meddelanden till sändningskanalen måste du hämta en referens till sändningskanalen. För att göra detta måste du hämta IBroadcastChannelProvider från IClusterClient. Med providern kan du anropa IBroadcastChannelProvider.GetChannelWriter metoden för att hämta en instans av IBroadcastChannelWriter<T>. Skrivaren används för att publicera meddelanden till sändningskanalen. I följande exempel visas hur du publicerar meddelanden till sändningskanalen:

using System.Diagnostics;
using BroadcastChannel.GrainInterfaces;
using Microsoft.Extensions.Hosting;
using Orleans.BroadcastChannel;

namespace BroadcastChannel.Silo.Services;

internal sealed class StockWorker : BackgroundService
{
    private readonly StockClient _stockClient;
    private readonly IBroadcastChannelProvider _provider;
    private readonly List<StockSymbol> _symbols = Enum.GetValues<StockSymbol>().ToList();

    public StockWorker(
        StockClient stockClient, IClusterClient clusterClient) =>
        (_stockClient, _provider) =
        (stockClient, clusterClient.GetBroadcastChannelProvider(ChannelNames.LiveStockTicker));

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            // Capture the starting timestamp.
            long startingTimestamp = Stopwatch.GetTimestamp();

            // Get all updated stock values.
            Stock[] stocks = await Task.WhenAll(
                tasks: _symbols.Select(selector: _stockClient.GetStockAsync));

            // Get the live stock ticker broadcast channel.
            ChannelId channelId = ChannelId.Create(ChannelNames.LiveStockTicker, Guid.Empty);
            IBroadcastChannelWriter<Stock> channelWriter = _provider.GetChannelWriter<Stock>(channelId);

            // Broadcast all stock updates on this channel.
            await Task.WhenAll(
                stocks.Where(s => s is not null).Select(channelWriter.Publish));

            // Use the elapsed time to calculate a 15 second delay.
            int elapsed = Stopwatch.GetElapsedTime(startingTimestamp).Milliseconds;
            int remaining = Math.Max(0, 15_000 - elapsed);

            await Task.Delay(remaining, stoppingToken);
        }
    }
}

I koden ovan:

  • Klassen StockWorker är en bakgrundstjänst som publicerar meddelanden till sändningskanalen.
  • Konstruktorn tar en IStockClient och IClusterClient som-parametrar.
  • Från klusterklientinstansen GetBroadcastChannelProvider används metoden för att hämta broadcast-kanalprovidern.
  • Med hjälp av IStockClientfår StockWorker klassen den senaste aktiekursen för en aktiesymbol.
  • Var 15:e sekund StockWorker publicerar klassen ett Stock meddelande till sändningskanalen.

Publiceringen av meddelanden till en sändningskanal frikopplas från konsumentintervallet. Konsumentkornet prenumererar på sändningskanalen och tar emot meddelanden från sändningskanalen. Producenten bor i en silo och ansvarar för att publicera meddelanden till sändningskanalen och vet ingenting om att konsumera korn.

Se även