Orleans 스트리밍 빠른 시작
이 가이드에서는 Orleans Streams를 빠르게 설정하고 사용하는 방법을 보여 줍니다. 스트리밍 기능의 세부 정보에 대해 자세히 알아보려면 이 설명서의 다른 부분을 읽어보세요.
필수 구성
이 가이드에서는 조직 메시징을 사용하여 구독자에게 스트림 데이터를 보내는 메모리 기반 스트림을 사용합니다. 인 메모리 스토리지 공급자를 사용하여 구독 목록을 저장할 것입니다. 스트리밍 및 스토리지에 메모리 기반 메커니즘을 사용하는 것은 로컬 개발 및 테스트용이며 프로덕션 환경에는 적합하지 않습니다.
silo
가 ISiloBuilder인 사일로에서 AddMemoryStreams를 호출합니다.
silo.AddMemoryStreams("StreamProvider")
.AddMemoryGrainStorage("PubSubStore");
client
가 IClientBuilder인 클러스터 클라이언트에서 AddMemoryStreams를 호출합니다.
client.AddMemoryStreams("StreamProvider");
이 가이드에서는 조직 메시징을 사용하여 구독자에게 스트림 데이터를 보내는 간단한 메시지 기반 스트림을 사용합니다. 메모리 내 스토리지 공급자를 사용하여 구독 목록을 저장하므로 실제 프로덕션 애플리케이션에 적합한 것은 아닙니다.
hostBuilder
가 ISiloHostBuilder
인 사일로에서 AddSimpleMessageStreamProvider를 호출합니다.
hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
.AddMemoryGrainStorage("PubSubStore");
clientBuilder
가 IClientBuilder
인 클러스터 클라이언트에서 AddSimpleMessageStreamProvider를 호출합니다.
clientBuilder.AddSimpleMessageStreamProvider("SMSProvider");
참고
기본적으로 단순 메시지 스트림을 통해 전달되는 메시지는 변경할 수 없는 것으로 간주되며 다른 조직에 대한 참조로 전달될 수 있습니다. 이 동작을 해제하려면 SMS 공급자를 구성하여 SimpleMessageStreamProviderOptions.OptimizeForImmutableData를 해제해야 합니다.
siloBuilder
.AddSimpleMessageStreamProvider(
"SMSProvider",
options => options.OptimizeForImmutableData = false);
스트림을 만들고, 생산자로 사용하여 데이터를 보내고, 구독자로 데이터를 받을 수 있습니다.
이벤트 생성
스트림에 대한 이벤트를 생성하는 것은 비교적 쉽습니다. 먼저 위의 구성("StreamProvider"
)에서 정의한 스트림 공급자에 액세스한 다음, 스트림을 선택하고 데이터를 푸시해야 합니다.
// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("StreamProvider");
// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);
스트림에 대한 이벤트를 생성하는 것은 비교적 쉽습니다. 먼저 위의 구성("SMSProvider"
)에서 정의한 스트림 공급자에 액세스한 다음, 스트림을 선택하고 데이터를 푸시해야 합니다.
// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("SMSProvider");
// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");
여기에서 볼 수 있듯이 스트림에는 GUID와 네임스페이스가 있습니다. 이렇게 하면 고유한 스트림을 쉽게 식별할 수 있습니다. 예를 들어 채팅방의 네임스페이스는 “Rooms”일 수 있으며 GUID는 소유 RoomGrain의 GUID일 수 있습니다.
여기서는 알려진 채팅방의 GUID를 사용합니다. 스트림의 OnNextAsync
메서드를 사용하여 데이터를 푸시할 수 있습니다. 난수를 사용하여 타이머 내부에서 수행해 보겠습니다. 스트림에 다른 데이터 형식도 사용할 수 있습니다.
RegisterTimer(_ =>
{
return stream.OnNextAsync(Random.Shared.Next());
},
null,
TimeSpan.FromMilliseconds(1_000),
TimeSpan.FromMilliseconds(1_000));
스트리밍 데이터 구독 및 수신
데이터를 수신하기 위해 암시적 및 명시적 구독에 자세히 설명되어 있는 암시적 및 명시적 구독을 사용할 수 있습니다. 이 예제에서는 더 쉬운 암시적 구독을 사용합니다. 조직 형식이 스트림을 암시적으로 구독하려는 경우 [ImplicitStreamSubscription(네임스페이스)] 특성을 사용합니다.
사용자의 경우 ReceiverGrain
을 다음과 같이 정의합니다.
[ImplicitStreamSubscription("RANDOMDATA")]
public class ReceiverGrain : Grain, IRandomReceiver
데이터가 네임스페이스 RANDOMDATA
의 스트림으로 푸시될 때마다 타이머에 있는 것처럼 스트림의 Guid
가 동일한 ReceiverGrain
형식의 조직이 메시지를 수신합니다. 현재 조직의 활성화가 없더라도 런타임은 자동으로 새 활성화를 만들고 메시지를 보냅니다.
이렇게 하려면 데이터 수신을 위한 OnNextAsync
메서드를 설정하여 구독 프로세스를 완료해야 합니다. 이렇게 하려면 ReceiverGrain
이 OnActivateAsync
에서 다음과 같이 호출해야 합니다.
// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();
// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("StreamProvider");
// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);
// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
async (data, token) =>
{
Console.WriteLine(data);
await Task.CompletedTask;
});
// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();
// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("SMSProvider");
// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");
// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
async (data, token) =>
{
Console.WriteLine(data);
await Task.CompletedTask;
});
모든 설정이 완료되었습니다. 이제 유일한 요구 사항은 생산자 조직의 생성을 트리거하는 것입니다. 그런 다음, 타이머를 등록하고 모든 이해 관계자에게 임의의 정수를 보내기 시작합니다.
다시 말하지만, 이 가이드는 많은 세부 사항을 생략하였으며 큰 그림을 보여주는 데만 적합합니다. 사용 가능한 기능과 방법을 잘 이해하려면 이 설명서의 다른 부분과 RX의 다른 리소스를 참조하세요.
반응형 프로그래밍은 많은 문제를 해결하는 매우 강력한 접근 방식일 수 있습니다. 예를 들어 구독자에서 LINQ를 사용하여 숫자를 필터링하고 모든 종류의 흥미로운 작업을 수행할 수 있습니다.
참고 항목
.NET