Übertragungskanäle in Orleans
Übertragungskanäle sind ein spezieller Übertragungsmechanismus, der zum Senden von Nachrichten an alle Abonnenten verwendet werden kann. Im Gegensatz zu Streaminganbietern sind Übertragungskanäle nicht persistent und speichern keine Nachrichten. Sie sind daher kein Ersatz für persistente Streams. Bei Übertragungskanälen werden Grains implizit für den Übertragungskanal abonniert und empfangen Übertragungsnachrichten von einem Producer. Dadurch werden Absender und Empfänger der Nachricht entkoppelt, was für Szenarien nützlich ist, in denen Absender und Empfänger nicht im Voraus bekannt sind.
Um den Übertragungskanal zu verwenden, sollten Sie Orleans Streams konfigurieren und dann die Übertragung auf Ihrem Kanal mithilfe von AddBroadcastChannel
während der Silokonfiguration aktivieren.
siloBuilder.AddBroadcastChannel(OrleansBroadcastChannelNames.ReadmodelChanges);
Beispielszenario
Betrachten Sie ein Szenario, in dem Sie über einen Grain verfügen, der Aktienkursaktualisierungen von einem Aktienkursanbieter erhalten muss. Der Aktienkursanbieter ist ein Hintergrunddienst, der Aktienkursupdates in einem Übertragungskanal veröffentlicht. Grains abonnieren implizit den Übertragungskanal und erhalten aktualisierte Aktienkurse. Das Szenario wird in der folgenden Abbildung veranschaulicht:
Im obigen Diagramm ist Folgendes zu sehen:
- Das Silo veröffentlicht Aktienkursupdates im Übertragungskanal.
- Der Grain abonniert den Übertragungskanal und erhält Aktienkursupdates.
- Der Client nutzt die Aktienkursupdates aus dem Börsengrain.
Der Übertragungskanal entkoppelt Producer und Consumer der Aktienkursaktualisierungen. Der Producer veröffentlicht Aktienkursupdates im Übertragungskanal, und der Consumer abonniert den Übertragungskanal und erhält Aktienkursaktualisierungen.
Definieren eines Consumergrains
Um Übertragungskanäle nutzen zu können, muss Ihr Grain die IOnBroadcastChannelSubscribed-Schnittstelle implementieren. Ihre Implementierung verwendet die IBroadcastChannelSubscription.Attach-Methode zum Anfügen an den Übertragungskanal. Die Attach
-Methode nimmt einen generischen Parameter für den Nachrichtentyp an, den Sie empfangen möchten. Das folgende Beispiel zeigt einen Grain, der einen Übertragungskanal vom Typ Stock
abonniert:
using System.Collections.Concurrent;
using BroadcastChannel.GrainInterfaces;
using Orleans.BroadcastChannel;
namespace BroadcastChannel.Silo;
[ImplicitChannelSubscription]
public sealed class LiveStockGrain :
Grain,
ILiveStockGrain,
IOnBroadcastChannelSubscribed
{
private readonly IDictionary<StockSymbol, Stock> _stockCache =
new ConcurrentDictionary<StockSymbol, Stock>();
public ValueTask<Stock> GetStock(StockSymbol symbol) =>
_stockCache.TryGetValue(symbol, out Stock? stock) is false
? new ValueTask<Stock>(Task.FromException<Stock>(new KeyNotFoundException()))
: new ValueTask<Stock>(stock);
public Task OnSubscribed(IBroadcastChannelSubscription subscription) =>
subscription.Attach<Stock>(OnStockUpdated, OnError);
private Task OnStockUpdated(Stock stock)
{
if (stock is { GlobalQuote: { } })
{
_stockCache[stock.GlobalQuote.Symbol] = stock;
}
return Task.CompletedTask;
}
private static Task OnError(Exception ex)
{
Console.Error.WriteLine($"An error occurred: {ex}");
return Task.CompletedTask;
}
}
Im obigen Code:
- Der Grain
LiveStockGrain
implementiert dieIOnBroadcastChannelSubscribed
-Schnittstelle. - Die
OnSubscribed
-Methode wird aufgerufen, wenn der Grain den Übertragungskanal abonniert. - Der
subscription
-Parameter wird verwendet, um dieAttach
-Methode aufzurufen, die an den Übertragungskanal angefügt werden soll.- Die
OnStockUpdated
-Methode wird als Rückruf übergebenAttach
, der ausgelöst wird, wenn die NachrichtStock
empfangen wird. - Die
OnError
-Methode wird als Rückruf anAttach
übergeben, der ausgelöst wird, wenn ein Fehler auftritt.
- Die
Dieser Beispielgrain enthält die aktuellen Aktienkurse, die im Übertragungskanal veröffentlicht wurden. Jeder Client, der diesen Grain nach dem neuesten Aktienkurs abfragt, erhält den neuesten Preis vom Übertragungskanal.
Veröffentlichen von Nachrichten in einem Übertragungskanal
Um Nachrichten im Übertragungskanal zu veröffentlichen, müssen Sie einen Verweis auf den Übertragungskanal abrufen. Dazu müssen Sie IBroadcastChannelProvider von IClusterClient abrufen. Beim Anbieter können Sie die IBroadcastChannelProvider.GetChannelWriter-Methode aufrufen, um eine Instanz von IBroadcastChannelWriter<T> abzurufen. Der Writer wird verwendet, um Nachrichten im Übertragungskanal zu veröffentlichen. Das folgende Beispiel zeigt, wie Nachrichten im Übertragungskanal veröffentlicht werden:
using System.Diagnostics;
using BroadcastChannel.GrainInterfaces;
using Microsoft.Extensions.Hosting;
using Orleans.BroadcastChannel;
namespace BroadcastChannel.Silo.Services;
internal sealed class StockWorker : BackgroundService
{
private readonly StockClient _stockClient;
private readonly IBroadcastChannelProvider _provider;
private readonly List<StockSymbol> _symbols = Enum.GetValues<StockSymbol>().ToList();
public StockWorker(
StockClient stockClient, IClusterClient clusterClient) =>
(_stockClient, _provider) =
(stockClient, clusterClient.GetBroadcastChannelProvider(ChannelNames.LiveStockTicker));
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
// Capture the starting timestamp.
long startingTimestamp = Stopwatch.GetTimestamp();
// Get all updated stock values.
Stock[] stocks = await Task.WhenAll(
tasks: _symbols.Select(selector: _stockClient.GetStockAsync));
// Get the live stock ticker broadcast channel.
ChannelId channelId = ChannelId.Create(ChannelNames.LiveStockTicker, Guid.Empty);
IBroadcastChannelWriter<Stock> channelWriter = _provider.GetChannelWriter<Stock>(channelId);
// Broadcast all stock updates on this channel.
await Task.WhenAll(
stocks.Where(s => s is not null).Select(channelWriter.Publish));
// Use the elapsed time to calculate a 15 second delay.
int elapsed = Stopwatch.GetElapsedTime(startingTimestamp).Milliseconds;
int remaining = Math.Max(0, 15_000 - elapsed);
await Task.Delay(remaining, stoppingToken);
}
}
}
Im obigen Code:
- Die
StockWorker
-Klasse ist ein Hintergrunddienst, der Nachrichten im Übertragungskanal veröffentlicht. - Der Konstruktor akzeptiert
IStockClient
und IClusterClient als Parameter. - In der Clusterclientinstanz wird die GetBroadcastChannelProvider-Methode verwendet, um den Übertragungskanalanbieter abzurufen.
- Mithilfe von
IStockClient
erhält dieStockWorker
-Klasse den aktuellen Aktienkurs für ein Aktiensymbol. - Alle 15 Sekunden veröffentlicht die
StockWorker
-Klasse eineStock
-Nachricht im Übertragungskanal.
Die Veröffentlichung von Nachrichten in einem Übertragungskanal ist vom Consumergrain entkoppelt. Der Consumergrain abonniert den Übertragungskanal und empfängt Nachrichten vom Übertragungskanal. Der Producer befindet sich in einem Silo, ist für die Veröffentlichung von Nachrichten im Übertragungskanal verantwortlich und weiß nichts über die Verbrauchergrains.