タスクの並列化 (コンカレンシー ランタイム)
コンカレンシー ランタイムでは、タスクは特定のジョブを実行する作業の単位です。通常、タスクは他のタスクと並列に実行できます。 タスクは、より細かい別のタスクに分割することができ、それをタスク グループに整理することができます。
非同期コードを記述して、非同期操作が完了したときに操作を実行する場合に、タスクを使用します。 たとえば、タスクを使用して非同期的にファイルを読み取り、さらに別のタスク (このドキュメントで後ほど説明する継続タスク) を使用して、データが使用できる状態になったときにそれを処理することができます。 逆に、タスク グループを使用して、並列処理を分解することができます。 たとえば、残存作業を 2 つのパーティションに分割する再帰的なアルゴリズムがあるとします。 タスク グループを使用すると、これらのパーティションを同時に実行して、分割処理の完了を待つことができます。
ヒント
同じルーチンをコレクションの全要素に並列に適用する場合は、タスクやタスク グループでなく、concurrency::parallel_for などの並列アルゴリズムを使用します。 並列アルゴリズムの詳細については、「並列アルゴリズム」を参照してください。
重要なポイント
ラムダ式に変数を渡すときに参照渡しを使用する場合、タスクが終了するまでその変数の有効期間が続くようにする必要があります。
非同期コードを記述するときには、タスク (concurrency::task クラス) を使用します。 タスク クラスは、コンカレンシー ランタイムではなく、Windows ThreadPool をスケジューラとして使用します。
並列処理を分解して、その分割処理の完了を待つ場合には、タスク グループ (concurrency::task_group クラスまたは concurrency::parallel_invoke アルゴリズム) を使用します。
継続を作成するには、concurrency::task::then メソッドを使用します。 continuation は別のタスクが完了した後に非同期的に実行されるタスクです。 一連の非同期処理を形成するために、継続をいくつでも接続できます。
タスク ベースの継続は、継続元タスクが取り消されたり、例外をスローした場合でも、継続元タスクが完了すると常に実行がスケジュールされます。
一連のタスクのすべてのメンバーが完了した後に完了するタスクを作成するには、concurrency::when_all を使用します。 一連のタスクの 1 つのメンバーが完了した後に完了するタスクを作成するには、concurrency::when_any を使用します。
タスクとタスク グループは、並列パターン ライブラリ (PPL) の取り消し機構に参加できます。 詳細については、「PPL における取り消し処理」を参照してください。
タスクおよびタスク グループによってスローされた例外をランタイムが処理するしくみの詳細については、「例外処理」を参照してください。
目次
ラムダ式の使用
ラムダ式は簡潔な構文であるため、タスクおよびタスク グループで実行される作業を定義する一般的な方法です。 使用のヒントを次に示します。
通常、タスクはバックグラウンド スレッドで実行されるため、ラムダ式の変数をキャプチャする場合にはオブジェクトの有効期間に注意してください。 変数を値でキャプチャすると、その変数のコピーがラムダの本体に作成されます。 参照によってキャプチャする場合には、コピーは作成されません。 したがって、参照によってキャプチャするすべての変数の有効期間が、それを使用するタスクのために十分であるように注意します。
タスクにラムダ式を渡す場合、スタックに割り当てられた変数を参照によってキャプチャしないようにします。
ラムダ式でキャプチャする変数を明示して、値でのキャプチャと参照でのキャプチャを識別できるようにします。 このため、ラムダ式に対して
[=]
または[&]
オプションを使用しないことをお勧めします。
一般的なパターンは、継続のチェーンの 1 つのタスクが変数に割り当てられ、別のタスクがその変数を読み取る場合です。 各継続タスクが変数のそれぞれのコピーを保持するため、値によるキャプチャができません。 スタック割り当て変数においても、変数が有効でなくなる場合があるため、参照によるキャプチャができません。
この問題を解決するには、std::shared_ptr などのスマート ポインターを使用し、変数をラップして、スマート ポインターを値で渡します。 この方法を使用すると、基になるオブジェクトが割り当てられ、読み込むことができ、それを使用するタスクのために十分な有効期間となります。 変数が Windows ランタイム オブジェクトへのポインターであったり、参照カウント ハンドル (^
) である場合でも、この方法を使用できます。 基本的な例を次に示します。
// lambda-task-lifetime.cpp
// compile with: /EHsc
#include <ppltasks.h>
#include <iostream>
#include <string>
using namespace concurrency;
using namespace std;
task<wstring> write_to_string()
{
// Create a shared pointer to a string that is
// assigned to and read by multiple tasks.
// By using a shared pointer, the string outlives
// the tasks, which can run in the background after
// this function exits.
auto s = make_shared<wstring>(L"Value 1");
return create_task([s]
{
// Print the current value.
wcout << L"Current value: " << *s << endl;
// Assign to a new value.
*s = L"Value 2";
}).then([s]
{
// Print the current value.
wcout << L"Current value: " << *s << endl;
// Assign to a new value and return the string.
*s = L"Value 3";
return *s;
});
}
int wmain()
{
// Create a chain of tasks that work with a string.
auto t = write_to_string();
// Wait for the tasks to finish and print the result.
wcout << L"Final value: " << t.get() << endl;
}
/* Output:
Current value: Value 1
Current value: Value 2
Final value: Value 3
*/
ラムダ式について詳しくは、「ラムダ式」をご覧ください。
タスク クラス
concurrency::task クラスを使用すると、タスクを一連の依存する操作に構成できます。 このコンポジション モデルは、継続の概念によってサポートされます。 継続を使うと、前のタスクまたは継続元タスクが完了したときに、コードを実行できます。 継続元タスクの結果は、1 つ以上の継続タスクへの入力として渡されます。 継続元タスクが完了すると、それを待っているすべての継続タスクが実行のためにスケジュールされます。 各継続タスクは継続元タスクの結果のコピーを受信します。 また、これらの継続タスクが、他の継続の継続元タスクである場合もあり、このようにしてタスクのチェーンが作成されます。 継続を使うと、特定の依存関係を持つ、任意の長さのタスクのチェーンを作成できます。 また、タスクは開始前または実行中に協調的に、取り消しに参加できます。 取り消しモデルの詳細については、「PPL における取り消し処理」を参照してください。
task
はテンプレート クラスです。 型パラメーター T
は、タスクで生成される結果の型です。 タスクが値を返さない場合には、この型は void
となります。 T
は const
修飾子を使用できません。
タスクを作成する場合には、タスクの本体を実行する処理関数を提供します。 この処理関数には、ラムダ関数、関数ポインター、または関数オブジェクトを使用できます。 結果を取得せずに、タスクの完了を待つためには concurrency::task::wait メソッドを呼び出します。 task::wait
メソッドは、タスクが完了したか取り消されたかを示す concurrency::task_status の値を返します。 タスクの結果を取得するには、concurrency::task::get メソッドを呼び出します。 このメソッドは、task::wait
を呼び出してタスクの完了を待ち、結果が使用できるようになるまで現在のスレッドの実行をブロックします。
次の例では、タスクを作成し、結果を待って、値を表示する方法を示します。 このドキュメントの例では、より簡潔な構文を提供するため、ラムダ関数を使用しています。 しかし、タスクを使用する場合には、関数ポインター、および関数オブジェクトを使用することも可能です。
// basic-task.cpp
// compile with: /EHsc
#include <ppltasks.h>
#include <iostream>
using namespace concurrency;
using namespace std;
int wmain()
{
// Create a task.
task<int> t([]()
{
return 42;
});
// In this example, you don't necessarily need to call wait() because
// the call to get() also waits for the result.
t.wait();
// Print the result.
wcout << t.get() << endl;
}
/* Output:
42
*/
concurrency::create_task 関数を使用する場合には、型を宣言する代わりに auto
キーワードを使用できます。 たとえば、単位行列を作成して印刷する、次のコードを考えてみます。
// create-task.cpp
// compile with: /EHsc
#include <ppltasks.h>
#include <string>
#include <iostream>
#include <array>
using namespace concurrency;
using namespace std;
int wmain()
{
task<array<array<int, 10>, 10>> create_identity_matrix([]
{
array<array<int, 10>, 10> matrix;
int row = 0;
for_each(begin(matrix), end(matrix), [&row](array<int, 10>& matrixRow)
{
fill(begin(matrixRow), end(matrixRow), 0);
matrixRow[row] = 1;
row++;
});
return matrix;
});
auto print_matrix = create_identity_matrix.then([](array<array<int, 10>, 10> matrix)
{
for_each(begin(matrix), end(matrix), [](array<int, 10>& matrixRow)
{
wstring comma;
for_each(begin(matrixRow), end(matrixRow), [&comma](int n)
{
wcout << comma << n;
comma = L", ";
});
wcout << endl;
});
});
print_matrix.wait();
}
/* Output:
1, 0, 0, 0, 0, 0, 0, 0, 0, 0
0, 1, 0, 0, 0, 0, 0, 0, 0, 0
0, 0, 1, 0, 0, 0, 0, 0, 0, 0
0, 0, 0, 1, 0, 0, 0, 0, 0, 0
0, 0, 0, 0, 1, 0, 0, 0, 0, 0
0, 0, 0, 0, 0, 1, 0, 0, 0, 0
0, 0, 0, 0, 0, 0, 1, 0, 0, 0
0, 0, 0, 0, 0, 0, 0, 1, 0, 0
0, 0, 0, 0, 0, 0, 0, 0, 1, 0
0, 0, 0, 0, 0, 0, 0, 0, 0, 1
*/
create_task
関数を使用して同等の操作を行うことができます。
auto create_identity_matrix = create_task([]
{
array<array<int, 10>, 10> matrix;
int row = 0;
for_each(begin(matrix), end(matrix), [&row](array<int, 10>& matrixRow)
{
fill(begin(matrixRow), end(matrixRow), 0);
matrixRow[row] = 1;
row++;
});
return matrix;
});
タスクの実行時に例外がスローされた場合、ランタイムは、それ以降の task::get
または task::wait
の呼び出し、またはタスク ベースの継続への呼び出しで、その例外をマーシャリングします。 タスク例外処理機構の詳細については、「例外処理」を参照してください。
task
を使用する例、concurrency::task_completion_event、キャンセルについては、「チュートリアル: タスクおよび XML HTTP 要求を使用した接続」を参照してください。 (task_completion_event
クラスについてはドキュメントの後の部分で説明されています。)
ヒント
UWP アプリのタスクに固有の詳細情報については、「C++ での非同期プログラミング」および「C++ における Windows ストア アプリ用の非同期操作の作成」を参照してください。
継続タスク
非同期プログラミングでは、非同期操作で完了時に 2 番目の操作を呼び出してデータを渡すのが一般的です。 これまで、この処理はコールバック メソッドを使用して行っていました。 コンカレンシー ランタイムでは、継続タスクに同じ機能が用意されています。 継続タスク (単に "継続" とも呼ばれます) とは、別のタスク ("継続元" と呼ばれます) が完了したときにそのタスクによって呼び出される非同期タスクのことです。 継続を使用して、次の操作を行うことができます。
継続元のデータを継続に渡します。
継続を呼び出す場合、呼び出さない場合についての正確な条件を指定します。
継続が開始される前、または継続の実行中に、継続を取り消します。
継続をスケジュールする方法についてのヒントを提供します。 (これはユニバーサル Windows プラットフォーム (UWP) アプリにのみ適用されます。 詳細は、「C++ での UWP アプリ用の非同期操作の作成」を参照してください。)
同じ継続元から複数の継続を呼び出します。
複数の継続元のすべてまたはいずれかが完了したときに 1 つの継続を呼び出します。
任意の長さで連続して継続を実行します。
継続元によってスローされた例外を処理するために継続を使用します。
これらの機能を使うと、最初のタスクが完了したときに、1 つ以上のタスクを実行できます。 たとえば、最初のタスクがディスクからデータを読み取った後に、ファイルを圧縮する継続を作成できます。
前の例を変更した次の例では、concurrency::task::then メソッドを使用して、継続元タスクが使用可能となったときにその値を印刷する継続をスケジュールします。
// basic-continuation.cpp
// compile with: /EHsc
#include <ppltasks.h>
#include <iostream>
using namespace concurrency;
using namespace std;
int wmain()
{
auto t = create_task([]() -> int
{
return 42;
});
t.then([](int result)
{
wcout << result << endl;
}).wait();
// Alternatively, you can chain the tasks directly and
// eliminate the local variable.
/*create_task([]() -> int
{
return 42;
}).then([](int result)
{
wcout << result << endl;
}).wait();*/
}
/* Output:
42
*/
タスクを任意の長さにチェーンしたり、入れ子にできます。 またタスクは、複数の継続を持つことができます。 次の例では、前のタスクの値を 3 回インクリメントする、基本的な継続のチェーンを示しています。
// continuation-chain.cpp
// compile with: /EHsc
#include <ppltasks.h>
#include <iostream>
using namespace concurrency;
using namespace std;
int wmain()
{
auto t = create_task([]() -> int
{
return 0;
});
// Create a lambda that increments its input value.
auto increment = [](int n) { return n + 1; };
// Run a chain of continuations and print the result.
int result = t.then(increment).then(increment).then(increment).get();
wcout << result << endl;
}
/* Output:
3
*/
継続は、別のタスクを返すこともできます。 取り消されない場合、このタスクは後続の継続の前に実行されます。 この手法を非同期ラッピング解除と呼びます。 非同期ラッピング解除は、追加の作業をバックグラウンドで実行するときに、現在のタスクが現在のスレッドをブロックしないようにする場合に役立ちます。 (これは、継続が UI スレッドで実行される UWP アプリケーションで共通です)。 次の例は、3 つのタスクを示しています。 最初のタスクは、継続タスクの前に実行される、別のタスクを返します。
// async-unwrapping.cpp
// compile with: /EHsc
#include <ppltasks.h>
#include <iostream>
using namespace concurrency;
using namespace std;
int wmain()
{
auto t = create_task([]()
{
wcout << L"Task A" << endl;
// Create an inner task that runs before any continuation
// of the outer task.
return create_task([]()
{
wcout << L"Task B" << endl;
});
});
// Run and wait for a continuation of the outer task.
t.then([]()
{
wcout << L"Task C" << endl;
}).wait();
}
/* Output:
Task A
Task B
Task C
*/
重要
タスクの継続が N
型の入れ子のタスクを返す場合、結果のタスクは N
型でなく task<N>
型となり、入れ子のタスクが完了すると終了します。 つまり、継続は入れ子のタスクのラッピング解除を行います。
値ベースとタスクベースの継続
task
オブジェクトは戻り値の型が T
であるため、その継続タスクに T
または task<T>
型の値を指定できます。 T
型を受け取る継続は、値ベースの継続と呼ばれます。 値ベースの継続は、継続元タスクがエラーなしに完了して取り消されない場合に、実行のためにスケジュールされます。 パラメーターとして task<T>
型を受け取る継続は、タスク ベースの継続と呼ばれます。 タスク ベースの継続は、継続元タスクが取り消されたり、例外をスローした場合でも、継続元タスクが完了すると常に実行がスケジュールされます。 次に task::get
を呼び出して、継続元タスクの結果を取得できます。 継続元タスクが取り消されると、task::get
は concurrency::task_canceled をスローします。 継続元タスクが例外をスローすると、task::get
は再度、例外をスローします。 タスク ベースの継続は、その継続元タスクが取り消された場合、取り消し済みとしてマークされません。
タスクを構成する
このセクションでは、一般的なパターンを実装する複数のタスクを構成するために役立つ concurrency::when_all と concurrency::when_any 関数について説明します。
when_all関数
when_all
関数は、一連のタスクの完了後に完了するタスクを生成します。 この関数は、セット内の各タスクの結果を含む std::vector オブジェクトを返します。 次の基本的な例では、when_all
を使用して、3 つの他のタスクの完了を表すタスクを作成します。
// join-tasks.cpp
// compile with: /EHsc
#include <ppltasks.h>
#include <array>
#include <iostream>
using namespace concurrency;
using namespace std;
int wmain()
{
// Start multiple tasks.
array<task<void>, 3> tasks =
{
create_task([] { wcout << L"Hello from taskA." << endl; }),
create_task([] { wcout << L"Hello from taskB." << endl; }),
create_task([] { wcout << L"Hello from taskC." << endl; })
};
auto joinTask = when_all(begin(tasks), end(tasks));
// Print a message from the joining thread.
wcout << L"Hello from the joining thread." << endl;
// Wait for the tasks to finish.
joinTask.wait();
}
/* Sample output:
Hello from the joining thread.
Hello from taskA.
Hello from taskC.
Hello from taskB.
*/
Note
when_all
に渡すタスクは均一である必要があります。 つまり、これらはすべて同じ型を返す必要があります。
次に示す例のように、&&
構文を使用して、一連のタスクの完了後に完了するタスクを生成することもできます。
auto t = t1 && t2; // same as when_all
通常は、継続を when_all
と共に使用して、一連のタスクが終了した後に操作を実行します。 前の例を変更した次の例では、それぞれが int
の結果を生成する、3 つのタスクの合計を印刷します。
// Start multiple tasks.
array<task<int>, 3> tasks =
{
create_task([]() -> int { return 88; }),
create_task([]() -> int { return 42; }),
create_task([]() -> int { return 99; })
};
auto joinTask = when_all(begin(tasks), end(tasks)).then([](vector<int> results)
{
wcout << L"The sum is "
<< accumulate(begin(results), end(results), 0)
<< L'.' << endl;
});
// Print a message from the joining thread.
wcout << L"Hello from the joining thread." << endl;
// Wait for the tasks to finish.
joinTask.wait();
/* Output:
Hello from the joining thread.
The sum is 229.
*/
この例では、タスク ベースの継続を作成するために task<vector<int>>
を指定することもできます。
一連のタスクのいずれかのタスクが取り消されたり、例外をスローした場合、when_all
は残りのタスクを完了を待たずに、直ちに終了します。 例外がスローされた場合、task::get
を返すタスク オブジェクトで task::wait
または when_all
を呼び出すと、ランタイムは再度、例外をスローします。 複数のタスクからスローすると、ランタイムはその中の 1 つを選択します。 したがって、すべてのタスクが完了してからすべての例外を確認するようにします。タスクの例外がハンドルされない場合、アプリケーションは終了します。
プログラムがすべての例外を確認するために使用できるユーティリティ関数を次に示します。 指定された範囲のタスクごとに、observe_all_exceptions
は再スローされる例外をトリガーし、その例外を受け取ります。
// Observes all exceptions that occurred in all tasks in the given range.
template<class T, class InIt>
void observe_all_exceptions(InIt first, InIt last)
{
std::for_each(first, last, [](concurrency::task<T> t)
{
t.then([](concurrency::task<T> previousTask)
{
try
{
previousTask.get();
}
// Although you could catch (...), this demonstrates how to catch specific exceptions. Your app
// might handle different exception types in different ways.
catch (Platform::Exception^)
{
// Swallow the exception.
}
catch (const std::exception&)
{
// Swallow the exception.
}
});
});
}
C++ と XAML を使用し、ディスクに一連のファイルを書き込む UWP アプリケーションを考えてみます。 次の例は、when_all
と observe_all_exceptions
を使用して、プログラムがすべての例外を確認する方法を示します。
// Writes content to files in the provided storage folder.
// The first element in each pair is the file name. The second element holds the file contents.
task<void> MainPage::WriteFilesAsync(StorageFolder^ folder, const vector<pair<String^, String^>>& fileContents)
{
// For each file, create a task chain that creates the file and then writes content to it. Then add the task chain to a vector of tasks.
vector<task<void>> tasks;
for (auto fileContent : fileContents)
{
auto fileName = fileContent.first;
auto content = fileContent.second;
// Create the file. The CreationCollisionOption::FailIfExists flag specifies to fail if the file already exists.
tasks.emplace_back(create_task(folder->CreateFileAsync(fileName, CreationCollisionOption::FailIfExists)).then([content](StorageFile^ file)
{
// Write its contents.
return create_task(FileIO::WriteTextAsync(file, content));
}));
}
// When all tasks finish, create a continuation task that observes any exceptions that occurred.
return when_all(begin(tasks), end(tasks)).then([tasks](task<void> previousTask)
{
task_status status = completed;
try
{
status = previousTask.wait();
}
catch (COMException^ e)
{
// We'll handle the specific errors below.
}
// TODO: If other exception types might happen, add catch handlers here.
// Ensure that we observe all exceptions.
observe_all_exceptions<void>(begin(tasks), end(tasks));
// Cancel any continuations that occur after this task if any previous task was canceled.
// Although cancellation is not part of this example, we recommend this pattern for cases that do.
if (status == canceled)
{
cancel_current_task();
}
});
}
この例を実行するには
- MainPage.xaml で、
Button
コントロールを追加します。
<Button x:Name="Button1" Click="Button_Click">Write files</Button>
- MainPage.xaml.h で、これらの事前宣言を
private
クラス宣言のMainPage
セクションに追加します。
void Button_Click(Platform::Object^ sender, Windows::UI::Xaml::RoutedEventArgs^ e);
concurrency::task<void> WriteFilesAsync(Windows::Storage::StorageFolder^ folder, const std::vector<std::pair<Platform::String^, Platform::String^>>& fileContents);
- MainPage.xaml.cpp で、
Button_Click
イベント ハンドラーを実装します。
// A button click handler that demonstrates the scenario.
void MainPage::Button_Click(Object^ sender, RoutedEventArgs^ e)
{
// In this example, the same file name is specified two times. WriteFilesAsync fails if one of the files already exists.
vector<pair<String^, String^>> fileContents;
fileContents.emplace_back(make_pair(ref new String(L"file1.txt"), ref new String(L"Contents of file 1")));
fileContents.emplace_back(make_pair(ref new String(L"file2.txt"), ref new String(L"Contents of file 2")));
fileContents.emplace_back(make_pair(ref new String(L"file1.txt"), ref new String(L"Contents of file 3")));
Button1->IsEnabled = false; // Disable the button during the operation.
WriteFilesAsync(ApplicationData::Current->TemporaryFolder, fileContents).then([this](task<void> previousTask)
{
try
{
previousTask.get();
}
// Although cancellation is not part of this example, we recommend this pattern for cases that do.
catch (const task_canceled&)
{
// Your app might show a message to the user, or handle the error in some other way.
}
Button1->IsEnabled = true; // Enable the button.
});
}
- MainPage.xaml.cpp で、例に示すように
WriteFilesAsync
を実装します。
ヒント
when_all
は、その結果、task
を生成する、非ブロッキング関数です。 task::wait とは異なり、UWP アプリのこの関数は ASTA (アプリケーション STA) スレッドで安全に呼び出することができます。
when_any関数
when_any
関数は、一連のタスクの最初のタスクの完了後に完了するタスクを生成します。 この関数は、完了したタスクの結果とタスクのインデックスをセットに含む std::pair オブジェクトを返します。
when_any
関数は、特に次のシナリオに役立ちます。
重複した操作。 多くの方法で実行できるアルゴリズムまたは操作を検討してください。
when_any
関数を使用すると、最初の操作を完了して残りの操作を取り消すように、操作を選択できます。インタリーブされた操作。 複数の操作を開始して、それらの操作のすべてが完了し、各操作が完了したら
when_any
関数を使って結果を処理するようにできます。 1 つの操作が完了したら、1 つ以上の追加タスクを開始できます。制限された操作。
when_any
関数を使用して、前のシナリオを拡張し、同時操作の数を制限することができます。有効期限切れの操作。
when_any
関数を使用して、1 つ以上のタスクと特定の時間以降に完了する 1 つのタスクから選択することができます。
when_all
と同様に、通常は継続を when_any
と共に使用して、一連タスクの最初のタスクが終了した後に操作を実行します。 次の基本的な例では、when_any
を使用して、他の 3 つのタスクの最初のタスクが完了したときに完了するタスクを作成します。
// select-task.cpp
// compile with: /EHsc
#include <ppltasks.h>
#include <array>
#include <iostream>
using namespace concurrency;
using namespace std;
int wmain()
{
// Start multiple tasks.
array<task<int>, 3> tasks = {
create_task([]() -> int { return 88; }),
create_task([]() -> int { return 42; }),
create_task([]() -> int { return 99; })
};
// Select the first to finish.
when_any(begin(tasks), end(tasks)).then([](pair<int, size_t> result)
{
wcout << "First task to finish returns "
<< result.first
<< L" and has index "
<< result.second
<< L'.' << endl;
}).wait();
}
/* Sample output:
First task to finish returns 42 and has index 1.
*/
この例では、タスク ベースの継続を作成するために task<pair<int, size_t>>
を指定することもできます。
Note
when_all
と同様に、when_any
に渡すタスクはすべて同じ型を返す必要があります。
次に示す例のように、||
構文を使用して、一連のタスクの最初のタスクの完了後に完了するタスクを生成することもできます。
auto t = t1 || t2; // same as when_any
ヒント
when_all
と同様に、when_any
は非ブロッキングであり、ASTA スレッドの UWP アプリケーションで呼び出しても安全です。
タスクの実行の遅延
条件が満たされるまで、または外部イベントに応答してタスクを開始するまで、タスクの実行を遅延する必要がある場合があります。 たとえば、非同期プログラミングでは、I/O 完了のイベントに応答してタスクを開始する必要があります。
これを実現する 2 とおりの方法は、継続を使用するか、タスクを開始してタスクの処理関数の中でイベントを待つことです。 しかし、これらの方法の 1 つを使用できない場合もあります。 たとえば、継続を作成するためには、継続元タスクが必要です。 しかし、継続元タスクがない場合には、タスクの完了イベントを作成し、後で継続元タスクが使用できるようになったときに、その完了イベントをチェーンすることができます。 また、待機中のタスクはスレッドをブロックするため、タスクの完了イベントを使用して、非同期操作が完了したときに処理を行い、スレッドを解放することもできます。
concurrency::task_completion_event クラスによって、このようなタスクの構成が容易となります。 task
クラスと同様に、型パラメーター T
は、タスクで生成される結果の型です。 タスクが値を返さない場合には、この型は void
となります。 T
は const
修飾子を使用できません。 通常、task_completion_event
オブジェクトは、値が使用できるようになると通知する、スレッドまたはタスクに提供されます。 同時に、1 つ以上のタスクは、そのイベントのリスナーとして設定されます。 イベントが設定されると、リスナー タスクが完了し、継続が実行されるようにスケジュールされます。
task_completion_event
を使用して遅延後に完了するタスクを実装する例の詳細については、「方法: 遅延後に完了するタスクを作成する」を参照してください。
タスク グループ
タスク グループを使用して、タスクのコレクションを編成します。 タスク グループでは、ワーク スティーリング キューにタスクを置きます。 スケジューラはこのキューからタスクを削除し、使用できるコンピューティング リソースでそのタスクを実行します。 タスク グループにタスクを追加した場合、すべてのタスクが終了するまで待機することも、まだ開始されていないタスクを取り消すこともできます。
PPL では、concurrency::task_group および concurrency::structured_task_group クラスを使用してタスク グループを表し、concurrency::task_handle クラスを使用してグループで実行されるタスクを表します。 task_handle
クラスは、処理を行うコードをカプセル化します。 task
クラスと同様に、この処理関数には、ラムダ関数、関数ポインター、または関数オブジェクトを使用できます。 通常、task_handle
オブジェクトを直接操作する必要はありません。 代わりに、タスク グループに処理関数を渡します。タスク グループによって task_handle
オブジェクトが作成および管理されます。
タスク グループは非構造化タスク グループ および構造化タスク グループという 2 つのカテゴリに分類されます。 PPL では、task_group
クラスを使用して非構造化タスク グループを表し、structured_task_group
クラスを使用して構造化タスク グループを表します。
重要
また、PPL では、structured_task_group
クラスを使用して一連のタスクを並列に実行する concurrency::parallel_invoke アルゴリズムも定義します。 parallel_invoke
アルゴリズムにはより簡潔な構文が用意されているため、可能であれば structured_task_group
クラスの代わりに使用することをお勧めします。 parallel_invoke
については、「並列アルゴリズム」で詳しく説明されています。
parallel_invoke
は、同時に実行する独立したタスクが複数あり、すべてのタスクが終了するまで待機してから処理を続行する必要がある場合に使用します。 この方法は通常、分岐と結合の並列化と呼ばれます。 task_group
は、同時に実行する独立したタスクが複数あり、それらのタスクが終了するタイミングがまだ先である場合に使用します。 たとえば、task_group
オブジェクトにタスクを追加して、それらのタスクが別の関数や別のストレッドで終了するまで待機できます。
タスク グループでは、キャンセル処理の概念がサポートされています。 キャンセル処理を使用すると、操作全体を取り消すことをアクティブなすべてのタスクに通知できます。 また、キャンセル処理により、まだ開始されていないタスクが実行されるのを防止できます。 取り消し処理の詳細については、PPL における取り消し処理 をご覧ください。
また、ランタイムでは、例外処理モデルを使用することによって、タスクから例外をスローし、関連するタスク グループが終了するまで待機しているときにその例外を処理できます。 この例外処理モデルの詳細については、「例外処理」を参照してください。
task_groupとstructured_task_groupの比較
task_group
クラスの代わりに parallel_invoke
または structured_task_group
を使用することが推奨されますが、可変個のタスクを実行する並列アルゴリズムやキャンセル処理のサポートが必要な並列アルゴリズムを記述する場合など、structured_task_group
を使用した方がよい場合もあります。 ここでは、task_group
クラスと structured_task_group
クラスの違いについて説明します。
task_group
クラスはスレッド セーフです。 したがって、複数のスレッドから task_group
オブジェクトにタスクを追加したり、複数のスレッドから task_group
オブジェクトに対して待機や取り消しの操作を行ったりしてもかまいません。 structured_task_group
オブジェクトの構築と破棄は、同じ構文のスコープで行う必要があります。 また、structured_task_group
オブジェクトに対する操作はすべて同じスレッドで行う必要があります。 この規則の例外は、concurrency::structured_task_group::cancel メソッドと concurrency::structured_task_group::is_canceling メソッドです。 子タスクでこれらのメソッドを呼び出すことで、任意のタイミングで親タスク グループを取り消したり、取り消し処理をチェックしたりできます。
concurrency::task_group::wait メソッドまたは concurrency::task_group::run_and_wait メソッドの呼び出し後、task_group
オブジェクトに対しては別のタスクを実行できます。 一方、concurrency::structured_task_group::wait メソッドまたは concurrency::structured_task_group::run_and_wait メソッドの呼び出し後、structured_task_group
オブジェクトに対して別のタスクを実行する場合の動作は未定義です。
structured_task_group
クラスはスレッド間で同期されないため、実行に伴うオーバーヘッドが task_group
クラスよりも少なくなります。 したがって、複数のスレッドから処理をスケジュールする必要のない問題に対処する場合に、parallel_invoke
アルゴリズムを使用できないときは、structured_task_group
クラスを使用すると、よりパフォーマンスの高いコードを作成できます。
structured_task_group
オブジェクトを別の structured_task_group
オブジェクト内で使用する場合は、外部オブジェクトが終了する前に内部オブジェクトが終了して破棄される必要があります。 task_group
クラスの場合、外部グループが終了する前に、入れ子になったタスク グループが終了する必要はありません。
非構造化タスク グループと構造化タスク グループでは、さまざまな方法でタスク ハンドルを操作します。 task_group
オブジェクトには処理関数を直接渡すことができます。task_group
オブジェクトによってタスク ハンドルが自動的に作成および管理されます。 structured_task_group
クラスを使用する場合は、タスクごとに task_handle
オブジェクトを管理する必要があります。 各 task_handle
オブジェクトは、関連する structured_task_group
オブジェクトの有効期間を通じて有効である必要があります。 次の基本的な例に示すように、concurrency::make_task 関数を使用して task_handle
オブジェクトを作成します:
// make-task-structure.cpp
// compile with: /EHsc
#include <ppl.h>
using namespace concurrency;
int wmain()
{
// Use the make_task function to define several tasks.
auto task1 = make_task([] { /*TODO: Define the task body.*/ });
auto task2 = make_task([] { /*TODO: Define the task body.*/ });
auto task3 = make_task([] { /*TODO: Define the task body.*/ });
// Create a structured task group and run the tasks concurrently.
structured_task_group tasks;
tasks.run(task1);
tasks.run(task2);
tasks.run_and_wait(task3);
}
可変個のタスクが存在する状況でタスク ハンドルを管理するには、_malloca などのスタック割り当てルーチンを使用するか、std::vector などのコンテナー クラスを使用します。
task_group
と structured_task_group
の両方でキャンセル処理がサポートされています。 取り消し処理の詳細については、PPL における取り消し処理 をご覧ください。
例
次の簡単な例では、タスク グループの操作方法を示します。 この例では、parallel_invoke
アルゴリズムを使用して、2 つのタスクを同時に実行します。 各タスクでは、サブタスクを task_group
オブジェクトに追加します。 task_group
クラスを使用した場合、複数のタスクでタスクを同時に追加できることに注意してください。
// using-task-groups.cpp
// compile with: /EHsc
#include <ppl.h>
#include <sstream>
#include <iostream>
using namespace concurrency;
using namespace std;
// Prints a message to the console.
template<typename T>
void print_message(T t)
{
wstringstream ss;
ss << L"Message from task: " << t << endl;
wcout << ss.str();
}
int wmain()
{
// A task_group object that can be used from multiple threads.
task_group tasks;
// Concurrently add several tasks to the task_group object.
parallel_invoke(
[&] {
// Add a few tasks to the task_group object.
tasks.run([] { print_message(L"Hello"); });
tasks.run([] { print_message(42); });
},
[&] {
// Add one additional task to the task_group object.
tasks.run([] { print_message(3.14); });
}
);
// Wait for all tasks to finish.
tasks.wait();
}
この例のサンプル出力を次に示します。
Message from task: Hello
Message from task: 3.14
Message from task: 42
parallel_invoke
アルゴリズムではタスクを同時に実行するため、出力メッセージの順序が変わる可能性があります。
parallel_invoke
アルゴリズムの使用方法を示す完全な例については、「方法: 並列呼び出しを使用して並列並べ替えルーチンを記述する」および「方法: 並列呼び出しを使用して並列操作を実行する」を参照してください。 task_group
クラスを使用して非同期フューチャを実装する方法を示す完全な例については、「チュートリアル: フューチャの実装」を参照してください。
信頼性の高いプログラミング
タスク、タスク グループ、および並列アルゴリズムを使用する場合は、キャンセル処理と例外処理の役割を十分に理解しておいてください。 たとえば、並列処理ツリーでタスクを取り消すと、子タスクも実行されなくなります。 そのため、アプリケーションで重要となる操作 (リソースの解放など) が子タスクのいずれかで実行されるような場合に問題となります。 また、子タスクが例外をスローすると、その例外がオブジェクトのデストラクターを介して反映され、アプリケーションで未定義の動作が発生する可能性があります。 これらの点を示す例については、「並列パターン ライブラリに関するベスト プラクティス」の「取り消し処理および例外処理がオブジェクトの破棄に及ぼす影響について」のセクションを参照してください。 PPL でのキャンセル モデルと例外処理モデルの詳細については、「キャンセル」および「例外処理」を参照してください。
関連項目
Title | 説明 |
---|---|
方法: 並列呼び出しを使用して並列並べ替えルーチンを記述する | parallel_invoke アルゴリズムを使用して、バイトニック ソート アルゴリズムのパフォーマンスを向上させる方法について説明します。 |
方法: Parallel.Invoke を使用して並列操作を実行する | parallel_invoke アルゴリズムを使用して、共有データ ソースに対して複数の操作を実行するプログラムのパフォーマンスを向上させる方法について説明します。 |
方法: 遅延後に完了するタスクを作成する | task 、cancellation_token_source 、cancellation_token と task_completion_event クラスを使用して、遅延後に完了するタスクを作成する方法を示します。 |
チュートリアル: フューチャの実装 | コンカレンシー ランタイムの既存の機能を組み合わせて、より効果的に使用する方法を示します。 |
並列パターン ライブラリ (PPL) | 同時実行アプリケーションの開発に不可欠なプログラミング モデルを提供する PPL について説明します。 |