Orleans rychlý start streamování
Tato příručka vám ukáže rychlý způsob, jak nastavit a používat Orleans Toky. Další informace o funkcích streamování najdete v dalších částech této dokumentace.
Požadované konfigurace
V této příručce použijete datový proud založený na paměti, který používá zasílání odstupňovaných zpráv k odesílání dat streamu odběratelům. K ukládání seznamů předplatných použijete poskytovatele úložiště v paměti. Použití mechanismů založených na paměti pro streamování a úložiště je určené pouze pro místní vývoj a testování a není určeno pro produkční prostředí.
Na silu, kde silo
je ISiloBuildervolání :AddMemoryStreams
silo.AddMemoryStreams("StreamProvider")
.AddMemoryGrainStorage("PubSubStore");
V klientovi clusteru, kde client
je IClientBuildervolání AddMemoryStreams, .
client.AddMemoryStreams("StreamProvider");
V této příručce použijeme jednoduchý stream založený na zprávách, který k odesílání streamových dat odběratelům používá zasílání odstupňovaných zpráv. Poskytovatele úložiště v paměti použijeme k ukládání seznamů předplatných, takže pro skutečné produkční aplikace to není moudrá volba.
Na silu, kde hostBuilder
je ISiloHostBuilder
volání :AddSimpleMessageStreamProvider
hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
.AddMemoryGrainStorage("PubSubStore");
V klientovi clusteru, kde clientBuilder
je IClientBuilder
volání AddSimpleMessageStreamProvider, .
clientBuilder.AddSimpleMessageStreamProvider("SMSProvider");
Poznámka:
Ve výchozím nastavení jsou zprávy předávané přes Simple Message Stream považovány za neměnné a mohou být předány odkazem na jiná zrnka. Chcete-li toto chování vypnout, je nutné, aby poskytovatel serveru SMS vypnul. SimpleMessageStreamProviderOptions.OptimizeForImmutableData
siloBuilder
.AddSimpleMessageStreamProvider(
"SMSProvider",
options => options.OptimizeForImmutableData = false);
Můžete vytvářet datové proudy, odesílat je jako producenti a přijímat také data jako odběratelé.
Vytváření událostí
Vytváření událostí pro datové proudy je poměrně snadné. Nejprve byste měli získat přístup k poskytovateli streamu, který jste definovali v konfiguraci dříve ("StreamProvider"
) a pak zvolit datový proud a odeslat do něj data.
// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("StreamProvider");
// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);
Vytváření událostí pro datové proudy je poměrně snadné. Nejprve byste měli získat přístup k poskytovateli streamu, který jste definovali v konfiguraci dříve ("SMSProvider"
) a pak zvolit datový proud a odeslat do něj data.
// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("SMSProvider");
// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");
Jak vidíte, náš stream má identifikátor GUID a obor názvů. To usnadňuje identifikaci jedinečných datových proudů. Obor názvů pro chatovací místnost může být například "Místnosti" a identifikátor GUID může být guid vlastníka RoomGrain.
Tady používáme identifikátor GUID některé známé chatovací místnosti. OnNextAsync
Pomocí metody datového proudu můžeme do něj odesílat data. Pojďme to udělat uvnitř časovače pomocí náhodných čísel. Pro datový proud můžete použít i jakýkoli jiný datový typ.
RegisterTimer(_ =>
{
return stream.OnNextAsync(Random.Shared.Next());
},
null,
TimeSpan.FromMilliseconds(1_000),
TimeSpan.FromMilliseconds(1_000));
Přihlášení k odběru a přijímání streamovaných dat
Pro příjem dat můžete použít implicitní a explicitní předplatná, která jsou podrobněji popsána v explicitních a implicitních předplatných. V tomto příkladu se používají implicitní předplatná, která jsou jednodušší. Pokud se typ zrnitosti chce implicitně přihlásit k odběru datového proudu, používá atribut [ImplicitStreamSubscription(namespace)].
Pro váš případ definujte následující:ReceiverGrain
[ImplicitStreamSubscription("RANDOMDATA")]
public class ReceiverGrain : Grain, IRandomReceiver
Vždy, když se data nasdílí do datových proudů oboru názvů RANDOMDATA
, jak jsme měli v časovači, obdrží zpráva odstupňovaný typ ReceiverGrain
se stejným Guid
datovým proudem. I když v současné době neexistují žádné aktivace agregace, modul runtime automaticky vytvoří novou a odešle do ní zprávu.
Aby to fungovalo, musíme dokončit proces odběru nastavením metody OnNextAsync
pro příjem dat. Abychom to udělali, ReceiverGrain
měli bychom volat něco takového v jeho OnActivateAsync
// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();
// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("StreamProvider");
// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);
// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
async (data, token) =>
{
Console.WriteLine(data);
await Task.CompletedTask;
});
// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();
// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("SMSProvider");
// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");
// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
async (data, token) =>
{
Console.WriteLine(data);
await Task.CompletedTask;
});
Všechno je připravené. Jediným požadavkem je, že něco aktivuje vytvoření producenta a pak zaregistruje časovač a začne odesílat náhodné inty všem zúčastněným stranám.
Tento průvodce opět přeskočí spoustu podrobností a je vhodný jenom pro zobrazení velkého obrázku. Přečtěte si další části této příručky a dalších zdrojů informací o RX, abyste získali dobrou představu o tom, co je k dispozici a jak.
Reaktivní programování může být velmi výkonný přístup k řešení mnoha problémů. Můžete například použít LINQ v odběrateli k filtrování čísel a dělat všechny druhy zajímavých věcí.