如何:使用 parallel_invoke 来编写并行排序运行时

本文档介绍如何使用 parallel_invoke 算法提升 bitonic 排序算法的性能。 bitonic 排序算法以递归方式将输入序列分为较小的排序分区。 bitonic 排序算法可以并行方式运行,因为每个分区操作均独立于所有其他操作。

尽管 bitonic 排序是对输入序列所有组合进行排序的排序网络的一个示例,但此示例将对其长度为 2 的幂的序列进行排序。

备注

此示例使用并行排序图的实例。还可以使用内置排序PPL提供的算法的操作: concurrency::parallel_sortconcurrency::parallel_buffered_sortconcurrency::parallel_radixsort。有关更多信息,请参见 并行算法

各节内容

本文档介绍了以下任务:

  • 按顺序执行 Bitonic 排序

  • 使用 parallel_invoke 并行执行 bitonic 排序

按顺序执行 Bitonic 排序

下面的示例演示了 bitonic 排序算法的序列化版本。 bitonic_sort 函数将序列分成两个分区,以相反方向对这些分区进行排序,然后合并结果。 此函数以递归方式调用自身两次以对每个分区进行排序。

const bool INCREASING = true;
const bool DECREASING = false;

// Comparator function for the bitonic sort algorithm.
template <class T>
void compare(T* items, int i, int j, bool dir)
{
   if (dir == (items[i] > items[j]))
   {
      swap(items[i], items[j]);
   }
}

// Sorts a bitonic sequence in the specified order.
template <class T>
void bitonic_merge(T* items, int lo, int n, bool dir)
{
   if (n > 1)
   {
      int m = n / 2;
      for (int i = lo; i < lo + m; ++i)
      {
         compare(items, i, i + m, dir);
      }
      bitonic_merge(items, lo, m, dir);
      bitonic_merge(items, lo + m, m, dir);
   }
}

// Sorts the given sequence in the specified order.
template <class T>
void bitonic_sort(T* items, int lo, int n, bool dir)
{
   if (n > 1)
   {
      // Divide the array into two partitions and then sort 
      // the partitions in different directions.
      int m = n / 2;
      bitonic_sort(items, lo, m, INCREASING);
      bitonic_sort(items, lo + m, m, DECREASING);

      // Merge the results.
      bitonic_merge(items,lo, n, dir);
   }
}

// Sorts the given sequence in increasing order.
template <class T>
void bitonic_sort(T* items, int size)
{
    bitonic_sort(items, 0, size, INCREASING);
}

[顶级]

使用 parallel_invoke 并行执行 bitonic 排序

本节介绍如何使用 parallel_invoke 算法并行执行 bitonic 排序算法。

Dd728066.collapse_all(zh-cn,VS.110).gif过程

以并行方式执行 bitonic 排序算法

  1. 为头文件 ppl.h 添加 #include 指令。

    #include <ppl.h>
    
  2. concurrency命名空间添加 using 指令。

    using namespace concurrency;
    
  3. 创建名为 parallel_bitonic_mege 的新函数,此函数使用 parallel_invoke 算法在具有要处理的大量工作时并行合并序列。 否则,调用 bitonic_merge 以连续合并序列。

    // Sorts a bitonic sequence in the specified order.
    template <class T>
    void parallel_bitonic_merge(T* items, int lo, int n, bool dir)
    {   
       // Merge the sequences concurrently if there is sufficient work to do.
       if (n > 500)
       {
          int m = n / 2;
          for (int i = lo; i < lo + m; ++i)
          {
             compare(items, i, i + m, dir);
          }
    
          // Use the parallel_invoke algorithm to merge the sequences in parallel.
          parallel_invoke(
             [&items,lo,m,dir] { parallel_bitonic_merge(items, lo, m, dir); },
             [&items,lo,m,dir] { parallel_bitonic_merge(items, lo + m, m, dir); }
          );
       }
       // Otherwise, perform the work serially.
       else if (n > 1)
       {
          bitonic_merge(items, lo, n, dir);
       }   
    }
    
  4. 执行与前面步骤中的进程类似但却适用于 bitonic_sort 函数的进程。

    // Sorts the given sequence in the specified order.
    template <class T>
    void parallel_bitonic_sort(T* items, int lo, int n, bool dir)
    {   
       if (n > 1)
       {
          // Divide the array into two partitions and then sort 
          // the partitions in different directions.
          int m = n / 2;
    
          // Sort the partitions in parallel.
          parallel_invoke(
             [&items,lo,m] { parallel_bitonic_sort(items, lo, m, INCREASING); },
             [&items,lo,m] { parallel_bitonic_sort(items, lo + m, m, DECREASING); }
          );
    
          // Merge the results.
          parallel_bitonic_merge(items, lo, n, dir);
       }
    }
    
  5. 创建以递增的顺序排序数组的 parallel_bitonic_sort 函数的重载版本。

    // Sorts the given sequence in increasing order.
    template <class T>
    void parallel_bitonic_sort(T* items, int size)
    {
       parallel_bitonic_sort(items, 0, size, INCREASING);
    }
    

parallel_invoke 算法通过对调用上下文执行一系列任务中的最后一项任务来减少开销。 例如在 parallel_bitonic_sort 函数中,第一个任务在单独的上下文中运行,而第二个任务则在调用上下文中运行。

// Sort the partitions in parallel.
parallel_invoke(
   [&items,lo,m] { parallel_bitonic_sort(items, lo, m, INCREASING); },
   [&items,lo,m] { parallel_bitonic_sort(items, lo + m, m, DECREASING); }
);

下面的完整示例执行 bitonic 排序算法的序列化版本和并行版本。 此示例还会将执行每个计算所需的时间打印到控制台。

// parallel-bitonic-sort.cpp
// compile with: /EHsc
#include <windows.h>
#include <algorithm>
#include <iostream>
#include <random>
#include <ppl.h>

using namespace concurrency;
using namespace std;

// Calls the provided work function and returns the number of milliseconds 
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
   __int64 begin = GetTickCount();
   f();
   return GetTickCount() - begin;
}

const bool INCREASING = true;
const bool DECREASING = false;

// Comparator function for the bitonic sort algorithm.
template <class T>
void compare(T* items, int i, int j, bool dir)
{
   if (dir == (items[i] > items[j]))
   {
      swap(items[i], items[j]);
   }
}

// Sorts a bitonic sequence in the specified order.
template <class T>
void bitonic_merge(T* items, int lo, int n, bool dir)
{
   if (n > 1)
   {
      int m = n / 2;
      for (int i = lo; i < lo + m; ++i)
      {
         compare(items, i, i + m, dir);
      }
      bitonic_merge(items, lo, m, dir);
      bitonic_merge(items, lo + m, m, dir);
   }
}

// Sorts the given sequence in the specified order.
template <class T>
void bitonic_sort(T* items, int lo, int n, bool dir)
{
   if (n > 1)
   {
      // Divide the array into two partitions and then sort 
      // the partitions in different directions.
      int m = n / 2;
      bitonic_sort(items, lo, m, INCREASING);
      bitonic_sort(items, lo + m, m, DECREASING);

      // Merge the results.
      bitonic_merge(items,lo, n, dir);
   }
}

// Sorts the given sequence in increasing order.
template <class T>
void bitonic_sort(T* items, int size)
{
    bitonic_sort(items, 0, size, INCREASING);
}

// Sorts a bitonic sequence in the specified order.
template <class T>
void parallel_bitonic_merge(T* items, int lo, int n, bool dir)
{   
   // Merge the sequences concurrently if there is sufficient work to do.
   if (n > 500)
   {
      int m = n / 2;
      for (int i = lo; i < lo + m; ++i)
      {
         compare(items, i, i + m, dir);
      }

      // Use the parallel_invoke algorithm to merge the sequences in parallel.
      parallel_invoke(
         [&items,lo,m,dir] { parallel_bitonic_merge(items, lo, m, dir); },
         [&items,lo,m,dir] { parallel_bitonic_merge(items, lo + m, m, dir); }
      );
   }
   // Otherwise, perform the work serially.
   else if (n > 1)
   {
      bitonic_merge(items, lo, n, dir);
   }   
}

// Sorts the given sequence in the specified order.
template <class T>
void parallel_bitonic_sort(T* items, int lo, int n, bool dir)
{   
   if (n > 1)
   {
      // Divide the array into two partitions and then sort 
      // the partitions in different directions.
      int m = n / 2;

      // Sort the partitions in parallel.
      parallel_invoke(
         [&items,lo,m] { parallel_bitonic_sort(items, lo, m, INCREASING); },
         [&items,lo,m] { parallel_bitonic_sort(items, lo + m, m, DECREASING); }
      );

      // Merge the results.
      parallel_bitonic_merge(items, lo, n, dir);
   }
}

// Sorts the given sequence in increasing order.
template <class T>
void parallel_bitonic_sort(T* items, int size)
{
   parallel_bitonic_sort(items, 0, size, INCREASING);
}

int wmain()
{  
   // For this example, the size must be a power of two.
   const int size = 0x200000;

   // Create two large arrays and fill them with random values.
   int* a1 = new int[size];
   int* a2 = new int[size];

   mt19937 gen(42);
   for(int i = 0; i < size; ++i)
   {
      a1[i] = a2[i] = gen();
   }

   __int64 elapsed;

   // Perform the serial version of the sort.
   elapsed = time_call([&] { bitonic_sort(a1, size); });
   wcout << L"serial time: " << elapsed << endl;

   // Now perform the parallel version of the sort.
   elapsed = time_call([&] { parallel_bitonic_sort(a2, size); });
   wcout << L"parallel time: " << elapsed << endl;

   delete[] a1;
   delete[] a2;
}

下例是四处理器计算机的输出结果。

serial time: 4353
parallel time: 1248

[顶级]

编译代码

若要编译代码,请将其复制并粘贴到Visual Studio项目或一个名为 并行bitonic sort.cpp 然后运行在Visual Studio命令提示符窗口中运行以下命令的文件。

cl.exe /EHsc parallel-bitonic-sort.cpp

可靠编程

因为每个任务的生存期组不在函数外,扩展此示例使用 parallel_invoke 算法而不是 concurrency::task_group 选件类。 建议您尽可能使用 parallel_invoke,因为它的执行开销比 task group 对象少,因此可使您编写性能更佳的代码。

仅当有足够的工作要做时,某些算法的并行版本的性能才会较佳。 例如,如果序列中有 500 个或更少的元素,则 parallel_bitonic_merge 函数将调用序列化版本 bitonic_merge。 您还可以根据工作量计划整体的排序策略。 例如,如果数组包含的项目少于 500 个,则使用快速排序算法的串行版本可能更为高效,如下面的示例所示:

template <class T>
void quick_sort(T* items, int lo, int n)
{
   // TODO: The function body is omitted for brevity.
}

template <class T>
void parallel_bitonic_sort(T* items, int lo, int n, bool dir)
{
   // Use the serial quick sort algorithm if there are relatively few
   // items to sort. The associated overhead for running few tasks in 
   // parallel may not overcome the benefits of parallel processing.
   if (n - lo + 1 <= 500)
   {
      quick_sort(items, lo, n);
   }
   else if (n > 1)
   {
      // Divide the array into two partitions and then sort 
      // the partitions in different directions.
      int m = n / 2;

      // Sort the partitions in parallel.
      parallel_invoke(
         [&items,lo,m] { parallel_bitonic_sort(items, lo, m, INCREASING); },
         [&items,lo,m] { parallel_bitonic_sort(items, lo + m, m, DECREASING); }
      );

      // Merge the results.
      parallel_bitonic_merge(items, lo, n, dir);
   }
}

与使用任何并行算法一样,建议您根据需要分析和调整代码。

请参见

参考

parallel_invoke 函数

概念

任务并行(并发运行时)