Details van de implementatie van Orleans streams
Deze sectie biedt een algemeen overzicht van Orleans Stream-implementatie. Hierin worden concepten en details beschreven die niet zichtbaar zijn op toepassingsniveau. Als u alleen streams wilt gebruiken, hoeft u deze sectie niet te lezen.
Terminologie:
We verwijzen door het woord 'queue' naar elke duurzame opslagtechnologie die stroomgebeurtenissen kan opnemen en waarmee gebeurtenissen kunnen worden opgehaald of een push-mechanisme wordt geboden om gebeurtenissen te gebruiken. Meestal bieden deze technologieën shard-/gepartitioneerde wachtrijen om schaalbaarheid te bieden. Met Azure Queues kunt u bijvoorbeeld meerdere wachtrijen maken en Event Hubs hebben meerdere hubs.
Permanente stromen
Alle persistente streamproviders van Orleans delen een gemeenschappelijke implementatie PersistentStreamProvider. Deze algemene streamproviders moeten worden geconfigureerd met een technologiespecifieke IQueueAdapterFactory.
Voor testdoeleinden hebben we bijvoorbeeld wachtrijadapters die hun testgegevens genereren in plaats van de gegevens uit een wachtrij te lezen. De onderstaande code laat zien hoe we een permanente streamprovider configureren voor het gebruik van onze aangepaste (generator) wachtrijadapter. Dit doet u door de permanente streamprovider te configureren met een fabrieksfunctie die wordt gebruikt om de adapter te maken.
hostBuilder.AddPersistentStreams(
StreamProviderName, GeneratorAdapterFactory.Create);
Wanneer een streamproducent een nieuw streamitem en aanroept stream.OnNext()
, roept de streamingruntime van Orleans de juiste methode aan op de IQueueAdapter streamprovider die het item rechtstreeks naar de juiste wachtrij enqueueert.
Agents ophalen
De kern van de Persistent Stream Provider zijn de pull-agents. Door agents op te halen, worden gebeurtenissen opgehaald uit een set duurzame wachtrijen en geleverd aan de toepassingscode in korrels die ze verbruiken. Men kan de pull-agents beschouwen als een gedistribueerde 'microservice': een gepartitioneerd, maximaal beschikbaar en elastisch gedistribueerd onderdeel. De pull-agents worden uitgevoerd binnen dezelfde silo's die toepassingskorrels hosten en volledig worden beheerd door de Orleans Streaming Runtime.
StreamQueueMapper
en StreamQueueBalancer
Pull-agents worden geparameteriseerd met IStreamQueueMapper en IStreamQueueBalancer. De IStreamQueueMapper
lijst bevat een lijst met alle wachtrijen en is ook verantwoordelijk voor het toewijzen van streams aan wachtrijen. Op die manier weet de producentzijde van de Persistent Stream Provider in welke wachtrij het bericht moet worden geplaatst.
De IStreamQueueBalancer
uiting van de manier waarop wachtrijen worden verdeeld over Orleans silo's en agenten. Het doel is om wachtrijen op een evenwichtige manier toe te wijzen aan agents om knelpunten te voorkomen en elasticiteit te ondersteunen. Wanneer er een nieuwe silo wordt toegevoegd aan het Orleans-cluster, worden wachtrijen automatisch opnieuw verdeeld over de oude en nieuwe silo's. Hiermee StreamQueueBalancer
kunt u dat proces aanpassen. Orleans heeft verschillende ingebouwde StreamQueueBalancers, ter ondersteuning van verschillende taakverdelingsscenario's (groot en klein aantal wachtrijen) en verschillende omgevingen (Azure, on-premises, statisch).
Met behulp van het bovenstaande voorbeeld van de testgenerator ziet u in de onderstaande code hoe u de wachtrijtoewijzingper en queue balancer kunt configureren.
hostBuilder
.AddPersistentStreams(StreamProviderName, GeneratorAdapterFactory.Create,
providerConfigurator =>
providerConfigurator.Configure<HashRingStreamQueueMapperOptions>(
ob => ob.Configure(options => options.TotalQueueCount = 8))
.UseDynamicClusterConfigDeploymentBalancer());
Met de bovenstaande code wordt het GeneratorAdapterFactory gebruik van een wachtrijtoewijzing met acht wachtrijen geconfigureerd en worden de wachtrijen verdeeld over het cluster met behulp van de DynamicClusterConfigDeploymentBalancer.
Pull-protocol
Elke silo voert een set pull-agents uit, elke agent trekt uit één wachtrij. Pull-agents zelf worden geïmplementeerd door een intern runtime-onderdeel, SystemTarget genaamd. SystemTargets zijn in wezen runtimekorrels, zijn onderhevig aan gelijktijdigheid met één thread, kunnen reguliere graanberichten gebruiken en zijn net zo licht als korrels. In tegenstelling tot korrels zijn SystemTargets niet virtueel: ze worden expliciet gemaakt (door de runtime) en zijn niet transparant. Door pull-agents te implementeren als SystemTargets, kan de Orleans Streaming Runtime afhankelijk zijn van ingebouwde Orleans-functies en kan worden geschaald naar een zeer groot aantal wachtrijen, omdat het maken van een nieuwe pull-agent net zo goedkoop is als het maken van een nieuw graan.
Elke pull-agent voert een periodieke timer uit die uit de wachtrij haalt door de IQueueAdapterReceiver.GetQueueMessagesAsync methode aan te roepen. De geretourneerde berichten worden in de interne gegevensstructuur per agent geplaatst.IQueueCache Elk bericht wordt geïnspecteerd om de doelstroom te achterhalen. De agent gebruikt de Pub-Sub om de lijst met streamgebruikers te achterhalen die zijn geabonneerd op deze stream. Zodra de consumentenlijst is opgehaald, slaat de agent deze lokaal op (in de pub-subcache), zodat deze niet hoeft te worden geraadpleegd met Pub-Sub voor elk bericht. De agent abonneert zich ook op de pub-sub om meldingen te ontvangen van nieuwe consumenten die zich op die stream abonneren. Deze handshake tussen de agent en de pub-sub garandeert sterke semantiek van het streaming-abonnement: zodra de consument zich heeft geabonneerd op de stream, ziet u alle gebeurtenissen die zijn gegenereerd nadat deze zich heeft geabonneerd. Bovendien kunt u met het gebruik StreamSequenceToken
ervan zich in het verleden abonneren.
Wachtrijcache
IQueueCache is een interne gegevensstructuur per agent waarmee nieuwe gebeurtenissen uit de wachtrij kunnen worden ontkoppeld en aan consumenten kunnen worden geleverd. Het biedt ook de mogelijkheid om levering aan verschillende streams en verschillende consumenten los te koppelen.
Imagine een situatie waarin één stream 3 streamgebruikers heeft en een ervan traag is. Als er niets wordt gedaan, kan deze trage consument de voortgang van de agent beïnvloeden, het verbruik van andere consumenten van die stream vertragen en zelfs het dequeueren en leveren van gebeurtenissen voor andere streams vertragen. Om dat te voorkomen en maximale parallelle uitvoering in de agent toe te staan, gebruiken IQueueCache
we .
IQueueCache
buffers streamen gebeurtenissen en biedt een manier voor de agent om gebeurtenissen in een eigen tempo aan elke consument te leveren. De levering per consument wordt geïmplementeerd door het interne onderdeel IQueueCacheCursor, dat de voortgang per consument bijhoudt. Op die manier ontvangt elke consument gebeurtenissen in zijn eigen tempo: snelle consumenten ontvangen gebeurtenissen zo snel als ze uit de wachtrij worden verwijderd, terwijl gebruikers ze later langzaam ontvangen. Zodra het bericht aan alle consumenten is bezorgd, kan het uit de cache worden verwijderd.
Backpressure
Backpressure in Orleans Streaming Runtime is op twee plaatsen van toepassing: streamgebeurtenissen uit de wachtrij naar de agent brengen en de gebeurtenissen van de agent leveren om consumenten te streamen.
De laatste wordt geleverd door het ingebouwde mechanisme voor berichtbezorging van Orleans. Elke streamgebeurtenis wordt van de agent aan consumenten geleverd via de standaard Orleans grain messaging, één voor één. Dat wil gezegd, de agents verzenden één gebeurtenis (of een beperkte batch gebeurtenissen) naar elke streamgebruiker en wachten op deze aanroep. De volgende gebeurtenis wordt pas geleverd als de taak voor de vorige gebeurtenis is opgelost of verbroken. Op die manier beperken we natuurlijk de leveringssnelheid per consument tot één bericht tegelijk.
Wanneer u streamgebeurtenissen uit de wachtrij naar de agent brengt, biedt Orleans Streaming een nieuw speciaal backpressuremechanisme. Omdat de agent het loskoppelen van gebeurtenissen uit de wachtrij loskoppelt en deze aan consumenten levert, kan één trage consument zo veel achterblijven dat de IQueueCache
gebeurtenissen vol raken. Om te voorkomen dat IQueueCache
ze voor onbepaalde tijd groeien, beperken we de grootte (de groottelimiet kan worden geconfigureerd). De agent gooit echter nooit niet-bezorgde gebeurtenissen weg.
Wanneer de cache begint te vullen, vertragen de agents in plaats daarvan de snelheid van het verwijderen van gebeurtenissen uit de wachtrij. Op die manier kunnen we de trage leveringsperioden "uitrijden" door de snelheid aan te passen waarmee we verbruiken uit de wachtrij ('backpressure') en later weer terug te gaan naar snelle verbruikstarieven. Voor het detecteren van de 'langzame levering' valleien gebruikt u IQueueCache
een interne gegevensstructuur van cachebuckets die de voortgang van de levering van gebeurtenissen aan individuele streamgebruikers bijhoudt. Dit resulteert in een zeer responsief en zelf aanpassend systeem.