Compartilhar via


Canais de transmissão em Orleans

Canais de transmissão são um tipo especial de mecanismo de difusão que pode ser usado para enviar mensagens a todos os assinantes. Ao contrário dos provedores de streaming, os canais de transmissão não são persistentes, não armazenam mensagens e não são uma substituição para fluxos persistentes. Com os canais de transmissão, as granularidades são implicitamente inscritas no canal de transmissão e recebem mensagens de transmissão de um produtor. Isso desassocia o remetente e o receptor da mensagem, o que é útil para cenários em que o remetente e o receptor não são conhecidos com antecedência.

Para usar o canal de transmissão, você deve configurar o Streams Orleans e habilitar a transmissão no seu canal usando o AddBroadcastChannel durante a configuração do silo.

siloBuilder.AddBroadcastChannel(OrleansBroadcastChannelNames.ReadmodelChanges);

Cenário de exemplo

Considere um cenário em que você tem uma granularidade que precisa receber atualizações de preço de ações de um provedor de preços de ações. O provedor de preços de ações é um serviço em segundo plano que publica atualizações de preços de ações em um canal de transmissão. As granularidades são inscritas implicitamente no canal de transmissão e recebem preços atualizados das ações. O diagrama a seguir mostra o cenário:

Diagrama de preços de ações representando um silo, um grão de ações e um cliente consumidor em uma arquitetura de canal de transmissão simples.

No diagrama anterior:

  • O silo publica atualizações de preço das ações no canal de transmissão.
  • A granularidade assina o canal de transmissão e recebe atualizações de preço das ações.
  • O cliente consome as atualizações de preço das ações da granularidade de estoque.

O canal de transmissão desassocia o produtor e o consumidor das atualizações de preços das ações. O produtor publica atualizações de preços das ações no canal de transmissão, e o consumidor assina o canal de transmissão e recebe atualizações de preços das ações.

Definir uma granularidade de consumidor

Para consumir mensagens de canal de transmissão, sua granularidade precisa implementar a interface IOnBroadcastChannelSubscribed. Sua implementação usará o método IBroadcastChannelSubscription.Attach para anexar ao canal de transmissão. O método Attach usa um parâmetro de tipo genérico para o tipo de mensagem que você receberá. O exemplo a seguir mostra uma granularidade que assina um canal de transmissão do tipo Stock:

using System.Collections.Concurrent;
using BroadcastChannel.GrainInterfaces;
using Orleans.BroadcastChannel;

namespace BroadcastChannel.Silo;

[ImplicitChannelSubscription]
public sealed class LiveStockGrain :
    Grain,
    ILiveStockGrain,
    IOnBroadcastChannelSubscribed
{
    private readonly IDictionary<StockSymbol, Stock> _stockCache =
        new ConcurrentDictionary<StockSymbol, Stock>();

    public ValueTask<Stock> GetStock(StockSymbol symbol) =>
        _stockCache.TryGetValue(symbol, out Stock? stock) is false
            ? new ValueTask<Stock>(Task.FromException<Stock>(new KeyNotFoundException()))
            : new ValueTask<Stock>(stock);

    public Task OnSubscribed(IBroadcastChannelSubscription subscription) =>
        subscription.Attach<Stock>(OnStockUpdated, OnError);

    private Task OnStockUpdated(Stock stock)
    {
        if (stock is { GlobalQuote: { } })
        {
            _stockCache[stock.GlobalQuote.Symbol] = stock;
        }

        return Task.CompletedTask;
    }

    private static Task OnError(Exception ex)
    {
        Console.Error.WriteLine($"An error occurred: {ex}");

        return Task.CompletedTask;
    }
}

No código anterior:

  • A granularidade LiveStockGrain implementa a interface IOnBroadcastChannelSubscribed.
  • O método OnSubscribed é chamado quando a granularidade assina o canal de transmissão.
  • O parâmetro subscription é usado para chamar o método Attach a ser anexado ao canal de transmissão.
    • O método OnStockUpdated é passado ao Attach como um retorno de chamada que é acionado quando a mensagem Stock é recebida.
    • O método OnError é passado para Attach como um retorno de chamada que é acionado quando ocorre um erro.

Esta granularidade de exemplo conterá os preços das ações mais recentes, conforme publicado no canal de transmissão. Qualquer cliente que pedir a esta granularidade o preço mais recente das ações receberá o preço mais recente do canal de transmissão.

Publicar mensagens em um canal de transmissão

Para publicar mensagens no canal de transmissão, você precisa obter uma referência para o canal de transmissão. Para fazer isso, você precisa obter o IBroadcastChannelProvider do IClusterClient. Com o provedor, você pode chamar o método IBroadcastChannelProvider.GetChannelWriter para obter uma instância de IBroadcastChannelWriter<T>. O gravador é usado para publicar mensagens no canal de transmissão. O exemplo a seguir mostra como publicar mensagens no canal de transmissão:

using System.Diagnostics;
using BroadcastChannel.GrainInterfaces;
using Microsoft.Extensions.Hosting;
using Orleans.BroadcastChannel;

namespace BroadcastChannel.Silo.Services;

internal sealed class StockWorker : BackgroundService
{
    private readonly StockClient _stockClient;
    private readonly IBroadcastChannelProvider _provider;
    private readonly List<StockSymbol> _symbols = Enum.GetValues<StockSymbol>().ToList();

    public StockWorker(
        StockClient stockClient, IClusterClient clusterClient) =>
        (_stockClient, _provider) =
        (stockClient, clusterClient.GetBroadcastChannelProvider(ChannelNames.LiveStockTicker));

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            // Capture the starting timestamp.
            long startingTimestamp = Stopwatch.GetTimestamp();

            // Get all updated stock values.
            Stock[] stocks = await Task.WhenAll(
                tasks: _symbols.Select(selector: _stockClient.GetStockAsync));

            // Get the live stock ticker broadcast channel.
            ChannelId channelId = ChannelId.Create(ChannelNames.LiveStockTicker, Guid.Empty);
            IBroadcastChannelWriter<Stock> channelWriter = _provider.GetChannelWriter<Stock>(channelId);

            // Broadcast all stock updates on this channel.
            await Task.WhenAll(
                stocks.Where(s => s is not null).Select(channelWriter.Publish));

            // Use the elapsed time to calculate a 15 second delay.
            int elapsed = Stopwatch.GetElapsedTime(startingTimestamp).Milliseconds;
            int remaining = Math.Max(0, 15_000 - elapsed);

            await Task.Delay(remaining, stoppingToken);
        }
    }
}

No código anterior:

  • A classe StockWorker é um serviço em segundo plano que publica mensagens no canal de transmissão.
  • O construtor usa IStockClient e IClusterClient como parâmetros.
  • Na instância do cliente do cluster, o método GetBroadcastChannelProvider é usado para obter o provedor de canal de transmissão.
  • Usando IStockClient, a classe StockWorker obtém o preço mais recente das ações para um símbolo de ação.
  • A cada 15 segundos, a classe StockWorker publica uma mensagem Stock no canal de transmissão.

A publicação de mensagens em um canal de transmissão é dissociada do granularidade do consumidor. A granularidade do consumidor assina o canal de transmissão e recebe mensagens do canal de transmissão. O produtor vive em um silo e é responsável por publicar mensagens no canal de transmissão e não sabe nada sobre o consumo de granularidades.

Confira também