任务并行(任务并行库)

更新:2011 年 3 月

顾名思义,任务并行库 (TPL) 基于任务的概念。 术语“任务并行”是指一个或多个独立的任务同时运行。 任务表示异步操作,在某些方面它类似于创建新线程或 ThreadPool 工作项,但抽象级别较高。 任务提供两个主要好处:

  • 系统资源的使用效率更高,可伸缩性更好。

    在后台,任务排队到 ThreadPool,ThreadPool 已使用登山等算法进行增强,这些算法能够确定并调整到可最大化吞吐量的线程数。 这会使任务相对轻量,您可以创建很多任务以启用细化并行。 为了补偿这一点,可使用众所周知的工作窃取算法提供负载平衡。

  • 对于线程或工作项,可以使用更多的编程控件。

    任务和围绕它们生成的框架提供了一组丰富的 API,这些 API 支持等待、取消、继续、可靠的异常处理、详细状态、自定义计划等功能。

出于这两个原因,在 .NET Framework 4 中,任务是用于编写多线程、异步和并行代码的首选 API。

隐式创建和运行任务

Parallel.Invoke 方法提供了一种简便方式,可同时运行任意数量的任意语句。 只需为每个工作项传入 Action 委托即可。 创建这些委托的最简单方式是使用 lambda 表达式。 lambda 表达式可调用指定的方法,或提供内联代码。 下面的示例演示一个基本的 Invoke 调用,该调用创建并启动同时运行的两个任务。

注意注意

本文档使用 lambda 表达式在 TPL 中定义委托。如果您不熟悉 C# 或 Visual Basic 中的 lambda 表达式,请参见 在 PLINQ 和 TPL 中的 Lambda 表达式

Parallel.Invoke(Sub() DoSomeWork(), Sub() DoSomeOtherWork())
Parallel.Invoke(() => DoSomeWork(), () => DoSomeOtherWork());
注意注意

Invoke 在后台创建的 Task 实例数不一定与所提供的委托数相等。TPL 可能会使用各种优化,特别是对于大量的委托。

有关更多信息,请参见如何:使用 Parallel.Invoke 来执行并行操作

为了更好地控制任务执行或从任务返回值,必须更加显式地使用 Task 对象。

显式创建和运行任务

任务由 System.Threading.Tasks.Task 类表示。 返回值的任务由 System.Threading.Tasks.Task<TResult> 类表示,该类从 Task 继承。 任务对象处理基础结构详细信息,并提供可在任务的整个生存期内从调用线程访问的方法和属性。 例如,可以随时访问任务的 Status 属性,以确定它是已开始运行、已完成运行、已取消还是引发了异常。 状态由 TaskStatus 枚举表示。

在创建任务时,您赋予它一个用户委托,该委托封装该任务将执行的代码。 该委托可以表示为命名的委托、匿名方法或 lambda 表达式。 lambda 表达式可以包含对命名方法的调用,如下面的示例所示。

        ' Create a task and supply a user delegate by using a lambda expression.
        Dim taskA = New Task(Sub() Console.WriteLine("Hello from taskA."))

        ' Start the task.
        taskA.Start()

        ' Output a message from the joining thread.
        Console.WriteLine("Hello from the joining thread.")

        ' Output:
        ' Hello from the joining thread.
        ' Hello from taskA. 

            // Create a task and supply a user delegate by using a lambda expression.
            var taskA = new Task(() => Console.WriteLine("Hello from taskA."));

            // Start the task.
            taskA.Start();

            // Output a message from the joining thread.
            Console.WriteLine("Hello from the calling thread.");


            /* Output:
             * Hello from the joining thread.
             * Hello from taskA. 
             */

您还可以使用 StartNew 方法在一个操作中创建并启动任务。 如果不必将创建和计划分开,则这是创建和启动任务的首选方法,如下面的示例所示

' Better: Create and start the task in one operation.
Dim taskA = Task.Factory.StartNew(Sub() Console.WriteLine("Hello from taskA."))

' Output a message from the joining thread.
Console.WriteLine("Hello from the joining thread.")
// Create and start the task in one operation.
var taskA = Task.Factory.StartNew(() => Console.WriteLine("Hello from taskA."));

// Output a message from the joining thread.
Console.WriteLine("Hello from the joining thread.");

任务公开静态 Factory 属性,该属性返回 TaskFactory 的默认实例,以便您可以通过 Task.Factory.StartNew(…) 的形式调用方法。 此外,在此示例中,由于任务的类型为 System.Threading.Tasks.Task<TResult>,因此每个任务都具有包含计算结果的公共 Result 属性。 任务以异步方式运行,可以按任意顺序完成。 如果在计算完成之前访问 Result,则该属性将阻止线程,直到值可用为止。

Dim taskArray() = {Task(Of Double).Factory.StartNew(Function() DoComputation1()),
                   Task(Of Double).Factory.StartNew(Function() DoComputation2()),
                   Task(Of Double).Factory.StartNew(Function() DoComputation3())}


Dim results() As Double
ReDim results(taskArray.Length)
For i As Integer = 0 To taskArray.Length
    results(i) = taskArray(i).Result
Next
Task<double>[] taskArray = new Task<double>[]
   {
       Task<double>.Factory.StartNew(() => DoComputation1()),

       // May be written more conveniently like this:
       Task.Factory.StartNew(() => DoComputation2()),
       Task.Factory.StartNew(() => DoComputation3())                
   };

double[] results = new double[taskArray.Length];
for (int i = 0; i < taskArray.Length; i++)
    results[i] = taskArray[i].Result;

有关更多信息,请参见如何:从任务中返回值

使用 lambda 表达式创建任务的委托时,您有权访问源代码中在该点可见的所有变量。 然而,在某些情况下,特别是在循环中,lambda 不按照您预期的方式捕获变量。 它仅捕获最终值,而不是它每次迭代后更改的值。 通过使用构造函数向任务提供状态对象,可以访问每次迭代的值,如下面的示例所示:


    Class MyCustomData

        Public CreationTime As Long
        Public Name As Integer
        Public ThreadNum As Integer
    End Class

    Sub TaskDemo2()
        ' Create the task object by using an Action(Of Object) to pass in custom data
        ' in the Task constructor. This is useful when you need to capture outer variables
        ' from within a loop. 
        ' As an experiement, try modifying this code to capture i directly in the lamda,
        ' and compare results.
        Dim taskArray() As Task
        ReDim taskArray(10)
        For i As Integer = 0 To taskArray.Length - 1
            taskArray(i) = New Task(Sub(obj As Object)
                                        Dim mydata = CType(obj, MyCustomData)
                                        mydata.ThreadNum = Thread.CurrentThread.ManagedThreadId
                                        Console.WriteLine("Hello from Task #{0} created at {1} running on thread #{2}.",
                                                          mydata.Name, mydata.CreationTime, mydata.ThreadNum)
                                    End Sub,
            New MyCustomData With {.Name = i, .CreationTime = DateTime.Now.Ticks}
            )
            taskArray(i).Start()
        Next

    End Sub


       class MyCustomData
       {
        public long CreationTime;
        public int Name;
        public int ThreadNum;
        }

    void TaskDemo2()
    {
        // Create the task object by using an Action(Of Object) to pass in custom data
        // in the Task constructor. This is useful when you need to capture outer variables
        // from within a loop. As an experiement, try modifying this code to 
        // capture i directly in the lambda, and compare results.
        Task[] taskArray = new Task[10];

        for(int i = 0; i < taskArray.Length; i++)
        {
            taskArray[i] = new Task((obj) =>
                {
                                        MyCustomData mydata = (MyCustomData) obj;
                                        mydata.ThreadNum = Thread.CurrentThread.ManagedThreadId;
                                        Console.WriteLine("Hello from Task #{0} created at {1} running on thread #{2}.",
                                                          mydata.Name, mydata.CreationTime, mydata.ThreadNum)
                },
            new MyCustomData () {Name = i, CreationTime = DateTime.Now.Ticks}
            );
            taskArray[i].Start();
        }
    }

此状态作为参数传递给任务委托,并且可通过使用 AsyncState 属性从任务对象访问。 另外,在某些情况下,通过构造函数传递数据可以获得少量性能改进。

任务 ID

每个任务都具有一个在应用程序域中唯一标识它的整数 ID,可以使用 Id 属性访问该 ID。 该 ID 用于在 Visual Studio 调试器的**“并行堆栈”“并行任务”**窗口中查看任务信息。 该 ID 是惰式创建的,这意味着它不会在被请求之前创建;因此每次运行该程序时,任务可能具有不同的 ID。 有关在调试器中查看任务 ID 的更多信息,请参见使用“并行堆栈”窗口

任务创建选项

创建任务的大多数 API 提供接受 TaskCreationOptions 参数的重载。 通过指定下列选项之一,可指示任务计划程序如何在线程池中计划任务。 下表列出了各种任务创建选项。

元素

说明

None

未指定任何选项时的默认选项。 计划程序将使用其默认试探法来计划任务。

PreferFairness

指定应当计划任务,以使越早创建的任务将更可能越早执行,而越晚创建的任务将更可能越晚执行。

LongRunning

指定该任务表示长时间运行的操作。

AttachedToParent

指定应将任务创建为当前任务(如果存在)的附加子级。 有关更多信息,请参见嵌套任务和子任务

可以通过按位 OR 运算组合选项。 下面的示例演示一个具有 LongRunningPreferFairness 选项的任务。


Dim task3 = New Task(Sub() MyLongRunningMethod(),
                        TaskCreationOptions.LongRunning Or TaskCreationOptions.PreferFairness)
task3.Start()
var task3 = new Task(() => MyLongRunningMethod(),
                    TaskCreationOptions.LongRunning | TaskCreationOptions.PreferFairness);
task3.Start();

创建任务延续

使用 Task.ContinueWith 方法和 Task<TResult>.ContinueWith 方法,可以指定在前面的任务完成时要启动的任务。 延续任务的委托中将传入对前面的任务的引用,以便它可以检查其状态。 此外,可以在 Result 属性中将用户定义的值从前面的任务传递到其延续任务,以便前面的任务的输出可以作为延续任务的输入。 在下面的示例中,程序代码启动 getData,然后在 getData 完成时自动启动 analyzeData,在 analyzeData 完成时启动 reportData。 getData 将其结果生成为字节数组,该数组传递到 analyzeData。 analyzeData 处理该数组并返回结果,该结果的类型推断自 Analyze 方法的返回类型。 reportData 采用来自 analyzeData 的输入,并产生结果,该结果的类型以类似方式推断,且可用于 Result 属性中的程序。

        Dim getData As Task(Of Byte()) = New Task(Of Byte())(Function() GetFileData())
        Dim analyzeData As Task(Of Double()) = getData.ContinueWith(Function(x) Analyze(x.Result))
        Dim reportData As Task(Of String) = analyzeData.ContinueWith(Function(y As Task(Of Double)) Summarize(y.Result))

        getData.Start()

        System.IO.File.WriteAllText("C:\reportFolder\report.txt", reportData.Result)

            Task<byte[]> getData = new Task<byte[]>(() => GetFileData());
            Task<double[]> analyzeData = getData.ContinueWith(x => Analyze(x.Result));
            Task<string> reportData = analyzeData.ContinueWith(y => Summarize(y.Result));

            getData.Start();

            //or...
            Task<string> reportData2 = Task.Factory.StartNew(() => GetFileData())
                                        .ContinueWith((x) => Analyze(x.Result))
                                        .ContinueWith((y) => Summarize(y.Result));

            System.IO.File.WriteAllText(@"C:\reportFolder\report.txt", reportData.Result);



使用 ContinueWhenAllContinueWhenAny 方法,可以从多个任务继续。 有关更多信息,请参见延续任务如何:用延续将多个任务链接在一起

创建分离的嵌套任务

如果在任务中运行的用户代码创建一个新任务,且未指定 AttachedToParent 选项,则该新任务不采用任何特殊方式与外部任务同步。 此类任务称为“分离的嵌套任务”。 下面的示例演示一个任务,该任务创建一个分离的嵌套任务。

Dim outer = Task.Factory.StartNew(Sub()
                                      Console.WriteLine("Outer task beginning.")
                                      Dim child = Task.Factory.StartNew(Sub()
                                                                            Thread.SpinWait(5000000)
                                                                            Console.WriteLine("Detached task completed.")
                                                                        End Sub)
                                  End Sub)
outer.Wait()
Console.WriteLine("Outer task completed.")

' Output:
'     Outer task beginning.
'     Outer task completed.
'    Detached child completed.
            var outer = Task.Factory.StartNew(() =>
            {
                Console.WriteLine("Outer task beginning.");

                var child = Task.Factory.StartNew(() =>
                {
                    Thread.SpinWait(5000000);
                    Console.WriteLine("Detached task completed.");
                });

            });

            outer.Wait();
            Console.WriteLine("Outer task completed.");

            /* Output:
                Outer task beginning.
                Outer task completed.
                Detached task completed.

             */

请注意,外部任务不会等待嵌套任务完成。

创建子任务

如果在一个任务中运行的用户代码创建任务时指定了 AttachedToParent 选项,则该新任务称为原始任务的子任务,原始任务称为父任务。 因为父任务隐式地等待所有子任务完成,所以可以使用 AttachedToParent 选项表示结构化的任务并行。 下面的示例演示一个任务,该任务创建一个子任务:

Dim parent = Task.Factory.StartNew(Sub()
                                       Console.WriteLine("Parent task beginning.")
                                       Dim child = Task.Factory.StartNew(Sub()
                                                                             Thread.SpinWait(5000000)
                                                                             Console.WriteLine("Attached child completed.")
                                                                         End Sub,
                                                                         TaskCreationOptions.AttachedToParent)

                                   End Sub)
outer.Wait()
Console.WriteLine("Parent task completed.")

' Output:
'     Parent task beginning.
'     Attached child completed.
'     Parent task completed.
var parent = Task.Factory.StartNew(() =>
{
    Console.WriteLine("Parent task beginning.");

    var child = Task.Factory.StartNew(() =>
    {
        Thread.SpinWait(5000000);
        Console.WriteLine("Attached child completed.");
    }, TaskCreationOptions.AttachedToParent);

});

parent.Wait();
Console.WriteLine("Parent task completed.");

/* Output:
    Parent task beginning.
    Attached task completed.
    Parent task completed.
 */

有关更多信息,请参见嵌套任务和子任务

等待任务

System.Threading.Tasks.Task 类型和 System.Threading.Tasks.Task<TResult> 类型提供了 Task.WaitTask<TResult>.Wait 方法的一些重载,使您能够等待任务完成。 此外,静态 Task.WaitAll 方法和 Task.WaitAny 方法的重载使您可以等待任务数组的任一任务或所有任务完成。

通常,会出于以下某个原因等待任务:

  • 主线程依赖于任务计算的最终结果。

  • 您必须处理可能从任务引发的异常。

下面的示例演示不包含异常处理的基本模式。

Dim tasks() =
{
    Task.Factory.StartNew(Sub() MethodA()),
    Task.Factory.StartNew(Sub() MethodB()),
    Task.Factory.StartNew(Sub() MethodC())
}

' Block until all tasks complete.
Task.WaitAll(tasks)

' Continue on this thread...
Task[] tasks = new Task[3]
{
    Task.Factory.StartNew(() => MethodA()),
    Task.Factory.StartNew(() => MethodB()),
    Task.Factory.StartNew(() => MethodC())
};

//Block until all tasks complete.
Task.WaitAll(tasks);

// Continue on this thread...

有关演示异常处理的示例,请参见如何:处理由任务引发的异常

某些重载允许您指定超时,而其他重载采用附加 CancellationToken 作为输入参数,以便可以通过编程方式或为了响应用户输入来取消等待本身。

当您等待任务时,是在隐式等待通过使用 TaskCreationOptions AttachedToParent 选项创建的该任务的所有子级。 Task.Wait 在该任务已完成时立即返回。 即使 Wait 方法是在某任务完成之后调用的,Wait 方法也会引发由该任务引发的任何异常。

有关更多信息,请参见如何:等待一个或多个任务完成

任务中的异常处理

当某个任务引发一个或多个异常时,异常包装在 AggregateException 中。 该异常传播回与该任务联接的线程,此线程通常是正在等待该任务或尝试访问该任务的 Result 属性的线程。 此行为用于强制实施所有未处理的异常默认情况下应关闭进程的 .NET Framework 策略。 调用代码可以通过在任务或任务组上使用 WaitWaitAllWaitAny 方法或 Result() 属性,或者通过在 try-catch 块中包括 Wait 方法,来处理异常。

联接线程也可以通过在对任务进行垃圾回收之前访问 Exception 属性来处理异常。 通过访问此属性,可防止未处理的异常触发在对象完成时关闭进程的异常传播行为。

有关异常和任务的的更多信息,请参见异常处理(任务并行库)如何:处理由任务引发的异常

取消任务

Task 类支持协作取消,并与 .NET Framework 版本 4 中新增的 System.Threading.CancellationTokenSource 类和 System.Threading.CancellationToken 类完全集成。 System.Threading.Tasks.Task 类中的大多数构造函数采用 CancellationToken 作为输入参数。 许多 StartNew 重载也采用 CancellationToken

您可以创建标记,并使用 CancellationTokenSource 类在以后某一时间发出取消请求。 可以将该标记作为参数传递给 Task,还可以在执行响应取消请求的工作的用户委托中引用同一标记。 有关更多信息,请参见任务取消如何:取消任务及其子级

TaskFactory 类

TaskFactory 类提供静态方法,这些方法封装了用于创建和启动任务和延续任务的一些常用模式。

默认 TaskFactory 可作为 Task 类或 Task<TResult> 类上的静态属性访问。 您还可以直接实例化 TaskFactory 并指定各种选项,包括 CancellationTokenTaskCreationOptions 选项、TaskContinuationOptions 选项或 TaskScheduler。 创建任务工厂时所指定的任何选项将应用于它创建的所有任务,除非该任务是通过使用 TaskCreationOptions 枚举创建的(在这种情况下,任务的选项重写任务工厂的选项)。

无委托的任务

在某些情况下,可能需要使用 Task 封装由外部组件(而不是您自己的用户委托)执行的某个异步操作。 如果该操作基于异步编程模型 Begin/End 模式,您可以使用 FromAsync 方法。 如果不是这种情况,您可以使用 TaskCompletionSource<TResult> 对象将该操作包装在任务中,并因而获得 Task 可编程性的一些好处,例如对异常传播和延续的支持。 有关更多信息,请参见 TaskCompletionSource<TResult>

自定义的计划程序

大多数应用程序或库开发人员并不关心任务在哪个处理器上运行、任务如何将其工作与其他任务同步以及如何在 System.Threading.ThreadPool 中计划任务。 他们只需要它在主机上尽可能高效地执行。 如果需要对计划细节进行更细化的控制,可以使用任务并行库在默认任务计划程序上配置一些设置,甚至是提供自定义计划程序。 有关更多信息,请参见 TaskScheduler

相关数据结构

TPL 有几种在并行和顺序方案中都有用的新公共类型。 它们包括 System.Collections.Concurrent 命名空间中的一些线程安全的、快速且可缩放的集合类,还包括一些新的同步类型(例如 SemaphoreLock 和 System.Threading.ManualResetEventSlim),对特定类型的工作负荷而言,这些新同步类型比旧的同步类型效率更高。 .NET Framework Framework 版本 4 中的其他新类型(例如 System.Threading.BarrierSystem.Threading.SpinLock)提供了早期版本中未提供的功能。 有关更多信息,请参见用于并行编程的数据结构

自定义任务类型

建议不要从 System.Threading.Tasks.TaskSystem.Threading.Tasks.Task<TResult> 继承。 相反,应使用 AsyncState 属性将其他数据或状态与 TaskTask<TResult> 对象相关联。 还可以使用扩展方法扩展 TaskTask<TResult> 类的功能。 有关扩展方法的更多信息,请参见扩展方法(C# 编程指南)扩展方法 (Visual Basic)

如果必须从 TaskTask<TResult> 继承,则不能使用 System.Threading.Tasks.TaskFactorySystem.Threading.Tasks.TaskFactory<TResult>System.Threading.Tasks.TaskCompletionSource<TResult> 类创建自定义任务类型的实例,因为这些类仅创建 TaskTask<TResult> 对象。 此外,不能使用 TaskTask<TResult>TaskFactoryTaskFactory<TResult> 提供的任务延续机制创建自定义任务类型的实例,因为这些机制也只创建 TaskTask<TResult> 对象。

相关主题

标题

说明

延续任务

描述延续任务的工作方式。

嵌套任务和子任务

描述子任务与嵌套任务之间的差异。

任务取消

描述在 Task 类中内置的取消支持。

异常处理(任务并行库)

描述如何处理并行线程中的异常。

如何:使用 Parallel.Invoke 来执行并行操作

描述如何使用 Invoke

如何:从任务中返回值

描述如何从任务中返回值。

如何:等待一个或多个任务完成

描述如何等待任务。

如何:取消任务及其子级

描述如何取消任务。

如何:处理由任务引发的异常

描述如何处理由任务引发的异常。

如何:用延续将多个任务链接在一起

描述在一个任务完成时如何执行另一个任务。

如何:使用并行任务遍历二叉树

描述如何使用任务遍历二叉树。

数据并行(任务并行库)

描述如何使用 ForForEach 来创建循环访问数据的并行循环。

.NET Framework 中的并行编程

.NET 并行编程的顶级节点。

请参见

概念

.NET Framework 中的并行编程

修订记录

日期

修订记录

原因

2011 年 3 月

添加了有关如何从 TaskTask<TResult> 类继承的信息。

信息补充。