Partager via


Canaux de diffusion dans Orleans

Les canaux de diffusion constituent un type spécial de mécanisme de diffusion que vous pouvez utiliser pour envoyer des messages à tous les abonnés. Contrairement aux fournisseurs de diffusion en continu, les canaux de diffusion ne sont pas persistants et ne stockent pas les messages et ils ne remplacent pas des flux persistants. Avec les canaux de diffusion, les grains sont implicitement abonnés au canal de diffusion et reçoivent les messages de diffusion d’un producteur. Cela dissocie l’expéditeur et le destinataire du message, ce qui est utile dans les scénarios où l’expéditeur et le destinataire ne sont pas connus d’avance.

Pour utiliser le canal de diffusion, vous devez configurer les Flux Orleans, puis activer la diffusion sur votre canal à l’aide de AddBroadcastChannel pendant la configuration du silo.

siloBuilder.AddBroadcastChannel(OrleansBroadcastChannelNames.ReadmodelChanges);

Exemple de scénario

Prenons l’exemple d’un scénario dans lequel vous avez un grain qui doit recevoir des mises à jour du cours des actions à partir d’un fournisseur de valeurs boursières. Le fournisseur de valeurs boursières est un service en arrière-plan qui publie des mises à jour du cours des actions sur un canal de diffusion. Les grains sont implicitement abonnés au canal de diffusion et reçoivent des cours boursiers mis à jour. Le diagramme suivant illustre le scénario :

Diagramme des cours boursiers illustrant un silo, un grain boursier un client consommateur dans une architecture de canal de diffusion simple.

Dans le diagramme ci-dessus :

  • Le silo publie des mises à jour du cours des actions sur la canal de diffusion.
  • Le grain s’abonne au canal de diffusion et reçoit les mises à jour de valeurs boursières.
  • Le client consomme les mises à jour du cours des actions à partir du grain d’actions.

Le canal de diffusion dissocie le producteur et le consommateur des mises à jour des valeurs boursières. Le producteur publie des mises à jour du cours des actions sur le canal de diffusion et le consommateur s’abonne au canal de diffusion et reçoit des mises à jour des valeurs boursières.

Définir un grain de consommateur

Pour consommer des messages de canal de diffusion, votre grain doit implémenter l’interface IOnBroadcastChannelSubscribed. Votre implémentation va utiliser la méthode IBroadcastChannelSubscription.Attach pour l’attachement au canal de diffusion. La méthode Attach accepte un paramètre de type générique pour le type de message que vous allez recevoir. L’exemple suivant montre un grain qui s’abonne à un canal de diffusion de type Stock :

using System.Collections.Concurrent;
using BroadcastChannel.GrainInterfaces;
using Orleans.BroadcastChannel;

namespace BroadcastChannel.Silo;

[ImplicitChannelSubscription]
public sealed class LiveStockGrain :
    Grain,
    ILiveStockGrain,
    IOnBroadcastChannelSubscribed
{
    private readonly IDictionary<StockSymbol, Stock> _stockCache =
        new ConcurrentDictionary<StockSymbol, Stock>();

    public ValueTask<Stock> GetStock(StockSymbol symbol) =>
        _stockCache.TryGetValue(symbol, out Stock? stock) is false
            ? new ValueTask<Stock>(Task.FromException<Stock>(new KeyNotFoundException()))
            : new ValueTask<Stock>(stock);

    public Task OnSubscribed(IBroadcastChannelSubscription subscription) =>
        subscription.Attach<Stock>(OnStockUpdated, OnError);

    private Task OnStockUpdated(Stock stock)
    {
        if (stock is { GlobalQuote: { } })
        {
            _stockCache[stock.GlobalQuote.Symbol] = stock;
        }

        return Task.CompletedTask;
    }

    private static Task OnError(Exception ex)
    {
        Console.Error.WriteLine($"An error occurred: {ex}");

        return Task.CompletedTask;
    }
}

Dans le code précédent :

  • Grain LiveStockGrain qui implémente l’interface IOnBroadcastChannelSubscribed.
  • La méthode OnSubscribed est appelée lorsque le grain s’abonne au canal de diffusion.
  • Le paramètre subscription est utilisé pour appeler la méthode Attach à attacher au canal de diffusion.
    • La méthode OnStockUpdated est passée à Attach en tant que rappel qui se déclenche lorsque le message Stock est reçu.
    • La méthode OnError est passée à Attach en tant que rappel qui se déclenche lorsqu’une erreur se produit.

Cet exemple de grain va contenir les derniers cours boursiers tels qu’ils sont publiés sur le canal de diffusion. Tout client qui demande ce grain pour le dernier cours des actions va obtenir le dernier prix du canal de diffusion.

Publier des messages sur un canal de diffusion

Si vous souhaitez publier des messages sur le canal de diffusion, vous devez obtenir une référence au canal de diffusion. Pour ce faire, vous devez obtenir le IBroadcastChannelProvider à partir de IClusterClient. Avec le fournisseur, vous pouvez appeler la méthode IBroadcastChannelProvider.GetChannelWriter pour obtenir une instance de IBroadcastChannelWriter<T>. L’enregistreur est utilisé pour publier des messages sur le canal de diffusion. L’exemple suivant décrit comment publier des messages sur le canal de diffusion :

using System.Diagnostics;
using BroadcastChannel.GrainInterfaces;
using Microsoft.Extensions.Hosting;
using Orleans.BroadcastChannel;

namespace BroadcastChannel.Silo.Services;

internal sealed class StockWorker : BackgroundService
{
    private readonly StockClient _stockClient;
    private readonly IBroadcastChannelProvider _provider;
    private readonly List<StockSymbol> _symbols = Enum.GetValues<StockSymbol>().ToList();

    public StockWorker(
        StockClient stockClient, IClusterClient clusterClient) =>
        (_stockClient, _provider) =
        (stockClient, clusterClient.GetBroadcastChannelProvider(ChannelNames.LiveStockTicker));

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            // Capture the starting timestamp.
            long startingTimestamp = Stopwatch.GetTimestamp();

            // Get all updated stock values.
            Stock[] stocks = await Task.WhenAll(
                tasks: _symbols.Select(selector: _stockClient.GetStockAsync));

            // Get the live stock ticker broadcast channel.
            ChannelId channelId = ChannelId.Create(ChannelNames.LiveStockTicker, Guid.Empty);
            IBroadcastChannelWriter<Stock> channelWriter = _provider.GetChannelWriter<Stock>(channelId);

            // Broadcast all stock updates on this channel.
            await Task.WhenAll(
                stocks.Where(s => s is not null).Select(channelWriter.Publish));

            // Use the elapsed time to calculate a 15 second delay.
            int elapsed = Stopwatch.GetElapsedTime(startingTimestamp).Milliseconds;
            int remaining = Math.Max(0, 15_000 - elapsed);

            await Task.Delay(remaining, stoppingToken);
        }
    }
}

Dans le code précédent :

  • La classe StockWorker est un service en arrière-plan qui publie des messages sur le canal de diffusion.
  • Le constructeur prend une valeur IStockClient et IClusterClient comme paramètres.
  • À partir de l’instance client de cluster, la méthode GetBroadcastChannelProvider est utilisée pour obtenir le fournisseur du canal de diffusion.
  • En utilisant le IStockClient, la classe StockWorker obtient le dernier cours boursier d’un symbole d’action.
  • Toutes les 15 secondes, la classe StockWorker publie un message Stock sur le canal de diffusion.

La publication de messages sur un canal de diffusion est dissociée du grain de consommateur. Le grain de consommateur s’abonne au canal de diffusion et reçoit des messages du canal de diffusion. Le producteur vit dans un silo et est chargé de la publication de messages sur le canal de diffusion et ne sait rien sur la consommation de grains.

Voir aussi