如何:使用 parallel_invoke 来编写并行排序例程

本文档介绍如何使用 parallel_invoke 算法来提高双调排序算法的性能。 双调排序算法以递归方式将输入序列划分为较小的排序分区。 双调排序算法可以并行运行,因为每个分区操作都独立于所有其他操作。

虽然双调排序是对输入序列的所有组合进行排序的排序网络的一个示例,但此示例对长度为 2 的幂的序列进行排序。

注意

为了便于说明,此示例使用并行排序例程。 还可以使用 PPL 提供的内置排序算法:concurrency::parallel_sortconcurrency::parallel_buffered_sortconcurrency::parallel_radixsort。 有关详细信息,请参阅并行算法

章节

本文档介绍以下任务:

串行执行双调排序

以下示例显示了双调排序算法的串行版本。 bitonic_sort 函数将序列划分为两个分区,对这些分区进行相反方向的排序,然后合并结果。 此函数以递归方式调用自身两次以对每个分区进行排序。

const bool INCREASING = true;
const bool DECREASING = false;

// Comparator function for the bitonic sort algorithm.
template <class T>
void compare(T* items, int i, int j, bool dir)
{
   if (dir == (items[i] > items[j]))
   {
      swap(items[i], items[j]);
   }
}

// Sorts a bitonic sequence in the specified order.
template <class T>
void bitonic_merge(T* items, int lo, int n, bool dir)
{
   if (n > 1)
   {
      int m = n / 2;
      for (int i = lo; i < lo + m; ++i)
      {
         compare(items, i, i + m, dir);
      }
      bitonic_merge(items, lo, m, dir);
      bitonic_merge(items, lo + m, m, dir);
   }
}

// Sorts the given sequence in the specified order.
template <class T>
void bitonic_sort(T* items, int lo, int n, bool dir)
{
   if (n > 1)
   {
      // Divide the array into two partitions and then sort 
      // the partitions in different directions.
      int m = n / 2;
      bitonic_sort(items, lo, m, INCREASING);
      bitonic_sort(items, lo + m, m, DECREASING);

      // Merge the results.
      bitonic_merge(items,lo, n, dir);
   }
}

// Sorts the given sequence in increasing order.
template <class T>
void bitonic_sort(T* items, int size)
{
    bitonic_sort(items, 0, size, INCREASING);
}

[返回页首]

使用 parallel_invoke 并行执行双调排序

本节介绍如何使用 parallel_invoke 算法并行执行双调排序算法。

并行执行双调排序算法

  1. 为头文件 ppl.h 添加 #include 指令。

    #include <ppl.h>
    
  2. concurrency 命名空间添加 using 指令。

    using namespace concurrency;
    
  3. 创建一个名为 parallel_bitonic_mege 的新函数,该函数使用 parallel_invoke 算法在有足够的工作量的情况下并行合并序列。 否则,调用 bitonic_merge 以串行方式合并序列。

    // Sorts a bitonic sequence in the specified order.
    template <class T>
    void parallel_bitonic_merge(T* items, int lo, int n, bool dir)
    {   
       // Merge the sequences concurrently if there is sufficient work to do.
       if (n > 500)
       {
          int m = n / 2;
          for (int i = lo; i < lo + m; ++i)
          {
             compare(items, i, i + m, dir);
          }
    
          // Use the parallel_invoke algorithm to merge the sequences in parallel.
          parallel_invoke(
             [&items,lo,m,dir] { parallel_bitonic_merge(items, lo, m, dir); },
             [&items,lo,m,dir] { parallel_bitonic_merge(items, lo + m, m, dir); }
          );
       }
       // Otherwise, perform the work serially.
       else if (n > 1)
       {
          bitonic_merge(items, lo, n, dir);
       }   
    }
    
  4. 但对 bitonic_sort 函数执行类似于上一步中的进程。

    // Sorts the given sequence in the specified order.
    template <class T>
    void parallel_bitonic_sort(T* items, int lo, int n, bool dir)
    {   
       if (n > 1)
       {
          // Divide the array into two partitions and then sort 
          // the partitions in different directions.
          int m = n / 2;
    
          // Sort the partitions in parallel.
          parallel_invoke(
             [&items,lo,m] { parallel_bitonic_sort(items, lo, m, INCREASING); },
             [&items,lo,m] { parallel_bitonic_sort(items, lo + m, m, DECREASING); }
          );
          
          // Merge the results.
          parallel_bitonic_merge(items, lo, n, dir);
       }
    }
    
  5. 创建按升序对数组进行排序的 parallel_bitonic_sort 函数的重载版本。

    // Sorts the given sequence in increasing order.
    template <class T>
    void parallel_bitonic_sort(T* items, int size)
    {
       parallel_bitonic_sort(items, 0, size, INCREASING);
    }
    

    parallel_invoke 算法通过在调用上下文中执行最后一系列任务来减少开销。 例如,在 parallel_bitonic_sort 函数中,第一个任务在单独的上下文中运行,第二个任务在调用上下文中运行。

    // Sort the partitions in parallel.
    parallel_invoke(
       [&items,lo,m] { parallel_bitonic_sort(items, lo, m, INCREASING); },
       [&items,lo,m] { parallel_bitonic_sort(items, lo + m, m, DECREASING); }
    );
    

以下完整示例执行双调排序算法的串行和并行版本。 示例还控制台输出了进行每种计算所需的时间。

// parallel-bitonic-sort.cpp
// compile with: /EHsc
#include <windows.h>
#include <algorithm>
#include <iostream>
#include <random>
#include <ppl.h>

using namespace concurrency;
using namespace std;

// Calls the provided work function and returns the number of milliseconds 
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
   __int64 begin = GetTickCount();
   f();
   return GetTickCount() - begin;
}

const bool INCREASING = true;
const bool DECREASING = false;

// Comparator function for the bitonic sort algorithm.
template <class T>
void compare(T* items, int i, int j, bool dir)
{
   if (dir == (items[i] > items[j]))
   {
      swap(items[i], items[j]);
   }
}

// Sorts a bitonic sequence in the specified order.
template <class T>
void bitonic_merge(T* items, int lo, int n, bool dir)
{
   if (n > 1)
   {
      int m = n / 2;
      for (int i = lo; i < lo + m; ++i)
      {
         compare(items, i, i + m, dir);
      }
      bitonic_merge(items, lo, m, dir);
      bitonic_merge(items, lo + m, m, dir);
   }
}

// Sorts the given sequence in the specified order.
template <class T>
void bitonic_sort(T* items, int lo, int n, bool dir)
{
   if (n > 1)
   {
      // Divide the array into two partitions and then sort 
      // the partitions in different directions.
      int m = n / 2;
      bitonic_sort(items, lo, m, INCREASING);
      bitonic_sort(items, lo + m, m, DECREASING);

      // Merge the results.
      bitonic_merge(items,lo, n, dir);
   }
}

// Sorts the given sequence in increasing order.
template <class T>
void bitonic_sort(T* items, int size)
{
    bitonic_sort(items, 0, size, INCREASING);
}

// Sorts a bitonic sequence in the specified order.
template <class T>
void parallel_bitonic_merge(T* items, int lo, int n, bool dir)
{   
   // Merge the sequences concurrently if there is sufficient work to do.
   if (n > 500)
   {
      int m = n / 2;
      for (int i = lo; i < lo + m; ++i)
      {
         compare(items, i, i + m, dir);
      }

      // Use the parallel_invoke algorithm to merge the sequences in parallel.
      parallel_invoke(
         [&items,lo,m,dir] { parallel_bitonic_merge(items, lo, m, dir); },
         [&items,lo,m,dir] { parallel_bitonic_merge(items, lo + m, m, dir); }
      );
   }
   // Otherwise, perform the work serially.
   else if (n > 1)
   {
      bitonic_merge(items, lo, n, dir);
   }   
}

// Sorts the given sequence in the specified order.
template <class T>
void parallel_bitonic_sort(T* items, int lo, int n, bool dir)
{   
   if (n > 1)
   {
      // Divide the array into two partitions and then sort 
      // the partitions in different directions.
      int m = n / 2;

      // Sort the partitions in parallel.
      parallel_invoke(
         [&items,lo,m] { parallel_bitonic_sort(items, lo, m, INCREASING); },
         [&items,lo,m] { parallel_bitonic_sort(items, lo + m, m, DECREASING); }
      );
      
      // Merge the results.
      parallel_bitonic_merge(items, lo, n, dir);
   }
}

// Sorts the given sequence in increasing order.
template <class T>
void parallel_bitonic_sort(T* items, int size)
{
   parallel_bitonic_sort(items, 0, size, INCREASING);
}

int wmain()
{  
   // For this example, the size must be a power of two.
   const int size = 0x200000;

   // Create two large arrays and fill them with random values.
   int* a1 = new int[size];
   int* a2 = new int[size];

   mt19937 gen(42);
   for(int i = 0; i < size; ++i)
   {
      a1[i] = a2[i] = gen();
   }

   __int64 elapsed;
   
   // Perform the serial version of the sort.
   elapsed = time_call([&] { bitonic_sort(a1, size); });
   wcout << L"serial time: " << elapsed << endl;

   // Now perform the parallel version of the sort.
   elapsed = time_call([&] { parallel_bitonic_sort(a2, size); });
   wcout << L"parallel time: " << elapsed << endl;
 
   delete[] a1;
   delete[] a2;
}

以下是具有四个处理器的计算机的输出示例。

serial time: 4353
parallel time: 1248

[返回页首]

编译代码

若要编译代码,请复制代码并将其粘贴到 Visual Studio 项目中,或粘贴到一个名为 parallel-bitonic-sort.cpp 的文件中,然后在 Visual Studio 命令提示符窗口中运行以下命令。

cl.exe /EHsc parallel-bitonic-sort.cpp

可靠编程

此示例使用 parallel_invoke 算法而不是 concurrency::task_group 类,因为每个任务组的生存期均未超出函数的生存期。 建议尽可能使用 parallel_invoke,因为它的执行开销比 task group 对象少,因此可以编写更好的执行代码。

仅当有足够的工作要做时,某些算法的并行版本才能更好地执行。 例如,如果序列中有 500 个或更少的元素,parallel_bitonic_merge 函数会调用串行版本 bitonic_merge。 还可以根据工作量规划总体排序策略。 例如,如果数组包含少于 500 个项目,则使用快速排序算法的串行版本可能更有效,如以下示例所示:

template <class T>
void quick_sort(T* items, int lo, int n)
{
   // TODO: The function body is omitted for brevity.
}

template <class T>
void parallel_bitonic_sort(T* items, int lo, int n, bool dir)
{
   // Use the serial quick sort algorithm if there are relatively few
   // items to sort. The associated overhead for running few tasks in 
   // parallel may not overcome the benefits of parallel processing.
   if (n - lo + 1 <= 500)
   {
      quick_sort(items, lo, n);
   }
   else if (n > 1)
   {
      // Divide the array into two partitions and then sort 
      // the partitions in different directions.
      int m = n / 2;

      // Sort the partitions in parallel.
      parallel_invoke(
         [&items,lo,m] { parallel_bitonic_sort(items, lo, m, INCREASING); },
         [&items,lo,m] { parallel_bitonic_sort(items, lo + m, m, DECREASING); }
      );
      
      // Merge the results.
      parallel_bitonic_merge(items, lo, n, dir);
   }
}

与任何并行算法一样,建议根据需要分析并优化代码。

另请参阅

任务并行
parallel_invoke 函数