并发运行时中的异常处理

并发运行时使用 C++ 异常处理来告知多种错误。 这些错误包括使用的运行时无效、诸如无法获取资源之类的运行时错误,以及发生在提供给任务和任务组的工作函数中的错误。 当任务或任务组引发异常时,运行时将保存该异常,并将其封送到等待任务或任务组完成的上下文。 对于诸如轻量级任务和代理之类的组件,运行时不会为您管理异常。 在这些情况下,您必须实现自己的异常处理机制。 本主题描述运行时如何处理任务、任务组、轻量级任务和异步代理引发的异常,以及如何在应用程序中响应异常。

关键点

  • 当任务或任务组引发异常时,运行时将保存该异常,并将其封送到等待任务或任务组完成的上下文。

  • 如果可能,请包含每调用 concurrency::task::getconcurrency::task::wait 使用 try/catch 块处理可恢复的错误。 运行时终止应用,则任务引发异常,该异常不会被捕捉,延续任务之一或主应用。

  • 基于任务的延续始终运行;非关键成功完成前面的任务,是否引发了异常,也未取消。 如果前面的任务引发或移除,基于值的延续将不运行。

  • 由于基于任务的延续始终运行,则考虑添加基于任务的延续在继续字符串结尾。 这有助于确保代码观察到所有异常。

  • 运行时引发 concurrency::task_canceled,当调用 concurrency::task::get 时,该任务将取消。

  • 运行时不会管理轻量级任务和代理之类的异常。

在本文档中

  • 任务和继续

  • 任务组和并行算法

  • 运行时引发的异常

  • 多个异常

  • 取消

  • 轻量级任务

  • 异步代理

任务和继续

本节描述运行时如何处理由 concurrency::task 对象及其延续引发的异常。 有关任务和继续模型的更多信息,请参见任务并行(并发运行时).

当您在传递给task 对象的工作函数体中引发异常时,运行时存储该异常,并将该异常封送到调用concurrency::task::getconcurrency::task::wait的上下文。 任务并行(并发运行时) 文档描述基于任务与基于的值,继续,汇总,但基于值继续接受类型 T 的参数,并根据任务中继续一个采用 **task<T>**类型的参数。 如果引发的任务具有一个或多个基于值的后续,这些延续不计划运行。 下面的示例阐释了这一点:

// eh-task.cpp 
// compile with: /EHsc
#include <ppltasks.h>
#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{
    wcout << L"Running a task..." << endl;
    // Create a task that throws.
    auto t = create_task([]
    {
        throw exception();
    });

    // Create a continuation that prints its input value.
    auto continuation = t.then([]
    {
        // We do not expect this task to run because 
        // the antecedent task threw.
        wcout << L"In continuation task..." << endl;
    });

    // Wait for the continuation to finish and handle any  
    // error that occurs. 
    try
    {
        wcout << L"Waiting for tasks to finish..." << endl;
        continuation.wait();

        // Alternatively, call get() to produce the same result. 
        //continuation.get();
    }
    catch (const exception& e)
    {
        wcout << L"Caught exception." << endl;
    }
}
/* Output:
    Running a task...
    Waiting for tasks to finish...
    Caught exception.
*/

基于任务的延续可以处理由前面的任务引发的任何异常。 基于任务的延续始终运行;非关键成功完成的任务,是否引发了异常,也未取消。 当任务引发异常时,它基于任务的计划继续运行。 下面的示例演示引发始终的任务。 有两个任务继续;一个值,此文件基于并其他任务开始。 基于任务的 Exception 始终运行,可以捕获由前面的任务引发的异常。 当两继续示例等待完成后,再次引发某个异常,这是因为任务异常始终引发,当调用 task::gettask::wait 时。

// eh-continuations.cpp 
// compile with: /EHsc
#include <ppltasks.h>
#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{    
    wcout << L"Running a task..." << endl;
    // Create a task that throws.
    auto t = create_task([]() -> int
    {
        throw exception();
        return 42;
    });

    // 
    // Attach two continuations to the task. The first continuation is   
    // value-based; the second is task-based. 

    // Value-based continuation.
    auto c1 = t.then([](int n)
    {
        // We don't expect to get here because the antecedent  
        // task always throws.
        wcout << L"Received " << n << L'.' << endl;
    });

    // Task-based continuation.
    auto c2 = t.then([](task<int> previousTask)
    {
        // We do expect to get here because task-based continuations 
        // are scheduled even when the antecedent task throws. 
        try
        {
            wcout << L"Received " << previousTask.get() << L'.' << endl;
        }
        catch (const exception& e)
        {
            wcout << L"Caught exception from previous task." << endl;
        }
    });

    // Wait for the continuations to finish. 
    try
    {
        wcout << L"Waiting for tasks to finish..." << endl;
        (c1 && c2).wait();
    }
    catch (const exception& e)
    {
        wcout << L"Caught exception while waiting for all tasks to finish." << endl;
    }
}
/* Output:
    Running a task...
    Waiting for tasks to finish...
    Caught exception from previous task.
    Caught exception while waiting for all tasks to finish.
*/

建议使用基于任务延续的异常捕捉您要处理的文件。 由于基于任务的延续始终运行,则考虑添加基于任务的延续在继续字符串结尾。 这有助于确保代码观察到所有异常。 下面的示例演示一个简单的基于的值继续字符串。 链中引发的第三项任务,并且其后的任何基于值的延续将不会运行。 但是,在最终延续任务开始,并始终运行。 此最终继续由第三个任务引发的异常。

建议捕捉您的最特定的异常。 如果没有捕获特定的异常,则可以忽略此最终基于任务中继续。 所有异常都将保持未处理并且可以终止应用。

// eh-task-chain.cpp 
// compile with: /EHsc
#include <ppltasks.h>
#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{
    int n = 1;
    create_task([n]
    {
        wcout << L"In first task. n = ";
        wcout << n << endl;

        return n * 2;

    }).then([](int n)
    {
        wcout << L"In second task. n = ";
        wcout << n << endl;

        return n * 2;

    }).then([](int n)
    {
        wcout << L"In third task. n = ";
        wcout << n << endl;

        // This task throws. 
        throw exception();
        // Not reached. 
        return n * 2;

    }).then([](int n)
    {
        // This continuation is not run because the previous task throws.
        wcout << L"In fourth task. n = ";
        wcout << n << endl;

        return n * 2;

    }).then([](task<int> previousTask)
    {
        // This continuation is run because it is value-based. 
        try
        {
            // The call to task::get rethrows the exception.
            wcout << L"In final task. result = ";
            wcout << previousTask.get() << endl;
        }
        catch (const exception&)
        {
            wcout << L"<exception>" << endl;
        }
    }).wait();
}
/* Output:
    In first task. n = 1
    In second task. n = 2
    In third task. n = 4
    In final task. result = <exception>
*/

提示

可以使用 concurrency::task_completion_event::set_exception 方法关联的异常处理任务完成事件。文档 任务并行(并发运行时) 更详细地介绍。concurrency::task_completion_event 类。

concurrency::task_canceled 是与 task的一个重要运行时异常类型。 运行时引发 task_canceled,当您调用 task::get 时,取消该任务。(反之,task_status::canceled task::wait 返回,且不会引发。)您可以捕获和处理从基于任务的延续的此异常时,或者当调用 task::get。 有关任务取消的更多信息,请参见 PPL 中的取消操作

警告

请勿从代码抛出 task_canceled。调用 concurrency::cancel_current_task

运行时终止应用,则任务引发异常,该异常不会被捕捉,延续任务之一或主应用。 如果应用程序崩溃,可以配置 Visual Studio,在 C++ 异常时中断。 在诊断未经处理的异常的位置后,请使用基于任务的后续处理它。

文档中的 运行时引发的异常 一节描述如何更详细地使用运行时异常一起使用。

[Top]

任务组和并行算法

本节描述运行时如何处理任务组引发的异常。 本节还适用于 concurrency::parallel_for 等并行算法,因为这些算法建立在任务组基础之上。

警告

确保您了解异常对于依赖任务的影响。有关如何对任务或并行算法使用异常处理的推荐做法,请参见“并行模式库中的最佳做法”主题中的Understand how Cancellation and Exception Handling Affect Object Destruction 一节。

有关任务组的更多信息,请参见任务并行(并发运行时)。 有关并行算法的更多信息,请参见并行算法

在传递给 concurrency::task_groupconcurrency::structured_task_group 对象的工作函数的主体中引发异常时,运行时将存储该异常,并将其封送到调用 concurrency::task_group::waitconcurrency::structured_task_group::waitconcurrency::task_group::run_and_waitconcurrency::structured_task_group::run_and_wait 的上下文。 运行时还将停止任务组中的所有活动任务(包括子任务组中的任务),并放弃任何尚未开始的任务。

下面的示例演示了引发异常的工作函数的基本结构。 该示例使用 task_group 对象并行打印两个 point 对象的值。 print_point 工作函数会将 point 对象的值打印到控制台。 如果输入值为 NULL,则该工作函数将引发异常。 运行时将存储此异常,并将其封送到调用 task_group::wait 的上下文。

// eh-task-group.cpp 
// compile with: /EHsc
#include <ppl.h>
#include <iostream>
#include <sstream>

using namespace concurrency;
using namespace std;

// Defines a basic point with X and Y coordinates. 
struct point
{
   int X;
   int Y;
};

// Prints the provided point object to the console. 
void print_point(point* pt)
{
   // Throw an exception if the value is NULL. 
   if (pt == NULL)
   {
      throw exception("point is NULL.");
   }

   // Otherwise, print the values of the point.
   wstringstream ss;
   ss << L"X = " << pt->X << L", Y = " << pt->Y << endl;
   wcout << ss.str();
}

int wmain()
{
   // Create a few point objects.
   point pt = {15, 30};
   point* pt1 = &pt;
   point* pt2 = NULL;

   // Use a task group to print the values of the points.
   task_group tasks;

   tasks.run([&] {
      print_point(pt1);
   });

   tasks.run([&] {
      print_point(pt2);
   });

   // Wait for the tasks to finish. If any task throws an exception, 
   // the runtime marshals it to the call to wait. 
   try
   {
      tasks.wait();
   }
   catch (const exception& e)
   {
      wcerr << L"Caught exception: " << e.what() << endl;
   }
}

该示例产生下面的输出。

  

有关在任务组中使用异常处理的完整示例,请参阅如何:使用异常处理中断 Parallel 循环

[Top]

运行时引发的异常

异常可能由调用该运行时。 大多数异常类型,除和 concurrency::task_canceled concurrency::operation_timed_out,指示程序错误。 由于这些错误通常无法恢复,因此不应由应用程序代码来捕获或处理。 我们建议您在需要诊断编程错误时仅捕获或处理应用程序代码中不可恢复的错误。 但是,了解运行时所定义的异常类型可以帮助您诊断编程错误。

对于运行时引发的异常以及工作函数引发的异常,异常处理机制是相同的。 例如,如果 concurrency::receive 函数在指定的时间段内未接收消息,它将引发 operation_timed_out。 如果 receive 在您传递给任务组的工作函数中引发异常,则运行时将存储该异常,并将其封送到调用 task_group::waitstructured_task_group::waittask_group::run_and_waitstructured_task_group::run_and_wait 的上下文。

下面的示例使用 concurrency::parallel_invoke 算法以并行方式运行两个任务。 第一个任务等待五秒钟,然后将消息发送到消息缓冲区。 第二个任务使用 receive 函数等待三秒钟,以从相同的消息缓冲区中接收消息。 如果 receive 函数在该时间段中未收到消息,则它将引发 operation_timed_out

// eh-time-out.cpp 
// compile with: /EHsc
#include <agents.h>
#include <ppl.h>
#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{
   single_assignment<int> buffer;
   int result;

   try
   {
      // Run two tasks in parallel.
      parallel_invoke(
         // This task waits 5 seconds and then sends a message to  
         // the message buffer.
         [&] {
            wait(5000); 
            send(buffer, 42);
         },
         // This task waits 3 seconds to receive a message. 
         // The receive function throws operation_timed_out if it does  
         // not receive a message in the specified time period.
         [&] {
            result = receive(buffer, 3000);
         }
      );

      // Print the result.
      wcout << L"The result is " << result << endl;
   }
   catch (operation_timed_out&)
   {
      wcout << L"The operation timed out." << endl;
   }
}

该示例产生下面的输出。

  

若要阻止应用程序异常终止,请确保代码在调入运行时的时候对异常进行处理。 此外,在调入使用并发运行时的外部代码(如第三方库)时,也要处理异常。

[Top]

多个异常

如果任务或并行算法收到多个异常,则运行时仅将这些异常之一封送到调用上下文。 运行时不能保证它将封送哪个异常。

下面的示例使用 parallel_for 算法将数字打印到控制台。 如果输入值小于某最小值或大于某最大值,则它将引发异常。 在本示例中,多个工作函数可能会引发异常。

// eh-multiple.cpp 
// compile with: /EHsc
#include <ppl.h>
#include <iostream>
#include <sstream>

using namespace concurrency;
using namespace std;

int wmain()
{
   const int min = 0;
   const int max = 10;

   // Print values in a parallel_for loop. Use a try-catch block to  
   // handle any exceptions that occur in the loop. 
   try
   {
      parallel_for(-5, 20, [min,max](int i)
      {
         // Throw an exeception if the input value is less than the  
         // minimum or greater than the maximum. 

         // Otherwise, print the value to the console. 

         if (i < min)
         {
            stringstream ss;
            ss << i << ": the value is less than the minimum.";
            throw exception(ss.str().c_str());
         }
         else if (i > max)
         {
            stringstream ss;
            ss << i << ": the value is greater than than the maximum.";
            throw exception(ss.str().c_str());
         }
         else
         {
            wstringstream ss;
            ss << i << endl;
            wcout << ss.str();
         }
      });
   }
   catch (exception& e)
   {
      // Print the error to the console.
      wcerr << L"Caught exception: " << e.what() << endl;
   }  
}

以下显示的是此示例的示例输出。

  

[Top]

取消

不是所有异常都表示错误。 例如,搜索算法在找到结果时可能会使用异常处理来停止其关联任务。 有关如何在代码中使用取消机制的更多信息,请参见 PPL 中的取消操作

[Top]

轻量级任务

轻量级任务是直接从 concurrency::Scheduler 对象计划的任务。 轻量级任务所需的开销比普通任务小。 但是,轻量级任务引发的异常并不是由运行时来捕获, 而是由未经处理的异常处理程序来捕获,默认情况下该程序会终止进程。 因此,请在应用程序中使用适当的错误处理机制。 有关轻量级任务的更多信息,请参见任务计划程序(并发运行时)

[Top]

异步代理

与轻量级任务相似,运行时不会管理异步代理引发的异常。

下面的示例演示一种处理派生自 concurrency::agent 的类中异常的方法。 此示例定义了 points_agent 类。 points_agent::run 方法将读取消息缓冲区中的 point 对象,并将它们打印到控制台。 如果 run 方法收到 NULL 指针,则它将引发异常。

run 方法会将所有工作包含在 try-catch 块中。 catch 块将异常存储在消息缓冲区中。 在代理完成之后,应用程序通过从此缓冲区中进行读取来检查代理是否遇到了错误。

// eh-agents.cpp 
// compile with: /EHsc
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

// Defines a point with x and y coordinates. 
struct point
{
   int X;
   int Y;
};

// Informs the agent to end processing.
point sentinel = {0,0};

// An agent that prints point objects to the console. 
class point_agent : public agent
{
public:
   explicit point_agent(unbounded_buffer<point*>& points)
      : _points(points)
   { 
   }

   // Retrieves any exception that occurred in the agent. 
   bool get_error(exception& e)
   {
      return try_receive(_error, e);
   }

protected:
   // Performs the work of the agent. 
   void run()
   {
      // Perform processing in a try block. 
      try
      {
         // Read from the buffer until we reach the sentinel value. 
         while (true)
         {
            // Read a value from the message buffer.
            point* r = receive(_points);

            // In this example, it is an error to receive a  
            // NULL point pointer. In this case, throw an exception. 
            if (r == NULL)
            {
               throw exception("point must not be NULL");
            }
            // Break from the loop if we receive the  
            // sentinel value. 
            else if (r == &sentinel)
            {
               break;
            }
            // Otherwise, do something with the point. 
            else
            {
               // Print the point to the console.
               wcout << L"X: " << r->X << L" Y: " << r->Y << endl;
            }
         }
      }
      // Store the error in the message buffer. 
      catch (exception& e)
      {
         send(_error, e);
      }

      // Set the agent status to done.
      done();
   }

private:
   // A message buffer that receives point objects.
   unbounded_buffer<point*>& _points;

   // A message buffer that stores error information.
   single_assignment<exception> _error;
};

int wmain()
{  
   // Create a message buffer so that we can communicate with 
   // the agent.
   unbounded_buffer<point*> buffer;

   // Create and start a point_agent object.
   point_agent a(buffer);
   a.start();

   // Send several points to the agent.
   point r1 = {10, 20};
   point r2 = {20, 30};
   point r3 = {30, 40};

   send(buffer, &r1);
   send(buffer, &r2);
   // To illustrate exception handling, send the NULL pointer to the agent.
   send(buffer, reinterpret_cast<point*>(NULL));
   send(buffer, &r3);
   send(buffer, &sentinel);

   // Wait for the agent to finish.
   agent::wait(&a);

   // Check whether the agent encountered an error.
   exception e;
   if (a.get_error(e))
   {
      cout << "error occurred in agent: " << e.what() << endl;
   }

   // Print out agent status.
   wcout << L"the status of the agent is: ";
   switch (a.status())
   {
   case agent_created:
      wcout << L"created";
      break;
   case agent_runnable:
      wcout << L"runnable";
      break;
   case agent_started:
      wcout << L"started";
      break;
   case agent_done:
      wcout << L"done";
      break;
   case agent_canceled:
      wcout << L"canceled";
      break;
   default:
      wcout << L"unknown";
      break;
   }
   wcout << endl;
}

该示例产生下面的输出。

  

因为 try-catch 块存在于 while 循环之外,所以代理在遇到第一个错误时将终止处理。 如果 try-catch 块在 while 循环内,则在出错后代理将继续处理。

本示例将异常存储在消息缓冲区中,以便另一个组件可以在代理运行时监视代理是否出错。 此示例使用 concurrency::single_assignment 对象存储错误。 如果代理处理多个异常,则 single_assignment 类只存储传递给它的第一条消息。 若要只存储最后一个异常,请使用 concurrency::overwrite_buffer 类。 若要存储所有异常,请使用 concurrency::unbounded_buffer 类。 有关这些消息块的更多信息,请参见异步消息块

有关异步代理的更多信息,请参见异步代理

[Top]

摘要

[Top]

请参见

概念

并发运行时

任务并行(并发运行时)

并行算法

PPL 中的取消操作

任务计划程序(并发运行时)

异步代理