共用方式為


Orleans 串流 API

應用程式會透過與已知 .NET 中的回應式延伸模組 (Rx) 非常類似的 API 和資料流互動。 主要差異在於 Orleans 資料流延伸模組是非同步的,讓處理在 Orleans 中的「分散式和可調整的計算網狀架構」中更有效率。

非同步資料流

應用程式一開始會使用資料流提供者來取得資料流的控制碼。 您可以在這裡深入了解資料流提供者,但現在您可以將其視為資料流處理站,讓實作者自訂資料流行為和語意:

IStreamProvider streamProvider = base.GetStreamProvider("SimpleStreamProvider");
StreamId streamId = StreamId.Create("MyStreamNamespace", Guid);
IAsyncStream<T> stream = streamProvider.GetStream<T>(streamId);
IStreamProvider streamProvider = base.GetStreamProvider("SimpleStreamProvider");
IAsyncStream<T> stream = streamProvider.GetStream<T>(Guid, "MyStreamNamespace");

應用程式可以藉由在粒紋內部呼叫 Grain.GetStreamProvider 方法,或在用戶端上呼叫 GrainClient.GetStreamProvider 方法,以取得資料流提供者的參考。

Orleans.Streams.IAsyncStream<T> 是虛擬資料流的邏輯強型別控制碼。 其在精神上類似於 Orleans 粒紋參考。 對 GetStreamProviderGetStream 的呼叫完全是本機的。 GetStream 的引數是 GUID,以及我們呼叫資料流命名空間的額外字串 (可以是 null)。 GUID 和命名空間字串會組成資料流識別 (精神上類似於 IGrainFactory.GetGrain 的引數)。 GUID 和命名空間字串的組合提供判斷資料流識別的額外彈性。 就像粒紋 7 可能存在於粒紋類型 PlayerGrain 內,而不同的粒紋 7 可能存在於粒紋類型 ChatRoomGrain 內,資料流 123 可能存在於資料流命名空間 PlayerEventsStream 中,而不同的資料流 123 可能存在於資料流命名空間 ChatRoomMessagesStream 中。

產生和取用

IAsyncStream<T> 會同時實作 IAsyncObserver<T>IAsyncObservable<T> 介面。 如此一來,應用程式就可以使用資料流,使用 Orleans.Streams.IAsyncObserver<T> 將新的事件產生至資料流,或使用 Orleans.Streams.IAsyncObservable<T> 來訂閱和取用資料流中的事件。

public interface IAsyncObserver<in T>
{
    Task OnNextAsync(T item, StreamSequenceToken token = null);
    Task OnCompletedAsync();
    Task OnErrorAsync(Exception ex);
}

public interface IAsyncObservable<T>
{
    Task<StreamSubscriptionHandle<T>> SubscribeAsync(IAsyncObserver<T> observer);
}

若要將事件產生至資料流,應用程式只需呼叫

await stream.OnNextAsync<T>(event)

若要訂閱資料流,應用程式會呼叫

StreamSubscriptionHandle<T> subscriptionHandle = await stream.SubscribeAsync(IAsyncObserver)

SubscribeAsync 的引數可以是實作 IAsyncObserver<T> 介面的物件,或處理傳入事件的 Lambda 函式組合。 SubscribeAsync 的更多選項可透過 AsyncObservableExtensions 類別取得。 SubscribeAsync 會傳回 StreamSubscriptionHandle<T>,這是不透明的控制碼,可用來取消訂閱資料流 (精神上類似於非同步版本的 IDisposable)。

await subscriptionHandle.UnsubscribeAsync()

請務必注意,訂用帳戶適用於粒紋,而非啟用。 一旦粒紋程式碼訂閱至資料流,此訂用帳戶就會超過此啟用的存留期,並永久維持耐久性,直到粒紋程式碼 (可能在不同的啟用) 明確取消訂閱為止。 這是虛擬資料流抽象概念的核心:不僅所有資料流一律存在,資料流訂用帳戶在邏輯上也是永久性的,且存留不受建立訂用帳戶的特定實體啟用影響。

多重性

Orleans 資料流可能會有多個產生者和多個取用者。 產生者所發佈的訊息將會傳遞至在發佈訊息之前訂閱資料流的所有取用者。

此外,取用者可以多次訂閱相同的資料流。 每次訂閱時,都會傳回唯一的 StreamSubscriptionHandle<T>。 如果粒紋 (或用戶端) 訂閱相同資料流的 X 次,則每個訂用帳戶都會收到相同的事件 X 次。 取用者也可以取消訂閱個別訂用帳戶。 呼叫下列項目,即可找到其所有目前的訂用帳戶:

IList<StreamSubscriptionHandle<T>> allMyHandles =
    await IAsyncStream<T>.GetAllSubscriptionHandles();

從失敗復原

如果資料流的產生者無作用 (或其粒紋停用),則不需要執行任何動作。 下次此粒紋想要產生更多事件時,可以再次取得資料流控制碼,並以相同的方式產生新的事件。

取用者邏輯稍微更加涉及一些。 如先前所述,一旦取用者粒紋訂閱資料流,此訂用帳戶就會有效,直到明確取消訂閱粒紋為止。 如果資料取用者無作用 (或其粒紋停用),且在資料流上產生新的事件,則取用者粒紋會自動重新啟用 (就像當訊息傳送至該資料流時自動啟用任何一般 Orleans 粒紋一樣)。 粒紋程式碼現在唯一需要執行的動作是提供 IAsyncObserver<T> 來處理資料。 取用者必須在 OnActivateAsync() 方法中重新附加處理邏輯。 若要這樣做,其可以呼叫:

StreamSubscriptionHandle<int> newHandle =
    await subscriptionHandle.ResumeAsync(IAsyncObserver);

取用者會使用其第一次訂閱「繼續處理」時取得的先前控制碼。 請注意,ResumeAsync 只會使用 IAsyncObserver 邏輯的新執行個體來更新現有的訂用帳戶,且不會變更此取用者已訂閱此資料流的事實。

取用者如何取得舊版 subscriptionHandle? 有 2 個選項。 取用者可能已保存其從原始 SubscribeAsync 作業傳回的控制碼,且現在可以使用。 或者,如果取用者沒有控制碼,則其可以要求 IAsyncStream<T> 以取得其所有作用中的訂用帳戶控制碼,方法是呼叫:

IList<StreamSubscriptionHandle<T>> allMyHandles =
    await IAsyncStream<T>.GetAllSubscriptionHandles();

如果想要的話,取用者現在可以繼續所有訂閱或取消訂閱。

提示

如果取用者粒紋直接實作 IAsyncObserver<T> 介面 (public class MyGrain<T> : Grain, IAsyncObserver<T>),則理論上不需要重新附加 IAsyncObserver,且因此不需要呼叫 ResumeAsync。 串流執行階段應該能夠自動判斷粒紋已經實作 IAsyncObserver,且只會叫用這些 IAsyncObserver 方法。 不過,串流執行階段目前不支援此功能,且即使粒紋直接實作 ResumeAsync,粒紋程式碼仍然需要明確呼叫 IAsyncObserver

明確和隱含訂閱

根據預設,資料流取用者必須明確訂閱資料流程。 此訂用帳戶通常會由一些外部訊息觸發,這些訊息會由指示其訂閱的粒紋 (或用戶端) 接收。 例如在聊天服務中,當使用者加入聊天室時,其粒紋會收到具有聊天名稱的 JoinChatGroup 訊息,這會導致使用者粒紋訂閱此聊天資料流。

此外,Orleans 資料流也支援隱含訂閱。 在此模型中,粒紋不會明確訂閱資料流。 此粒紋會隱含地根據其粒紋身分識別和 ImplicitStreamSubscriptionAttribute 自動訂閱。 隱含訂用帳戶的主要值是允許資料流活動以自動觸發粒紋啟用 (因此觸發訂用帳戶)。 例如,使用 SMS 資料流時,如果有一個粒紋想要產生資料流,而另一個粒紋處理此資料流,則產生者必須知道取用者粒紋的身分識別,並呼叫粒紋以告知其訂閱資料流。 只有在這之後才能開始傳送事件。 相反地,使用隱含訂用帳戶時,產生者可以直接開始產生資料流的事件,且會自動啟用取用者粒紋並訂閱資料流。 在此情況下,產生者完全不在意讀取事件的人員

粒紋實作 MyGrainType 可以宣告屬性 [ImplicitStreamSubscription("MyStreamNamespace")]。 這會告訴資料流執行階段,當事件在身分識別為 GUID XXX 和 "MyStreamNamespace" 命名空間的資料流上產生時,事件應該傳遞至身分識別為 XXX 且類型為 MyGrainType 的粒紋。 也就是說,執行階段會將資料流 <XXX, MyStreamNamespace> 對應至取用者粒紋 <XXX, MyGrainType>

ImplicitStreamSubscription 的存在會導致資料流執行階段自動訂閱此粒紋至資料流,並將資料流事件傳遞給粒紋。 不過,粒紋程式碼仍然需要告訴執行階段其想要如何處理事件。 基本上,其必須附加 IAsyncObserver。 因此,當啟用粒紋時,內部 OnActivateAsync 的粒紋程式碼需要呼叫:

IStreamProvider streamProvider =
    base.GetStreamProvider("SimpleStreamProvider");

StreamId streamId =
    StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
IAsyncStream<T> stream =
    streamProvider.GetStream<T>(streamId);

StreamSubscriptionHandle<T> subscription =
    await stream.SubscribeAsync(IAsyncObserver<T>);
IStreamProvider streamProvider =
    base.GetStreamProvider("SimpleStreamProvider");

IAsyncStream<T> stream =
    streamProvider.GetStream<T>(this.GetPrimaryKey(), "MyStreamNamespace");

StreamSubscriptionHandle<T> subscription =
    await stream.SubscribeAsync(IAsyncObserver<T>);

撰寫訂用帳戶邏輯

以下是如何針對各種案例撰寫訂用帳戶邏輯的指導方針:明確和隱含的訂用帳戶、可倒轉和不可倒轉的資料流。 明確和隱含訂用帳戶的主要差異在於,隱含上對於粒紋而言,每個資料流命名空間一律只有一個隱含訂用帳戶;沒有辦法建立多個訂用帳戶 (沒有訂用帳戶多重性)、沒有辦法取消訂閱,且粒紋邏輯一律只需要附加處理邏輯。 這也表示對於隱含訂閱,永遠不需要繼續訂閱。 另一方面,針對明確訂用帳戶必須繼續訂閱,否則如果粒紋再次訂閱,則會導致粒紋多次訂閱。

隱含訂閱:

對於隱含訂閱,粒紋仍然需要訂閱以附加處理邏輯。 這可以透過實 IStreamSubscriptionObserver 作 和 IAsyncObserver<T> 介面,在取用者粒紋中完成,讓粒紋與訂閱分開啟用。 若要訂閱數據流,粒紋會在其 OnSubscribed(...) 方法中建立句柄和呼叫await handle.ResumeAsync(this)

為了處理訊息,會 IAsyncObserver<T>.OnNextAsync(...) 實作 方法來接收數據流數據和序列令牌。 或者,ResumeAsync方法可能會採用一組委派,代表 介面、 onNextAsynconErrorAsynconCompletedAsync的方法IAsyncObserver<T>

public Task OnNextAsync(string item, StreamSequenceToken? token = null)
{
    _logger.LogInformation($"Received an item from the stream: {item}");
}

public async Task OnSubscribed(IStreamSubscriptionHandleFactory handleFactory)
{
    var handle = handleFactory.Create<string>();
    await handle.ResumeAsync(this);
}
public override async Task OnActivateAsync()
{
    var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
    var stream =
        streamProvider.GetStream<string>(
            this.GetPrimaryKey(), "MyStreamNamespace");

    await stream.SubscribeAsync(OnNextAsync);
}

明確訂用帳戶:

針對明確的訂用帳戶,粒紋必須呼叫 SubscribeAsync 以訂閱資料流。 這會建立訂用帳戶,並附加處理邏輯。 明確訂用帳戶會存在,直到粒紋取消訂閱為止,因此如果粒紋已停用並重新啟用,則粒紋仍會明確訂閱,但不會附加任何處理邏輯。 在此情況下,粒紋需要重新附加處理邏輯。 若要這樣做,在其 OnActivateAsync 中,粒紋必須先呼叫 IAsyncStream<T>.GetAllSubscriptionHandles() 來找出其擁有的訂用帳戶。 此粒紋必須在想要繼續處理的每個控制碼上執行 ResumeAsync,或在其完成的任何控制碼上執行 UnsubscribeAsync。 粒紋也可以選擇性地將 StreamSequenceToken 指定為 ResumeAsync 呼叫的引數,這會導致此明確訂用帳戶開始從該權杖取用。

public async override Task OnActivateAsync(CancellationToken cancellationToken)
{
    var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
    var streamId = StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
    var stream = streamProvider.GetStream<string>(streamId);

    var subscriptionHandles = await stream.GetAllSubscriptionHandles();
    foreach (var handle in subscriptionHandles)
    {
       await handle.ResumeAsync(this);
    } 
}
public async override Task OnActivateAsync()
{
    var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
    var stream =
        streamProvider.GetStream<string>(this.GetPrimaryKey(), "MyStreamNamespace");

    var subscriptionHandles = await stream.GetAllSubscriptionHandles();
    if (!subscriptionHandles.IsNullOrEmpty())
    {
        subscriptionHandles.ForEach(
            async x => await x.ResumeAsync(OnNextAsync));
    }
}

資料流順序和序列權杖

個別產生者與個別取用者之間的事件傳遞順序取決於資料流提供者。

使用 SMS 時,產生者會藉由控制產生者發佈事件的方式,明確地控制取用者所看到的事件順序。 根據預設 (如果 SMS 提供者的 SimpleMessageStreamProviderOptions.FireAndForgetDelivery 選項設定為 false),且如果產生者等待每個 OnNextAsync 呼叫,則事件會依 FIFO 順序抵達。 在 SMS 中,由產生者決定如何處理 Task 呼叫所傳回中斷 OnNextAsync 指出的傳遞失敗。

Azure 佇列串流不保證 FIFO 順序,因為基礎 Azure 佇列並不保證失敗案例中的順序。 (這可以保證無失敗執行中的 FIFO 順序。)當產生者將事件產生至 Azure 佇列時,如果佇列作業失敗,則由產生者嘗試另一個佇列,稍後再處理潛在的重複訊息。 在傳遞端,Orleans 串流執行階段會從佇列清除事件佇列,並嘗試將其傳遞給取用者。 Orleans 串流執行階段只會在成功處理時,從佇列中刪除事件。 如果傳遞或處理失敗,則事件不會從佇列中刪除,且稍後會自動重新出現在佇列中。 串流執行階段會嘗試再次傳遞事件,因此可能會中斷 FIFO 順序。 上述行為符合 Azure 佇列的一般語意。

應用程式定義順序:若要處理上述排序問題,應用程式可以選擇性地指定其順序。 這可透過 StreamSequenceToken 來達成,這是可用來排序事件的不透明 IComparable 物件。 產生者可以將選擇性的 StreamSequenceToken 傳遞給 OnNext 呼叫。 此 StreamSequenceToken 將會傳遞給取用者,並與事件一起傳遞。 如此一來,應用程式就可以獨立於串流執行階段來推論和重新建構其順序。

可倒轉資料流

某些資料流只允許應用程式從最新時間點開始訂閱,而其他資料流則允許「時間回溯」。 後者的功能取決於基礎佇列技術和特定資料流提供者。 例如,Azure 佇列只允許取用最新的入佇列排事件,而 EventHub 允許從任意時間點重新執行事件 (最多一些到期時間)。 支援時間回溯的資料流稱為可倒轉資料流

可倒轉資料流的取用者可以將 StreamSequenceToken 傳遞至 SubscribeAsync 呼叫。 執行階段會開始從該 StreamSequenceToken 將事件傳遞至呼叫。 Null 權杖表示取用者想要從最新的時間開始接收事件。

在復原情節中,倒轉資料流程的能力非常有用。 例如,請考慮訂閱資料流的粒紋,並定期檢查其狀態與最新的序列權杖。 從失敗中復原時,粒紋可以從最新的檢查點序列權杖重新訂閱相同的資料流程,藉此復原而不會遺失自上一個檢查點之後產生的任何事件。

事件中樞提供者可倒轉。 您可以在 GitHub: Orleans/Azure/Orleans.Streaming.EventHubs 上找到其程式碼。 SMSAzure 佇列提供者「無法」倒轉。

無狀態自動相應放大處理

根據預設,Orleans 串流的目標是支援大量相對較小的資料流,每個資料流都是由一或多個具狀態的粒紋所處理。 整體上,所有資料流的處理都會在大量的一般 (具狀態) 粒紋之間分區化。 應用程式程式碼會藉由指派資料流識別碼和粒紋識別碼,以及透過明確訂閱以控制此分區化。 目標是分區化具狀態處理。

不過,也有一個有趣的情節,可自動相應放大無狀態處理。 在此情節中,應用程式有少量的資料流 (或者甚至是一個大型資料流),而目標是無狀態處理。 例如,事件的全域資料流,其中處理牽涉到解碼每個事件,且可能將其轉送至其他資料流,以進行進一步具狀態處理。 可透過 StatelessWorkerAttribute 粒紋在 Orleans 中支援無狀態相應放大串流處理。

無狀態自動相應放大處理的目前狀態:這尚未實作。 嘗試從 StatelessWorker 粒紋訂閱資料流將會導致未定義的行為。 我們考慮支援此選項

粒紋和 Orleans 用戶端

Orleans 資料流會跨粒紋與 Orleans 用戶端一致地運作。 也就是說,相同的 API 可以在粒紋和 Orleans 用戶端內使用,以產生和取用事件。 這可大幅簡化應用程式邏輯,讓特殊的用戶端 API,例如粒紋觀察者具有備援。

完全受控且可靠的串流發行/訂閱

若要追蹤資料流訂閱,Orleans 會使用名為 Streaming Pub-Sub 的執行階段元件,以作為資料流取用者和資料流產生者的交集點。 發行/訂閱會追蹤所有資料流訂用帳戶並予以保存,並與資料流產生者比對資料流取用者。

應用程式可以選擇儲存發行/訂閱資料的位置和方式。 發行/訂閱元件本身會實作為使用 Orleans 宣告式持續性 (稱為 PubSubRendezvousGrain) 的粒紋。 PubSubRendezvousGrain 會使用名為 PubSubStore 的儲存體提供者。 如同任何粒紋,您可以指定儲存體提供者的實作。 針對資料流發行/訂閱,您可以使用定址接收器主機產生器,在定址接收器建構階段變更 PubSubStore 的實作:

下列設定發行/訂閱以將其狀態儲存在 Azure 資料表中。

hostBuilder.AddAzureTableGrainStorage("PubSubStore",
    options => options.ConfigureTableServiceClient("<Secret>"));
hostBuilder.AddAzureTableGrainStorage("PubSubStore",
    options => options.ConnectionString = "<Secret>");

如此一來,發行/訂閱資料就會永久儲存在 Azure 資料表中。 針對初始開發,您也可以使用記憶體儲存體。 除了發行/訂閱之外,Orleans 資料流執行階段還會從產生者將事件傳遞給取用者、管理配置給主動使用資料流的所有執行階段資源,以及透明地記憶體回收未使用的資料流中的執行階段資源。

組態

若要使用資料流,您必須透過定址接收器主機或叢集用戶端產生器來啟用資料流提供者。 您可以在這裡深入了解串流提供者。 範例資料流提供者設定:

hostBuilder.AddMemoryStreams("StreamProvider")
    .AddAzureQueueStreams<AzureQueueDataAdapterV2>("AzureQueueProvider",
        optionsBuilder => optionsBuilder.Configure(
            options => options.ConfigureTableServiceClient("<Secret>")))
    .AddAzureTableGrainStorage("PubSubStore",
        options => options.ConfigureTableServiceClient("<Secret>"));
hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
    .AddAzureQueueStreams<AzureQueueDataAdapterV2>("AzureQueueProvider",
        optionsBuilder => optionsBuilder.Configure(
            options => options.ConnectionString = "<Secret>"))
    .AddAzureTableGrainStorage("PubSubStore",
        options => options.ConnectionString = "<Secret>");

另請參閱

Orleans 資料流提供者