如何:转换使用取消的 OpenMP 循环以使用并发运行时
某些并行循环不需要执行所有迭代。 例如,搜索值的算法可以在找到值后终止。 OpenMP 不提供算法以跳出并行循环。 但是,您可以使用布尔值或标志,以允许循环的迭代指示已找到解决方案。 并发运行时提供允许一个任务取消其他尚未开始任务的功能。
本示例演示如何转换不需要运行所有迭代的 OpenMP parallel for 循环,以使用并发运行时取消机制。
示例
本示例使用 OpenMP 和并发运行时实现 std::any_of 算法的并行版本。 本示例的 OpenMP 版本使用标志协调所有满足条件的并行循环迭代。 使用并发运行时版本使用 concurrency::structured_task_group::cancel 在满足条件时停止整个操作的方法。
// concrt-omp-parallel-any-of.cpp
// compile with: /EHsc /openmp
#include <ppl.h>
#include <array>
#include <random>
#include <iostream>
using namespace concurrency;
using namespace std;
// Uses OpenMP to determine whether a condition exists in
// the specified range of elements.
template <class InIt, class Predicate>
bool omp_parallel_any_of(InIt first, InIt last, const Predicate& pr)
{
typedef typename std::iterator_traits<InIt>::value_type item_type;
// A flag that indicates that the condition exists.
bool found = false;
#pragma omp parallel for
for (int i = 0; i < static_cast<int>(last-first); ++i)
{
if (!found)
{
item_type& cur = *(first + i);
// If the element satisfies the condition, set the flag to
// cancel the operation.
if (pr(cur)) {
found = true;
}
}
}
return found;
}
// Uses the Concurrency Runtime to determine whether a condition exists in
// the specified range of elements.
template <class InIt, class Predicate>
bool concrt_parallel_any_of(InIt first, InIt last, const Predicate& pr)
{
typedef typename std::iterator_traits<InIt>::value_type item_type;
structured_task_group tasks;
// Create a predicate function that cancels the task group when
// an element satisfies the condition.
auto for_each_predicate = [&pr, &tasks](const item_type& cur) {
if (pr(cur)) {
tasks.cancel();
}
};
// Create a task that calls the predicate function in parallel on each
// element in the range.
auto task = make_task([&]() {
parallel_for_each(first, last, for_each_predicate);
});
// The condition is satisfied if the task group is in the cancelled state.
return tasks.run_and_wait(task) == canceled;
}
int wmain()
{
// The length of the array.
const size_t size = 100000;
// Create an array and initialize it with random values.
array<int, size> a;
generate(begin(a), end(a), mt19937(42));
// Search for a value in the array by using OpenMP and the Concurrency Runtime.
const int what = 9114046;
auto predicate = [what](int n) -> bool {
return (n == what);
};
wcout << L"Using OpenMP..." << endl;
if (omp_parallel_any_of(begin(a), end(a), predicate))
{
wcout << what << L" is in the array." << endl;
}
else
{
wcout << what << L" is not in the array." << endl;
}
wcout << L"Using the Concurrency Runtime..." << endl;
if (concrt_parallel_any_of(begin(a), end(a), predicate))
{
wcout << what << L" is in the array." << endl;
}
else
{
wcout << what << L" is not in the array." << endl;
}
}
该示例产生下面的输出。
Using OpenMP...
9114046 is in the array.
Using the Concurrency Runtime...
9114046 is in the array.
在使用 OpenMP 的版本中,即使设置了标志,也会执行循环的所有迭代。 另外,如果任务具有任何子任务,则标志也必须可用于这些子任务以完成通信取消。 在并发运行时中,当取消任务组时,运行时会取消整个工作树,包括子任务。 Concurrency::parallel_for_each 算法用来执行工作的任务。 因此,当循环的一个迭代取消根任务时,整个计算树也将被取消。 当取消工作树时,运行时不会开始新的任务。 但是,运行时允许已开始的任务完成。 因此,在执行 parallel_for_each 算法时,活动循环迭代可以清除它们的资源。
在本示例的两个版本中,如果数组包含多个要搜索的值的副本,则多个循环迭代可以各自同时设置结果并取消全部操作。 如果您的问题要求在满足条件时仅一个任务执行工作,则可以使用同步基元(例如临界区)。
您还可以使用异常处理取消使用 PPL 的任务。 有关取消操作的更多信息,请参见 PPL 中的取消操作。
有关 parallel_for_each 和其他并行算法的更多信息,请参见并行算法。
编译代码
将示例代码复制并将其粘贴在 Visual Studio 项目中,或将它粘贴到一个文件,名为 concrt-omp-平行的任意-of.cpp ,然后在 Visual Studio 命令提示符窗口中运行以下命令。
cl.exe /EHsc /openmp concrt-omp-parallel-any-of.cpp