Canais de transmissão em Orleans
Canais de transmissão são um tipo especial de mecanismo de difusão que pode ser usado para enviar mensagens a todos os assinantes. Ao contrário dos provedores de streaming, os canais de transmissão não são persistentes, não armazenam mensagens e não são uma substituição para fluxos persistentes. Com os canais de transmissão, as granularidades são implicitamente inscritas no canal de transmissão e recebem mensagens de transmissão de um produtor. Isso desassocia o remetente e o receptor da mensagem, o que é útil para cenários em que o remetente e o receptor não são conhecidos com antecedência.
Para usar o canal de transmissão, você deve configurar o Streams Orleans e habilitar a transmissão no seu canal usando o AddBroadcastChannel
durante a configuração do silo.
siloBuilder.AddBroadcastChannel(OrleansBroadcastChannelNames.ReadmodelChanges);
Cenário de exemplo
Considere um cenário em que você tem uma granularidade que precisa receber atualizações de preço de ações de um provedor de preços de ações. O provedor de preços de ações é um serviço em segundo plano que publica atualizações de preços de ações em um canal de transmissão. As granularidades são inscritas implicitamente no canal de transmissão e recebem preços atualizados das ações. O diagrama a seguir mostra o cenário:
No diagrama anterior:
- O silo publica atualizações de preço das ações no canal de transmissão.
- A granularidade assina o canal de transmissão e recebe atualizações de preço das ações.
- O cliente consome as atualizações de preço das ações da granularidade de estoque.
O canal de transmissão desassocia o produtor e o consumidor das atualizações de preços das ações. O produtor publica atualizações de preços das ações no canal de transmissão, e o consumidor assina o canal de transmissão e recebe atualizações de preços das ações.
Definir uma granularidade de consumidor
Para consumir mensagens de canal de transmissão, sua granularidade precisa implementar a interface IOnBroadcastChannelSubscribed. Sua implementação usará o método IBroadcastChannelSubscription.Attach para anexar ao canal de transmissão. O método Attach
usa um parâmetro de tipo genérico para o tipo de mensagem que você receberá. O exemplo a seguir mostra uma granularidade que assina um canal de transmissão do tipo Stock
:
using System.Collections.Concurrent;
using BroadcastChannel.GrainInterfaces;
using Orleans.BroadcastChannel;
namespace BroadcastChannel.Silo;
[ImplicitChannelSubscription]
public sealed class LiveStockGrain :
Grain,
ILiveStockGrain,
IOnBroadcastChannelSubscribed
{
private readonly IDictionary<StockSymbol, Stock> _stockCache =
new ConcurrentDictionary<StockSymbol, Stock>();
public ValueTask<Stock> GetStock(StockSymbol symbol) =>
_stockCache.TryGetValue(symbol, out Stock? stock) is false
? new ValueTask<Stock>(Task.FromException<Stock>(new KeyNotFoundException()))
: new ValueTask<Stock>(stock);
public Task OnSubscribed(IBroadcastChannelSubscription subscription) =>
subscription.Attach<Stock>(OnStockUpdated, OnError);
private Task OnStockUpdated(Stock stock)
{
if (stock is { GlobalQuote: { } })
{
_stockCache[stock.GlobalQuote.Symbol] = stock;
}
return Task.CompletedTask;
}
private static Task OnError(Exception ex)
{
Console.Error.WriteLine($"An error occurred: {ex}");
return Task.CompletedTask;
}
}
No código anterior:
- A granularidade
LiveStockGrain
implementa a interfaceIOnBroadcastChannelSubscribed
. - O método
OnSubscribed
é chamado quando a granularidade assina o canal de transmissão. - O parâmetro
subscription
é usado para chamar o métodoAttach
a ser anexado ao canal de transmissão.- O método
OnStockUpdated
é passado aoAttach
como um retorno de chamada que é acionado quando a mensagemStock
é recebida. - O método
OnError
é passado paraAttach
como um retorno de chamada que é acionado quando ocorre um erro.
- O método
Esta granularidade de exemplo conterá os preços das ações mais recentes, conforme publicado no canal de transmissão. Qualquer cliente que pedir a esta granularidade o preço mais recente das ações receberá o preço mais recente do canal de transmissão.
Publicar mensagens em um canal de transmissão
Para publicar mensagens no canal de transmissão, você precisa obter uma referência para o canal de transmissão. Para fazer isso, você precisa obter o IBroadcastChannelProvider do IClusterClient. Com o provedor, você pode chamar o método IBroadcastChannelProvider.GetChannelWriter para obter uma instância de IBroadcastChannelWriter<T>. O gravador é usado para publicar mensagens no canal de transmissão. O exemplo a seguir mostra como publicar mensagens no canal de transmissão:
using System.Diagnostics;
using BroadcastChannel.GrainInterfaces;
using Microsoft.Extensions.Hosting;
using Orleans.BroadcastChannel;
namespace BroadcastChannel.Silo.Services;
internal sealed class StockWorker : BackgroundService
{
private readonly StockClient _stockClient;
private readonly IBroadcastChannelProvider _provider;
private readonly List<StockSymbol> _symbols = Enum.GetValues<StockSymbol>().ToList();
public StockWorker(
StockClient stockClient, IClusterClient clusterClient) =>
(_stockClient, _provider) =
(stockClient, clusterClient.GetBroadcastChannelProvider(ChannelNames.LiveStockTicker));
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
// Capture the starting timestamp.
long startingTimestamp = Stopwatch.GetTimestamp();
// Get all updated stock values.
Stock[] stocks = await Task.WhenAll(
tasks: _symbols.Select(selector: _stockClient.GetStockAsync));
// Get the live stock ticker broadcast channel.
ChannelId channelId = ChannelId.Create(ChannelNames.LiveStockTicker, Guid.Empty);
IBroadcastChannelWriter<Stock> channelWriter = _provider.GetChannelWriter<Stock>(channelId);
// Broadcast all stock updates on this channel.
await Task.WhenAll(
stocks.Where(s => s is not null).Select(channelWriter.Publish));
// Use the elapsed time to calculate a 15 second delay.
int elapsed = Stopwatch.GetElapsedTime(startingTimestamp).Milliseconds;
int remaining = Math.Max(0, 15_000 - elapsed);
await Task.Delay(remaining, stoppingToken);
}
}
}
No código anterior:
- A classe
StockWorker
é um serviço em segundo plano que publica mensagens no canal de transmissão. - O construtor usa
IStockClient
e IClusterClient como parâmetros. - Na instância do cliente do cluster, o método GetBroadcastChannelProvider é usado para obter o provedor de canal de transmissão.
- Usando
IStockClient
, a classeStockWorker
obtém o preço mais recente das ações para um símbolo de ação. - A cada 15 segundos, a classe
StockWorker
publica uma mensagemStock
no canal de transmissão.
A publicação de mensagens em um canal de transmissão é dissociada do granularidade do consumidor. A granularidade do consumidor assina o canal de transmissão e recebe mensagens do canal de transmissão. O produtor vive em um silo e é responsável por publicar mensagens no canal de transmissão e não sabe nada sobre o consumo de granularidades.