Compartir vía


Canales de difusión en Orleans

Los canales de difusión son un tipo especial de mecanismo de difusión que se puede usar para enviar mensajes a todos los suscriptores. A diferencia de los proveedores de streaming, los canales de difusión no son persistentes, no almacenan mensajes y no son un reemplazo de las secuencias persistentes. Con los canales de difusión, los granos se suscriben implícitamente al canal de difusión y reciben mensajes de difusión de un productor. Esto desacopla el remitente y el receptor del mensaje, lo que resulta útil para escenarios en los que el remitente y el receptor no se conocen de antemano.

Para usar un canal de difusión, debe configurar secuencias de Orleans y, a continuación, habilitar la difusión en el canal mediante AddBroadcastChannel durante la configuración de los silos.

siloBuilder.AddBroadcastChannel(OrleansBroadcastChannelNames.ReadmodelChanges);

Escenario de ejemplo

Considere un escenario en el que tiene un grano que necesita recibir actualizaciones de precios de acciones de un proveedor de precios de acciones. El proveedor de precios de acciones es un servicio en segundo plano que publica actualizaciones de precios de acciones en un canal de difusión. Los granos se suscriben implícitamente al canal de difusión y reciben precios de acciones actualizados. El siguiente diagrama muestra los escenarios.

Diagrama de precios de acciones en el que se describe un silo, un grano de acciones y un cliente de consumo en una arquitectura de canal de difusión sencilla.

En el diagrama anterior:

  • El silo publica actualizaciones de los precios de las acciones en el canal de difusión.
  • El grano se suscribe al canal de difusión y recibe actualizaciones de los precios de las acciones.
  • El cliente consume las actualizaciones de precios de acciones del grano de acciones.

El canal de difusión desacopla el productor y el consumidor de las actualizaciones de los precios de las acciones. El productor publica actualizaciones de los precios de las acciones en el canal de difusión y el consumidor se suscribe al canal de difusión y recibe actualizaciones de los precios de las acciones.

Definir un grano de consumidor

Para consumir mensajes de canal de difusión, el grano debe implementar la interfaz IOnBroadcastChannelSubscribed. La implementación usará el método IBroadcastChannelSubscription.Attach para adjuntar al canal de difusión. El método Attach toma un parámetro de tipo genérico para el tipo de mensaje que va a recibir. En el ejemplo siguiente se muestra un grano que se suscribe a un canal de difusión de tipo Stock:

using System.Collections.Concurrent;
using BroadcastChannel.GrainInterfaces;
using Orleans.BroadcastChannel;

namespace BroadcastChannel.Silo;

[ImplicitChannelSubscription]
public sealed class LiveStockGrain :
    Grain,
    ILiveStockGrain,
    IOnBroadcastChannelSubscribed
{
    private readonly IDictionary<StockSymbol, Stock> _stockCache =
        new ConcurrentDictionary<StockSymbol, Stock>();

    public ValueTask<Stock> GetStock(StockSymbol symbol) =>
        _stockCache.TryGetValue(symbol, out Stock? stock) is false
            ? new ValueTask<Stock>(Task.FromException<Stock>(new KeyNotFoundException()))
            : new ValueTask<Stock>(stock);

    public Task OnSubscribed(IBroadcastChannelSubscription subscription) =>
        subscription.Attach<Stock>(OnStockUpdated, OnError);

    private Task OnStockUpdated(Stock stock)
    {
        if (stock is { GlobalQuote: { } })
        {
            _stockCache[stock.GlobalQuote.Symbol] = stock;
        }

        return Task.CompletedTask;
    }

    private static Task OnError(Exception ex)
    {
        Console.Error.WriteLine($"An error occurred: {ex}");

        return Task.CompletedTask;
    }
}

En el código anterior:

  • El grano LiveStockGrain implementa la interfaz IOnBroadcastChannelSubscribed.
  • Se llama al método OnSubscribed cuando el grano se suscribe al canal de difusión.
  • El parámetro subscription se usa para llamar al método Attach para asociarlo al canal de difusión.
    • El método OnStockUpdated se pasa a Attach como una devolución de llamada que se desencadena cuando se recibe el mensaje Stock.
    • El método OnError se pasa a Attach como una devolución de llamada que se desencadena cuando se produce un error.

Este ejemplo de grano contendrá los últimos precios de acciones publicados en el canal de difusión. Cualquier cliente que solicite este grano para el último precio de la acción obtendrá el precio más reciente del canal de difusión.

Publicación de mensajes en un canal de difusión

Para publicar mensajes en el canal de difusión, debe obtener una referencia a él. Para ello, debe obtener el IBroadcastChannelProvider del IClusterClient. Con el proveedor, puede llamar al método IBroadcastChannelProvider.GetChannelWriter para obtener una instancia de IBroadcastChannelWriter<T>. El escritor se usa para publicar mensajes en el canal de difusión. En el siguiente ejemplo se muestra cómo publicar mensajes en el canal de difusión:

using System.Diagnostics;
using BroadcastChannel.GrainInterfaces;
using Microsoft.Extensions.Hosting;
using Orleans.BroadcastChannel;

namespace BroadcastChannel.Silo.Services;

internal sealed class StockWorker : BackgroundService
{
    private readonly StockClient _stockClient;
    private readonly IBroadcastChannelProvider _provider;
    private readonly List<StockSymbol> _symbols = Enum.GetValues<StockSymbol>().ToList();

    public StockWorker(
        StockClient stockClient, IClusterClient clusterClient) =>
        (_stockClient, _provider) =
        (stockClient, clusterClient.GetBroadcastChannelProvider(ChannelNames.LiveStockTicker));

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            // Capture the starting timestamp.
            long startingTimestamp = Stopwatch.GetTimestamp();

            // Get all updated stock values.
            Stock[] stocks = await Task.WhenAll(
                tasks: _symbols.Select(selector: _stockClient.GetStockAsync));

            // Get the live stock ticker broadcast channel.
            ChannelId channelId = ChannelId.Create(ChannelNames.LiveStockTicker, Guid.Empty);
            IBroadcastChannelWriter<Stock> channelWriter = _provider.GetChannelWriter<Stock>(channelId);

            // Broadcast all stock updates on this channel.
            await Task.WhenAll(
                stocks.Where(s => s is not null).Select(channelWriter.Publish));

            // Use the elapsed time to calculate a 15 second delay.
            int elapsed = Stopwatch.GetElapsedTime(startingTimestamp).Milliseconds;
            int remaining = Math.Max(0, 15_000 - elapsed);

            await Task.Delay(remaining, stoppingToken);
        }
    }
}

En el código anterior:

  • La clase StockWorker es un servicio en segundo plano que publica mensajes en el canal de difusión.
  • El constructor toma IStockClient y IClusterClient como parámetros.
  • Desde la instancia de cliente del clúster, el método GetBroadcastChannelProvider se usa para obtener el proveedor de canales de difusión.
  • Con IStockClient, la clase StockWorker obtiene el último precio de acción de un símbolo de acción.
  • Cada 15 segundos, la clase StockWorker publica un mensaje Stock en el canal de difusión.

La publicación de mensajes en un canal de difusión se desacopla del grano del consumidor. El grano del consumidor se suscribe al canal de difusión y recibe mensajes del canal de difusión. El productor vive en un silo y es responsable de publicar mensajes en el canal de difusión y no sabe nada sobre el consumo de granos.

Consulte también