如何:转换使用取消的 OpenMP 循环以使用并发运行时

某些并行循环不需要执行所有迭代。 例如,搜索值的算法可以在找到值后终止。 OpenMP 不提供算法以跳出并行循环。 但是,您可以使用布尔值或标志,以允许循环的迭代指示已找到解决方案。 并发运行时提供允许一个任务取消其他尚未开始任务的功能。

本示例演示如何转换不需要运行所有迭代的 OpenMP parallel for 循环,以使用并发运行时取消机制。

示例

本示例使用 OpenMP 和并发运行时实现 std::any_of 算法的并行版本。 本示例的 OpenMP 版本使用标志协调所有满足条件的并行循环迭代。 使用并发运行时的版本使用 Concurrency::structured_task_group::cancel 方法在满足条件时停止全部操作。

// concrt-omp-parallel-any-of.cpp
// compile with: /EHsc /openmp
#include <ppl.h>
#include <array>
#include <random>
#include <iostream>

using namespace Concurrency;
using namespace std;

// Uses OpenMP to determine whether a condition exists in 
// the specified range of elements.
template <class InIt, class Predicate>
bool omp_parallel_any_of(InIt first, InIt last, const Predicate& pr)
{
   typedef typename std::iterator_traits<InIt>::value_type item_type;

   // A flag that indicates that the condition exists.
   bool found = false;

   #pragma omp parallel for
      for (int i = 0; i < static_cast<int>(last-first); ++i)
      {
         if (!found)
         {
            item_type& cur = *(first + i);

            // If the element satisfies the condition, set the flag to 
            // cancel the operation.
            if (pr(cur)) {
               found = true;
            }
         }
      }

   return found;
}

// Uses the Concurrency Runtime to determine whether a condition exists in 
// the specified range of elements.
template <class InIt, class Predicate>
bool concrt_parallel_any_of(InIt first, InIt last, const Predicate& pr)
{
   typedef typename std::iterator_traits<InIt>::value_type item_type;

   structured_task_group tasks;

   // Create a predicate function that cancels the task group when
   // an element satisfies the condition.
   auto for_each_predicate = [&pr, &tasks](const item_type& cur) {
      if (pr(cur)) {
         tasks.cancel();
      }
   };

   // Create a task that calls the predicate function in parallel on each
   // element in the range.
   auto task = make_task([&]() {
       parallel_for_each(first, last, for_each_predicate);
   });

   // The condition is satisfied if the task group is in the cancelled state.
   return tasks.run_and_wait(task) == canceled;
}

int wmain()
{
   // The length of the array.
   const size_t size = 100000;

   // Create an array and initialize it with random values.
   array<int, size> a;   
   generate(a.begin(), a.end(), mt19937(42));

   // Search for a value in the array by using OpenMP and the Concurrency Runtime.

   const int what = 9114046;
   auto predicate = [what](int n) -> bool { 
      return (n == what);
   };

   wcout << L"Using OpenMP..." << endl;
   if (omp_parallel_any_of(a.begin(), a.end(), predicate))
   {
      wcout << what << L" is in the array." << endl;
   }
   else
   {
      wcout << what << L" is not in the array." << endl;
   }

   wcout << L"Using the Concurrency Runtime..." << endl;
   if (concrt_parallel_any_of(a.begin(), a.end(), predicate))
   {
      wcout << what << L" is in the array." << endl;
   }
   else
   {
      wcout << what << L" is not in the array." << endl;
   }
}

该示例产生下面的输出。

Using OpenMP...
9114046 is in the array.
Using the Concurrency Runtime...
9114046 is in the array.

在使用 OpenMP 的版本中,即使设置了标志,也会执行循环的所有迭代。 另外,如果任务具有任何子任务,则标志也必须可用于这些子任务以完成通信取消。 在并发运行时中,当取消任务组时,运行时会取消整个工作树,包括子任务。 Concurrency::parallel_for_each 算法使用任务执行工作。 因此,当循环的一个迭代取消根任务时,整个计算树也将被取消。 当取消工作树时,运行时不会开始新的任务。 但是,运行时允许已开始的任务完成。 因此,在执行 parallel_for_each 算法时,活动循环迭代可以清除它们的资源。

在本示例的两个版本中,如果数组包含多个要搜索的值的副本,则多个循环迭代可以各自同时设置结果并取消全部操作。 如果您的问题要求在满足条件时仅一个任务执行工作,则可以使用同步基元(例如临界区)。

您还可以使用异常处理取消使用 PPL 的任务。 有关取消操作的更多信息,请参见 PPL 中的取消操作

有关 parallel_for_each 和其他并行算法的更多信息,请参见并行算法

编译代码

复制示例代码并将其粘贴到 Visual Studio 项目中,或将其粘贴到名为 concrt-omp-parallel-any-of.cpp 的文件中,然后在 Visual Studio 2010 命令提示窗口中运行以下命令。

cl.exe /EHsc /openmp concrt-omp-parallel-any-of.cpp

请参见

概念

PPL 中的取消操作

并行算法

其他资源

从 OpenMP 迁移至并发运行时