任务并行(并发运行时)

在并发运行时中,任务是执行特定作业并通常与其他任务并行运行的工作单元。 任务可以分解为组织成任务组的其他更细化的任务。

编写异步代码,并希望在异步操作完成之后进行某种操作时,可使用任务。 例如,可以使用一个任务以异步方式从文件读取,然后使用另一个任务(延续任务,本文档稍后会对此进行说明)在数据可用之后处理数据。 相反,可以使用任务组将并行工作分解成较小的各部分。 例如,假设你有一个将剩余工作划分为两个分区的递归算法。 可以使用任务组并发运行这两个分区,然后等待划分的工作完成。

提示

要将相同例程并行应用于集合的每个元素时,可使用并行算法(如 concurrency::parallel_for),而不是任务或任务组。 有关并行算法的详细信息,请参阅并行算法

要点

  • 通过引用将变量传递到 Lambda 表达式时,必须保证该变量的生存期在任务完成之前一直保持。

  • 编写异步代码时可使用任务(concurrency::task 类)。 任务类使用 Windows 线程池作为其计划程序中,而不是并发运行时。

  • 要将并行工作分解成较小的各部分,然后等待这些较小部分完成时,可使用任务组(concurrency::task_group 类或 concurrency::parallel_invoke 算法)。

  • 使用 concurrency::task::then 方法可创建延续。 延续是在另一个任务完成之后异步运行的任务。 可以连接任意数量的延续以形成异步工作链。

  • 基于任务的延续始终计划为在先行任务完成时执行,甚至是在先行任务取消或引发异常时执行。

  • 使用 concurrency::when_all 可创建在任务集的所有成员都完成之后完成的任务。 使用 concurrency::when_any 可创建在任务集的一个成员完成之后完成的任务。

  • 任务和任务组可以参与并行模式库 (PPL) 取消机制。 有关详细信息,请参阅 PPL 中的取消操作

  • 若要了解运行时如何处理由任务和任务组引发的异常,请参阅异常处理

在本文档中

使用 Lambda 表达式

由于其语法简洁,因此 lambda 表达式是定义由任务和任务组执行的工作的常用方法。 下面是一些使用提示:

  • 因为任务通常在后台线程上运行,所以在 Lambda 表达式中捕获变量时请注意对象生存期。 如果通过值捕获变量,则会在 lambda 体中创建该变量的副本。 通过引用捕获时,不创建副本。 因此,请确保通过引用捕获的任何变量的生存期长于使用它的任务。

  • Lambda 表达式将 lambda 表达式传递给任务时,不捕获通过引用在堆栈上分配的变量。

  • 应明确在 lambda 表达式中捕获的变量,以便可以确定通过值与通过引用捕获的内容。 因此我们建议不要将 [=][&] 选项用于 lambda 表达式。

一种常用模式是将延续链中的一个任务分配给变量,而另一个任务读取该变量。 无法通过值捕获,因为每个延续任务会保存变量的不同副本。 对于堆栈分配的变量,也无法通过引用捕获,因为变量可能不再有效。

若要解决此问题,请使用智能指针(如 std::shared_ptr)来包装变量,并通过值传递智能指针。 这样,基础对象便可以进行分配和读取,并且生存期会长于使用它的任务。 即使在变量是 Windows 运行时对象的指针或引用计数的句柄 (^) 时,也可使用此技术。 下面是一个基本示例:

// lambda-task-lifetime.cpp
// compile with: /EHsc
#include <ppltasks.h>
#include <iostream>
#include <string>

using namespace concurrency;
using namespace std;

task<wstring> write_to_string()
{
    // Create a shared pointer to a string that is 
    // assigned to and read by multiple tasks.
    // By using a shared pointer, the string outlives
    // the tasks, which can run in the background after
    // this function exits.
    auto s = make_shared<wstring>(L"Value 1");

    return create_task([s] 
    {
        // Print the current value.
        wcout << L"Current value: " << *s << endl;
        // Assign to a new value.
        *s = L"Value 2";

    }).then([s] 
    {
        // Print the current value.
        wcout << L"Current value: " << *s << endl;
        // Assign to a new value and return the string.
        *s = L"Value 3";
        return *s;
    });
}

int wmain()
{
    // Create a chain of tasks that work with a string.
    auto t = write_to_string();

    // Wait for the tasks to finish and print the result.
    wcout << L"Final value: " << t.get() << endl;
}

/* Output:
    Current value: Value 1
    Current value: Value 2
    Final value: Value 3
*/

有关 lambda 表达式的详细信息,请参阅 Lambda 表达式

task 类

可以使用 concurrency::task 类将任务组合成相关操作集。 此组合模型通过延续来支持。 延续使代码可以在前面(或先行)任务完成时执行。 先行任务的结果会作为输入传递给一个或多个延续任务。 先行任务完成时,在等待它的所有延续任务都计划进行执行。 每个延续任务都会收到先行任务结果的副本。 这些延续任务进而也可能是其他延续的先行任务,从而形成任务链。 延续可帮助创建任务间具有特定依赖关系的任意长度的任务链。 此外,任务还可以在其他任务开始之前或是在其他任务运行期间以协作方式参与取消。 有关此取消模型的详细信息,请参阅PPL 中的取消操作

task 是模板类。 类型参数 T 是由任务生成的结果的类型。 如果任务不返回值,则此类型可以是 voidT 不能使用 const 修饰符。

创建任务时,需提供执行任务体的工作函数。 此工作函数采用 lambda 函数、函数指针或函数对象的形式。 若要等待任务完成而不获取结果,请调用 concurrency::task::wait 方法。 task::wait 方法会返回一个 concurrency::task_status 值,该值描述任务是已完成还是已取消。 若要获取任务的结果,请调用 concurrency::task::get 方法。 此方法调用 task::wait 以等待任务完成,因此会在结果可用之前阻止当前线程的执行。

下面的示例演示如何创建任务、等待其结果并显示其值。 本文档中的示例使用 lambda 函数,因为它们提供更简洁的语法。 不过你也可以在使用任务时使用函数指针和函数对象。

// basic-task.cpp
// compile with: /EHsc
#include <ppltasks.h>
#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{
    // Create a task.
    task<int> t([]()
    {
        return 42;
    });

    // In this example, you don't necessarily need to call wait() because
    // the call to get() also waits for the result.
    t.wait();

    // Print the result.
    wcout << t.get() << endl;
}

/* Output:
    42
*/

使用 concurrency::create_task 函数时,可以使用 auto 关键字而不是声明类型。 例如,请考虑创建和打印单位矩阵的以下代码:

// create-task.cpp
// compile with: /EHsc
#include <ppltasks.h>
#include <string>
#include <iostream>
#include <array>

using namespace concurrency;
using namespace std;

int wmain()
{
    task<array<array<int, 10>, 10>> create_identity_matrix([]
    {
        array<array<int, 10>, 10> matrix;
        int row = 0;
        for_each(begin(matrix), end(matrix), [&row](array<int, 10>& matrixRow) 
        {
            fill(begin(matrixRow), end(matrixRow), 0);
            matrixRow[row] = 1;
            row++;
        });
        return matrix;
    });

    auto print_matrix = create_identity_matrix.then([](array<array<int, 10>, 10> matrix)
    {
        for_each(begin(matrix), end(matrix), [](array<int, 10>& matrixRow) 
        {
            wstring comma;
            for_each(begin(matrixRow), end(matrixRow), [&comma](int n) 
            {
                wcout << comma << n;
                comma = L", ";
            });
            wcout << endl;
        });
    });

    print_matrix.wait();
}
/* Output:
    1, 0, 0, 0, 0, 0, 0, 0, 0, 0
    0, 1, 0, 0, 0, 0, 0, 0, 0, 0
    0, 0, 1, 0, 0, 0, 0, 0, 0, 0
    0, 0, 0, 1, 0, 0, 0, 0, 0, 0
    0, 0, 0, 0, 1, 0, 0, 0, 0, 0
    0, 0, 0, 0, 0, 1, 0, 0, 0, 0
    0, 0, 0, 0, 0, 0, 1, 0, 0, 0
    0, 0, 0, 0, 0, 0, 0, 1, 0, 0
    0, 0, 0, 0, 0, 0, 0, 0, 1, 0
    0, 0, 0, 0, 0, 0, 0, 0, 0, 1
*/

可以使用 create_task 函数创建等效操作。

auto create_identity_matrix = create_task([]
{
    array<array<int, 10>, 10> matrix;
    int row = 0;
    for_each(begin(matrix), end(matrix), [&row](array<int, 10>& matrixRow) 
    {
        fill(begin(matrixRow), end(matrixRow), 0);
        matrixRow[row] = 1;
        row++;
    });
    return matrix;
});

如果在任务执行期间引发异常,则运行时会在对 task::gettask::wait 或是基于任务的延续的后续调用中封送该异常。 有关任务异常处理机制的详细信息,请参阅异常处理

有关使用 taskconcurrency::task_completion_event、取消的示例,请参阅演练:使用任务和 XML HTTP 请求进行连接。 (本文档稍后会介绍 task_completion_event 类。)

提示

若要了解特定于 UWP 应用中的任务的详细信息,请参阅 C++ 中的异步编程用 C++ 为 UWP 应用创建异步操作

延续任务

在异步编程中,一个异步操作在完成时调用另一个操作并将数据传递到其中的情况非常常见。 传统上,这使用回调方法来完成。 在并发运行时中,延续任务提供了同样的功能。 延续任务(也简称为“延续”)是一个异步任务,由另一个任务(称为先行)在完成时调用。 使用延续可以:

  • 将数据从前面的任务传递到延续。

  • 指定调用或不调用延续所依据的精确条件。

  • 在延续启动之前取消延续,或在延续正在运行时以协作方式取消延续。

  • 提供有关应如何计划延续的提示。 (这仅适用于通用 Windows 平台 (UWP) 应用。 有关详细信息,请参阅用 C++ 为 UWP 应用创建异步操作。)

  • 从同一前面的任务中调用多个延续。

  • 在多个先行任务中的全部或任意任务完成时调用一个延续。

  • 将延续依次相连,形成任意长度。

  • 使用延续来处理先行引发的异常。

这些功能使你可以在第一个任务完成时执行一个或多个任务。 例如,可以创建在第一个任务从磁盘读取文件之后压缩文件的延续。

下面的示例将上面的示例修改为使用 concurrency::task::then 方法来计划在先行任务的值可用时打印该值的延续。

// basic-continuation.cpp
// compile with: /EHsc
#include <ppltasks.h>
#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{
    auto t = create_task([]() -> int
    {
        return 42;
    });

    t.then([](int result)
    {
        wcout << result << endl;
    }).wait();

    // Alternatively, you can chain the tasks directly and
    // eliminate the local variable.
    /*create_task([]() -> int
    {
        return 42;
    }).then([](int result)
    {
        wcout << result << endl;
    }).wait();*/
}

/* Output:
    42
*/

可以按任意长度链接和嵌套任务。 一个任务还可以具有多个延续。 下面的示例演示将上一个任务的值增加三倍的基本延续链。

// continuation-chain.cpp
// compile with: /EHsc
#include <ppltasks.h>
#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{
    auto t = create_task([]() -> int
    { 
        return 0;
    });
    
    // Create a lambda that increments its input value.
    auto increment = [](int n) { return n + 1; };

    // Run a chain of continuations and print the result.
    int result = t.then(increment).then(increment).then(increment).get();
    wcout << result << endl;
}

/* Output:
    3
*/

延续还可以返回另一个任务。 如果没有取消,则此任务会在后续延续之前执行。 此技术称为异步解包。 要在后台执行其他工作,但不想当前任务阻止当前线程时,异步解包会很有用。 (这在 UWP 应用中很常见,其中延续可以在 UI 线程上运行)。 下面的示例演示三个任务。 第一个任务返回在延续任务之前运行的另一个任务。

// async-unwrapping.cpp
// compile with: /EHsc
#include <ppltasks.h>
#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{
    auto t = create_task([]()
    {
        wcout << L"Task A" << endl;

        // Create an inner task that runs before any continuation
        // of the outer task.
        return create_task([]()
        {
            wcout << L"Task B" << endl;
        });
    });
  
    // Run and wait for a continuation of the outer task.
    t.then([]()
    {
        wcout << L"Task C" << endl;
    }).wait();
}

/* Output:
    Task A
    Task B
    Task C
*/

重要

当任务的延续返回 N 类型的嵌套任务时,生成的任务具有 N 类型(而不是 task<N>),会在嵌套任务完成时完成。 换句话说,延续会执行嵌套任务的解包。

基于值的延续与基于任务的延续

对于其返回类型是 Ttask 对象,可以向其延续任务提供 Ttask<T> 类型的值。 采用类型 T 的延续称为基于值的延续。 基于值的延续计划在先行任务完成而未出现错误并且未取消时执行。 采用类型 task<T> 作为其参数的延续称为基于任务的延续。 基于任务的延续始终计划为在先行任务完成时执行,甚至是在先行任务取消或引发异常时执行。 随后然后调用 task::get 以获取先行任务的结果。 如果先行任务已取消,则 task::get 会引发 concurrency::task_canceled。 如果先行任务引发了异常,则 task::get 会再次引发该异常。 基于任务的延续在先行任务取消时不会标记为已取消。

组合任务

此部分介绍 concurrency::when_allconcurrency::when_any 函数,它们可以帮助组合多个任务以实现常用模式。

when_all 函数

when_all 函数生成在任务集完成之后完成的任务。 此函数返回 std::vector 对象,其中包含集中每个任务的结果。 下面的基本示例使用 when_all 创建一个表示三个其他任务完成的任务。

// join-tasks.cpp
// compile with: /EHsc
#include <ppltasks.h>
#include <array>
#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{
    // Start multiple tasks.
    array<task<void>, 3> tasks = 
    {
        create_task([] { wcout << L"Hello from taskA." << endl; }),
        create_task([] { wcout << L"Hello from taskB." << endl; }),
        create_task([] { wcout << L"Hello from taskC." << endl; })
    };

    auto joinTask = when_all(begin(tasks), end(tasks));

    // Print a message from the joining thread.
    wcout << L"Hello from the joining thread." << endl;

    // Wait for the tasks to finish.
    joinTask.wait();
}

/* Sample output:
    Hello from the joining thread.
    Hello from taskA.
    Hello from taskC.
    Hello from taskB.
*/

注意

传递给 when_all 的任务必须统一。 换句话说,它们必须全部返回相同类型。

还可以使用 && 语法生成在任务集完成之后完成的任务,如下面的示例所示。

auto t = t1 && t2; // same as when_all

可将延续与 when_all 结合使用以在任务集完成之后执行操作,这十分常见。 下面的示例将上面的示例修改为打印各自生成 int 结果的三个任务的总和。

// Start multiple tasks.
array<task<int>, 3> tasks =
{
    create_task([]() -> int { return 88; }),
    create_task([]() -> int { return 42; }),
    create_task([]() -> int { return 99; })
};

auto joinTask = when_all(begin(tasks), end(tasks)).then([](vector<int> results)
{
    wcout << L"The sum is " 
          << accumulate(begin(results), end(results), 0)
          << L'.' << endl;
});

// Print a message from the joining thread.
wcout << L"Hello from the joining thread." << endl;

// Wait for the tasks to finish.
joinTask.wait();

/* Output:
    Hello from the joining thread.
    The sum is 229.
*/

在此示例中,还可以指定 task<vector<int>> 以生成基于任务的延续。

如果任务集中的任何任务取消或引发异常,则 when_all 会立即完成,不等待其余任务完成。 如果引发异常,则运行时会在你对 when_all 返回的任务对象调用 task::gettask::wait 时再次引发异常。 如果有多个任务引发,则运行时会选择其中之一。 因此,请确保在所有任务完成之后观察到所有异常;未经处理的任务异常会导致应用终止。

下面是可以用于确保程序观察到所有异常的实用工具函数。 对于处于提供的范围内的每个任务,observe_all_exceptions 会触发再次引发的任何异常,然后会吞并该异常。

// Observes all exceptions that occurred in all tasks in the given range.
template<class T, class InIt> 
void observe_all_exceptions(InIt first, InIt last) 
{
    std::for_each(first, last, [](concurrency::task<T> t)
    {
        t.then([](concurrency::task<T> previousTask)
        {
            try
            {
                previousTask.get();
            }
            // Although you could catch (...), this demonstrates how to catch specific exceptions. Your app
            // might handle different exception types in different ways.
            catch (Platform::Exception^)
            {
                // Swallow the exception.
            }
            catch (const std::exception&)
            {
                // Swallow the exception.
            }
        });
    });
}

请考虑一个使用 C++ 和 XAML 并将文件集写入磁盘的 UWP 应用。 下面的示例演示如何使用 when_allobserve_all_exceptions 确保该程序观察到所有异常。

// Writes content to files in the provided storage folder.
// The first element in each pair is the file name. The second element holds the file contents.
task<void> MainPage::WriteFilesAsync(StorageFolder^ folder, const vector<pair<String^, String^>>& fileContents)
{
    // For each file, create a task chain that creates the file and then writes content to it. Then add the task chain to a vector of tasks.
    vector<task<void>> tasks;
    for (auto fileContent : fileContents)
    {
        auto fileName = fileContent.first;
        auto content = fileContent.second;

        // Create the file. The CreationCollisionOption::FailIfExists flag specifies to fail if the file already exists.
        tasks.emplace_back(create_task(folder->CreateFileAsync(fileName, CreationCollisionOption::FailIfExists)).then([content](StorageFile^ file)
        {
            // Write its contents.
            return create_task(FileIO::WriteTextAsync(file, content));
        }));
    }

    // When all tasks finish, create a continuation task that observes any exceptions that occurred.
    return when_all(begin(tasks), end(tasks)).then([tasks](task<void> previousTask)
    {
        task_status status = completed;
        try
        {
            status = previousTask.wait();
        }
        catch (COMException^ e)
        {
            // We'll handle the specific errors below.
        }
        // TODO: If other exception types might happen, add catch handlers here.

        // Ensure that we observe all exceptions.
        observe_all_exceptions<void>(begin(tasks), end(tasks));

        // Cancel any continuations that occur after this task if any previous task was canceled.
        // Although cancellation is not part of this example, we recommend this pattern for cases that do.
        if (status == canceled)
        {
            cancel_current_task();
        }
    });
}
运行此示例
  1. 在 MainPage.xaml 中,添加一个 Button 控件。
<Button x:Name="Button1" Click="Button_Click">Write files</Button>
  1. 在 MainPage.xaml.h 中,将这些前向声明添加到 MainPage 类声明的 private 节。
void Button_Click(Platform::Object^ sender, Windows::UI::Xaml::RoutedEventArgs^ e);
concurrency::task<void> WriteFilesAsync(Windows::Storage::StorageFolder^ folder, const std::vector<std::pair<Platform::String^, Platform::String^>>& fileContents);
  1. 在 MainPage.xaml.cpp 中,实现 Button_Click 事件处理程序。
// A button click handler that demonstrates the scenario.
void MainPage::Button_Click(Object^ sender, RoutedEventArgs^ e)
{
    // In this example, the same file name is specified two times. WriteFilesAsync fails if one of the files already exists.
    vector<pair<String^, String^>> fileContents;
    fileContents.emplace_back(make_pair(ref new String(L"file1.txt"), ref new String(L"Contents of file 1")));
    fileContents.emplace_back(make_pair(ref new String(L"file2.txt"), ref new String(L"Contents of file 2")));
    fileContents.emplace_back(make_pair(ref new String(L"file1.txt"), ref new String(L"Contents of file 3")));

    Button1->IsEnabled = false; // Disable the button during the operation.
    WriteFilesAsync(ApplicationData::Current->TemporaryFolder, fileContents).then([this](task<void> previousTask)
    {
        try
        {
            previousTask.get();
        }
        // Although cancellation is not part of this example, we recommend this pattern for cases that do.
        catch (const task_canceled&)
        {
            // Your app might show a message to the user, or handle the error in some other way.
        }

        Button1->IsEnabled = true; // Enable the button.
    });
}
  1. 在 MainPage.xaml.cpp 中,实现 WriteFilesAsync,如示例所示。

提示

when_all 是生成 task 作为其结果的的非阻止函数。 与 task::wait 不同,可以安全地在 UWP 应用中在 ASTA(应用程序 STA)线程上调用此函数。

when_any 函数

when_any 函数生成在任务集中的第一个任务完成时完成的任务。 此函数返回 std::pair 对象,其中包含已完成任务的结果和该任务在集中的索引。

when_any 函数在以下情境中尤其有用:

  • 冗余运算。 请考虑可以用多种方式执行的算法或运算。 你可使用 when_any 函数来选择先完成的运算,然后取消剩余的运算。

  • 交叉运算。 你可启动必须全部完成的多项运算,并使用 when_any 函数在每项运算完成时处理结果。 在一项运算完成后,可以启动一个或多个其他任务。

  • 受限制的运算。 你可使用 when_any 函数通过限制并发运算的数量来扩展前面的情境。

  • 过期的运算。 你可使用 when_any 函数在一个或多个任务与特定时间后完成的任务间进行选择。

when_all 一样,可使用具有 when_any 的延续以在任务集中的第一个任务完成时执行操作,这十分常见。 下面的基本示例使用 when_any 创建一个在三个其他任务中的第一个任务完成时完成的任务。

// select-task.cpp
// compile with: /EHsc
#include <ppltasks.h>
#include <array>
#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{
    // Start multiple tasks.
    array<task<int>, 3> tasks = {
        create_task([]() -> int { return 88; }),
        create_task([]() -> int { return 42; }),
        create_task([]() -> int { return 99; })
    };

    // Select the first to finish.
    when_any(begin(tasks), end(tasks)).then([](pair<int, size_t> result)
    {
        wcout << "First task to finish returns "
              << result.first
              << L" and has index "
              << result.second
              << L'.' << endl;
    }).wait();
}

/* Sample output:
    First task to finish returns 42 and has index 1.
*/

在此示例中,还可以指定 task<pair<int, size_t>> 以生成基于任务的延续。

注意

when_all 一样,传递给 when_any 的任务必须返回相同类型。

还可以使用 || 语法生成在任务集中的第一个任务完成之后完成的任务,如下面的示例所示。

auto t = t1 || t2; // same as when_any

提示

when_all 一样,when_any 是非阻止的,可以安全地在 UWP 应用中在 ASTA 线程上进行调用。

延迟的任务执行

有时需要延迟任务的执行,直到满足条件,或开始一项任务来响应外部事件。 例如,在异步编程中,可能必须启动任务以响应 I/O 完成事件。

实现此目的的两种方法是使用延续,或启动任务并在任务的工作函数内等待某个事件。 但是在某些情况下,无法使用以上方法之一。 例如,若要创建延续,必须具有先行任务。 但是,如果没有先行任务,则可以创建任务完成事件,稍后在先行任务可用时将该完成事件链接到先行任务。 此外,因为正在等待的任务也会阻止线程,所以可以使用任务完成事件在异步操作完成时执行工作,从而释放线程。

concurrency::task_completion_event 类可帮助简化这种任务组合。 与 task 类一样,类型参数 T 是由任务生成的结果的类型。 如果任务不返回值,则此类型可以是 voidT 不能使用 const 修饰符。 通常会将 task_completion_event 对象提供给在它的值可用时向它发信号的线程或任务。 同时,一个或多个任务设置为该事件的侦听器。 设置事件时,侦听器任务完成,其延续会计划运行。

有关使用 task_completion_event 实现在延迟一段时间后完成的任务的示例,请参阅如何:创建在延迟一段时间后完成的任务

任务组

任务组可组织任务的集合。 任务组会将任务推送到工作窃取队列。 计划程序从此队列中删除任务,然后在可用计算资源上执行它们。 将任务添加到任务组之后,可以等待所有任务完成或取消尚未开始的任务。

PPL 使用 concurrency::task_groupconcurrency::structured_task_group 类表示任务组,使用 concurrency::task_handle 类表示在这些组中运行的任务。 task_handle 类封装执行工作的代码。 与 task 类一样,工作函数采用 lambda 函数、函数指针或函数对象的形式。 通常不需要直接使用 task_handle 对象。 而是将工作函数传递给任务组,然后任务组会创建和管理 task_handle 对象。

PPL 将任务组划分为这两个类别:非结构化任务组结构化任务组。 PPL 使用 task_group 类表示非结构化任务组,使用 structured_task_group 类表示结构化任务组。

重要

PPL 还定义了 concurrency::parallel_invoke 算法,该算法使用 structured_task_group 类并行执行任务集。 因为 parallel_invoke 算法具有更简洁的语法,所以我们建议尽可能使用它而不是 structured_task_group 类。 主题并行算法更加详细地介绍了 parallel_invoke

当你具有要同时执行的多个独立任务,并且你必须等待所有任务完成才能继续时,可使用 parallel_invoke。 此方法通常称为分叉和联接并行。 当你具有要同时执行的多个独立任务,但是要在以后等待任务完成时,可使用 task_group。 例如,可以将任务添加到 task_group 对象,并在另一个函数中或从另一个线程等待任务完成。

任务组支持取消概念。 取消使你可以向所有活动任务发出信号,表示你要取消整个操作。 取消还可以阻止尚未开始的任务开始。 有关取消的详细信息,请参阅 PPL 中的取消操作

运行时还提供了一个异常处理模型,使你可以从任务引发异常,并在等待关联任务组完成时处理该异常。 有关此异常处理模型的详细信息,请参阅异常处理

比较 task_group 与 structured_task_group

虽然我们建议你使用 task_groupparallel_invoke 而不是 structured_task_group 类,不过有一些你要使用 structured_task_group 的情况,例如当你编写执行可变数量的任务或需要取消支持的并行算法时。 此部分介绍 task_groupstructured_task_group 类之间的差异。

task_group 类是线程安全的。 因此,可以从多个线程将任务添加到 task_group 对象,然后从多个线程等待或取消 task_group 对象。 structured_task_group 对象的构造和析构必须在相同词法范围内进行。 此外,对 structured_task_group 对象执行的所有操作必须在相同线程上进行。 此规则的例外是 concurrency::structured_task_group::cancelconcurrency::structured_task_group::is_canceling 方法。 子任务可以随时调用这些方法以取消父任务组或检查是否存在取消。

可以在调用 concurrency::task_group::waitconcurrency::task_group::run_and_wait 方法之后对 task_group 对象运行其他任务。 相反,如果在调用 concurrency::structured_task_group::waitconcurrency::structured_task_group::run_and_wait 方法之后对 structured_task_group 对象运行其他任务,则行为不明确。

因为 structured_task_group 类不会在线程间同步,所以它的执行开销比 task_group 类更少。 因此,如果你的问题不需要从多个线程计划工作并且你无法使用 parallel_invoke 算法,则 structured_task_group 类可帮助你编写更好的执行代码。

如果在另一个 structured_task_group 对象内使用一个 structured_task_group 对象,则内部对象必须在外部对象完成之前完成并销毁。 task_group 类不要求嵌套任务组在外部组完成之前完成。

非结构化任务组和结构化任务组以不同方式处理任务句柄。 可以将工作函数直接传递给 task_group 对象;task_group 对象会为你创建并管理任务句柄。 structured_task_group 类要求你为每个任务管理 task_handle 对象。 每个 task_handle 对象必须在其关联 structured_task_group 对象的整个生存期内保持有效。 使用 concurrency::make_task 函数可创建 task_handle 对象,如下面的基本示例所示:

// make-task-structure.cpp
// compile with: /EHsc
#include <ppl.h>

using namespace concurrency;

int wmain()
{
   // Use the make_task function to define several tasks.
   auto task1 = make_task([] { /*TODO: Define the task body.*/ });
   auto task2 = make_task([] { /*TODO: Define the task body.*/ });
   auto task3 = make_task([] { /*TODO: Define the task body.*/ });

   // Create a structured task group and run the tasks concurrently.

   structured_task_group tasks;

   tasks.run(task1);
   tasks.run(task2);
   tasks.run_and_wait(task3);
}

若要在具有可变数量任务的情况下管理任务句柄,请使用堆栈分配例程(如 _malloca)或容器类(如 std::vector)。

task_groupstructured_task_group 都支持取消。 有关取消的详细信息,请参阅 PPL 中的取消操作

示例

下面的基本示例演示如何使用任务组。 此示例使用 parallel_invoke 算法并发执行两个任务。 每个任务都将子任务添加到一个 task_group 对象。 请注意,task_group 类允许多个任务同时向它添加任务。

// using-task-groups.cpp
// compile with: /EHsc
#include <ppl.h>
#include <sstream>
#include <iostream>

using namespace concurrency;
using namespace std;

// Prints a message to the console.
template<typename T>
void print_message(T t)
{
   wstringstream ss;
   ss << L"Message from task: " << t << endl;
   wcout << ss.str(); 
}

int wmain()
{  
   // A task_group object that can be used from multiple threads.
   task_group tasks;

   // Concurrently add several tasks to the task_group object.
   parallel_invoke(
      [&] {
         // Add a few tasks to the task_group object.
         tasks.run([] { print_message(L"Hello"); });
         tasks.run([] { print_message(42); });
      },
      [&] {
         // Add one additional task to the task_group object.
         tasks.run([] { print_message(3.14); });
      }
   );

   // Wait for all tasks to finish.
   tasks.wait();
}

以下是此示例的示例输出:

Message from task: Hello
Message from task: 3.14
Message from task: 42

因为 parallel_invoke 算法并发运行任务,所以输出消息的顺序可能会有所不同。

有关演示如何使用 parallel_invoke 算法的完整示例,请参阅如何:使用 parallel_invoke 来编写并行排序例程如何:使用 parallel_invoke 来执行并行操作。 有关使用 task_group 类实现异步 Future 的完整示例,请参阅演练:实现 Future

可靠编程

请确保了解取消和异常处理在使用任务、任务组和并行算法时的角色。 例如,在并行工作树中,取消的任务会阻止子任务运行。 如果一个子任务执行的操作对于应用程序很重要(如释放资源),则这可能会导致问题。 此外,如果子任务引发异常,则该异常可以通过对象析构函数进行传播,在应用程序中导致不明确的行为。 有关说明这些要点的示例,请参阅并行模式库文档中“最佳做法”中的了解取消和异常处理如何影响对象析构部分。 有关 PPL 中的取消和异常处理模型的更多信息,请参阅取消操作异常处理

Title 说明
如何:使用 parallel_invoke 来编写并行排序例程 演示如何使用 parallel_invoke 算法提高双调排序算法的性能。
如何:使用 parallel_invoke 来执行并行操作 演示如何使用 parallel_invoke 算法提高对共享数据源执行多项操作的程序的性能。
如何:创建在延迟一段时间后完成的任务 演示如何使用 taskcancellation_token_sourcecancellation_tokentask_completion_event类创建在延迟一段时间后完成的任务。
演练:实现 Future 演示如何在并发运行时中将现有功能合并为执行更多操作的功能。
并行模式库 (PPL) 介绍 PPL,它提供用于开发并发应用程序的命令式编程模型。

参考

task 类(并发运行时)

task_completion_event 类

when_all 函数

when_any 函数

task_group 类

parallel_invoke 函数

structured_task_group 类