并行模式库中的最佳做法

本文档介绍如何最佳有效地使用并行模式库 (PPL)。 PPL 提供通用的容器、对象和算法,用于执行细化并行。

有关 PPL 的详细信息,请参阅并行模式库 (PPL)

章节

本文档包含以下各节:

不要对小型循环体进行并行化

相对较小的循环体并行化可能会导致相关的计划开销超过并行处理的获益。 考虑下面的示例,它可将每对元素添加到两个数组。

// small-loops.cpp
// compile with: /EHsc
#include <ppl.h>
#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{
   // Create three arrays that each have the same size.
   const size_t size = 100000;
   int a[size], b[size], c[size];

   // Initialize the arrays a and b.
   for (size_t i = 0; i < size; ++i)
   {
      a[i] = i;
      b[i] = i * 2;
   }

   // Add each pair of elements in arrays a and b in parallel 
   // and store the result in array c.
   parallel_for<size_t>(0, size, [&a,&b,&c](size_t i) {
      c[i] = a[i] + b[i];
   });

   // TODO: Do something with array c.
}

每个并行循环迭代的工作负荷太小,因而无法受益于并行处理的开销。 通过在循环体中执行更多的工作或通过按顺序执行循环,可以提高此循环的性能。

[返回页首]

表示最高可能级别的并行

当只在较低级别并行化代码时,可引入 fork-join 构造,它不随处理器数量的增加而变化。 在 fork-join 构造中,一个任务将其工作划分为较小的并行子任务,并等待这些子任务完成。 每个子任务可以递归方式将其自身划分为其他子任务。

尽管 fork-join 模型可用于解决各种问题,但也存在同步开销会降低可伸缩性的情况。 例如,下面处理图像数据的串行代码。

// Calls the provided function for each pixel in a Bitmap object.
void ProcessImage(Bitmap* bmp, const function<void (DWORD&)>& f)
{
   int width = bmp->GetWidth();
   int height = bmp->GetHeight();

   // Lock the bitmap.
   BitmapData bitmapData;
   Rect rect(0, 0, bmp->GetWidth(), bmp->GetHeight());
   bmp->LockBits(&rect, ImageLockModeWrite, PixelFormat32bppRGB, &bitmapData);

   // Get a pointer to the bitmap data.
   DWORD* image_bits = (DWORD*)bitmapData.Scan0;

   // Call the function for each pixel in the image.
   for (int y = 0; y < height; ++y)
   {      
      for (int x = 0; x < width; ++x)
      {
         // Get the current pixel value.
         DWORD* curr_pixel = image_bits + (y * width) + x;

         // Call the function.
         f(*curr_pixel);
      }
   }

   // Unlock the bitmap.
   bmp->UnlockBits(&bitmapData);
}

由于每个循环迭代是独立的,因此可以并行化大部分工作,如下面的示例所示。 此示例使用 concurrency::parallel_for 算法并行化外部循环。

// Calls the provided function for each pixel in a Bitmap object.
void ProcessImage(Bitmap* bmp, const function<void (DWORD&)>& f)
{
   int width = bmp->GetWidth();
   int height = bmp->GetHeight();

   // Lock the bitmap.
   BitmapData bitmapData;
   Rect rect(0, 0, bmp->GetWidth(), bmp->GetHeight());
   bmp->LockBits(&rect, ImageLockModeWrite, PixelFormat32bppRGB, &bitmapData);

   // Get a pointer to the bitmap data.
   DWORD* image_bits = (DWORD*)bitmapData.Scan0;

   // Call the function for each pixel in the image.
   parallel_for (0, height, [&, width](int y)
   {      
      for (int x = 0; x < width; ++x)
      {
         // Get the current pixel value.
         DWORD* curr_pixel = image_bits + (y * width) + x;

         // Call the function.
         f(*curr_pixel);
      }
   });

   // Unlock the bitmap.
   bmp->UnlockBits(&bitmapData);
}

下面的示例通过调用循环中的 ProcessImage 函数阐释 fork-join 构造。 每个对 ProcessImage 的调用直到每个子任务完成后才返回。

// Processes each bitmap in the provided vector.
void ProcessImages(vector<Bitmap*> bitmaps, const function<void (DWORD&)>& f)
{
   for_each(begin(bitmaps), end(bitmaps), [&f](Bitmap* bmp) {
      ProcessImage(bmp, f);
   });
}

如果并行循环的每次迭代几乎不执行任何工作,或执行非平衡并行循环执行的工作(即一些循环迭代比另一些花费更长的时间),那么频繁进行分叉和联接工作所需的计划开销可能超过并行执行的带来的好处。 此开销会随着处理器数量增加而增加。

要减少此示例中计划开销的数量,可在并行内部循环前先并行外部循环,或使用其他并行构造,如流水线结构。 以下示例修改了 ProcessImages 函数,以便使用 concurrency::parallel_for_each 算法并行化外部循环。

// Processes each bitmap in the provided vector.
void ProcessImages(vector<Bitmap*> bitmaps, const function<void (DWORD&)>& f)
{
   parallel_for_each(begin(bitmaps), end(bitmaps), [&f](Bitmap* bmp) {
      ProcessImage(bmp, f);
   });
}

有关使用管道并行执行图像处理的类似示例,请参阅演练:创建图像处理网络

[返回页首]

使用 parallel_invoke 来解决分治问题

分治问题是使用递归将任务细分为多个子任务的 fork-join 构造的一种形式。 除了 concurrency::task_groupconcurrency::structured_task_group 类之外,还可以使用 concurrency::parallel_invoke 算法来解决分治法问题。 parallel_invoke 算法具有比任务组对象更简洁的语法,并当具有固定数目的并行任务时非常有用。

下面的示例演示使用 parallel_invoke 算法来实现双调排序算法。

// Sorts the given sequence in the specified order.
template <class T>
void parallel_bitonic_sort(T* items, int lo, int n, bool dir)
{   
   if (n > 1)
   {
      // Divide the array into two partitions and then sort 
      // the partitions in different directions.
      int m = n / 2;

      parallel_invoke(
         [&] { parallel_bitonic_sort(items, lo, m, INCREASING); },
         [&] { parallel_bitonic_sort(items, lo + m, m, DECREASING); }
      );
      
      // Merge the results.
      parallel_bitonic_merge(items, lo, n, dir);
   }
}

为了降低开销,parallel_invoke 算法将在调用上下文时执行系列任务的最后一个。

有关此示例的完整版本,请参阅如何:使用 parallel_invoke 编写并行排序例程。 有关 parallel_invoke 算法的详细信息,请参阅并行算法

[返回页首]

使用取消或异常处理来中断并行循环

PPL 提供了两种方法来取消任务组或并行算法所执行的并行工作。 一种方法是使用 concurrency::task_groupconcurrency::structured_task_group 类提供的取消机制。 另一种方法是在任务工作函数体中引发异常。 当取消并行工作树时,取消机制比异常处理更有效。 并行工作树是一组相关的任务组,其中一些任务组包含其他任务组。 取消机制以自上而下的方式取消任务组及其子任务组。 相反,异常处理以自下而上的方式工作,并且必须在异常向上传播时单独取消每个子任务组。

当你直接使用任务组对象时,使用 concurrency::task_group::cancelconcurrency::structured_task_group::cancel 方法取消属于该任务组的工作。 若要取消并行算法(如 parallel_for),创建父任务组并取消该任务组。 例如,可使用以下函数,parallel_find_any,它以并行方式搜索数组中的值。

// Returns the position in the provided array that contains the given value, 
// or -1 if the value is not in the array.
template<typename T>
int parallel_find_any(const T a[], size_t count, const T& what)
{
   // The position of the element in the array. 
   // The default value, -1, indicates that the element is not in the array.
   int position = -1;

   // Call parallel_for in the context of a cancellation token to search for the element.
   cancellation_token_source cts;
   run_with_cancellation_token([count, what, &a, &position, &cts]()
   {
      parallel_for(std::size_t(0), count, [what, &a, &position, &cts](int n) {
         if (a[n] == what)
         {
            // Set the return value and cancel the remaining tasks.
            position = n;
            cts.cancel();
         }
      });
   }, cts.get_token());

   return position;
}

由于并行算法使用任务组,当一个并行迭代取消父任务组时,整个任务将被取消。 有关此示例的完整版本,请参阅如何:使用取消来中断并行循环

尽管异常处理是一种比取消机制效率更低的取消并行工作的方法,但在某些情况中异常处理也非常适用。 例如,下面的方法,for_all,以递归方式在 tree 结构的每个节点上执行工作函数。 在此示例中,_children 数据成员是包含 tree 对象的 std::list

// Performs the given work function on the data element of the tree and
// on each child.
template<class Function>
void tree::for_all(Function& action)
{
   // Perform the action on each child.
   parallel_for_each(begin(_children), end(_children), [&](tree& child) {
      child.for_all(action);
   });

   // Perform the action on this node.
   action(*this);
}

如果不要求对树的每个元素调用工作函数,则 tree::for_all 方法的调用方可以引发异常。 下面的介绍了 search_for_value 函数,它可在所提供的 tree 对象中搜索值。 search_for_value 函数使用工作函数,可在树的当前元素匹配提供的值时引发异常。 search_for_value 函数使用 try-catch 块来捕获异常,并将结果打印到控制台。

// Searches for a value in the provided tree object.
template <typename T>
void search_for_value(tree<T>& t, int value)
{
   try
   {
      // Call the for_all method to search for a value. The work function
      // throws an exception when it finds the value.
      t.for_all([value](const tree<T>& node) {
         if (node.get_data() == value)
         {
            throw &node;
         }
      });
   }
   catch (const tree<T>* node)
   {
      // A matching node was found. Print a message to the console.
      wstringstream ss;
      ss << L"Found a node with value " << value << L'.' << endl;
      wcout << ss.str();
      return;
   }

   // A matching node was not found. Print a message to the console.
   wstringstream ss;
   ss << L"Did not find node with value " << value << L'.' << endl;
   wcout << ss.str();   
}

有关此示例的完整版本,请参阅如何:使用异常处理中断并行循环

有关 PPL 提供的取消和异常处理机制的更多一般信息,请参阅PPL 中的取消异常处理

[返回页首]

了解取消和异常处理如何影响对象销毁

在并行工作树中,取消的任务会阻止子任务运行。 如果一个子任务执行的操作对于应用程序很重要(如释放资源),则这可能会导致问题。 此外,任务取消可能导致异常通过对象析构函数进行传播,并在应用程序中导致不明确的行为。

在下面的示例中,Resource 类描述资源,Container 类描述保存资源的容器。 在其析构函数中,Container 类在它两个 Resource 成员上并行调用 cleanup 方法,然后在它第三个 Resource 成员上调用 cleanup 方法。

// parallel-resource-destruction.h
#pragma once
#include <ppl.h>
#include <sstream>
#include <iostream>

// Represents a resource.
class Resource
{
public:
   Resource(const std::wstring& name)
      : _name(name)
   {
   }

   // Frees the resource.
   void cleanup()
   {
      // Print a message as a placeholder.
      std::wstringstream ss;
      ss << _name << L": Freeing..." << std::endl;
      std::wcout << ss.str();
   }
private:
   // The name of the resource.
   std::wstring _name;
};

// Represents a container that holds resources.
class Container
{
public:
   Container(const std::wstring& name)
      : _name(name)
      , _resource1(L"Resource 1")
      , _resource2(L"Resource 2")
      , _resource3(L"Resource 3")
   {
   }

   ~Container()
   {
      std::wstringstream ss;
      ss << _name << L": Freeing resources..." << std::endl;
      std::wcout << ss.str();

      // For illustration, assume that cleanup for _resource1
      // and _resource2 can happen concurrently, and that 
      // _resource3 must be freed after _resource1 and _resource2.

      concurrency::parallel_invoke(
         [this]() { _resource1.cleanup(); },
         [this]() { _resource2.cleanup(); }
      );

      _resource3.cleanup();
   }

private:
   // The name of the container.
   std::wstring _name;

   // Resources.
   Resource _resource1;
   Resource _resource2;
   Resource _resource3;
};

尽管这种模式在其自身上没有任何问题,但建议使用下面并行运行两个任务的代码。 第一个任务创建 Container 对象,第二个任务取消整个任务。 举例来说,该示例使用两个 concurrency::event 对象来确保在创建 Container 对象后发生取消,并在取消操作发生后销毁 Container 对象。

// parallel-resource-destruction.cpp
// compile with: /EHsc
#include "parallel-resource-destruction.h"

using namespace concurrency;
using namespace std;

static_assert(false, "This example illustrates a non-recommended practice.");

int main()
{  
   // Create a task_group that will run two tasks.
   task_group tasks;

   // Used to synchronize the tasks.
   event e1, e2;

   // Run two tasks. The first task creates a Container object. The second task
   // cancels the overall task group. To illustrate the scenario where a child 
   // task is not run because its parent task is cancelled, the event objects 
   // ensure that the Container object is created before the overall task is 
   // cancelled and that the Container object is destroyed after the overall 
   // task is cancelled.
   
   tasks.run([&tasks,&e1,&e2] {
      // Create a Container object.
      Container c(L"Container 1");
      
      // Allow the second task to continue.
      e2.set();

      // Wait for the task to be cancelled.
      e1.wait();
   });

   tasks.run([&tasks,&e1,&e2] {
      // Wait for the first task to create the Container object.
      e2.wait();

      // Cancel the overall task.
      tasks.cancel();      

      // Allow the first task to continue.
      e1.set();
   });

   // Wait for the tasks to complete.
   tasks.wait();

   wcout << L"Exiting program..." << endl;
}

该示例产生下面的输出:

Container 1: Freeing resources...Exiting program...

此代码示例包含的以下问题可能导致其与预期行为不同:

  • 父任务的取消会导致子任务(对 concurrency::parallel_invoke 的调用)也被取消。 因此,不会释放这两个资源。

  • 父任务的取消将导致子任务引发内部异常。 由于 Container 析构函数不会处理此异常,因此异常会向上传播并且不会释放第三个资源。

  • 子任务引发的异常通过 Container 析构函数进行传播。 从析构函数引发会将应用程序置于未定义的状态。

我们建议不在任务中执行关键操作(如释放资源),除非可以保证这些任务不会被取消。 我们还建议不使用可在类型的析构函数中引发的运行时功能。

[返回页首]

不要在并行循环中反复阻止

并行循环(例如通过停滞操作进行控制的 concurrency::parallel_forconcurrency::parallel_for_each)可能导致运行时在很短的时间内创建许多线程。

当任务完成,或以协作方式停滞或让出时,并发运行时将执行其他工作。 当一个并行循环迭代停滞时,运行时可能会开始另一个迭代。 当不存在可用的空闲线程时,运行时将创建一个新线程。

当并行循环体偶尔停滞时,此机制可帮助最大化整体任务吞吐量。 但当多个迭代停滞时,运行时可能会创建多个线程来运行其他工作。 这可能导致内存不足的情况或较差的硬件资源使用情况。

请考虑以下示例,该示例在 parallel_for 循环的每次迭代中调用 concurrency::send 函数。 由于 send 以协作方式停滞,所以每次调用 send 时运行时都会创建一个新线程来运行其他工作。

// repeated-blocking.cpp
// compile with: /EHsc
#include <ppl.h>
#include <agents.h>

using namespace concurrency;

static_assert(false, "This example illustrates a non-recommended practice.");

int main()
{
   // Create a message buffer.
   overwrite_buffer<int> buffer;
  
   // Repeatedly send data to the buffer in a parallel loop.
   parallel_for(0, 1000, [&buffer](int i) {
      
      // The send function blocks cooperatively. 
      // We discourage the use of repeated blocking in a parallel
      // loop because it can cause the runtime to create 
      // a large number of threads over a short period of time.
      send(buffer, i);
   });
}

我们建议你重构代码来避免这种模式。 在此示例中,可通过在串行 for 循环中调用 send 来避免其他线程的创建。

[返回页首]

当取消并行工作时,不要执行阻止操作

如果可能,在调用 concurrency::task_group::cancelconcurrency::structured_task_group::cancel 方法取消并行工作之前不要执行阻止操作。

在任务执行协作停滞操作时,运行时可在第一个任务等待数据时执行其他工作。 当运行时取消停滞时,它将重新安排正在等待的任务。 通常运行时会先重新安排最近取消停滞的任务,然后再重新安排后来取消停滞的任务。 因此,运行时在停滞操作期间可能安排不必要的工作,这会导致性能下降。 相应地,如果在取消并行工作前执行停滞操作,则停滞操作会延迟对 cancel 的调用。 这将导致其他任务执行不必要的工作。

请看下面定义 parallel_find_answer 函数的示例,该函数搜索所提供的满足提供的谓词函数的数组的元素。 当谓词函数返回 true 时,并行工作函数会创建一个 Answer 对象并取消整个任务。

// blocking-cancel.cpp
// compile with: /c /EHsc
#include <windows.h>
#include <ppl.h>

using namespace concurrency;

// Encapsulates the result of a search operation.
template<typename T>
class Answer
{
public:
   explicit Answer(const T& data)
      : _data(data)
   {
   }

   T get_data() const
   {
      return _data;
   }

   // TODO: Add other methods as needed.

private:
   T _data;

   // TODO: Add other data members as needed.
};

// Searches for an element of the provided array that satisfies the provided
// predicate function.
template<typename T, class Predicate>
Answer<T>* parallel_find_answer(const T a[], size_t count, const Predicate& pred)
{
   // The result of the search.
   Answer<T>* answer = nullptr;
   // Ensures that only one task produces an answer.
   volatile long first_result = 0;

   // Use parallel_for and a task group to search for the element.
   structured_task_group tasks;
   tasks.run_and_wait([&]
   {
      // Declare the type alias for use in the inner lambda function.
      typedef T T;

      parallel_for<size_t>(0, count, [&](const T& n) {
         if (pred(a[n]) && InterlockedExchange(&first_result, 1) == 0)
         {
            // Create an object that holds the answer.
            answer = new Answer<T>(a[n]);
            // Cancel the overall task.
            tasks.cancel();
         }
      });
   });

   return answer;
}

new 运算符可执行可能会停滞的堆分配。 仅当任务执行协作阻止调用(例如对 concurrency::critical_section::lock 的调用)时,运行时才会执行其他工作。

下面的示例介绍如何防止不必要的工作,从而提高性能。 此示例在为 Answer 对象分配存储前先取消任务组。

// Searches for an element of the provided array that satisfies the provided
// predicate function.
template<typename T, class Predicate>
Answer<T>* parallel_find_answer(const T a[], size_t count, const Predicate& pred)
{
   // The result of the search.
   Answer<T>* answer = nullptr;
   // Ensures that only one task produces an answer.
   volatile long first_result = 0;

   // Use parallel_for and a task group to search for the element.
   structured_task_group tasks;
   tasks.run_and_wait([&]
   {
      // Declare the type alias for use in the inner lambda function.
      typedef T T;

      parallel_for<size_t>(0, count, [&](const T& n) {
         if (pred(a[n]) && InterlockedExchange(&first_result, 1) == 0)
         {
            // Cancel the overall task.
            tasks.cancel();
            // Create an object that holds the answer.
            answer = new Answer<T>(a[n]);            
         }
      });
   });

   return answer;
}

[返回页首]

不要在并行循环中写入共享数据

并发运行时提供了多种数据结构,例如 concurrency::critical_section,它们可以同步对共享数据的并发访问。 很多情况下这些数据结构非常有用,例如,当多个任务很少需要对资源的共享访问时。

请考虑以下示例,该示例使用 concurrency::parallel_for_each 算法和 critical_section 对象来计算 std::array 对象中的素数计数。 此示例不会进行扩展,因为每个线程必须等待以访问共享的变量 prime_sum

critical_section cs;
prime_sum = 0;
parallel_for_each(begin(a), end(a), [&](int i) {
   cs.lock();
   prime_sum += (is_prime(i) ? i : 0);
   cs.unlock();
});

此示例也可能导致性能不佳,因为频繁的锁定操作有力地对循环进行了序列化。 此外,当并发运行时对象执行停滞操作时,计划程序可能会在第一个线程等待数据时创建其他线程来执行其他工作。 如果运行时因许多任务正等待共享数据而创建许多线程,那么应用程序可能不会很好地运行,或进入资源不足的状态。

PPL 定义了 concurrency::combinable 类,它通过以无锁方式提供对共享资源的访问来帮助消除共享状态。 combinable 类提供了线程本地存储,该存储允许你执行细化的计算,然后将这些计算合并为最终的结果。 你可以将 combinable 对象当作 reduction 变量。

下面的示例使用 combinable 对象而不是 critical_section 对象来计算总和,从而修改前一个结果。 此示例可进行缩放,因为每个线程都会保存自己的总和本地副本。 此示例使用 concurrency::combinable::combine 方法将本地计算合并为最终结果。

combinable<int> sum;
parallel_for_each(begin(a), end(a), [&](int i) {
   sum.local() += (is_prime(i) ? i : 0);
});
prime_sum = sum.combine(plus<int>());

有关此示例的完整版本,请参阅如何:使用 combinable 提高性能。 有关 combinable 类的详细信息,请参阅并行容器和对象

[返回页首]

如果可能,避免错误共享

当在不同处理器上运行的多个并发任务写入位于同一高速缓存行上的变量时,会发生错误共享。 当一个任务写入一个变量时,这两个变量的缓存行将会失效。 每当缓存行失效时,每个处理器必须重新加载缓存行。 因此,错误共享会导致应用程序中的性能降低。

以下基本示例介绍了两个并发任务,每个任务都增加了共享的计数器变量。

volatile long count = 0L;
concurrency::parallel_invoke(
   [&count] {
      for(int i = 0; i < 100000000; ++i)
         InterlockedIncrement(&count);
   },
   [&count] {
      for(int i = 0; i < 100000000; ++i)
         InterlockedIncrement(&count);
   }
);

若要消除两个任务间的数据共享,可修改该示例以使用两个计数器变量。 任务完成后,此示例将计算最终的计数器值。 但此示例还说明了错误共享,因为变量 count1count2 可能位于同一缓存行。

long count1 = 0L;
long count2 = 0L;
concurrency::parallel_invoke(
   [&count1] {
      for(int i = 0; i < 100000000; ++i)
         ++count1;
   },
   [&count2] {
      for(int i = 0; i < 100000000; ++i)
         ++count2;
   }
);
long count = count1 + count2;

消除错误共享的一种方法是确保计数器变量位于不同的缓存行。 下面的示例将对齐 64 字节边界上的 count1count2 变量。

__declspec(align(64)) long count1 = 0L;      
__declspec(align(64)) long count2 = 0L;      
concurrency::parallel_invoke(
   [&count1] {
      for(int i = 0; i < 100000000; ++i)
         ++count1;
   },
   [&count2] {
      for(int i = 0; i < 100000000; ++i)
         ++count2;
   }
);
long count = count1 + count2;

此示例假定内存缓存的大小为 64 个或更少的字节。

当必须在任务之间共享数据时,我们建议使用 concurrency::combinable 类。 combinable 类以不太可能发生错误共享的方式创建线程本地变量。 有关 combinable 类的详细信息,请参阅并行容器和对象

[返回页首]

确保变量在任务的整个生存期内有效

如果向任务组或并行算法提供 Lambda 表达式,capture 子句将指定 Lambda 表达式的主体是否通过值或引用访问封闭范围中的变量。 通过引用将变量传递到 Lambda 表达式时,必须保证该变量的生存期在任务完成之前一直保持。

请看下面的示例,该示例定义了 object 类和 perform_action 函数。 perform_action 函数创建 object 变量,并在该变量中以异步方式执行某项操作。 由于不能保证在 perform_action 函数返回前完成任务,因此,如果 object 变量在任务运行时被销毁,则程序将崩溃或发生未指定的行为。

// lambda-lifetime.cpp
// compile with: /c /EHsc
#include <ppl.h>

using namespace concurrency;

// A type that performs an action.
class object
{
public:
   void action() const
   {
      // TODO: Details omitted for brevity.
   }
};

// Performs an action asynchronously.
void perform_action(task_group& tasks)
{
   // Create an object variable and perform some action on 
   // that variable asynchronously.
   object obj;
   tasks.run([&obj] {
      obj.action();
   });

   // NOTE: The object variable is destroyed here. The program
   // will crash or exhibit unspecified behavior if the task
   // is still running when this function returns.
}

具体取决于应用程序的要求,可使用以下方法之一来保证变量在每项任务的整个生存期保持有效。

下面的示例通过值将 object 变量传入任务。 因此,任务可在自己的变量副本上进行操作。

// Performs an action asynchronously.
void perform_action(task_group& tasks)
{
   // Create an object variable and perform some action on 
   // that variable asynchronously.
   object obj;
   tasks.run([obj] {
      obj.action();
   });
}

由于 object 变量通过值进行传递,因此,此变量发生的任何状态更改不会出现在原始副本。

以下示例使用 concurrency::task_group::wait 方法确保在 perform_action 函数返回以前完成任务。

// Performs an action.
void perform_action(task_group& tasks)
{
   // Create an object variable and perform some action on 
   // that variable.
   object obj;
   tasks.run([&obj] {
      obj.action();
   });

   // Wait for the task to finish. 
   tasks.wait();
}

由于函数返回前任务现在完成了,因此,perform_action 函数不再以异步方式进行。

下面的示例修改 perform_action 函数以获取对 object 变量的引用。 调用方必须保证 object 变量的生存期在任务完成前是有效的。

// Performs an action asynchronously.
void perform_action(object& obj, task_group& tasks)
{
   // Perform some action on the object variable.
   tasks.run([&obj] {
      obj.action();
   });
}

也可使用指针来控制传入任务组或并行算法的对象的生存期。

有关 lambda 表达式的详细信息,请参阅 Lambda 表达式

[返回页首]

另请参阅

并发运行时最佳做法
并行模式库 (PPL)
并行容器和对象
并行算法
PPL 中的取消操作
异常处理
演练:创建图像处理网络
如何:使用 parallel_invoke 来编写并行排序例程
如何:使用取消来中断并行循环
如何:使用 combinable 提高性能
异步代理库中的最佳做法
并发运行时中的常规最佳做法