Orleans 串流實作詳細資料
本節提供 Orleans 串流實作的高階概觀。 其描述應用程式層級上看不到的概念和詳細資料。 如果您只打算使用串流,就不需要閱讀本章節。
用語:
我們將「佇列」一詞描述為任何持久性的儲存體技術,這項技術可以內嵌串流事件,並允許提取事件或提供提取型機制來取用事件。 為了提供可擴縮性,這些技術通常會提供分區/分割佇列。 例如,Azure 佇列可讓您建立多個佇列,而事件中樞則具有多個中樞。
持續性串流
所有 Orleans 持續性串流提供者都會共用通用實作 PersistentStreamProvider。 這些一般串流提供者必須使用技術特定的 IQueueAdapterFactory 來加以設定。
例如,為了進行測試,我們有佇列配接器可產生其測試資料,而不是從佇列中讀取資料。 下列程式碼示範如何設定持續性串流提供者,以使用我們自訂的 (產生器) 佇列配接器。 其作法是透過用來建立配接器的中心函式來設定持續性串流提供者。
hostBuilder.AddPersistentStreams(
StreamProviderName, GeneratorAdapterFactory.Create);
當串流產生者產生新的串流項目,並呼叫 stream.OnNext()
時,Orleans 串流執行階段會在該串流提供者的 IQueueAdapter 上叫用適當方法,這會將項目直接加入適當的佇列中。
提取代理程式
持續性串流提供者的核心是提取代理程式。 提取代理程式會從一組持久性佇列中提取事件,並將其傳遞至粒紋中的應用程式程式碼以取用這些事件。 您可以將提取代理程式視為分散式「微服務」,也就是分割、高可用性的彈性分散式元件。 提取代理程式會在裝載應用程式粒紋的相同定址接收器內執行,並由 Orleans 串流執行階段完全管理。
StreamQueueMapper
和 StreamQueueBalancer
提取代理程式會使用 IStreamQueueMapper 和 IStreamQueueBalancer 進行參數化。 IStreamQueueMapper
提供所有佇列的清單,也負責將串流對應至佇列。 如此一來,持續性串流提供者的產生者端就會知道哪個佇列要加入訊息佇列中。
IStreamQueueBalancer
表示佇列在 Orleans 定址接收器和代理程式之間達成平衡的方式。 目標是以平衡的方式將佇列指派給代理程式,防止遇到瓶頸並提供彈性。 將新的定址接收器新增至 Orleans 叢集時,佇列會自動在新舊定址接收器之間重新達成平衡。 StreamQueueBalancer
允許自訂該處理程序。 Orleans 有數個內建的 StreamQueueBalancers,可支援不同平衡案例 (大量和少量佇列) 以及不同環境 (Azure、內部部署和靜態)。
請使用上述測試產生器範例,下列程式碼會示範如何設定佇列對應工具和佇列平衡器。
hostBuilder
.AddPersistentStreams(StreamProviderName, GeneratorAdapterFactory.Create,
providerConfigurator =>
providerConfigurator.Configure<HashRingStreamQueueMapperOptions>(
ob => ob.Configure(options => options.TotalQueueCount = 8))
.UseDynamicClusterConfigDeploymentBalancer());
上述程式碼會設定 GeneratorAdapterFactory 以使用具八個佇列的佇列對應工具,並使用 DynamicClusterConfigDeploymentBalancer 來平衡叢集間的佇列。
提取通訊協定
每個定址接收器都會執行一組提取代理程式,每個代理程式都會從一個佇列中提取。 提取代理程式本身是由名為 SystemTarget 的內部執行階段元件所實作。 SystemTarget 基本上是執行階段粒紋,受限於單一執行緒並行、可使用一般粒紋傳訊,而且和粒紋都屬於輕量型。 與粒紋不同的是,SystemTargets 不是虛擬的:其是由執行階段明確建立,而且不是透明的位置。 藉由將提取代理程式實作為 SystemTargets,Orleans 串流執行階段可以依賴內建的 Orleans 功能,並可調整為非常大量的佇列,因為建立新的提取代理程式與建立新粒紋一樣容易。
每個提取代理程式可透過叫用 IQueueAdapterReceiver.GetQueueMessagesAsync 方法,以執行從佇列中提取的定期計時器。 傳回的訊息會放在名為 IQueueCache 內部個別代理程式資料結構中。 系統會檢查每則訊息以找出其目的地串流。 代理程式會使用「發佈-訂閱」來找出訂閱此串流的串流取用者清單。 擷取取用者清單後,代理程式會在本機 (在其發佈-訂閱快取中) 儲存清單,因此其不需要在每則訊息上查閱「發佈-訂閱」。 代理程式也會訂閱「發佈-訂閱」,以接收任何訂閱該串流的新取用者通知。 此代理程式與發佈-訂閱之間的交握可保證強式串流訂閱語意:當取用者訂閱串流後,取用者就會看到其訂閱之後所產生的所有事件。 此外,使用 StreamSequenceToken
可讓取用者訂閱過去事件。
佇列快取
IQueueCache 是內部個別代理程式資料結構,可從佇列中分離新事件的取消佇列作業,並將其傳遞給取用者。 其也允許將傳遞資料分離至不同串流和不同取用者。
假設一個串流有 3 個串流取用者,其中 1 個串流取用者速度緩慢。 如果未留意其中狀態,此緩慢的取用者可能會影響代理程式進度,降低該串流中其他取用者的使用量,甚至使其他串流的事件取消佇列和傳遞程序變慢。 為了防止此狀況,並允許代理程式的平行處理原則上限,我們使用了 IQueueCache
。
IQueueCache
緩衝區會串流事件,並提供代理程式一個方法,讓其以自己的步調將事件傳遞給每個取用者。 每個取用者傳遞程序是由名為 IQueueCacheCursor 的內部元件所實作,其會追蹤每個取用者的進度。 如此一來,每個取用者都會以自己的步調接收事件:快速的取用者會在從佇列中清除佇列時接收事件,而緩慢的取用者則會在稍後收到事件。 訊息傳遞至所有取用者之後,即可從快取中刪除訊息。
背壓
Orleans 串流執行階段中的背壓會應用在兩方面:從佇列中將串流事件傳遞至代理程式,以及從代理程式將事件傳遞至串流取用者。
後者是由內建的 Orleans 訊息傳遞機制所提供。 每個串流事件都會透過標準 Orleans 粒紋傳訊,一次從代理程式傳遞一個串流事件給取用者。 也就是說,代理程式會將一個事件 (或一個大小有限的事件批次) 傳送給每個串流取用者,並等候此呼叫。 直到先前事件的工作已解決或中斷,下一個新事件才會開始進行傳遞。 如此一來,我們就自然將每個取用者的傳遞速率限制為一次傳遞一則訊息。
將串流事件從佇列傳遞至代理程式時,Orleans 串流會提供新的特殊背壓機制。 由於代理程式會從佇列中分離事件的取消佇列作業,並將其傳遞給取用者,因此單一緩慢的取用者可能會落後太多,以至於 IQueueCache
會填滿。 為了防止 IQueueCache
無限成長,我們會限制其大小 (可設定大小限制)。 不過,代理程式不會丟棄未傳遞的事件。
相反地,當快取開始填滿時,代理程式會降低佇列中清除佇列事件的速率。 如此一來,我們可以透過調整從佇列中取用的速率 (「背壓」),來安穩渡過緩慢的傳遞期間,並在稍後提升取用速率。 若要偵測「緩慢傳遞」的低峰,IQueueCache
會使用快取貯體的內部資料結構,其可追蹤事件傳遞至每個串流取用者的進度。 這會產生反應快速且可自我調整的系統。