Custom Partitioners for PLINQ and TPL
To parallelize an operation on a data source, one of the essential steps is to partition the source into multiple sections that can be accessed concurrently by multiple threads. PLINQ and the Task Parallel Library (TPL) provide default partitioners that work transparently when you write a parallel query or ForEach loop. For more advanced scenarios, you can plug in your own partitioner.
Kinds of Partitioning
There are many ways to partition a data source. In the most efficient approaches, multiple threads cooperate to process the original source sequence, rather than physically separating the source into multiple subsequences. For arrays and other indexed sources such as IList collections where the length is known in advance, range partitioning is the simplest kind of partitioning. Every thread receives unique beginning and ending indexes, so that it can process its range of the source without overwriting or being overwritten by any other thread. The only overhead involved in range partitioning is the initial work of creating the ranges; no additional synchronization is required after that. Therefore, it can provide good performance as long as the workload is divided evenly. A disadvantage of range partitioning is that if one thread finishes early, it cannot help the other threads finish their work.
For linked lists or other collections whose length is not known, you can use chunk partitioning. In chunk partitioning, every thread or task in a parallel loop or query consumes some number of source elements in one chunk, processes them, and then comes back to retrieve additional elements. The partitioner ensures that all elements are distributed and that there are no duplicates. A chunk may be any size. For example, the partitioner that is demonstrated in How to: Implement Dynamic Partitions creates chunks that contain just one element. As long as the chunks are not too large, this kind of partitioning is inherently load-balancing because the assignment of elements to threads is not pre-determined. However, the partitioner does incur the synchronization overhead each time the thread needs to get another chunk. The amount of synchronization incurred in these cases is inversely proportional to the size of the chunks.
In general, range partitioning is only faster when the execution time of the delegate is small to moderate, and the source has a large number of elements, and the total work of each partition is roughly equivalent. Chunk partitioning is therefore generally faster in most cases. On sources with a small number of elements or longer execution times for the delegate, then the performance of chunk and range partitioning is about equal.
The TPL partitioners also support a dynamic number of partitions. This means they can create partitions on-the-fly, for example, when the ForEach loop spawns a new task. This feature enables the partitioner to scale together with the loop itself. Dynamic partitioners are also inherently load-balancing. When you create a custom partitioner, you must support dynamic partitioning to be consumable from a ForEach loop.
Configuring Load Balancing Partitioners for PLINQ
Some overloads of the Partitioner.Create method let you create a partitioner for an array or IList source and specify whether it should attempt to balance the workload among the threads. When the partitioner is configured to load-balance, chunk partitioning is used, and the elements are handed off to each partition in small chunks as they are requested. This approach helps ensure that all partitions have elements to process until the entire loop or query is completed. An additional overload can be used to provide load-balancing partitioning of any IEnumerable source.
In general, load balancing requires the partitions to request elements relatively frequently from the partitioner. By contrast, a partitioner that does static partitioning can assign the elements to each partitioner all at once by using either range or chunk partitioning. This requires less overhead than load balancing, but it might take longer to execute if one thread ends up with significantly more work than the others. By default when it is passed an IList or an array, PLINQ always uses range partitioning without load balancing. To enable load balancing for PLINQ, use the Partitioner.Create method, as shown in the following example.
' Static number of partitions requires indexable source.
Dim nums = Enumerable.Range(0, 100000000).ToArray()
' Create a load-balancing partitioner. Or specify false For Shared partitioning.
Dim customPartitioner = Partitioner.Create(nums, True)
' The partitioner is the query's data source.
Dim q = From x In customPartitioner.AsParallel()
Select x * Math.PI
q.ForAll(Sub(x) ProcessData(x))
// Static partitioning requires indexable source. Load balancing
// can use any IEnumerable.
var nums = Enumerable.Range(0, 100000000).ToArray();
// Create a load-balancing partitioner. Or specify false for static partitioning.
Partitioner<int> customPartitioner = Partitioner.Create(nums, true);
// The partitioner is the query's data source.
var q = from x in customPartitioner.AsParallel()
select x * Math.PI;
q.ForAll((x) =>
{
ProcessData(x);
});
The best way to determine whether to use load balancing in any given scenario is to experiment and measure how long it takes operations to complete under representative loads and computer configurations. For example, static partitioning might provide significant speedup on a multi-core computer that has only a few cores, but it might result in slowdowns on computers that have relatively many cores.
The following table lists the available overloads of the Create method. These partitioners are not limited to use only with PLINQ or ForEach. They can also be used with any custom parallel construct.
Overload |
Uses load balancing |
---|---|
Always |
|
When the Boolean argument is specified as true |
|
When the Boolean argument is specified as true |
|
Never |
|
Never |
|
Never |
|
Never |
Configuring Static Range Partitioners for Parallel.ForEach
In a For loop, the body of the loop is provided to the method as a delegate. The cost of invoking that delegate is about the same as a virtual method call. In some scenarios, the body of a parallel loop might be small enough that the cost of the delegate invocation on each loop iteration becomes significant. In such situations, you can use one of the Create overloads to create an IEnumerable<T> of range partitions over the source elements. Then, you can pass this collection of ranges to a ForEach method whose body consists of a regular for loop. The benefit of this approach is that the delegate invocation cost is incurred only once per range, rather than once per element. The following example demonstrates the basic pattern.
Imports System.Threading.Tasks
Imports System.Collections.Concurrent
Module PartitionDemo
Sub Main()
' Source must be array or IList.
Dim source = Enumerable.Range(0, 100000).ToArray()
' Partition the entire source array.
' Let the partitioner size the ranges.
Dim rangePartitioner = Partitioner.Create(0, source.Length)
Dim results(source.Length - 1) As Double
' Loop over the partitions in parallel. The Sub is invoked
' once per partition.
Parallel.ForEach(rangePartitioner, Sub(range, loopState)
' Loop over each range element without a delegate invocation.
For i As Integer = range.Item1 To range.Item2 - 1
results(i) = source(i) * Math.PI
Next
End Sub)
Console.WriteLine("Operation complete. Print results? y/n")
Dim input As Char = Console.ReadKey().KeyChar
If input = "y"c Or input = "Y"c Then
For Each d As Double In results
Console.Write("{0} ", d)
Next
End If
End Sub
End Module
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
class Program
{
static void Main()
{
// Source must be array or IList.
var source = Enumerable.Range(0, 100000).ToArray();
// Partition the entire source array.
var rangePartitioner = Partitioner.Create(0, source.Length);
double[] results = new double[source.Length];
// Loop over the partitions in parallel.
Parallel.ForEach(rangePartitioner, (range, loopState) =>
{
// Loop over each range element without a delegate invocation.
for (int i = range.Item1; i < range.Item2; i++)
{
results[i] = source[i] * Math.PI;
}
});
Console.WriteLine("Operation complete. Print results? y/n");
char input = Console.ReadKey().KeyChar;
if (input == 'y' || input == 'Y')
{
foreach(double d in results)
{
Console.Write("{0} ", d);
}
}
}
}
Every thread in the loop receives its own Tuple<T1, T2> that contains the starting and ending index values in the specified sub-range. The inner for loop uses the fromInclusive and toExclusive values to loop over the array or the IList directly.
One of the Create overloads lets you specify the size of the partitions, and the number of partitions. This overload can be used in scenarios where the work per element is so low that even one virtual method call per element has a noticeable impact on performance.
Custom Partitioners
In some scenarios, it might be worthwhile or even required to implement your own partitioner. For example, you might have a custom collection class that you can partition more efficiently than the default partitioners can, based on your knowledge of the internal structure of the class. Or, you may want to create range partitions of varying sizes based on your knowledge of how long it will take to process elements at different locations in the source collection.
To create a basic custom partitioner, derive a class from System.Collections.Concurrent.Partitioner<TSource> and override the virtual methods, as described in the following table.
This method is called once by the main thread and returns an IList(IEnumerator(TSource)). Each worker thread in the loop or query can call GetEnumerator on the list to retrieve a IEnumerator<T> over a distinct partition. |
|
Return true if you implement GetDynamicPartitions, otherwise, false. |
|
If SupportsDynamicPartitions is true, this method can optionally be called instead of GetPartitions. |
If the results must be sortable or you require indexed access into the elements, then derive from System.Collections.Concurrent.OrderablePartitioner<TSource> and override its virtual methods as described in the following table.
This method is called once by the main thread and returns an IList(IEnumerator(TSource)). Each worker thread in the loop or query can call GetEnumerator on the list to retrieve a IEnumerator<T> over a distinct partition. |
|
Return true if you implement GetDynamicPartitions; otherwise, false. |
|
Typically, this just calls GetOrderableDynamicPartitions. |
|
If SupportsDynamicPartitions is true, this method can optionally be called instead of GetPartitions. |
The following table provides additional details about how the three kinds of load-balancing partitioners implement the OrderablePartitioner<TSource> class.
Method/Property |
IList / Array without Load Balancing |
IList / Array with Load Balancing |
IEnumerable |
---|---|---|---|
Uses range partitioning |
Uses chunk partitioning optimized for Lists for the partitionCount specified |
Uses chunk partitioning by creating a static number of partitions. |
|
Throws not-supported exception |
Uses chunk partitioning optimized for Lists and dynamic partitions |
Uses chunk partitioning by creating a dynamic number of partitions. |
|
Returns true |
Returns true |
Returns true |
|
Returns true |
Returns false |
Returns false |
|
Returns true |
Returns true |
Returns true |
|
Returns false |
Returns true |
Returns true |
Dynamic Partitions
If you intend the partitioner to be used in a ForEach method, you must be able to return a dynamic number of partitions. This means that the partitioner can supply an enumerator for a new partition on-demand at any time during loop execution. Basically, whenever the loop adds a new parallel task, it requests a new partition for that task. If you require the data to be orderable, then derive from System.Collections.Concurrent.OrderablePartitioner<TSource> so that each item in each partition is assigned a unique index.
For more information, and an example, see How to: Implement Dynamic Partitions.
Contract for Partitioners
When you implement a custom partitioner, follow these guidelines to help ensure correct interaction with PLINQ and ForEach in the TPL:
If GetPartitions is called with an argument of zero or less for partitionsCount, throw ArgumentOutOfRangeException. Although PLINQ and TPL will never pass in a partitionCount equal to 0, we nevertheless recommend that you guard against the possibility.
GetPartitions and GetOrderablePartitions should always return partitionsCount number of partitions. If the partitioner runs out of data and cannot create as many partitions as requested, then the method should return an empty enumerator for each of the remaining partitions. Otherwise, both PLINQ and TPL will throw an InvalidOperationException.
GetPartitions, GetOrderablePartitions, GetDynamicPartitions, and GetOrderableDynamicPartitions should never return null (Nothing in Visual Basic). If they do, PLINQ / TPL will throw an InvalidOperationException.
Methods that return partitions should always return partitions that can fully and uniquely enumerate the data source. There should be no duplication in the data source or skipped items unless specifically required by the design of the partitioner. If this rule is not followed, then the output order may be scrambled.
The following Boolean getters must always accurately return the following values so that the output order is not scrambled:
KeysOrderedInEachPartition: Each partition returns elements with increasing key indices.
KeysOrderedAcrossPartitions: For all partitions that are returned, the key indices in partition i are higher than the key indices in partition i-1.
KeysNormalized: All key indices are monotonically increasing without gaps, starting from zero.
All indices must be unique. There may not be duplicate indices. If this rule is not followed, then the output order may be scrambled.
All indices must be nonnegative. If this rule is not followed, then PLINQ/TPL may throw exceptions.
See Also
Tasks
How to: Implement Dynamic Partitions
How to: Implement a Partitioner With a Static Number of Partitions