다음을 통해 공유


Orleans의 브로드캐스트 채널

브로드캐스트 채널은 모든 구독자에게 메시지를 보내는 데 사용할 수 있는 특수한 유형의 브로드캐스트 메커니즘입니다. 스트리밍 공급자와 달리 브로드캐스트 채널은 영구적이 아니고 메시지를 저장하지 않으며 영구 스트림을 대체하지 않습니다. 브로드캐스트 채널을 사용하면 조직이 브로드캐스트 채널을 암시적으로 구독하고 생산자로부터 브로드캐스트 메시지를 받습니다. 이렇게 하면 메시지의 보낸 사람 및 받는 사람이 분리됩니다. 이러한 점은 보낸 사람과 받는 사람을 미리 알 수 없는 시나리오에 유용합니다.

브로드캐스트 채널을 사용하려면 Orleans 스트림을 구성한 다음 사일로 구성 중에 AddBroadcastChannel을 사용하여 채널에서 브로드캐스트를 사용하도록 설정해야 합니다.

siloBuilder.AddBroadcastChannel(OrleansBroadcastChannelNames.ReadmodelChanges);

예제 시나리오

주가 공급자로부터 주가 업데이트를 받아야 하는 조직이 있는 시나리오를 고려해 보세요. 주가 공급자는 브로드캐스트 채널에 주가 업데이트를 게시하는 백그라운드 서비스입니다. 조직은 브로드캐스트 채널을 암시적으로 구독하고 업데이트된 주가를 받습니다. 다음 다이어그램에서는 이러한 시나리오를 보여 줍니다.

간단한 브로드캐스트 채널 아키텍처로 사일로, 주식 조직 및 소비 클라이언트를 보여 주는 주가 다이어그램

위의 다이어그램에서:

  • 사일로는 브로드캐스트 채널에 주가 업데이트를 게시합니다.
  • 조직은 브로드캐스트 채널을 구독하고 주가 업데이트를 받습니다.
  • 클라이언트는 주식 조직의 주가 업데이트를 사용합니다.

브로드캐스트 채널은 주가 업데이트의 생산자와 소비자를 분리합니다. 생산자는 브로드캐스트 채널에 주가 업데이트를 게시하고 소비자는 브로드캐스트 채널을 구독한 후 주가 업데이트를 받습니다.

소비자 조직 정의

브로드캐스트 채널 메시지를 사용하려면 IOnBroadcastChannelSubscribed 인터페이스를 구현해야 합니다. 구현에서는 IBroadcastChannelSubscription.Attach 메서드를 사용하여 브로드캐스트 채널에 연결합니다. Attach 메서드는 수신하려는 메시지 형식에 대한 제네릭 형식 매개 변수를 사용합니다. 다음 예제에서는 Stock 형식의 브로드캐스트 채널을 구독하는 조직을 보여 줍니다.

using System.Collections.Concurrent;
using BroadcastChannel.GrainInterfaces;
using Orleans.BroadcastChannel;

namespace BroadcastChannel.Silo;

[ImplicitChannelSubscription]
public sealed class LiveStockGrain :
    Grain,
    ILiveStockGrain,
    IOnBroadcastChannelSubscribed
{
    private readonly IDictionary<StockSymbol, Stock> _stockCache =
        new ConcurrentDictionary<StockSymbol, Stock>();

    public ValueTask<Stock> GetStock(StockSymbol symbol) =>
        _stockCache.TryGetValue(symbol, out Stock? stock) is false
            ? new ValueTask<Stock>(Task.FromException<Stock>(new KeyNotFoundException()))
            : new ValueTask<Stock>(stock);

    public Task OnSubscribed(IBroadcastChannelSubscription subscription) =>
        subscription.Attach<Stock>(OnStockUpdated, OnError);

    private Task OnStockUpdated(Stock stock)
    {
        if (stock is { GlobalQuote: { } })
        {
            _stockCache[stock.GlobalQuote.Symbol] = stock;
        }

        return Task.CompletedTask;
    }

    private static Task OnError(Exception ex)
    {
        Console.Error.WriteLine($"An error occurred: {ex}");

        return Task.CompletedTask;
    }
}

위의 코드에서

  • LiveStockGrain 조직은 IOnBroadcastChannelSubscribed 인터페이스를 구현합니다.
  • OnSubscribed 메서드는 조직이 브로드캐스트 채널을 구독할 때 호출됩니다.
  • subscription 매개 변수는 Attach 메서드를 호출하여 브로드캐스트 채널에 연결하는 데 사용됩니다.
    • OnStockUpdated 메서드는 Stock 메시지가 수신될 때 발생하는 콜백으로 Attach에 전달됩니다.
    • OnError 메서드는 오류가 있을 때 발생하는 콜백으로 Attach에 전달됩니다.

이 예제 조직은 브로드캐스트 채널에 게시된 최신 주가를 포함합니다. 최신 주가를 이 조직에 요청하는 모든 클라이언트는 해당 브로드캐스트 채널에서 최신 가격을 얻게 됩니다.

브로드캐스트 채널에 메시지 게시

브로드캐스트 채널에 메시지를 게시하려면 브로드캐스트 채널에 대한 참조를 가져와야 합니다. 이렇게 하려면 IClusterClient에서 IBroadcastChannelProvider를 가져와야 합니다. 공급자를 사용하면 IBroadcastChannelProvider.GetChannelWriter 메서드를 호출하여 IBroadcastChannelWriter<T>의 인스턴스를 가져올 수 있습니다. 작성기는 브로드캐스트 채널에 메시지를 게시하는 데 사용됩니다. 다음 예제에서는 브로드캐스트 채널에 메시지를 게시하는 방법을 보여 줍니다.

using System.Diagnostics;
using BroadcastChannel.GrainInterfaces;
using Microsoft.Extensions.Hosting;
using Orleans.BroadcastChannel;

namespace BroadcastChannel.Silo.Services;

internal sealed class StockWorker : BackgroundService
{
    private readonly StockClient _stockClient;
    private readonly IBroadcastChannelProvider _provider;
    private readonly List<StockSymbol> _symbols = Enum.GetValues<StockSymbol>().ToList();

    public StockWorker(
        StockClient stockClient, IClusterClient clusterClient) =>
        (_stockClient, _provider) =
        (stockClient, clusterClient.GetBroadcastChannelProvider(ChannelNames.LiveStockTicker));

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            // Capture the starting timestamp.
            long startingTimestamp = Stopwatch.GetTimestamp();

            // Get all updated stock values.
            Stock[] stocks = await Task.WhenAll(
                tasks: _symbols.Select(selector: _stockClient.GetStockAsync));

            // Get the live stock ticker broadcast channel.
            ChannelId channelId = ChannelId.Create(ChannelNames.LiveStockTicker, Guid.Empty);
            IBroadcastChannelWriter<Stock> channelWriter = _provider.GetChannelWriter<Stock>(channelId);

            // Broadcast all stock updates on this channel.
            await Task.WhenAll(
                stocks.Where(s => s is not null).Select(channelWriter.Publish));

            // Use the elapsed time to calculate a 15 second delay.
            int elapsed = Stopwatch.GetElapsedTime(startingTimestamp).Milliseconds;
            int remaining = Math.Max(0, 15_000 - elapsed);

            await Task.Delay(remaining, stoppingToken);
        }
    }
}

위의 코드에서

  • StockWorker 클래스는 브로드캐스트 채널에 메시지를 게시하는 백그라운드 서비스입니다.
  • 생성자는 IStockClientIClusterClient를 매개 변수로 사용합니다.
  • 클러스터 클라이언트 인스턴스에서 GetBroadcastChannelProvider 메서드를 사용하여 브로드캐스트 채널 공급자를 가져옵니다.
  • IStockClient를 사용하여 StockWorker 클래스는 주식 종목의 최신 주가를 가져옵니다.
  • 15초마다 StockWorker 클래스는 브로드캐스트 채널에 Stock 메시지를 게시합니다.

브로드캐스트 채널에 메시지를 게시하는 것은 소비자 조직에서 분리됩니다. 소비자 조직에서는 브로드캐스트 채널을 구독하고 브로드캐스트 채널에서 메시지를 받습니다. 생산자는 사일로에 상주하며 브로드캐스트 채널에 메시지를 게시할 책임이 있고, 조직 사용에 대해서는 아무것도 알지 못합니다.

참고 항목