Orleans を使用したストリーミング
Orleans v.1.0.0 では、プログラミング モデルにストリーミング拡張機能のサポートが追加されました。 ストリーミング拡張機能では、ストリームに関する検討と操作をよりシンプルかつ堅牢にする、一連の抽象化と API が提供されます。 ストリーミング拡張機能を使うと、開発者は一連のイベントを操作するリアクティブ アプリケーションを、構造化された方法で記述できます。 ストリーム プロバイダーの機能拡張モデルによって、プログラミング モデルに、Event Hubs、ServiceBus、Azure Queues、Apache Kafka など、広範な既存のキュー テクノロジとの互換性を持たせ、それら全体で移植可能にすることができます。 このようなキューを操作するために、特別なコードを記述したり、専用プロセスを実行したりする必要はありません。
注目すべき理由
お客様がストリーム処理に関するすべてを既に把握していて、Event Hubs、Kafka、Azure Stream Analytics、Apache Storm、Apache Spark Streaming、.NET の Reactive Extensions (Rx) などのテクノロジに精通している場合、なぜこれに注目すべきか疑問に思うかもしれません。 なぜさらに別のストリーム処理システムが必要で、アクターはストリームとどのように関係するのでしょうか?Orleans ストリームを使う理由に関する記事の目的は、その疑問に答えることです。
プログラミング モデル
Orleans ストリーム プログラミング モデルの背後には、いくつかの原則があります。
- Orleans ストリームは "仮想的" です。 つまり、ストリームは常に存在します。 明示的に作成されることも破棄されることもなく、失敗することもありません。
- ストリームはストリーム ID "によって識別されます"。これは GUID と文字列で構成される "論理名" にすぎません。
- Orleans ストリームを使用すると、"時間と空間の両方で、データの生成をその処理から切り離すことができるようになります"。 つまり、ストリームのプロデューサーとストリームのコンシューマーは、異なるサーバー上または異なるタイム ゾーンに存在することができ、障害に耐えることができます。
- Orleans ストリームは "軽量かつ動的" です。 Orleans ストリーミング ランタイムは、高いレートで送受信される大量のストリームを処理するように設計されています。
- Orleans ストリームの "バインディングは動的です"。 Orleans ストリーミング ランタイムは、高いレートでグレインがストリームと接続および切断するケースを処理するように設計されています。
- Orleans ストリーミング ランタイムは "ストリーム消費のライフサイクルを透過的に管理します"。 アプリケーションがストリームをサブスクライブした後は、障害が存在している場合でもストリームのイベントを受信します。
- Orleans ストリームは "各グレインおよび Orleans クライアント間で均一に動作します"。
プログラミング API
アプリケーションは Orleans.Streams.IAsyncStream<T> を使用してストリームと対話します。これは Orleans.Streams.IAsyncObserver<T> および Orleans.Streams.IAsyncObservable<T> インターフェイスを実装するものです。 これらの API は、よく知られた .NET の Reactive Extensions (Rx) に似ています。
次の典型的な例では、デバイスによっていくつかのデータが生成され、クラウドで実行されているサービスに HTTP 要求として送信されます。 フロントエンド サーバーで実行されている Orleans クライアントはこの HTTP 呼び出しを受け取り、一致するデバイス ストリームにデータを発行します。
public async Task OnHttpCall(DeviceEvent deviceEvent)
{
// Post data directly into the device's stream.
IStreamProvider streamProvider =
GrainClient.GetStreamProvider("MyStreamProvider");
IAsyncStream<DeviceEventData> deviceStream =
streamProvider.GetStream<DeviceEventData>(
deviceEvent.DeviceId, "MyNamespace");
await deviceStream.OnNextAsync(deviceEvent.Data);
}
次の別の例では、チャット ユーザー (Orleans グレインとして実装) がチャット ルームに参加し、このルーム内の他のすべてのユーザーによって生成されたチャット メッセージのストリームに対するハンドルを取得し、それにサブスクライブします。 チャット ユーザーは、チャット ルームのグレイン自体 (システムにそのようなグレインが存在しない可能性があります) や、メッセージを生成するそのグループ内の他のユーザーについて知る必要がないことがわかります。 言うまでもなく、ユーザーはチャット ストリームに発行するために、現在ストリームにサブスクライブしているユーザーを知る必要はありません。 これは、時間と空間でチャット ユーザーを完全に切り離すことができるしくみを示しています。
public class ChatUser: Grain
{
public async Task JoinChat(Guid chatGroupId)
{
IStreamProvider streamProvider =
base.GetStreamProvider("MyStreamProvider");
IAsyncStream<string> chatStream =
streamProvider.GetStream<string>(chatGroupId, "MyNamespace");
await chatStream.SubscribeAsync(
async (message, token) => Console.WriteLine(message))
}
}
クイック スタート サンプル
このクイック スタート サンプルは、アプリケーションでストリームを使用するワークフロー全体の簡単な概要として最適です。 これを読んだ後は、概念をより深く理解するために、ストリーム プログラミング API に関する記事を参照する必要があります。
ストリーム プログラミング API
ストリーム プログラミング API に関する記事には、プログラミング API の詳細な説明が記載されています。
Stream プロバイダー
ストリームは、さまざまな形や形式の物理チャネルを介して取得でき、異なるセマンティクスを持つことができます。 Orleans ストリーミングは、システムの機能拡張ポイントであるストリーム プロバイダーの概念を通じて、このような多様性をサポートするように設計されています。 現在、Orleans には 2 つのストリーム プロバイダーの実装があります。TCP ベースの Simple Message Stream Provider と、Azure Queue ベースの Azure Queue Stream Provider です。 ストリーム プロバイダーの詳細については、ストリーム プロバイダーに関する記事を参照してください。
ストリームのセマンティクス
ストリーム サブスクリプションのセマンティクス:
Orleans ストリームでは、ストリーム サブスクリプション操作の逐次一貫性が保証されます。 具体的には、コンシューマーがストリームにサブスクライブしている場合、サブスクリプション操作を表す Task
が正常に解決されると、そのコンシューマーにはサブスクライブ後に生成されたすべてのイベントが表示されます。 さらに、巻き戻し可能なストリームによって、StreamSequenceToken を使って過去の任意の時点からサブスクライブできます。 詳細については、「Orleans ストリーム プロバイダー」を参照してください。
個々のストリーム イベント配信の保証:
個々のイベント配信の保証は、個々のストリーム プロバイダーに依存します。 ベスト エフォートで最大でも 1 回の配信が提供されることもあれば (Simple Message Stream (SMS) など)、少なくとも 1 回の配信が提供されることもあります (Azure Queue ストリームなど)。 厳密に 1 回の配信を保証するストリーミング プロバイダーを構築することもできます (そのようなプロバイダーはまだありませんが、構築することは可能です)。
イベント配信の順序:
イベントの順序も、特定のストリーム プロバイダーに依存します。 SMS ストリームでは、コンシューマーに表示されるイベントの順序をプロデューサーが明示的に制御するために、それらの発行方法を制御します。 Azure Queue ストリームでは FIFO の順序が保証されません。基になる Azure キューで障害が発生した場合の順序が保証されないためです。 アプリケーションでは、StreamSequenceToken
を使用してストリーム配信の順序を制御することもできます。
ストリームの実装
Orleans ストリームの実装に関する記事では、内部実装の大まかな概要が説明されています。
コード サンプル
グレイン内でストリーミング API を使用する方法に関するその他の例については、こちらを参照してください。 今後、追加のサンプルを作成する予定です。