コンカレンシー ランタイムでの例外処理

コンカレンシー ランタイムは、C++ 例外処理を使用してさまざまなエラーを通知します。 そのエラーには、ランタイムの不適切な使用、リソースの取得の失敗などのランタイム エラー、タスクおよびタスク グループに提供した処理関数で生じるエラーなどがあります。 タスクまたはタスク グループによって例外がスローされると、ランタイムはその例外を保持し、タスクまたはタスク グループの完了を待機するコンテキストにマーシャリングします。 軽量タスクやエージェントなどのコンポーネントの例外は、ランタイムによって自動的には管理されません。 そのため、独自の例外処理機構を実装する必要があります。 このトピックでは、タスク、タスク グループ、軽量タスク、および非同期エージェントによってスローされた例外をランタイムが処理するしくみと、アプリケーションで例外に応答する方法を説明します。

重要なポイント

  • タスクまたはタスク グループによって例外がスローされると、ランタイムはその例外を保持し、タスクまたはタスク グループの完了を待機するコンテキストにマーシャリングします。

  • 可能であれば、回復できるエラーを処理するために、concurrency::task::get および concurrency::task::wait の各呼び出しを try/catch ブロックで囲みます。 タスクが例外をスローし、その例外がそのタスク、その継続の 1 つ、またはメイン アプリケーションによってキャッチされない場合、ランタイムはアプリケーションを終了します。

  • タスク ベースの継続は常に実行されます。継続元タスクが正常に完了したかどうか、例外をスローしたかどうか、または取り消されたかどうかは、関係ありません。 値ベースの継続は、継続元タスクがスローまたは取り消すと、実行されません。

  • タスク ベースの継続は常に実行されるため、継続チェーンの末尾にタスク ベースの継続を追加することを検討してください。 これにより、コードですべての例外が確認されることを保証できます。

  • concurrency::task::get を呼び出し、そのタスクが取り消されると、ランタイムは concurrency::task_canceled をスローします。

  • ランタイムは、軽量タスクと軽量エージェントの例外を管理しません。

目次

タスクと継続

このセクションでは、concurrency::task オブジェクトおよびその継続によってスローされた例外をランタイムが処理するしくみについて説明します。 タスクおよび継続モデルの詳細については、タスクの並列処理に関する記事を参照してください。

task オブジェクトに渡す処理関数の本体で例外をスローすると、ランタイムはその例外を保存し、concurrency::task::get または concurrency::task::wait を呼び出すコンテキストにその例外をマーシャリングします。 タスクの並列処理に関するドキュメントでは、タスクベースの継続と値ベースの継続について説明しています。要約すると、値ベースの継続は T 型のパラメーターを受け取り、タスクベースの継続は task<T> 型のパラメーターを受け取ります。 スローするタスクに 1 つ以上の値ベースの継続がある場合、それらの継続を実行するスケジュールは設定されません。 この動作を次の例に示します。

// eh-task.cpp
// compile with: /EHsc
#include <ppltasks.h>
#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{
    wcout << L"Running a task..." << endl;
    // Create a task that throws.
    auto t = create_task([]
    {
        throw exception();
    });
    
    // Create a continuation that prints its input value.
    auto continuation = t.then([]
    {
        // We do not expect this task to run because
        // the antecedent task threw.
        wcout << L"In continuation task..." << endl;
    });

    // Wait for the continuation to finish and handle any 
    // error that occurs.
    try
    {
        wcout << L"Waiting for tasks to finish..." << endl;
        continuation.wait();

        // Alternatively, call get() to produce the same result.
        //continuation.get();
    }
    catch (const exception& e)
    {
        wcout << L"Caught exception." << endl;
    }
}
/* Output:
    Running a task...
    Waiting for tasks to finish...
    Caught exception.
*/

タスク ベースの継続では、継続元タスクによってスローされるすべての例外を処理することができます。 タスク ベースの継続は常に実行されます。そのタスクが正常に完了したかどうか、例外をスローしたかどうか、または取り消されたかどうかは、関係ありません。 タスクが例外をスローする場合、タスク ベースの継続を実行するようにスケジュールが設定されます。 次の例は、常にスローするタスクを示しています。 タスクには 2 つの継続があります。1 つが値ベースで他方がタスク ベースです。 タスク ベースの例外は、常に実行されるため、継続元タスクによってスローされる例外をキャッチすることができます。 task::get または task::wait が呼び出されると、タスクの例外が常にスローされます。そのため、例が両方の継続の完了を待機しているときに、例外が再度スローされます。

// eh-continuations.cpp
// compile with: /EHsc
#include <ppltasks.h>
#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{    
    wcout << L"Running a task..." << endl;
    // Create a task that throws.
    auto t = create_task([]() -> int
    {
        throw exception();
        return 42;
    });

    //
    // Attach two continuations to the task. The first continuation is  
    // value-based; the second is task-based.

    // Value-based continuation.
    auto c1 = t.then([](int n)
    {
        // We don't expect to get here because the antecedent 
        // task always throws.
        wcout << L"Received " << n << L'.' << endl;
    });

    // Task-based continuation.
    auto c2 = t.then([](task<int> previousTask)
    {
        // We do expect to get here because task-based continuations
        // are scheduled even when the antecedent task throws.
        try
        {
            wcout << L"Received " << previousTask.get() << L'.' << endl;
        }
        catch (const exception& e)
        {
            wcout << L"Caught exception from previous task." << endl;
        }
    });

    // Wait for the continuations to finish.
    try
    {
        wcout << L"Waiting for tasks to finish..." << endl;
        (c1 && c2).wait();
    }
    catch (const exception& e)
    {
        wcout << L"Caught exception while waiting for all tasks to finish." << endl;
    }
}
/* Output:
    Running a task...
    Waiting for tasks to finish...
    Caught exception from previous task.
    Caught exception while waiting for all tasks to finish.
*/

処理できる例外をキャッチするために、タスク ベースの継続を使用することをお勧めします。 タスク ベースの継続は常に実行されるため、継続チェーンの末尾にタスク ベースの継続を追加することを検討してください。 これにより、コードですべての例外が確認されることを保証できます。 次の例は、基本的な値ベースの継続チェーンを示しています。 このチェーン スローの 3 番目のタスクおよび後続の値ベースの継続は、実行されません。 ただし、最後の継続は、タスク ベースであるため常に実行されます。 この最後の継続は、3 番目のタスクによってスローされる例外を処理します。

できるだけ具体性の高い例外をキャッチすることをお勧めします。 キャッチする特定の例外がない場合は、この最後のタスクベースの継続を省略できます。 この場合、すべての例外が未処理になり、アプリケーションが停止する可能性があります。

// eh-task-chain.cpp
// compile with: /EHsc
#include <ppltasks.h>
#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{
    int n = 1;
    create_task([n]
    {
        wcout << L"In first task. n = ";
        wcout << n << endl;
        
        return n * 2;

    }).then([](int n)
    {
        wcout << L"In second task. n = ";
        wcout << n << endl;

        return n * 2;

    }).then([](int n)
    {
        wcout << L"In third task. n = ";
        wcout << n << endl;

        // This task throws.
        throw exception();
        // Not reached.
        return n * 2;

    }).then([](int n)
    {
        // This continuation is not run because the previous task throws.
        wcout << L"In fourth task. n = ";
        wcout << n << endl;

        return n * 2;

    }).then([](task<int> previousTask)
    {
        // This continuation is run because it is task-based.
        try
        {
            // The call to task::get rethrows the exception.
            wcout << L"In final task. result = ";
            wcout << previousTask.get() << endl;
        }
        catch (const exception&)
        {
            wcout << L"<exception>" << endl;
        }
    }).wait();
}
/* Output:
    In first task. n = 1
    In second task. n = 2
    In third task. n = 4
    In final task. result = <exception>
*/

ヒント

concurrency::task_completion_event::set_exception メソッドを使用して、例外をタスク完了イベントに関連付けることができます。 タスクの並列処理に関するドキュメントでは、concurrency::task_completion_event クラスについて詳しく説明しています。

concurrency::task_canceled は、task に関連する重要なランタイム例外の型です。 task_canceled が呼び出され、そのタスクが取り消された場合、ランライムは task::get をスローします (逆に、 task::wait task_status::canceled を返し、スローしません)。この例外は、タスクベースの継続から、または呼び出task::getすときにキャッチして処理できます。 タスクの取り消しの詳細については、「PPL における取り消し処理」を参照してください。

注意事項

コードから task_canceled をスローしないでください。 代わりに concurrency::cancel_current_task を呼び出してください。

タスクが例外をスローし、その例外がそのタスク、その継続の 1 つ、またはメイン アプリケーションによってキャッチされない場合、ランタイムはアプリケーションを終了します。 アプリケーションがクラッシュした場合、C++ の例外がスローされると中断するように Visual Studio を構成できます。 未処理の例外の場所を診断したら、タスク ベースの継続を使用してそれを処理します。

ランタイム例外を処理する方法については、このドキュメントの「ランタイムによってスローされる例外」で詳しく説明します。

[トップ]

タスク グループと並列アルゴリズム

ここでは、タスク グループによってスローされた例外をランタイムが処理するしくみについて説明します。 このセクションの内容は、concurrency::parallel_for をはじめとする並列アルゴリズムにも当てはまります。これらのアルゴリズムはタスク グループに基づいて作成されているためです。

注意事項

例外が依存タスクに及ぼす影響を十分に理解しておいてください。 タスクまたは並列アルゴリズムによる例外処理を使用する方法に関する推奨されるプラクティスについては、「並列パターン ライブラリに関するベスト プラクティス」の「取り消し処理および例外処理がオブジェクトの破棄に及ぼす影響について」を参照してください。

タスク グループの詳細については、タスクの並列処理に関する記事を参照してください。 並列アルゴリズムの詳細については、「並列アルゴリズム」を参照してください。

concurrency::task_group オブジェクトまたは concurrency::structured_task_group オブジェクトに渡す処理関数の本体で例外をスローすると、ランタイムはその例外を保存し、concurrency::task_group::waitconcurrency::structured_task_group::waitconcurrency::task_group::run_and_wait、または concurrency::structured_task_group::run_and_wait を呼び出すコンテキストにその例外をマーシャリングします。 また、ランタイムは、タスク グループ内のすべてのアクティブ タスク (子タスク グループ内のタスクも含む) を中止すると共に、開始されていないすべてのタスクを破棄します。

次の例は、例外をスローする処理関数の基本的な構造を示しています。 この例では、task_group オブジェクトを使用して 2 つの point オブジェクトの値を並列的に出力します。 print_point 処理関数は point オブジェクトの値をコンソールに出力します。 入力値が NULL の場合、処理関数は例外をスローします。 ランタイムはこの例外を保存し、task_group::wait を呼び出すコンテキストにその例外をマーシャリングします。

// eh-task-group.cpp
// compile with: /EHsc
#include <ppl.h>
#include <iostream>
#include <sstream>

using namespace concurrency;
using namespace std;

// Defines a basic point with X and Y coordinates.
struct point
{
   int X;
   int Y;
};

// Prints the provided point object to the console.
void print_point(point* pt)
{
   // Throw an exception if the value is NULL.
   if (pt == NULL)
   {
      throw exception("point is NULL.");
   }

   // Otherwise, print the values of the point.
   wstringstream ss;
   ss << L"X = " << pt->X << L", Y = " << pt->Y << endl;
   wcout << ss.str();
}

int wmain()
{
   // Create a few point objects.
   point pt = {15, 30};
   point* pt1 = &pt;
   point* pt2 = NULL;

   // Use a task group to print the values of the points.
   task_group tasks;

   tasks.run([&] {
      print_point(pt1);
   });

   tasks.run([&] {
      print_point(pt2);
   });

   // Wait for the tasks to finish. If any task throws an exception,
   // the runtime marshals it to the call to wait.
   try
   {
      tasks.wait();
   }
   catch (const exception& e)
   {
      wcerr << L"Caught exception: " << e.what() << endl;
   }
}

この例を実行すると、次の出力が生成されます。

X = 15, Y = 30Caught exception: point is NULL.

タスク グループで例外処理を使用する完全な例については、「方法: 例外処理を使用して並列ループを中断する」を参照してください。

[トップ]

ランタイムによってスローされる例外

ランタイムの呼び出しから例外が発生する可能性があります。 concurrency::task_canceledconcurrency::operation_timed_out を除き、例外のほとんどの型は、プログラミング エラーを示します。 一般にこれらのエラーは回復不能であるため、アプリケーション コードではキャッチまたは処理できません。 プログラミング エラーを診断する必要がある場合にのみ、回復不能なエラーをアプリケーション コードでキャッチまたは処理することをお勧めします。 ただし、ランタイムで定義されている例外の種類を把握しておけば、プログラミング エラーを診断するときに役立ちます。

ランタイムによってスローされる例外の例外処理機構は、処理関数によってスローされる例外の例外処理機構と同じです。 たとえば、concurrency::receive 関数は、指定された時間内にメッセージを受信しなかった場合に operation_timed_out をスローします。 タスク グループに渡す処理関数で receive が例外をスローすると、ランタイムはその例外を保存して、task_group::waitstructured_task_group::waittask_group::run_and_wait、または structured_task_group::run_and_wait を呼び出すコンテキストにその例外をマーシャリングします。

次の例では、concurrency::parallel_invoke アルゴリズムを使用して、2 つのタスクを並列に実行します。 1 つ目のタスクは 5 秒間待機した後、メッセージをメッセージ バッファーに送信します。 2 つ目のタスクは receive 関数を使用して 3 秒間待機し、同じメッセージ バッファーからメッセージを受信します。 receive 関数は、時間内にメッセージを受信しなかった場合、operation_timed_out をスローします。

// eh-time-out.cpp
// compile with: /EHsc
#include <agents.h>
#include <ppl.h>
#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{
   single_assignment<int> buffer;
   int result;

   try
   {
      // Run two tasks in parallel.
      parallel_invoke(
         // This task waits 5 seconds and then sends a message to 
         // the message buffer.
         [&] {
            wait(5000); 
            send(buffer, 42);
         },
         // This task waits 3 seconds to receive a message.
         // The receive function throws operation_timed_out if it does 
         // not receive a message in the specified time period.
         [&] {
            result = receive(buffer, 3000);
         }
      );

      // Print the result.
      wcout << L"The result is " << result << endl;
   }
   catch (operation_timed_out&)
   {
      wcout << L"The operation timed out." << endl;
   }
}

この例を実行すると、次の出力が生成されます。

The operation timed out.

アプリケーションの異常終了を防ぐために、コードでランタイムを呼び出す場合は例外が処理されるようにしてください。 また、サードパーティのライブラリなど、コンカレンシー ランタイムを使用する外部コードを呼び出す場合にも、例外を処理する必要があります。

[トップ]

複数の例外

タスクまたは並列アルゴリズムが複数の例外を受け取った場合、ランタイムはそのいずれか 1 つだけを呼び出し元のコンテキストにマーシャリングします。 どの例外がマーシャリングされるかは、任意です。

次の例では、parallel_for アルゴリズムを使用して、数値をコンソールに出力します。 入力値が指定の最小値未満であるか、最大値を超えている場合、例外がスローされます。 この例では、複数の処理関数が例外をスローする可能性があります。

// eh-multiple.cpp
// compile with: /EHsc
#include <ppl.h>
#include <iostream>
#include <sstream>

using namespace concurrency;
using namespace std;

int wmain()
{
   const int min = 0;
   const int max = 10;
   
   // Print values in a parallel_for loop. Use a try-catch block to 
   // handle any exceptions that occur in the loop.
   try
   {
      parallel_for(-5, 20, [min,max](int i)
      {
         // Throw an exeception if the input value is less than the 
         // minimum or greater than the maximum.

         // Otherwise, print the value to the console.

         if (i < min)
         {
            stringstream ss;
            ss << i << ": the value is less than the minimum.";
            throw exception(ss.str().c_str());
         }
         else if (i > max)
         {
            stringstream ss;
            ss << i << ": the value is greater than than the maximum.";
            throw exception(ss.str().c_str());
         }
         else
         {
            wstringstream ss;
            ss << i << endl;
            wcout << ss.str();
         }
      });
   }
   catch (exception& e)
   {
      // Print the error to the console.
      wcerr << L"Caught exception: " << e.what() << endl;
   }  
}

この例のサンプル出力を次に示します。

8293104567Caught exception: -5: the value is less than the minimum.

[トップ]

キャンセル

すべての例外がエラーの存在を示すわけではありません。 たとえば、検索アルゴリズムは結果を検出したときに、例外処理を使用して、関連付けられているタスクを中止することがあります。 コードで取り消しの機構を使用する方法の詳細については、「PPL における取り消し処理」を参照してください。

[トップ]

軽量タスク

軽量タスクは、concurrency::Scheduler オブジェクトから直接スケジュールするタスクです。 軽量タスクは、通常のタスクよりもオーバーヘッドが小さくなります。 ただし、ランタイムは軽量タスクによってスローされた例外をキャッチしません。 代わりに、ハンドルされない例外のハンドラーが例外をキャッチし、既定ではプロセスを終了します。 したがって、アプリケーションで適切なエラー処理機構を使用する必要があります。 軽量タスクの詳細については、タスク スケジューラに関する記事を参照してください。

[トップ]

非同期エージェント

軽量タスクと同様に、ランタイムは非同期エージェントによってスローされた例外を管理しません。

concurrency::agent から派生するクラスで例外を処理する方法の一例を次に示します。 この例では、points_agent クラスを定義しています。 points_agent::run メソッドはメッセージ バッファーから point オブジェクトを読み取り、それをコンソールに出力します。 run メソッドは、NULL ポインターを受け取った場合に例外をスローします。

run メソッドでは、すべての処理が try-catch ブロックに内包されています。 catch ブロックは、例外をメッセージ バッファーに格納します。 アプリケーションは、エージェントの終了後にこのバッファーから例外を読み取ることで、エージェントでのエラーの有無をチェックします。

// eh-agents.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

// Defines a point with x and y coordinates.
struct point
{
   int X;
   int Y;
};

// Informs the agent to end processing.
point sentinel = {0,0};

// An agent that prints point objects to the console.
class point_agent : public agent
{
public:
   explicit point_agent(unbounded_buffer<point*>& points)
      : _points(points)
   { 
   }

   // Retrieves any exception that occurred in the agent.
   bool get_error(exception& e)
   {
      return try_receive(_error, e);
   }

protected:
   // Performs the work of the agent.
   void run()
   {
      // Perform processing in a try block.
      try
      {
         // Read from the buffer until we reach the sentinel value.
         while (true)
         {
            // Read a value from the message buffer.
            point* r = receive(_points);

            // In this example, it is an error to receive a 
            // NULL point pointer. In this case, throw an exception.
            if (r == NULL)
            {
               throw exception("point must not be NULL");
            }
            // Break from the loop if we receive the 
            // sentinel value.
            else if (r == &sentinel)
            {
               break;
            }
            // Otherwise, do something with the point.
            else
            {
               // Print the point to the console.
               wcout << L"X: " << r->X << L" Y: " << r->Y << endl;
            }
         }
      }
      // Store the error in the message buffer.
      catch (exception& e)
      {
         send(_error, e);
      }

      // Set the agent status to done.
      done();
   }

private:
   // A message buffer that receives point objects.
   unbounded_buffer<point*>& _points;

   // A message buffer that stores error information.
   single_assignment<exception> _error;
};

int wmain()
{  
   // Create a message buffer so that we can communicate with
   // the agent.
   unbounded_buffer<point*> buffer;

   // Create and start a point_agent object.
   point_agent a(buffer);
   a.start();

   // Send several points to the agent.
   point r1 = {10, 20};
   point r2 = {20, 30};
   point r3 = {30, 40};

   send(buffer, &r1);
   send(buffer, &r2);
   // To illustrate exception handling, send the NULL pointer to the agent.
   send(buffer, reinterpret_cast<point*>(NULL));
   send(buffer, &r3);
   send(buffer, &sentinel);

   // Wait for the agent to finish.
   agent::wait(&a);
  
   // Check whether the agent encountered an error.
   exception e;
   if (a.get_error(e))
   {
      cout << "error occurred in agent: " << e.what() << endl;
   }
   
   // Print out agent status.
   wcout << L"the status of the agent is: ";
   switch (a.status())
   {
   case agent_created:
      wcout << L"created";
      break;
   case agent_runnable:
      wcout << L"runnable";
      break;
   case agent_started:
      wcout << L"started";
      break;
   case agent_done:
      wcout << L"done";
      break;
   case agent_canceled:
      wcout << L"canceled";
      break;
   default:
      wcout << L"unknown";
      break;
   }
   wcout << endl;
}

この例を実行すると、次の出力が生成されます。

X: 10 Y: 20
X: 20 Y: 30
error occurred in agent: point must not be NULL
the status of the agent is: done

try-catch ブロックは while ループの外側にあるため、エージェントは最初のエラーを検出した時点で処理を終了します。 try-catch ブロックが while ループの内側にある場合、エージェントはエラーの発生後も処理を継続します。

この例では例外がメッセージ バッファーに格納されるため、別のコンポーネントが実行中のエージェントのエラーを監視できます。 この例では、concurrency::single_assignment オブジェクトを使用してエラーを保存します。 エージェントが複数の例外を処理する場合、single_assignment クラスは渡された最初のメッセージだけを保存します。 最後の例外だけを保存するには、concurrency::overwrite_buffer クラスを使用します。 すべての例外を保存するには、concurrency::unbounded_buffer クラスを使用します。 これらのメッセージ ブロックの詳細については、「非同期メッセージ ブロック」を参照してください。

非同期エージェントの詳細については、「非同期エージェント」を参照してください。

[トップ]

まとめ

[トップ]

関連項目

コンカレンシー ランタイム
タスクの並列処理
並列アルゴリズム
PPL における取り消し処理
タスク スケジューラ
非同期エージェント