Digging deeper into PLINQ’s internal implementation

PLINQ is built on top of the Task Parallel Library (TPL) and promises to revolutionise the way we write programs that can benefit from the multi-core processor era. But how does it work internally?

This article assumes that you are familiar with the basics of LINQ and have an understanding of PLINQ and TPL.

In this short article, I will concentrate on the techniques used by the first CTP of PLINQ to partition work streams and assign the resulting partitions to different threads. It is worth mentioning that by default, TPL provides one thread per processor core.

The deferred execution and lazy evaluation characteristics of LINQ allow infinitely long sequences to be used as the source for a LINQ query. For instance, it is possible to define an enumerable that represents the set of natural numbers:

using System.Collections;
using System.Collections.Generic;

class NaturalNumbersEnumerable : IEnumerable<uint>
{
  public IEnumerator<uint> GetEnumerator()
  {
    uint i = 1;
    while (true)
      yield return i++;
  }

  IEnumerator IEnumerable.GetEnumerator()
  {
    return GetEnumerator();
  }
}

(I know that it is not really the full set of natural numbers: it can only go up to uint.MaxValue before either throwing an OverflowException or wrapping around to 0. It does, however, fit the purpose of this article.)

The above sequence never ends: every call to MoveNext on the enumerator generates another natural number. So we need to limit the execution by using functions that limit the size of the returned sequence. As shown below, Take can be one of those functions:

var linq =
  new NaturalNumbersEnumerable()
    .Take(100);

foreach (uint i in linq)
  Console.WriteLine(i);

Executing the above code prints numbers from 1 to 100 in a sequential manner. With PLINQ however, it is possible to execute the same query using more than one thread. The work distribution algorithms used here are the focus of the remainder of this article.

An enumerator (stream) is partitioned into multiple smaller enumerators of type IQueryOperatorEnumerator<T>. The partitioned stream is represented by PartitionedStream<T> which exposes an array of IQueryOperatorEnumerator<T> that can be iterated concurrently.

One way of building a partitioned stream is through manual creation of partitions, effectively allowing for the introduction of custom partitioning algorithms. However, the PartitionedStream<T> derived type responsible for this (ManuallyPartitionedStream<T>) is an internal class at present. If you believe that such extensibility can be useful then let the product team know. I personally think that it would be a valuable feature, as users often have a deep understanding of the source stream and can better decide on the size and content of each partition, resulting in an improved experience.

Internally there are at least two different algorithms used to create partitioned streams. One algorithm partitions the stream based on a range and the other based on a hashcode.

Range-based partitioning is sensitive to the type of the source stream. If the source is an indexable data type (i.e. an array or an implementer of IList<T>) then each partition is given its own unique range of indices. The source stream is shared by all partitions and there are no range overlaps between them. Access to the source is therefore not synchronised, which means the indexer of the source may need to be thread-safe.

Note that range partitioning assumes that the work required to process each element will take roughly the same time and will be largely homogeneous. This might not always be true, and a custom partitioning solution might perform better.
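To make the idea concrete, here is a minimal sketch of range partitioning over an indexable source. It is not PLINQ's internal code: the class name RangePartitioningSketch, its methods and the policy of giving any leftover elements to the first partitions are all assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration only; not the PLINQ implementation.
static class RangePartitioningSketch
{
    // Splits the index space [0, source.Count) into partitionCount contiguous,
    // non-overlapping ranges. Every partition reads the same shared list.
    public static IEnumerable<T>[] Partition<T>(IList<T> source, int partitionCount)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (partitionCount < 1) throw new ArgumentOutOfRangeException(nameof(partitionCount));

        var partitions = new IEnumerable<T>[partitionCount];
        int baseSize = source.Count / partitionCount;
        int remainder = source.Count % partitionCount;
        int start = 0;

        for (int p = 0; p < partitionCount; p++)
        {
            // Assumed policy: the first 'remainder' partitions take one extra element.
            int length = baseSize + (p < remainder ? 1 : 0);
            partitions[p] = ReadRange(source, start, start + length);
            start += length;
        }

        return partitions;
    }

    // Lazily yields source[from] .. source[to - 1] without any locking,
    // so the indexer must tolerate concurrent reads.
    private static IEnumerable<T> ReadRange<T>(IList<T> source, int from, int to)
    {
        for (int i = from; i < to; i++)
            yield return source[i];
    }
}
```

Each returned enumerable can be handed to a different thread. Because the ranges are fixed up front, a partition whose elements happen to be expensive to process cannot hand work over to an idle one, which is the weakness described above.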

However if the source is not an IList<T>, we can no longer assume that we are dealing with a finite set of items. This in itself takes away the ability to index into the source stream. Nonetheless we still need to find a way to partition the source.

As the source can be infinite, we can no longer decide on the size of each partition up front. This means that each partition can grow substantially during execution. As part of the growth phase, a subset of elements from the source stream is read and stored locally in memory. When MoveNext is called, the next item is returned from this internal buffer and, if needed, a new growth operation is performed synchronously.

Because of the nature of these source streams, moving the reader and reading the current value must be synchronised using a synchronisation primitive such as a lock. As a result, you can safely assume that MoveNext() and the Current property of the source stream are never called by more than one thread at a time. Hence, in our natural numbers example, we can rely on ‘i++’ instead of Interlocked.Increment of ‘i’:

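class NaturalNumbersEnumerable : IEnumerable<uint>
{
  public IEnumerator<uint> GetEnumerator()
  {
    uint i = 1;
    while (true)
    {
      // A plain increment is enough: calls to MoveNext() and Current are
      // serialised by PLINQ, so Interlocked.Increment is not needed here.
      yield return i++;
    }
  }
...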
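The growth and locking behaviour described above can be sketched roughly as follows. This is an illustration under stated assumptions rather than the CTP's actual code: the type ChunkPartitionerSketch<T>, its members, the doubling growth policy and the chunk size cap are all hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration only; not the PLINQ implementation.
sealed class ChunkPartitionerSketch<T> : IDisposable
{
    private readonly IEnumerator<T> _source;       // shared by all partitions
    private readonly object _gate = new object();  // guards MoveNext and Current
    private bool _exhausted;

    public ChunkPartitionerSketch(IEnumerable<T> source)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        _source = source.GetEnumerator();
    }

    // Each call returns an independent partition, intended to be consumed
    // by its own thread.
    public IEnumerable<T> CreatePartition(int initialChunkSize = 1, int maxChunkSize = 512)
    {
        var buffer = new List<T>();
        int chunkSize = Math.Max(1, initialChunkSize);

        while (true)
        {
            buffer.Clear();

            lock (_gate)
            {
                // Growth phase: the shared enumerator is only touched under the lock.
                while (!_exhausted && buffer.Count < chunkSize)
                {
                    if (_source.MoveNext()) buffer.Add(_source.Current);
                    else _exhausted = true;
                }
            }

            if (buffer.Count == 0) yield break;

            // Serve the locally cached elements without taking the lock.
            foreach (T item in buffer)
                yield return item;

            // Assumed growth policy: double the chunk size up to a cap.
            chunkSize = Math.Max(chunkSize, Math.Min(chunkSize * 2, maxChunkSize));
        }
    }

    public void Dispose()
    {
        _source.Dispose();
    }
}
```

The lock is taken once per chunk rather than once per element, so larger chunks reduce contention at the price of more buffered elements per partition.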

This chunk partitioning with self-expanding partitions has a number of implications:

- High cost of synchronization

- Memory cost of caching sections of the source stream per partition

- Reading data elements from the source stream that may never be used:

In the example below, it is realistic to expect the generation of more than 100 natural numbers even though Take(100) can only consume the first 100 numbers. In fact, on my dual core processor, it generated around 130 numbers.

var linq =
  new NaturalNumbersEnumerable().AsParallel()
    .Take(100);

foreach (uint i in linq)
  Console.Write(i);
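One way to observe the over-read for yourself is to count how many elements the source actually produces. The sketch below assumes a PLINQ build that exposes AsParallel, Take and Count in System.Linq. The CountingNaturalNumbers and OverReadDemo types are hypothetical helpers, and the number reported will vary between machines and runs.

```csharp
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

// Hypothetical variant of NaturalNumbersEnumerable that counts how many
// elements have been pulled from it.
class CountingNaturalNumbers : IEnumerable<uint>
{
    private int _generated;

    public int Generated
    {
        get { return Volatile.Read(ref _generated); }
    }

    public IEnumerator<uint> GetEnumerator()
    {
        uint i = 1;
        while (true)
        {
            // Interlocked is purely defensive here, so that the counter stays
            // correct even if several enumerators are created over one instance.
            Interlocked.Increment(ref _generated);
            yield return i++;
        }
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}

static class OverReadDemo
{
    // Runs the parallel query and returns how many numbers the source produced.
    public static int CountGenerated(int take)
    {
        var source = new CountingNaturalNumbers();
        int consumed = source.AsParallel().Take(take).Count();
        Console.WriteLine("consumed {0}, generated {1}", consumed, source.Generated);
        return source.Generated;
    }
}
```

Calling OverReadDemo.CountGenerated(100) should report a generated count of at least 100. Anything above 100 is data that was read into partition buffers and then thrown away.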

 

Read the next paragraph with a pinch of salt as I cannot find signs of its use by the current CTP!

The hash-based partitioning algorithm aims to divide the source stream into smaller enumerators. The content of each partition is decided based on the hashcode provided by or for each source element. If the source stream is an IList<T> then its items are assigned to partitions based on their hashcode values. If however the source stream is an ICollection<T> or any other IEnumerable<T>, it is first converted into an IList<T> and then divided between partitions. Please note that it would not be possible to convert NaturalNumbersEnumerable above to an IList<T>, because the conversion would never finish.
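As a rough sketch of that description, and with the same caveat that I cannot confirm what the CTP really does, hash partitioning could look like the code below. The HashPartitioningSketch class, its Partition method and the modulo-based assignment are assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical illustration only; not the PLINQ implementation.
static class HashPartitioningSketch
{
    public static List<T>[] Partition<T>(IEnumerable<T> source, int partitionCount,
                                         IEqualityComparer<T> comparer = null)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (partitionCount < 1) throw new ArgumentOutOfRangeException(nameof(partitionCount));
        comparer = comparer ?? EqualityComparer<T>.Default;

        // Non-list sources are materialised first. For an infinite source
        // such as NaturalNumbersEnumerable this step would never complete.
        IList<T> items = source as IList<T> ?? source.ToList();

        var partitions = new List<T>[partitionCount];
        for (int p = 0; p < partitionCount; p++)
            partitions[p] = new List<T>();

        foreach (T item in items)
        {
            int hash = item == null ? 0 : comparer.GetHashCode(item);

            // Clear the sign bit so the modulo result is never negative.
            int index = (hash & int.MaxValue) % partitionCount;
            partitions[index].Add(item);
        }

        return partitions;
    }
}
```

Elements with equal hashcodes always land in the same partition, which is what makes this scheme attractive for operators that need to bring equal keys together.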