并发运行时中的异常处理
并发运行时使用 C++ 异常处理来传达多种错误。 这些错误包括:无效使用运行时、无法获取资源等运行时错误,以及你提供给任务和任务组的工作函数中发生的错误。 当任务或任务组引发异常时,运行时会保存该异常并将其编组到等待任务或任务组完成的上下文。 对于轻量级任务和代理等组件,运行时不会为你管理异常。 在这些情况下,你必须实现自己的异常处理机制。 此主题描述运行时如何处理任务、任务组、轻量级任务和异步代理引发的异常,以及如何在应用程序中响应异常。
要点
当任务或任务组引发异常时,运行时会保存该异常并将其编组到等待任务或任务组完成的上下文。
如果可能,请在对 concurrency::task::get 和 concurrency::task::wait 的每个调用中加上一个
try
/catch
块,以处理可以从中恢复的错误。 如果任务发生异常并且该异常未被任务、其延续任务之一或主要应用捕获,则运行时将终止应用。基于任务的延续始终会运行;无论前面的任务是否成功完成、引发异常还是被取消。 如果前面的任务引发异常或取消,则基于值的延续不会运行。
由于基于任务的延续始终运行,因此请考虑是否在延续链的末尾添加基于任务的延续。 这有助于保证你的代码遵守所有异常。
调用 concurrency::task::get 并且该任务被取消时,运行时会引发 concurrency::task_canceled。
运行时不管理轻量级任务和代理的异常。
在本文档中
任务和延续
本节介绍运行时如何处理由 concurrency::task 对象及其延续引发的异常。 有关任务和延续模型的更多信息,请参阅任务并行。
当你在传递给 task
对象的工作函数的主体中引发异常时,运行时会存储该异常并将其编组到调用 concurrency::task::get 或 concurrency::task::wait 的上下文中。 任务并行化描述了基于任务与基于值的延续,但总而言之,基于值的延续采用 T
类型的参数,而基于任务的延续采用 task<T>
类型的参数。 如果引发的任务具有一个或多个基于值的延续,则不会安排这些延续运行。 下面的示例阐释了这种行为:
// eh-task.cpp
// compile with: /EHsc
#include <ppltasks.h>
#include <iostream>
using namespace concurrency;
using namespace std;
int wmain()
{
wcout << L"Running a task..." << endl;
// Create a task that throws.
auto t = create_task([]
{
throw exception();
});
// Create a continuation that prints its input value.
auto continuation = t.then([]
{
// We do not expect this task to run because
// the antecedent task threw.
wcout << L"In continuation task..." << endl;
});
// Wait for the continuation to finish and handle any
// error that occurs.
try
{
wcout << L"Waiting for tasks to finish..." << endl;
continuation.wait();
// Alternatively, call get() to produce the same result.
//continuation.get();
}
catch (const exception& e)
{
wcout << L"Caught exception." << endl;
}
}
/* Output:
Running a task...
Waiting for tasks to finish...
Caught exception.
*/
基于任务的延续使你能够处理由先前任务引发的任何异常。 基于任务的延续始终会运行;无论任务是否成功完成、引发异常还是被取消。 当任务引发异常时,其基于任务的延续将计划运行。 下面的示例显示了一个始终引发的任务。 该任务有两个延续;一个任务是基于值的,另一个任务是基于任务的。 基于任务的异常始终会运行,因此可以捕获先前任务引发的异常。 当示例等待两个延续完成时,再次引发异常,因为在调用 task::get
或 task::wait
时始终会引发任务异常。
// eh-continuations.cpp
// compile with: /EHsc
#include <ppltasks.h>
#include <iostream>
using namespace concurrency;
using namespace std;
int wmain()
{
wcout << L"Running a task..." << endl;
// Create a task that throws.
auto t = create_task([]() -> int
{
throw exception();
return 42;
});
//
// Attach two continuations to the task. The first continuation is
// value-based; the second is task-based.
// Value-based continuation.
auto c1 = t.then([](int n)
{
// We don't expect to get here because the antecedent
// task always throws.
wcout << L"Received " << n << L'.' << endl;
});
// Task-based continuation.
auto c2 = t.then([](task<int> previousTask)
{
// We do expect to get here because task-based continuations
// are scheduled even when the antecedent task throws.
try
{
wcout << L"Received " << previousTask.get() << L'.' << endl;
}
catch (const exception& e)
{
wcout << L"Caught exception from previous task." << endl;
}
});
// Wait for the continuations to finish.
try
{
wcout << L"Waiting for tasks to finish..." << endl;
(c1 && c2).wait();
}
catch (const exception& e)
{
wcout << L"Caught exception while waiting for all tasks to finish." << endl;
}
}
/* Output:
Running a task...
Waiting for tasks to finish...
Caught exception from previous task.
Caught exception while waiting for all tasks to finish.
*/
我们建议你使用基于任务的延续来捕获你能够处理的异常。 由于基于任务的延续始终运行,因此请考虑是否在延续链的末尾添加基于任务的延续。 这有助于保证你的代码遵守所有异常。 以下示例显示了一个基本的基于值的延续链。 链中的第三个任务引发异常,因此它后面的任何基于值的延续都不会运行。 然而,最终的延续是基于任务的,因此始终会运行。 最后的延续处理第三个任务引发的异常。
我们建议你尽可能捕获最具体的异常。 如果你没有要捕获的特定异常,则可以忽略这个最终的基于任务的延续。 任何异常都将保持未处理状态,并且可以终止应用程序。
// eh-task-chain.cpp
// compile with: /EHsc
#include <ppltasks.h>
#include <iostream>
using namespace concurrency;
using namespace std;
int wmain()
{
int n = 1;
create_task([n]
{
wcout << L"In first task. n = ";
wcout << n << endl;
return n * 2;
}).then([](int n)
{
wcout << L"In second task. n = ";
wcout << n << endl;
return n * 2;
}).then([](int n)
{
wcout << L"In third task. n = ";
wcout << n << endl;
// This task throws.
throw exception();
// Not reached.
return n * 2;
}).then([](int n)
{
// This continuation is not run because the previous task throws.
wcout << L"In fourth task. n = ";
wcout << n << endl;
return n * 2;
}).then([](task<int> previousTask)
{
// This continuation is run because it is task-based.
try
{
// The call to task::get rethrows the exception.
wcout << L"In final task. result = ";
wcout << previousTask.get() << endl;
}
catch (const exception&)
{
wcout << L"<exception>" << endl;
}
}).wait();
}
/* Output:
In first task. n = 1
In second task. n = 2
In third task. n = 4
In final task. result = <exception>
*/
提示
可以使用 concurrency::task_completion_event::set_exception 方法将异常与任务完成事件相关联。 任务并行性文档更详细地描述了 concurrency::task_completion_event 类。
concurrency::task_canceled 是一个重要的运行时异常类型,与 task
相关。 如果你调用 task::get
但该任务被取消,则运行时会引发 task_canceled
。 (相反,task::wait
会返回 task_status::canceled 并且不会引发)你可以通过基于任务的延续或在调用 task::get
时捕获并处理此异常。 有关任务取消的详细信息,请参阅 PPL 中的取消操作。
注意
切勿从代码中引发 task_canceled
。 改为调用 concurrency::cancel_current_task。
如果任务发生异常并且该异常未被任务、其延续任务之一或主要应用捕获,则运行时将终止应用。 如果你的应用程序崩溃,你可以将 Visual Studio 配置为在引发 C++ 异常时中断。 诊断出未处理异常的位置后,使用基于任务的延续来处理它。
本文档中的运行时引发的异常部分更详细地描述了如何处理运行时异常。
[返回页首]
任务组和并行算法
本节介绍运行时如何处理任务组引发的异常。 本节也适用于并行算法,例如 concurrency::parallel_for,因为这些算法建立在任务组之上。
注意
确保你了解异常对相关任务的影响。 有关如何对任务或并行算法使用异常处理的推荐实践,请参阅“并行模式库中的最佳实践”主题中的了解取消和异常处理如何影响对象销毁一节。
有关任务组的详细信息,请参阅任务并行。 有关并行算法的详细信息,请参阅并行算法。
在传递给 concurrency::task_group 或 concurrency::structured_task_group 对象的工作函数的主体中引发异常时,运行时会存储该异常并将其封送到调用 concurrency::task_group::wait、concurrency::structured_task_group::wait、concurrency::task_group::run_and_wait 或 concurrency::structured_task_group::run_and_wait 的上下文中。 运行时还会停止任务组中的所有活动任务(包括子任务组中的任务),并丢弃任何尚未启动的任务。
以下示例显示了引发异常的工作函数的基本结构。 该示例使用一个 task_group
对象并行打印两个 point
对象的值。 print_point
工作函数将 point
对象的值打印到控制台。 如果输入值为 NULL
,则工作函数会引发异常。 运行时存储此异常并将其封送到调用 task_group::wait
的上下文中。
// eh-task-group.cpp
// compile with: /EHsc
#include <ppl.h>
#include <iostream>
#include <sstream>
using namespace concurrency;
using namespace std;
// Defines a basic point with X and Y coordinates.
struct point
{
int X;
int Y;
};
// Prints the provided point object to the console.
void print_point(point* pt)
{
// Throw an exception if the value is NULL.
if (pt == NULL)
{
throw exception("point is NULL.");
}
// Otherwise, print the values of the point.
wstringstream ss;
ss << L"X = " << pt->X << L", Y = " << pt->Y << endl;
wcout << ss.str();
}
int wmain()
{
// Create a few point objects.
point pt = {15, 30};
point* pt1 = &pt;
point* pt2 = NULL;
// Use a task group to print the values of the points.
task_group tasks;
tasks.run([&] {
print_point(pt1);
});
tasks.run([&] {
print_point(pt2);
});
// Wait for the tasks to finish. If any task throws an exception,
// the runtime marshals it to the call to wait.
try
{
tasks.wait();
}
catch (const exception& e)
{
wcerr << L"Caught exception: " << e.what() << endl;
}
}
本示例生成以下输出。
X = 15, Y = 30Caught exception: point is NULL.
有关在任务组中使用异常处理的完整示例,请参阅如何:使用异常处理中断并行循环。
[返回页首]
运行时引发的异常
调用运行时可能会导致异常。 大多数异常类型(除了 concurrency::task_canceled 和 concurrency::operation_timed_out),都表示编程错误。 这些错误通常是不可恢复的,因此不应由应用程序代码捕获或处理。 我们建议你仅在需要诊断编程错误时才在应用程序代码中捕获或处理不可恢复的错误。 但是,了解运行时定义的异常类型可以帮助你诊断编程错误。
运行时引发的异常与工作函数引发的异常的异常处理机制相同。 例如,concurrency::receive 函数如果在指定时间段内没有收到消息,则会引发 operation_timed_out
。 如果 receive
在传递给任务组的工作函数中引发异常,则运行时存储该异常并将其编组到调用 task_group::wait
、structured_task_group::wait
、task_group::run_and_wait
或 structured_task_group::run_and_wait
的上下文中。
以下示例使用 concurrency::parallel_invoke 算法并行运行两个任务。 第一个任务等待五秒钟,然后将消息发送到消息缓冲区。 第二个任务使用 receive
函数等待三秒钟以接收来自同一消息缓冲区的消息。 如果在该时间段内没有收到消息,receive
函数将引发 operation_timed_out
。
// eh-time-out.cpp
// compile with: /EHsc
#include <agents.h>
#include <ppl.h>
#include <iostream>
using namespace concurrency;
using namespace std;
int wmain()
{
single_assignment<int> buffer;
int result;
try
{
// Run two tasks in parallel.
parallel_invoke(
// This task waits 5 seconds and then sends a message to
// the message buffer.
[&] {
wait(5000);
send(buffer, 42);
},
// This task waits 3 seconds to receive a message.
// The receive function throws operation_timed_out if it does
// not receive a message in the specified time period.
[&] {
result = receive(buffer, 3000);
}
);
// Print the result.
wcout << L"The result is " << result << endl;
}
catch (operation_timed_out&)
{
wcout << L"The operation timed out." << endl;
}
}
本示例生成以下输出。
The operation timed out.
为防止应用程序异常终止,请确保你的代码在调用运行时时处理异常。 调用使用并发运行时的外部代码(例如第三方库)时,还要处理异常。
[返回页首]
多个异常
如果任务或并行算法接收到多个异常,则运行时仅将其中一个异常封送到调用上下文。 运行时不保证它会封送哪个异常。
以下示例使用 parallel_for
算法将数字打印到控制台。 如果输入值小于某个最小值或大于某个最大值,则会引发异常。 在此示例中,多个工作函数可以引发异常。
// eh-multiple.cpp
// compile with: /EHsc
#include <ppl.h>
#include <iostream>
#include <sstream>
using namespace concurrency;
using namespace std;
int wmain()
{
const int min = 0;
const int max = 10;
// Print values in a parallel_for loop. Use a try-catch block to
// handle any exceptions that occur in the loop.
try
{
parallel_for(-5, 20, [min,max](int i)
{
// Throw an exeception if the input value is less than the
// minimum or greater than the maximum.
// Otherwise, print the value to the console.
if (i < min)
{
stringstream ss;
ss << i << ": the value is less than the minimum.";
throw exception(ss.str().c_str());
}
else if (i > max)
{
stringstream ss;
ss << i << ": the value is greater than than the maximum.";
throw exception(ss.str().c_str());
}
else
{
wstringstream ss;
ss << i << endl;
wcout << ss.str();
}
});
}
catch (exception& e)
{
// Print the error to the console.
wcerr << L"Caught exception: " << e.what() << endl;
}
}
以下显示了此示例的输出样本。
8293104567Caught exception: -5: the value is less than the minimum.
[返回页首]
取消
并非所有异常都表示错误。 例如,搜索算法会在找到结果时可能会使用异常处理来停止其关联的任务。 有关如何在代码中使用取消机制的更多信息,请参阅 PPL 中的取消。
[返回页首]
轻量级任务
轻量级任务是直接从 concurrency::Scheduler 对象计划的任务。 轻量级任务的开销比普通任务少。 但是,运行时不会捕获轻量级任务引发的异常。 相反,异常将被未经处理的异常处理程序捕获,默认情况下会终止进程。 因此,在你的应用程序中使用适当的错误处理机制。 有关轻量级任务的详细信息,请参阅任务计划程序。
[返回页首]
异步代理
与轻量级任务一样,运行时不管理由异步代理引发的异常。
以下示例显示了一种处理派生自 concurrency::agent 的类中的异常的方法。 这个例子定义了 points_agent
类。 points_agent::run
方法从消息缓冲区中读取 point
对象并将它们打印到控制台。 如果 run
方法接收到 NULL
指针,则会引发异常。
run
方法的所有工作都是围绕一个 try
-catch
块。 catch
块将异常存储在消息缓冲区中。 应用程序通过在代理完成后读取此缓冲区来检查代理是否遇到错误。
// eh-agents.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>
using namespace concurrency;
using namespace std;
// Defines a point with x and y coordinates.
struct point
{
int X;
int Y;
};
// Informs the agent to end processing.
point sentinel = {0,0};
// An agent that prints point objects to the console.
class point_agent : public agent
{
public:
explicit point_agent(unbounded_buffer<point*>& points)
: _points(points)
{
}
// Retrieves any exception that occurred in the agent.
bool get_error(exception& e)
{
return try_receive(_error, e);
}
protected:
// Performs the work of the agent.
void run()
{
// Perform processing in a try block.
try
{
// Read from the buffer until we reach the sentinel value.
while (true)
{
// Read a value from the message buffer.
point* r = receive(_points);
// In this example, it is an error to receive a
// NULL point pointer. In this case, throw an exception.
if (r == NULL)
{
throw exception("point must not be NULL");
}
// Break from the loop if we receive the
// sentinel value.
else if (r == &sentinel)
{
break;
}
// Otherwise, do something with the point.
else
{
// Print the point to the console.
wcout << L"X: " << r->X << L" Y: " << r->Y << endl;
}
}
}
// Store the error in the message buffer.
catch (exception& e)
{
send(_error, e);
}
// Set the agent status to done.
done();
}
private:
// A message buffer that receives point objects.
unbounded_buffer<point*>& _points;
// A message buffer that stores error information.
single_assignment<exception> _error;
};
int wmain()
{
// Create a message buffer so that we can communicate with
// the agent.
unbounded_buffer<point*> buffer;
// Create and start a point_agent object.
point_agent a(buffer);
a.start();
// Send several points to the agent.
point r1 = {10, 20};
point r2 = {20, 30};
point r3 = {30, 40};
send(buffer, &r1);
send(buffer, &r2);
// To illustrate exception handling, send the NULL pointer to the agent.
send(buffer, reinterpret_cast<point*>(NULL));
send(buffer, &r3);
send(buffer, &sentinel);
// Wait for the agent to finish.
agent::wait(&a);
// Check whether the agent encountered an error.
exception e;
if (a.get_error(e))
{
cout << "error occurred in agent: " << e.what() << endl;
}
// Print out agent status.
wcout << L"the status of the agent is: ";
switch (a.status())
{
case agent_created:
wcout << L"created";
break;
case agent_runnable:
wcout << L"runnable";
break;
case agent_started:
wcout << L"started";
break;
case agent_done:
wcout << L"done";
break;
case agent_canceled:
wcout << L"canceled";
break;
default:
wcout << L"unknown";
break;
}
wcout << endl;
}
本示例生成以下输出。
X: 10 Y: 20
X: 20 Y: 30
error occurred in agent: point must not be NULL
the status of the agent is: done
由于 try
-catch
块存在于 while
循环之外,因此代理在遇到第一个错误时结束处理。 如果 try
-catch
块在 while
循环内,则代理将在发生错误后继续。
此示例将异常存储在消息缓冲区中,以便另一个组件可以在代理运行时监控代理是否存在错误。 此示例使用 concurrency::single_assignment 对象来存储错误。 在代理处理多个异常的情况下,single_assignment
类仅存储传递给它的第一条消息。 要仅存储最后一个异常,请使用 concurrency::overwrite_buffer 类。 要存储所有异常,请使用 concurrency::unbounded_buffer 类。 有关这些消息块的更多信息,请参阅异步消息块。
有关异步代理的详细信息,请参阅异步代理。
[返回页首]
总结
[返回页首]