Udostępnij za pośrednictwem


Orleans interfejsy API przesyłania strumieniowego

Aplikacje współdziałają ze strumieniami za pośrednictwem interfejsów API, które są bardzo podobne do dobrze znanych rozszerzeń reaktywnych (Rx) na platformie .NET. Główną różnicą jest to, że Orleans rozszerzenia strumienia są asynchroniczne, aby wydajniejsze przetwarzanie w Orleansrozproszonej i skalowalnej sieci szkieletowej obliczeniowej.

Strumień asynchroniczny

Aplikacja uruchamia się przy użyciu dostawcy strumienia, aby uzyskać dojście do strumienia. Więcej informacji na temat dostawców strumieni można znaleźć tutaj, ale na razie możesz traktować ją jako fabrykę strumieni, która umożliwia implementatorom dostosowywanie zachowania strumieni i semantyki:

IStreamProvider streamProvider = base.GetStreamProvider("SimpleStreamProvider");
StreamId streamId = StreamId.Create("MyStreamNamespace", Guid);
IAsyncStream<T> stream = streamProvider.GetStream<T>(streamId);
IStreamProvider streamProvider = base.GetStreamProvider("SimpleStreamProvider");
IAsyncStream<T> stream = streamProvider.GetStream<T>(Guid, "MyStreamNamespace");

Aplikacja może uzyskać odwołanie do dostawcy strumienia przez wywołanie Grain.GetStreamProvider metody wewnątrz ziarna lub wywołanie GrainClient.GetStreamProvider metody podczas pracy na kliencie.

Orleans.Streams.IAsyncStream<T> jest logicznym, silnie typiowanym dojściem do strumienia wirtualnego. Jest podobny w duchu do Orleans odwołania do ziarna. Wywołania do GetStreamProvider i GetStream są wyłącznie lokalne. Argumenty, które GetStream mają być identyfikatorem GUID i dodatkowym ciągiem, który wywołujemy przestrzeń nazw strumienia (która może mieć wartość null). Razem identyfikator GUID i ciąg przestrzeni nazw składają się z tożsamości strumienia (podobnie jak w duchu do argumentów ).IGrainFactory.GetGrain Kombinacja identyfikatora GUID i ciągu przestrzeni nazw zapewnia dodatkową elastyczność w określaniu tożsamości strumienia. Podobnie jak ziarno 7 może istnieć w obrębie typu PlayerGrain ziarno, a inne ziarno 7 może istnieć w obrębie typu ChatRoomGrainziarna , strumień 123 może istnieć z przestrzenią PlayerEventsStream nazw strumienia, a inny strumień 123 może istnieć w przestrzeni nazw ChatRoomMessagesStreamstrumienia .

Tworzenie i używanie

IAsyncStream<T> implementuje zarówno interfejsy , jak IAsyncObserver<T> i IAsyncObservable<T> . Dzięki temu aplikacja może używać strumienia do tworzenia nowych zdarzeń do strumienia przy użyciu polecenia Orleans.Streams.IAsyncObserver<T> lub subskrybowania i używania zdarzeń ze strumienia przy użyciu polecenia Orleans.Streams.IAsyncObservable<T>.

public interface IAsyncObserver<in T>
{
    Task OnNextAsync(T item, StreamSequenceToken token = null);
    Task OnCompletedAsync();
    Task OnErrorAsync(Exception ex);
}

public interface IAsyncObservable<T>
{
    Task<StreamSubscriptionHandle<T>> SubscribeAsync(IAsyncObserver<T> observer);
}

Aby wygenerować zdarzenia do strumienia, aplikacja po prostu wywołuje

await stream.OnNextAsync<T>(event)

Aby zasubskrybować strumień, aplikacja wywołuje

StreamSubscriptionHandle<T> subscriptionHandle = await stream.SubscribeAsync(IAsyncObserver)

Argumentem SubscribeAsync do może być obiekt, który implementuje IAsyncObserver<T> interfejs lub kombinację funkcji lambda do przetwarzania zdarzeń przychodzących. Więcej opcji jest SubscribeAsync dostępnych za pośrednictwem AsyncObservableExtensions klasy. SubscribeAsync Zwraca element StreamSubscriptionHandle<T>, który jest nieprzezroczystym uchwytem, który może służyć do anulowania subskrypcji strumienia (podobnie jak w przypadku asynchronicznej wersji IDisposableprogramu ).

await subscriptionHandle.UnsubscribeAsync()

Należy pamiętać, że subskrypcja dotyczy ziarna, a nie aktywacji. Gdy kod ziarna zostanie zasubskrybowany do strumienia, ta subskrypcja przekroczy żywotność tej aktywacji i pozostanie trwała na zawsze, dopóki kod ziarna (potencjalnie w innej aktywacji) jawnie anuluje subskrypcję. Jest to serce abstrakcji strumienia wirtualnego: nie tylko wszystkie strumienie zawsze istnieją, logicznie, ale także subskrypcja strumienia jest trwała i istnieje poza określoną aktywacją fizyczną, która utworzyła subskrypcję.

Kardynalność

Strumień Orleans może mieć wielu producentów i wielu użytkowników. Wiadomość opublikowana przez producenta zostanie dostarczona do wszystkich odbiorców, którzy zostali zasubskrybowane do strumienia przed opublikowaniem wiadomości.

Ponadto użytkownik może wielokrotnie subskrybować ten sam strumień. Za każdym razem, gdy subskrybuje, zostanie przywrócony unikatowy element StreamSubscriptionHandle<T>. Jeśli ziarno (lub klient) jest subskrybowane X razy do tego samego strumienia, otrzyma to samo zdarzenie X razy, raz dla każdej subskrypcji. Użytkownik może również anulować subskrypcję indywidualną. Wszystkie bieżące subskrypcje można znaleźć, wywołując następujące wywołania:

IList<StreamSubscriptionHandle<T>> allMyHandles =
    await IAsyncStream<T>.GetAllSubscriptionHandles();

Odzyskiwanie po awariach

Jeśli producent strumienia umiera (lub jego ziarno jest dezaktywowane), nic nie musi zrobić. Następnym razem, gdy to ziarno chce wygenerować więcej zdarzeń, może ponownie uzyskać dojście strumienia i utworzyć nowe zdarzenia w taki sam sposób.

Logika konsumentów jest nieco bardziej zaangażowana. Jak mówiliśmy wcześniej, gdy ziarno konsumenta jest subskrybowane do strumienia, ta subskrypcja jest prawidłowa, dopóki ziarno jawnie anuluje subskrypcję. Jeśli odbiorca strumienia umiera (lub jego ziarno jest dezaktywowane) i nowe zdarzenie jest generowane na strumieniu, ziarno konsumenta zostanie automatycznie ponownie aktywowane (podobnie jak każde zwykłe Orleans ziarno jest automatycznie aktywowane po wysłaniu do niego komunikatu). Jedyną rzeczą, jaką musi teraz wykonać kod ziarna, jest dostarczenie elementu IAsyncObserver<T> do przetwarzania danych. Użytkownik musi ponownie dołączyć logikę przetwarzania w ramach OnActivateAsync() metody . Aby to zrobić, może wywołać następujące wywołanie:

StreamSubscriptionHandle<int> newHandle =
    await subscriptionHandle.ResumeAsync(IAsyncObserver);

Użytkownik używa poprzedniego dojścia, który dostał, gdy po raz pierwszy zasubskrybował "wznowienie przetwarzania". Zwróć uwagę, że ResumeAsync wystarczy zaktualizować istniejącą subskrypcję przy użyciu nowego wystąpienia IAsyncObserver logiki i nie zmienia faktu, że ten użytkownik jest już subskrybowany do tego strumienia.

Jak konsument ma starą?subscriptionHandle Dostępne są 2 opcje. Użytkownik mógł utrwał uchwyt, który został przekazany z powrotem z oryginalnej SubscribeAsync operacji i może go teraz użyć. Alternatywnie, jeśli konsument nie ma uchwytu, może poprosić o IAsyncStream<T> wszystkie aktywne dojścia subskrypcji, wywołując następujące wywołanie:

IList<StreamSubscriptionHandle<T>> allMyHandles =
    await IAsyncStream<T>.GetAllSubscriptionHandles();

Użytkownik może teraz wznowić wszystkie z nich lub anulować subskrypcję niektórych, jeśli chce.

Napiwek

Jeśli ziarno konsumenta implementuje IAsyncObserver<T> interfejs bezpośrednio (public class MyGrain<T> : Grain, IAsyncObserver<T>), teoretycznie nie powinno być wymagane do ponownego IAsyncObserver dołączenia elementu i w ten sposób nie będzie konieczne wywołanie metody ResumeAsync. Środowisko uruchomieniowe przesyłania strumieniowego powinno być w stanie automatycznie ustalić, że ziarno już implementuje IAsyncObserver i po prostu wywoła te IAsyncObserver metody. Jednak środowisko uruchomieniowe przesyłania strumieniowego obecnie tego nie obsługuje, a kod ziarna nadal musi jawnie wywołać ResumeAsyncmetodę , nawet jeśli ziarno implementuje IAsyncObserver się bezpośrednio.

Jawne i niejawne subskrypcje

Domyślnie odbiorca strumienia musi jawnie subskrybować strumień. Ta subskrypcja jest zwykle wyzwalana przez jakiś komunikat zewnętrzny odbierający ziarno (lub klient), który instruuje go do subskrybowania. Na przykład w usłudze czatu, gdy użytkownik dołącza do pokoju rozmów, jego ziarno otrzymuje JoinChatGroup wiadomość z nazwą czatu, co spowoduje, że ziarno użytkownika zasubskrybuje ten strumień czatu.

Ponadto Orleans strumienie obsługują również niejawne subskrypcje. W tym modelu ziarno nie subskrybuje jawnie strumienia. To ziarno jest subskrybowane automatycznie, niejawnie, tylko na podstawie jego tożsamości ziarna i ImplicitStreamSubscriptionAttribute. Główna wartość niejawnych subskrypcji umożliwia automatyczne wyzwolenie aktywacji ziarna przez działanie strumienia (w związku z tym wyzwolenie subskrypcji). Na przykład przy użyciu strumieni SMS, jeśli jedno ziarno chciało utworzyć strumień, a inny proces ziarna tego strumienia, producent będzie musiał znać tożsamość ziarna konsumenta i wywołać ziarno, mówiąc mu, aby zasubskrybował strumień. Dopiero po tym może rozpocząć wysyłanie zdarzeń. Zamiast tego, korzystając z niejawnych subskrypcji, producent może po prostu rozpocząć produkcję zdarzeń do strumienia, a ziarno konsumenta zostanie automatycznie aktywowane i subskrybowane do strumienia. W takim przypadku producent w ogóle nie obchodzi, kto czyta wydarzenia

Implementacja MyGrainType ziarna może zadeklarować atrybut [ImplicitStreamSubscription("MyStreamNamespace")]. Informuje to środowisko uruchomieniowe przesyłania strumieniowego, że gdy zdarzenie jest generowane w strumieniu, którego tożsamość jest identyfikatorem GUID XXX i "MyStreamNamespace" przestrzenią nazw, powinna zostać dostarczona do ziarna, którego tożsamość to XXX typu MyGrainType. Oznacza to, że środowisko uruchomieniowe mapuje strumień <XXX, MyStreamNamespace> na ziarno <XXX, MyGrainType>konsumenta .

Obecność ImplicitStreamSubscriptionpowoduje, że środowisko uruchomieniowe przesyłania strumieniowego automatycznie subskrybuje to ziarno do strumienia i dostarcza do niego zdarzenia strumienia. Jednak kod ziarna nadal musi poinformować środowisko uruchomieniowe o sposobie przetwarzania zdarzeń. Zasadniczo musi dołączyć element IAsyncObserver. W związku z tym po aktywowaniu ziarna kod ziarna wewnątrz OnActivateAsync musi wywołać:

IStreamProvider streamProvider =
    base.GetStreamProvider("SimpleStreamProvider");

StreamId streamId =
    StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
IAsyncStream<T> stream =
    streamProvider.GetStream<T>(streamId);

StreamSubscriptionHandle<T> subscription =
    await stream.SubscribeAsync(IAsyncObserver<T>);
IStreamProvider streamProvider =
    base.GetStreamProvider("SimpleStreamProvider");

IAsyncStream<T> stream =
    streamProvider.GetStream<T>(this.GetPrimaryKey(), "MyStreamNamespace");

StreamSubscriptionHandle<T> subscription =
    await stream.SubscribeAsync(IAsyncObserver<T>);

Pisanie logiki subskrypcji

Poniżej przedstawiono wskazówki dotyczące sposobu pisania logiki subskrypcji w różnych przypadkach: jawne i niejawne subskrypcje, strumienie z możliwością przewijania i niewzwiązane z przewijaniem. Główną różnicą między jawnymi i niejawnymi subskrypcjami jest to, że w przypadku niejawnego ziarna zawsze ma dokładnie jedną niejawną subskrypcję dla każdej przestrzeni nazw strumienia; Nie ma możliwości utworzenia wielu subskrypcji (nie ma wielu subskrypcji), nie ma możliwości anulowania subskrypcji, a logika ziarna zawsze musi dołączyć logikę przetwarzania. Oznacza to również, że w przypadku niejawnych subskrypcji nigdy nie trzeba wznawiać subskrypcji. Z drugiej strony w przypadku jawnych subskrypcji należy wznowić subskrypcję, w przeciwnym razie, jeśli ziarno ponownie subskrybuje, spowoduje to wielokrotne subskrybowanie ziarna.

Niejawne subskrypcje:

W przypadku niejawnych subskrypcji ziarno nadal musi subskrybować, aby dołączyć logikę przetwarzania. Można to zrobić w ziarnie konsumenta, implementując IStreamSubscriptionObserver interfejsy i IAsyncObserver<T> , co pozwala na aktywację ziarna oddzielnie od subskrybowania. Aby zasubskrybować strumień, ziarno tworzy uchwyt i wywołuje await handle.ResumeAsync(this) metodę OnSubscribed(...) .

Aby przetwarzać komunikaty, IAsyncObserver<T>.OnNextAsync(...) metoda jest implementowana w celu odbierania danych strumienia i tokenu sekwencji. Alternatywnie ResumeAsync metoda może przyjąć zestaw delegatów reprezentujących metody interfejsu IAsyncObserver<T> , onNextAsync, onErrorAsynci onCompletedAsync.

public Task OnNextAsync(string item, StreamSequenceToken? token = null)
{
    _logger.LogInformation($"Received an item from the stream: {item}");
}

public async Task OnSubscribed(IStreamSubscriptionHandleFactory handleFactory)
{
    var handle = handleFactory.Create<string>();
    await handle.ResumeAsync(this);
}
public override async Task OnActivateAsync()
{
    var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
    var stream =
        streamProvider.GetStream<string>(
            this.GetPrimaryKey(), "MyStreamNamespace");

    await stream.SubscribeAsync(OnNextAsync);
}

Jawne subskrypcje:

W przypadku jawnych subskrypcji ziarno musi wywoływać SubscribeAsync , aby subskrybować strumień. Spowoduje to utworzenie subskrypcji, a także dołączenie logiki przetwarzania. Jawna subskrypcja będzie istnieć do momentu anulowania subskrypcji ziarna, więc jeśli ziarno zostanie zdezaktywowane i ponownie aktywowane, ziarno jest nadal jawnie subskrybowane, ale żadna logika przetwarzania nie zostanie dołączona. W takim przypadku ziarno musi ponownie dołączyć logikę przetwarzania. Aby to zrobić, w jego OnActivateAsyncpliku ziarno najpierw musi dowiedzieć się, jakie subskrypcje ma, wywołując polecenie IAsyncStream<T>.GetAllSubscriptionHandles(). Ziarno musi być wykonywane ResumeAsync na każdym dojściu, które chce kontynuować przetwarzanie lub anulować subskrypcję na wszystkich dojściach, za pomocą których jest wykonywana. Ziarno może również opcjonalnie określić StreamSequenceToken jako argument ResumeAsync wywołań, co spowoduje, że ta jawna subskrypcja zacznie korzystać z tego tokenu.

public async override Task OnActivateAsync(CancellationToken cancellationToken)
{
    var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
    var streamId = StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
    var stream = streamProvider.GetStream<string>(streamId);

    var subscriptionHandles = await stream.GetAllSubscriptionHandles();
    foreach (var handle in subscriptionHandles)
    {
       await handle.ResumeAsync(this);
    } 
}
public async override Task OnActivateAsync()
{
    var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
    var stream =
        streamProvider.GetStream<string>(this.GetPrimaryKey(), "MyStreamNamespace");

    var subscriptionHandles = await stream.GetAllSubscriptionHandles();
    if (!subscriptionHandles.IsNullOrEmpty())
    {
        subscriptionHandles.ForEach(
            async x => await x.ResumeAsync(OnNextAsync));
    }
}

Przesyłanie strumieniowe tokenów kolejności i sekwencji

Kolejność dostarczania zdarzeń między indywidualnym producentem a indywidualnym odbiorcą zależy od dostawcy strumienia.

Za pomocą programu SMS producent jawnie kontroluje kolejność zdarzeń obserwowanych przez konsumenta, kontrolując sposób ich publikowania przez producenta. Domyślnie (jeśli SimpleMessageStreamProviderOptions.FireAndForgetDelivery opcja dostawcy programu SMS jest ustawiona na wartość false) i jeśli producent czeka na każde OnNextAsync wywołanie, zdarzenia docierają do zamówienia FIFO. W wiadomości SMS zależy od producenta, aby zdecydować, jak obsługiwać błędy dostarczania, które będą wskazywane przez uszkodzenie Task zwrócone przez OnNextAsync połączenie.

Strumienie usługi Azure Queue nie gwarantują kolejności FIFO, ponieważ bazowe kolejki platformy Azure nie gwarantują kolejności w przypadkach awarii. (Gwarantują one kolejność FIFO w wykonaniach wolnych od awarii). Gdy producent generuje zdarzenie w usłudze Azure Queue, jeśli operacja kolejki zakończy się niepowodzeniem, producent będzie próbował podjąć kolejną kolejkę, a później poradzić sobie z potencjalnymi zduplikowanymi komunikatami. Po stronie Orleans dostarczania środowisko uruchomieniowe przesyłania strumieniowego usuwa zdarzenie z kolejki i próbuje dostarczyć je do przetwarzania użytkownikom. Środowisko Orleans uruchomieniowe przesyłania strumieniowego usuwa zdarzenie z kolejki tylko po pomyślnym przetworzeniu. Jeśli dostarczanie lub przetwarzanie zakończy się niepowodzeniem, zdarzenie nie zostanie usunięte z kolejki i zostanie automatycznie ponownie wyświetlone w kolejce później. Środowisko uruchomieniowe przesyłania strumieniowego spróbuje dostarczyć go ponownie, co może spowodować przerwanie zamówienia FIFO. Powyższe zachowanie jest zgodne z normalną semantyczną kolejek platformy Azure.

Kolejność zdefiniowana przez aplikację: aby rozwiązać powyższe problemy z zamawianiem, aplikacja może opcjonalnie określić jego kolejność. Jest to osiągane za pośrednictwem StreamSequenceTokenobiektu , który jest nieprzezroczystym IComparable obiektem, który może służyć do zamawiania zdarzeń. Producent może przekazać opcjonalny element StreamSequenceToken do wywołania OnNext . Zostanie to StreamSequenceToken przekazane użytkownikowi i zostanie dostarczone wraz ze zdarzeniem. Dzięki temu aplikacja może rozumować i rekonstruować kolejność niezależnie od środowiska uruchomieniowego przesyłania strumieniowego.

Strumienie z możliwością przewijania

Niektóre strumienie umożliwiają aplikacji subskrybowanie ich tylko od ostatniego punktu w czasie, podczas gdy inne strumienie umożliwiają "powrót w czasie". Ta ostatnia funkcja jest zależna od podstawowej technologii kolejkowania i konkretnego dostawcy strumienia. Na przykład usługa Azure Queues zezwala tylko na korzystanie z najnowszych zdarzeń w kolejce, podczas gdy usługa EventHub zezwala na ponowne odtwarzanie z dowolnego punktu w czasie (do pewnego czasu wygaśnięcia). Strumienie, które obsługują powrót w czasie, są nazywane strumieniami z możliwością przewijania.

Użytkownik strumienia z możliwością przewijania może przekazać StreamSequenceToken element do wywołania SubscribeAsync . Środowisko uruchomieniowe dostarczy do niego zdarzenia rozpoczynające się od tego StreamSequenceToken. Token o wartości null oznacza, że odbiorca chce odbierać zdarzenia rozpoczynające się od najnowszej.

Możliwość przewijania strumienia jest bardzo przydatna w scenariuszach odzyskiwania. Rozważmy na przykład ziarno, które subskrybuje strumień i okresowo punktów kontrolnych jego stan wraz z najnowszym tokenem sekwencji. Podczas odzyskiwania po awarii ziarno może ponownie zasubskrybować ten sam strumień z najnowszego tokenu sekwencji punktów kontrolnych, co spowoduje odzyskanie bez utraty zdarzeń, które zostały wygenerowane od ostatniego punktu kontrolnego.

Dostawca usługi Event Hubs można przewijać. Kod można znaleźć w witrynie GitHub: Orleans/Azure/Orleans. Streaming.EventHubs. Dostawcy wiadomości SMS i kolejki platformy Azure nie mogą się przewijać.

Przetwarzanie bezstanowe automatycznie skalowane w poziomie

Domyślnie Orleans przesyłanie strumieniowe jest przeznaczone do obsługi dużej liczby stosunkowo małych strumieni, z których każdy jest przetwarzany przez co najmniej jedno ziarno stanowe. Łącznie przetwarzanie wszystkich strumieni razem jest podzielone na fragmenty wśród dużej liczby zwykłych (stanowych) ziarna. Kod aplikacji steruje tym fragmentowaniem, przypisując identyfikatory strumienia i identyfikatory ziarna oraz jawnie subskrybując. Celem jest przetwarzanie stanowe podzielone na fragmenty.

Jednak istnieje również interesujący scenariusz automatycznego skalowania bezstanowego przetwarzania bezstanowego. W tym scenariuszu aplikacja ma niewielką liczbę strumieni (a nawet jednego dużego strumienia), a celem jest przetwarzanie bezstanowe. Przykładem jest globalny strumień zdarzeń, w którym przetwarzanie obejmuje dekodowanie każdego zdarzenia i potencjalnie przekazywanie go do innych strumieni w celu dalszego przetwarzania stanowego. Przetwarzanie strumieni skalowane w poziomie bezstanowe może być obsługiwane Orleans za pośrednictwem StatelessWorkerAttribute ziarna.

Bieżący stan bezstanowego automatycznego skalowania w poziomie przetwarzania: nie jest to jeszcze zaimplementowane. Próba zasubskrybowania strumienia z StatelessWorker ziarna spowoduje niezdefiniowane zachowanie. Rozważamy obsługę tej opcji.

Ziarna i Orleans klienci

Orleans strumienie działają równomiernie między ziarnami i Orleans klientami. Oznacza to, że te same interfejsy API mogą być używane wewnątrz ziarna i w Orleans kliencie do tworzenia i korzystania z zdarzeń. Znacznie upraszcza to logikę aplikacji, dzięki czemu specjalne interfejsy API po stronie klienta, takie jak Obserwatorzy ziarna, są nadmiarowe.

W pełni zarządzane i niezawodne przesyłanie strumieniowe pub-sub

Aby śledzić subskrypcje strumienia, Orleans używa składnika środowiska uruchomieniowego o nazwie Streaming Pub-Sub , który służy jako punkt spotkania dla odbiorców strumienia i producentów strumieni. Pub-sub śledzi wszystkie subskrypcje strumienia i utrwala je, a także dopasowuje odbiorców strumienia do producentów strumieni.

Aplikacje mogą wybierać lokalizację i sposób przechowywania danych pub-sub. Sam składnik Pub-Sub jest implementowany jako ziarna (o nazwie PubSubRendezvousGrain), które używają Orleans trwałości deklaratywnej. PubSubRendezvousGrain używa dostawcy magazynu o nazwie PubSubStore. Podobnie jak w przypadku dowolnego ziarna, można wyznaczyć implementację dla dostawcy magazynu. W przypadku usługi Streaming Pub-Sub można zmienić implementację budowy silosu PubSubStore przy użyciu konstruktora hosta silosu:

Poniżej przedstawiono konfigurację pub-sub w celu przechowywania stanu w tabelach platformy Azure.

hostBuilder.AddAzureTableGrainStorage("PubSubStore",
    options => options.ConfigureTableServiceClient("<Secret>"));
hostBuilder.AddAzureTableGrainStorage("PubSubStore",
    options => options.ConnectionString = "<Secret>");

Dzięki temu dane pub-sub będą trwale przechowywane w tabeli platformy Azure. Na potrzeby początkowego programowania można również użyć magazynu pamięci. Oprócz pub-sub środowisko Orleans uruchomieniowe przesyłania strumieniowego dostarcza zdarzenia od producentów do konsumentów, zarządza wszystkimi zasobami środowiska uruchomieniowego przydzielonymi do aktywnie używanych strumieni i przezroczystie zbiera zasoby środowiska uruchomieniowego z nieużywanych strumieni.

Konfigurowanie

Aby używać strumieni, należy włączyć dostawców strumieni za pośrednictwem hosta silosu lub konstruktorów klientów klastra. Więcej informacji o dostawcach strumieni można znaleźć tutaj. Przykładowa konfiguracja dostawcy strumienia:

hostBuilder.AddMemoryStreams("StreamProvider")
    .AddAzureQueueStreams<AzureQueueDataAdapterV2>("AzureQueueProvider",
        optionsBuilder => optionsBuilder.Configure(
            options => options.ConfigureTableServiceClient("<Secret>")))
    .AddAzureTableGrainStorage("PubSubStore",
        options => options.ConfigureTableServiceClient("<Secret>"));
hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
    .AddAzureQueueStreams<AzureQueueDataAdapterV2>("AzureQueueProvider",
        optionsBuilder => optionsBuilder.Configure(
            options => options.ConnectionString = "<Secret>"))
    .AddAzureTableGrainStorage("PubSubStore",
        options => options.ConnectionString = "<Secret>");

Zobacz też

Orleans Dostawcy usługi Stream