Broadcast channels in Orleans
Broadcast channels are a special type of broadcasting mechanism that can be used to send messages to all subscribers. Unlike streaming providers, broadcast channels are not persistent and don't store messages, and they're not a replacement for persistent streams. With broadcast channels, grains are implicitly subscribed to the broadcast channel and receive broadcast messages from a producer. This decouples the sender and receiver of the message, which is useful for scenarios where the sender and receiver are not known in advance.
To use a broadcast channel, configure Orleans Streams and then enable broadcasting on your channel by calling the AddBroadcastChannel extension method during silo configuration:
siloBuilder.AddBroadcastChannel(OrleansBroadcastChannelNames.ReadmodelChanges);
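For context, a minimal silo host that registers a broadcast channel might look like the following sketch. It assumes the Microsoft.Orleans.Server and Microsoft.Orleans.BroadcastChannel packages, localhost clustering for local development, and a hypothetical ChannelNames constants class. Any additional provider configuration your application needs is omitted.

```csharp
using Microsoft.Extensions.Hosting;
using Orleans.Hosting;

IHost host = Host.CreateDefaultBuilder(args)
    .UseOrleans(siloBuilder =>
    {
        // Development-only clustering; an assumption made for this sketch.
        siloBuilder.UseLocalhostClustering();

        // Register a broadcast channel provider under the given name.
        siloBuilder.AddBroadcastChannel(ChannelNames.LiveStockTicker);
    })
    .Build();

await host.RunAsync();

// Hypothetical constants class; the name and value are illustrative only.
internal static class ChannelNames
{
    public const string LiveStockTicker = "live-stock-ticker";
}
```

The name passed to AddBroadcastChannel is the same name a producer later uses to look up the provider, so keeping it in a shared constant avoids mismatched strings.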
Example scenario
Consider a scenario where you have a grain that needs to receive stock price updates from a stock price provider. The stock price provider is a background service that publishes stock price updates to a broadcast channel. Grains are implicitly subscribed to the broadcast channel and receive updated stock prices. The following diagram shows the scenario:
In the preceding diagram:
- The silo publishes stock price updates to the broadcast channel.
- The grain subscribes to the broadcast channel and receives stock price updates.
- The client consumes the stock price updates from the stock grain.
The broadcast channel decouples the producer and consumer of the stock price updates. The producer publishes stock price updates to the broadcast channel, and the consumer subscribes to the broadcast channel and receives stock price updates.
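The decoupling and the non-persistent delivery described above can be illustrated with a small in-memory model. This is a conceptual sketch only: InMemoryBroadcast and BroadcastDemo are hypothetical names, and the code does not reflect how Orleans implements broadcast channels. It shows that the producer only talks to the channel, and that a consumer attached after a message was published never sees that message.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Conceptual model only; NOT the Orleans implementation.
public sealed class InMemoryBroadcast<T>
{
    private readonly List<Func<T, Task>> _handlers = new();

    // Consumers attach a callback. Nothing is stored or replayed.
    public void Attach(Func<T, Task> onItem) => _handlers.Add(onItem);

    // The producer publishes to the channel without knowing the consumers.
    public Task Publish(T item)
    {
        var deliveries = new List<Task>(_handlers.Count);
        foreach (Func<T, Task> handler in _handlers)
        {
            deliveries.Add(handler(item));
        }

        return Task.WhenAll(deliveries);
    }
}

public static class BroadcastDemo
{
    public static async Task<(int Early, int Late)> RunAsync()
    {
        var channel = new InMemoryBroadcast<decimal>();
        int early = 0, late = 0;

        channel.Attach(_ => { early++; return Task.CompletedTask; });
        await channel.Publish(101.5m); // Only the early consumer receives this.

        channel.Attach(_ => { late++; return Task.CompletedTask; });
        await channel.Publish(102.0m); // Both consumers receive this.

        return (early, late); // (2, 1): the late consumer missed the first message.
    }
}
```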
Define a consumer grain
To consume broadcast channel messages, your grain needs to implement the IOnBroadcastChannelSubscribed interface. Your implementation uses the IBroadcastChannelSubscription.Attach method to attach to the broadcast channel. The Attach method takes a generic type parameter for the message type you're going to receive. The following example shows a grain that subscribes to a broadcast channel of type Stock:
using System.Collections.Concurrent;
using BroadcastChannel.GrainInterfaces;
using Orleans.BroadcastChannel;

namespace BroadcastChannel.Silo;

[ImplicitChannelSubscription]
public sealed class LiveStockGrain :
    Grain,
    ILiveStockGrain,
    IOnBroadcastChannelSubscribed
{
    // Latest stock received from the channel, keyed by symbol.
    private readonly IDictionary<StockSymbol, Stock> _stockCache =
        new ConcurrentDictionary<StockSymbol, Stock>();

    public ValueTask<Stock> GetStock(StockSymbol symbol) =>
        _stockCache.TryGetValue(symbol, out Stock? stock) is false
            ? new ValueTask<Stock>(Task.FromException<Stock>(new KeyNotFoundException()))
            : new ValueTask<Stock>(stock);

    // Called when the grain is subscribed to the channel; registers the callbacks.
    public Task OnSubscribed(IBroadcastChannelSubscription subscription) =>
        subscription.Attach<Stock>(OnStockUpdated, OnError);

    private Task OnStockUpdated(Stock stock)
    {
        if (stock is { GlobalQuote: { } })
        {
            _stockCache[stock.GlobalQuote.Symbol] = stock;
        }

        return Task.CompletedTask;
    }

    private static Task OnError(Exception ex)
    {
        Console.Error.WriteLine($"An error occurred: {ex}");

        return Task.CompletedTask;
    }
}
In the preceding code:
- The LiveStockGrain grain implements the IOnBroadcastChannelSubscribed interface.
- The OnSubscribed method is called when the grain subscribes to the broadcast channel.
- The subscription parameter is used to call the Attach method to attach to the broadcast channel.
  - The OnStockUpdated method is passed to Attach as a callback that fires when the Stock message is received.
  - The OnError method is passed to Attach as a callback that fires when an error occurs.
This example grain will contain the latest stock prices as published on the broadcast channel. Any client that asks this grain for the latest stock price will get the latest price from the broadcast channel.
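The grain above depends on types from the BroadcastChannel.GrainInterfaces namespace that this article doesn't show. The following is a hedged sketch of what such types could look like. Every definition here is a hypothetical stand-in inferred from how the grain uses them (a Stock with a GlobalQuote that has a Symbol), including the enum members, the Price property, and the choice of a Guid grain key.

```csharp
using System.Threading.Tasks;
using Orleans;

namespace BroadcastChannel.GrainInterfaces;

// Hypothetical symbols; the real set is not given in this article.
public enum StockSymbol { MSFT, AAPL, GOOG }

// Hypothetical message payload. Orleans needs serializable message types,
// so the records are marked for serializer generation.
[GenerateSerializer, Immutable]
public sealed record GlobalQuote(
    [property: Id(0)] StockSymbol Symbol,
    [property: Id(1)] decimal Price);

[GenerateSerializer, Immutable]
public sealed record Stock(
    [property: Id(0)] GlobalQuote GlobalQuote);

// Assumption: the grain is keyed by Guid; the article does not state the key type.
public interface ILiveStockGrain : IGrainWithGuidKey
{
    ValueTask<Stock> GetStock(StockSymbol symbol);
}
```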
Publish messages to a broadcast channel
To publish messages to the broadcast channel, you need to get a reference to the broadcast channel. To do this, you need to get the IBroadcastChannelProvider from the IClusterClient. With the provider, you can call the IBroadcastChannelProvider.GetChannelWriter method to get an instance of IBroadcastChannelWriter<T>. The writer is used to publish messages to the broadcast channel. The following example shows how to publish messages to the broadcast channel:
using System.Diagnostics;
using BroadcastChannel.GrainInterfaces;
using Microsoft.Extensions.Hosting;
using Orleans.BroadcastChannel;

namespace BroadcastChannel.Silo.Services;

internal sealed class StockWorker : BackgroundService
{
    private readonly StockClient _stockClient;
    private readonly IBroadcastChannelProvider _provider;
    private readonly List<StockSymbol> _symbols = Enum.GetValues<StockSymbol>().ToList();

    public StockWorker(
        StockClient stockClient, IClusterClient clusterClient) =>
        (_stockClient, _provider) =
            (stockClient, clusterClient.GetBroadcastChannelProvider(ChannelNames.LiveStockTicker));

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            // Capture the starting timestamp.
            long startingTimestamp = Stopwatch.GetTimestamp();

            // Get all updated stock values.
            Stock[] stocks = await Task.WhenAll(
                tasks: _symbols.Select(selector: _stockClient.GetStockAsync));

            // Get the live stock ticker broadcast channel.
            ChannelId channelId = ChannelId.Create(ChannelNames.LiveStockTicker, Guid.Empty);
            IBroadcastChannelWriter<Stock> channelWriter = _provider.GetChannelWriter<Stock>(channelId);

            // Broadcast all stock updates on this channel.
            await Task.WhenAll(
                stocks.Where(s => s is not null).Select(channelWriter.Publish));

            // Use the total elapsed time to calculate a 15 second delay.
            // TotalMilliseconds is the full duration; Milliseconds is only the 0-999 component.
            int elapsed = (int)Stopwatch.GetElapsedTime(startingTimestamp).TotalMilliseconds;
            int remaining = Math.Max(0, 15_000 - elapsed);

            await Task.Delay(remaining, stoppingToken);
        }
    }
}
In the preceding code:
- The StockWorker class is a background service that publishes messages to the broadcast channel.
- The constructor takes a StockClient and an IClusterClient as parameters.
- From the cluster client instance, the GetBroadcastChannelProvider method is used to get the broadcast channel provider.
- Using the StockClient, the StockWorker class gets the latest stock price for each stock symbol.
- Every 15 seconds, the StockWorker class publishes the resulting Stock messages to the broadcast channel.
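When an elapsed TimeSpan is turned into a remaining delay for a fixed publishing interval, the choice of property matters: TimeSpan.Milliseconds is only the 0 to 999 millisecond component of the value, while TimeSpan.TotalMilliseconds is the whole duration. The helper below is a hypothetical sketch (DelayMath and RemainingDelayMs are not part of the sample) that isolates this calculation so it can be checked on its own.

```csharp
using System;

// Hypothetical helper; not part of the article's sample code.
public static class DelayMath
{
    // Returns how long to wait so that one loop iteration takes about intervalMs in total.
    public static int RemainingDelayMs(TimeSpan elapsed, int intervalMs = 15_000)
    {
        // Use the full duration, clamped so the cast to int cannot overflow.
        int elapsedMs = (int)Math.Min(elapsed.TotalMilliseconds, int.MaxValue);

        // Never return a negative delay when the work overran the interval.
        return Math.Max(0, intervalMs - elapsedMs);
    }
}
```

For example, an iteration that took 2.5 seconds leaves 12,500 ms to wait, and one that took 20 seconds leaves 0 ms. Using the Milliseconds component instead would see 500 and 0 respectively, and would wait close to the full 15 seconds in both cases.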
The publishing of messages to a broadcast channel is decoupled from the consumer grain. The consumer grain subscribes to the broadcast channel and receives messages from the broadcast channel. The producer lives in a silo and is responsible for publishing messages to the broadcast channel and doesn't know anything about consuming grains.