Orleans-Streaming-APIs
Anwendungen interagieren mit Streams über APIs, die dem Konzept der Reactive Extensions (Rx) in .NET sehr ähnlich sind. Der Hauptunterschied besteht darin, dass Orleans-Streamerweiterungen asynchron arbeiten, um im verteilten und skalierbaren Computefabric von Orleans eine effizientere Verarbeitung zu ermöglichen.
AsyncStream
Der erste Schritt einer Anwendung besteht darin, mithilfe eines Streamanbieters ein Handle für einen Stream abzurufen. Weitere Informationen zu Streamanbietern finden Sie hier. Vorerst können Sie sich diese als eine Streamfactory vorstellen, bei deren Implementierung das Verhalten und die Semantik von Streams angepasst werden können:
IStreamProvider streamProvider = base.GetStreamProvider("SimpleStreamProvider");
StreamId streamId = StreamId.Create("MyStreamNamespace", Guid);
IAsyncStream<T> stream = streamProvider.GetStream<T>(streamId);
IStreamProvider streamProvider = base.GetStreamProvider("SimpleStreamProvider");
IAsyncStream<T> stream = streamProvider.GetStream<T>(Guid, "MyStreamNamespace");
Eine Anwendung kann einen Verweis auf den Streamanbieter entweder durch Aufruf der Grain.GetStreamProvider-Methode innerhalb eines Grains oder durch Aufruf der GrainClient.GetStreamProvider-Methode auf dem Client abrufen.
Orleans.Streams.IAsyncStream<T> ist ein logisches, stark typisiertes Handle für einen virtuellen Stream. Es ähnelt im Ansatz dem Orleans-Grainverweis. Die Aufrufe von GetStreamProvider
und GetStream
sind rein lokal. Als Argumente für GetStream
dienen eine GUID und eine zusätzliche Zeichenfolge, die wir als Streamnamespace bezeichnen (dieser kann NULL sein). Die GUID und die Namespacezeichenfolge bilden zusammen die Streamidentität (ähnlich wie die Argumente für IGrainFactory.GetGrain). Die Kombination aus GUID und Namespacezeichenfolge bietet zusätzliche Flexibilität bei der Bestimmung von Streamidentitäten. Genauso wie Grain 7 im Graintyp PlayerGrain
und ein anderes Grain 7 im Graintyp ChatRoomGrain
vorliegen kann, kann Stream 123 im Streamnamespace PlayerEventsStream
und ein anderer Stream 123 im Streamnamespace ChatRoomMessagesStream
vorliegen.
Erzeugung und Nutzung
IAsyncStream<T> implementiert sowohl die Schnittstelle IAsyncObserver<T> als auch die Schnittstelle IAsyncObservable<T>. Auf diese Weise kann eine Anwendung den Stream entweder nutzen, um mit Orleans.Streams.IAsyncObserver<T>
neue Ereignisse im Stream zu erzeugen, oder um mit Orleans.Streams.IAsyncObservable<T>
Ereignisse aus einem Stream zu abonnieren und zu nutzen.
public interface IAsyncObserver<in T>
{
Task OnNextAsync(T item, StreamSequenceToken token = null);
Task OnCompletedAsync();
Task OnErrorAsync(Exception ex);
}
public interface IAsyncObservable<T>
{
Task<StreamSubscriptionHandle<T>> SubscribeAsync(IAsyncObserver<T> observer);
}
Um Ereignisse im Stream zu erzeugen, verwendet eine Anwendung einfach den folgenden Aufruf:
await stream.OnNextAsync<T>(event)
Zum Abonnieren eines Streams verwendet eine Anwendung diesen Aufruf:
StreamSubscriptionHandle<T> subscriptionHandle = await stream.SubscribeAsync(IAsyncObserver)
Das Argument für SubscribeAsync kann entweder ein Objekt sein, das die Schnittstelle IAsyncObserver<T> implementiert, oder eine Kombination von Lambda-Funktionen zur Verarbeitung eingehender Ereignisse. Weitere Optionen für SubscribeAsync
stehen über die Klasse AsyncObservableExtensions zur Verfügung. SubscribeAsync
gibt ein StreamSubscriptionHandle<T> zurück, ein opakes Handle, das zum Kündigen eines Streamabonnements verwendet werden kann (ähnlich wie eine asynchrone Version von IDisposable).
await subscriptionHandle.UnsubscribeAsync()
Es ist wichtig zu beachten, dass das Abonnement für ein Grain gilt, nicht für die Aktivierung. Sobald der Graincode den Stream abonniert hat, überdauert dieses Abonnement die Lebensdauer dieser Aktivierung und bleibt so lange bestehen, bis der Graincode (möglicherweise in einer anderen Aktivierung) das Abonnement ausdrücklich kündigt. Dies ist der Kern der Abstraktion eines virtuellen Streams: Nicht nur alle Streams sind logisch dauerhaft vorhanden, sondern auch ein Streamabonnement ist beständig und überdauert eine bestimmte physische Aktivierung, durch die das Abonnement erstellt wurde.
Multiplizität
Ein Orleans-Stream kann über mehrere Producer und mehrere Consumer verfügen. Eine von einem Producer veröffentlichte Nachricht wird an alle Consumer übermittelt, die den Stream abonniert haben, bevor die Nachricht veröffentlicht wurde.
Darüber hinaus kann der Consumer einen Stream mehrfach abonnieren. Für jedes Abonnement wird ein eindeutiges StreamSubscriptionHandle<T> zurückgegeben. Wenn ein Grain (oder Client) X-mal denselben Stream abonniert hat, empfängt er das gleiche Ereignis X-mal, einmal für jedes Abonnement. Der Consumer kann auch ein einzelnes Abonnement abbestellen. Alle derzeit bestehenden Abonnements können durch den folgenden Aufruf ermittelt werden:
IList<StreamSubscriptionHandle<T>> allMyHandles =
await IAsyncStream<T>.GetAllSubscriptionHandles();
Wiederherstellung bei Fehlern
Wenn der Producer eines Streams wegfällt (oder das zugehörige Grain deaktiviert wird), ist keine weitere Maßnahme erforderlich. Wenn dieses Grain später weitere Ereignisse erzeugen möchte, kann es das Streamhandle erneut abrufen und in gleicher Weise neue Ereignisse erzeugen.
Die Consumerlogik ist etwas komplizierter. Wie bereits erwähnt, bleibt ein Streamabonnement eines Conumergrains so lange gültig, bis es ausdrücklich vom Grain abbestellt wird. Wenn der Consumer des Streams wegfällt (oder das zugehörige Grain deaktiviert wird) und ein neues Ereignis für den Stream erzeugt wird, wird das Consumergrain automatisch reaktiviert (so wie jedes reguläre Orleans-Grain automatisch aktiviert wird, wenn eine Nachricht an das Grain gesendet wird). Dann muss der Graincode nur noch eine IAsyncObserver<T>-Schnittstelle bereitstellen, um die Daten zu verarbeiten. Der Consumer muss die Verarbeitungslogik als Teil der OnActivateAsync()-Methode neu anfügen. Dazu kann folgender Aufruf verwendet werden:
StreamSubscriptionHandle<int> newHandle =
await subscriptionHandle.ResumeAsync(IAsyncObserver);
Der Consumer verwendet das vorherige Handle, das er beim erstmaligen Abonnieren der Verarbeitungsfortsetzung erhalten hat. Beachten Sie, dass ResumeAsync lediglich ein bestehendes Abonnement mit der neuen Instanz der IAsyncObserver
-Logik aktualisiert. Dies ändert nichts an dem Umstand, dass der Consumer diesen Stream bereits abonniert hat.
Wie ruft der Consumer ein altes subscriptionHandle
ab? Es gibt 2 Möglichkeiten. Möglicherweise hat der Consumer das im ursprünglichen SubscribeAsync
-Vorgang zurückgegebene Handle beibehalten und kann es jetzt verwenden. Wenn der Consumer nicht über das Handle verfügt, kann er alternativ über IAsyncStream<T>
alle für ihn aktiven Abonnementhandles abfragen:
IList<StreamSubscriptionHandle<T>> allMyHandles =
await IAsyncStream<T>.GetAllSubscriptionHandles();
Der Consumer kann diese dann entweder alle fortsetzen oder nach Belieben einige abbestellen.
Tipp
Wenn das Consumergrain die Schnittstelle IAsyncObserver<T> direkt implementiert (public class MyGrain<T> : Grain, IAsyncObserver<T>
), sollte ein erneutes Anfügen der Schnittstelle IAsyncObserver
theoretisch nicht erforderlich sein, und somit ist auch kein Aufruf von ResumeAsync
notwendig. Die Streamingruntime sollte automatisch feststellen können, dass das Grain bereits IAsyncObserver
implementiert und nur diese IAsyncObserver
-Methoden aufrufen. Dies wird derzeit jedoch von der Streamingruntime nicht unterstützt. Deshalb muss der Graincode weiterhin explizit ResumeAsync
aufrufen, selbst wenn das Grain IAsyncObserver
direkt implementiert.
Explizite und implizite Abonnements
Standardmäßig muss ein Streamconsumer den Stream explizit abonnieren. Dieses Abonnement wird in der Regel durch eine externe Nachricht ausgelöst, die das Grain (oder der Client) empfängt und in der das Grain angewiesen wird, den Stream zu abonnieren. Wenn Benutzer*innen beispielsweise einem Chatroom beitreten, empfängt das zugehörige Grain eine JoinChatGroup
-Nachricht mit dem Chatnamen. Dies führt dazu, dass das Grain der Benutzer*innen diesen Chatstream abonniert.
Darüber hinaus unterstützen Orleans-Streams auch implizite Abonnements. In diesem Modell abonniert das Grain den Stream nicht explizit. Dieses Grain wird automatisch und implizit abonniert, nur basierend auf seiner Grainidentität und einem ImplicitStreamSubscriptionAttribute. Der Hauptnutzen von impliziten Abonnements besteht darin, dass die Streamaktivität die Grainaktivierung (und damit das Abonnement) automatisch auslösen kann. Wenn z. B. ein Grain unter Verwendung von SMS-Streams einen Stream erzeugt und ein anderes Grain diesen Stream verarbeiten möchte, müsste der Producer die Identität des Consumergrains kennen und es per Grainaufruf anweisen, den Stream zu abonnieren. Erst danach kann das Grain mit dem Senden von Ereignissen beginnen. Stattdessen kann der Producer bei Verwendung impliziter Abonnements einfach damit beginnen, Ereignisse für einen Stream zu erzeugen. Das Consumergrain wird dann automatisch aktiviert und abonniert den Stream. In diesem Fall ist es für den Producer nicht relevant, wer die Ereignisse liest.
Die Grainimplementierung MyGrainType
kann ein Attribut [ImplicitStreamSubscription("MyStreamNamespace")]
deklarieren. Dadurch wird der Streamingruntime mitgeteilt, dass ein für den Stream generiertes Ereignis (dessen Identität die GUID XXX und den Namespace "MyStreamNamespace"
aufweist) an das Grain mit der Identität XXX vom Typ MyGrainType
übermittelt werden soll. Anders ausgedrückt: Die Runtime ordnet den Stream <XXX, MyStreamNamespace>
dem Consumergrain <XXX, MyGrainType>
zu.
Das Vorhandensein von ImplicitStreamSubscription
veranlasst die Streamingruntime, diesen Stream automatisch für das Grain zu abonnieren und die Streamereignisse an das Grain zu übermitteln. Allerdings muss der Graincode der Runtime weiterhin Informationen dazu bereitstellen, wie die Ereignisse verarbeitet werden sollen. Im Wesentlichen muss IAsyncObserver
angefügt werden. Deshalb muss der Graincode in OnActivateAsync
beim Aktivieren des Grains folgenden Aufruf ausführen:
IStreamProvider streamProvider =
base.GetStreamProvider("SimpleStreamProvider");
StreamId streamId =
StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
IAsyncStream<T> stream =
streamProvider.GetStream<T>(streamId);
StreamSubscriptionHandle<T> subscription =
await stream.SubscribeAsync(IAsyncObserver<T>);
IStreamProvider streamProvider =
base.GetStreamProvider("SimpleStreamProvider");
IAsyncStream<T> stream =
streamProvider.GetStream<T>(this.GetPrimaryKey(), "MyStreamNamespace");
StreamSubscriptionHandle<T> subscription =
await stream.SubscribeAsync(IAsyncObserver<T>);
Schreiben von Abonnementlogik
Im Folgenden finden Sie die Richtlinien zum Schreiben der Abonnementlogik für verschiedene Fälle: explizite und implizite Abonnements zurückspulbare und nicht zurückspulbare Streams. Der Hauptunterschied zwischen expliziten und impliziten Abonnements besteht darin, dass das Grain bei impliziten Abonnements immer über genau ein implizites Abonnement für jeden Streamnamespace verfügt. Es gibt keine Möglichkeit, mehrere Abonnements zu erstellen oder Abonnements abzubestellen, und die Grainlogik kann stets nur die Verarbeitungslogik anfügen. Das bedeutet auch, dass es bei impliziten Abonnements nie nötig ist, ein Abonnement fortzusetzen. Bei expliziten Abonnements hingegen muss das Abonnement fortgesetzt werden, da das Grain bei einem erneuten Abonnieren ansonsten mehrfach abonniert wird.
Implizite Abonnements:
Bei impliziten Abonnements muss das Aggregationsintervall dennoch den Stream abonnieren, um die Verarbeitungslogik anzufügen. Dies kann im Aggregationsintervall des Consumers durch die Implementierung der Schnittstellen IStreamSubscriptionObserver
und IAsyncObserver<T>
erfolgen, sodass das Aggregationsintervall getrennt vom Abonnieren aktiviert werden kann. Um den Datenstrom zu abonnieren, erstellt das Aggregationsintervall ein Handle und ruft await handle.ResumeAsync(this)
in seiner Methode OnSubscribed(...)
auf.
Zum Verarbeiten von Nachrichten wird die Methode IAsyncObserver<T>.OnNextAsync(...)
zum Empfangen von Datenstromdaten und eines Sequenztokens implementiert. Alternative kann die Methode ResumeAsync
einen Delegiertensatz empfangen, der die Methoden der Schnittstelle IAsyncObserver<T>
sowie von onNextAsync
, onErrorAsync
und onCompletedAsync
darstellt.
public Task OnNextAsync(string item, StreamSequenceToken? token = null)
{
_logger.LogInformation($"Received an item from the stream: {item}");
}
public async Task OnSubscribed(IStreamSubscriptionHandleFactory handleFactory)
{
var handle = handleFactory.Create<string>();
await handle.ResumeAsync(this);
}
public override async Task OnActivateAsync()
{
var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
var stream =
streamProvider.GetStream<string>(
this.GetPrimaryKey(), "MyStreamNamespace");
await stream.SubscribeAsync(OnNextAsync);
}
Explizite Abonnements:
Bei expliziten Abonnements muss ein Grain SubscribeAsync
aufrufen, um den Stream zu abonnieren. Dadurch wird ein Abonnement erstellt und die Verarbeitungslogik angefügt. Das explizite Abonnement bleibt bestehen, bis das Grain das Abonnement kündigt. Wenn also ein Grain deaktiviert und wieder aktiviert wird, ist das Grain weiterhin explizit abonniert, aber es wird keine Verarbeitungslogik angefügt. In diesem Fall muss das Grain die Verarbeitungslogik erneut anfügen. Dazu muss das Grain zunächst im zugehörigen OnActivateAsync
die bestehenden Abonnements ermitteln, indem es IAsyncStream<T>.GetAllSubscriptionHandles() aufruft. Das Grain muss ResumeAsync
für jedes Handle ausführen, das weiterhin verarbeitet werden soll, oder „UnsubscribeAsync“ für alle Handles, die nicht mehr benötigt werden. Das Grain kann zudem optional StreamSequenceToken
als Argument für die ResumeAsync
-Aufrufe angeben, wodurch die Nutzung dieses expliziten Abonnements ab diesem Token beginnt.
public async override Task OnActivateAsync(CancellationToken cancellationToken)
{
var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
var streamId = StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
var stream = streamProvider.GetStream<string>(streamId);
var subscriptionHandles = await stream.GetAllSubscriptionHandles();
foreach (var handle in subscriptionHandles)
{
await handle.ResumeAsync(this);
}
}
public async override Task OnActivateAsync()
{
var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
var stream =
streamProvider.GetStream<string>(this.GetPrimaryKey(), "MyStreamNamespace");
var subscriptionHandles = await stream.GetAllSubscriptionHandles();
if (!subscriptionHandles.IsNullOrEmpty())
{
subscriptionHandles.ForEach(
async x => await x.ResumeAsync(OnNextAsync));
}
}
Streamreihenfolge und Sequenztoken
Die Reihenfolge der Ereignisübermittlung zwischen einem einzelnen Producer und einem einzelnen Consumer richtet sich nach dem Streamanbieter.
Bei einem SMS-Stream steuert der Producer ausdrücklich die Reihenfolge der Ereignisse, die für den Consumer sichtbar sind, indem er die Art und Weise der Veröffentlichung durch den Producer steuert. Standardmäßig (wenn die Option SimpleMessageStreamProviderOptions.FireAndForgetDelivery für SMS-Anbieter auf FALSE festgelegt ist) und wenn der Producer auf jeden OnNextAsync
-Aufruf wartet, treffen die Ereignisse in FIFO-Reihenfolge ein. Bei einem SMS-Stream muss der Producer entscheiden, wie mit Übermittlungsfehlern umgegangen werden soll, auf die durch einen fehlerhaften Task
hingewiesen wird, der vom OnNextAsync
-Aufruf zurückgegeben wird.
Azure Queue-Streams garantieren keine FIFO-Reihenfolge, da die zugrunde liegenden Azure-Warteschlangen die Reihenfolge im Fehlerfall nicht garantieren. (Bei einer fehlerfreien Ausführung wird die FIFO-Reihenfolge garantiert.) Wenn ein Producer das Ereignis in Azure Queue erzeugt und der Vorgang in der Warteschlange fehlschlägt, liegt es im Ermessen des Producers, eine andere Warteschlange zu verwenden und sich später um mögliche doppelte Nachrichten zu kümmern. Auf der Übermittlungsseite entfernt die Orleans-Streamingruntime das Ereignis aus der Warteschlange und versucht, es zur Verarbeitung an den Consumer zu übermitteln. Die Orleans-Streamingruntime löscht das Ereignis erst nach erfolgreicher Verarbeitung aus der Warteschlange. Wenn die Übermittlung oder Verarbeitung fehlschlägt, wird das Ereignis nicht aus der Warteschlange gelöscht und später automatisch wieder in der Warteschlange angezeigt. Die Streamingruntime führt einen erneuten Übermittlungsversuch durch, wodurch die FIFO-Reihenfolge möglicherweise nicht mehr eingehalten werden kann. Das obige Verhalten entspricht der normalen Semantik von Azure Queue-Instanzen.
Anwendungsdefinierte Reihenfolge: Um die oben genannten Probleme hinsichtlich der Reihenfolge zu lösen, kann eine Anwendung die Reihenfolge optional festlegen. Dies wird über ein StreamSequenceToken erreicht, ein opakes IComparable-Objekt, das zum Ordnen von Ereignissen verwendet werden kann. Ein Producer kann ein optionales StreamSequenceToken
an den OnNext
-Aufruf übergeben. Dieses StreamSequenceToken
wird an den Consumer übergeben und gemeinsam mit dem Ereignis übermittelt. Auf diese Weise kann eine Anwendung die Reihenfolge unabhängig von der Streamingruntime bestimmen und neu festlegen.
Zurückspulbare Streams
Einige Streams können von einer Anwendung nur ab dem aktuellen Zeitpunkt abonniert werden, während andere Streams einen „Rücksprung in der Zeit“ zulassen. Letztere Fähigkeit hängt von der zugrunde liegenden Warteschlangentechnologie und dem jeweiligen Streamanbieter ab. Azure Queue erlaubt zum Beispiel nur den Verbrauch der letzten in die Warteschlange eingereihten Ereignisse, während EventHub die Wiedergabe von Ereignissen ab einem beliebigen Zeitpunkt (bis zu einer bestimmten Ablaufzeit) ermöglicht. Streams, die einen Rücksprung in der Zeit ermöglichen, werden als zurückspulbare Streams bezeichnet.
Der Consumer eines zurückspulbaren Streams kann ein StreamSequenceToken
an den SubscribeAsync
-Aufruf übergeben. Die Runtime übermittelt dann ab diesem StreamSequenceToken
Ereignisse an den Consumer. Ein NULL-Token bedeutet, dass der Consumer Ereignisse ab dem letzten Ereignis erhalten möchte.
Die Möglichkeit zum Zurückspulen eines Streams ist insbesondere in Wiederherstellungsszenarien sehr nützlich. Stellen Sie sich zum Beispiel ein Grain vor, das einen Stream abonniert und seinen Status gemeinsam mit dem letzten Sequenztoken in regelmäßigen Abständen überprüft. Bei der Wiederherstellung nach einem Fehler kann das Grain denselben Stream über das letzte Prüfpunktsequenztoken erneut abonnieren. Auf diese Weise gehen keine Ereignisse verloren, die seit dem letzten Prüfpunkt erzeugt wurden.
Der Event Hubs-Anbieter ist zurückspulbar. Sie finden den zugehörigen Code auf GitHub: Orleans/Azure/Orleans.Streaming.EventHubs. SMS- und Azure Queue-Anbieter sind nicht zurückspulbar.
Zustandslose, automatisch aufskalierte Verarbeitung
Standardmäßig ist das Orleans-Streaming darauf ausgerichtet, eine große Anzahl von relativ kleinen Streams zu unterstützen, die jeweils von einem oder mehreren zustandsbehafteten Grains verarbeitet werden. Insgesamt wird die Verarbeitung aller Streams auf eine große Anzahl regulärer (zustandsbehafteter) Grains aufgeteilt. Der Anwendungscode steuert dieses Sharding durch die Zuweisung von Stream-IDs und Grain-IDs und durch explizite Abonnements. Das Ziel ist eine zustandsbehaftete Verarbeitung mit Sharding.
Es gibt jedoch auch ein interessantes Szenario mit zustandsloser, automatisch aufskalierter Verarbeitung. In diesem Szenario verfügt eine Anwendung über eine kleine Anzahl von Streams (oder sogar nur über einen großen Stream) und strebt eine zustandslose Verarbeitung an. Ein Beispiel wäre ein globaler Ereignisstream, bei dem jedes Ereignis bei der Verarbeitung entschlüsselt und gegebenenfalls zur weiteren zustandsbehafteten Verarbeitung an andere Streams weitergeleitet wird. Die zustandslose aufskalierte Streamverarbeitung kann in Orleans über StatelessWorkerAttribute-Grains unterstützt werden.
Aktueller Status der zustandslosen, automatisch aufskalierten Verarbeitung: Diese ist noch nicht implementiert. Der Versuch, einen Stream über ein StatelessWorker
-Grain zu abonnieren, führt zu einem nicht definierten Verhalten. Wir erwägen, diese Option zu unterstützen.
Grains und Orleans-Clients
Die Funktionsweise von Orleans-Streams ist für Grains und Orleans Clients gleich. Dies bedeutet, dass innerhalb eines Grains und in einem Orleans-Client die gleichen APIs verwendet werden können, um Ereignisse zu erzeugen und zu nutzen. Dies vereinfacht die Anwendungslogik erheblich und macht spezielle clientseitige APIs, wie z. B. „GrainObserver“, überflüssig.
Vollständig verwaltetes und zuverlässiges Streaming-Pub/Sub
Zur Nachverfolgung von Streamabonnements verwendet Orleans eine Runtimekomponente namens Streaming-Pub/Sub, die als Rendezvous-Punkt für Streamconsumer und Streamproducer dient. Die Pub/Sub-Komponente verfolgt alle Streamabonnements, speichert sie dauerhaft und gleicht Streamconsumer mit Streamproducern ab.
Anwendungen können auswählen, wo und wie die Pub/Sub-Daten gespeichert werden. Die Pub/Sub-Komponente selbst ist in Form von Grains (genannt PubSubRendezvousGrain
) implementiert, die deklarative Orleans-Persistenz nutzen. PubSubRendezvousGrain
verwendet den Speicheranbieter PubSubStore
. Wie bei jedem Grain können Sie eine Implementierung für einen Speicheranbieter angeben. Für die Streaming-Pub/Sub-Komponente können Sie die Implementierung von PubSubStore
zum Zeitpunkt der Siloerstellung über „SiloHostBuilder“ ändern:
Im Folgenden wird die Pub/Sub-Komponente zur Speicherung ihres Zustands in Azure-Tabellen konfiguriert.
hostBuilder.AddAzureTableGrainStorage("PubSubStore",
options => options.ConfigureTableServiceClient("<Secret>"));
hostBuilder.AddAzureTableGrainStorage("PubSubStore",
options => options.ConnectionString = "<Secret>");
Auf diese Weise werden die Pub/Sub-Daten dauerhaft in Azure Table Storage gespeichert. Für die anfängliche Entwicklung können Sie auch Arbeitsspeicher verwenden. Zusätzlich zu Pub/Sub übermittelt die Orleans-Streamingruntime Ereignisse von Producern an Consumer, verwaltete alle Laufzeitressourcen, die aktiv genutzten Streams zugewiesen sind, und führt eine transparente Garbage Collection für Laufzeitressourcen von ungenutzten Streams durch.
Konfiguration
Zur Verwendung von Streams müssen Sie Streamanbieter über „SiloHostBuilder“ oder „ClientBuilder“ für das Clustering aktivieren. Weitere Informationen zu Streamanbietern finden Sie hier. Beispiel für die Einrichtung eines Streamanbieters:
hostBuilder.AddMemoryStreams("StreamProvider")
.AddAzureQueueStreams<AzureQueueDataAdapterV2>("AzureQueueProvider",
optionsBuilder => optionsBuilder.Configure(
options => options.ConfigureTableServiceClient("<Secret>")))
.AddAzureTableGrainStorage("PubSubStore",
options => options.ConfigureTableServiceClient("<Secret>"));
hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
.AddAzureQueueStreams<AzureQueueDataAdapterV2>("AzureQueueProvider",
optionsBuilder => optionsBuilder.Configure(
options => options.ConnectionString = "<Secret>"))
.AddAzureTableGrainStorage("PubSubStore",
options => options.ConnectionString = "<Secret>");