Orleans rozhraní API streamování
Aplikace komunikují se streamy prostřednictvím rozhraní API, která jsou velmi podobná dobře známým reaktivním rozšířením (Rx) v .NET. Hlavním rozdílem je, že Orleans rozšíření datových proudů jsou asynchronní, aby bylo zpracování efektivnější v Orleansdistribuovaných a škálovatelných výpočetních prostředcích infrastruktury.
Asynchronní stream
Aplikace začíná pomocí zprostředkovatele streamu k získání popisovače datového proudu. Tady si můžete přečíst další informace o poskytovatelích datových proudů, ale prozatím si ho můžete představit jako objekt pro vytváření datových proudů, který implementátorům umožňuje přizpůsobit chování datových proudů a sémantiku:
IStreamProvider streamProvider = base.GetStreamProvider("SimpleStreamProvider");
StreamId streamId = StreamId.Create("MyStreamNamespace", Guid);
IAsyncStream<T> stream = streamProvider.GetStream<T>(streamId);
IStreamProvider streamProvider = base.GetStreamProvider("SimpleStreamProvider");
IAsyncStream<T> stream = streamProvider.GetStream<T>(Guid, "MyStreamNamespace");
Aplikace může získat odkaz na zprostředkovatele datového proudu buď voláním Grain.GetStreamProvider metody, když uvnitř agregační, nebo voláním GrainClient.GetStreamProvider metody v klientovi.
Orleans.Streams.IAsyncStream<T> je logický, silně typovaný popisovač virtuálního datového proudu. Je to podobné v duchu, jako Orleans je Grain Reference. Volání a GetStreamProvider
GetStream
jsou čistě místní. Argumenty, které GetStream
mají být IDENTIFIKÁTOR GUID, a další řetězec, který voláme obor názvů streamu (což může být null). Identifikátor GUID a řetězec oboru názvů tvoří identitu datového proudu (podobně jako argumenty).IGrainFactory.GetGrain Kombinace identifikátoru GUID a řetězce oboru názvů poskytuje větší flexibilitu při určování identit datových proudů. Stejně jako zrno 7 může existovat v rámci typu PlayerGrain
Zrno a v rámci typu ChatRoomGrain
zrnka může existovat jiná agregační 7 , stream 123 může existovat s oborem názvů PlayerEventsStream
streamu a v oboru názvů ChatRoomMessagesStream
streamu může existovat jiný datový proud 123 .
Výroba a spotřeba
IAsyncStream<T> implementuje rozhraní IAsyncObserver<T> i IAsyncObservable<T> rozhraní. Aplikace tak může stream použít buď k vytvoření nových událostí do datového proudu pomocí Orleans.Streams.IAsyncObserver<T>
, nebo k přihlášení k odběru a využívání událostí z datového proudu pomocí Orleans.Streams.IAsyncObservable<T>
.
public interface IAsyncObserver<in T>
{
Task OnNextAsync(T item, StreamSequenceToken token = null);
Task OnCompletedAsync();
Task OnErrorAsync(Exception ex);
}
public interface IAsyncObservable<T>
{
Task<StreamSubscriptionHandle<T>> SubscribeAsync(IAsyncObserver<T> observer);
}
K vytváření událostí do datového proudu stačí, když aplikace zavolá
await stream.OnNextAsync<T>(event)
Přihlášení k odběru datového proudu, volání aplikace
StreamSubscriptionHandle<T> subscriptionHandle = await stream.SubscribeAsync(IAsyncObserver)
SubscribeAsync Argumentem může být buď objekt, který implementuje IAsyncObserver<T> rozhraní, nebo kombinaci funkcí lambda pro zpracování příchozích událostí. Další možnosti SubscribeAsync
jsou k dispozici prostřednictvím AsyncObservableExtensions třídy. SubscribeAsync
StreamSubscriptionHandle<T>vrátí , což je neprůhláhlý popisovač, který lze použít k odhlášení z datového proudu (podobně jako asynchronní verze IDisposable).
await subscriptionHandle.UnsubscribeAsync()
Je důležité si uvědomit, že předplatné je pro podrobné informace, ne pro aktivaci. Jakmile se kód zrnitosti přihlásí k odběru streamu, toto předplatné překročí životnost této aktivace a zůstane trvalé navždy, dokud se kód zrnitosti (potenciálně v jiné aktivaci) explicitně neodhlásí. Toto je jádro abstrakce virtuálního datového proudu: nejen, že všechny streamy vždy existují, logicky, ale také odběr streamu je trvalý a žije mimo určitou fyzickou aktivaci, která vytvořila předplatné.
Násobnost
Stream Orleans může mít více výrobců a více spotřebitelů. Zpráva publikovaná producentem se doručí všem uživatelům, kteří byli přihlášeni k odběru datového proudu před publikováním zprávy.
Spotřebitel se navíc může přihlásit k odběru stejného datového proudu vícekrát. Pokaždé, když se přihlásí k odběru, dostane zpět jedinečný StreamSubscriptionHandle<T>. Pokud se k odběru stejného datového proudu přihlásí interval (nebo klient), obdrží stejné časy události X, jednou pro každé předplatné. Příjemce se také může odhlásit od odběru individuálního předplatného. Může najít všechna aktuální předplatná voláním:
IList<StreamSubscriptionHandle<T>> allMyHandles =
await IAsyncStream<T>.GetAllSubscriptionHandles();
Zotavení ze selhání
Pokud producent streamu zemře (nebo jeho zrno je deaktivováno), není potřeba nic dělat. Až toto agregační interval příště chce vytvořit více událostí, může znovu získat popisovač datového proudu a stejným způsobem vytvářet nové události.
Spotřebitelská logika je trochu více zapojená. Jak jsme uvedli dříve, jakmile se odběr streamu přihlásí k odběru agregační agregační interval, bude toto předplatné platné, dokud se toto agregační intervaly explicitně neodhlásí. Pokud příjemce datového proudu zemře (nebo se deaktivuje jeho agregační interval) a vygeneruje se v datovém proudu nová událost, spotřebitelská agregace se automaticky znovu aktivuje (stejně jako jakákoli běžná Orleans agregace se automaticky aktivuje při odeslání zprávy). Jedinou věcí, kterou teď potřebuje kód odstupňovaného zpracování, je poskytnout IAsyncObserver<T> data ke zpracování. Příjemce musí v rámci metody znovu připojit logiku OnActivateAsync() zpracování. Chcete-li to provést, může volat:
StreamSubscriptionHandle<int> newHandle =
await subscriptionHandle.ResumeAsync(IAsyncObserver);
Příjemce použije předchozí popisovač, který získal při prvním přihlášení k odběru "obnovení zpracování". Všimněte si, že ResumeAsync pouze aktualizuje stávající předplatné pomocí nové instance IAsyncObserver
logiky a nemění skutečnost, že tento příjemce je již přihlášen k odběru tohoto streamu.
Jak spotřebitel získá starou subscriptionHandle
? K dispozici jsou 2 možnosti. Příjemce mohl zachovat popisovač, který byl předán z původní SubscribeAsync
operace, a může ho nyní použít. Pokud příjemce popisovač nemá, může IAsyncStream<T>
požádat o všechny aktivní popisovače předplatného voláním:
IList<StreamSubscriptionHandle<T>> allMyHandles =
await IAsyncStream<T>.GetAllSubscriptionHandles();
Spotřebitel teď může obnovit všechny uživatele nebo odhlásit odběr některých, pokud si to přeje.
Tip
Pokud konzumentská zrnitost implementuje IAsyncObserver<T> rozhraní přímo (public class MyGrain<T> : Grain, IAsyncObserver<T>
), měla by být teoreticky vyžadována k opětovnému připojení IAsyncObserver
, a proto nebude nutné volat ResumeAsync
. Modul runtime streamování by měl být schopen automaticky zjistit, že se už implementuje IAsyncObserver
agregační interval a že tyto IAsyncObserver
metody pouze vyvolá. Modul runtime streamování ale v současné době tuto funkci nepodporuje a kód odstupňovaného intervalu musí explicitně volat ResumeAsync
, i když se agregační modul implementuje IAsyncObserver
přímo.
Explicitní a implicitní předplatná
Ve výchozím nastavení musí uživatel streamu explicitně přihlásit k odběru datového proudu. Toto předplatné se obvykle aktivuje některými externími zprávami, že zrno (nebo klient) obdrží pokyn, aby se přihlásil k odběru. Například v chatovací službě, když se uživatel připojí k chatovací místnosti, obdrží JoinChatGroup
zprávu s názvem chatu, což způsobí, že se uživatel přihlásí k odběru tohoto chatového streamu.
Kromě toho Orleans streamy podporují také implicitní předplatná. V tomto modelu se agregační interval explicitně nepřihlásí k odběru datového proudu. Toto agregační interval se odebírá automaticky, implicitně, pouze na základě jeho odstupňované identity a .ImplicitStreamSubscriptionAttribute Hlavní hodnota implicitních předplatných umožňuje aktivitě streamu aktivovat aktivaci agregace (tím se aktivuje předplatné) automaticky. Pokud by například jedno zrno chtělo vytvořit datový proud a další proces agregace, musí producent znát identitu konzumentského zrnka a volat ho, aby se přihlásil k odběru datového proudu. Teprve potom může začít odesílat události. Místo toho může producent začít vytvářet události do datového proudu pomocí implicitních předplatných a agregační interval příjemce se automaticky aktivuje a přihlásí se k odběru streamu. V takovém případě se producent nezajímá vůbec o to, kdo čte události.
Implementace zrnitosti MyGrainType
může deklarovat atribut [ImplicitStreamSubscription("MyStreamNamespace")]
. To říká modulu runtime streamování, že když je událost vygenerována ve streamu, jehož identita je GUID XXX a "MyStreamNamespace"
obor názvů, měla by být doručena do podrobného intervalu, jehož identita je XXX typu MyGrainType
. To znamená, že modul runtime mapuje datový proud <XXX, MyStreamNamespace>
na agregační interval <XXX, MyGrainType>
příjemce .
Přítomnost ImplicitStreamSubscription
způsobí, že se modul runtime streamování automaticky přihlásí k odběru tohoto agregace streamu a doručí do něj události streamu. Kód odstupňovaného intervalu ale stále potřebuje informovat modul runtime, jak chce zpracovat události. V podstatě musí připojit IAsyncObserver
. Proto při aktivaci agregačního intervalu musí kód zrnitosti OnActivateAsync
volat:
IStreamProvider streamProvider =
base.GetStreamProvider("SimpleStreamProvider");
StreamId streamId =
StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
IAsyncStream<T> stream =
streamProvider.GetStream<T>(streamId);
StreamSubscriptionHandle<T> subscription =
await stream.SubscribeAsync(IAsyncObserver<T>);
IStreamProvider streamProvider =
base.GetStreamProvider("SimpleStreamProvider");
IAsyncStream<T> stream =
streamProvider.GetStream<T>(this.GetPrimaryKey(), "MyStreamNamespace");
StreamSubscriptionHandle<T> subscription =
await stream.SubscribeAsync(IAsyncObserver<T>);
Psaní logiky předplatného
Níže jsou uvedené pokyny k zápisu logiky předplatného pro různé případy: explicitní a implicitní předplatná, převinutelná zpětná a nepřevinutelná streamy. Hlavním rozdílem mezi explicitními a implicitními předplatnými je, že pro implicitní agregační interval má vždy jedno implicitní předplatné pro každý obor názvů streamu; neexistuje způsob, jak vytvořit více předplatných (neexistuje násobnost předplatného), neexistuje způsob, jak se odhlásit a logika zrnitosti musí vždy připojit pouze logiku zpracování. To také znamená, že pro implicitní předplatná není nikdy potřeba obnovit předplatné. Na druhou stranu u explicitních předplatných je potřeba obnovit předplatné, jinak pokud se předplatné znovu přihlásí k odběru, dojde k vícenásobnému přihlášení k odběru.
Implicitní předplatná:
U implicitních předplatných se stále musí přihlásit k odběru, aby se připojila logika zpracování. To je možné provést v agregačním intervalu příjemce implementací IStreamSubscriptionObserver
rozhraní a IAsyncObserver<T>
povolením aktivace agregace odděleně od přihlášení k odběru. Pokud se chcete přihlásit k odběru datového proudu, vytvoří agregační interval popisovač a volání await handle.ResumeAsync(this)
ve své OnSubscribed(...)
metodě.
Ke zpracování zpráv IAsyncObserver<T>.OnNextAsync(...)
se metoda implementuje pro příjem dat datového proudu a token sekvence. Alternativně může ResumeAsync
metoda přijmout sadu delegátů představující metody IAsyncObserver<T>
rozhraní , onNextAsync
, onErrorAsync
a onCompletedAsync
.
public Task OnNextAsync(string item, StreamSequenceToken? token = null)
{
_logger.LogInformation($"Received an item from the stream: {item}");
}
public async Task OnSubscribed(IStreamSubscriptionHandleFactory handleFactory)
{
var handle = handleFactory.Create<string>();
await handle.ResumeAsync(this);
}
public override async Task OnActivateAsync()
{
var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
var stream =
streamProvider.GetStream<string>(
this.GetPrimaryKey(), "MyStreamNamespace");
await stream.SubscribeAsync(OnNextAsync);
}
Explicitní předplatná:
Pro explicitní předplatná musí volat SubscribeAsync
odběr streamu. Tím se vytvoří předplatné a také připojí logiku zpracování. Explicitní předplatné bude existovat, dokud se neodhlásí odhlášený odběr, takže pokud se deaktivuje a znovu aktivuje agregační interval, bude se stále explicitně odebírat, ale nepřipojuje se žádná logika zpracování. V tomto případě je potřeba znovu připojit logiku zpracování. Chcete-li to udělat, v jeho OnActivateAsync
, agregační nejprve musí zjistit, jaká předplatná má, voláním IAsyncStream<T>.GetAllSubscriptionHandles(). Agregační interval se musí spouštět ResumeAsync
na každém popisovači, se kterým chce pokračovat ve zpracování, nebo zrušit odběr odběru u všech popisovačů, se kterými se pracuje. Agregační interval může také volitelně zadat StreamSequenceToken
jako argument pro ResumeAsync
volání, což způsobí, že toto explicitní předplatné začne využívat z tohoto tokenu.
public async override Task OnActivateAsync(CancellationToken cancellationToken)
{
var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
var streamId = StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
var stream = streamProvider.GetStream<string>(streamId);
var subscriptionHandles = await stream.GetAllSubscriptionHandles();
foreach (var handle in subscriptionHandles)
{
await handle.ResumeAsync(this);
}
}
public async override Task OnActivateAsync()
{
var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
var stream =
streamProvider.GetStream<string>(this.GetPrimaryKey(), "MyStreamNamespace");
var subscriptionHandles = await stream.GetAllSubscriptionHandles();
if (!subscriptionHandles.IsNullOrEmpty())
{
subscriptionHandles.ForEach(
async x => await x.ResumeAsync(OnNextAsync));
}
}
Pořadí a sekvence tokenů streamu
Pořadí doručování událostí mezi individuálním producentem a individuálním příjemcem závisí na poskytovateli streamu.
S SMS producent explicitně řídí pořadí událostí, které uživatel vidí, tím, že řídí způsob, jakým je producent publikuje. Ve výchozím nastavení (pokud SimpleMessageStreamProviderOptions.FireAndForgetDelivery je možnost poskytovatele serveru SMS nastavena na false) a pokud producent čeká na každé OnNextAsync
volání, události přicházejí v objednávce FIFO. V SMS je na producentovi rozhodnout, jak zpracovat selhání doručení, které bude označeno přerušeným Task
vráceným voláním OnNextAsync
.
Streamy front Azure nezaručují objednávku FIFO, protože podkladové fronty Azure nezaručují pořadí v případech selhání. (Zaručují pořadí FIFO při provádění bez selhání.) Když producent vytvoří událost do fronty Azure, pokud operace fronty selže, je na producentovi, aby se pokusil o jinou frontu a později při řešení potenciálních duplicitních zpráv. Na straně Orleans doručení modul runtime streamování vyřadí událost z fronty a pokusí se ji doručit pro zpracování příjemcům. Modul Orleans runtime streamování odstraní událost z fronty pouze po úspěšném zpracování. Pokud se doručení nebo zpracování nezdaří, událost se z fronty neodstraní a později se automaticky znovu zobrazí ve frontě. Modul runtime streamování se ho pokusí znovu doručit, čímž se potenciálně přeruší objednávka FIFO. Výše uvedené chování odpovídá normální sémantice front Azure.
Definované pořadí aplikace: Pokud chcete řešit výše uvedené problémy s řazením, může aplikace volitelně zadat řazení. Toho lze dosáhnout prostřednictvím objektu StreamSequenceToken, který je neprůhlený IComparable objekt, který lze použít k řazení událostí. Producent může volání předat volitelnému StreamSequenceToken
OnNext
. Předá StreamSequenceToken
se příjemci a doručí se společně s událostí. Aplikace tak může zdůvodnit a rekonstruovat své pořadí nezávisle na modulu runtime streamování.
Převinutelné streamy
Některé streamy umožňují aplikaci přihlásit se k odběru pouze od nejnovějšího bodu v čase, zatímco jiné streamy umožňují "vrátit se v čase". Druhá funkce závisí na základní technologii řízení front a na konkrétním poskytovateli datových proudů. Například fronty Azure umožňují využívání nejnovějších událostí ve frontě, zatímco EventHub umožňuje přehrání událostí z libovolného bodu v čase (až do určité doby vypršení platnosti). Datové proudy, které podporují návrat zpět v čase, se označují jako převinutelné datové proudy.
Příjemce převinutelného datového proudu může předat StreamSequenceToken
volání SubscribeAsync
. Modul runtime doručí události počínaje tímto StreamSequenceToken
modulem . Token null znamená, že příjemce chce přijímat události od nejnovější verze.
Schopnost převinout datový proud zpět je velmi užitečná ve scénářích obnovení. Představte si například agregační interval, který se přihlásí k odběru datového proudu a pravidelně kontroluje jeho stav společně s nejnovějším tokenem sekvence. Při zotavení z selhání se může agregační interval znovu přihlásit ke stejnému streamu z nejnovějšího tokenu sekvence kontrolních bodů, čímž se obnoví, aniž by došlo ke ztrátě událostí vygenerovaných od posledního kontrolního bodu.
Zprostředkovatel služby Event Hubs je možné převinout zpět. Jeho kód najdete na GitHubu: Orleans/Azure/Orleans. Streaming.EventHubs. Poskytovatelé sms a front Azure se nedají převinout zpět.
Bezstavové automaticky škálované zpracování horizontálního navýšení kapacity
Streamování ve výchozím nastavení Orleans podporuje velký počet relativně malých datových proudů, z nichž každý zpracovává jedno nebo více stavových zrn. Souhrnně se zpracování všech proudů společně dělí mezi velký počet běžných (stavových) zrn. Kód aplikace řídí toto horizontální dělení přiřazením ID datových proudů a ID agregace a explicitním přihlášením k odběru. Cílem je horizontálně dělené stavové zpracování.
Existuje ale také zajímavý scénář automatického bezstavového zpracování s horizontálním navýšením kapacity. V tomto scénáři má aplikace malý počet datových proudů (nebo dokonce jeden velký datový proud) a cílem je bezstavové zpracování. Příkladem je globální datový proud událostí, kdy zpracování zahrnuje dekódování jednotlivých událostí a potenciálně ho přesměrovávat do jiných datových proudů pro další stavové zpracování. Bezstavové zpracování streamů s horizontálním navýšením kapacity je možné podporovat prostřednictvím Orleans StatelessWorkerAttribute zrn.
Aktuální stav bezstavového automatického zpracování horizontálního navýšení kapacity: Zatím není implementováno. Pokus o přihlášení k odběru datového proudu z agregačního intervalu StatelessWorker
způsobí nedefinované chování. Zvažujeme podporu této možnosti.
Zrna a Orleans klienti
Orleans datové proudy fungují jednotně napříč zrny a Orleans klienty. To znamená, že stejná rozhraní API se dají použít uvnitř podrobného a klientského Orleans rozhraní k vytváření a využívání událostí. To výrazně zjednodušuje logiku aplikace, takže speciální rozhraní API na straně klienta, jako jsou pozorovatelé grainů, redundantní.
Plně spravované a spolehlivé streamování pub-sub
Ke sledování odběrů streamů Orleans používá komponentu modulu runtime s názvem Streaming Pub-Sub , která slouží jako bod rendezvous pro uživatele streamů a producenty streamů. Pub-sub sleduje všechna předplatná streamů a udržuje je a odpovídá příjemcům streamů s producenty streamů.
Aplikace můžou zvolit, kde a jak se ukládají data Pub-Sub. Samotná komponenta Pub-Sub je implementována jako zrnka (označovaná PubSubRendezvousGrain
), která používá Orleans deklarativní trvalost. PubSubRendezvousGrain
používá zprostředkovatele úložiště s názvem PubSubStore
. Stejně jako u jakéhokoli agregačního intervalu můžete určit implementaci pro poskytovatele úložiště. Pro Streaming Pub-Sub můžete změnit implementaci PubSubStore
v silové konstrukci pomocí tvůrce hostitelů sila:
Následující konfigurace pub-Sub ukládá svůj stav v tabulkách Azure.
hostBuilder.AddAzureTableGrainStorage("PubSubStore",
options => options.ConfigureTableServiceClient("<Secret>"));
hostBuilder.AddAzureTableGrainStorage("PubSubStore",
options => options.ConnectionString = "<Secret>");
Tímto způsobem budou data pub-Sub trvale uložena v tabulce Azure. Pro počáteční vývoj můžete také použít úložiště paměti. Kromě pub-Sub Orleans poskytuje modul runtime streamování události od producentů uživatelů, spravuje všechny prostředky modulu runtime přidělené aktivně používaným datovým proudům a transparentně shromažďuje prostředky modulu runtime z nepoužívaných datových proudů.
Konfigurace
Pokud chcete používat streamy, musíte povolit poskytovatele datových proudů prostřednictvím tvůrce klienta silo nebo clusteru. Tady si můžete přečíst další informace o poskytovateli streamů. Ukázkové nastavení zprostředkovatele streamu:
hostBuilder.AddMemoryStreams("StreamProvider")
.AddAzureQueueStreams<AzureQueueDataAdapterV2>("AzureQueueProvider",
optionsBuilder => optionsBuilder.Configure(
options => options.ConfigureTableServiceClient("<Secret>")))
.AddAzureTableGrainStorage("PubSubStore",
options => options.ConfigureTableServiceClient("<Secret>"));
hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
.AddAzureQueueStreams<AzureQueueDataAdapterV2>("AzureQueueProvider",
optionsBuilder => optionsBuilder.Configure(
options => options.ConnectionString = "<Secret>"))
.AddAzureTableGrainStorage("PubSubStore",
options => options.ConnectionString = "<Secret>");