并行模式库中的最佳做法

更新:2011 年 3 月

本文档描述如何更好地有效利用并行模式库 (PPL)。 PPL 可为执行细粒度并行操作提供所需的通用容器、对象和算法。

有关 PPL 的更多信息,请参见并行模式库 (PPL)

各节内容

本文档包含以下几节:

  • 不要并行化小型循环体

  • 在尽可能高的级别表示并行度

  • 使用 parallel_invoke 解决分治问题

  • 使用取消或异常处理中断并行循环

  • 了解取消和异常处理如何影响对象销毁

  • 不要在并行循环中进行反复停滞

  • 不要在取消并行工作时执行停滞操作

  • 不要在并行循环中写入共享数据

  • 请尽可能避免伪共享

  • 确保变量在任务的整个生存期内有效

不要并行化小型循环体

并行化相对较小的循环体可能导致相关的计划开销超过并行处理所带来的好处。 请考虑以下示例,此示例在两个数组中分别添加了一对元素。

// small-loops.cpp
// compile with: /EHsc
#include <ppl.h>
#include <iostream>

using namespace Concurrency;
using namespace std;

int wmain()
{
   // Create three arrays that each have the same size.
   const size_t size = 100000;
   int a[size], b[size], c[size];

   // Initialize the arrays a and b.
   for (size_t i = 0; i < size; ++i)
   {
      a[i] = i;
      b[i] = i * 2;
   }

   // Add each pair of elements in arrays a and b in parallel 
   // and store the result in array c.
   parallel_for<size_t>(0, size, [&a,&b,&c](size_t i) {
      c[i] = a[i] + b[i];
   });

   // TODO: Do something with array c.
}

每个并行循环迭代的工作负载太小,以至于不能从并行处理的开销中受益。 您可以通过在循环体中执行更多工作或连续执行循环来提高此循环的性能。

[转到页首]

在尽可能高的级别表示并行度

当您仅在低级别并行化代码时,随着处理器数量的增加,您可以引入不进行扩展的分叉联接构造。 在“分叉联接”构造中,一个任务会将其工作划分为几个较小的并行子任务并等待这些子任务完成。 每个子任务都以递归方式将其自身划分为更多子任务。

虽然分叉联接模型对解决各种问题很有用,但在某些情况下会使同步开销降低可缩放性。 例如,考虑以下处理图像数据的串行代码。

// Calls the provided function for each pixel in a Bitmap object.
void ProcessImage(Bitmap* bmp, const function<void (DWORD&)>& f)
{
   int width = bmp->GetWidth();
   int height = bmp->GetHeight();

   // Lock the bitmap.
   BitmapData bitmapData;
   Rect rect(0, 0, bmp->GetWidth(), bmp->GetHeight());
   bmp->LockBits(&rect, ImageLockModeWrite, PixelFormat32bppRGB, &bitmapData);

   // Get a pointer to the bitmap data.
   DWORD* image_bits = (DWORD*)bitmapData.Scan0;

   // Call the function for each pixel in the image.
   for (int y = 0; y < height; ++y)
   {      
      for (int x = 0; x < width; ++x)
      {
         // Get the current pixel value.
         DWORD* curr_pixel = image_bits + (y * width) + x;

         // Call the function.
         f(*curr_pixel);
      }
   }

   // Unlock the bitmap.
   bmp->UnlockBits(&bitmapData);
}

因为每个循环迭代都是独立的,所以您可以并行化大部分工作,如以下示例所示。 此示例使用 Concurrency::parallel_for 算法并行化外部循环。

// Calls the provided function for each pixel in a Bitmap object.
void ProcessImage(Bitmap* bmp, const function<void (DWORD&)>& f)
{
   int width = bmp->GetWidth();
   int height = bmp->GetHeight();

   // Lock the bitmap.
   BitmapData bitmapData;
   Rect rect(0, 0, bmp->GetWidth(), bmp->GetHeight());
   bmp->LockBits(&rect, ImageLockModeWrite, PixelFormat32bppRGB, &bitmapData);

   // Get a pointer to the bitmap data.
   DWORD* image_bits = (DWORD*)bitmapData.Scan0;

   // Call the function for each pixel in the image.
   parallel_for (0, height, [&, width](int y)
   {      
      for (int x = 0; x < width; ++x)
      {
         // Get the current pixel value.
         DWORD* curr_pixel = image_bits + (y * width) + x;

         // Call the function.
         f(*curr_pixel);
      }
   });

   // Unlock the bitmap.
   bmp->UnlockBits(&bitmapData);
}

通过在循环中调用 ProcessImage 函数,以下示例阐释了分叉联接构造。 在每个子任务完成之前,对 ProcessImage 的每次调用不会返回值。

// Processes each bitmap in the provided vector.
void ProcessImages(vector<Bitmap*> bitmaps, const function<void (DWORD&)>& f)
{
   for_each(bitmaps.begin(), bitmaps.end(), [&f](Bitmap* bmp) {
      ProcessImage(bmp, f);
   });
}

如果并行循环的每个迭代几乎都不执行工作或者并行循环所执行的工作不平衡(也就是说,一些循环迭代需要比其他循环迭代更长的时间),那么频繁执行的分叉和联接工作所需的计划开销会超过并行执行所带来的好处。 此开销会随着处理器数量的增加而增加。

若要在此示例中减少计划开销量,您可以在并行化内部循环之前并行化外部循环,或者使用另一个并行构造(如流水线)。 以下示例修改了 ProcessImages 函数,以便使用 Concurrency::parallel_for_each 算法并行化外部循环。

// Processes each bitmap in the provided vector.
void ProcessImages(vector<Bitmap*> bitmaps, const function<void (DWORD&)>& f)
{
   parallel_for_each(bitmaps.begin(), bitmaps.end(), [&f](Bitmap* bmp) {
      ProcessImage(bmp, f);
   });
}

有关使用管道并行执行图像处理操作的类似示例,请参见演练:创建图像处理网络

[转到页首]

使用 parallel_invoke 解决分治问题

分治问题是一种使用递归将一项任务分成若干子任务的分叉联接构造形式。 除了 Concurrency::task_groupConcurrency::structured_task_group 类以外,您还可以使用 Concurrency::parallel_invoke 算法解决分治问题。 parallel_invoke 算法的语法比任务组对象更简洁,如果您具有固定数量的并行任务,则此算法很有用。

以下示例说明如何使用 parallel_invoke 算法实现 bitonic 排序算法。

// Sorts the given sequence in the specified order.
template <class T>
void parallel_bitonic_sort(T* items, int lo, int n, bool dir)
{   
   if (n > 1)
   {
      // Divide the array into two partitions and then sort 
      // the partitions in different directions.
      int m = n / 2;

      parallel_invoke(
         [&] { parallel_bitonic_sort(items, lo, m, INCREASING); },
         [&] { parallel_bitonic_sort(items, lo + m, m, DECREASING); }
      );

      // Merge the results.
      parallel_bitonic_merge(items, lo, n, dir);
   }
}

为降低开销,parallel_invoke 算法会对调用上下文执行系列任务中的最后一项任务。

有关此示例的完整版本,请参见如何:使用 parallel_invoke 来编写并行排序运行时。 有关 parallel_invoke 算法的更多信息,请参见并行算法

[转到页首]

使用取消或异常处理中断并行循环

PPL 提供两种方式来取消由任务组或并行算法执行的并行工作。 其中一种方式是使用由 Concurrency::task_groupConcurrency::structured_task_group 类提供的取消机制。 另一种方式是在任务工作函数体中引发异常。 在取消并行工作树时,取消机制比异常处理的效率高。 “并行工作树”组包含相关任务组,其中一些任务组包含其他任务组。 取消机制可以自上而下的方式取消任务组及其子任务组。 相反,异常处理以自下而上的方式工作,并且必须在异常向上传播时单独取消每个子任务组。

当您直接使用任务组对象时,可使用 Concurrency::task_group::cancelConcurrency::structured_task_group::cancel 方法取消属于该任务组的工作。 若要取消并行算法(例如,parallel_for),可创建一个父任务组并取消该任务组。 例如,请考虑下面的 parallel_find_any 函数,该函数在数组中并行搜索值。

// Returns the position in the provided array that contains the given value, 
// or -1 if the value is not in the array.
template<typename T>
int parallel_find_any(const T a[], size_t count, const T& what)
{
   // The position of the element in the array. 
   // The default value, -1, indicates that the element is not in the array.
   int position = -1;

   // Use parallel_for to search for the element. 
   // The task group enables a work function to cancel the overall 
   // operation when it finds the result.

   structured_task_group tasks;
   tasks.run_and_wait([&]
   {
      parallel_for(std::size_t(0), count, [&](int n) {
         if (a[n] == what)
         {
            // Set the return value and cancel the remaining tasks. 
            position = n;            
            tasks.cancel();
         }
      });
   });

   return position;
}

因为并行算法使用任务组,所以当其中一个并行迭代取消父任务组时,整个任务也将被取消。 有关此示例的完整版本,请参见如何:使用取消中断 Parallel 循环

虽然异常处理方式在取消并行工作方面比取消机制效率低,但在某些情况下适合使用异常处理机制。 例如,以下方法 for_alltree 结构的每个节点上递归执行工作函数。 在此示例中,_children 数据成员是一个包含 tree 对象的 std::list

// Performs the given work function on the data element of the tree and
// on each child.
template<class Function>
void tree::for_all(Function& action)
{
   // Perform the action on each child.
   parallel_for_each(_children.begin(), _children.end(), [&](tree& child) {
      child.for_all(action);
   });

   // Perform the action on this node.
   action(*this);
}

如果 tree::for_all 方法的调用方不需要对树的每个元素都调用工作函数,则调用方可引发异常。 下面的示例演示了 search_for_value 函数,该函数在所提供的 tree 对象中搜索值。 search_for_value 函数使用在树的当前元素匹配所提供的值时引发异常的工作函数。 search_for_value 函数使用 try-catch 块捕获该异常,并将结果输出到控制台。

// Searches for a value in the provided tree object.
template <typename T>
void search_for_value(tree<T>& t, int value)
{
   try
   {
      // Call the for_all method to search for a value. The work function
      // throws an exception when it finds the value.
      t.for_all([value](const tree<T>& node) {
         if (node.get_data() == value)
         {
            throw &node;
         }
      });
   }
   catch (const tree<T>* node)
   {
      // A matching node was found. Print a message to the console.
      wstringstream ss;
      ss << L"Found a node with value " << value << L'.' << endl;
      wcout << ss.str();
      return;
   }

   // A matching node was not found. Print a message to the console.
   wstringstream ss;
   ss << L"Did not find node with value " << value << L'.' << endl;
   wcout << ss.str();   
}

有关此示例的完整版本,请参见如何:使用异常处理中断 Parallel 循环

有关 PPL 提供的取消和异常处理机制的更多常规信息,请参见 PPL 中的取消操作并发运行时中的异常处理

[转到页首]

了解取消和异常处理如何影响对象销毁

在并行工作树中,取消的任务会阻止子任务运行。 如果某个子任务执行的操作对应用程序很重要(例如释放资源),则这可能引发问题。 另外,任务取消可导致异常在对象析构函数中传播并导致您的应用程序发生未定义的行为。

在以下示例中,Resource 类描述了资源,而 Container 类描述了保留资源的容器。 在此析构函数中,Container 类在其两个 Resource 成员上并行调用 cleanup 方法,然后在其第三个 Resource 成员上调用 cleanup 方法。

// parallel-resource-destruction.h
#pragma once
#include <ppl.h>
#include <sstream>
#include <iostream>

// Represents a resource.
class Resource
{
public:
   Resource(const std::wstring& name)
      : _name(name)
   {
   }

   // Frees the resource.
   void cleanup()
   {
      // Print a message as a placeholder.
      std::wstringstream ss;
      ss << _name << L": Freeing..." << std::endl;
      std::wcout << ss.str();
   }
private:
   // The name of the resource.
   std::wstring _name;
};

// Represents a container that holds resources.
class Container
{
public:
   Container(const std::wstring& name)
      : _name(name)
      , _resource1(L"Resource 1")
      , _resource2(L"Resource 2")
      , _resource3(L"Resource 3")
   {
   }

   ~Container()
   {
      std::wstringstream ss;
      ss << _name << L": Freeing resources..." << std::endl;
      std::wcout << ss.str();

      // For illustration, assume that cleanup for _resource1
      // and _resource2 can happen concurrently, and that 
      // _resource3 must be freed after _resource1 and _resource2.

      Concurrency::parallel_invoke(
         [this]() { _resource1.cleanup(); },
         [this]() { _resource2.cleanup(); }
      );

      _resource3.cleanup();
   }

private:
   // The name of the container.
   std::wstring _name;

   // Resources.
   Resource _resource1;
   Resource _resource2;
   Resource _resource3;
};

虽然此模式自身不存在问题,但请考虑以下并行运行两个任务的代码。 第一个任务创建一个 Container 对象,第二个任务取消整个任务。 为进行说明,此示例使用两个 Concurrency::event 对象来确保在创建 Container 对象后执行取消操作,并确保在执行取消操作后销毁 Container 对象。

// parallel-resource-destruction.cpp
// compile with: /EHsc
#include "parallel-resource-destruction.h"

using namespace Concurrency;
using namespace std;

static_assert(false, "This example illustrates a non-recommended practice.");

int main()
{  
   // Create a task_group that will run two tasks.
   task_group tasks;

   // Used to synchronize the tasks.
   event e1, e2;

   // Run two tasks. The first task creates a Container object. The second task
   // cancels the overall task group. To illustrate the scenario where a child 
   // task is not run because its parent task is cancelled, the event objects 
   // ensure that the Container object is created before the overall task is 
   // cancelled and that the Container object is destroyed after the overall 
   // task is cancelled.

   tasks.run([&tasks,&e1,&e2] {
      // Create a Container object.
      Container c(L"Container 1");

      // Allow the second task to continue.
      e2.set();

      // Wait for the task to be cancelled.
      e1.wait();
   });

   tasks.run([&tasks,&e1,&e2] {
      // Wait for the first task to create the Container object.
      e2.wait();

      // Cancel the overall task.
      tasks.cancel();      

      // Allow the first task to continue.
      e1.set();
   });

   // Wait for the tasks to complete.
   tasks.wait();

   wcout << L"Exiting program..." << endl;
}

该示例产生下面的输出:

Container 1: Freeing resources...
Exiting program...

此代码示例包含以下可导致其出现意外行为的问题。

  • 取消父任务会导致取消子任务,并且也会取消对 Concurrency::parallel_invoke 的调用。 因此,不会释放这两种资源。

  • 取消父任务会导致子任务引发内部异常。 因为 Container 析构函数无法处理此异常,所以此异常会向上传播且不会释放第三个资源。

  • 由子任务引发的异常会在整个 Container 析构函数中传播。 从析构函数引发会使应用程序处于未定义的状态。

建议您不要在任务中执行关键操作(如释放资源),除非您可以保证这些任务不会被取消。 还建议您不要使用可在类型的析构函数中引发的运行时功能。

[转到页首]

不要在并行循环中进行反复停滞

并行循环(例如通过停滞操作进行控制的 Concurrency::parallel_forConcurrency::parallel_for_each)可能导致运行时在很短的时间内创建许多线程。

并发运行时可在任务完成或进行协作停滞或让步时执行额外工作。 当一个并行循环迭代停滞时,运行时会开始另一个迭代。 在没有可用的空闲线程时,运行时会创建一个新线程。

当并行循环体偶尔停滞时,此机制可帮助最大化总任务吞吐量。 但是,如果有许多迭代停滞,则运行时会创建线程来运行额外工作。 这会导致内存降低或硬件资源的利用率降低。

请考虑以下在 parallel_for 循环的每个迭代中调用 Concurrency::send 函数的示例。 因为 send 以协作方式停滞,所以运行时会创建一个新线程来在每次调用 send 时都运行额外工作。

// repeated-blocking.cpp
// compile with: /EHsc
#include <ppl.h>
#include <agents.h>

using namespace Concurrency;

static_assert(false, "This example illustrates a non-recommended practice.");

int main()
{
   // Create a message buffer.
   overwrite_buffer<int> buffer;

   // Repeatedly send data to the buffer in a parallel loop.
   parallel_for(0, 1000, [&buffer](int i) {

      // The send function blocks cooperatively. 
      // We discourage the use of repeated blocking in a parallel
      // loop because it can cause the runtime to create 
      // a large number of threads over a short period of time.
      send(buffer, i);
   });
}

建议您重构您的代码以避免此模式。 在此示例中,可以通过在串行 for 循环中调用 send 来避免创建额外线程。

[转到页首]

不要在取消并行工作时执行停滞操作

在调用 Concurrency::task_group::cancelConcurrency::structured_task_group::cancel 方法来取消并行工作之前,尽可能不要执行停滞操作。

在任务执行停滞操作时,运行时可在第一个任务等待数据时执行其他工作。 在启用用户模式计划 (UMS) 时,运行时会在任务执行协作停滞操作或涉及内核切换的停滞操作时执行其他工作。 在启用默认的正常线程计划时,运行时仅在任务执行协作停滞操作时执行其他工作。 运行时会在其恢复运行时重新计划等待任务。 运行时通常会在其重新计划那些不是最近恢复运行的任务以前重新计划最近才恢复运行的任务。 因此,运行时在执行停滞操作期间会计划不必要的工作,这会降低性能。 因此,如果在取消并行工作之前执行停滞操作,该停滞操作会延迟调用 cancel。 这会导致其他任务执行不必要的工作。

请考虑以下定义了 parallel_find_answer 函数的示例,此示例搜索满足已提供谓词函数的已提供数组的元素。 在谓词函数返回 true 时,并行工作函数会创建一个 Answer 对象并取消整个任务。

// blocking-cancel.cpp
// compile with: /c /EHsc
#include <windows.h>
#include <ppl.h>

using namespace Concurrency;

// Encapsulates the result of a search operation.
template<typename T>
class Answer
{
public:
   explicit Answer(const T& data)
      : _data(data)
   {
   }

   T get_data() const
   {
      return _data;
   }

   // TODO: Add other methods as needed.

private:
   T _data;

   // TODO: Add other data members as needed.
};

// Searches for an element of the provided array that satisfies the provided
// predicate function.
template<typename T, class Predicate>
Answer<T>* parallel_find_answer(const T a[], size_t count, const Predicate& pred)
{
   // The result of the search.
   Answer<T>* answer = nullptr;
   // Ensures that only one task produces an answer.
   volatile long first_result = 0;

   // Use parallel_for and a task group to search for the element.
   structured_task_group tasks;
   tasks.run_and_wait([&]
   {
      // Declare the type alias for use in the inner lambda function.
      typedef T T;

      parallel_for<size_t>(0, count, [&](const T& n) {
         if (pred(a[n]) && InterlockedExchange(&first_result, 1) == 0)
         {
            // Create an object that holds the answer.
            answer = new Answer<T>(a[n]);
            // Cancel the overall task.
            tasks.cancel();
         }
      });
   });

   return answer;
}

new 运算符可执行可能停滞的堆分配。 在启用用户模式计划 (UMS) 时,运行时会在执行停滞操作期间执行其他工作。 在启用正常线程计划时,运行时仅在任务执行协作停滞调用(如调用 Concurrency::critical_section::lock)时执行其他工作。

以下示例显示如何防止不必要的工作,从而提高性能。 此示例在为 Answer 对象分配存储以前取消任务组。

// Searches for an element of the provided array that satisfies the provided
// predicate function.
template<typename T, class Predicate>
Answer<T>* parallel_find_answer(const T a[], size_t count, const Predicate& pred)
{
   // The result of the search.
   Answer<T>* answer = nullptr;
   // Ensures that only one task produces an answer.
   volatile long first_result = 0;

   // Use parallel_for and a task group to search for the element.
   structured_task_group tasks;
   tasks.run_and_wait([&]
   {
      // Declare the type alias for use in the inner lambda function.
      typedef T T;

      parallel_for<size_t>(0, count, [&](const T& n) {
         if (pred(a[n]) && InterlockedExchange(&first_result, 1) == 0)
         {
            // Cancel the overall task.
            tasks.cancel();
            // Create an object that holds the answer.
            answer = new Answer<T>(a[n]);            
         }
      });
   });

   return answer;
}

[转到页首]

不要在并行循环中写入共享数据

并发运行时提供了几个同步对共享数据进行的并发访问的数据结构,例如 Concurrency::critical_section。 这些数据结构在许多情况下(例如,当多个任务不经常需要对资源进行共享访问时)都很有用。

请考虑以下示例,此示例使用 Concurrency::parallel_for_each 算法和 critical_section 对象来计算 std::array 对象中素数的计数。 此示例不进行扩展,因为每个线程都必须进行等待,然后才能访问共享变量 prime_sum

critical_section cs;
prime_sum = 0;
parallel_for_each(a.begin(), a.end(), [&](int i) {
   cs.lock();
   prime_sum += (is_prime(i) ? i : 0);
   cs.unlock();
});

此示例还会导致性能下降,因为频繁的锁定操作有效地序列化了循环。 此外,当并发运行时对象执行停滞操作时,计划程序会创建另一个线程以便在第一个线程等待数据时执行其他工作。 如果运行时因许多任务正在等待共享数据而创建了许多线程,那么应用程序的执行效率会很差或者会进入低资源状态。

PPL 定义了 Concurrency::combinable 类,后者通过以无锁方式提供对共享资源的访问权限来帮助您取消共享状态。 combinable 类提供了线程本地存储,以使您能够执行精细计算,然后将这些计算合并为最终结果。 可以将 combinable 对象视为缩减变量。

通过使用 combinable 对象而不是 critical_section 对象计算总和,以下示例修改了前面的示例。 此示例进行了扩展,因为每个线程都保留其自己的本地副本总和。 此示例使用 Concurrency::combinable::combine 方法将本地计算合并为最终结果。

combinable<int> sum;
parallel_for_each(a.begin(), a.end(), [&](int i) {
   sum.local() += (is_prime(i) ? i : 0);
});
prime_sum = sum.combine(plus<int>());

有关此示例的完整版本,请参见如何:使用 combinable 提高性能。 有关 combinable 类的更多信息,请参见并行容器和对象

[转到页首]

请尽可能避免伪共享

当多个运行在独立处理器上的并发任务写入位于同一缓存行上的变量时,会发生“伪共享”。 当一项任务写入其中一个变量时,两个变量的缓存行均无效。 每个处理器都必须在缓存行无效时重新加载该缓存行。 因此,伪共享可导致应用程序性能降低。

以下基本示例演示两个并发任务,每个并发任务均递增一个共享计数器变量。

volatile long count = 0L;
Concurrency::parallel_invoke(
   [&count] {
      for(int i = 0; i < 100000000; ++i)
         InterlockedIncrement(&count);
   },
   [&count] {
      for(int i = 0; i < 100000000; ++i)
         InterlockedIncrement(&count);
   }
);

若要在这两项任务之间取消共享数据,您可以修改此示例以使用两个计数器变量。 在任务完成后,此示例会计算最终计数器值。 但是,此示例阐释了伪共享,因为 count1count2 变量很可能位于同一缓存行上。

long count1 = 0L;
long count2 = 0L;
Concurrency::parallel_invoke(
   [&count1] {
      for(int i = 0; i < 100000000; ++i)
         ++count1;
   },
   [&count2] {
      for(int i = 0; i < 100000000; ++i)
         ++count2;
   }
);
long count = count1 + count2;

取消伪共享的一种方式是确保计数器变量位于单独的缓存行上。 以下示例在 64 字节边界上对齐 count1count2 变量。

__declspec(align(64)) long count1 = 0L;      
__declspec(align(64)) long count2 = 0L;      
Concurrency::parallel_invoke(
   [&count1] {
      for(int i = 0; i < 100000000; ++i)
         ++count1;
   },
   [&count2] {
      for(int i = 0; i < 100000000; ++i)
         ++count2;
   }
);
long count = count1 + count2;

此示例假设内存缓存的大小是 64 或更低字节。

如果必须在任务之间共享数据,建议您使用 Concurrency::combinable 类。 combinable 类用这种不太可能实现伪共享的方式创建线程本地变量。 有关 combinable 类的更多信息,请参见并行容器和对象

[转到页首]

确保变量在任务的整个生存期内有效

在为任务组或并行算法提供 lambda 表达式时,capture 从句会指定 lambda 表达式的主体是通过值还是引用来访问封闭范围内的变量。 如果通过引用将变量传递到 lambda 表达式,您必须保证此变量的生存期能够一直持续到任务完成。

请考虑以下定义了 object 类和 perform_action 函数的示例。 perform_action 函数创建 object 变量并对此变量异步执行一些操作。 由于在 perform_action 函数返回以前不能保证完成任务,因此,如果在任务运行时销毁 object 变量,那么程序将崩溃或发生未指定的行为。

// lambda-lifetime.cpp
// compile with: /c /EHsc
#include <ppl.h>

using namespace Concurrency;

// A type that performs an action.
class object
{
public:
   void action() const
   {
      // TODO: Details omitted for brevity.
   }
};

// Performs an action asynchronously.
void perform_action(task_group& tasks)
{
   // Create an object variable and perform some action on 
   // that variable asynchronously.
   object obj;
   tasks.run([&obj] {
      obj.action();
   });

   // NOTE: The object variable is destroyed here. The program
   // will crash or exhibit unspecified behavior if the task
   // is still running when this function returns.
}

根据您的应用程序的要求,您可以使用以下技术之一确保变量在每项任务的整个生存期内始终有效。

以下示例通过值将 object 变量传递到任务。 因此,该任务会对其自己的副本执行操作。

// Performs an action asynchronously.
void perform_action(task_group& tasks)
{
   // Create an object variable and perform some action on 
   // that variable asynchronously.
   object obj;
   tasks.run([obj] {
      obj.action();
   });
}

因为 object 变量通过值进行传递,所以此变量发生的任何状态更改都不会出现在原始副本中。

以下示例使用 Concurrency::task_group::wait 方法确保在 perform_action 函数返回以前完成任务。

// Performs an action.
void perform_action(task_group& tasks)
{
   // Create an object variable and perform some action on 
   // that variable.
   object obj;
   tasks.run([&obj] {
      obj.action();
   });

   // Wait for the task to finish. 
   tasks.wait();
}

因为在函数返回以前完成了任务,所以 perform_action 函数不在异步执行操作。

以下示例修改了 perform_action 函数,以便引用 object 变量。 调用方必须确保 object 变量的生存期能够持续到任务完成。

// Performs an action asynchronously.
void perform_action(object& obj, task_group& tasks)
{
   // Perform some action on the object variable.
   tasks.run([&obj] {
      obj.action();
   });
}

您还可以使用指针来控制传递到任务组或并行算法的对象的生存期。

有关 lambda 表达式的更多信息,请参见 Lambda Expressions in C++

[转到页首]

请参见

任务

如何:使用 parallel_invoke 来编写并行排序运行时

如何:使用取消中断 Parallel 循环

如何:使用 combinable 提高性能

概念

并发运行时最佳做法

并行模式库 (PPL)

并行容器和对象

并行算法

PPL 中的取消操作

并发运行时中的异常处理

其他资源

演练:创建图像处理网络

异步代理库中的最佳做法

并发运行时中的常规最佳做法

修订记录

日期

修订记录

原因

2011 年 3 月

添加了有关如何在并行循环中避免重复停滞,以及取消和异常处理如何影响对象销毁的信息。

信息补充。

2010 年 5 月

扩充了指导内容。

信息补充。