Implementeringsinformation för Orleans-strömmar
Det här avsnittet innehåller en översikt över Implementeringen av Orleans Stream. Den beskriver begrepp och information som inte visas på programnivå. Om du bara planerar att använda strömmar behöver du inte läsa det här avsnittet.
Terminologi:
Vi refererar med ordet "kö" till all hållbar lagringsteknik som kan mata in strömhändelser och tillåter antingen att hämta händelser eller tillhandahåller en push-baserad mekanism för att använda händelser. För att ge skalbarhet tillhandahåller dessa tekniker vanligtvis shardade/partitionerade köer. Med Azure Queues kan du till exempel skapa flera köer och Event Hubs har flera hubbar.
Beständiga strömmar
Alla Orleans beständiga dataströmleverantörer delar en gemensam implementering PersistentStreamProvider. Dessa generiska dataströmleverantörer måste konfigureras med en teknikspecifik IQueueAdapterFactory.
I testsyfte har vi till exempel kökort som genererar testdata i stället för att läsa data från en kö. Koden nedan visar hur vi konfigurerar en beständig dataströmprovider för att använda vårt anpassade (generator) kökort. Det gör den genom att konfigurera den beständiga strömprovidern med en fabriksfunktion som används för att skapa adaptern.
hostBuilder.AddPersistentStreams(
StreamProviderName, GeneratorAdapterFactory.Create);
När en strömproducent genererar ett nytt dataströmobjekt och anropar stream.OnNext()
anropar Orleans strömningskörning lämplig metod för IQueueAdapter den dataströmprovidern som placerar objektet direkt i rätt kö.
Hämta agenter
I hjärtat av Persistent Stream Provider finns hämtar agenter. Hämta agenter hämtar händelser från en uppsättning varaktiga köer och levererar dem till programkoden i korn som förbrukar dem. Man kan betrakta hämtarna som en distribuerad "mikrotjänst" – en partitionerad, högtillgänglig och elastisk distribuerad komponent. Mottagaragenterna körs i samma silor som är värdar för programintervall och hanteras fullständigt av Orleans Streaming Runtime.
StreamQueueMapper
och StreamQueueBalancer
Hämtar agenter parametriseras med IStreamQueueMapper och IStreamQueueBalancer. IStreamQueueMapper
innehåller en lista över alla köer och ansvarar även för att mappa strömmar till köer. På så sätt vet producentsidan för den beständiga Stream-providern i vilken kö meddelandet ska placeras i.
IStreamQueueBalancer
Uttrycker hur köer balanseras över Orleans silor och agenter. Målet är att tilldela köer till agenter på ett balanserat sätt för att förhindra flaskhalsar och stödja elasticitet. När en ny silo läggs till i Orleans-klustret balanseras köerna automatiskt om över gamla och nya silor. StreamQueueBalancer
tillåter anpassning av den processen. Orleans har flera inbyggda StreamQueueBalancers för att stödja olika balansscenarier (stort och litet antal köer) och olika miljöer (Azure, lokal, statisk).
Med testgeneratorexemplet ovan visar koden nedan hur man kan konfigurera kömapparen och köbalanseraren.
hostBuilder
.AddPersistentStreams(StreamProviderName, GeneratorAdapterFactory.Create,
providerConfigurator =>
providerConfigurator.Configure<HashRingStreamQueueMapperOptions>(
ob => ob.Configure(options => options.TotalQueueCount = 8))
.UseDynamicClusterConfigDeploymentBalancer());
Koden ovan konfigurerar GeneratorAdapterFactory för att använda en kömappare med åtta köer och balanserar köerna i klustret med hjälp av DynamicClusterConfigDeploymentBalancer.
Hämtar protokoll
Varje silo kör en uppsättning hämtar agenter, varje agent hämtar från en kö. Själva hämtar agenter implementeras av en intern körningskomponent som kallas SystemTarget. SystemTargets är i princip körningsintervall, omfattas av enkeltrådad samtidighet, kan använda vanliga grain-meddelanden och är lika lätta som korn. Till skillnad från kornigheter är SystemTargets inte virtuella: de skapas uttryckligen (av körningen) och är inte platstransparenta. Genom att implementera hämtar agenter som SystemTargets kan Orleans Streaming Runtime förlita sig på inbyggda Orleans-funktioner och kan skalas till ett mycket stort antal köer, eftersom det är lika billigt att skapa en ny dragagent som att skapa ett nytt korn.
Varje pull-agent kör en periodisk timer som hämtar från kön genom att IQueueAdapterReceiver.GetQueueMessagesAsync anropa metoden . De returnerade meddelandena placeras i den interna datastrukturen per agent med namnet IQueueCache. Varje meddelande inspekteras för att ta reda på dess målström. Agenten använder Pub-Sub för att ta reda på listan över strömkonsumenter som prenumererar på den här strömmen. När konsumentlistan har hämtats lagrar agenten den lokalt (i pub-sub-cachen) så att den inte behöver rådgöra med Pub-Sub i varje meddelande. Agenten prenumererar också på pub-sub för att ta emot meddelanden om nya konsumenter som prenumererar på strömmen. Den här handskakningen mellan agenten och pub-sub garanterar stark strömmande prenumerationssemantik: när konsumenten har prenumererat på dataströmmen visas alla händelser som genererades när den har prenumererat. Med hjälp av StreamSequenceToken
kan den dessutom prenumerera tidigare.
Köcache
IQueueCache är en intern datastruktur per agent som gör det möjligt att skilja från nya händelser från kön och leverera dem till konsumenter. Det möjliggör också avkoppling av leverans till olika strömmar och olika konsumenter.
Imagine en situation där en ström har 3 strömkonsumenter och en av dem är långsam. Om ingen försiktighet vidtas kan den här långsamma konsumenten påverka agentens framsteg, bromsa förbrukningen av andra användare av strömmen och till och med sakta ned filtreringen och leveransen av händelser för andra strömmar. För att förhindra detta och tillåta maximal parallellitet i agenten använder IQueueCache
vi .
IQueueCache
buffrar strömhändelser och gör det möjligt för agenten att leverera händelser till varje konsument i sin egen takt. Leverans per konsument implementeras av den interna komponenten , IQueueCacheCursorsom spårar förloppet per konsument. På så sätt får varje konsument händelser i sin egen takt: snabba konsumenter får händelser lika snabbt som de tas bort från kön, medan långsamma konsumenter får dem senare. När meddelandet har levererats till alla konsumenter kan det tas bort från cachen.
Ryggtryck
Återtryck i Orleans Streaming Runtime gäller på två platser: att hämta strömhändelser från kön till agenten och leverera händelserna från agenten till strömkonsumenter.
Den senare tillhandahålls av den inbyggda meddelandeleveransmekanismen i Orleans. Varje stream-händelse levereras från agenten till konsumenter via Standard Orleans grain messaging, en i taget. Det vill säga att agenterna skickar en händelse (eller en begränsad mängd händelser) till varje strömkonsument och väntar på det här anropet. Nästa händelse börjar inte levereras förrän uppgiften för föregående händelse har lösts eller brutits. På så sätt begränsar vi naturligtvis leveransfrekvensen per konsument till ett meddelande i taget.
När stream-händelser hämtas från kön till agenten tillhandahåller Orleans Streaming en ny speciell mekanism för backpressure. Eftersom agenten frikopplar från kön och levererar dem till konsumenter kan en enda långsam konsument hamna på efterkälken så mycket att viljan IQueueCache
fylls. För att förhindra att IQueueCache
den växer på obestämd tid begränsar vi dess storlek (storleksgränsen kan konfigureras). Agenten kastar dock aldrig bort händelser som inte har levererats.
När cacheminnet i stället börjar fyllas på saktar agenterna ned frekvensen för att avqueuera händelser från kön. På så sätt kan vi "rida ut" de långsamma leveransperioderna genom att justera den hastighet som vi förbrukar från kön ("backpressure") och återgå till snabba förbrukningsfrekvenser senare. För att identifiera "långsam leverans" dalar IQueueCache
använder en intern datastruktur av cache buckets som spårar förloppet för leverans av händelser till enskilda strömkonsumenter. Detta resulterar i ett mycket responsivt och självjusterande system.