次の方法で共有


Orleans のストリームの実装の詳細

このセクションでは、Orleans Stream の実装の概要について説明します。 アプリケーション レベルでは表示されない概念と詳細について説明します。 ストリームのみを使用する予定の場合は、このセクションを読む必要はありません。

用語:

"queue" という単語は、ストリーム イベントを取り込み、イベントをプルしたり、イベントを実行するプッシュベースのメカニズムを提供したりする永続的なストレージ テクノロジを指します。 通常、スケーラビリティを提供するために、これらのテクノロジではシャード化/パーティション分割されたキューが指定されます。 たとえば、Azure Queues では複数のキューを作成でき、Event Hubs には複数のハブがあります。

永続的なストリーム

すべての Orleans 永続ストリーム プロバイダーでは、共通の実装 PersistentStreamProvider が共有されます。 これらの汎用ストリーム プロバイダーは、テクノロジ固有の IQueueAdapterFactory を使用して構成する必要があります。

たとえば、テスト目的の場合、キューからデータを読み取るのではなく、テスト データを生成するキュー アダプターがあります。 以下のコードは、カスタム (ジェネレーター) キュー アダプターを使用するように永続ストリーム プロバイダーを構成する方法を示しています。 これは、アダプターの作成に使用される Factory 関数を使用して永続ストリーム プロバイダーを構成することで行われます。

hostBuilder.AddPersistentStreams(
    StreamProviderName, GeneratorAdapterFactory.Create);

ストリーム プロデューサーで新しいストリーム項目を生成し、stream.OnNext() を呼び出すと、Orleans ストリーミング ランタイムは、そのストリーム プロバイダーの IQueueAdapter で適切なメソッドを呼び出して、項目を適切なキューに直接エンキューします。

プル エージェント

永続ストリーム プロバイダーの中心にプル エージェントがあります。 エージェントをプルすると、一連の永続キューからイベントがプルされ、それらを使用するグレインでアプリケーション コードに配信されます。 プル エージェントは、パーティション分割された高可用性のエラスティックな分散コンポーネントである分散 "マイクロサービス" と考えることができます。 プル エージェントは、アプリケーション グレインをホストし、Orleans Streaming Runtime によって完全に管理されているのと同じサイロ内で実行されます。

StreamQueueMapper および StreamQueueBalancer

プル エージェントは、IStreamQueueMapperIStreamQueueBalancer を使用してパラメーター化されます。 IStreamQueueMapper には、すべてのキューの一覧があり、ストリームをキューにマッピングする役割も担います。 これにより、永続ストリーム プロバイダーのプロデューサー側は、メッセージをエンキューするキューを認識します。

IStreamQueueBalancer は、Orleans サイロとエージェント間でキューのバランスを取る方法を表します。 目標は、ボトルネックを防ぎ、弾力性をサポートするために、バランスの取れた方法でエージェントにキューを割り当てることです。 新しいサイロが Orleans クラスターに追加されると、キューは古いサイロと新しいサイロの間で自動的に再調整されます。 StreamQueueBalancer では、そのプロセスをカスタマイズできます。 Orleans には、さまざまな分散シナリオ (多数のキューと少数のキュー) と異なる環境 (Azure、オンプレミス、静的) をサポートするために、複数の StreamQueueBalancers が組み込まれています。

上記のテスト ジェネレーターの例を使用して、以下のコードは、キュー マッパーとキュー バランサーを構成する方法を示しています。

hostBuilder
    .AddPersistentStreams(StreamProviderName, GeneratorAdapterFactory.Create,
        providerConfigurator =>
        providerConfigurator.Configure<HashRingStreamQueueMapperOptions>(
            ob => ob.Configure(options => options.TotalQueueCount = 8))
      .UseDynamicClusterConfigDeploymentBalancer());

上記のコードでは、GeneratorAdapterFactory を構成して 8 つのキューを持つキュー マッパーを使用し、DynamicClusterConfigDeploymentBalancer を使用してクラスター全体でキューのバランスを取ります。

プル プロトコル

すべてのサイロで一連のプル エージェントが実行され、すべてのエージェントが 1 つのキューからプルされます。 プル エージェント自体は、SystemTarget と呼ばれる内部ランタイム コンポーネントで実装されます。 SystemTargets は基本的にランタイム グレインであり、シングルスレッド コンカレンシーの対象であり、通常のグレイン メッセージングを使用でき、グレインと同じくらい軽量です。 グレインとは対照的に、SystemTargets は仮想ではなく、(ランタイムによって) 明示的に作成され、場所は透過的ではありません。 プル エージェントを SystemTargets として実装することで、Orleans Streaming Runtime は組み込みの Orleans 機能に依存でき、新しいプル エージェントの作成は新しいグレインの作成と同じくらい安価であるため、非常に多くのキューにスケーリングできます。

すべてのプル エージェントは、IQueueAdapterReceiver.GetQueueMessagesAsync メソッドを呼び出してキューからプルする定期的なタイマーを実行します。 返されたメッセージは、IQueueCache という名前のエージェントごとの内部データ構造に格納されます。 すべてのメッセージが検査され、宛先ストリームが見つかります。 エージェントは、Pub-Sub を使用して、このストリームにサブスクライブしたストリーム コンシューマーの一覧を確認します。 コンシューマー リストを取得すると、エージェントはそれをローカル (Pub-Sub キャッシュ) に格納するため、すべてのメッセージで Pub-Sub を参照する必要はありません。 また、エージェントは Pub-Sub をサブスクライブして、そのストリームをサブスクライブする新しいコンシューマーの通知を受け取ります。 エージェントと Pub-Sub の間のこのハンドシェイクにより、強力なストリーミング サブスクリプション セマンティクスが保証されます。コンシューマーがストリームをサブスクライブすると、サブスクライブ後に生成されたすべてのイベントが表示されます。 さらに、StreamSequenceToken を使用することで、過去のバージョンにサブスクライブできます。

キュー キャッシュ

IQueueCache は、キューから新しいイベントのデキューを分離し、コンシューマーに配信できるようにする、エージェントごとの内部データ構造です。 また、異なるストリームと異なるコンシューマーへの配信を分離することもできます。

1 つのストリームに 3 つのストリーム コンシューマーがあり、そのうちの 1 つが遅い状況を想像してみてください。 注意が必要でない場合、この低速コンシューマーはエージェントの進行状況に影響を与え、そのストリームの他のコンシューマーの消費に時間がかかり、他のストリームのイベントのでキューと配信が遅くなる可能性があります。 これを防ぎ、エージェントで最大限の並列処理を可能にするには、IQueueCache を使用します。

IQueueCache はストリーム イベントをバッファーし、エージェントが各コンシューマーに自身のペースでイベントを配信する方法を提供します。 コンシューマーごとの配信は、IQueueCacheCursor という内部コンポーネントで実装され、コンシューマーごとの進行状況が追跡されます。 それにより、各コンシューマーは自身のペースでイベントを受け取ります。高速コンシューマーはキューからデキューされるのと同じくらい早くイベントを受信し、低速コンシューマーは後でイベントを受信します。 メッセージがすべてのコンシューマーに配信されると、キャッシュから削除できます。

バックプレッシャ

Orleans Streaming Runtime のバックプレッシャは、キューからエージェントにストリーム イベントを取り込みエージェントからストリーム コンシューマーにイベントを配信する 2 つの場所に適用されます。

後者は、組み込みの Orleans メッセージ配信メカニズムで提供されます。 すべてのストリーム イベントは、一度に 1 つずつ、標準の Orleans グレイン メッセージングを介してエージェントからコンシューマーに配信されます。 つまり、エージェントは各ストリーム コンシューマーに 1 つのイベント (またはイベントの限られたサイズのバッチ) を送信し、この呼び出しを待機します。 前のイベントのタスクが解決されるか破損するまで、次のイベントの配信は開始されません。 このように、コンシューマーごとの配信レートは、一度に 1 つのメッセージに制限されます。

キューからエージェントにストリーム イベントを取り込む場合、Orleans Streaming には新しい特殊なバックプレッシャー メカニズムが用意されています。 エージェントは、キューからイベントのデキューを切り離してコンシューマーに配信するため、低速なコンシューマーが 1 人ほど遅れて、IQueueCache がいっぱいになる可能性があります。 IQueueCache が無期限に大きくなるのを防ぐために、サイズを制限します (サイズ制限は構成可能です)。 ただし、エージェントは配信不能なイベントをスローすることはありません。

代わりに、キャッシュがいっぱいになると、エージェントはキューからのイベントのデキューの速度を低下させます。 そうすることで、キューから消費する速度 ("バックプレッシャー") を調整して、遅い配信期間を "乗り切り"、後で高速消費率に戻すことができます。 "低速配信" の谷を検出するために、IQueueCache では、個々のストリーム コンシューマーへのイベントの配信の進行状況を追跡するキャッシュ バケットの内部データ構造が使用されます。 これにより、非常に応答性の高い自己調整システムが実現します。