Canais de transmissão em Orleans
Os canais de transmissão são um tipo especial de mecanismo de transmissão que pode ser usado para enviar mensagens a todos os assinantes. Ao contrário dos provedores de streaming, os canais de transmissão não são persistentes e não armazenam mensagens, além de não substituírem os fluxos persistentes. Com os canais de transmissão, os grãos são implicitamente inscritos no canal de transmissão e recebem mensagens de transmissão de um produtor. Isso separa o emissor e o recetor da mensagem, o que é útil para cenários em que o emissor e o recetor não são conhecidos antecipadamente.
Para usar o canal de transmissão, você deve configurar Orleans Streams e, em seguida, habilitar a transmissão em seu canal usando a configuração durante o AddBroadcastChannel
silo.
siloBuilder.AddBroadcastChannel(OrleansBroadcastChannelNames.ReadmodelChanges);
Cenário de exemplo
Considere um cenário em que você tem um grão que precisa receber atualizações do preço das ações de um provedor de preços de ações. O provedor de preço de ações é um serviço em segundo plano que publica atualizações de preços de ações em um canal de transmissão. Os grãos são implicitamente inscritos no canal de transmissão e recebem preços de ações atualizados. O diagrama a seguir mostra o cenário:
No diagrama anterior:
- O silo publica atualizações do preço das ações no canal de transmissão.
- O grão assina o canal de transmissão e recebe atualizações do preço das ações.
- O cliente consome as atualizações de preço do estoque do grão de estoque.
O canal de transmissão separa o produtor e o consumidor das atualizações de preço das ações. O produtor publica atualizações do preço das ações no canal de transmissão, e o consumidor assina o canal de transmissão e recebe atualizações do preço das ações.
Definir um grão de consumidor
Para consumir mensagens do canal de transmissão, seu grão precisa implementar a IOnBroadcastChannelSubscribed interface. Sua implementação usará o IBroadcastChannelSubscription.Attach método para anexar ao canal de transmissão. O Attach
método usa um parâmetro de tipo genérico para o tipo de mensagem que você receberá. O exemplo a seguir mostra um grão que assina um canal de transmissão do tipo Stock
:
using System.Collections.Concurrent;
using BroadcastChannel.GrainInterfaces;
using Orleans.BroadcastChannel;
namespace BroadcastChannel.Silo;
[ImplicitChannelSubscription]
public sealed class LiveStockGrain :
Grain,
ILiveStockGrain,
IOnBroadcastChannelSubscribed
{
private readonly IDictionary<StockSymbol, Stock> _stockCache =
new ConcurrentDictionary<StockSymbol, Stock>();
public ValueTask<Stock> GetStock(StockSymbol symbol) =>
_stockCache.TryGetValue(symbol, out Stock? stock) is false
? new ValueTask<Stock>(Task.FromException<Stock>(new KeyNotFoundException()))
: new ValueTask<Stock>(stock);
public Task OnSubscribed(IBroadcastChannelSubscription subscription) =>
subscription.Attach<Stock>(OnStockUpdated, OnError);
private Task OnStockUpdated(Stock stock)
{
if (stock is { GlobalQuote: { } })
{
_stockCache[stock.GlobalQuote.Symbol] = stock;
}
return Task.CompletedTask;
}
private static Task OnError(Exception ex)
{
Console.Error.WriteLine($"An error occurred: {ex}");
return Task.CompletedTask;
}
}
No código anterior:
- O
LiveStockGrain
grão implementa aIOnBroadcastChannelSubscribed
interface. - O
OnSubscribed
método é chamado quando o grão assina o canal de transmissão. - O
subscription
parâmetro é usado para chamar oAttach
método para anexar ao canal de transmissão.- O
OnStockUpdated
método é passado paraAttach
como um retorno de chamada que é acionado quando aStock
mensagem é recebida. - O
OnError
método é passado paraAttach
como um retorno de chamada que é acionado quando ocorre um erro.
- O
Este exemplo de grão conterá os preços mais recentes das ações, conforme publicado no canal de transmissão. Qualquer cliente que peça este grão para o preço mais recente das ações receberá o preço mais recente do canal de transmissão.
Publicar mensagens em um canal de transmissão
Para publicar mensagens no canal de transmissão, você precisa obter uma referência ao canal de transmissão. Para fazer isso, você precisa obter o IBroadcastChannelProvider do IClusterClient. Com o provedor, você pode chamar o IBroadcastChannelProvider.GetChannelWriter método para obter uma instância do IBroadcastChannelWriter<T>. O escritor é usado para publicar mensagens para o canal de transmissão. O exemplo a seguir mostra como publicar mensagens no canal de transmissão:
using System.Diagnostics;
using BroadcastChannel.GrainInterfaces;
using Microsoft.Extensions.Hosting;
using Orleans.BroadcastChannel;
namespace BroadcastChannel.Silo.Services;
internal sealed class StockWorker : BackgroundService
{
private readonly StockClient _stockClient;
private readonly IBroadcastChannelProvider _provider;
private readonly List<StockSymbol> _symbols = Enum.GetValues<StockSymbol>().ToList();
public StockWorker(
StockClient stockClient, IClusterClient clusterClient) =>
(_stockClient, _provider) =
(stockClient, clusterClient.GetBroadcastChannelProvider(ChannelNames.LiveStockTicker));
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
// Capture the starting timestamp.
long startingTimestamp = Stopwatch.GetTimestamp();
// Get all updated stock values.
Stock[] stocks = await Task.WhenAll(
tasks: _symbols.Select(selector: _stockClient.GetStockAsync));
// Get the live stock ticker broadcast channel.
ChannelId channelId = ChannelId.Create(ChannelNames.LiveStockTicker, Guid.Empty);
IBroadcastChannelWriter<Stock> channelWriter = _provider.GetChannelWriter<Stock>(channelId);
// Broadcast all stock updates on this channel.
await Task.WhenAll(
stocks.Where(s => s is not null).Select(channelWriter.Publish));
// Use the elapsed time to calculate a 15 second delay.
int elapsed = Stopwatch.GetElapsedTime(startingTimestamp).Milliseconds;
int remaining = Math.Max(0, 15_000 - elapsed);
await Task.Delay(remaining, stoppingToken);
}
}
}
No código anterior:
- A
StockWorker
classe é um serviço em segundo plano que publica mensagens para o canal de transmissão. - O construtor usa um
IStockClient
e IClusterClient como parâmetros. - A partir da instância do cliente de cluster, o GetBroadcastChannelProvider método é usado para obter o provedor de canal de transmissão.
- Usando o
IStockClient
, aStockWorker
classe obtém o preço mais recente de uma ação para um símbolo de ação. - A cada 15 segundos, a turma publica
StockWorker
umaStock
mensagem para o canal de transmissão.
A publicação de mensagens em um canal de transmissão é dissociada do grão do consumidor. O consumidor assina o canal de transmissão e recebe mensagens do canal de transmissão. O produtor mora em um silo e é responsável por publicar mensagens para o canal de transmissão e não sabe nada sobre o consumo de grãos.