Parallel Performance
Optimize Managed Code For Multi-Core Machines
Daan Leijen and Judd Hall
This article uses the following technologies: Parallel FX Library
Contents
Introducing TPL
Structured Parallelism
Overexposing Parallelism
A Ray Tracer Example
Dynamic Work Distribution
Aggregation
Fork-Join Parallelism
Tasks and Futures
Replicable Tasks
The Task Manager
Multi-processor machines are becoming standard, while the speed increases of single processors have slowed. The key to performance improvements is therefore to run a program on multiple processors in parallel. Unfortunately, it is still very hard to write algorithms that actually take advantage of those multiple processors. In fact, most applications use just a single core and see no speed improvement when run on a multi-core machine. We need to write our programs in a new way.
Introducing TPL
The Task Parallel Library (TPL) is designed to make it much easier to write managed code that can automatically use multiple processors. Using the library, you can conveniently express potential parallelism in existing sequential code, where the exposed parallel tasks will be run concurrently on all available processors. Usually this results in significant speedups.
TPL is being created as a collaborative effort by Microsoft® Research, the Microsoft Common Language Runtime (CLR) team, and the Parallel Computing Platform team. TPL is a major component of the Parallel FX library, the next generation of concurrency support for the Microsoft .NET Framework. Though it has not yet reached version 1.0, the first Parallel FX Community Tech Preview (CTP) will be available from MSDN® in Fall '07. Watch https://blogs.msdn.com/somasegar for details. TPL does not require any language extensions and works with the .NET Framework 3.5 and higher.
Visual Studio® 2008 is fully supported and all parallelism is expressed using normal method calls. For example, suppose you have the following for loop that squares the elements of an array:
for (int i = 0; i < 100; i++) {
  a[i] = a[i] * a[i];
}
Since the iterations are independent of each other, that is, subsequent iterations do not read state updates made by prior iterations, you can use TPL to express the potential parallelism with a call to the parallel for method, like this:
Parallel.For(0, 100, delegate(int i) {
  a[i] = a[i] * a[i];
});
Note that Parallel.For is just a normal static method with three arguments, where the last argument is a delegate expression. This delegate captures the unchanged loop body of the previous example, which makes it particularly easy to experiment with introducing concurrency into a program.
The library contains sophisticated algorithms for dynamic work distribution and automatically adapts to the workload and particular machine. Meanwhile, the primitives of the library only express potential parallelism, but do not guarantee it. For example, on a single-processor machine, parallel for loops are executed sequentially, closely matching the performance of strictly sequential code. On a dual-core machine, however, the library uses two worker threads to execute the loop in parallel, depending on the workload and configuration. This means you can introduce parallelism into your code today and your applications will use multiple processors automatically when they are available. At the same time, the code will still perform well on older single-processor machines.
Unfortunately, the library does not help to correctly synchronize parallel code that uses shared memory. It is still the programmer's responsibility to ensure that certain code can be safely executed in parallel. Other mechanisms, such as locks, are still needed to protect concurrent modifications to shared memory. TPL does offer some abstractions, though, that help with synchronization (as we will show you in a moment).
Structured Parallelism
One of the most important abstractions of the parallel programmer is a parallel loop. For example, consider the following (naïve) matrix multiplication routine:
void SeqMatrixMult(int size, double[,] m1, double[,] m2, double[,] result) {
  for (int i = 0; i < size; i++) {
    for (int j = 0; j < size; j++) {
      result[i, j] = 0;
      for (int k = 0; k < size; k++) {
        result[i, j] += m1[i, k] * m2[k, j];
      }
    }
  }
}
In this example, the outer iterations are independent of one another and can potentially be done in parallel. Exposing this potential parallelism with TPL is easy. First, we reference the System.Concurrency.dll assembly during compilation. Then we are able to import the library into our code with a using statement:
using System.Concurrency;
Once the namespace is available, we can simply replace the outer for loop of the matrix multiplication with a call to the static Parallel.For method:
void ParMatrixMult(int size, double[,] m1, double[,] m2, double[,] result) {
  Parallel.For(0, size, delegate(int i) {
    for (int j = 0; j < size; j++) {
      result[i, j] = 0;
      for (int k = 0; k < size; k++) {
        result[i, j] += m1[i, k] * m2[k, j];
      }
    }
  });
}
The Parallel.For construct is a normal static method with three arguments. The first two arguments specify the iteration limits (between 0 and size). The last argument is a delegate function that is called for each iteration. This delegate takes the iteration index as its first argument and executes the unchanged loop body of the previous example. No changes to the original loop body are necessary since delegates automatically capture the free variables of the loop body (like result and m1). For more information on delegate expressions, see msdn.microsoft.com/msdnmag/issues/06/00/C20.
Finally, if any exception is thrown in any of the iterations, all iterations are canceled and the first thrown exception is rethrown in the calling thread, ensuring that exceptions are properly propagated and never lost.
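For example, the following sketch (the failing iteration and exception type are arbitrary) shows how such an exception surfaces at the call site:

try {
  Parallel.For(0, 100, delegate(int i) {
    if (i == 42) throw new InvalidOperationException("failed at " + i);
    // ... normal loop body ...
  });
}
catch (InvalidOperationException e) {
  // the remaining iterations are canceled and the exception is
  // rethrown here, on the thread that called Parallel.For
  Console.WriteLine("caught: " + e.Message);
}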
Without TPL, it is much harder to express the potential parallelism in this loop. Even with the help of the .NET ThreadPool class, we have to consider the cost of synchronization and the division of work. Figure 1 shows the matrix multiplication routine parallelized using the thread pool.
Figure 1 Static Work Distribution and Explicit Synchronization
void ThreadpoolMatrixMult(int size, double[,] m1, double[,] m2, double[,] result) {
  int N = size;
  int P = 2 * Environment.ProcessorCount;  // assume twice the procs for
                                           // good work distribution
  int Chunk = N / P;                       // size of a work chunk
  AutoResetEvent signal = new AutoResetEvent(false);
  int counter = P;                         // use a counter to reduce kernel transitions
  for (int c = 0; c < P; c++) {            // for each chunk
    ThreadPool.QueueUserWorkItem(delegate(Object o) {
      int lc = (int)o;
      for (int i = lc * Chunk;                        // iterate through a work chunk
           i < (lc + 1 == P ? N : (lc + 1) * Chunk);  // respect the upper bound
           i++) {
        // original inner loop body
        for (int j = 0; j < size; j++) {
          result[i, j] = 0;
          for (int k = 0; k < size; k++) {
            result[i, j] += m1[i, k] * m2[k, j];
          }
        }
      }
      if (Interlocked.Decrement(ref counter) == 0) {  // use efficient interlocked
        signal.Set();                                 // instructions and a kernel
      }                                               // transition only when done
    }, c);
  }
  signal.WaitOne();
}
The example is already quite sophisticated, using the thread pool for work items and using a counter together with a single wait handle to minimize the number of kernel transitions. In addition, it statically divides the loop into chunks based on the number of processors, creating twice as many as necessary to adapt better to dynamic workloads. However, unlike Parallel.For, the approach shown in Figure 1 does not propagate exceptions in the loop body and cannot be canceled.
Obviously, this code is much harder to write and more error-prone than the Parallel.For method. Also, despite being hand-tuned and using a near-optimal division of work, the thread pool approach generally performs worse than the Parallel.For method. Figure 2 shows the results of some anecdotal testing. The results represent the relative speedup when parallelizing the outer loop of a matrix multiplication with 750x750 elements, where 1 represents the running time of a normal for loop. The tests were conducted on a four-socket dual-core machine with 3GB of memory, running Windows Vista® Ultimate. Notice that on a single-core machine, the Parallel.For version performs practically the same as the direct for loop.
Figure 2 Parallel.For vs. ThreadPool Performance
Overexposing Parallelism
You may have noticed that we can expose even more parallelism by parallelizing the second for loop, as in:
Parallel.For(0, size, delegate(int i) {
  Parallel.For(0, size, delegate(int j) {
    result[i, j] = 0;
    for (int k = 0; k < size; k++) {
      result[i, j] += m1[i, k] * m2[k, j];
    }
  });
});
Even though it is fine to nest parallel loops, the performance of this approach is generally worse for two reasons. First, in this particular example, the outer loop already exposes more than enough opportunities for parallelism since we generally have far fewer cores available than the size of the matrix. Second, every delegate expression allocates some memory to hold the free variables. In our initial example, there was just a single delegate allocation, whose cost is amortized over all the iterations. Unfortunately, in the new code, the inner Parallel.For will perform a heap allocation on each iteration of the outer loop. Allocation is very efficient in the CLR, but this is still a noticeable cost compared to the amount of work done in each iteration.
Note that you cannot parallelize the inner loop since those iterations are not independent. In particular, there is a race since every iteration adds to the result[i,j] location. If you parallelize this loop, two iterations could concurrently read the current value into a register, add to it, and write back the result—and one addition would be lost! The only way to parallelize the inner loop is to properly protect the addition with a lock. Of course, actually doing this is not advisable: even if you ignore the extra allocations for the moment, performance is severely impacted since each concurrent iteration competes for the same lock. We'll come back to this in a bit when we discuss aggregation operations. For more information on races and locks, see What Every Dev Must Know About Multithreaded Apps.
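To make this concrete, here is a sketch of what the locked inner loop would look like, shown only to illustrate the anti-pattern (i and j are the indices of the enclosing loops):

// anti-pattern: every iteration contends for the same lock and the
// same memory location, so this is typically slower than sequential code
object cellLock = new object();
Parallel.For(0, size, delegate(int k) {
  double term = m1[i, k] * m2[k, j];  // i and j are captured from the outer loops
  lock (cellLock) {
    result[i, j] += term;
  }
});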
A Ray Tracer Example
Ray tracing is a simple but powerful way to generate photorealistic renderings. The technique, however, is very computationally intensive. Ray tracing is actually an excellent candidate for our library since each ray can be computed in parallel. We took an existing ray tracer, written by Luke Hoban (see blogs.msdn.com/lukeh/archive/2007/04/03/a-ray-tracer-in-c-3-0.aspx), and modified it to run in parallel using TPL. The ray tracer generates images as shown in Figure 3 and will be available as a sample in the Parallel FX CTP. The core loop of the original ray tracer iterates through all pixels of the result image:
Figure 3 Parallel Ray Tracer
void Render(Scene scene, Color[,] rgb) {
  for (int y = 0; y < screenHeight; y++) {
    for (int x = 0; x < screenWidth; x++) {
      rgb[x, y] = TraceRay(new Ray(scene, x, y));
    }
  }
}
Since each ray can be traced independently, we only needed to make a single line change to the original code to parallelize it:
void Render(Scene scene, Color[,] rgb) {
  Parallel.For(0, screenHeight, delegate(int y) {
    for (int x = 0; x < screenWidth; x++) {
      rgb[x, y] = TraceRay(new Ray(scene, x, y));
    }
  });
}
On an eight-core machine, the original code could generate 1.7 frames per second for an image of 350-by-350 pixels. In comparison, the parallel version, running on the same machine, generates 12 frames per second. This is a 7x speedup on an eight-core machine, which is an extremely good result for such a small change. At 12 frames per second, we are just fast enough to produce a smooth animation of the balls bouncing on the floor. And since this is a very simple ray tracer, you may be able to optimize it further to achieve even smoother animation.
Dynamic Work Distribution
When parallelizing loops manually using the thread pool, developers often end up dividing the work statically. For example, in a ray tracer the image is often divided into even parts, where each part is processed by a separate thread. In general, this is not a good idea since the actual workload might be unevenly divided. If the bottom part of the image takes, say, twice as long to compute due to reflections, the threads that serve the upper part of the image spend most of their time waiting for the bottom threads to finish. Even if the work is evenly divided, this can still happen due to page faults or other processes that run concurrently on the system.
To scale well on multiple processors, TPL uses work-stealing techniques to dynamically adapt and distribute work items over the worker threads. The library has a task manager that, by default, uses one worker thread per processor. This ensures minimal thread switching by the OS. Each worker thread has its own local task queue of work to be done. A worker usually just pushes new tasks onto its own queue and pops a task whenever it finishes its current one. When its local queue is empty, a worker tries to "steal" work from the queues of other workers.
The advantage here is that there is almost no synchronization between workers since the work queues are distributed and most operations are local to the worker, which is crucial for scalability. Moreover, work stealing has provably good cache locality and work distribution properties. For example, if the workload is uneven, one worker may take a long time for a specific task, but other workers will now steal work from its queue, keeping all processors busy. Dynamic work distribution is crucial in typical applications since it is hard to predict how long a task will take. This is especially true for desktop systems where processors are shared among many different processes and where we cannot predict the time slices the worker threads will get.
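To illustrate the idea (and only the idea), here is a toy sketch of such a per-worker queue, with the owner working at one end and thieves stealing from the other. All names are hypothetical, and the coarse lock is for clarity; TPL's real scheduler uses far more efficient lock-free structures:

using System;
using System.Collections.Generic;

class WorkerQueue {
  readonly LinkedList<Action> tasks = new LinkedList<Action>();
  readonly object sync = new object();

  // the owning worker pushes and pops at the back (good cache locality)
  public void Push(Action task) {
    lock (sync) { tasks.AddLast(task); }
  }

  public bool TryPop(out Action task) {
    lock (sync) {
      if (tasks.Count > 0) {
        task = tasks.Last.Value;
        tasks.RemoveLast();
        return true;
      }
    }
    task = null;
    return false;
  }

  // idle workers steal from the front, minimizing contention with the owner
  public bool TrySteal(out Action task) {
    lock (sync) {
      if (tasks.Count > 0) {
        task = tasks.First.Value;
        tasks.RemoveFirst();
        return true;
      }
    }
    task = null;
    return false;
  }
}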
Figure 4 demonstrates the dynamic work distribution in action using four threads. It shows the same ray-traced image as in Figure 3 but this time each worker thread is using a different color to render its pixels. You can see that the library distributed the work evenly among the worker threads, dynamically adapting to the workload.
Figure 4 Distributing Work among Worker Threads
Besides performing dynamic work distribution, the library also dynamically adjusts the number of worker threads if workers get blocked. Some examples of blocking operations are file reads, waiting for a key press, and retrieving the user name (which can access the network on a domain). If a task blocks unknowingly, performance can degrade as the concurrency level drops (though the program still behaves correctly). To improve performance, the library automatically tracks whether worker threads are blocked and, if necessary, injects extra worker threads to maintain the concurrency level. Once operations unblock, some workers may be retired to reduce the cost of thread switching.
Aggregation
A for loop is often used to iterate over a domain and aggregate the values into a single result. Take, for example, the following iterations that sum the prime numbers less than 100:
int sum = 0;
for (int i = 0; i < 100; i++) {
  if (isPrime(i)) sum += i;
}
Unfortunately, we cannot parallelize this loop as is, since doing so would lead to a data race: each iteration modifies the shared sum variable without lock protection. If two concurrent iterations increment the sum at the same time, both can read the same value into a register, add to it, and write the result back, losing one of the additions. A correct version uses a lock to protect the addition, like so:
int sum = 0;
object sumLock = new object();
Parallel.For(0, 100, delegate(int i) {
  if (isPrime(i)) {
    lock (sumLock) {
      sum += i;
    }
  }
});
However, the program now suffers from a performance problem since all parallel iterations contend for both the same lock and the same memory location (sum). It would be better if each worker thread maintained a thread-local sum and only added to the global sum at the end of the loop. This pattern is captured by the Parallel.Aggregate operation, so we can rewrite the example as:
int sum = Parallel.Aggregate(
  0, 100,  // iteration domain
  0,       // initial value
  delegate(int i) { return (isPrime(i) ? i : 0); },  // apply to each element
  delegate(int x, int y) { return x + y; }           // combine the results
);
The aggregate operation takes five arguments. The first two specify the iteration domain, which can also be an enumerator. The next argument is the initial value for the result. The next two arguments are delegate functions. The first function is applied to each element, and the other is used to combine the element results.
The library automatically uses a thread-local variable to compute the thread-local results without any locking, using a lock only to combine the final thread-local results. Keep in mind that when aggregation is done in parallel, elements may be combined in a different order than with a sequential aggregation. Therefore, the combining delegate function must be associative and the initial value must be a unit (identity) element for it.
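To see why the thread-local pattern matters, here is a hand-rolled sketch of the same computation (assuming the isPrime function from before, and a simple slice count P). Each slice accumulates privately and takes the lock only once, which is essentially the strategy the library automates:

int P = Environment.ProcessorCount;  // number of slices (a simple assumption)
int sum = 0;
object sumLock = new object();
Parallel.For(0, P, delegate(int slice) {
  int localSum = 0;                    // private to this slice; no locking needed
  for (int i = slice; i < 100; i += P) {
    if (isPrime(i)) localSum += i;
  }
  lock (sumLock) { sum += localSum; }  // one lock acquisition per slice
});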
Fork-Join Parallelism
Another common parallel pattern is fork-join parallelism. As an example, consider the following sequential quicksort implementation:
static void SeqQuickSort<T>(T[] domain, int lo, int hi)
  where T : IComparable<T> {
  if (hi - lo <= Threshold) {
    InsertionSort(domain, lo, hi);
    return;
  }
  int pivot = Partition(domain, lo, hi);
  SeqQuickSort(domain, lo, pivot - 1);
  SeqQuickSort(domain, pivot + 1, hi);
}
The algorithm is generic in the element type T and only requires that T instances can be compared. Under a certain threshold, the algorithm falls back on insertion sort, which performs better for a small number of elements. Otherwise, we partition the input array in two parts and quicksort each part separately. These two sorts can be performed in parallel since each sort works on a distinct part of the array. We can express this conveniently using the Parallel.Do method:
static void ParQuickSort<T>(T[] domain, int lo, int hi)
  where T : IComparable<T> {
  if (hi - lo <= Threshold) {
    InsertionSort(domain, lo, hi);
    return;
  }
  int pivot = Partition(domain, lo, hi);
  Parallel.Do(
    delegate { ParQuickSort(domain, lo, pivot - 1); },
    delegate { ParQuickSort(domain, pivot + 1, hi); }
  );
}
The Parallel.Do method is a static method that takes two or more delegates as arguments and potentially executes them in parallel. Since quicksort is recursive, a lot of parallelism is exposed, as every invocation introduces more parallel tasks. Again, since the library does not guarantee parallel execution, most of the tasks are actually executed sequentially, which is essential for good performance.
Tasks and Futures
The previous examples all demonstrate structured parallelism where the scope of the parallel code is determined by the lexical scope. But not all parallel algorithms can be expressed this way. Fortunately, the library also provides support for general parallel tasks:
class Task {
  Task(Action action);
  void Wait();
  void Cancel();
  bool IsCompleted { get; }
  ...
}
A task is created by supplying an associated action that can potentially be executed in parallel. The action is executed somewhere between the creation time of the task and the first call to the Wait method. It may be executed in parallel on another thread, but there is a guarantee that the action will not migrate among threads. This is a useful guarantee since programmers can use thread-affine abstractions like Windows critical sections without having to worry, for example, that LeaveCriticalSection is executed on a different thread than EnterCriticalSection. If a task has already completed, Wait returns immediately.
Any exception that is raised in the associated action is stored in the task and rethrown whenever Wait is called. Similarly, the Parallel.For and Parallel.Do functions accumulate all thrown exceptions, which are rethrown when all tasks complete. This ensures that exceptions are never lost and are properly propagated to dependents.
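For example, a small usage sketch of the Task API described above (DoSomeWork is a hypothetical placeholder):

Task t = new Task(delegate {
  DoSomeWork();  // hypothetical; runs somewhere between creation and Wait
});
// ... do other work on the calling thread ...
try {
  t.Wait();  // returns immediately if the task has already completed
}
catch (Exception e) {
  // any exception stored by the task is rethrown by Wait
  Console.WriteLine("task failed: " + e.Message);
}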
Finally, you can cancel the task and all tasks created in its associated actions (child tasks) by calling Cancel. Cancellation is not pre-emptive and a running task must cooperatively quit doing work by calling back into the library. This can be done, for example, by creating new tasks or calling the Wait method. If the parent task has been canceled, these library calls will raise a (synchronous) OperationCanceled exception to stop the action.
You can see tasks as an improved thread pool where work items return a handle that can be canceled or waited upon, and where exceptions are propagated. There is also a variation of tasks, called futures, where the associated action computes a result:
class Task<T> : Task {
  Task(Func<T> function);
  T Value { get; }  // does an implicit wait
}
A future, which is a task that computes a result, is constructed not with a normal action but with a function that returns a result. This function is a delegate of type Func<T>, where T is the type of the future value.
The result of the future is retrieved through the Value property. The Value property calls Wait internally to ensure that the task has completed and the result value has been computed. Since Wait is called, calling Value throws any exceptions raised during the computation of the value. One way to look at this is to think of a future as having either a value or an exceptional value, determined by the computation.
Futures are an old concept, already implemented in MultiLisp. Note, though, that our notion of a future is not "safe" in the sense that the programmer is responsible for properly locking shared memory. This is in contrast to approaches where the action of a future is automatically wrapped in a memory transaction.
The future abstraction works well with symbolic code that is less structured than loops. For example, consider the following definition of binary tree nodes and leaves:
class Node : Tree {
  int depth;   // the depth of the tree
  Tree left;   // the left subtree
  Tree right;  // the right subtree
  ...
}

class Leaf : Tree {
  int value;   // values are stored in the leaves
  ...
}
Now suppose we define a virtual Sum method on a Tree that sums up all the values of the leaves. A leaf simply returns its value; nodes add the sums of their subtrees:
override int Sum() {
  int l = left.Sum();
  int r = right.Sum();
  return (r + l);
}
In this case, each child computation can be done in parallel since they are independent. The parallelism here is lexically scoped and we can use Parallel.Do, but for the sake of demonstration, we use futures:
override int Sum() {
  Task<int> l = new Task<int>(left.Sum);
  int r = right.Sum();
  return (r + l.Value);
}
For each left subtree, we create a new future of type int, passing a delegate as the constructor argument. In this sample, we pass the sum method of the left child, left.Sum, without calling it. We continue by calculating the sum of the right subtree. By creating the future, other processors could potentially start evaluating the sum of the left subtree in parallel. In the end, we request the value of the future using the Value property.
If the task has already been calculated by another worker thread, this call immediately returns the result value—great! If the task is still running on another worker thread, we block until the result is available (but another worker thread is scheduled to maintain the concurrency level).
There is another very common scenario where the task has not been started at all. In this case, the call to Value executes the task directly on the calling thread. This is common and very efficient since it amounts to just an indirect method call. In contrast, when waiting on an OS-supplied signal, there is no way to make the signal happen; the only thing that can be done is to block the calling thread, which is generally bad for performance. In our case, it is clear how to calculate the value, so the library executes the associated action directly instead of blocking the thread.
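You can observe this behavior with a small probe that compares thread IDs (a sketch; whether the task runs inline depends on whether a worker picked it up first):

Task<int> f = new Task<int>(delegate {
  return Thread.CurrentThread.ManagedThreadId;
});
// if no worker has started the task yet, reading Value runs it
// directly on the calling thread
bool ranInline = (f.Value == Thread.CurrentThread.ManagedThreadId);
Console.WriteLine(ranInline ? "executed inline" : "executed on a worker thread");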
Since the amount of work done in the leaves is very small, it might be better to increase the size of work done by each task by doing the summation sequentially at a certain tree depth. For example, if we assume a sequential sum method SeqSum, we can write:
override int Sum() {
  if (depth < 10) return SeqSum();
  Task<int> l = new Task<int>(left.Sum);
  int r = right.Sum();
  return (r + l.Value);
}
In general, determining the right threshold limit depends on the amount of work done in the action of a task, compared to the cost of allocating a task object. In our experience, allocation is fairly cheap and the threshold limit is usually around 100 floating point multiplications.
Since futures are true first-class values, you can use futures to introduce parallelism between logically distinct parts of a program. For example, you can store futures in data structures where another distinct phase actually requests the values of these futures. An appropriate venue for this is gaming applications. One phase could calculate, for instance, the new health of all characters as a future, while other phases use the values of those health futures later on. On a multi-core machine, those futures can be calculated in parallel with the work done in each phase.
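For example, here is a sketch of that two-phase pattern, where Character, ComputeHealth, and Render are all hypothetical:

// phase 1: kick off the health computations as futures
foreach (Character c in characters) {
  Character ch = c;  // copy the loop variable for correct capture in the delegate
  ch.FutureHealth = new Task<int>(delegate { return ComputeHealth(ch); });
}

// ... other per-phase work can proceed in parallel with these futures ...

// phase 2: consume the values; each Value waits (or runs inline) if needed
foreach (Character c in characters) {
  Render(c, c.FutureHealth.Value);
}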
Replicable Tasks
The library is built upon just two primitive concepts: tasks and replicable tasks. All other abstractions, such as futures and parallel for loops, are expressed in terms of those two primitives. This ensures that operations behave in a regular way with consistent semantics. For example, exceptions are always nicely propagated and all abstractions can be canceled (including parallel for loops).
Note that replicable tasks are really meant for library writers that want to extend the standard abstractions offered by TPL, and they should only be used sparingly (if ever) in normal code. A replicable task derives directly from a normal task, like this:
class ReplicableTask : Task {
  ReplicableTask(Action action);
}
A replicable task represents a task that can itself be executed by multiple threads at the same time and captures the ubiquitous apply-to-all concurrency pattern while abstracting from the dynamics of work distribution. The constructor takes an action delegate that is potentially executed in parallel on another thread and potentially executed by multiple threads at the same time. If an exception is raised in any of those executions, only one of them is stored and thrown again by Wait.
A replicable task can be used if other threads can potentially participate in doing the work. All the Parallel.For and Parallel.ForEach variations are implemented using replicable tasks. For example, we can naïvely implement the basic Parallel.For as:
static void For(int from, int to, Action<int> body) {
  int index = from;
  ReplicableTask rtask = new ReplicableTask(delegate {
    int i;
    while ((i = Interlocked.Increment(ref index) - 1) < to) {
      body(i);
    }
  });
  rtask.Wait();
}
Since all executions of the replicable task share the index variable, the action delegate passed to the replicable task constructor can be executed by as many threads as needed. In the implementation, this can be used to let idle processors participate in the work.
In this implementation, each worker thread claims one index at a time. This corresponds to the dynamic(1) strategy of OpenMP and works generally well when the workload of each index can differ significantly. (For more on this, see msdn.microsoft.com/msdnmag/issues/05/10/OpenMP.) But this strategy can lead to cache contention when the workload per index is small. In that case, it would be better to process a stride of indices at once. Look at this variation of Parallel.For that corresponds to the dynamic(n) strategy of OpenMP and takes a stride as an argument:
static void ForWithStride(int stride, int from, int to, Action<int> body) {
  int index = from;
  if (stride <= 0) stride = 1;
  ReplicableTask rtask = new ReplicableTask(delegate {
    int i;
    while ((i = Interlocked.Add(ref index, stride) - stride) < to) {
      int end = Math.Min(i + stride, to);
      do {
        body(i);
        i++;
      } while (i < end);
    }
  });
  rtask.Wait();
}
Replicable tasks are a powerful abstraction for implementing different parallel iteration strategies. However, in our experience, the standard Parallel.For and Foreach implementations work well in many scenarios and we hope there won't be much need for the extra expressiveness of replicable tasks in practice.
The Task Manager
All tasks belong to a task manager, which, as the name implies, manages the tasks and oversees worker threads to execute tasks. While there is always a default task manager available, an application can also explicitly create a task manager. The task manager interface is defined as:
class TaskManager : IDisposable {
  TaskManager();
  TaskManager(int maxConcurrentThreads);
  static TaskManager Current { get; }
  int MaxConcurrentThreads { get; }
  ...
}

class Task {
  Task(TaskManager taskManager, Action action);
  ...
}
A task manager has an associated concurrency level that is returned by the MaxConcurrentThreads property. This specifies to the manager the ideal number of threads executing tasks at any given time. It is a hint, so if the manager needs to use more to make forward progress, it will do so dynamically. When you create a task manager, you can supply this number explicitly; by default it is equal to the number of processors.
In general, you are never required to create a task manager explicitly since a default is always available. However, you might want to use multiple task managers where each has a different concurrency level or each handles a separate set of tasks. In that case, you can create a new task manager and use the special Task constructor that takes a task manager as its first argument and executes that task and all of its children in that task manager. For example, consider the following code:
using (TaskManager tm = new TaskManager(2)) {  // use only 2 worker threads for tasks
  new Task(tm, delegate {
    // tasks created in this delegate use the tm task manager by default
    ...
    // finally, show some statistics
    Console.WriteLine("statistics: " + tm);
  }).Wait();
}
Another important use of the task manager interface is to run all code sequentially using a single worker thread, which means that all tasks and parallel for loops execute sequentially. This is excellent for debugging: you can verify that the code functions correctly when executed sequentially before running it in parallel on a multi-processor machine.
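For example, a minimal sketch that forces sequential execution during debugging, reusing the ParMatrixMult routine from earlier (its arguments are assumed to be in scope):

using (TaskManager debugTm = new TaskManager(1)) {  // a single worker thread
  new Task(debugTm, delegate {
    // all tasks and parallel loops created here run sequentially,
    // which makes stepping through the code much easier
    ParMatrixMult(size, m1, m2, result);
  }).Wait();
}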
Daan Leijen is a Researcher at Microsoft Research. His current interests are declarative and functional programming, type inference systems, and parallel computing.
Judd Hall is a Program Manager II at Microsoft.