共用方式為


Orleans 串流實作詳細資料

本節提供 Orleans 串流實作的高階概觀。 其描述應用程式層級上看不到的概念和詳細資料。 如果您只打算使用串流,就不需要閱讀本章節。

用語

我們將「佇列」一詞描述為任何持久性的儲存體技術,這項技術可以內嵌串流事件,並允許提取事件或提供提取型機制來取用事件。 為了提供可擴縮性,這些技術通常會提供分區/分割佇列。 例如,Azure 佇列可讓您建立多個佇列,而事件中樞則具有多個中樞。

持續性串流

所有 Orleans 持續性串流提供者都會共用通用實作 PersistentStreamProvider。 這些一般串流提供者必須使用技術特定的 IQueueAdapterFactory 來加以設定。

例如,為了進行測試,我們有佇列配接器可產生其測試資料,而不是從佇列中讀取資料。 下列程式碼示範如何設定持續性串流提供者,以使用我們自訂的 (產生器) 佇列配接器。 其作法是透過用來建立配接器的中心函式來設定持續性串流提供者。

hostBuilder.AddPersistentStreams(
    StreamProviderName, GeneratorAdapterFactory.Create);

當串流產生者產生新的串流項目,並呼叫 stream.OnNext() 時,Orleans 串流執行階段會在該串流提供者的 IQueueAdapter 上叫用適當方法,這會將項目直接加入適當的佇列中。

提取代理程式

持續性串流提供者的核心是提取代理程式。 提取代理程式會從一組持久性佇列中提取事件,並將其傳遞至粒紋中的應用程式程式碼以取用這些事件。 您可以將提取代理程式視為分散式「微服務」,也就是分割、高可用性的彈性分散式元件。 提取代理程式會在裝載應用程式粒紋的相同定址接收器內執行,並由 Orleans 串流執行階段完全管理。

StreamQueueMapperStreamQueueBalancer

提取代理程式會使用 IStreamQueueMapperIStreamQueueBalancer 進行參數化。 IStreamQueueMapper 提供所有佇列的清單,也負責將串流對應至佇列。 如此一來,持續性串流提供者的產生者端就會知道哪個佇列要加入訊息佇列中。

IStreamQueueBalancer 表示佇列在 Orleans 定址接收器和代理程式之間達成平衡的方式。 目標是以平衡的方式將佇列指派給代理程式,防止遇到瓶頸並提供彈性。 將新的定址接收器新增至 Orleans 叢集時,佇列會自動在新舊定址接收器之間重新達成平衡。 StreamQueueBalancer 允許自訂該處理程序。 Orleans 有數個內建的 StreamQueueBalancers,可支援不同平衡案例 (大量和少量佇列) 以及不同環境 (Azure、內部部署和靜態)。

請使用上述測試產生器範例,下列程式碼會示範如何設定佇列對應工具和佇列平衡器。

hostBuilder
    .AddPersistentStreams(StreamProviderName, GeneratorAdapterFactory.Create,
        providerConfigurator =>
        providerConfigurator.Configure<HashRingStreamQueueMapperOptions>(
            ob => ob.Configure(options => options.TotalQueueCount = 8))
      .UseDynamicClusterConfigDeploymentBalancer());

上述程式碼會設定 GeneratorAdapterFactory 以使用具八個佇列的佇列對應工具,並使用 DynamicClusterConfigDeploymentBalancer 來平衡叢集間的佇列。

提取通訊協定

每個定址接收器都會執行一組提取代理程式,每個代理程式都會從一個佇列中提取。 提取代理程式本身是由名為 SystemTarget 的內部執行階段元件所實作。 SystemTarget 基本上是執行階段粒紋,受限於單一執行緒並行、可使用一般粒紋傳訊,而且和粒紋都屬於輕量型。 與粒紋不同的是,SystemTargets 不是虛擬的:其是由執行階段明確建立,而且不是透明的位置。 藉由將提取代理程式實作為 SystemTargets,Orleans 串流執行階段可以依賴內建的 Orleans 功能,並可調整為非常大量的佇列,因為建立新的提取代理程式與建立新粒紋一樣容易。

每個提取代理程式可透過叫用 IQueueAdapterReceiver.GetQueueMessagesAsync 方法,以執行從佇列中提取的定期計時器。 傳回的訊息會放在名為 IQueueCache 內部個別代理程式資料結構中。 系統會檢查每則訊息以找出其目的地串流。 代理程式會使用「發佈-訂閱」來找出訂閱此串流的串流取用者清單。 擷取取用者清單後,代理程式會在本機 (在其發佈-訂閱快取中) 儲存清單,因此其不需要在每則訊息上查閱「發佈-訂閱」。 代理程式也會訂閱「發佈-訂閱」,以接收任何訂閱該串流的新取用者通知。 此代理程式與發佈-訂閱之間的交握可保證強式串流訂閱語意:當取用者訂閱串流後,取用者就會看到其訂閱之後所產生的所有事件。 此外,使用 StreamSequenceToken 可讓取用者訂閱過去事件。

佇列快取

IQueueCache 是內部個別代理程式資料結構,可從佇列中分離新事件的取消佇列作業,並將其傳遞給取用者。 其也允許將傳遞資料分離至不同串流和不同取用者。

假設一個串流有 3 個串流取用者,其中 1 個串流取用者速度緩慢。 如果未留意其中狀態,此緩慢的取用者可能會影響代理程式進度,降低該串流中其他取用者的使用量,甚至使其他串流的事件取消佇列和傳遞程序變慢。 為了防止此狀況,並允許代理程式的平行處理原則上限,我們使用了 IQueueCache

IQueueCache 緩衝區會串流事件,並提供代理程式一個方法,讓其以自己的步調將事件傳遞給每個取用者。 每個取用者傳遞程序是由名為 IQueueCacheCursor 的內部元件所實作,其會追蹤每個取用者的進度。 如此一來,每個取用者都會以自己的步調接收事件:快速的取用者會在從佇列中清除佇列時接收事件,而緩慢的取用者則會在稍後收到事件。 訊息傳遞至所有取用者之後,即可從快取中刪除訊息。

背壓

Orleans 串流執行階段中的背壓會應用在兩方面:從佇列中將串流事件傳遞至代理程式,以及從代理程式將事件傳遞至串流取用者

後者是由內建的 Orleans 訊息傳遞機制所提供。 每個串流事件都會透過標準 Orleans 粒紋傳訊,一次從代理程式傳遞一個串流事件給取用者。 也就是說,代理程式會將一個事件 (或一個大小有限的事件批次) 傳送給每個串流取用者,並等候此呼叫。 直到先前事件的工作已解決或中斷,下一個新事件才會開始進行傳遞。 如此一來,我們就自然將每個取用者的傳遞速率限制為一次傳遞一則訊息。

將串流事件從佇列傳遞至代理程式時,Orleans 串流會提供新的特殊背壓機制。 由於代理程式會從佇列中分離事件的取消佇列作業,並將其傳遞給取用者,因此單一緩慢的取用者可能會落後太多,以至於 IQueueCache 會填滿。 為了防止 IQueueCache 無限成長,我們會限制其大小 (可設定大小限制)。 不過,代理程式不會丟棄未傳遞的事件。

相反地,當快取開始填滿時,代理程式會降低佇列中清除佇列事件的速率。 如此一來,我們可以透過調整從佇列中取用的速率 (「背壓」),來安穩渡過緩慢的傳遞期間,並在稍後提升取用速率。 若要偵測「緩慢傳遞」的低峰,IQueueCache 會使用快取貯體的內部資料結構,其可追蹤事件傳遞至每個串流取用者的進度。 這會產生反應快速且可自我調整的系統。