December 2009

Volume 24 Number 12

Concurrent Affairs - Data-Parallel Patterns and PLINQ

By Igor Ostrovsky | December 2009

Multicore processors are now ubiquitous on mainstream desktop computers, but applications that use their full potential are still difficult to write. Multicore parallelism is certainly feasible, however, and a number of popular applications have been retrofitted to provide a performance boost on multicore computers. Version 4 of the .NET Framework will deliver several tools that programmers can employ to make this task easier: a set of new coordination and synchronization primitives and data structures, the Task Parallel Library and Parallel LINQ (PLINQ). This article focuses on the last item in this list, PLINQ.

PLINQ is an interesting tool that makes writing code that scales on multicore machines much easier—provided that your problem matches a data-parallel pattern. PLINQ is a LINQ provider, so to program against it, you use the familiar LINQ model. PLINQ is very similar to LINQ to Objects, except that it uses multiple threads to schedule the work to evaluate a query. To bind a query to PLINQ instead of LINQ to Objects, you simply add an AsParallel call after the data source, as shown in the following code. This step wraps the data source with a ParallelQuery wrapper and causes the remaining extension methods in the query to bind to PLINQ rather than to LINQ to Objects.

    //C#
    
    IEnumerable<int> src = ...
    var query =
        src.AsParallel()
           .Where(x => x % 2 == 0)
           .Select(x => Foo(x));
    
    foreach(var x in query)
    {
        Bar(x);
    }
    
    'Visual Basic
    
    Dim src As IEnumerable(Of Integer) = '...
    Dim query = src.AsParallel().Where(Function(x)
                                           Return x Mod 2 = 0
                                       End Function).Select(Function(x)
                                                                Return Foo(x)
                                                            End Function)
    
    For Each x In query
        Bar(x)
    Next x

The same code looks like the following using query syntax:

    //C#
    
    IEnumerable<int> src = ...
    var query =
        from x in src.AsParallel()
        where x % 2 == 0
        select Foo(x);
    
    foreach(var x in query)
    {
        Bar(x);
    }
    
    'Visual Basic
    
    Dim src As IEnumerable(Of Integer) = '...
    Dim query = From x In src.AsParallel()
                Where x Mod 2 = 0
                Select Foo(x)
    
    For Each x In query
        Bar(x)
    Next x

However, putting AsParallel into a LINQ-to-Objects query does not guarantee that your program will run faster. PLINQ attempts to use appropriate algorithms to partition the data, execute parts of the query independently in parallel and then merge the results. Whether this strategy results in a performance improvement on multicore machines depends on several factors.

To benefit from PLINQ, the total work in the query has to be large enough to hide the overhead of scheduling the work on the thread pool, and the work per element should be significant enough to hide the small amount of overhead to process that element. Also, PLINQ performs best when the most expensive part of the query can be decomposed in such a way that different worker threads evaluate the expensive computation on different input elements.
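
To make these factors concrete, consider the following sketch (ExpensiveFunc is a placeholder for any CPU-heavy delegate, as in the rest of this article). The first query does almost no work per element, so PLINQ's per-element overhead can easily outweigh any parallel speedup; the second shape is the kind of query PLINQ is designed for:

    //C#
    
    // Probably too cheap per element to benefit from PLINQ;
    // the overhead may make it slower than LINQ to Objects:
    var cheap = Enumerable.Range(0, 1000).AsParallel()
                          .Select(x => x + 1).ToArray();
    
    // Significant work per element: a much better candidate for PLINQ:
    var heavy = Enumerable.Range(0, 1000).AsParallel()
                          .Select(x => ExpensiveFunc(x)).ToArray();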

In the remaining part of the article, I’ll look at the types of data-parallel patterns that can be effectively parallelized using PLINQ.

Projection

Projection, mapping, Select operator—all these terms refer to the same common and naturally data-parallel operation. In a projection, you have a projection function that takes one argument and computes an answer, and you need to evaluate that function on a set of inputs.

Projection is naturally data-parallel because the projection function can be evaluated on different input elements concurrently. If the function is at least somewhat computationally expensive, PLINQ should be able to speed up the computation by distributing the work of evaluating the function among multiple cores on the machine.

For example, in the following query, PLINQ evaluates the calls to ExpensiveFunc in parallel (at least on multicore machines):

    //C#
    
    int[] src = Enumerable.Range(0, 100).ToArray();
    var query = src.AsParallel()
                 .Select(x => ExpensiveFunc(x));
    
    foreach(var x in query)
    {
          Console.WriteLine(x);
    }
    'Visual Basic
    
    Dim src() = Enumerable.Range(0, 100).ToArray()
    Dim query = src.AsParallel().Select(Function(x)
                                            Return ExpensiveFunc(x)
                                        End Function)
    
    For Each x In query
        Console.WriteLine(x)
    Next x

This block of code prints the values ExpensiveFunc(0), ExpensiveFunc(1) and so on to ExpensiveFunc(99) on the console. However, the values are not necessarily printed in the expected order. By default, PLINQ treats sequences as unordered, so the values are printed in an undefined order.

To tell PLINQ to treat the src array as an ordered sequence, you can use the AsOrdered operator:

    //C#
    
    int[] src = Enumerable.Range(0, 100).ToArray();
    var query = src.AsParallel().AsOrdered()
                 .Select(x => ExpensiveFunc(x));
    
    foreach(var x in query)
    {
        Console.WriteLine(x);
    }
    
    'Visual Basic
    
    Dim src() = Enumerable.Range(0, 100).ToArray()
    Dim query = src.AsParallel().AsOrdered().Select(Function(x) ExpensiveFunc(x))
    
    For Each x In query
        Console.WriteLine(x)
    Next x

Now the results are printed on the screen in the expected order, from ExpensiveFunc(0) to ExpensiveFunc(99). PLINQ incurs some additional overhead for each input element to preserve the ordering, but this is typically only a modest cost.

In the cases that we’ve examined so far, the PLINQ query is always consumed in a foreach loop. In such scenarios, PLINQ sets up asynchronous workers that compute the results in the background, and the foreach loop waits whenever the next result is not yet ready. However, this is not the only way to consume a PLINQ query. Alternatively, you can execute the query via operators such as ToArray, ToList and ToDictionary:

    //C#
    
    int[] src = Enumerable.Range(0, 100).ToArray();
    var query = src.AsParallel().AsOrdered()
                 .Select(x => ExpensiveFunc(x));
    
    int[] results = query.ToArray(); // The query runs here
    'Visual Basic
    
    Dim src() = Enumerable.Range(0, 100).ToArray()
    Dim query = src.AsParallel().AsOrdered().Select(Function(x) ExpensiveFunc(x))
    Dim results() As Integer = query.ToArray() ' The query runs here

Again, PLINQ makes the calls to ExpensiveFunc in parallel, speeding up the query execution. This time, however, the execution is synchronous—the entire query is completed on the one line identified in the code sample.

Instead of converting the results to an array, you could compute the sum of the results, or use min, max, average, or a user-defined aggregation of the results:

    //C#
    
    int[] src = Enumerable.Range(0, 100).ToArray();
    var query = src.AsParallel()
                 .Select(x => ExpensiveFunc(x));
    
    int resultSum = query.Sum(); // The query runs here
    'Visual Basic
    
    Dim src() = Enumerable.Range(0, 100).ToArray()
    Dim query = src.AsParallel().Select(Function(x) ExpensiveFunc(x))
    Dim resultSum As Integer = query.Sum() ' The query runs here

As yet another possibility, you could execute an action for each element produced:

    //C#
    
    int[] src = Enumerable.Range(0, 100).ToArray();
    var query = src.AsParallel()
                 .Select(x => ExpensiveFunc(x));
    
    query.ForAll(
        x => Console.WriteLine(x)
    );
    
    'Visual Basic
    
    Dim src() = Enumerable.Range(0, 100).ToArray()
    Dim query = src.AsParallel().Select(Function(x) ExpensiveFunc(x))
    query.ForAll(Sub(x) Console.WriteLine(x))

There is an important difference between this example, which uses ForAll, and the first example in this section, which uses a foreach loop. In the ForAll example, the actions execute on the PLINQ worker threads. In the foreach example, the loop body executes on the thread that enumerates the query.

Finally, in writing parallel projection queries, you might run into one more difficulty that is worth calling out. PLINQ achieves parallel execution by splitting the input sequence into multiple sequences and then processing the sequences in parallel. The sequence-splitting step is called “partitioning,” and your choice of partitioning algorithm could have a significant impact on the performance of your queries.

PLINQ typically chooses a good algorithm to partition your input sequence, but one case where you might want to override PLINQ’s choice is when the input is an array (or another type implementing IList). In such cases, the default PLINQ behavior is to partition the array statically into the same number of sections as there are cores on the machine. But if the cost of the projection varies per element, all the expensive elements could end up in one partition.

To get PLINQ to use a load-balancing partitioning algorithm for arrays (or other IList types), you can use the Partitioner.Create method, passing in a true value for the loadBalancing argument:

    //C#
    
    int[] src = Enumerable.Range(0, 100).ToArray();
    var query = Partitioner.Create(src, true).AsParallel()
                 .Select(x => ExpensiveFunc(x));
    
    foreach(var x in query)
    {
          Console.WriteLine(x);
    }
    'Visual Basic
    
    Dim src() = Enumerable.Range(0, 100).ToArray()
    Dim query = Partitioner.Create(src, True).AsParallel().Select(Function(x) ExpensiveFunc(x))
    
    For Each x In query
        Console.WriteLine(x)
    Next x

Filtering

A slight variation of the projection pattern is filtering. Here, instead of having a projection function that computes an output from each input, you have a filtering function that decides whether a particular element should be included in the output.

For the best parallel speedup, the filtering function should be computationally expensive to evaluate. In some cases, though, filtering with even a cheap function can scale well, especially when the filtering function rejects most inputs. In this sample, PLINQ prints those numbers in the range [0, 99] for which ExpensiveFilter returns true:

    //C#
    
    int[] src = Enumerable.Range(0, 100).ToArray();
    var query = src.AsParallel()
                 .Where(x => ExpensiveFilter(x));
    
    foreach(var x in query)
    {
          Console.WriteLine(x);
    }
    'Visual Basic
    
    Dim src() = Enumerable.Range(0, 100).ToArray()
    Dim query = src.AsParallel().Where(Function(x) ExpensiveFilter(x))
    
    For Each x In query
        Console.WriteLine(x)
    Next x

As in the first projection example, the results here will be unordered. The solution is the same: simply add AsOrdered after AsParallel. In fact, all the other follow-up points explained earlier for projections apply to filtering as well. The query can be consumed using foreach, ToArray/ToList/ToDictionary, aggregation, or ForAll, and you may want to override the default partitioning scheme if your input is an array and static partitioning could lead to load imbalances. (These options work the same way for the remaining patterns in this article as well.)

Independent Actions

In the projection and filtering patterns, the expensive part of the computation is converting an input sequence into an output sequence. A simpler pattern is an expensive action that needs to be performed for each sequence element. The action does not need to return a value; it simply executes some computationally expensive and thread-safe side effect, as shown here:

    //C#
    
    int[] src = Enumerable.Range(0, 100).ToArray();
    src.AsParallel()
       .ForAll(
           x => { ExpensiveAction(x); }
       );
    
    'Visual Basic
    
    Dim src() = Enumerable.Range(0, 100).ToArray()
    src.AsParallel().ForAll(Sub(x) ExpensiveAction(x))

For concurrency, PLINQ executes ExpensiveAction on worker threads. This means that ExpensiveAction should be computationally expensive and, even more importantly, thread safe. Since ExpensiveAction is invoked on different threads, no order is implied among the invocations.

As it turns out, this pattern is so simple that you don’t need PLINQ and can simply use the Parallel.ForEach method available in the Task Parallel Library (as of .NET Framework 4). However, ForAll in PLINQ is often handy when it is combined with other PLINQ operators:

    //C#
    
    int[] src = Enumerable.Range(0, 100).ToArray();
    src.AsParallel()
       .Where(x => x % 2 == 0)
       .ForAll(
           x => { ExpensiveAction(x); }
       );
    
    'Visual Basic
    
    Dim src() = Enumerable.Range(0, 100).ToArray()
    src.AsParallel().Where(Function(x) x Mod 2 = 0).ForAll(Sub(x) ExpensiveAction(x))
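
For comparison, here is how the pure independent-actions case at the start of this section might look with Parallel.ForEach instead of PLINQ (ExpensiveAction is the same placeholder delegate as above):

    //C#
    
    int[] src = Enumerable.Range(0, 100).ToArray();
    
    // Parallel.ForEach (in System.Threading.Tasks) runs the action
    // on thread-pool workers, much like PLINQ's ForAll.
    Parallel.ForEach(src, x => ExpensiveAction(x));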

Sequence Zipping

Sequence zipping is a pattern similar to projection, except that two input sequences are present rather than one. Instead of having an expensive function that converts one input element into one output element, you have an expensive function that converts one input from one sequence and one input from the other sequence into a single output element.

This pattern is supported via the Zip operator, introduced in LINQ to Objects in .NET Framework 4. You can use the operator in PLINQ as well. For the best performance, the input sequences should be arrays or other IList collections:

    //C#
    
    int[] arr1 = ..., arr2 = ...;
    int[] results =
        arr1.AsParallel().AsOrdered()
            .Zip(
                arr2.AsParallel().AsOrdered(),
                (arr1Elem, arr2Elem) => ExpensiveFunc(arr1Elem, arr2Elem))
            .ToArray();
    
    'Visual Basic
    
    Dim arr1() = '...
    Dim arr2() = '...
    Dim results() = arr1.AsParallel().AsOrdered().Zip(
        arr2.AsParallel().AsOrdered(),
        Function(arr1Elem, arr2Elem)
            Return ExpensiveFunc(arr1Elem, arr2Elem)
        End Function).ToArray()

In fact, you might notice that if you have the input sequences in arrays, the Zip operator can be conveniently restated as a projection:

    //C#
    
    int[] arr1 = ..., arr2 = ...;
    int length = Math.Min(arr1.Length, arr2.Length);
    int[] results = 
          ParallelEnumerable.Range(0, length).AsOrdered()
          .Select(index => ExpensiveFunc(arr1[index], arr2[index]))
          .ToArray();
    'Visual Basic
    
    Dim arr1() = '...
    Dim arr2() = '...
    Dim length = Math.Min(arr1.Length, arr2.Length)
    Dim results() = ParallelEnumerable.Range(0, length).AsOrdered().Select(
        Function(index)
            Return ExpensiveFunc(arr1(index), arr2(index))
        End Function).ToArray()

Regardless of which implementation of the pattern you choose, remember that this type of workload can be nicely sped up with PLINQ.

Reduction

Reduction, also known as aggregation or folding, is an operation in which elements of a sequence are combined until you are left with a single result. Sum, average, min and max are a few popular reductions, and these reductions are so frequently used that they are directly supported by PLINQ as operators (Sum, Average, Min and Max). However, these operators perform little work per element, so in PLINQ they are usually used in queries that also contain an expensive computation—a projection or a filter, for example. One possible exception to this rule is a min or max operation with an expensive comparison function. However, if the reduction function is an expensive operation, a reduction can be a parallel workload in its own right.

There are several different overloads of Aggregate, but I will not discuss them in this article because of space constraints. (See blogs.msdn.com/pfxteam/archive/2008/01/22/7211660.aspx and blogs.msdn.com/pfxteam/archive/2008/06/05/8576194.aspx for a more thorough discussion of PLINQ reductions.) The most general overload of Aggregate has this signature:

    //C#
    
    public static TResult Aggregate<TSource, TAccumulate, TResult>(
        this ParallelQuery<TSource> source,
        Func<TAccumulate> seedFactory,
        Func<TAccumulate, TSource, TAccumulate> updateAccumulatorFunc,
        Func<TAccumulate, TAccumulate, TAccumulate> combineAccumulatorsFunc,
        Func<TAccumulate, TResult> resultSelector)
    'Visual Basic
    
    <System.Runtime.CompilerServices.Extension()> _
    Public Shared Function Aggregate(Of TSource, TAccumulate, TResult)( _
        ByVal source As ParallelQuery(Of TSource), _
        ByVal seedFactory As Func(Of TAccumulate), _
        ByVal updateAccumulatorFunc As Func(Of TAccumulate, TSource, TAccumulate), _
        ByVal combineAccumulatorsFunc As Func(Of TAccumulate, TAccumulate, TAccumulate), _
        ByVal resultSelector As Func(Of TAccumulate, TResult)) As TResult
    End Function

And here is how you’d use it to implement a parallel Average operator:

    //C#
    
    public static double Average(this ParallelQuery<int> source)
    {
        return source.Aggregate(
            () => new double[2],
            (acc, elem) => {
                acc[0] += elem; acc[1]++; return acc;
            },
            (acc1, acc2) => {
                acc1[0] += acc2[0]; acc1[1] += acc2[1]; return acc1;
            },
            acc => acc[0] / acc[1]);
    }
    
    'Visual Basic
    
    <System.Runtime.CompilerServices.Extension()> _
    Public Shared Function Average(ByVal source As ParallelQuery(Of Integer)) As Double
        Return source.Aggregate(Function()
                                    Return New Double(1) {}
                                End Function,
                                Function(acc, elem)
                                    acc(0) = acc(0) + elem
                                    acc(1) = acc(1) + 1
                                    Return acc
                                End Function,
                                Function(acc1, acc2)
                                    acc1(0) = acc1(0) + acc2(0)
                                    acc1(1) = acc1(1) + acc2(1)
                                    Return acc1
                                End Function,
                                Function(acc)
                                    Return acc(0) / acc(1)
                                End Function)
    End Function

Each PLINQ worker initializes its accumulator by using seedFactory, so it will get its own array of two double values. Then the worker processes part of the input sequence, updating its accumulator with each element by using updateAccumulatorFunc. Next, different workers combine their accumulators by using combineAccumulatorsFunc, and finally the single accumulator is converted into the return value by using resultSelector.

Keep in mind that although the parallel Average operator sample is convenient for explaining the semantics of the Aggregate operator, its work per element (two additions) is probably too low to make parallelization worthwhile. Scenarios with a more expensive reduction function often come up in the real world, though.

Sorting

LINQ supports sorting via the OrderBy operator, and PLINQ naturally implements the sort by using a parallel algorithm. Usually, the sorting algorithm seems to get a decent speedup against the LINQ-to-Objects sort (perhaps two to three times on a four-core machine). However, one fact to remember is that the LINQ-to-Objects sorting model imposes a fairly heavy interface on OrderBy. The key selector is mandatory and passed in as a delegate rather than an expression tree, so PLINQ does not ignore the key selector even if it is an identity function, x => x. Consequently, PLINQ manipulates key-value pairs, even in cases where keys are equal to values. Also, as a result of the functional nature of LINQ, PLINQ cannot sort the sequence in place even if it is in an array because that would destroy the original sequence.

With this in mind, if you are already using the LINQ-to-Objects OrderBy operator, you should be able to speed up your query by using PLINQ. However, if you only need to sort an array of integers, an in-place sort such as Array.Sort is likely to be faster than PLINQ’s OrderBy. And if you need to speed up an in-place sort, you might have to implement your own parallel sorting algorithm on top of the Task Parallel Library.
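
To make the trade-off concrete, here is a minimal sketch contrasting the two options on a plain integer array:

    //C#
    
    int[] src = Enumerable.Range(0, 1000000).Reverse().ToArray();
    
    // PLINQ parallel sort: produces a new, sorted sequence. Even with the
    // identity key selector x => x, PLINQ still pays the key-handling cost.
    int[] sorted = src.AsParallel().OrderBy(x => x).ToArray();
    
    // In-place sort of the original array; for plain integers this is
    // likely faster than PLINQ's OrderBy.
    Array.Sort(src);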

One-to-Many Transformation

A projection converts every input element into exactly one output element. Using a filter, you can convert every input element into zero or one output elements. But what if you want to be able to generate an arbitrary number of outputs from each input? PLINQ supports that case as well through the SelectMany operator. Here’s an example:

    //C#
    
    IEnumerable<int> inputSeq = ...
    int[] results = 
          inputSeq.AsParallel()
          .SelectMany(input => ComputeResults(input))
          .ToArray();
    'Visual Basic
    
    Dim inputSeq As IEnumerable(Of Integer) = Enumerable.Range(0, 100).ToArray()
    Dim results() = inputSeq.AsParallel().SelectMany(Function(input) ComputeResults(input)).ToArray()

This code calls ComputeResults on every element in the input sequence. Each ComputeResults returns an IEnumerable type (for example, an array) that contains zero, one, or multiple results. The output of the query contains all the results returned by the ComputeResults calls.

Because this pattern is a little less intuitive than the other patterns in this article, let’s take a look at a concrete example of its use. The one-to-many pattern could implement a search algorithm for a problem such as the familiar N-Queens problem (find all placements of queens on a chessboard so that no two queens attack each other). The input sequence would be a sequence of chessboards with a few queens already in place. Then you would use a query with the SelectMany operator to find all N-Queens solutions that can be reached starting from any of the initial states in the input:

    //C#
    
    IEnumerable<ChessboardState> initStates = GenerateInitialStates();
    ChessboardState[] solutions = 
          initStates.AsParallel()
          .SelectMany(board => board.FindAllSolutions())
          .ToArray();
    'Visual Basic
    
    Dim initStates As IEnumerable(Of ChessboardState) = GenerateInitialStates()
    Dim solutions() = initStates.AsParallel().SelectMany(Function(board) board.FindAllSolutions()).ToArray()

More Complex Queries

The PLINQ patterns discussed in this article are all short query snippets, generally with one or two operators. Of course, different patterns can be used together in one query. The following query combines a filter, a projection and independent actions:

    //C#
    
    int[] src = Enumerable.Range(0, 100).ToArray();
    src.AsParallel()
       .Where(x => ExpensiveFilter(x))
       .Select(x => ExpensiveFunc(x))
       .ForAll(x => { ExpensiveAction(x); });
    
    'Visual Basic
    
    Dim src() = Enumerable.Range(0, 100).ToArray()
    src.AsParallel().Where(Function(x) ExpensiveFilter(x)).Select(Function(x) ExpensiveFunc(x)).ForAll(Sub(x) ExpensiveAction(x))

PLINQ will effectively parallelize this query regardless of whether the filtering, projection and side-effect actions are about the same cost, or if one of them dominates the execution time. And like LINQ to Objects, PLINQ does not materialize the result set after each operator. So, PLINQ won’t execute the Where for the entire sequence, store the filtered sequence, do the Select and finally the ForAll. Operations are combined as much as possible, and in simpler queries a worker will “flow” an input element through the entire query before moving on to the next element.

In addition to combining the parallel patterns with one another, you can combine them with any of the LINQ operators. PLINQ strives to maintain parity with LINQ to Objects, so all LINQ operators are available. Even though PLINQ will execute just about any LINQ-to-Objects query, it does not necessarily make the query faster. Some operators and query shapes do not parallelize well, if at all. Ideally, the most computationally expensive part of the query should have the form of one of the parallel patterns from this article.

One thing to be aware of is that in some complex queries, PLINQ decides to execute parts of the query sequentially instead of using potentially expensive algorithms needed for parallel execution. That may not be what you want, especially if your query contains an expensive delegate that would dominate the execution time anyway. In this code sample, PLINQ decides to execute the ExpensiveFunc() delegates sequentially:

    //C#
    
    int[] src = Enumerable.Range(0, 100).ToArray();
    int[] res = src.AsParallel()
                 .Select(x => ExpensiveFunc(x))
                 .TakeWhile(x => x % 2 == 0)
                 .ToArray();
    'Visual Basic
    
    Dim src() = Enumerable.Range(0, 100).ToArray()
    Dim res() As Integer = src.AsParallel().Select(Function(x) ExpensiveFunc(x)).TakeWhile(Function(x) x Mod 2 = 0).ToArray()

You can solve this issue in two ways. First, you can give PLINQ a hint to execute the query in parallel, even if potentially expensive algorithms have to be used:

    //C#
    
    int[] src = Enumerable.Range(0, 100).ToArray();
    int[] res = src.AsParallel()
                 .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
                 .Select(x => ExpensiveFunc(x))
                 .TakeWhile(x => x % 2 == 0)
                 .ToArray();
    'Visual Basic
    
    Dim src() = Enumerable.Range(0, 100).ToArray()
    Dim res() = src.AsParallel().WithExecutionMode(ParallelExecutionMode.ForceParallelism).Select(Function(x) ExpensiveFunc(x)).TakeWhile(Function(x) x Mod 2 = 0).ToArray()

Or you can decompose the query so that PLINQ executes only the expensive part of the query and LINQ to Objects executes the rest. You can use the AsSequential operator in a PLINQ query to get subsequent operators to bind to LINQ to Objects:

    //C#
    
    int[] src = Enumerable.Range(0, 100).ToArray();
    int[] res = src.AsParallel()
                 .Select(x => ExpensiveFunc(x))
                 .AsSequential()
                 .TakeWhile(x => x % 2 == 0)
                 .ToArray();
    'Visual Basic
    
    Dim src() = Enumerable.Range(0, 100).ToArray()
    Dim res() = src.AsParallel().Select(Function(x) ExpensiveFunc(x)).AsSequential().TakeWhile(Function(x) x Mod 2 = 0).ToArray()

Make the Most of PLINQ

Writing multicore applications can be hard, but it does not always have to be. PLINQ is a useful tool to have in your toolbox to speed up data-parallel computations when you need to. Remember the patterns, and use them appropriately in your programs.        


Igor Ostrovsky is a software development engineer on the Parallel Computing Platform team at Microsoft. He is the primary developer for PLINQ.

Thanks to the following technical experts for reviewing this article: Michael Blome

Send your questions and comments for Igor to mmsync@microsoft.com.