Canaux de diffusion dans Orleans
Les canaux de diffusion constituent un type spécial de mécanisme de diffusion que vous pouvez utiliser pour envoyer des messages à tous les abonnés. Contrairement aux fournisseurs de diffusion en continu, les canaux de diffusion ne sont pas persistants et ne stockent pas les messages et ils ne remplacent pas des flux persistants. Avec les canaux de diffusion, les grains sont implicitement abonnés au canal de diffusion et reçoivent les messages de diffusion d’un producteur. Cela dissocie l’expéditeur et le destinataire du message, ce qui est utile dans les scénarios où l’expéditeur et le destinataire ne sont pas connus d’avance.
Pour utiliser le canal de diffusion, vous devez configurer les Flux Orleans, puis activer la diffusion sur votre canal à l’aide de AddBroadcastChannel
pendant la configuration du silo.
siloBuilder.AddBroadcastChannel(OrleansBroadcastChannelNames.ReadmodelChanges);
Exemple de scénario
Prenons l’exemple d’un scénario dans lequel vous avez un grain qui doit recevoir des mises à jour du cours des actions à partir d’un fournisseur de valeurs boursières. Le fournisseur de valeurs boursières est un service en arrière-plan qui publie des mises à jour du cours des actions sur un canal de diffusion. Les grains sont implicitement abonnés au canal de diffusion et reçoivent des cours boursiers mis à jour. Le diagramme suivant illustre le scénario :
Dans le diagramme ci-dessus :
- Le silo publie des mises à jour du cours des actions sur la canal de diffusion.
- Le grain s’abonne au canal de diffusion et reçoit les mises à jour de valeurs boursières.
- Le client consomme les mises à jour du cours des actions à partir du grain d’actions.
Le canal de diffusion dissocie le producteur et le consommateur des mises à jour des valeurs boursières. Le producteur publie des mises à jour du cours des actions sur le canal de diffusion et le consommateur s’abonne au canal de diffusion et reçoit des mises à jour des valeurs boursières.
Définir un grain de consommateur
Pour consommer des messages de canal de diffusion, votre grain doit implémenter l’interface IOnBroadcastChannelSubscribed. Votre implémentation va utiliser la méthode IBroadcastChannelSubscription.Attach pour l’attachement au canal de diffusion. La méthode Attach
accepte un paramètre de type générique pour le type de message que vous allez recevoir. L’exemple suivant montre un grain qui s’abonne à un canal de diffusion de type Stock
:
using System.Collections.Concurrent;
using BroadcastChannel.GrainInterfaces;
using Orleans.BroadcastChannel;
namespace BroadcastChannel.Silo;
[ImplicitChannelSubscription]
public sealed class LiveStockGrain :
Grain,
ILiveStockGrain,
IOnBroadcastChannelSubscribed
{
private readonly IDictionary<StockSymbol, Stock> _stockCache =
new ConcurrentDictionary<StockSymbol, Stock>();
public ValueTask<Stock> GetStock(StockSymbol symbol) =>
_stockCache.TryGetValue(symbol, out Stock? stock) is false
? new ValueTask<Stock>(Task.FromException<Stock>(new KeyNotFoundException()))
: new ValueTask<Stock>(stock);
public Task OnSubscribed(IBroadcastChannelSubscription subscription) =>
subscription.Attach<Stock>(OnStockUpdated, OnError);
private Task OnStockUpdated(Stock stock)
{
if (stock is { GlobalQuote: { } })
{
_stockCache[stock.GlobalQuote.Symbol] = stock;
}
return Task.CompletedTask;
}
private static Task OnError(Exception ex)
{
Console.Error.WriteLine($"An error occurred: {ex}");
return Task.CompletedTask;
}
}
Dans le code précédent :
- Grain
LiveStockGrain
qui implémente l’interfaceIOnBroadcastChannelSubscribed
. - La méthode
OnSubscribed
est appelée lorsque le grain s’abonne au canal de diffusion. - Le paramètre
subscription
est utilisé pour appeler la méthodeAttach
à attacher au canal de diffusion.- La méthode
OnStockUpdated
est passée àAttach
en tant que rappel qui se déclenche lorsque le messageStock
est reçu. - La méthode
OnError
est passée àAttach
en tant que rappel qui se déclenche lorsqu’une erreur se produit.
- La méthode
Cet exemple de grain va contenir les derniers cours boursiers tels qu’ils sont publiés sur le canal de diffusion. Tout client qui demande ce grain pour le dernier cours des actions va obtenir le dernier prix du canal de diffusion.
Publier des messages sur un canal de diffusion
Si vous souhaitez publier des messages sur le canal de diffusion, vous devez obtenir une référence au canal de diffusion. Pour ce faire, vous devez obtenir le IBroadcastChannelProvider à partir de IClusterClient. Avec le fournisseur, vous pouvez appeler la méthode IBroadcastChannelProvider.GetChannelWriter pour obtenir une instance de IBroadcastChannelWriter<T>. L’enregistreur est utilisé pour publier des messages sur le canal de diffusion. L’exemple suivant décrit comment publier des messages sur le canal de diffusion :
using System.Diagnostics;
using BroadcastChannel.GrainInterfaces;
using Microsoft.Extensions.Hosting;
using Orleans.BroadcastChannel;
namespace BroadcastChannel.Silo.Services;
internal sealed class StockWorker : BackgroundService
{
private readonly StockClient _stockClient;
private readonly IBroadcastChannelProvider _provider;
private readonly List<StockSymbol> _symbols = Enum.GetValues<StockSymbol>().ToList();
public StockWorker(
StockClient stockClient, IClusterClient clusterClient) =>
(_stockClient, _provider) =
(stockClient, clusterClient.GetBroadcastChannelProvider(ChannelNames.LiveStockTicker));
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
// Capture the starting timestamp.
long startingTimestamp = Stopwatch.GetTimestamp();
// Get all updated stock values.
Stock[] stocks = await Task.WhenAll(
tasks: _symbols.Select(selector: _stockClient.GetStockAsync));
// Get the live stock ticker broadcast channel.
ChannelId channelId = ChannelId.Create(ChannelNames.LiveStockTicker, Guid.Empty);
IBroadcastChannelWriter<Stock> channelWriter = _provider.GetChannelWriter<Stock>(channelId);
// Broadcast all stock updates on this channel.
await Task.WhenAll(
stocks.Where(s => s is not null).Select(channelWriter.Publish));
// Use the elapsed time to calculate a 15 second delay.
int elapsed = Stopwatch.GetElapsedTime(startingTimestamp).Milliseconds;
int remaining = Math.Max(0, 15_000 - elapsed);
await Task.Delay(remaining, stoppingToken);
}
}
}
Dans le code précédent :
- La classe
StockWorker
est un service en arrière-plan qui publie des messages sur le canal de diffusion. - Le constructeur prend une valeur
IStockClient
et IClusterClient comme paramètres. - À partir de l’instance client de cluster, la méthode GetBroadcastChannelProvider est utilisée pour obtenir le fournisseur du canal de diffusion.
- En utilisant le
IStockClient
, la classeStockWorker
obtient le dernier cours boursier d’un symbole d’action. - Toutes les 15 secondes, la classe
StockWorker
publie un messageStock
sur le canal de diffusion.
La publication de messages sur un canal de diffusion est dissociée du grain de consommateur. Le grain de consommateur s’abonne au canal de diffusion et reçoit des messages du canal de diffusion. Le producteur vit dans un silo et est chargé de la publication de messages sur le canal de diffusion et ne sait rien sur la consommation de grains.