Canales de difusión en Orleans
Los canales de difusión son un tipo especial de mecanismo de difusión que se puede usar para enviar mensajes a todos los suscriptores. A diferencia de los proveedores de streaming, los canales de difusión no son persistentes, no almacenan mensajes y no son un reemplazo de las secuencias persistentes. Con los canales de difusión, los granos se suscriben implícitamente al canal de difusión y reciben mensajes de difusión de un productor. Esto desacopla el remitente y el receptor del mensaje, lo que resulta útil para escenarios en los que el remitente y el receptor no se conocen de antemano.
Para usar un canal de difusión, debe configurar secuencias de Orleans y, a continuación, habilitar la difusión en el canal mediante AddBroadcastChannel
durante la configuración de los silos.
siloBuilder.AddBroadcastChannel(OrleansBroadcastChannelNames.ReadmodelChanges);
Escenario de ejemplo
Considere un escenario en el que tiene un grano que necesita recibir actualizaciones de precios de acciones de un proveedor de precios de acciones. El proveedor de precios de acciones es un servicio en segundo plano que publica actualizaciones de precios de acciones en un canal de difusión. Los granos se suscriben implícitamente al canal de difusión y reciben precios de acciones actualizados. El siguiente diagrama muestra los escenarios.
En el diagrama anterior:
- El silo publica actualizaciones de los precios de las acciones en el canal de difusión.
- El grano se suscribe al canal de difusión y recibe actualizaciones de los precios de las acciones.
- El cliente consume las actualizaciones de precios de acciones del grano de acciones.
El canal de difusión desacopla el productor y el consumidor de las actualizaciones de los precios de las acciones. El productor publica actualizaciones de los precios de las acciones en el canal de difusión y el consumidor se suscribe al canal de difusión y recibe actualizaciones de los precios de las acciones.
Definir un grano de consumidor
Para consumir mensajes de canal de difusión, el grano debe implementar la interfaz IOnBroadcastChannelSubscribed. La implementación usará el método IBroadcastChannelSubscription.Attach para adjuntar al canal de difusión. El método Attach
toma un parámetro de tipo genérico para el tipo de mensaje que va a recibir. En el ejemplo siguiente se muestra un grano que se suscribe a un canal de difusión de tipo Stock
:
using System.Collections.Concurrent;
using BroadcastChannel.GrainInterfaces;
using Orleans.BroadcastChannel;
namespace BroadcastChannel.Silo;
[ImplicitChannelSubscription]
public sealed class LiveStockGrain :
Grain,
ILiveStockGrain,
IOnBroadcastChannelSubscribed
{
private readonly IDictionary<StockSymbol, Stock> _stockCache =
new ConcurrentDictionary<StockSymbol, Stock>();
public ValueTask<Stock> GetStock(StockSymbol symbol) =>
_stockCache.TryGetValue(symbol, out Stock? stock) is false
? new ValueTask<Stock>(Task.FromException<Stock>(new KeyNotFoundException()))
: new ValueTask<Stock>(stock);
public Task OnSubscribed(IBroadcastChannelSubscription subscription) =>
subscription.Attach<Stock>(OnStockUpdated, OnError);
private Task OnStockUpdated(Stock stock)
{
if (stock is { GlobalQuote: { } })
{
_stockCache[stock.GlobalQuote.Symbol] = stock;
}
return Task.CompletedTask;
}
private static Task OnError(Exception ex)
{
Console.Error.WriteLine($"An error occurred: {ex}");
return Task.CompletedTask;
}
}
En el código anterior:
- El grano
LiveStockGrain
implementa la interfazIOnBroadcastChannelSubscribed
. - Se llama al método
OnSubscribed
cuando el grano se suscribe al canal de difusión. - El parámetro
subscription
se usa para llamar al métodoAttach
para asociarlo al canal de difusión.- El método
OnStockUpdated
se pasa aAttach
como una devolución de llamada que se desencadena cuando se recibe el mensajeStock
. - El método
OnError
se pasa aAttach
como una devolución de llamada que se desencadena cuando se produce un error.
- El método
Este ejemplo de grano contendrá los últimos precios de acciones publicados en el canal de difusión. Cualquier cliente que solicite este grano para el último precio de la acción obtendrá el precio más reciente del canal de difusión.
Publicación de mensajes en un canal de difusión
Para publicar mensajes en el canal de difusión, debe obtener una referencia a él. Para ello, debe obtener el IBroadcastChannelProvider del IClusterClient. Con el proveedor, puede llamar al método IBroadcastChannelProvider.GetChannelWriter para obtener una instancia de IBroadcastChannelWriter<T>. El escritor se usa para publicar mensajes en el canal de difusión. En el siguiente ejemplo se muestra cómo publicar mensajes en el canal de difusión:
using System.Diagnostics;
using BroadcastChannel.GrainInterfaces;
using Microsoft.Extensions.Hosting;
using Orleans.BroadcastChannel;
namespace BroadcastChannel.Silo.Services;
internal sealed class StockWorker : BackgroundService
{
private readonly StockClient _stockClient;
private readonly IBroadcastChannelProvider _provider;
private readonly List<StockSymbol> _symbols = Enum.GetValues<StockSymbol>().ToList();
public StockWorker(
StockClient stockClient, IClusterClient clusterClient) =>
(_stockClient, _provider) =
(stockClient, clusterClient.GetBroadcastChannelProvider(ChannelNames.LiveStockTicker));
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
// Capture the starting timestamp.
long startingTimestamp = Stopwatch.GetTimestamp();
// Get all updated stock values.
Stock[] stocks = await Task.WhenAll(
tasks: _symbols.Select(selector: _stockClient.GetStockAsync));
// Get the live stock ticker broadcast channel.
ChannelId channelId = ChannelId.Create(ChannelNames.LiveStockTicker, Guid.Empty);
IBroadcastChannelWriter<Stock> channelWriter = _provider.GetChannelWriter<Stock>(channelId);
// Broadcast all stock updates on this channel.
await Task.WhenAll(
stocks.Where(s => s is not null).Select(channelWriter.Publish));
// Use the elapsed time to calculate a 15 second delay.
int elapsed = Stopwatch.GetElapsedTime(startingTimestamp).Milliseconds;
int remaining = Math.Max(0, 15_000 - elapsed);
await Task.Delay(remaining, stoppingToken);
}
}
}
En el código anterior:
- La clase
StockWorker
es un servicio en segundo plano que publica mensajes en el canal de difusión. - El constructor toma
IStockClient
y IClusterClient como parámetros. - Desde la instancia de cliente del clúster, el método GetBroadcastChannelProvider se usa para obtener el proveedor de canales de difusión.
- Con
IStockClient
, la claseStockWorker
obtiene el último precio de acción de un símbolo de acción. - Cada 15 segundos, la clase
StockWorker
publica un mensajeStock
en el canal de difusión.
La publicación de mensajes en un canal de difusión se desacopla del grano del consumidor. El grano del consumidor se suscribe al canal de difusión y recibe mensajes del canal de difusión. El productor vive en un silo y es responsable de publicar mensajes en el canal de difusión y no sabe nada sobre el consumo de granos.