任务并行(并发运行时)

在“并发运行时”中,任务 是执行特定工作且通常与其他任务并行运行的工作单元。 一个任务可以分解成更多且更细化的组织在任务组中的任务。

当您编写异步代码并需要在完成异步操作后发生某操作时,您可使用任务。 例如,可以使用一项任务异步读取文件,然后使用另一个任务 — 延续任务(本文档稍后将对其进行介绍)— 在数据变为可用后对其进行处理。 相反,您可以使用任务组将并行工作分解为更小的部分。 例如,假设您使用递归算法将剩余工时分成两部分。 您可使用任务组来同时运行这些分区,然后等待分工作完成。

提示

当您想要对集合中的每个元素并行应用相同的例程时,可使用并行算法,例如 concurrency::parallel_for,而不是任务或任务组。有关并行算法的更多信息,请参见并行算法

关键点

  • 如果通过引用将变量传递到 lambda 表达式,您必须保证此变量的生存期能够一直持续到任务完成。

  • 当您编写异步代码时使用任务(concurrency::task 类)。

  • 当您想把并行工作分解成较小的部分时,可使用任务组(concurrency::task_group 类或 concurrency::parallel_invoke 算法),然后等待这些较小的部分完成。

  • 使用 concurrency::task::then 方法可创建延续。 继续是其他任务完成后,以异步方式运行的任务。 可连接任意数量的连续来形成异步工作链。

  • 当前面的任务完成,甚至在前面的任务已取消或引发异常时,基于任务的继续始终计划进行执行。

  • 使用 concurrency::when_all 可创建一个任务,该任务在一组任务的每个成员完成后完成。 使用 concurrency::when_any 可创建一个任务,该任务在一组任务的一个成员完成后完成。

  • 任务和任务组可以参与并行模式库 (PPL) 取消机制。 有关详细信息,请参阅 PPL 中的取消操作

  • 若想要了解运行时如何处理任务组引发的异常,请查看 并发运行时中的异常处理

在本文档中

  • 使用 Lambda 表达式

  • task 类

  • 延续任务

  • 基于值与基于任务的继续

  • 组合任务

    • when_all 函数

    • when_any 函数

  • 延迟的任务执行

  • 任务组

  • task_group 与 structured_task_group 的比较

  • 示例

  • 可靠编程

使用 Lambda 表达式

由于语法简洁,lambda 表达式是一种用来定义由任务和任务组执行的工作的常用方法。 以下为若干用法提示:

  • 因为任务通常在背景线程上运行,所以您在 lambda 表达式中捕获变量时要注意对象生存期。 当您按值获取变量时,将在 lambda 主体中形成该变量的副本。 当您按引用获取时,不创建任何副本。 因此,请确保通过引用捕获的任何变量的生存期比使用变量的任务长。

  • 当您将 Lambda 表达式传递给任务时,请勿用引用捕获分配在堆栈上的变量。

  • 明确您在 lambda 表达式中捕获的变量,以便您可以识别您通过值或引用所获取的变量。 因此,我们不建议对 lambda 表达式采用 [=] 或 [&] 选项。

一个常见模式是继续链中的任务分配到一个变量,另一个任务读取该变量。 您无法按值获取,因为每个延续任务会拥有变量的不同副本。 对于堆栈分配的变量,由于变量可能不再有效,因此无法通过引用来获取。

若要解决此问题,请使用智能指针(例如 std::shared_ptr)包装该变量并通过值传递智能指针。 这样,基础对象可分配到任务中,从任务中读取,并活动得比使用它的任务长。 请使用此方法,即使该变量是 Windows 运行时对象的指针或引用计数句柄 (^)。 以下为基本示例:

// lambda-task-lifetime.cpp 
// compile with: /EHsc
#include <ppltasks.h>
#include <iostream>
#include <string>

using namespace concurrency;
using namespace std;

task<wstring> write_to_string()
{
    // Create a shared pointer to a string that is  
    // assigned to and read by multiple tasks. 
    // By using a shared pointer, the string outlives 
    // the tasks, which can run in the background after 
    // this function exits.
    auto s = make_shared<wstring>(L"Value 1");

    return create_task([s] 
    {
        // Print the current value.
        wcout << L"Current value: " << *s << endl;
        // Assign to a new value.
        *s = L"Value 2";

    }).then([s] 
    {
        // Print the current value.
        wcout << L"Current value: " << *s << endl;
        // Assign to a new value and return the string.
        *s = L"Value 3";
        return *s;
    });
}

int wmain()
{
    // Create a chain of tasks that work with a string.
    auto t = write_to_string();

    // Wait for the tasks to finish and print the result.
    wcout << L"Final value: " << t.get() << endl;
}

/* Output:
    Current value: Value 1
    Current value: Value 2
    Final value: Value 3
*/

有关 lambda 表达式的更多信息,请参见 C++ 中的 Lambda 表达式

[Top]

task 类

您可使用 concurrency::task 类来将任务构成到一组依赖操作中。 此组合模型受“延续”概念支持。 当前面的任务完成时,继续使代码可执行。 前面的任务结果将作为输入传递给一个或多个延续任务。 当一个前面的任务完成时,在其后等待的继续任务将计划进行执行。 每个继续任务都收到了前面的任务结果的副本。 反过来,这些延续任务也可以是前面的任务的其他延续,从而创建任务链。 继续帮助您创建其中有特定依赖项的任意长度的任务链。 另外,任务在运行时可以在其启动前或以协作的方式参与取消。 有关此取消模型的更多信息,请参阅 PPL 中的取消操作

task 是模板类。 T 类型参数是任务生成的结果的类型。 如果任务不返回值,此类型可以是 void。 T 不能使用 const 修饰符。

创建任务后,您要提供执行任务正文的“工作函数”。 此工作函数是 Lambda 函数、函数指针或函数对象的形式。 若要等待任务完成而不获取结果,则调用 concurrency::task::wait 方法。 task::wait 方法返回 concurrency::task_status 值,该值描述了任务是否已完成或已取消。 若要获取任务的结果,请调用 concurrency::task::get 方法。 此方法可调用 task::wait 等待任务完成,从而阻止执行当前线程,直到结果可用。

以下示例演示如何创建任务,等待其结果并显示其值。 本文档中的示例使用 lambda 函数,因为它们的语法更为简洁。 但是,在您使用任务时,也可以使用函数指针和函数对象。

// basic-task.cpp 
// compile with: /EHsc
#include <ppltasks.h>
#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{
    // Create a task.
    task<int> t([]()
    {
        return 42;
    });

    // In this example, you don't necessarily need to call wait() because 
    // the call to get() also waits for the result.
    t.wait();

    // Print the result.
    wcout << t.get() << endl;
}

/* Output:
    42
*/

当使用 concurrency::create_task 功能时,可以使用 auto 关键字,而不是声明该类型。 例如,请考虑创建并打印单位矩阵的代码:

// create-task.cpp 
// compile with: /EHsc
#include <ppltasks.h>
#include <string>
#include <iostream>
#include <array>

using namespace concurrency;
using namespace std;

int wmain()
{
    task<array<array<int, 10>, 10>> create_identity_matrix([]
    {
        array<array<int, 10>, 10> matrix;
        int row = 0;
        for_each(begin(matrix), end(matrix), [&row](array<int, 10>& matrixRow) 
        {
            fill(begin(matrixRow), end(matrixRow), 0);
            matrixRow[row] = 1;
            row++;
        });
        return matrix;
    });

    auto print_matrix = create_identity_matrix.then([](array<array<int, 10>, 10> matrix)
    {
        for_each(begin(matrix), end(matrix), [](array<int, 10>& matrixRow) 
        {
            wstring comma;
            for_each(begin(matrixRow), end(matrixRow), [&comma](int n) 
            {
                wcout << comma << n;
                comma = L", ";
            });
            wcout << endl;
        });
    });

    print_matrix.wait();
}
/* Output:
    1, 0, 0, 0, 0, 0, 0, 0, 0, 0
    0, 1, 0, 0, 0, 0, 0, 0, 0, 0
    0, 0, 1, 0, 0, 0, 0, 0, 0, 0
    0, 0, 0, 1, 0, 0, 0, 0, 0, 0
    0, 0, 0, 0, 1, 0, 0, 0, 0, 0
    0, 0, 0, 0, 0, 1, 0, 0, 0, 0
    0, 0, 0, 0, 0, 0, 1, 0, 0, 0
    0, 0, 0, 0, 0, 0, 0, 1, 0, 0
    0, 0, 0, 0, 0, 0, 0, 0, 1, 0
    0, 0, 0, 0, 0, 0, 0, 0, 0, 1
*/

您可使用 create_task 函数来创建等效操作。

auto create_identity_matrix = create_task([]
{
    array<array<int, 10>, 10> matrix;
    int row = 0;
    for_each(begin(matrix), end(matrix), [&row](array<int, 10>& matrixRow) 
    {
        fill(begin(matrixRow), end(matrixRow), 0);
        matrixRow[row] = 1;
        row++;
    });
    return matrix;
});

如果执行任务时引发异常,运行时将把后续调用的异常封送到 task::gettask::wait 或基于任务的继续中。 有关该任务异常处理机制的更多信息,请参阅 并发运行时中的异常处理

关于使用 taskconcurrency::task_completion_event、取消的示例,请参阅 演练:使用任务和 XML HTTP 请求进行连接。(本文档稍后将会说明 task_completion_event 类。)

提示

若要了解特定于 Windows 应用商店 应用程序中的任务详细信息,请参见 Asynchronous programming in C++用 C++ 为 Windows 应用商店应用程序创建异步操作

[Top]

延续任务

在异步编程中,一个异步操作在完成时调用另一个操作并将数据传递到其中的情况非常常见。 传统上,此过程是通过使用回调方法完成的。 在“并发运行时”中,延续任务提供了相同功能。 延续任务(也简称为“延续”)是一个异步任务,由另一个任务(称为“前面的任务”)在完成时调用。 通过使用延续,您可以:

  • 将数据从前面的任务传递到延续。

  • 指定调用或不调用延续所依据的精确条件。

  • 在延续启动之前取消延续,或在延续正在运行时以协作方式取消延续。

  • 提供有关应如何安排延续的提示。(这仅适用于 Windows 应用商店 应用程序。) 有关更多信息,请参见 用 C++ 为 Windows 应用商店应用程序创建异步操作。)

  • 从同一前面的任务中调用多个延续。

  • 在多个前面的任务中的全部或任意任务完成时调用一个延续。

  • 将延续依次相连,形成任意长度。

  • 使用延续来处理前面的任务所引发的异常。

通过这些功能,您可以完成第一个任务后执行一个或多个任务。 例如,可以创建一个在第一个任务从磁盘中对其进行读取后将压缩文件的继续。

以下示例修改了以前的示例,进而使用 concurrency::task::then 方法计划某继续,该继续在可行的情况下,打印以前的任务。

// basic-continuation.cpp 
// compile with: /EHsc
#include <ppltasks.h>
#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{
    auto t = create_task([]() -> int
    {
        return 42;
    });

    t.then([](int result)
    {
        wcout << result << endl;
    }).wait();

    // Alternatively, you can chain the tasks directly and 
    // eliminate the local variable. 
    /*create_task([]() -> int
    {
        return 42;
    }).then([](int result)
    {
        wcout << result << endl;
    }).wait();*/
}

/* Output:
    42
*/

可链锁和嵌套任务到任意长度。 一个任务还可以具有多个继续。 以下示例演示一基本延续链,该延续链将先前任务的价值增加了三倍。

// continuation-chain.cpp 
// compile with: /EHsc
#include <ppltasks.h>
#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{
    auto t = create_task([]() -> int
    { 
        return 0;
    });

    // Create a lambda that increments its input value.
    auto increment = [](int n) { return n + 1; };

    // Run a chain of continuations and print the result. 
    int result = t.then(increment).then(increment).then(increment).get();
    wcout << result << endl;
}

/* Output:
    3
*/

继续也能返回另一个任务。 如果未取消,则在后续继续前执行该任务。 这项技术称作“异步展开”。 当您想在背景中执行额外工作但又不想让当前任务阻止当前线程时,异步展开就能派上用场。(Windows 应用商店 应用程序中,继续能在 UI 线程上运行,这很常见)。 以下示例显示了三个任务。 第一个任务返回在延续任务之前运行的另一个任务。

// async-unwrapping.cpp 
// compile with: /EHsc
#include <ppltasks.h>
#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{
    auto t = create_task([]()
    {
        wcout << L"Task A" << endl;

        // Create an inner task that runs before any continuation 
        // of the outer task. 
        return create_task([]()
        {
            wcout << L"Task B" << endl;
        });
    });

    // Run and wait for a continuation of the outer task.
    t.then([]()
    {
        wcout << L"Task C" << endl;
    }).wait();
}

/* Output:
    Task A
    Task B
    Task C
*/

重要

当任务的继续返回 N 类型的嵌套任务时,结果任务具有类型 N 而非 task<N>,并在嵌套任务完成时完成。换言之,继续展开嵌套任务。

[Top]

基于值与基于任务的继续

给定返回类型为 T 的 task 对象,您可向其延续任务提供 Ttask<T> 类型的值。 采用类型 T 的继续称为基于值的继续。 当前面的任务无错的完成并且不会被取消时,基于值的继续计划进行执行。 采用类型 task<T> 作为参数的继续称为基于任务的继续。 当前面的任务完成,甚至在前面的任务已取消或引发异常时,基于任务的继续始终计划进行执行。 您可调用 task::get 以获取前面任务的结果。 如果前面的任务已取消,task::get 将引发 concurrency::task_canceled。 如果前面的任务引发了异常,task::get 将再次引发异常。 在前面的任务取消时,基于任务的继续未标记为已取消。

[Top]

组合任务

本节描述了 concurrency::when_allconcurrency::when_any 函数,这些函数有助于您撰写多个任务以实现常见模式。

when_all 函数

when_all 函数会生成一个任务,该任务可在完成一组任务之后完成。 此函数可返回一个 std::vector 对象,该对象包含集合中每个任务的结果。 以下基本示例使用 when_all 创建表示完成其他三个任务的任务。

// join-tasks.cpp 
// compile with: /EHsc
#include <ppltasks.h>
#include <array>
#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{
    // Start multiple tasks. 
    array<task<void>, 3> tasks = 
    {
        create_task([] { wcout << L"Hello from taskA." << endl; }),
        create_task([] { wcout << L"Hello from taskB." << endl; }),
        create_task([] { wcout << L"Hello from taskC." << endl; })
    };

    auto joinTask = when_all(begin(tasks), end(tasks));

    // Print a message from the joining thread.
    wcout << L"Hello from the joining thread." << endl;

    // Wait for the tasks to finish.
    joinTask.wait();
}

/* Sample output:
    Hello from the joining thread.
    Hello from taskA.
    Hello from taskC.
    Hello from taskB.
*/

备注

传递给 when_all 的任务必须是统一的。换言之,它们必须都返回相同类型。

也可使用 && 语法来生成任务,此任务在一组任务完成后完成,如以下示例所示。

auto t = t1 && t2; // same as when_all

在一组任务完成后,通常会使用延续以及 when_all 以执行操作。 以下示例修改了前面的示例,进而打印三个任务的总和,每个任务生成 int 结果。

// Start multiple tasks. 
array<task<int>, 3> tasks =
{
    create_task([]() -> int { return 88; }),
    create_task([]() -> int { return 42; }),
    create_task([]() -> int { return 99; })
};

auto joinTask = when_all(begin(tasks), end(tasks)).then([](vector<int> results)
{
    wcout << L"The sum is " 
          << accumulate(begin(results), end(results), 0)
          << L'.' << endl;
});

// Print a message from the joining thread.
wcout << L"Hello from the joining thread." << endl;

// Wait for the tasks to finish.
joinTask.wait();

/* Output:
    Hello from the joining thread.
    The sum is 229.
*/

在此示例中,还可以指定 task<vector<int>> 以产生基于任务的延续。

如果取消任务组中的任意任务或引发异常,when_all 将立即完成,并且不等待剩余任务完成。 如果引发了异常,当您调用 when_all 返回的任务对象上的 task::gettask::wait 时,运行时将再次引发异常。 如果引发多个任务,运行时将选择其中之一。 因此,请确保完成所有任务后观察所有异常;未经处理的任务异常会导致应用程序终止。

以下为可用于确保程序发现所有异常的实用工具函数。 对于所提供的范围内的每个任务,observe_all_exceptions 将触发所有再次引发的异常,并将吞并该异常。

// Observes all exceptions that occurred in all tasks in the given range. 
template<class T, class InIt> 
void observe_all_exceptions(InIt first, InIt last) 
{
    std::for_each(first, last, [](concurrency::task<T> t)
    {
        t.then([](concurrency::task<T> previousTask)
        {
            try
            {
                previousTask.get();
            }
            // Although you could catch (...), this demonstrates how to catch specific exceptions. Your app 
            // might handle different exception types in different ways. 
            catch (Platform::Exception^)
            {
                // Swallow the exception.
            }
            catch (const std::exception&)
            {
                // Swallow the exception.
            }
        });
    });
}

考虑使用 C++ 和 XAML 并将一组文件写入磁盘的 Windows 应用商店 应用程序。 以下示例演示如何使用 when_allobserve_all_exceptions 确保程序观察所有异常。

// Writes content to files in the provided storage folder. 
// The first element in each pair is the file name. The second element holds the file contents.
task<void> MainPage::WriteFilesAsync(StorageFolder^ folder, const vector<pair<String^, String^>>& fileContents)
{
    // For each file, create a task chain that creates the file and then writes content to it. Then add the task chain to a vector of tasks.
    vector<task<void>> tasks;
    for (auto fileContent : fileContents)
    {
        auto fileName = fileContent.first;
        auto content = fileContent.second;

        // Create the file. The CreationCollisionOption::FailIfExists flag specifies to fail if the file already exists.
        tasks.emplace_back(create_task(folder->CreateFileAsync(fileName, CreationCollisionOption::FailIfExists)).then([content](StorageFile^ file)
        {
            // Write its contents. 
            return create_task(FileIO::WriteTextAsync(file, content));
        }));
    }

    // When all tasks finish, create a continuation task that observes any exceptions that occurred. 
    return when_all(begin(tasks), end(tasks)).then([tasks](task<void> previousTask)
    {
        task_status status = completed;
        try
        {
            status = previousTask.wait();
        }
        catch (COMException^ e)
        {
            // We'll handle the specific errors below.
        }
        // TODO: If other exception types might happen, add catch handlers here. 

        // Ensure that we observe all exceptions.
        observe_all_exceptions<void>(begin(tasks), end(tasks));

        // Cancel any continuations that occur after this task if any previous task was canceled. 
        // Although cancellation is not part of this example, we recommend this pattern for cases that do. 
        if (status == canceled)
        {
            cancel_current_task();
        }
    });
}

运行此示例

  1. 在 MainPage.xaml 中,添加 Button 控件。

    <Button x:Name="Button1" Click="Button_Click">Write files</Button>
    
  2. 在 MainPage.xaml.h 中,将这些前向声明添加到 MainPage 类声明中的 private 部分。

    void Button_Click(Platform::Object^ sender, Windows::UI::Xaml::RoutedEventArgs^ e);
    concurrency::task<void> WriteFilesAsync(Windows::Storage::StorageFolder^ folder, const std::vector<std::pair<Platform::String^, Platform::String^>>& fileContents);
    
  3. 在 MainPage.xaml.cpp 中,实现 Button_Click 事件处理程序。

    // A button click handler that demonstrates the scenario. 
    void MainPage::Button_Click(Object^ sender, RoutedEventArgs^ e)
    {
        // In this example, the same file name is specified two times. WriteFilesAsync fails if one of the files already exists.
        vector<pair<String^, String^>> fileContents;
        fileContents.emplace_back(make_pair(ref new String(L"file1.txt"), ref new String(L"Contents of file 1")));
        fileContents.emplace_back(make_pair(ref new String(L"file2.txt"), ref new String(L"Contents of file 2")));
        fileContents.emplace_back(make_pair(ref new String(L"file1.txt"), ref new String(L"Contents of file 3")));
    
        Button1->IsEnabled = false; // Disable the button during the operation.
        WriteFilesAsync(ApplicationData::Current->TemporaryFolder, fileContents).then([this](task<void> previousTask)
        {
            try
            {
                previousTask.get();
            }
            // Although cancellation is not part of this example, we recommend this pattern for cases that do. 
            catch (const task_canceled&)
            {
                // Your app might show a message to the user, or handle the error in some other way.
            }
    
            Button1->IsEnabled = true; // Enable the button.
        });
    }
    
  4. 在 MainPage.xaml.cpp 中,按示例所示实现 WriteFilesAsync

提示

when_all 是一种非阻塞函数,其生成结果为 task。不同于 task::wait,在 ASTA(应用程序 STA)线程上的 Windows 应用商店 应用程序中调用此函数是安全的。

[Top]

when_any 函数

when_any 函数会生成一个任务,该任务可在完成一组任务的第一个任务之后完成。 此函数可返回一个 std::pair 对象,该对象包含已完成任务的结果和集合中任务的索引。

when_any 函数在下列情境中尤其有用:

  • 冗余操作。 考虑可以用许多方式执行的算法或操作。 您可使用 when_any 函数来选择先完成的操作,然后取消剩下的操作。

  • 交错操作。 您可开始必须全部完成的多项操作,并使用 when_any 函数以在每项操作完成时处理结果。 在一个操作完成后,可以开始一个或多个其他任务。

  • 受限制的操作。 您可使用 when_any 函数来扩展以前的方案,方法是限制并发操作的数量。

  • 已过期的操作。 您可使用 when_any 函数在一个或多个任务之间进行选择,并选择在特定时间后完成的任务。

按照 when_all,通常会使用含有 when_any 的继续符去在任务集中的第一个任务完成时执行操作。 以下基本示例使用 when_any 创建在完成其他三个任务的第一个任务时完成的任务。

// select-task.cpp 
// compile with: /EHsc
#include <ppltasks.h>
#include <array>
#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{
    // Start multiple tasks. 
    array<task<int>, 3> tasks = {
        create_task([]() -> int { return 88; }),
        create_task([]() -> int { return 42; }),
        create_task([]() -> int { return 99; })
    };

    // Select the first to finish.
    when_any(begin(tasks), end(tasks)).then([](pair<int, size_t> result)
    {
        wcout << "First task to finish returns "
              << result.first
              << L" and has index "
              << result.second
              << L'.' << endl;
    }).wait();
}

/* Sample output:
    First task to finish returns 42 and has index 1.
*/

在此示例中,还可以指定 task<pair<int, size_t>> 以产生基于任务的延续。

备注

按照 when_all,您传递给 when_any 的任务必须全部返回相同的类型。

也可使用 || 语法生成任务,此任务在一组任务中的第一个任务完成后完成,如以下示例所示。

auto t = t1 || t2; // same as when_any

提示

按照 when_allwhen_any 是非阻塞的,并且能安全地调用 ASTA 线程上的 Windows 应用商店 应用程序。

[Top]

延迟的任务执行

有时必须延迟任务的执行,直到满足条件,或启动任务以响应外部事件。 例如,在异步编程中,您可能必须启动任务以响应 I/O 完成事件。

完成这一点的两个方式是:使用继续或者启动一个任务然后在任务的工作函数中等待事件。 但是,有可能会出现无法使用以下方法之一的情况。 例如,要创建继续,必须要具备前面的任务。 但是,如果没有前面的任务,可在其可用时创建任务完成事件并随后将完成事件链接到前面的任务。 此外,因为正在等待的任务也会阻止线程,因此可以在异步操作完成时使用任务完成事件以执行工作,从而释放线程。

concurrency::task_completion_event 类有助于简化这样的任务组合。 与 task 类一样,类型参数 T 是由任务产生的结果类型。 如果任务不返回值,此类型可以是 void。 T 不能使用 const 修饰符。 通常,task_completion_event 会给线程或任务提供 task_completion_event 对象,在线程或任务可用时向其发送信号。 同时,一个或多个任务被设置为该事件的侦听器。 设置事件后,侦听器任务完成,它们的继续按计划运行。

有关使用 task_completion_event 来实现于延迟后完成的任务的示例,请参阅 如何:创建在延迟一段时间后完成的任务

[Top]

任务组

“任务组”可组织任务的集合。 任务组会将任务推入工作窃取队列。 计划程序从该队列中移除任务,并用可用的计算资源执行任务。 在将任务添加到任务组之后,您可以等待所有任务完成或取消尚未开始的任务。

PPL 使用 Concurrency::task_groupConcurrency::structured_task_group 类表示任务组,Concurrency::task_handle 类表示在这些任务组中运行的任务。 task_handle 类封装执行工作的代码。 与 task 类一样,工作函数是 Lambda 函数、函数指针或函数对象的形式。 通常不需要直接使用 task_handle 对象。 而是可以将工作函数传递给任务组,然后任务组会创建和管理 task_handle 对象。

PPL 将任务组分为以下两类:非结构化的任务组和结构化的任务组。 PPL 使用 task_group 类来表示非结构化任务组,使用 structured_task_group 类来表示结构化任务组。

重要

PPL 还定义 Concurrency::parallel_invoke 算法,后者使用 structured_task_group 类并行执行一组任务。由于 parallel_invoke 算法的语法比较简单,因此建议您尽可能使用该算法而非 structured_task_group 类。主题并行算法更加详细地介绍了parallel_invoke

当您需要同时执行多个独立任务并且必须等待所有任务完成才能继续时,可使用 parallel_invoke。 该项技术通常称为“分叉和联接”并行。 当您需要同时执行多个独立任务但希望稍后再等待任务完成时,可使用 task_group。 例如,您可以将任务添加到 task_group 对象中,然后在另一个函数中或从另一个线程等待任务完成。

任务组支持取消这一概念。 取消允许您向要取消整体操作的所有活动任务发送信号。 取消还阻止尚未开始的任务开始执行。 有关取消操作的更多信息,请参见 PPL 中的取消操作

运行时还提供异常处理模型,以允许您从任务引发异常并在等待关联任务组完成时处理该异常。 有关该异常处理模型的更多信息,请参见并发运行时中的异常处理

[Top]

task_group 与 structured_task_group 的比较

尽管建议您使用 task_groupparallel_invoke 替代 structured_task_group 类,但是在某些情况下您可能还是希望使用 structured_task_group,例如在编写执行可变数量任务或需要取消支持的算法的情况下。 本节说明 task_groupstructured_task_group 类之间的区别。

task_group 类是线程安全的。 因此,您可以通过多个线程将任务添加到 task_group 对象中并等待,或者通过多个线程取消 task_group 对象。 structured_task_group 对象的构造和析构必须使用相同的词法范围。 另外,对 structured_task_group 对象执行的所有操作必须在同一线程上进行。 此规则的例外情况是 concurrency::structured_task_group::cancelconcurrency::structured_task_group::is_canceling 方法。 子任务随时可以调用这些方法来取消父任务组或检查取消。

您可以在调用 concurrency::task_group::waitconcurrency::task_group::run_and_wait 方法之后,在 task_group 对象上运行其他任务。 相反,在调用 concurrency::structured_task_group::waitconcurrency:: structured_task_group::run_and_wait 方法之后,运行 structured_task_group 对象上的其他任务,则该行为不确定。

因为 structured_task_group 类不能跨线程进行同步,所以其执行开销比 task_group 类要小。 因此,如果您的问题不要求您利用多个线程计划工作并且您无法使用 parallel_invoke 算法,则 structured_task_group 类可以帮助您编写性能更佳的代码。

如果您在某个 structured_task_group 对象内使用另一个 structured_task_group 对象,则在外部对象完成之前,必须先完成并销毁内部对象。 task_group 类不要求在外部组完成之前先完成嵌套的任务组。

非结构化任务组和结构化任务组以不同方式使用任务句柄。 您可以直接将工作函数传递给 task_group 对象;task_group 对象将为您创建和管理任务句柄。 structured_task_group 类要求您管理每个任务的 task_handle 对象。 每个 task_handle 对象必须在其关联的 structured_task_group 对象的整个生命周期内保持有效。 可使用 concurrency::make_task 函数创建 task_handle 对象,如下面的基本示例所示:

// make-task-structure.cpp 
// compile with: /EHsc
#include <ppl.h>

using namespace concurrency;

int wmain()
{
   // Use the make_task function to define several tasks.
   auto task1 = make_task([] { /*TODO: Define the task body.*/ });
   auto task2 = make_task([] { /*TODO: Define the task body.*/ });
   auto task3 = make_task([] { /*TODO: Define the task body.*/ });

   // Create a structured task group and run the tasks concurrently.

   structured_task_group tasks;

   tasks.run(task1);
   tasks.run(task2);
   tasks.run_and_wait(task3);
}

若要在具有可变数量任务的情况下管理任务句柄,请使用堆栈分配例程(例如 _malloca)或容器类(例如 std::vector)。

task_groupstructured_task_group 都支持取消操作。 有关取消操作的更多信息,请参见 PPL 中的取消操作

[Top]

示例

下面的基本示例演示如何使用任务组。 本示例使用 parallel_invoke 算法并发执行两个任务。 每个任务都将子任务添加到 task_group 对象中。 请注意,task_group 类允许多个任务并发向其添加任务。

// using-task-groups.cpp 
// compile with: /EHsc
#include <ppl.h>
#include <sstream>
#include <iostream>

using namespace concurrency;
using namespace std;

// Prints a message to the console. 
template<typename T>
void print_message(T t)
{
   wstringstream ss;
   ss << L"Message from task: " << t << endl;
   wcout << ss.str(); 
}

int wmain()
{  
   // A task_group object that can be used from multiple threads.
   task_group tasks;

   // Concurrently add several tasks to the task_group object.
   parallel_invoke(
      [&] {
         // Add a few tasks to the task_group object.
         tasks.run([] { print_message(L"Hello"); });
         tasks.run([] { print_message(42); });
      },
      [&] {
         // Add one additional task to the task_group object.
         tasks.run([] { print_message(3.14); });
      }
   );

   // Wait for all tasks to finish.
   tasks.wait();
}

此示例的示例输出如下:

  

因为 parallel_invoke 算法并发运行任务,所以输出消息的顺序可能会有所不同。

有关演示如何使用 parallel_invoke 算法的完整示例,请参见如何:使用 parallel_invoke 来编写并行排序运行时如何:使用 parallel_invoke 来执行并行操作。 有关使用 task_group 类实现异步功能的完整示例,请参见演练:实现 Future

[Top]

可靠编程

请确保在使用任务、任务组和并行算法时您已了解取消和异常处理的作用。 例如,在并行工作树中,取消的任务阻止子任务运行。 如果某个子任务执行的操作对应用程序很重要(例如释放资源),则这可能引发问题。 另外,如果子任务引发异常,该异常可能在对象析构函数中传播并导致您的应用程序出现未定义的行为。 有关说明这些问题的示例,请参阅并行模式库文档中的最佳实践中的了解取消和异常处理如何影响对象销毁部分。 有关 PPL 中取消和异常处理模型的更多信息,请参见PPL 中的取消操作并发运行时中的异常处理

[Top]

相关主题

标题

描述

如何:使用 parallel_invoke 来编写并行排序运行时

演示如何使用 parallel_invoke 算法来提高 bitonic 排序算法的性能。

如何:使用 parallel_invoke 来执行并行操作

演示如何使用 parallel_invoke 算法来提高对共享数据源执行多项操作的程序的性能。

如何:创建在延迟一段时间后完成的任务

演示如何使用 taskcancellation_token_sourcecancellation_tokentask_completion_event 类,以便创建延迟后完成的任务。

演练:实现 Future

演示如何将并发运行时中的现有功能合并为某些效用更大的功能。

并行模式库 (PPL)

描述了为开发并发应用程序提供命令性编程模型的 PPL。

引用

task 类(并发运行时)

task_completion_event 类

when_all 函数

when_any 函数

task_group 类

parallel_invoke 函数

structured_task_group 类