並列パターン ライブラリに関するベスト プラクティス

ここでは、並列パターン ライブラリ (PPL) を効果的に使用する方法について説明します。 PPL は、粒度の細かい並列化を実行するための汎用的なコンテナー、オブジェクト、およびアルゴリズムを提供します。

PPL の詳細については、「並列パターン ライブラリ (PPL)」を参照してください。

セクション

このドキュメントは、次のトピックに分かれています。

小さなループ本体を並列化しない

比較的小さなループ本体を並列化すると、関連するスケジューリング オーバーヘッドが並列処理のメリットを上回る可能性があります。 次の例について考えます。この例では、2 つの配列に各要素ペアを追加しています。

// small-loops.cpp
// compile with: /EHsc
#include <ppl.h>
#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{
   // Create three arrays that each have the same size.
   const size_t size = 100000;
   int a[size], b[size], c[size];

   // Initialize the arrays a and b.
   for (size_t i = 0; i < size; ++i)
   {
      a[i] = i;
      b[i] = i * 2;
   }

   // Add each pair of elements in arrays a and b in parallel 
   // and store the result in array c.
   parallel_for<size_t>(0, size, [&a,&b,&c](size_t i) {
      c[i] = a[i] + b[i];
   });

   // TODO: Do something with array c.
}

並列ループの各反復の作業負荷が小さすぎるため、並列処理のオーバーヘッドに勝る利益が得られません。 ループ本体での処理量を増やすか、ループを逐次的に実行すると、ループのパフォーマンスが向上します。

[トップ]

可能な限り高いレベルで並列処理を表現する

低いレベルでのみコードを並列化する場合は、プロセッサの数が増えても拡張されない fork-join コンストラクトを導入できます。 fork-join コンストラクトでは、タスクはその処理を複数の小さな並列サブタスクに分割し、それらのサブタスクが終了するまで待機します。 サブタスクの場合も、それぞれの処理を再帰的に別のサブタスクに分割できます。

fork-join モデルはさまざまな問題を解決するのに役立ちますが、同期のオーバーヘッドに伴ってスケーラビリティが下がるような状況もあります。 たとえば、イメージ データを逐次的に処理する次のコードについて考えます。

// Calls the provided function for each pixel in a Bitmap object.
void ProcessImage(Bitmap* bmp, const function<void (DWORD&)>& f)
{
   int width = bmp->GetWidth();
   int height = bmp->GetHeight();

   // Lock the bitmap.
   BitmapData bitmapData;
   Rect rect(0, 0, bmp->GetWidth(), bmp->GetHeight());
   bmp->LockBits(&rect, ImageLockModeWrite, PixelFormat32bppRGB, &bitmapData);

   // Get a pointer to the bitmap data.
   DWORD* image_bits = (DWORD*)bitmapData.Scan0;

   // Call the function for each pixel in the image.
   for (int y = 0; y < height; ++y)
   {      
      for (int x = 0; x < width; ++x)
      {
         // Get the current pixel value.
         DWORD* curr_pixel = image_bits + (y * width) + x;

         // Call the function.
         f(*curr_pixel);
      }
   }

   // Unlock the bitmap.
   bmp->UnlockBits(&bitmapData);
}

各ループ反復は独立しているため、次の例に示すように処理の大部分を並列化できます。 この例では、外側のループを並列化するために concurrency::parallel_for アルゴリズムを使用します。

// Calls the provided function for each pixel in a Bitmap object.
void ProcessImage(Bitmap* bmp, const function<void (DWORD&)>& f)
{
   int width = bmp->GetWidth();
   int height = bmp->GetHeight();

   // Lock the bitmap.
   BitmapData bitmapData;
   Rect rect(0, 0, bmp->GetWidth(), bmp->GetHeight());
   bmp->LockBits(&rect, ImageLockModeWrite, PixelFormat32bppRGB, &bitmapData);

   // Get a pointer to the bitmap data.
   DWORD* image_bits = (DWORD*)bitmapData.Scan0;

   // Call the function for each pixel in the image.
   parallel_for (0, height, [&, width](int y)
   {      
      for (int x = 0; x < width; ++x)
      {
         // Get the current pixel value.
         DWORD* curr_pixel = image_bits + (y * width) + x;

         // Call the function.
         f(*curr_pixel);
      }
   });

   // Unlock the bitmap.
   bmp->UnlockBits(&bitmapData);
}

次の例では、ループで ProcessImage 関数を呼び出す fork-join コンストラクトを示します。 ProcessImage を呼び出すたびに制御が戻されるのではなく、各サブタスクが終了してから戻されます。

// Processes each bitmap in the provided vector.
void ProcessImages(vector<Bitmap*> bitmaps, const function<void (DWORD&)>& f)
{
   for_each(begin(bitmaps), end(bitmaps), [&f](Bitmap* bmp) {
      ProcessImage(bmp, f);
   });
}

並列ループの各反復でほとんど処理が実行されないか、並列ループで実行される処理が不均衡の場合 (つまり、ループ反復ごとに所要時間が異なる場合)、処理のフォークと結合を頻繁に行うことによるスケジューリング オーバーヘッドが並列実行のメリットを上回ることがあります。 プロセッサの数が増えると、このオーバーヘッドも大きくなります。

この例のスケジューリング オーバーヘッドの量を低減するには、内側のループを並列化する前に外側のループを並列化するか、パイプライン処理などの別の parallel コンストラクトを使用します。 次の例では、外側のループを並列化するために concurrency::parallel_for_each のアルゴリズムを使用するように ProcessImages 関数を変更します。

// Processes each bitmap in the provided vector.
void ProcessImages(vector<Bitmap*> bitmaps, const function<void (DWORD&)>& f)
{
   parallel_for_each(begin(bitmaps), end(bitmaps), [&f](Bitmap* bmp) {
      ProcessImage(bmp, f);
   });
}

パイプラインを使用してイメージ処理を並列に実行する類似の例については、「チュートリアル: イメージ処理ネットワークの作成」を参照してください。

[トップ]

parallel_invokeを使用して分割と征服の問題を解決する

分割と整理 (divide-and-conquer) の問題は、再帰を使用してタスクをサブタスクに分割する fork-join コンストラクトの一種です。 分割と整理の問題の解決に、concurrency::task_groupconcurrency::structured_task_group クラスに加えて、concurrency::parallel_invoke アルゴリズムも使用できます。 parallel_invoke アルゴリズムでは、タスク グループ オブジェクトよりも簡単な構文を使用できます。このアルゴリズムは、並列タスクの数が固定されている場合に役立ちます。

次の例では、parallel_invoke アルゴリズムを使用して、バイトニック ソート アルゴリズムを実装します。

// Sorts the given sequence in the specified order.
template <class T>
void parallel_bitonic_sort(T* items, int lo, int n, bool dir)
{   
   if (n > 1)
   {
      // Divide the array into two partitions and then sort 
      // the partitions in different directions.
      int m = n / 2;

      parallel_invoke(
         [&] { parallel_bitonic_sort(items, lo, m, INCREASING); },
         [&] { parallel_bitonic_sort(items, lo + m, m, DECREASING); }
      );
      
      // Merge the results.
      parallel_bitonic_merge(items, lo, n, dir);
   }
}

parallel_invoke アルゴリズムでは、オーバーヘッドを低減するために、一連のタスクの最後のタスクを呼び出し元のコンテキストで実行します。

この例の完全なバージョンについては、parallel_invoke を使用して並列並べ替えルーチンを記述する方法に関するページをご覧ください。 parallel_invoke アルゴリズムの詳細については、「並列アルゴリズム」を参照してください。

[トップ]

キャンセルまたは例外処理を使用して並列ループから中断する

PPL では、2 とおりの方法を使用して、タスク グループまたは並列アルゴリズムによって実行される並列処理を取り消すことができます。 1 番目の方法は concurrency::task_group クラスと concurrency::structured_task_group クラスが提供する取り消し機構を使用する方法です。 もう 1 つは、タスクの処理関数の本体で例外をスローする方法です。 並列処理ツリーを取り消す場合は、例外処理を行うよりも取り消し機構を使用する方が効率的です。 "並列処理ツリー" は、関連するタスク グループのグループです。つまり、タスク グループの中に他のタスク グループが含まれます。 取り消し機構では、タスク グループとその子タスク グループを上位から順に取り消します。 反対に、例外処理では下位から順に処理されるため、例外が上位へ移動するたびに、子タスク グループを個別に取り消す必要があります。

タスク グループ オブジェクトを直接操作する場合は、concurrency::task_group::cancel メソッドまたは concurrency::structured_task_group::cancel メソッドを使用して、そのタスク グループに属する作業を取り消します。 たとえば、parallel_for では、並列アルゴリズムを取り消すために、親タスク グループを作成し、そのタスク グループを取り消します。 たとえば、次の関数 parallel_find_any について考えます。この関数では、配列内の値を並列に検索します。

// Returns the position in the provided array that contains the given value, 
// or -1 if the value is not in the array.
template<typename T>
int parallel_find_any(const T a[], size_t count, const T& what)
{
   // The position of the element in the array. 
   // The default value, -1, indicates that the element is not in the array.
   int position = -1;

   // Call parallel_for in the context of a cancellation token to search for the element.
   cancellation_token_source cts;
   run_with_cancellation_token([count, what, &a, &position, &cts]()
   {
      parallel_for(std::size_t(0), count, [what, &a, &position, &cts](int n) {
         if (a[n] == what)
         {
            // Set the return value and cancel the remaining tasks.
            position = n;
            cts.cancel();
         }
      });
   }, cts.get_token());

   return position;
}

並列アルゴリズムではタスク グループが使用されるため、並列反復処理のいずれかによって親タスク グループが取り消されると、タスク全体が取り消されます。 この例の完全なバージョンについては、「方法: キャンセル処理を使用して並列ループを中断する」を参照してください。

例外処理を使用して並列処理を取り消す方法は、取り消し機構を使用する方法よりも効率の面で劣りますが、例外処理の方が適している状況もあります。 たとえば、次のメソッド for_all では、tree 構造体の各ノードに対して処理関数を再帰的に実行します。 この例では、_children データ メンバーは、tree オブジェクトを含む std::list です。

// Performs the given work function on the data element of the tree and
// on each child.
template<class Function>
void tree::for_all(Function& action)
{
   // Perform the action on each child.
   parallel_for_each(begin(_children), end(_children), [&](tree& child) {
      child.for_all(action);
   });

   // Perform the action on this node.
   action(*this);
}

ツリーの各要素に対して処理関数を呼び出す必要がない場合、tree::for_all メソッドの呼び出し元は例外をスローすることができます。 次の例は search_for_value 関数で、提供された tree オブジェクト内で値を検索します。 search_for_value 関数で使用される処理関数は、ツリーの現在の要素と提供された値が一致する場合に例外をスローします。 search_for_value 関数は、try-catch ブロックを使用してこの例外をキャプチャし、結果をコンソールに出力します。

// Searches for a value in the provided tree object.
template <typename T>
void search_for_value(tree<T>& t, int value)
{
   try
   {
      // Call the for_all method to search for a value. The work function
      // throws an exception when it finds the value.
      t.for_all([value](const tree<T>& node) {
         if (node.get_data() == value)
         {
            throw &node;
         }
      });
   }
   catch (const tree<T>* node)
   {
      // A matching node was found. Print a message to the console.
      wstringstream ss;
      ss << L"Found a node with value " << value << L'.' << endl;
      wcout << ss.str();
      return;
   }

   // A matching node was not found. Print a message to the console.
   wstringstream ss;
   ss << L"Did not find node with value " << value << L'.' << endl;
   wcout << ss.str();   
}

この例の完全なバージョンについては、「方法: 例外処理を使用して並列ループを中断する」を参照してください。

PPL に用意されている取り消し機構と例外処理機構に関する一般的な情報については、「PPL における取り消し処理」および例外処理に関するページを参照してください。

[トップ]

キャンセルと例外処理がオブジェクトの破棄にどのように影響するかを理解する

並列処理ツリーでタスクを取り消すと、子タスクも実行されなくなります。 そのため、アプリケーションで重要となる操作 (リソースの解放など) が子タスクのいずれかで実行されるような場合に問題となります。 また、タスクを取り消すと、例外がオブジェクトのデストラクターを通じて反映され、アプリケーションで未定義の動作が発生する可能性があります。

次の例では、Resource クラスはリソースを表し、Container クラスはリソースを保持するコンテナーを表します。 そのデストラクターで、Container クラスは Resource メンバーの 2 つに対して cleanup メソッドを並列に呼び出し、3 つ目の Resource メンバーに対して cleanup メソッドを呼び出します。

// parallel-resource-destruction.h
#pragma once
#include <ppl.h>
#include <sstream>
#include <iostream>

// Represents a resource.
class Resource
{
public:
   Resource(const std::wstring& name)
      : _name(name)
   {
   }

   // Frees the resource.
   void cleanup()
   {
      // Print a message as a placeholder.
      std::wstringstream ss;
      ss << _name << L": Freeing..." << std::endl;
      std::wcout << ss.str();
   }
private:
   // The name of the resource.
   std::wstring _name;
};

// Represents a container that holds resources.
class Container
{
public:
   Container(const std::wstring& name)
      : _name(name)
      , _resource1(L"Resource 1")
      , _resource2(L"Resource 2")
      , _resource3(L"Resource 3")
   {
   }

   ~Container()
   {
      std::wstringstream ss;
      ss << _name << L": Freeing resources..." << std::endl;
      std::wcout << ss.str();

      // For illustration, assume that cleanup for _resource1
      // and _resource2 can happen concurrently, and that 
      // _resource3 must be freed after _resource1 and _resource2.

      concurrency::parallel_invoke(
         [this]() { _resource1.cleanup(); },
         [this]() { _resource2.cleanup(); }
      );

      _resource3.cleanup();
   }

private:
   // The name of the container.
   std::wstring _name;

   // Resources.
   Resource _resource1;
   Resource _resource2;
   Resource _resource3;
};

このパターン自体には問題はありませんが、2 つのタスクを並列に実行する次のコードについて考えてみてください。 1 つ目のタスクでは Container オブジェクトを作成し、2 つ目のタスクではタスク全体を取り消します。 わかりやすいように、この例では、2 つの concurrency::event オブジェクトを使用して、Container オブジェクトが作成された後で取り消し操作が発生し、取り消し操作が発生した後で Container オブジェクトが破棄されることを確認します。

// parallel-resource-destruction.cpp
// compile with: /EHsc
#include "parallel-resource-destruction.h"

using namespace concurrency;
using namespace std;

static_assert(false, "This example illustrates a non-recommended practice.");

int main()
{  
   // Create a task_group that will run two tasks.
   task_group tasks;

   // Used to synchronize the tasks.
   event e1, e2;

   // Run two tasks. The first task creates a Container object. The second task
   // cancels the overall task group. To illustrate the scenario where a child 
   // task is not run because its parent task is cancelled, the event objects 
   // ensure that the Container object is created before the overall task is 
   // cancelled and that the Container object is destroyed after the overall 
   // task is cancelled.
   
   tasks.run([&tasks,&e1,&e2] {
      // Create a Container object.
      Container c(L"Container 1");
      
      // Allow the second task to continue.
      e2.set();

      // Wait for the task to be cancelled.
      e1.wait();
   });

   tasks.run([&tasks,&e1,&e2] {
      // Wait for the first task to create the Container object.
      e2.wait();

      // Cancel the overall task.
      tasks.cancel();      

      // Allow the first task to continue.
      e1.set();
   });

   // Wait for the tasks to complete.
   tasks.wait();

   wcout << L"Exiting program..." << endl;
}

この例を実行すると、次の出力が生成されます。

Container 1: Freeing resources...Exiting program...

このコード例には次の問題が含まれているため、期待どおりの動作が得られない可能性があります。

  • 親タスクを取り消すと、子タスク (concurrency::parallel_invoke への呼び出し) も取り消されます。 そのため、これら 2 つのリソースは解放されません。

  • 親タスクを取り消すと、子タスクが内部例外をスローします。 Container デストラクターではこの例外は処理されないため、例外が上位に伝達され、3 つ目のリソースは解放されません。

  • 子タスクによってスローされた例外が、Container デストラクターを通じて伝達されます。 デストラクターからスローされることにより、アプリケーションが未定義の状態になります。

タスクが取り消されないことを保証できる場合を除き、タスクでは重要な操作 (リソースの解放など) を実行しないことをお勧めします。 また、採用している型のデストラクターでスローを行うようなランタイム機能は使用しないようにしてください。

[トップ]

並列ループで繰り返しブロックしない

ブロック操作によって制御される concurrency::parallel_forconcurrency::parallel_for_each などの並列ループにより、ランタイムが短期間に多数のスレッドを作成する場合があります。

コンカレンシー ランタイムでは、タスクが終了するか、協調的にブロックまたは譲渡すると、追加の処理が実行されます。 並列ループの 1 回の反復がブロックすると、別の反復が開始されることがあります。 使用可能なアイドル スレッドが存在しない場合は、新しいスレッドが作成されます。

並列ループの本体がときどきブロックする程度であれば、この機構によりタスクの全体的なスループットが最大限に高まります。 ただし、反復処理で頻繁にブロックが発生する場合は、追加の処理を実行するために多数のスレッドが作成される可能性があります。 それに伴って、メモリ不足の状態に陥ったり、ハードウェア リソースが不適切に使用されたりする場合があります。

parallel_for ループの各反復で concurrency::send 関数を呼び出す例を次に示します。 send は協調的にブロックするため、send を呼び出すたびに、追加の処理を実行するための新しいスレッドが作成されます。

// repeated-blocking.cpp
// compile with: /EHsc
#include <ppl.h>
#include <agents.h>

using namespace concurrency;

static_assert(false, "This example illustrates a non-recommended practice.");

int main()
{
   // Create a message buffer.
   overwrite_buffer<int> buffer;
  
   // Repeatedly send data to the buffer in a parallel loop.
   parallel_for(0, 1000, [&buffer](int i) {
      
      // The send function blocks cooperatively. 
      // We discourage the use of repeated blocking in a parallel
      // loop because it can cause the runtime to create 
      // a large number of threads over a short period of time.
      send(buffer, i);
   });
}

このパターンを回避するようにコードをリファクタリングすることをお勧めします。 この例では、逐次的な for ループで send を呼び出すことによって、追加のスレッドの作成を回避できます。

[トップ]

並列作業を取り消すときにブロック操作を実行しない

可能であれば、concurrency::task_group::cancel メソッドまたは concurrency::structured_task_group::cancel メソッドを呼び出して並列処理を取り消す前に、ブロッキング操作を実行しないでください。

タスクが協調的ブロッキング操作を実行すると、ランタイムは最初のタスクがデータを待っている間に他の処理を実行できます。 待機中のタスクがブロック解除すると、ランタイムはそのタスクを再スケジュールします。 ランタイムは、通常、直近にブロック解除したタスクから順に再スケジュールします。 そのため、ブロッキング操作中に不要な処理がスケジュールされ、パフォーマンスが低下します。 したがって、並列処理を取り消す前にブロッキング操作を実行すると、そのブロッキング操作が原因で cancel の呼び出しが遅れる可能性があります。 それにより、他のタスクで不要な処理が実行されるようになります。

次の例について考えます。この例では、提供された述語関数を満たす指定の配列の要素を検索する parallel_find_answer 関数を定義します。 述語関数が true を返すと、並列処理関数は Answer オブジェクトを作成し、タスク全体を取り消します。

// blocking-cancel.cpp
// compile with: /c /EHsc
#include <windows.h>
#include <ppl.h>

using namespace concurrency;

// Encapsulates the result of a search operation.
template<typename T>
class Answer
{
public:
   explicit Answer(const T& data)
      : _data(data)
   {
   }

   T get_data() const
   {
      return _data;
   }

   // TODO: Add other methods as needed.

private:
   T _data;

   // TODO: Add other data members as needed.
};

// Searches for an element of the provided array that satisfies the provided
// predicate function.
template<typename T, class Predicate>
Answer<T>* parallel_find_answer(const T a[], size_t count, const Predicate& pred)
{
   // The result of the search.
   Answer<T>* answer = nullptr;
   // Ensures that only one task produces an answer.
   volatile long first_result = 0;

   // Use parallel_for and a task group to search for the element.
   structured_task_group tasks;
   tasks.run_and_wait([&]
   {
      // Declare the type alias for use in the inner lambda function.
      typedef T T;

      parallel_for<size_t>(0, count, [&](const T& n) {
         if (pred(a[n]) && InterlockedExchange(&first_result, 1) == 0)
         {
            // Create an object that holds the answer.
            answer = new Answer<T>(a[n]);
            // Cancel the overall task.
            tasks.cancel();
         }
      });
   });

   return answer;
}

new 演算子は、ブロッキングの対象となる可能性のあるヒープ割り当てを実行します。 ランタイムは、タスクが concurrency::critical_section::lock への呼び出しなどの協調的ブロッキング呼び出しを実行したときにだけ、他の作業を実行します。

次の例では、不要な処理を回避し、それによってパフォーマンスを向上させる方法を示します。 この例では、Answer オブジェクト用のストレージを割り当てる前に、タスク グループを取り消します。

// Searches for an element of the provided array that satisfies the provided
// predicate function.
template<typename T, class Predicate>
Answer<T>* parallel_find_answer(const T a[], size_t count, const Predicate& pred)
{
   // The result of the search.
   Answer<T>* answer = nullptr;
   // Ensures that only one task produces an answer.
   volatile long first_result = 0;

   // Use parallel_for and a task group to search for the element.
   structured_task_group tasks;
   tasks.run_and_wait([&]
   {
      // Declare the type alias for use in the inner lambda function.
      typedef T T;

      parallel_for<size_t>(0, count, [&](const T& n) {
         if (pred(a[n]) && InterlockedExchange(&first_result, 1) == 0)
         {
            // Cancel the overall task.
            tasks.cancel();
            // Create an object that holds the answer.
            answer = new Answer<T>(a[n]);            
         }
      });
   });

   return answer;
}

[トップ]

並列ループで共有データに書き込まない

同時実行ランタイムは、共有データへのコンカレンシー アクセスを同期するいくつかのデータ構造体 (たとえば、concurrency::critical_section) を提供します。 これらのデータ構造体は、複数のタスクでリソースへの共有アクセスがまれに必要になる場合など、さまざまな状況で役立ちます。

std::array オブジェクト内の素数の数を計算するために concurrency::parallel_for_each アルゴリズムと critical_section オブジェクトを使用する、次の例を考えてみます。 この例では、各スレッドが共有変数 prime_sum にアクセスするのを待つ必要があるため、効率は改善されません。

critical_section cs;
prime_sum = 0;
parallel_for_each(begin(a), end(a), [&](int i) {
   cs.lock();
   prime_sum += (is_prime(i) ? i : 0);
   cs.unlock();
});

また、頻繁なロック操作によってループが事実上シリアル化されるため、パフォーマンスが低下する可能性もあります。 さらに、コンカレンシー ランタイム オブジェクトがブロック操作を実行すると、スケジューラによって追加のスレッドが作成され、最初のスレッドがデータを待っている間に他の処理が実行される可能性があります。 共有データを待つタスクが多数存在し、それに対処するためにランタイムで多数のスレッドが作成されると、アプリケーションのパフォーマンス低下やリソース不足状態が発生することがあります。

PPL では concurrency::combinable クラスが定義されます。これは、ロックせずに共有リソースにアクセスできるようにすることで共有状態を回避します。 combinable クラスにはスレッド ローカル ストレージが用意されており、詳細な計算を実行した後、その計算を最終結果にマージできます。 combinable オブジェクトは減少変数と考えることができます。

次の例では、前の例を変更し、合計を計算するときに critical_section オブジェクトの代わりに combinable オブジェクトを使用するようにしています。 この例では、各スレッドが合計のローカル コピーをそれぞれ保持するため、効率が改善されます。 この例では、concurrency::combinable::combine メソッドを使用して、ローカルの計算を最終結果にマージします。

combinable<int> sum;
parallel_for_each(begin(a), end(a), [&](int i) {
   sum.local() += (is_prime(i) ? i : 0);
});
prime_sum = sum.combine(plus<int>());

この例の完全なバージョンについては、「方法: combinable を使用してパフォーマンスを向上させる」を参照してください。 combinable クラスの詳細については、「並列コンテナーと並列オブジェクト」を参照してください。

[トップ]

可能な場合は、誤った共有を避けてください

別々のプロセッサで実行されている複数の同時実行タスクが、同じキャッシュ ラインに配置されている変数に書き込みを行うと、"偽共有" が発生します。 1 つのタスクが変数のいずれかに書き込みを行うと、両方の変数のキャッシュ ラインが無効化されます。 キャッシュ ラインが無効化されるたびに、各プロセッサでキャッシュ ラインの再読み込みを行う必要があります。 したがって、偽共有が発生すると、アプリケーションでパフォーマンスが低下する可能性があります。

次の基本的な例では、2 つの同時実行タスクがそれぞれ共有カウンター変数をインクリメントする場合を示します。

volatile long count = 0L;
concurrency::parallel_invoke(
   [&count] {
      for(int i = 0; i < 100000000; ++i)
         InterlockedIncrement(&count);
   },
   [&count] {
      for(int i = 0; i < 100000000; ++i)
         InterlockedIncrement(&count);
   }
);

2 つのタスク間でデータが共有されないようにするには、2 つのカウンター変数を使用するように例を変更します。 この例では、タスクの終了後に最終的なカウンター値を計算します。 ただし、count1 変数と count2 変数が同じキャッシュ ラインに配置される可能性があるため、偽共有となります。

long count1 = 0L;
long count2 = 0L;
concurrency::parallel_invoke(
   [&count1] {
      for(int i = 0; i < 100000000; ++i)
         ++count1;
   },
   [&count2] {
      for(int i = 0; i < 100000000; ++i)
         ++count2;
   }
);
long count = count1 + count2;

偽共有をなくす方法の 1 つは、カウンター変数を必ず別々のキャッシュ ラインに配置することです。 次の例では、64 バイト境界上に count1 変数と count2 変数を配置します。

__declspec(align(64)) long count1 = 0L;      
__declspec(align(64)) long count2 = 0L;      
concurrency::parallel_invoke(
   [&count1] {
      for(int i = 0; i < 100000000; ++i)
         ++count1;
   },
   [&count2] {
      for(int i = 0; i < 100000000; ++i)
         ++count2;
   }
);
long count = count1 + count2;

この例では、メモリ キャッシュのサイズが 64 バイト以下であると仮定しています。

タスク間でデータを共有する必要がある場合は、concurrency::combinable クラスを使用することをお勧めします。 combinable クラスは、偽共有が発生する可能性を減らすようにスレッド ローカル変数を作成します。 combinable クラスの詳細については、「並列コンテナーと並列オブジェクト」を参照してください。

[トップ]

タスクの有効期間中に変数が有効であることを確認する

タスク グループまたは並列アルゴリズムにラムダ式を渡す場合、ラムダ式の本体で外側のスコープ内の変数を値でアクセスするのか参照でアクセスするのかは、capture 句で指定します。 ラムダ式に変数を渡すときに参照渡しを使用する場合、タスクが終了するまでその変数の有効期間が続くようにする必要があります。

次の例について考えます。この例では、object クラスと perform_action 関数を定義します。 perform_action 関数は、object 変数を作成し、その変数に対していくつかのアクションを非同期的に実行します。 perform_action 関数から制御が戻される前にタスクが終了するとは限らないため、タスクの実行中に object 変数が破棄されると、プログラムのクラッシュや未定義の動作が発生します。

// lambda-lifetime.cpp
// compile with: /c /EHsc
#include <ppl.h>

using namespace concurrency;

// A type that performs an action.
class object
{
public:
   void action() const
   {
      // TODO: Details omitted for brevity.
   }
};

// Performs an action asynchronously.
void perform_action(task_group& tasks)
{
   // Create an object variable and perform some action on 
   // that variable asynchronously.
   object obj;
   tasks.run([&obj] {
      obj.action();
   });

   // NOTE: The object variable is destroyed here. The program
   // will crash or exhibit unspecified behavior if the task
   // is still running when this function returns.
}

アプリケーションの要件に応じて、次の手法のいずれかを使用して、変数が各タスクの有効期間を通じて有効のまま維持されるようにすることができます。

次の例では、タスクに対して object 変数の値渡しを行います。 そのため、タスクは変数の個別コピーに対して作用します。

// Performs an action asynchronously.
void perform_action(task_group& tasks)
{
   // Create an object variable and perform some action on 
   // that variable asynchronously.
   object obj;
   tasks.run([obj] {
      obj.action();
   });
}

object 変数は値渡しで渡されるため、この変数の状態変化が発生しても、元のコピーには反映されません。

次の例では、perform_action 関数から制御が戻される前にタスクが確実に終了するように、concurrency::task_group::wait メソッドを使用します。

// Performs an action.
void perform_action(task_group& tasks)
{
   // Create an object variable and perform some action on 
   // that variable.
   object obj;
   tasks.run([&obj] {
      obj.action();
   });

   // Wait for the task to finish. 
   tasks.wait();
}

関数から制御が戻される前にタスクが終了するようになったため、perform_action 関数が非同期に動作することがなくなりました。

次の例では、perform_action 関数を変更して、object 変数の参照が使用されるようにします。 呼び出し元は、タスクが終了するまで object 変数の有効期間が続くようにする必要があります。

// Performs an action asynchronously.
void perform_action(object& obj, task_group& tasks)
{
   // Perform some action on the object variable.
   tasks.run([&obj] {
      obj.action();
   });
}

また、タスク グループまたは並列アルゴリズムに渡すオブジェクトの有効期間をポインターで制御することもできます。

ラムダ式について詳しくは、「ラムダ式」をご覧ください。

[トップ]

関連項目

コンカレンシー ランタイムに関するベスト プラクティス
並列パターン ライブラリ (PPL)
並列コンテナーと並列オブジェクト
並列アルゴリズム
PPL における取り消し処理
例外処理
チュートリアル: イメージ処理ネットワークの作成
方法: 並列呼び出しを使用して並列並べ替えルーチンを記述する
方法: キャンセル処理を使用して並列ループを中断する
方法: combinable を使用してパフォーマンスを向上させる
非同期エージェント ライブラリに関するベスト プラクティス
コンカレンシー ランタイムに関する全般的なベスト プラクティス