Orleans API:er för direktuppspelning
Program interagerar med strömmar via API:er som liknar de välkända reaktiva tilläggen (Rx) i .NET. Den största skillnaden är att Orleans dataströmtillägg är asynkrona för att göra bearbetningen mer effektiv i Orleansdistribuerad och skalbar beräkningsinfrastruktur.
Asynkron ström
Ett program börjar med att använda en dataströmsprovider för att få ett handtag till en dataström. Du kan läsa mer om streamprovidrar här, men för tillfället kan du se det som en strömfabrik som gör det möjligt för implementerare att anpassa strömmarnas beteende och semantik:
IStreamProvider streamProvider = base.GetStreamProvider("SimpleStreamProvider");
StreamId streamId = StreamId.Create("MyStreamNamespace", Guid);
IAsyncStream<T> stream = streamProvider.GetStream<T>(streamId);
IStreamProvider streamProvider = base.GetStreamProvider("SimpleStreamProvider");
IAsyncStream<T> stream = streamProvider.GetStream<T>(Guid, "MyStreamNamespace");
Ett program kan hämta en referens till dataströmprovidern antingen genom att anropa Grain.GetStreamProvider metoden i ett korn, eller genom att anropa GrainClient.GetStreamProvider metoden när den finns på klienten.
Orleans.Streams.IAsyncStream<T> är ett logiskt, starkt skrivet handtag till en virtuell dataström. Det liknar grain-referensen i andan Orleans . Samtal till GetStreamProvider
och GetStream
är helt lokala. Argumenten till GetStream
är ett GUID och en ytterligare sträng som vi kallar ett stream-namnområde (som kan vara null). Tillsammans utgör GUID och namnområdessträngen strömidentiteten (ungefär som argumenten till IGrainFactory.GetGrain). Kombinationen av GUID och namnområdessträng ger extra flexibilitet vid fastställande av strömidentiteter. Precis som korn 7 kan finnas inom korntypen PlayerGrain
och ett annat korn 7 kan finnas inom korntypen ChatRoomGrain
, kan Stream 123 finnas med dataströmnamnområdet PlayerEventsStream
och en annan ström 123 kan finnas inom dataströmmens namnområde ChatRoomMessagesStream
.
Producera och konsumera
IAsyncStream<T> implementerar både gränssnitten IAsyncObserver<T> och IAsyncObservable<T> . På så sätt kan ett program använda strömmen antingen för att skapa nya händelser i strömmen med hjälp Orleans.Streams.IAsyncObserver<T>
av eller för att prenumerera på och använda händelser från en dataström med hjälp Orleans.Streams.IAsyncObservable<T>
av .
public interface IAsyncObserver<in T>
{
Task OnNextAsync(T item, StreamSequenceToken token = null);
Task OnCompletedAsync();
Task OnErrorAsync(Exception ex);
}
public interface IAsyncObservable<T>
{
Task<StreamSubscriptionHandle<T>> SubscribeAsync(IAsyncObserver<T> observer);
}
För att skapa händelser i strömmen anropar ett program bara
await stream.OnNextAsync<T>(event)
Om du vill prenumerera på en dataström anropar ett program
StreamSubscriptionHandle<T> subscriptionHandle = await stream.SubscribeAsync(IAsyncObserver)
Argumentet till SubscribeAsync kan antingen vara ett objekt som implementerar IAsyncObserver<T> gränssnittet eller en kombination av lambda-funktioner för att bearbeta inkommande händelser. Fler alternativ för SubscribeAsync
är tillgängliga via AsyncObservableExtensions klass. SubscribeAsync
returnerar en StreamSubscriptionHandle<T>, som är ett ogenomskinligt handtag som kan användas för att avbryta prenumerationen från strömmen (liknar i andan en asynkron version av IDisposable).
await subscriptionHandle.UnsubscribeAsync()
Det är viktigt att observera att prenumerationen är för ett korn, inte för aktivering. När kornkoden prenumererar på strömmen överskrider den här prenumerationen aktiveringens livslängd och förblir varaktig för alltid tills kornkoden (eventuellt i en annan aktivering) uttryckligen avregistrerar sig. Det här är kärnan i en abstraktion av virtuella strömmar: inte bara finns alla strömmar alltid, logiskt, utan även en stream-prenumeration är hållbar och lever bortom en viss fysisk aktivering som skapade prenumerationen.
Kardinalitet
En Orleans dataström kan ha flera producenter och flera konsumenter. Ett meddelande som publiceras av en producent levereras till alla konsumenter som prenumererade på strömmen innan meddelandet publicerades.
Dessutom kan konsumenten prenumerera på samma ström flera gånger. Varje gång den prenumererar får den tillbaka en unik StreamSubscriptionHandle<T>. Om ett korn (eller en klient) prenumererar X gånger på samma ström får den samma händelse X gånger, en gång för varje prenumeration. Konsumenten kan också avbryta prenumerationen från en enskild prenumeration. Den hittar alla sina aktuella prenumerationer genom att anropa:
IList<StreamSubscriptionHandle<T>> allMyHandles =
await IAsyncStream<T>.GetAllSubscriptionHandles();
Återställa från fel
Om producenten av en ström dör (eller om dess korn är inaktiverat) finns det inget den behöver göra. Nästa gång det här kornet vill skapa fler händelser kan det få strömhandtaget igen och skapa nya händelser på samma sätt.
Konsumentlogik är lite mer involverad. Som vi sa tidigare är den här prenumerationen giltig tills kornet uttryckligen avregistreras när ett konsumentkorn prenumererar på en dataström. Om strömmens konsument dör (eller om dess korn är inaktiverat) och en ny händelse genereras i strömmen, återaktiveras konsumentkornet automatiskt (precis som alla vanliga Orleans korn aktiveras automatiskt när ett meddelande skickas till den). Det enda som kornkoden behöver göra nu är att tillhandahålla en IAsyncObserver<T> för att bearbeta data. Konsumenten måste koppla om bearbetningslogik som en del av OnActivateAsync() metoden. För att göra det kan det anropa:
StreamSubscriptionHandle<int> newHandle =
await subscriptionHandle.ResumeAsync(IAsyncObserver);
Konsumenten använder den tidigare referensen som den fick när den först prenumererade på "återuppta bearbetningen". Observera att ResumeAsync bara uppdaterar en befintlig prenumeration med den nya instansen av IAsyncObserver
logik och inte ändrar det faktum att den här konsumenten redan prenumererar på den här strömmen.
Hur får konsumenten en gammal subscriptionHandle
? Det finns 2 alternativ. Konsumenten kan ha sparat handtaget som den gavs tillbaka från den ursprungliga SubscribeAsync
åtgärden och kan använda den nu. Om konsumenten inte har handtaget kan den också be om IAsyncStream<T>
alla sina aktiva prenumerationsreferenser genom att anropa:
IList<StreamSubscriptionHandle<T>> allMyHandles =
await IAsyncStream<T>.GetAllSubscriptionHandles();
Konsumenten kan nu återuppta alla eller avsluta prenumerationen från vissa om den vill.
Dricks
Om konsumentkornet implementerar IAsyncObserver<T> gränssnittet direkt (public class MyGrain<T> : Grain, IAsyncObserver<T>
), bör det i teorin inte vara nödvändigt att återansluta IAsyncObserver
och därför inte behöver anropa ResumeAsync
. Strömningskörningen bör automatiskt kunna ta reda på att kornet redan implementeras IAsyncObserver
och bara anropa dessa IAsyncObserver
metoder. Strömningskörningen stöder dock för närvarande inte detta och kornkoden måste fortfarande uttryckligen anropa ResumeAsync
, även om kornet implementeras IAsyncObserver
direkt.
Explicita och implicita prenumerationer
Som standard måste en stream-konsument uttryckligen prenumerera på strömmen. Den här prenumerationen utlöses vanligtvis av ett externt meddelande som kornet (eller klienten) tar emot som instruerar den att prenumerera. I till exempel en chatttjänst när en användare ansluter till ett chattrum får hans korn ett JoinChatGroup
meddelande med chattnamnet, vilket gör att användarens kornighet prenumererar på den här chattströmmen.
Dessutom Orleans stöder strömmar även implicita prenumerationer. I den här modellen prenumererar inte kornet uttryckligen på strömmen. Det här kornet prenumereras automatiskt, implicit, bara baserat på dess kornidentitet och en ImplicitStreamSubscriptionAttribute. Implicita prenumerationers huvudvärde gör att strömaktiviteten kan utlösa kornig aktivering (vilket utlöser prenumerationen) automatiskt. Om ett korn till exempel vill skapa en ström och en annan kornprocess för den här strömmen, skulle producenten behöva känna till konsumentkornets identitet och göra ett kornanrop till den där den uppmanas att prenumerera på strömmen. Först efter det kan den börja skicka händelser. Med implicita prenumerationer kan producenten i stället bara börja producera händelser till en dataström, och konsumentkornet aktiveras automatiskt och prenumererar på strömmen. I så fall bryr sig producenten inte alls om vem som läser händelserna
Kornimplementeringen MyGrainType
kan deklarera ett attribut [ImplicitStreamSubscription("MyStreamNamespace")]
. Detta talar om för strömningskörningen att när en händelse genereras på en ström vars identitet är GUID XXX och "MyStreamNamespace"
namnrymd, ska den levereras till kornet vars identitet är XXX av typen MyGrainType
. Det vill: körningen mappar dataströmmen <XXX, MyStreamNamespace>
till konsumentkorn .<XXX, MyGrainType>
Förekomsten av ImplicitStreamSubscription
gör att strömningskörningen automatiskt prenumererar på det här kornet till en ström och levererar streamhändelserna till den. Kornkoden måste dock fortfarande tala om för körningen hur den vill att händelser ska bearbetas. I princip måste den IAsyncObserver
bifoga . När kornet aktiveras måste därför kornkoden inuti OnActivateAsync
anropa:
IStreamProvider streamProvider =
base.GetStreamProvider("SimpleStreamProvider");
StreamId streamId =
StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
IAsyncStream<T> stream =
streamProvider.GetStream<T>(streamId);
StreamSubscriptionHandle<T> subscription =
await stream.SubscribeAsync(IAsyncObserver<T>);
IStreamProvider streamProvider =
base.GetStreamProvider("SimpleStreamProvider");
IAsyncStream<T> stream =
streamProvider.GetStream<T>(this.GetPrimaryKey(), "MyStreamNamespace");
StreamSubscriptionHandle<T> subscription =
await stream.SubscribeAsync(IAsyncObserver<T>);
Skriva prenumerationslogik
Nedan visas riktlinjerna för hur du skriver prenumerationslogik för olika fall: explicita och implicita prenumerationer, bakåtspolningsbara och icke-bakåtspolbara strömmar. Den största skillnaden mellan explicita och implicita prenumerationer är att för implicita korn har alltid exakt en implicit prenumeration för varje stream-namnområde. Det finns inget sätt att skapa flera prenumerationer (det finns ingen prenumerationsmultlicering), det finns inget sätt att avbryta prenumerationen och kornlogik behöver alltid bara koppla bearbetningslogik. Det innebär också att det för implicita prenumerationer aldrig finns något behov av att återuppta en prenumeration. Å andra sidan, för explicita prenumerationer, måste man återuppta prenumerationen, annars, om kornet prenumererar igen resulterar det i att kornet prenumererar flera gånger.
Implicita prenumerationer:
För implicita prenumerationer måste kornet fortfarande prenumerera på för att koppla bearbetningslogik. Detta kan göras i konsumentkornet genom att implementera gränssnitten IStreamSubscriptionObserver
och IAsyncObserver<T>
så att kornet kan aktiveras separat från att prenumerera. För att prenumerera på strömmen skapar kornet ett handtag och anropar await handle.ResumeAsync(this)
i sin OnSubscribed(...)
metod.
För att bearbeta meddelanden IAsyncObserver<T>.OnNextAsync(...)
implementeras metoden för att ta emot dataströmmar och en sekvenstoken. ResumeAsync
Alternativt kan metoden ta en uppsättning ombud som representerar metoderna för IAsyncObserver<T>
gränssnittet, , onNextAsync
onErrorAsync
och onCompletedAsync
.
public Task OnNextAsync(string item, StreamSequenceToken? token = null)
{
_logger.LogInformation($"Received an item from the stream: {item}");
}
public async Task OnSubscribed(IStreamSubscriptionHandleFactory handleFactory)
{
var handle = handleFactory.Create<string>();
await handle.ResumeAsync(this);
}
public override async Task OnActivateAsync()
{
var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
var stream =
streamProvider.GetStream<string>(
this.GetPrimaryKey(), "MyStreamNamespace");
await stream.SubscribeAsync(OnNextAsync);
}
Explicita prenumerationer:
För explicita prenumerationer måste ett korn anropa SubscribeAsync
för att prenumerera på strömmen. Detta skapar en prenumeration och kopplar bearbetningslogik. Den explicita prenumerationen finns tills kornet avregistreras, så om ett korn inaktiveras och återaktiveras prenumereras kornet fortfarande uttryckligen, men ingen bearbetningslogik kopplas. I det här fallet måste kornet koppla om bearbetningslogik. För att göra det måste OnActivateAsync
kornet först ta reda på vilka prenumerationer det har genom att anropa IAsyncStream<T>.GetAllSubscriptionHandles(). Kornet måste köras ResumeAsync
på varje handtag som den vill fortsätta bearbetningen eller avprenumereraAsync på alla referenser som den görs med. Kornet kan också ange StreamSequenceToken
som ett argument för anropen, vilket gör att den här explicita prenumerationen börjar förbruka från den ResumeAsync
token.
public async override Task OnActivateAsync(CancellationToken cancellationToken)
{
var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
var streamId = StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
var stream = streamProvider.GetStream<string>(streamId);
var subscriptionHandles = await stream.GetAllSubscriptionHandles();
foreach (var handle in subscriptionHandles)
{
await handle.ResumeAsync(this);
}
}
public async override Task OnActivateAsync()
{
var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
var stream =
streamProvider.GetStream<string>(this.GetPrimaryKey(), "MyStreamNamespace");
var subscriptionHandles = await stream.GetAllSubscriptionHandles();
if (!subscriptionHandles.IsNullOrEmpty())
{
subscriptionHandles.ForEach(
async x => await x.ResumeAsync(OnNextAsync));
}
}
Strömma ordnings- och sekvenstoken
Ordningen på händelseleveransen mellan en enskild producent och en enskild konsument beror på strömleverantören.
Med SMS kontrollerar producenten uttryckligen ordningen på händelser som konsumenten ser genom att kontrollera hur producenten publicerar dem. Som standard (om SimpleMessageStreamProviderOptions.FireAndForgetDelivery alternativet för SMS-providern är inställt på false) och om producenten väntar på varje OnNextAsync
anrop kommer händelserna att komma i FIFO-ordning. I SMS är det upp till producenten att bestämma hur leveransfelen ska hanteras som indikeras av ett avbrott Task
som returneras av OnNextAsync
samtalet.
Azure Queue-strömmar garanterar inte FIFO-ordning eftersom de underliggande Azure-köerna inte garanterar ordningen i felfall. (De garanterar FIFO-ordningen i felfria körningar.) När en producent skapar händelsen i Azure Queue, om köåtgärden misslyckas, är det upp till producenten att försöka en annan kö och senare hantera potentiella duplicerade meddelanden. På leveranssidan Orleans rensar strömningskörningen händelsen från kön och försöker leverera den för bearbetning till konsumenter. Orleans Strömningskörningen tar bara bort händelsen från kön när bearbetningen har slutförts. Om leveransen eller bearbetningen misslyckas tas händelsen inte bort från kön och visas automatiskt i kön igen senare. Strömningskörningen försöker leverera den igen, vilket kan bryta FIFO-ordningen. Ovanstående beteende matchar den normala semantiken i Azure Queues.
Programdefinierad ordning: Om du vill hantera ovanstående beställningsproblem kan ett program ange sin beställning. Detta uppnås via ett StreamSequenceToken, som är ett ogenomskinliga IComparable objekt som kan användas för att beställa händelser. En producent kan skicka ett valfritt StreamSequenceToken
till anropet OnNext
. Detta StreamSequenceToken
skickas till konsumenten och levereras tillsammans med händelsen. På så sätt kan ett program resonera och rekonstruera sin ordning oberoende av strömningskörningen.
Dataströmmar som kan spolas tillbaka
Vissa strömmar tillåter endast att ett program prenumererar på dem från och med den senaste tidpunkten, medan andra strömmar tillåter "tillbaka i tiden". Den senare funktionen är beroende av den underliggande kötekniken och den specifika strömleverantören. Azure-köer tillåter till exempel endast användning av de senaste aktuella händelserna, medan EventHub tillåter uppspelning av händelser från en godtycklig tidpunkt (upp till en viss förfallotid). Strömmar som stöder tillbaka i tiden kallas för bakåtspolningsbara strömmar.
Konsumenten av en bakåtspolningsbar ström kan skicka ett StreamSequenceToken
till anropet SubscribeAsync
. Körningen levererar händelser till den från och med den StreamSequenceToken
. En null-token innebär att konsumenten vill ta emot händelser från och med den senaste.
Möjligheten att spola tillbaka en ström är mycket användbar i återställningsscenarier. Tänk dig till exempel ett korn som prenumererar på en ström och regelbundet kontrollerar dess tillstånd tillsammans med den senaste sekvenstoken. När du återställer från ett fel kan kornet prenumerera på samma ström igen från den senaste kontrollpunktssekvenstoken och därmed återställa utan att förlora några händelser som har genererats sedan den senaste kontrollpunkten.
Event Hubs-providern kan spolas tillbaka. Du hittar koden på GitHub: Orleans/Azure/Orleans. Streaming.EventHubs. SMS - och Azure Queue-leverantörer kan inte spolas tillbaka.
Tillståndslös automatiskt utskalad bearbetning
Som standard Orleans är Streaming avsett att stödja ett stort antal relativt små strömmar, som var och en bearbetas av ett eller flera tillståndskänsliga korn. Tillsammans partitioneras bearbetningen av alla strömmar mellan ett stort antal vanliga (tillståndskänsliga) korn. Programkoden styr den här horisontella partitioneringen genom att tilldela ström-ID:t och korn-ID:erna och genom att uttryckligen prenumerera. Målet är horisontell tillståndskänslig bearbetning.
Det finns dock också ett intressant scenario med automatiskt utskalad tillståndslös bearbetning. I det här scenariot har ett program ett litet antal strömmar (eller till och med en stor ström) och målet är tillståndslös bearbetning. Ett exempel är en global ström av händelser, där bearbetningen innebär avkodning av varje händelse och potentiellt vidarebefordrar den till andra strömmar för ytterligare tillståndskänslig bearbetning. Tillståndslös utskalad dataströmbearbetning kan stödjas i Orleans via StatelessWorkerAttribute korn.
Aktuell status för tillståndslös automatiskt utskalad bearbetning: Detta har ännu inte implementerats. Ett försök att prenumerera på en dataström från ett StatelessWorker
korn resulterar i odefinierat beteende. Vi överväger att stödja det här alternativet.
Korn och Orleans klienter
Orleans strömmar fungerar enhetligt mellan korn och Orleans klienter. Samma API:er kan alltså användas i ett korn och i en Orleans klient för att producera och använda händelser. Detta förenklar programlogik avsevärt, vilket gör särskilda API:er på klientsidan, till exempel Kornobservatörer, redundanta.
Fullständigt hanterad och tillförlitlig pub-sub för direktuppspelning
För att spåra stream-prenumerationer använder en körningskomponent Orleans med namnet Streaming Pub-Sub som fungerar som en mötesplats för strömkonsumenter och strömproducenter. Pub-sub spårar alla stream-prenumerationer och bevarar dem och matchar strömkonsumenter med strömproducenter.
Program kan välja var och hur Pub-Sub-data lagras. Pub-Sub-komponenten implementeras som korn (kallas PubSubRendezvousGrain
), som använder Orleans deklarativ persistens. PubSubRendezvousGrain
använder lagringsprovidern med namnet PubSubStore
. Precis som med alla korn kan du ange en implementering för en lagringsprovider. För Streaming Pub-Sub kan du ändra implementeringen av at silo-byggtiden med hjälp av PubSubStore
silovärdbyggaren:
Följande konfigurerar Pub-Sub för att lagra dess tillstånd i Azure-tabeller.
hostBuilder.AddAzureTableGrainStorage("PubSubStore",
options => options.ConfigureTableServiceClient("<Secret>"));
hostBuilder.AddAzureTableGrainStorage("PubSubStore",
options => options.ConnectionString = "<Secret>");
På så sätt lagras Pub-Sub-data på ett lämpligt sätt i Azure Table. För inledande utveckling kan du även använda minneslagring. Förutom Pub-Sub Orleans levererar Streaming Runtime händelser från producenter till konsumenter, hanterar alla körningsresurser som allokerats till aktivt använda strömmar och transparent skräp samlar in körningsresurser från oanvända strömmar.
Konfiguration
Om du vill använda strömmar måste du aktivera strömproviders via silovärden eller klusterklientbyggarna. Du kan läsa mer om stream-leverantörer här. Exempel på konfiguration av stream-provider:
hostBuilder.AddMemoryStreams("StreamProvider")
.AddAzureQueueStreams<AzureQueueDataAdapterV2>("AzureQueueProvider",
optionsBuilder => optionsBuilder.Configure(
options => options.ConfigureTableServiceClient("<Secret>")))
.AddAzureTableGrainStorage("PubSubStore",
options => options.ConfigureTableServiceClient("<Secret>"));
hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
.AddAzureQueueStreams<AzureQueueDataAdapterV2>("AzureQueueProvider",
optionsBuilder => optionsBuilder.Configure(
options => options.ConnectionString = "<Secret>"))
.AddAzureTableGrainStorage("PubSubStore",
options => options.ConnectionString = "<Secret>");