Orleans Przewodnik Szybki start dotyczący przesyłania strumieniowego
W tym przewodniku przedstawiono szybki sposób konfigurowania i używania Orleans Strumienie. Aby dowiedzieć się więcej na temat szczegółów funkcji przesyłania strumieniowego, przeczytaj inne części tej dokumentacji.
Wymagane konfiguracje
W tym przewodniku użyjesz strumienia opartego na pamięci, który używa komunikatów ziarna do wysyłania danych strumienia do subskrybentów. Dostawca magazynu w pamięci będzie używany do przechowywania list subskrypcji. Używanie mechanizmów opartych na pamięci na potrzeby przesyłania strumieniowego i magazynu jest przeznaczone tylko do lokalnego programowania i testowania i nie jest przeznaczone dla środowisk produkcyjnych.
W silosie, gdzie silo
znajduje się element , wywołaj metodę ISiloBuilderAddMemoryStreams:
silo.AddMemoryStreams("StreamProvider")
.AddMemoryGrainStorage("PubSubStore");
Na kliencie klastra, gdzie client
jest elementem , wywołaj metodę IClientBuilderAddMemoryStreams.
client.AddMemoryStreams("StreamProvider");
W tym przewodniku użyjemy prostego strumienia opartego na komunikatach, który używa komunikatów ziarna do wysyłania danych strumienia do subskrybentów. Użyjemy dostawcy magazynu w pamięci do przechowywania list subskrypcji, więc nie jest to rozsądny wybór dla rzeczywistych aplikacji produkcyjnych.
W silosie, gdzie hostBuilder
znajduje się element , wywołaj metodę ISiloHostBuilder
AddSimpleMessageStreamProvider:
hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
.AddMemoryGrainStorage("PubSubStore");
Na kliencie klastra, gdzie clientBuilder
jest elementem , wywołaj metodę IClientBuilder
AddSimpleMessageStreamProvider.
clientBuilder.AddSimpleMessageStreamProvider("SMSProvider");
Uwaga
Domyślnie komunikaty przekazywane przez strumień komunikatów prostych są uznawane za niezmienne i mogą być przekazywane przez odwołanie do innych ziarna. Aby wyłączyć to zachowanie, należy skonfigurować dostawcę programu SMS, aby wyłączyć SimpleMessageStreamProviderOptions.OptimizeForImmutableData
siloBuilder
.AddSimpleMessageStreamProvider(
"SMSProvider",
options => options.OptimizeForImmutableData = false);
Możesz tworzyć strumienie, wysyłać dane przy użyciu ich jako producentów, a także odbierać dane jako subskrybenci.
Tworzenie zdarzeń
Stosunkowo łatwo jest tworzyć zdarzenia dla strumieni. Najpierw należy uzyskać dostęp do dostawcy strumienia zdefiniowanego wcześniej w konfiguracji ("StreamProvider"
), a następnie wybrać strumień i wypchnąć do niego dane.
// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("StreamProvider");
// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);
Stosunkowo łatwo jest tworzyć zdarzenia dla strumieni. Najpierw należy uzyskać dostęp do dostawcy strumienia zdefiniowanego wcześniej w konfiguracji ("SMSProvider"
), a następnie wybrać strumień i wypchnąć do niego dane.
// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("SMSProvider");
// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");
Jak widać, nasz strumień ma identyfikator GUID i przestrzeń nazw. Ułatwi to identyfikację unikatowych strumieni. Na przykład przestrzeń nazw pokoju rozmów może mieć wartość "Pokoje", a identyfikator GUID może być identyfikatorem GUID obiektu RoomGrain.
W tym miejscu używamy identyfikatora GUID znanego pokoju rozmów. OnNextAsync
Za pomocą metody strumienia możemy wypchnąć do niego dane. Zróbmy to wewnątrz czasomierza, używając liczb losowych. Możesz również użyć dowolnego innego typu danych dla strumienia.
RegisterTimer(_ =>
{
return stream.OnNextAsync(Random.Shared.Next());
},
null,
TimeSpan.FromMilliseconds(1_000),
TimeSpan.FromMilliseconds(1_000));
Subskrybowanie i odbieranie danych przesyłanych strumieniowo
W przypadku odbierania danych można użyć niejawnych i jawnych subskrypcji, które zostały szczegółowo opisane w artykule Jawne i niejawne subskrypcje. W tym przykładzie użyto niejawnych subskrypcji, które są łatwiejsze. Gdy typ ziarna chce niejawnie subskrybować strumień, używa atrybutu [ImplicitStreamSubscription(namespace)].
W twoim przypadku zdefiniuj następujący element ReceiverGrain
:
[ImplicitStreamSubscription("RANDOMDATA")]
public class ReceiverGrain : Grain, IRandomReceiver
Za każdym razem, gdy dane są wypychane do strumieni przestrzeni nazw RANDOMDATA
, jak w czasomierzu, ziarno typu ReceiverGrain
o tym samym Guid
strumieniu otrzyma komunikat. Nawet jeśli obecnie nie istnieją żadne aktywacje ziarna, środowisko uruchomieniowe automatycznie utworzy nowe i wyśle do niego komunikat.
Aby to zadziałało, musimy ukończyć proces subskrypcji, ustawiając naszą OnNextAsync
metodę odbierania danych. Aby to zrobić, powinniśmy ReceiverGrain
wywołać coś takiego w jego OnActivateAsync
// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();
// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("StreamProvider");
// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);
// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
async (data, token) =>
{
Console.WriteLine(data);
await Task.CompletedTask;
});
// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();
// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("SMSProvider");
// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");
// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
async (data, token) =>
{
Console.WriteLine(data);
await Task.CompletedTask;
});
Wszystko gotowe! Teraz jedynym wymaganiem jest to, że coś wyzwala tworzenie ziarna producenta, a następnie zarejestruje czasomierz i zacznie wysyłać losowe ints do wszystkich zainteresowanych stron.
Ponownie ten przewodnik pomija wiele szczegółów i jest dobry tylko do pokazywania dużego obrazu. Przeczytaj inne części tego podręcznika i innych zasobów na platformie RX, aby uzyskać dobre zrozumienie, co jest dostępne i jak.
Programowanie reaktywne może być bardzo zaawansowanym podejściem do rozwiązywania wielu problemów. Możesz na przykład użyć LINQ w subskrybentu do filtrowania liczb i wykonywać różnego rodzaju interesujące rzeczy.