Udostępnij za pośrednictwem


[TechDays 2010]並列タスクのデモについて

今回は時間が足りなくなってお見せできなかった、並列タスクに関して記述します。最初にウォームアップということでConcurrent(同時実行)に関して説明します。

最初にご説明したのは、インフラストラクチャという観点でした。

  • OS
    1)32ビットカーネルであれば、スレッド単位でスケジュールされていること。
    2)64ビットカーネル(Win7以降)はユーザーモードスケジューラ(UMS)によって、スレッドの中にあるユーザー空間とカーネル空間を別個に管理することでコンテキストスイッチの切り替えが少なくなるようにスケジュールします。この機能によってスケジューラの速度を向上させています。
  • CLR
    1)CLR 2.0までは、スレッドプールはグローバルキューのみでスレッドを管理していること。
    2)CLR 4では、スレッドプールはグローバルキューとローカルワークスティーリングキューで構成されていて、動作しているスレッドが存在する場合にローカルワークスティーリングキューにタスクがキューイングされます。スレッドが空いた場合に、ローカルワークスティーリングキューから取り出して実行する仕組みになっています。この時のキューからの取り出し方が、CPUのL0キャッシュを考慮した取り出し方になっているためプロファイリングなどで調べると木目細かくスケジュールされていることがわかります。さらに補足すれば、実行されるスレッドは64ビットカーネルではUMSでスケジューリングされますのでOSのスケジューラとの相乗効果が得られます。

次に.NET Framework 4で強化されたものとしてライブラリがあります。強化された主なものとしては、以下のものがあります。

  • 並列LINQ(System.Linq)
  • 同期プリミティブやキャンセルの仕組み(System.Threading)
  • 並列タスク(System.Threading.Tasks)
  • ロックフリーコレクション(System.Collections.Concurrent)

並列リンク(PLINQ)に関しては、前回にデモの内容を補足しました。処理の並列化を考える上で重要になるのがアルゴリズムになります。アルゴリズムを大別すると、一般的に以下のようになります。

  • データ並列(PLINQなど)
  • タスク並列(前回の解説では、ForAllを使った並列実行など)

ここでデモとしてお見せしようと考えていたのが、タスク並列だったのです。が、時間の関係でお見せできなかったのです。用意していたデモは、以下のようなものになります。
ParallelTask 
このプログラム(PhotoViwer.exe)は、Windows AzureのBlobストレージサービスからデータを取得して表示するという動作を行います。内部的には、AzureImageDownloaderクラスを使ってテンポラリーにイメージファイルを生成し、ビューワーがFileSystemWatcher で監視して表示するようになっています。このようなプログラムを用意した理由は、一般的に並列化する場所は処理速度に大きな差のある個所を選択することで全体のスループットを向上しやすいからです。このプログラムには、以下のような機能を用意してあります。

  • 同期:一般的なプログラミングモデルです。
  • 非同期:CLRの非同期パターン(Begin/End)。
  • タスク:並列タスクを利用したパターンです。
  • エージェント:メッセージエージェントを利用したパターンです。
  • エージェント+アルファ:メッセージエージェントと並列タスクを利用したパターンです。

同期方式で行う方法は、多くの方が予測がつくことでしょう。問題は非同期パターンにあるのですが、何を問題にしているかといえばCloudBlobクラスの非同期ダウンロードとFileStreamクラスの非同期I/Oを組み合わせていることです。複数の非同期I/Oを組み合わせると、面倒な作業が多くなるという点に気付かれた方が多いことでしょう。このためにC#では、以下のようなコードを利用して非同期パターンを実現しています。

 DownloadState state = new DownloadState()
          {
                CurrentImage = 0,
                Blobs = this.container.ListBlobs().ToArray(),
                Container = this.container,
                Folder = this.Folder
            };

state.Iterate();

このコードは、非同期操作をラップするためにDownloadStateクラスを用意していることを示しています。そしてIterateメソッドによって、Blobリストを使って非同期操作(CloudBlobとFileStream)を行っています。

次にタスクを処理しているコードを以下に示します。

 Action action =
       () =>
        {
           DownloadState state = new DownloadState()
           {
              CurrentImage = 0,
              Blobs = this.container.ListBlobs().ToArray(),
              Container = this.container,
              Folder = this.Folder
           };
           state.Iterate();
        };

System.Threading.Tasks.Task task = 
    System.Threading.Tasks.Task.Factory.StartNew(action);

このコードは先に示した非同期操作をActionデリゲートにラップして、1つのタスクとして実行するものです。簡単なタスクにすることで同時実行ランタイムが、スケジュールするようになります。メッセージエージェントですが、残念ですがマネージ ライブラリとして用意されていませんので、C#のサンプルとしては記述していません(C++には用意されていますし、マネージライブラリとして使用したいのであればMPI.NETなどを検索してみてください)。この状態で作成したC#のAzureImageDownloaderクラスは、166行前後(コメントなどを含む)あります。これをF#で記述するとどうなるでしょうか。私が記述したF#のAzureImageDownloaderクラスでは、130行前後(コメントなどを含む)になります。このステップ数の差が、非同期操作や並列タスクに対する言語側の取り組み方の差を表していると考えることができるでしょう。

F#で非同期操作を実現するコードを以下に示します。

 member this.DownloadAllAsync() =
  async {
          for blob in container.ListBlobs() do
            let! name = downloadImageAsyn(
                  container.GetBlobReference(
                     blob.Uri.ToString()))
            ()
        }
  |> Async.StartImmediate

この記述は、F#のAsyncクラスを使った非同期ワークフローというパターンを表しています。そしてasync{}の中の最後の「 () 」に気がつかれた方もいらっしゃることでしょう。これはF#のデータ型でunitというものになり、空のデータを表すものです。C#などで使用するvoidとは異なり、中身のないデータを返すという動作を行います。この理由は、「for  ...  do 」 が式関数だからです。式は、何らかの値を返すというの当たり前のことです。この理由から、空のデータを返しています。次に、タスクを使ったコードを以下に示します。

 member this.DownloadAllTask() =
  container.ListBlobs()
  |> Seq.map (fun blob ->
                   downloadImageAsync(
                      container.GetBlobReference(
                          blob.Uri.ToString())))
  |> Async.Parallel
  |> Async.StartAsTask

このコードは、F#のパイプラインを使用してAsyncクラスのStartAsTaskメソッドによって、並列タスクライブラリで動作するようになっています。つまり、F#は並列タスクライブラリ自体をAsyncクラスで使用できるようになっているのです。ここでのポイントは、自分で並列タスクライブラリを使用するか、言語が用意するライブラリでシームレスに使用するかということです。次に、メッセージエージェントを使ったコードを示します。

 member this.DownloadAllAgent() =
  for blob in container.ListBlobs() do
      agent.Post(blob.Uri.ToString())

このコードで気がつくのは、「agent.Post」という見慣れない個所でしょう。agentとは、F#が用意しているMailboxProcessorクラスを使用した関数として定義しています。この関数の中で、Reciveメソッドで受け取ったメッセージ(BlobのUri)を取り出して処理をしています。最後のエージェント+アルファは、既に説明したメッセージエージェントとタスクの組み合わせで実現しています。ここまで実装して、130行前後のステップになっています。C#の166行前後と比べると、明らかに違うということがわかったのではないでしょうか(まあ、コードが最適化されているかどうかというがありますが、ここまで差がでるものでもないでしょう)。

ここまでの説明で私が言いたかったことは、「パラダイムを変えれば、異なるアプローチがある」ということです。この並列タスクの例でいえば、明らかに関数型パラダイムの方が便利だろうということです。

PS.F#には非同期ワークフローという概念があります。Expert F#という書籍のなかで、Don(F#の設計者)はHasklle のMonadなどを研究して作成したと述べています。具体的にどのように実現しているかいう点に関しては、スライドに記述してありますので、ご興味がある方はスライドを参照してください。最後にF#のワークフローとは、手続きという順序を保証するための概念です。なぜ順序を保証する必要があるかといえば、遅延実行などの仕組みが存在するためです。