다음을 통해 공유


Orleans 스트리밍 빠른 시작

이 가이드에서는 Orleans Streams를 빠르게 설정하고 사용하는 방법을 보여 줍니다. 스트리밍 기능의 세부 정보에 대해 자세히 알아보려면 이 설명서의 다른 부분을 읽어보세요.

필수 구성

이 가이드에서는 조직 메시징을 사용하여 구독자에게 스트림 데이터를 보내는 메모리 기반 스트림을 사용합니다. 인 메모리 스토리지 공급자를 사용하여 구독 목록을 저장할 것입니다. 스트리밍 및 스토리지에 메모리 기반 메커니즘을 사용하는 것은 로컬 개발 및 테스트용이며 프로덕션 환경에는 적합하지 않습니다.

siloISiloBuilder인 사일로에서 AddMemoryStreams를 호출합니다.

silo.AddMemoryStreams("StreamProvider")
    .AddMemoryGrainStorage("PubSubStore");

clientIClientBuilder인 클러스터 클라이언트에서 AddMemoryStreams를 호출합니다.

client.AddMemoryStreams("StreamProvider");

이 가이드에서는 조직 메시징을 사용하여 구독자에게 스트림 데이터를 보내는 간단한 메시지 기반 스트림을 사용합니다. 메모리 내 스토리지 공급자를 사용하여 구독 목록을 저장하므로 실제 프로덕션 애플리케이션에 적합한 것은 아닙니다.

hostBuilderISiloHostBuilder인 사일로에서 AddSimpleMessageStreamProvider를 호출합니다.

hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
           .AddMemoryGrainStorage("PubSubStore");

clientBuilderIClientBuilder인 클러스터 클라이언트에서 AddSimpleMessageStreamProvider를 호출합니다.

clientBuilder.AddSimpleMessageStreamProvider("SMSProvider");

참고

기본적으로 단순 메시지 스트림을 통해 전달되는 메시지는 변경할 수 없는 것으로 간주되며 다른 조직에 대한 참조로 전달될 수 있습니다. 이 동작을 해제하려면 SMS 공급자를 구성하여 SimpleMessageStreamProviderOptions.OptimizeForImmutableData를 해제해야 합니다.

siloBuilder
    .AddSimpleMessageStreamProvider(
        "SMSProvider",
        options => options.OptimizeForImmutableData = false);

스트림을 만들고, 생산자로 사용하여 데이터를 보내고, 구독자로 데이터를 받을 수 있습니다.

이벤트 생성

스트림에 대한 이벤트를 생성하는 것은 비교적 쉽습니다. 먼저 위의 구성("StreamProvider")에서 정의한 스트림 공급자에 액세스한 다음, 스트림을 선택하고 데이터를 푸시해야 합니다.

// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("StreamProvider");
// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);

스트림에 대한 이벤트를 생성하는 것은 비교적 쉽습니다. 먼저 위의 구성("SMSProvider")에서 정의한 스트림 공급자에 액세스한 다음, 스트림을 선택하고 데이터를 푸시해야 합니다.

// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("SMSProvider");
// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");

여기에서 볼 수 있듯이 스트림에는 GUID와 네임스페이스가 있습니다. 이렇게 하면 고유한 스트림을 쉽게 식별할 수 있습니다. 예를 들어 채팅방의 네임스페이스는 “Rooms”일 수 있으며 GUID는 소유 RoomGrain의 GUID일 수 있습니다.

여기서는 알려진 채팅방의 GUID를 사용합니다. 스트림의 OnNextAsync 메서드를 사용하여 데이터를 푸시할 수 있습니다. 난수를 사용하여 타이머 내부에서 수행해 보겠습니다. 스트림에 다른 데이터 형식도 사용할 수 있습니다.

RegisterTimer(_ =>
{
    return stream.OnNextAsync(Random.Shared.Next());
},
null,
TimeSpan.FromMilliseconds(1_000),
TimeSpan.FromMilliseconds(1_000));

스트리밍 데이터 구독 및 수신

데이터를 수신하기 위해 암시적 및 명시적 구독에 자세히 설명되어 있는 암시적 및 명시적 구독을 사용할 수 있습니다. 이 예제에서는 더 쉬운 암시적 구독을 사용합니다. 조직 형식이 스트림을 암시적으로 구독하려는 경우 [ImplicitStreamSubscription(네임스페이스)] 특성을 사용합니다.

사용자의 경우 ReceiverGrain을 다음과 같이 정의합니다.

[ImplicitStreamSubscription("RANDOMDATA")]
public class ReceiverGrain : Grain, IRandomReceiver

데이터가 네임스페이스 RANDOMDATA의 스트림으로 푸시될 때마다 타이머에 있는 것처럼 스트림의 Guid가 동일한 ReceiverGrain 형식의 조직이 메시지를 수신합니다. 현재 조직의 활성화가 없더라도 런타임은 자동으로 새 활성화를 만들고 메시지를 보냅니다.

이렇게 하려면 데이터 수신을 위한 OnNextAsync 메서드를 설정하여 구독 프로세스를 완료해야 합니다. 이렇게 하려면 ReceiverGrainOnActivateAsync에서 다음과 같이 호출해야 합니다.

// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();

// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("StreamProvider");

// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);

// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
    async (data, token) =>
    {
        Console.WriteLine(data);
        await Task.CompletedTask;
    });
// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();

// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("SMSProvider");

// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");

// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
    async (data, token) =>
    {
        Console.WriteLine(data);
        await Task.CompletedTask;
    });

모든 설정이 완료되었습니다. 이제 유일한 요구 사항은 생산자 조직의 생성을 트리거하는 것입니다. 그런 다음, 타이머를 등록하고 모든 이해 관계자에게 임의의 정수를 보내기 시작합니다.

다시 말하지만, 이 가이드는 많은 세부 사항을 생략하였으며 큰 그림을 보여주는 데만 적합합니다. 사용 가능한 기능과 방법을 잘 이해하려면 이 설명서의 다른 부분과 RX의 다른 리소스를 참조하세요.

반응형 프로그래밍은 많은 문제를 해결하는 매우 강력한 접근 방식일 수 있습니다. 예를 들어 구독자에서 LINQ를 사용하여 숫자를 필터링하고 모든 종류의 흥미로운 작업을 수행할 수 있습니다.

참고 항목

Orleans 스트림 프로그래밍 API