Grandiosely Serialized Tasks
No one can accuse me of spamming the MSDN blog site with too many posts, but after months of writing nothing, I recently and suddenly felt inspired to get something posted again. Maybe it had something to do with the release of Beta 2 of VS 2010 and .NET Framework 4?
Apple recently shipped the latest rev of its Mac OS X operating system, version 10.6. Among other features, it contains a thread pool called Grand Central Dispatch. While there is significant feature overlap between GCD and in the Vista thread pool, GCD has made some serious improvements in usability, similar in some ways to what we’re doing with PPL and ConcRT.
The significant one, in my opinion, is the addition of code blocks to the C/C++/Objective-C languages. Code blocks are similar to C++ lambdas, but since they are available also in C and Objective-C, the OS APIs can take full advantage of them, and GCD does. Just like lambdas add tremendous usability to PPL, code blocks make it much easier to take advantage of the features offered by GCD to define tasks.
Here’s an example lifted from the GCD documentation:
int x = 123;
int y = 456;
void (^aBlock)(int) = ^(int z){ printf(“%d %d %d\n”, x, y, z);};
aBlock(789);
Which is very similar to how you would do it with C++ lambdas:
int x = 123;
int y = 456;
auto aBlock = [=](int z){ printf("%d %d %d\n", x, y, z); };
aBlock(789);
GCD combines this with functions that dispatch blocks onto queues, just like any thread pool would, just with more ease.
What I wanted to explore, though, was one feature in particular: serial queues. What a serial queue gives you is a guarantee that there will be only one worker thread for all tasks that are added to the queue. Sometimes referred to as turn-based scheduling, this approach allows code to avoid using locks for mutual exclusion, which turns out to be especially advantageous for high-contention situations where the chance of having to block in the kernel, a very expensive operation, is high.
In Axum, we use a similar approach to protect domain state, although we go a step farther and separate the state readers from the state writers. GCD’s serial queues effectively treat every block as a writer, which allows for a simpler implementation with lower per-task overhead, but also less opportunity for parallelism between readers.
The Vista thread pool has this capability, too, but there’s nothing in PPL that supports this scenario: even using a thread-based scheduler, setting both the min and max concurrency to ‘1,’ and using a task_group still allows a task-creating thread to execute tasks when waiting for the group to complete. Going directly to the scheduler interfaces to call ScheduleTask doesn’t help – any call to Oversubscribe(true) will increase the thread count temporarily.
Luckily, the functionality is simple enough that it only takes a few lines to implement our own solution on top of the Windows thread pool. For good measure, we’ll throw in cancellation support, too, and make the API conform to PPL’s style. In fact, we can rely on the older thread pool which was available from Windows 2000 forward, so this is not a Vista-only feature. ConcRT, which the code depends on, does not support Windows 2000, just XP and later versions.
The full source code is available here. It was built with Beta2 and will not work with earlier versions. Specifically, concurrent_queue is new in Beta2.
Let’s go through it step by step:
The class we’re creating is called ‘serial_task_group,’ as it is meant to be used in the same way that the PPL ‘task_group’ and ‘structured_task_group’ are used. It is a non-template class with template methods.
The constructor is very simple as we are not associating with a ConcRT scheduler or defining scheduler policies. The event is set initially, so that any code calling wait or cancel will immediately return since the task group is initially empty.
serial_task_group() : taskCount(0), canceling(0) { evnt.set(); }
‘taskCount’ and ‘canceling’ are volatile 64-bit integers that will be used for wait and cancellation support, respectively. Together with a ConcRT event and a concurrent queue, they form the data members of the class:
volatile unsigned __int64 taskCount;
volatile unsigned __int64 canceling;
::Concurrency::event evnt;
::Concurrency::concurrent_queue<_task_info *> queue;
Each time a task is added, ‘taskCount’ is incremented, each time one completes, it is decremented. ‘canceling’ is incremented for each cancellation request, and decremented when the request is met.
Just like with task_group, tasks are scheduled using a template ‘run’ method, which is how the lambda magic is leveraged:
template <class Func>
void run(const Func& fn)
{
long tasks = InterlockedIncrement(&this->taskCount);
if (tasks > 0)
evnt.reset();
_Schedule_task(fn, tasks == 1);
}
‘wait’ and ‘cancel,’ which should really be called ‘cancel-and-wait,’ are both implemented synchronously, waiting for the event to be set before returning. Cancellation works by communicating to the implementation that requests to cancel have been registered, which prevents further scheduling from happening until the task count goes to 0.
void wait()
{
evnt.wait();
}
void cancel()
{
InterlockedIncrement(&this->canceling);
evnt.wait();
InterlockedDecrement(&this->canceling);
}
Those methods form the public interface of the class. The more interesting parts of the implementation is in the private stuff.
First, the _Schedule_task method, which places the newly submitted task at the end of a queue and creates a worker by calling QueueUserWorkItem. The actual decision to create a worker is made by the run method when the task count is incremented from 0 to 1.
Second, since Win32 doesn’t understand C++ functors, we need to supply something else as the argument to QueueUserWorkItem, which is what the static _Task_proc method is for. Its being static complicates things, as we have to pass it the ‘this’ pointer together with the functo. Therefore, we place not just the functor on the queue, but a _task_info record which contains both the pieces we need.
template <class Func>
void _Schedule_task(const Func& fn, bool createWorker)
{
// Create a task info record and queue it up.
_task_info *pInfo = new _task_info;
pInfo->pFunc = new Func(fn);
pInfo->pTaskGroup = this;
queue.push(pInfo);
// Request a new worker thread if necessary.
if (createWorker)
QueueUserWorkItem((LPTHREAD_START_ROUTINE)_Task_proc<Func>, &queue, 0);
}
template<class Func>
static void _Task_proc(void* data)
{
concurrent_queue<_task_info *> *queue = (concurrent_queue<_task_info *> *)data;
_task_info *pInfo;
long tasks = 0;
serial_task_group *pTaskGroup = NULL;
// Loop until there is no more work to perform.
do
{
while (!queue->try_pop(pInfo))
{
Context::CurrentContext()->Yield();
}
Func* pFunc = (Func*) pInfo->pFunc;
pTaskGroup = ((_task_info*) pInfo)->pTaskGroup;
if (InterlockedXor(&pTaskGroup->canceling, 0L) == 0)
(*pFunc)();
delete pFunc;
delete pInfo;
tasks = InterlockedDecrement(&pTaskGroup->taskCount);
} while (tasks > 0);
pTaskGroup->evnt.set();
}
Creating a worker only when we do not already have one is important for performance: calling QueueUserWorkItem for each task means possibly having more than one running, so we then need to introduce locks to protect the functor invocation. As taking locks is what we’re trying to avoid in the first place, it would be a shame. Besides, if we’re using serial task groups to protect access to some data structure, having it in cache is more likely if as many tasks as possible are run by the same worker thread.
Unfortunately, it is possible to schedule a new task so that we increment the task count before the decrement is called, but hit the try_pop call before the task has been added to the queue:
Task-executing thread: | Task-creating thread: | |
Finish the last task; taskCount = 1 | ||
Increment taskCount; taskCount = 2 | ||
Decrement taskCount; taskCount = 1 | ||
Test ‘tasks’; go to the top of the loop | ||
Call try_pop on an empty queue | ||
Push the task onto the queue |
Therefore, if the remaining taskCount is greater than zero, but the queue is empty, we have to spin until the two agree. The right way to do this with ConcRT is to call ‘Yield’ on the current context, which (arggh!) forces us to include this undefine directive at the beginning of the file:
#include <windows.h>
#undef Yield
Everything else in the implementation is pretty straight-forward. There are no locks taken, but there is going to be some contention on the interlocked operations, which is less of a performance concern. The cost of calling QueueUserWorkItem is amortized over the number of tasks that are batched together in the queue at any point in time, but in low-contention situations, there is still a significant cost.
Ready to use it? Except for the task-group declaration, the code looks like any other PPL-based code:
serial_task_group tg;
for (int i = 0; i < 5; i++)
{
tg.run( [=] {
printf("running task %d from thread %d\n", i, GetCurrentThreadId()); } );
}
tg.wait();
Serial task groups are not particularly expensive: a couple of object allocations (one for the task group, one for internals of the concurrent queue), for a total just above 300 bytes. If the task group is on the stack, it’s even cheaper, of course. Scheduling tasks requires a push and a pop per task, as well as two object allocations.
Here’s a challenge for you: I have this sneaking suspicion that the cancellation functionality has a race in it, but I’m not sure. I haven’t had time to prove its absence, nor have I established the exact sequence of events that would cause it. Gold stars galore to anyone who manages to convince me one way or the other! Remember that with cancellation, it’s OK to miss cancelling some work – it’s impossible to guarantee it. What would be of concern, however, is if we get stuck waiting for the cancellation when it will never be signaled.