February 2009

Volume 24 Number 02

.NET Matters - Ordered Execution With ThreadPool

By Stephen Toub | February 2009

Q: Many components in my system need to execute work asynchronously, which makes me think that the Microsoft .NET Framework ThreadPool is the right solution. However, I have what I believe is a unique requirement: each component needs to ensure that its work items are processed in order and that, as a result, no two of its work items are executed at the same time. It's OK, though, for multiple components to execute concurrently with each other; in fact, that's desired. Do you have any recommendations?

A: This isn't as unique a predicament as you might think, as it occurs in a variety of important scenarios, including ones based on message passing. Consider a pipeline implementation that obtains parallelism benefits by having multiple stages of the pipeline active at any one time.

For example, you could have a pipeline that reads in data from a file, compresses it, encrypts it, and writes it out to a new file. The compression can be done concurrently with the encryption, but not on the same data at the same time, since the output of one needs to be the input to the other. Rather, the compression routine can compress some data and send it off to the encryption routine to be processed, at which point the compression routine can work on the next piece of data.

Since many compression and encryption algorithms maintain a state that affects how future data is compressed and encrypted, it's important that ordering is maintained. (Never mind that this example deals with files, and it'd be nice if you could decrypt and decompress the results in order to get back the original with all of the data in the correct order.)

There are several potential solutions. The first solution is simply to dedicate a thread to each component. This DedicatedThread would have a first-in-first-out (FIFO) queue of work items to be executed and a single thread that services that queue. When the component has work to be run, it dumps that work into the queue, and eventually the thread will get around to picking up the work and executing it. Since there's only one thread, only one item will be run at a time. And as a FIFO queue is being used, the work items will be processed in the order they were generated.

As with the example I provided in the January 2008 .NET Matters column, I'll use a simple WorkItem class to represent the work to be executed, shown in Figure 1. An implementation of DedicatedThread that uses this WorkItem type is shown in Figure 2. The bulk of the implementation is in a naive BlockingQueue<T> implementation (the .NET Framework 4.0 includes a BlockingCollection<T> type that would be a better fit for an implementation like this). The constructor of DedicatedThread simply creates a BlockingQueue<T> instance, then spins up a thread that continually waits for another item to arrive in the queue and then executes it.

Figure 1 Capturing a Work Item

internal class WorkItem
{
    public WaitCallback Callback;
    public object State;
    public ExecutionContext Context;

    private static ContextCallback _contextCallback = s =>
    {
        var item = (WorkItem)s;
        item.Callback(item.State);
    };

    public void Execute()
    {
        if (Context != null)
            ExecutionContext.Run(Context, _contextCallback, this);
        else
            Callback(State);
    }
}

Figure 2 DedicatedThread Implementation

public class DedicatedThread
{
    private BlockingQueue<WorkItem> _workItems = new BlockingQueue<WorkItem>();

    public DedicatedThread()
    {
        new Thread(() =>
        {
            while (true) { _workItems.Dequeue().Execute(); }
        }) { IsBackground = true }.Start();
    }

    public void QueueUserWorkItem(WaitCallback callback, object state)
    {
        _workItems.Enqueue(new WorkItem
        {
            Callback = callback,
            State = state,
            Context = ExecutionContext.Capture()
        });
    }

    private class BlockingQueue<T>
    {
        private Queue<T> _queue = new Queue<T>();
        private Semaphore _gate = new Semaphore(0, Int32.MaxValue);

        public void Enqueue(T item)
        {
            lock (_queue) _queue.Enqueue(item);
            _gate.Release();
        }

        public T Dequeue()
        {
            _gate.WaitOne();
            lock (_queue) return _queue.Dequeue();
        }
    }
}
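For comparison, here is a rough, illustrative sketch (not part of the column's code; the DedicatedThreadV4 name and its Shutdown method are just for illustration) of what the same single-consumer pattern might look like built on the BlockingCollection<T> type mentioned above:

using System;
using System.Collections.Concurrent;
using System.Threading;

// Illustrative only: the same one-queue/one-thread pattern, but relying on
// BlockingCollection<T> instead of the hand-rolled BlockingQueue<T> above.
public class DedicatedThreadV4
{
    private readonly BlockingCollection<WorkItem> _workItems =
        new BlockingCollection<WorkItem>();

    public DedicatedThreadV4()
    {
        new Thread(() =>
        {
            // GetConsumingEnumerable blocks waiting for items and completes
            // once CompleteAdding has been called and the queue drains.
            foreach (var item in _workItems.GetConsumingEnumerable())
                item.Execute();
        }) { IsBackground = true }.Start();
    }

    public void QueueUserWorkItem(WaitCallback callback, object state)
    {
        _workItems.Add(new WorkItem
        {
            Callback = callback,
            State = state,
            Context = ExecutionContext.Capture()
        });
    }

    // Lets the servicing thread exit once all queued items have been processed.
    public void Shutdown() { _workItems.CompleteAdding(); }
}

Either way, the shape is the same: one FIFO queue of work and a single thread draining it.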

This provides the basic functionality for your scenario and it may meet your needs, but there are some important downsides. First, a thread is being reserved for each component. With one or two components, that may not be a problem. But for a lot of components, this could result in a serious explosion in the number of threads. That can lead to bad performance.

This particular implementation is also not extremely robust. For example, what happens if you want to tear down a component—how do you tell the thread to stop blocking? And what happens if an exception is thrown from a work item?
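To make those concerns concrete, here is one possible way to handle them. This is just a sketch, not part of the column's implementation; it assumes the BlockingQueue<T> helper from Figure 2 has been factored out so it can be reused, and that callers never queue a null work item, since null is used here as a shutdown sentinel:

// Illustrative sketch: a DedicatedThread variant with shutdown and basic
// exception handling. Assumes BlockingQueue<T> from Figure 2 is accessible
// and that callers never enqueue a null WorkItem themselves.
public class StoppableDedicatedThread
{
    private BlockingQueue<WorkItem> _workItems = new BlockingQueue<WorkItem>();

    public StoppableDedicatedThread()
    {
        new Thread(() =>
        {
            while (true)
            {
                WorkItem item = _workItems.Dequeue();
                if (item == null) break; // null is the shutdown sentinel
                try { item.Execute(); }
                catch { /* log and keep the servicing thread alive */ }
            }
        }) { IsBackground = true }.Start();
    }

    public void QueueUserWorkItem(WaitCallback callback, object state)
    {
        _workItems.Enqueue(new WorkItem
        {
            Callback = callback,
            State = state,
            Context = ExecutionContext.Capture()
        });
    }

    // Asks the thread to exit after all previously queued items complete.
    public void Shutdown() { _workItems.Enqueue(null); }
}

Whether swallowing, logging, or rethrowing exceptions is the right policy depends on your application; the point is simply that these decisions are yours to make when you own the thread.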

As an aside, it's interesting to note that this solution is similar to what Windows uses in a typical message pump. The message pump is a loop waiting for messages to arrive, dispatching them (processing them), then going back and waiting for more. The messages for a particular window are processed by a single thread. The similarities are demonstrated by the code in Figure 3, which should exhibit behavior very much like the code in Figure 2. A new thread is spun up that creates a Control, ensures that its handle has been initialized, and uses Application.Run to execute a message loop. To queue a work item to this thread, you simply use the Control's BeginInvoke method. Note that I'm not recommending this approach, but rather just pointing out that, at a high level, it's the same basic concept as the DedicatedThread solution already shown.

Figure 3 Similarities with a UI Message Loop

public class WindowsFormsDedicatedThread
{
    private Control _control;

    public WindowsFormsDedicatedThread()
    {
        using (var mre = new ManualResetEvent(false))
        {
            new Thread(() =>
            {
                _control = new Control();
                var forceHandleCreation = _control.Handle;
                mre.Set();
                Application.Run();
            }) { IsBackground = true }.Start();
            mre.WaitOne();
        }
    }

    public void QueueUserWorkItem(WaitCallback callback, object state)
    {
        _control.BeginInvoke(callback, state);
    }
}
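Usage mirrors the DedicatedThread class. A hypothetical snippet (the loop and messages here are just for illustration) would look like this:

// Hypothetical usage: items queued through BeginInvoke run one at a time,
// in order, on the thread running the message loop.
var worker = new WindowsFormsDedicatedThread();
for (int i = 0; i < 10; i++)
{
    worker.QueueUserWorkItem(state => Console.WriteLine("Processing " + state), i);
}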

A second solution involves using the ThreadPool for execution. Rather than spinning up a new, custom thread per component that services a private queue, we'll keep just the queue per component, such that no two elements from the same queue will ever be serviced at the same time. This has the benefits of allowing the ThreadPool itself to control how many threads are needed, to handle their injection and retirement, to handle reliability issues, and to get you out of the business of spinning up new threads, which is infrequently the right thing to do.

An implementation of this solution is shown in Figure 4. The FifoExecution class maintains just two fields: a queue of work items to be processed, and a Boolean value that indicates whether a request has been issued to the ThreadPool to process work items. Both of these fields are protected by a lock on the work item queue. The rest of the implementation is simply two methods.

Figure 4 Implementing FifoExecution

public class FifoExecution
{
    private Queue<WorkItem> _workItems = new Queue<WorkItem>();
    private bool _delegateQueuedOrRunning = false;

    public void QueueUserWorkItem(WaitCallback callback, object state)
    {
        var item = new WorkItem
        {
            Callback = callback,
            State = state,
            Context = ExecutionContext.Capture()
        };
        lock (_workItems)
        {
            _workItems.Enqueue(item);
            if (!_delegateQueuedOrRunning)
            {
                _delegateQueuedOrRunning = true;
                ThreadPool.UnsafeQueueUserWorkItem(ProcessQueuedItems, null);
            }
        }
    }

    private void ProcessQueuedItems(object ignored)
    {
        while (true)
        {
            WorkItem item;
            lock (_workItems)
            {
                if (_workItems.Count == 0)
                {
                    _delegateQueuedOrRunning = false;
                    break;
                }
                item = _workItems.Dequeue();
            }
            try
            {
                item.Execute();
            }
            catch
            {
                ThreadPool.UnsafeQueueUserWorkItem(ProcessQueuedItems, null);
                throw;
            }
        }
    }
}

The first method is QueueUserWorkItem, with a signature that matches the one exposed by the ThreadPool (the ThreadPool also provides a convenience overload that accepts just a WaitCallback, an overload you could choose to add; a sketch of one appears after this walkthrough). The method first creates a WorkItem to be stored and then takes the lock. (No shared state is accessed while creating the WorkItem, so to keep the time spent holding the lock as short as possible, the item is created before the lock is taken.) Once the lock is held, the created work item is enqueued onto the work item queue.

The method then checks whether a request has been made to the ThreadPool to process queued work items, and, if one hasn't been made, it makes such a request (and notes it for the future). This request to the ThreadPool is simply to use one of the ThreadPool's threads to execute the ProcessQueuedItems method.

When invoked by a ThreadPool thread, ProcessQueuedItems enters a loop. In this loop, it takes the lock and, while holding the lock, it checks whether there are any more work items to be processed. If there aren't any, it resets the request flag (such that future queued items will request processing from the pool again) and exits. If there are work items to be processed, it grabs the next one, releases the lock, executes the processing, and starts all over again, running until there are no more items in the queue. Note, too, the catch block: if executing a work item throws, ProcessQueuedItems queues another request to the pool so the remaining items still get serviced, and then lets the exception propagate.
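As for the convenience overload mentioned earlier, a minimal sketch (assuming it should mirror ThreadPool.QueueUserWorkItem(WaitCallback) by passing a null state) would simply forward to the two-argument version:

// Convenience overload sketch: forwards to the two-argument version with a
// null state, mirroring ThreadPool.QueueUserWorkItem(WaitCallback).
public void QueueUserWorkItem(WaitCallback callback)
{
    QueueUserWorkItem(callback, null);
}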

This is a simple yet powerful implementation. A component may now create an instance of FifoExecution and use it to schedule work items. Per instance of FifoExecution, only one queued work item will be able to execute at a time, and queued work items will execute in the order they were queued. Additionally, work items from distinct FifoExecution instances will be able to execute concurrently. And the best part is that you're now out of the business of thread management, leaving all of the hard (but very important) work of thread management to the ThreadPool.

In the extreme case, where every component is keeping the pool saturated with work, the ThreadPool will likely ramp up to having one thread per component, just like in the original DedicatedThread implementation. But that will only happen if it's deemed appropriate by the ThreadPool. If components aren't keeping the pool saturated, many fewer threads will be required.

There are additional benefits, such as letting the ThreadPool do the right thing with regard to exceptions. In the DedicatedThread implementation, what happens if the processing of an item throws an exception? The thread will come crashing down, but depending upon the application's configuration, the process may not be torn down. In that case, work items will start queueing up to the DedicatedThread, but none will ever get processed. With FifoExecution, the ThreadPool will just end up adding more threads to compensate for those that have gone away.

Figure 5 shows a simple demo application that utilizes the FifoExecution class. This app has three stages in a pipeline. Each stage writes out the ID of the current piece of data it's working with (which is just the loop iteration). It then does some work (represented here by a Thread.SpinWait) and passes data (again, just the loop iteration) along to the next stage. Each step outputs its information with a different number of tabs so that it's easy to see the results separated out. As you can observe in the output shown in Figure 6, each stage (a column) is keeping the work ordered correctly.

Figure 5 Demonstration of FifoExecution

static void Main(string[] args)
{
    var stage1 = new FifoExecution();
    var stage2 = new FifoExecution();
    var stage3 = new FifoExecution();

    for (int i = 0; i < 100; i++)
    {
        stage1.QueueUserWorkItem(one =>
        {
            Console.WriteLine("" + one);
            Thread.SpinWait(100000000);

            stage2.QueueUserWorkItem(two =>
            {
                Console.WriteLine("\t\t" + two);
                Thread.SpinWait(100000000);

                stage3.QueueUserWorkItem(three =>
                {
                    Console.WriteLine("\t\t\t\t" + three);
                    Thread.SpinWait(100000000);
                }, two);
            }, one);
        }, i);
    }
    Console.ReadLine();
}

Figure 6 Output from Demo Application

It's also interesting to note that there's a lack of fairness between the stages of the pipeline. You can see, for example, that stage1 in Figure 6 is already up to iteration 21, while stage2 is still back on 13 and stage3 is on 9. This is largely due to my implementation of ProcessQueuedItems. The sample app is very quickly pushing 100 work items into stage1, and thus the thread from the pool that services stage1 will likely sit in the ProcessQueuedItems loop and not return until there's no more stage1 work. This gives it an unfair bias over the other stages. If you see similar behavior in your app, and it's a problem, you can increase fairness between the stages by modifying the implementation of ProcessQueuedItems to one more like the following:

private void ProcessQueuedItems(object ignored)
{
    WorkItem item;
    lock (_workItems)
    {
        if (_workItems.Count == 0)
        {
            _delegateQueuedOrRunning = false;
            return;
        }
        item = _workItems.Dequeue();
    }
    try
    {
        item.Execute();
    }
    finally
    {
        ThreadPool.UnsafeQueueUserWorkItem(ProcessQueuedItems, null);
    }
}

Now, even if there are more items to be processed, ProcessQueuedItems won't loop around but rather will recursively queue itself to the ThreadPool, thus prioritizing itself behind items from other stages. With this modification, the output from the application in Figure 5 now looks like that shown in Figure 7. You can see in this new output that scheduling is indeed treating stage2 and stage3 with more fairness than before (there's still some lag between the stages, but that's to be expected given that this is a pipeline).

Figure 7 New Output with Fairer Scheduling

Of course, this increased fairness doesn't come for free. Each work item now incurs an extra trip through the scheduler, which adds some cost. You'll need to decide whether this is a trade-off you can make for your application; for example, if the work you're doing in your work items is at all substantial, this overhead should be negligible and unnoticeable.

This is just one more example of how it's possible to build systems on top of the ThreadPool that add functionality without having to build custom thread pools yourself. For other examples, see previous editions of the .NET Matters column in MSDN Magazine.

Send your questions and comments to netqa@microsoft.com.

Stephen Toub is a Senior Program Manager on the Parallel Computing Platform team at Microsoft. He is also a Contributing Editor for MSDN Magazine.