并行 LINQ(PLINQ)是 Language-Integrated 查询(LINQ) 模式的并行实现。 PLINQ 将完整的 LINQ 标准查询运算符集实现为 System.Linq 命名空间的扩展方法,并提供额外的运算符以支持并行操作。 PLINQ 将 LINQ 语法的简单性和可读性与并行编程的强大功能相结合。
小窍门
如果不熟悉 LINQ,它具有统一模型,用于以类型安全的方式查询任何可枚举数据源。 LINQ to Objects 是针对内存中集合(例如 List<T> 和数组)运行的 LINQ 查询的名称。 本文假定你对 LINQ 有基本的了解。 有关详细信息,请参阅 Language-Integrated 查询(LINQ)。
什么是并行查询?
PLINQ 查询在许多方面类似于非并行 LINQ to Objects 查询。 PLINQ 查询(就像顺序 LINQ 查询一样)在任何内存 IEnumerable 或 IEnumerable<T> 数据源上执行操作,并具有延迟执行,这意味着查询在被枚举之前不会开始执行。 主要区别在于 PLINQ 尝试充分利用系统上的所有处理器。 它通过将数据源分区为段,然后在多个处理器上并行对单独的工作线程上的每个段执行查询来执行此作。 在许多情况下,并行执行意味着查询的运行速度要快得多。
通过并行执行,PLINQ 能够通过在 AsParallel 数据源上添加查询操作,来对旧代码的某些类型查询实现重大性能提升。 但是,并行性可能会带来其自身的复杂性,并非所有查询操作在 PLINQ 中运行得更快。 事实上,并行化实际上会降低某些查询的速度。 因此,应了解排序等问题如何影响并行查询。 有关详细信息,请参阅了解 PLINQ 中的加速。
注释
本文档使用 lambda 表达式在 PLINQ 中定义委托。 如果不熟悉 C# 或 Visual Basic 中的 lambda 表达式,请参阅 PLINQ 和 TPL 中的 Lambda 表达式。
本文的其余部分概述了主要的 PLINQ 类,并讨论了如何创建 PLINQ 查询。 每个部分都包含指向更多详细信息和代码示例的链接。
ParallelEnumerable 类
该 System.Linq.ParallelEnumerable 类几乎公开了 PLINQ 的所有功能。 它和 System.Linq 命名空间类型的其余部分一起被编译到 System.Core.dll 程序集中。 Visual Studio 中的默认 C# 和 Visual Basic 项目都引用程序集并导入命名空间。
ParallelEnumerable 包括 LINQ to Objects 支持的所有标准查询运算符的实现,尽管它不会尝试并行化每个运算符。 如果不熟悉 LINQ,请参阅 LINQ 简介(C#)和 LINQ 简介(Visual Basic)。
除了标准查询运算符之外, ParallelEnumerable 该类还包含一组方法,这些方法支持特定于并行执行的行为。 下表列出了这些特定于 PLINQ 的方法。
ParallelEnumerable 运算符 | DESCRIPTION |
---|---|
AsParallel | PLINQ 的入口点。 指定应并行化查询的其余部分(如果可能)。 |
AsSequential | 指定应按顺序运行查询的其余部分,作为非并行 LINQ 查询。 |
AsOrdered | 指定 PLINQ 应为查询的其余部分保留源序列的排序,或直到例如通过使用 orderby(在 Visual Basic 中为 Order By)子句更改排序为止。 |
AsUnordered | 指定不需要对查询的其余部分使用 PLINQ 来保留源序列的排序。 |
WithCancellation | 指定 PLINQ 应不断检查提供的取消令牌的状态,并在请求时中止执行。 |
WithDegreeOfParallelism | 指定 PLINQ 应用于并行化查询的最大处理器数。 |
WithMergeOptions | 提供有关 PLINQ 应如何(如果可能)将并行结果合并回使用线程上的一个序列的提示。 |
WithExecutionMode | 指定即使默认行为是按顺序运行查询,PLINQ 是否也应该并行化查询。 |
ForAll | 与循环访问查询结果不同,多线程枚举方法允许并行处理结果,而无需先合并回使用者线程。 |
Aggregate 超载 | 对于 PLINQ 唯一的重载,它启用对线程本地分区的中间聚合以及一个用于合并所有分区结果的最终聚合函数。 |
选择使用模型
编写查询时,通过在数据源上调用 ParallelEnumerable.AsParallel 扩展方法来启用 PLINQ,如以下示例所示。
var source = Enumerable.Range(1, 10000);
// Opt in to PLINQ with AsParallel.
var evenNums = from num in source.AsParallel()
where num % 2 == 0
select num;
Console.WriteLine($"{evenNums.Count()} even numbers out of {source.Count()} total");
// The example displays the following output:
// 5000 even numbers out of 10000 total
Dim source = Enumerable.Range(1, 10000)
' Opt in to PLINQ with AsParallel
Dim evenNums = From num In source.AsParallel()
Where num Mod 2 = 0
Select num
Console.WriteLine("{0} even numbers out of {1} total",
evenNums.Count(), source.Count())
' The example displays the following output:
' 5000 even numbers out of 10000 total
扩展AsParallel方法将后续查询运算符(在本例where
中)绑定到select
System.Linq.ParallelEnumerable实现。
执行模式
默认情况下,PLINQ 是保守的。 在运行时,PLINQ 基础结构分析查询的整体结构。 如果查询可能通过并行化产生加速,PLINQ 会将源序列分区为可以并发运行的任务。 如果并行化查询不安全,PLINQ 只需按顺序运行查询。 如果 PLINQ 在可能昂贵的并行算法或廉价的顺序算法之间进行选择,则默认选择顺序算法。 可以使用 WithExecutionMode 方法和 System.Linq.ParallelExecutionMode 枚举来指示 PLINQ 选择并行算法。 通过测试和测量,你发现特定查询可以更快地并行执行,这很有用。 有关详细信息,请参阅 如何:在 PLINQ 中指定执行模式。
并行度
默认情况下,PLINQ 使用主计算机上的所有处理器。 可以使用该方法指示 PLINQ 使用不超过指定数量的处理器 WithDegreeOfParallelism 。 如果要确保计算机上运行的其他进程收到一定数量的 CPU 时间,这非常有用。 以下代码片段将查询限制为最多使用两个处理器。
var query = from item in source.AsParallel().WithDegreeOfParallelism(2)
where Compute(item) > 42
select item;
Dim query = From item In source.AsParallel().WithDegreeOfParallelism(2)
Where Compute(item) > 42
Select item
如果查询执行大量非计算绑定工作(如文件 I/O),则指定大于计算机上的内核数的并行度可能很有用。
已排序的并行查询与无序并行查询
在某些查询中,查询运算符必须生成保留源序列排序的结果。 PLINQ 为此提供 AsOrdered 运算符。 AsOrdered 不同于 AsSequential. 序列 AsOrdered 仍并行处理,但其结果会缓冲和排序。 由于订单保留通常涉及额外的工作,因此 AsOrdered 序列的处理速度可能比默认 AsUnordered 序列慢。 特定有序并行操作是否比顺序版本更快取决于许多因素。
下面的代码示例演示了如何选择使用顺序保留。
var evenNums =
from num in numbers.AsParallel().AsOrdered()
where num % 2 == 0
select num;
Dim evenNums = From num In numbers.AsParallel().AsOrdered()
Where num Mod 2 = 0
Select num
有关详细信息,请参阅 PLINQ 中的订单保留。
并行查询与顺序查询
某些操作要求以顺序的方式传递源数据。 当需要时,ParallelEnumerable 查询运算符会自动转换为顺序模式。 对于需要顺序执行的用户定义的查询运算符和用户委托,PLINQ 提供 AsSequential 该方法。 使用 AsSequential时,查询中的所有后续运算符将按顺序执行,直到 AsParallel 再次调用为止。 有关详细信息,请参阅 如何:合并并行和顺序 LINQ 查询。
合并查询结果的选项
当 PLINQ 查询并行执行时,必须将每个工作线程的结果合并回主线程,以供 foreach
循环(在 Visual Basic 中为 For Each
)使用,或插入到列表或数组中。 在某些情况下,指定特定类型的合并作可能很有帮助,例如,更快地开始生成结果。 为此,PLINQ 支持 WithMergeOptions 该方法和 ParallelMergeOptions 枚举。 有关详细信息,请参阅 PLINQ 中的合并选项。
ForAll 运算符
在顺序 LINQ 查询中,执行一直延迟到在 foreach
(Visual Basic 中为 For Each
)循环中或通过调用 ToList、ToArray 或 ToDictionary 等方法枚举查询。 在 PLINQ 中,还可以用于 foreach
执行查询并循环访问结果。 但是, foreach
本身不会并行运行,因此,它要求将所有并行任务的输出合并回运行循环的线程。 在 PLINQ 中,当必须保留查询结果的最终顺序时,可以使用 `foreach
`,并且每当以串行方式处理结果时,例如,调用每个元素的 `Console.WriteLine
`。 若要在不需要顺序保存且结果处理本身可以并行化时加快查询执行速度,请使用 ForAll 该方法执行 PLINQ 查询。
ForAll 不执行此最终合并步骤。 下面的代码示例演示如何使用 ForAll 该方法。
System.Collections.Concurrent.ConcurrentBag<T> 在此处使用,因为它针对多个线程的并发添加进行了优化,而无需尝试删除任何项。
var nums = Enumerable.Range(10, 10000);
var query =
from num in nums.AsParallel()
where num % 10 == 0
select num;
// Process the results as each thread completes
// and add them to a System.Collections.Concurrent.ConcurrentBag(Of Int)
// which can safely accept concurrent add operations
query.ForAll(e => concurrentBag.Add(Compute(e)));
Dim nums = Enumerable.Range(10, 10000)
Dim query = From num In nums.AsParallel()
Where num Mod 10 = 0
Select num
' Process the results as each thread completes
' and add them to a System.Collections.Concurrent.ConcurrentBag(Of Int)
' which can safely accept concurrent add operations
query.ForAll(Sub(e) concurrentBag.Add(Compute(e)))
下图展示了 foreach
与 ForAll 在查询执行方面的区别。
取消
PLINQ 与 .NET 中的取消类型集成。 (有关详细信息,请参阅 托管线程中的取消。)因此,与顺序 LINQ to Objects 查询不同,可以取消 PLINQ 查询。 若要创建可取消的 PLINQ 查询,请使用 WithCancellation 查询上的运算符并提供 CancellationToken 实例作为参数。 如果令牌上的 IsCancellationRequested 属性设置为 true,PLINQ 就会注意到它,停止处理所有线程并抛出 OperationCanceledException。
设置取消令牌后,PLINQ 查询可能会继续处理某些元素。
为了提高响应速度,还可以在长时间运行的用户委托中响应取消请求。 有关详细信息,请参阅 “如何:取消 PLINQ 查询”。
例外
执行 PLINQ 查询时,可能会同时从不同的线程引发多个异常。 此外,处理异常的代码可能与引发异常的代码位于不同的线程上。 PLINQ 使用 AggregateException 类型封装查询引发的所有异常,并将这些异常封送回调用线程。 在调用线程上,只需要一个 try-catch 块。 不过,可以循环访问在 AggregateException 中封装的所有异常,并捕获任何可以安全恢复的异常。 在极少数情况下,可能会抛出一些未在 AggregateException 中包装、ThreadAbortException 也没有进行包装的异常。
如果允许异常向上冒泡回到联接线程,那么在引发异常后,查询可能会继续处理某些项。
有关详细信息,请参阅 如何:处理 PLINQ 查询中的异常。
自定义分区器
在某些情况下,可以通过编写利用源数据的某些特征的自定义分区程序来提高查询性能。 在查询中,自定义分区程序本身是查询的可枚举对象。
int[] arr = new int[9999];
Partitioner<int> partitioner = new MyArrayPartitioner<int>(arr);
var query = partitioner.AsParallel().Select(SomeFunction);
Dim arr(10000) As Integer
Dim partitioner As Partitioner(Of Integer) = New MyArrayPartitioner(Of Integer)(arr)
Dim query = partitioner.AsParallel().Select(Function(x) SomeFunction(x))
PLINQ 支持固定数量的分区(尽管数据可能在运行时动态重新分配给这些分区进行负载均衡)。 For 并且 ForEach 仅支持动态分区,这意味着在运行时更改分区数。 有关详细信息,请参阅 PLINQ 和 TPL 的自定义分区程序。
测量 PLINQ 性能
在许多情况下,可以并行化查询,但设置并行查询的开销超过了获得的性能优势。 如果查询不执行太多计算,或者数据源较小,则 PLINQ 查询的速度可能比顺序 LINQ to Objects 查询慢。 可以使用 Visual Studio Team Server 中的并行性能分析器来比较各种查询的性能,以查找处理瓶颈,并确定查询是并行运行还是按顺序运行。 有关详细信息,请参阅 并发可视化工具 以及如何 :度量 PLINQ 查询性能。