October 2009
Volume 24 Number 10
Concurrent Affairs - Four Ways to Use the Concurrency Runtime in Your C++ Projects
By Rick Molloy | October 2009
I'm often asked how to integrate the new parallel computing libraries in the Visual Studio 2010 Beta into existing C++ projects. In this column, I'll explain a few of the ways you can use the APIs and classes that are part of the Parallel Pattern Library (PPL), Asynchronous Agents Library, and Concurrency Runtime in your existing projects. I'll walk through four common scenarios developers face in multithreaded application development and describe how you can be productive right away, using the PPL and Agents Library to make your multithreaded program more efficient and more scalable.
One: Moving Work from a UI Thread to a Background Task
One of the first things you're told to avoid as a Windows developer is hanging the UI thread. The end-user's experience of an unresponsive window is incredibly unpleasant regardless of whether developers provide their customers with a waiting mouse pointer or Windows provides them with a hung UI in the form of a frosted-glass window. The guidance we're given is often rather terse: don't perform any blocking calls on the UI thread but instead move these calls to a background thread. My experience is that this guidance isn't sufficient and that the coding work involved is tedious, error-prone, and rather unpleasant.
In this scenario, I'll provide a simple serial example that shows how to move work by using the existing threading APIs. Then, I'll provide two approaches for moving work to a background thread using the PPL and Agents Library. I'll wrap up this scenario by tying the examples back to the specifics of a UI thread.
Move a Long-Running Serial Operation to a Background Thread
So what does it mean to move work to a background thread? If I have some function that is running a long or potentially blocking operation and I want to move that function to a background thread, a good deal of boilerplate code is involved in the mechanics of actually moving that work, even for something as simple as a single function call such as the one shown here:
void SomeFunction(int x, int y){
LongRunningOperation(x, y);
}
First, you need to package up any state that's going to be used. Here I'm just packaging up a pair of integers, so I could use a built-in container like a std::vector, a std::pair, or a std::tuple, but more typically what I've seen folks do is package up the values in their own struct or class, like this:
struct LongRunningOperationParams{
LongRunningOperationParams(int x_, int y_):x(x_),y(y_){}
int x;
int y;
};
Then you need to create a global or static function that matches the threadpool or CreateThread signature, unpackages that state (typically by dereferencing a void * pointer), executes the function, and then deletes the data if appropriate. Here's an example:
DWORD WINAPI LongRunningOperationThreadFunc(void* data){
LongRunningOperationParams* pData =
(LongRunningOperationParams*) data;
LongRunningOperation(pData->x,pData->y);
//delete the data if appropriate
delete pData;
//a thread function must return an exit code
return 0;
}
Now you can finally get around to actually scheduling the thread with the data, which looks like this:
void SomeFunction(int x, int y){
//package up our thread state
//and store it on the heap
LongRunningOperationParams* threadData =
new LongRunningOperationParams(x,y);
//now schedule the thread with the data
CreateThread(NULL,0,&LongRunningOperationThreadFunc,
(void*) threadData,0,NULL);
}
This might not seem like that much more code. Technically, I've added only two lines of code to SomeFunction, four lines for our class and three lines for the thread function. But that's actually four times as much code. We went from three lines of code to 12 lines of code just to schedule a single function call with two parameters. The last time I had to do something like this, I believe I had to capture approximately eight variables, and capturing and setting all this state becomes quite tedious and prone to error. If I recall correctly, I found and fixed at least two bugs just in the process of capturing the state and building the constructor.
I also haven't touched on what it takes to wait for the thread to complete, which typically involves creating an event and a call to WaitForSingleObject to track that handle and, of course, cleaning up the handle when you're done with it. That's at least three more lines of code, and that still leaves out handling exceptions and return codes.
An Alternative to CreateThread: The task_group Class
The first approach I'm going to describe is using the task_group class from the PPL. If you're not familiar with the task_group class, it provides methods for spawning tasks asynchronously via task_group::run and waiting for its tasks to complete via task_group::wait. It also provides cancellation of tasks that haven't been started yet and includes facilities for packaging an exception with std::exception_ptr and rethrowing it.
You'll see that significantly less code is involved here than with the CreateThread approach and that from a readability perspective, the code is much closer to the serial example. The first step is to create a task_group object. This object needs to be stored somewhere where its lifetime can be managed—for example, on the heap or as a member variable in a class. Next you use task_group::run to schedule a task (not a thread) to do the work. Task_group::run takes a functor as a parameter and manages the lifetime of that functor for you. By using a C++0x lambda to package up the state, this is effectively a two-line change to the program. Here's what the code looks like:
//a task_group member variable
task_group backgroundTasks;
void SomeFunction(int x, int y){
backgroundTasks.run([x,y](){LongRunningOperation(x, y);});
}
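For comparison, the same capture-and-run shape can be sketched in portable standard C++ with std::async. This is an analogue I'm assuming for illustration, not the PPL's task_group: the returned std::future stands in for task_group::wait, LongRunningOperation is a hypothetical stub that returns a value only so the result can be observed, and WaitForResult is a helper name I made up.

```cpp
#include <cassert>
#include <future>

// Hypothetical stand-in for the article's LongRunningOperation.
int LongRunningOperation(int x, int y) {
    return x * y;
}

// Capture x and y by value in a lambda, as the task_group example does,
// and hand the work to the standard library instead of the PPL.
std::future<int> SomeFunction(int x, int y) {
    return std::async(std::launch::async,
                      [x, y]() { return LongRunningOperation(x, y); });
}

// Rough counterpart of task_group::wait for a single piece of work.
int WaitForResult(std::future<int>& pending) {
    assert(pending.valid());  // get() may be called only once per future
    return pending.get();
}
```

One difference worth keeping in mind: a future returned by std::async blocks in its destructor until the work finishes, so the caller has to store it somewhere, much as the task_group object needs a managed lifetime.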
Making Work Asynchronous with the Agents Library
Another alternative is to use the Agents Library, which involves an approach based on message passing. The amount of code change is about the same, but there's a key semantic difference worth pointing out with an agent-based approach. Rather than scheduling a task, you build a message-passing pipeline and asynchronously send a message containing just the data, relying on the pipeline itself to process the message. In the previous case, I'd send a message containing x and y. The work still happens on another thread, but subsequent calls to the same pipeline are queued, and the messages are processed in order (in contrast to a task_group, which doesn't provide ordering guarantees).
First, you need a structure to contain the message. You could, in fact, use the same structure as the earlier one, but I'll rename it as shown here:
struct LongRunningOperationMsg{
LongRunningOperationMsg (int x, int y):m_x(x),m_y(y){}
int m_x;
int m_y;
};
The next step is to declare a place to send the message to. In the Agents Library, a message can be sent to any message interface that is a "target," but in this particular case the most suitable is call<T>. A call<T> takes a message and is constructed with a functor that takes the message as a parameter. The declaration and construction of the call might look like this (using lambdas):
call<LongRunningOperationMsg>* LongRunningOperationCall = new
call<LongRunningOperationMsg>([]( LongRunningOperationMsg msg)
{
LongRunningOperation(msg.m_x, msg.m_y);
});
The modification to SomeFunction is now slight. The goal is to construct a message and send it to the call object asynchronously. The call will be invoked on a separate thread when the message is received:
void SomeFunction(int x, int y){
asend(LongRunningOperationCall, LongRunningOperationMsg(x,y));
}
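To make the ordering guarantee concrete, here is a minimal sketch of the idea behind an ordered message target, written in portable standard C++ rather than with the Agents Library. SerialCall and Post are hypothetical names of mine, and this is only an approximation of what call<T> and asend do: one worker thread drains a FIFO queue, so messages are handled in the order they were posted.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical stand-in for call<T>: a single worker drains a FIFO queue.
template <typename T>
class SerialCall {
public:
    explicit SerialCall(std::function<void(const T&)> handler)
        : handler_(std::move(handler)), worker_([this] { Run(); }) {}

    // Stop accepting work, let the worker drain the queue, then join.
    ~SerialCall() {
        {
            std::lock_guard<std::mutex> guard(mutex_);
            closed_ = true;
        }
        ready_.notify_one();
        worker_.join();
    }

    // Rough analogue of asend: enqueue the message and return immediately.
    void Post(T message) {
        {
            std::lock_guard<std::mutex> guard(mutex_);
            assert(!closed_);  // posting after shutdown is a programming error
            queue_.push(std::move(message));
        }
        ready_.notify_one();
    }

private:
    void Run() {
        std::unique_lock<std::mutex> lock(mutex_);
        for (;;) {
            ready_.wait(lock, [this] { return closed_ || !queue_.empty(); });
            if (queue_.empty()) return;  // closed and fully drained
            T message = std::move(queue_.front());
            queue_.pop();
            lock.unlock();
            handler_(message);  // run the handler outside the lock
            lock.lock();
        }
    }

    std::function<void(const T&)> handler_;
    std::mutex mutex_;
    std::condition_variable ready_;
    std::queue<T> queue_;
    bool closed_ = false;
    std::thread worker_;  // declared last so the other members exist first
};
```

Because only one thread ever runs the handler, two posted messages can never be processed concurrently or out of order, which is the semantic difference from scheduling independent tasks.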
Getting Work Back onto the UI Thread
Getting work off the UI thread is only half the problem. Presumably at the end of LongRunningOperation, you're going to get some meaningful result, and the next step is often getting work back onto the UI thread. The approach to take varies based on your application, but the easiest way to achieve this in the libraries offered in Visual Studio 2010 is to use another pair of APIs and message blocks from the Agents Library: try_receive and unbounded_buffer<T>.
An unbounded_buffer<T> can be used to store a message containing the data and potentially the code that needs to be run on the UI thread. Try_receive is a nonblocking API call that can be used to query whether there is data to display.
For example, if you were rendering images on your UI thread, you could use code like the following to get data back onto the UI thread after making a call to InvalidateRect:
unbounded_buffer<ImageClass>* ImageBuffer;
LONG APIENTRY MainWndProc(HWND hwnd, UINT uMsg,
WPARAM wParam, LPARAM lParam)
{
RECT rcClient;
int i;
...
ImageClass image;
//check the buffer for images and if there is one there, display it.
if (try_receive(ImageBuffer,image))
DisplayImage(image);
...
}
Some details, like the implementation of the message loop, have been omitted here, but I hope this section was instructive enough to demonstrate the technique. I encourage you to check the sample code for the article, which has a full working example of each of these approaches.
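The polling pattern itself is small enough to sketch in portable standard C++. The following is an assumption-laden analogue, not the Agents Library: ResultBuffer, Send, and TryReceive are hypothetical names that mimic the roles of unbounded_buffer<T>, send, and try_receive. The key property is that TryReceive never blocks, so it is safe to call from a message handler.

```cpp
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical stand-in for unbounded_buffer<T> plus try_receive.
template <typename T>
class ResultBuffer {
public:
    // Called by background work to publish a result.
    void Send(T value) {
        std::lock_guard<std::mutex> guard(mutex_);
        items_.push_back(std::move(value));
    }

    // Non-blocking: returns false immediately when nothing is queued,
    // otherwise moves the oldest item into 'out' and returns true.
    bool TryReceive(T& out) {
        std::lock_guard<std::mutex> guard(mutex_);
        if (items_.empty()) return false;
        out = std::move(items_.front());
        items_.pop_front();
        return true;
    }

private:
    std::mutex mutex_;
    std::deque<T> items_;
};
```

A UI thread would call TryReceive while handling a paint or timer message and simply carry on when it returns false.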
Figure 1 A Non-Thread-Safe Class
class Widget{
size_t m_width;
size_t m_height;
public:
Widget(size_t w, size_t h):m_width(w),m_height(h){};
size_t GetWidth(){
return m_width;
}
size_t GetHeight(){
return m_height;
}
void SetWidth(size_t width){
m_width = width;
}
void SetHeight(size_t height){
m_height = height;
}
};
Two: Managing Shared State with Message Blocks and Agents
Another common situation in multithreaded application development is managing shared state. Specifically, as soon as you try to communicate or share data between threads, managing shared state quickly becomes a problem you need to deal with. The approach I've often seen is to simply add a critical section to an object to protect its data members and public interfaces, but this soon becomes a maintenance problem and sometimes can become a performance problem as well. In this scenario, I'll walk through a serial and naïve example using locks, and then I'll show an alternative using the message blocks from the Agents Library.
Locking a Simple Widget Class
Figure 1 shows a non-thread-safe Widget class with width and height data members and simple methods that mutate its state.
The naïve approach to making the Widget class thread safe is to protect its methods with a critical section or reader-writer lock. The PPL contains a reader_writer_lock, and Figure 2 offers a first look at this obvious but naïve solution: guarding each method with the PPL's reader_writer_lock.
Figure 2 Using the reader_writer_lock from the Parallel Pattern Library
class LockedWidget{
size_t m_width;
size_t m_height;
reader_writer_lock lock;
public:
LockedWidget (size_t w, size_t h):m_width(w),m_height(h){};
size_t GetWidth(){
reader_writer_lock::scoped_lock_read lockGuard(lock);
return m_width;
}
size_t GetHeight(){
reader_writer_lock::scoped_lock_read lockGuard(lock);
return m_height;
}
void SetWidth(size_t width){
reader_writer_lock::scoped_lock lockGuard(lock);
m_width = width;
}
void SetHeight(size_t height){
reader_writer_lock::scoped_lock lockGuard(lock);
m_height = height;
}
};
What I've done here is add a reader_writer_lock as a member variable and then decorate all appropriate methods with either the reader or the writer version of the lock. I'm also using scoped_lock objects to ensure that the lock isn't left held in the midst of an exception. All the Get methods now acquire the reader lock, and the Set methods acquire the write lock. Each method is correct in isolation, but the design is fragile overall because its interfaces, when combined, are not thread safe. Specifically, if I have the following code, I'm likely to have corrupt state:
Thread1{
SharedWidget.GetWidth();
SharedWidget.GetHeight();
}
Thread2{
SharedWidget.SetWidth();
SharedWidget.SetHeight();
}
Because the calls on Thread1 and Thread2 can be interleaved, Thread1 can acquire the read lock for GetWidth, and then before GetHeight is called, SetWidth and SetHeight could both execute. So, in addition to protecting the data, you have to ensure that the interfaces to that data are also correct; this is one of the most insidious kinds of race conditions because the code looks correct and the errors are very difficult to track down. Naïve solutions I've seen for this situation often involve introducing a lock method on the object itself—or worse, a lock stored somewhere else that developers need to remember to acquire when accessing that widget. Sometimes both approaches are used.
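The interleaving is easier to see when it is replayed deterministically. The sketch below is my own illustration in portable standard C++ (std::mutex instead of the PPL lock; PerMethodLockedWidget and ReadAcrossUpdate are hypothetical names). It runs the problematic schedule on a single thread, so every lock is acquired and released correctly and the reader still ends up with a width and height that never existed together.

```cpp
#include <cstddef>
#include <mutex>
#include <utility>

// Every accessor takes the lock, mirroring the per-method locking design.
class PerMethodLockedWidget {
public:
    PerMethodLockedWidget(std::size_t w, std::size_t h)
        : width_(w), height_(h) {}

    std::size_t GetWidth() {
        std::lock_guard<std::mutex> guard(mutex_);
        return width_;
    }
    std::size_t GetHeight() {
        std::lock_guard<std::mutex> guard(mutex_);
        return height_;
    }
    void SetWidth(std::size_t width) {
        std::lock_guard<std::mutex> guard(mutex_);
        width_ = width;
    }
    void SetHeight(std::size_t height) {
        std::lock_guard<std::mutex> guard(mutex_);
        height_ = height;
    }

private:
    std::mutex mutex_;
    std::size_t width_;
    std::size_t height_;
};

// Replays the schedule from the text: the reader's two calls are split
// by the writer's two calls.
std::pair<std::size_t, std::size_t> ReadAcrossUpdate(
        PerMethodLockedWidget& widget) {
    std::size_t width = widget.GetWidth();    // "Thread1" reads old width
    widget.SetWidth(20);                      // "Thread2" runs in the gap
    widget.SetHeight(20);
    std::size_t height = widget.GetHeight();  // "Thread1" reads new height
    return {width, height};
}
```

Starting from a 10 x 10 widget, the reader observes 10 x 20, a combination that was never a valid state of the object.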
An easier approach is to ensure that interfaces can be interleaved safely without exposing this ability to tear the state of the object between interleaved calls. You might decide to evolve your interface as shown in Figure 3 to provide GetDimensions and UpdateDimensions methods. This interface is now less likely to cause surprising behavior because the methods don't allow exposing unsafe interleavings.
Figure 3 A Version of the Interface with GetDimensions and UpdateDimensions Methods
struct WidgetDimensions
{
    //constructor so a value can be built in one expression
    WidgetDimensions(size_t w = 0, size_t h = 0):width(w),height(h){}
    size_t width;
    size_t height;
};
class LockedWidgetEx{
    WidgetDimensions m_dimensions;
    reader_writer_lock lock;
public:
    LockedWidgetEx(size_t w, size_t h):m_dimensions(w,h){}
    WidgetDimensions GetDimensions(){
        reader_writer_lock::scoped_lock_read lockGuard(lock);
        return m_dimensions;
    }
    void UpdateDimensions(size_t width, size_t height){
        reader_writer_lock::scoped_lock lockGuard(lock);
        m_dimensions.width = width;
        m_dimensions.height = height;
    }
};
Managing Shared State with Message Blocks
Now let's take a look at how the Agents Library can help make managing shared state easier and the code a little more robust. The key classes from the Agents Library that are useful for managing shared variables are overwrite_buffer<T>, which stores a single updatable value and returns a copy of the latest value when receive is called; single_assignment<T>, which stores and returns a copy of a single value when receive is called but, like a constant, can be assigned only once; and unbounded_buffer<T>, which stores an unlimited number of items (memory permitting) and, like a FIFO queue, dequeues the oldest item when receive is called.
I'll start by using an overwrite_buffer<T>. In the Widget class, I'll first replace the m_dimensions member variable with overwrite_buffer<WidgetDimensions>, and then I'll remove the explicit locks from the methods and replace them with the appropriate send and receive calls. I still need to worry about our interface being safe, but I no longer have to remember to lock the data. Here's how this looks in code. It's actually slightly fewer lines of code than the locked version and the same number of lines as the serial version:
class AgentsWidget{
overwrite_buffer<WidgetDimensions> m_dimensionBuf;
public:
AgentsWidget(size_t w, size_t h){
send(&m_dimensionBuf,WidgetDimensions(w,h));
};
WidgetDimensions GetDimensions(){
return receive(&m_dimensionBuf);
}
void UpdateDimensions(size_t width, size_t height){
send(&m_dimensionBuf,WidgetDimensions(width,height));
}
};
There's a subtle semantic difference here from the reader_writer_lock implementation. The overwrite_buffer allows a call to UpdateDimensions to occur during a call to GetDimensions. This allows practically no blocking during these calls, but the value returned by GetDimensions may be slightly out of date. It's worth pointing out that the problem existed in the locked version as well, because as soon as you get the dimensions, they have the potential to be out of date. All I've done here is remove the blocking call.
An unbounded_buffer can also be useful for the Widget class. Imagine that the subtle semantic difference I just described was incredibly important. For example, if you have an instance of an object that you want to ensure is accessed by only one thread at a time, you can use unbounded_buffer as an object holder that manages access to that object. To apply this to the Widget class, you can remove m_dimensions and replace it with unbounded_buffer<WidgetDimensions> and use this buffer via the calls to GetDimensions and UpdateDimensions. The challenge here is to ensure that no one can get a value from our widget while it is being updated. This is achieved by emptying the unbounded_buffer so that calls to GetDimensions will block waiting for the update to occur. You can see this in Figure 4. Both GetDimensions and UpdateDimensions block, waiting for exclusive access to the dimensions variable.
Figure 4 Emptying the Unbounded_Buffer
class AgentsWidget2{
unbounded_buffer<WidgetDimensions> m_dimensionBuf;
public:
AgentsWidget2(size_t w, size_t h){
send(&m_dimensionBuf,WidgetDimensions(w,h));
};
WidgetDimensions GetDimensions(){
//get the current value
WidgetDimensions value = receive(&m_dimensionBuf);
//and put a copy of it right back in the unbounded_buffer
send(&m_dimensionBuf,value);
//return a copy of the value
return WidgetDimensions(value);
}
void UpdateDimensions (size_t width, size_t height){
WidgetDimensions oldValue = receive(&m_dimensionBuf);
send(&m_dimensionBuf,WidgetDimensions(width,height));
}
};
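The "empty the buffer to claim the object" idea can be sketched without the Agents Library. What follows is a portable standard C++ approximation under my own assumptions: Holder, Take, Put, and CountWithHolder are hypothetical names, with Take and Put playing the roles that receive and send play in Figure 4. A thread that has taken the value owns it exclusively until it puts a value back.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical object holder: at most one thread holds the value at a time.
template <typename T>
class Holder {
public:
    explicit Holder(T initial) : value_(std::move(initial)), full_(true) {}

    // Blocks until a value is present, then removes and returns it.
    T Take() {
        std::unique_lock<std::mutex> lock(mutex_);
        available_.wait(lock, [this] { return full_; });
        full_ = false;
        return std::move(value_);
    }

    // Returns a value to the holder and wakes one waiting thread.
    void Put(T value) {
        {
            std::lock_guard<std::mutex> guard(mutex_);
            assert(!full_);  // Put must follow a Take
            value_ = std::move(value);
            full_ = true;
        }
        available_.notify_one();
    }

private:
    std::mutex mutex_;
    std::condition_variable available_;
    T value_;
    bool full_;
};

// Each thread performs take-modify-put cycles; no update is lost because
// the value is unavailable to everyone else while it is being changed.
int CountWithHolder(int threadCount, int incrementsPerThread) {
    Holder<int> counter(0);
    std::vector<std::thread> workers;
    for (int t = 0; t < threadCount; ++t) {
        workers.emplace_back([&counter, incrementsPerThread] {
            for (int i = 0; i < incrementsPerThread; ++i) {
                int value = counter.Take();
                counter.Put(value + 1);
            }
        });
    }
    for (auto& worker : workers) worker.join();
    return counter.Take();
}
```

As in Figure 4, forgetting the Put after a Take leaves every other caller blocked, so the pair should always be kept together inside the class's own methods.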
It's Really About Coordinating Access to the Data
I want to stress one more thing about our Widget class: ensuring that methods and data that can be accessed concurrently work "safely" together is critical. Often, this can be achieved by coordinating access to state rather than by locking methods or objects. From a pure "lines of code" perspective, you won't see a big win over the locked example, and, in particular, the second example might even have a little more code. What is gained, however, is a safer design, and with a little thought, you can often modify serial interfaces so that the internal state of the object isn't "torn." In the Widget example, I did this by using message blocks, and I was able to protect that state in such a way that it is safer. Adding methods or functionality to the Widget class in the future is less likely to destroy the internal synchronization we've set up. With a member lock, it's pretty easy to simply forget to lock the lock when a method is added on a class. But moving operations to a message-passing model and using message blocks such as the overwrite buffer in their natural way can often keep data and classes synchronized.
Three: Using Combinable for Thread Local Accumulations and Initialization
The second scenario, in which we protected access to an object with locks or message blocks, works very well for heavier weight objects that are accessed infrequently. If while reading that example you thought that there might be a performance problem if the synchronized widget were used in a tight (and parallel) loop, you're probably right. That's because protecting shared state can be problematic, and for completely general purpose algorithms and objects that truly share state, there unfortunately aren't a lot of options other than to coordinate access or introduce a lock. But you can almost always find a way to refactor code or an algorithm to relax the dependency on shared state, and once you've done this, there are a few specific but common patterns in which an object called combinable<T> in the Parallel Pattern Library can really help out.
Combinable<T> is a concurrent container that offers support for three broad use cases: holding a thread-local variable or performing thread-local initialization, performing associative binary operations (like sum, min, and max) on the thread-local variables and combining them, and visiting each thread-local copy with an operation (like splicing lists together). In this section, I'll explain each of these cases and provide examples of how to use them.
Holding a Thread-Local Variable or Performing Thread-Local Initialization
The first use case I mentioned for combinable<T> was for holding a thread-local variable. It is relatively common to store a thread-local copy of global state. For example, in ray tracer applications like the one in our sample pack (code.msdn.microsoft.com/concrtextras) or in the samples for parallel development with .NET 4.0 (code.msdn.microsoft.com/ParExtSamples), there is an option to colorize each row by thread to visualize the parallelism. In the native version of the demo, this is done by using a combinable object that holds the thread-local color.
You can hold a thread-local variable, of course, by using thread-local storage (TLS), but there are some disadvantages, most notably lifetime management and visibility, and these go hand in hand. To use TLS, you first need to allocate an index with TlsAlloc, allocate your object, and then store a pointer to your object in the index with TlsSetValue. Then, when your thread is exiting, you need to ensure that your object is deallocated. (The thread's slot goes away when the thread exits, but the object it points to does not, and the index itself must eventually be released with TlsFree.) Doing this once or twice per thread and ensuring that there aren't any leaks because of early exits or exceptions isn't that challenging, but if your application needs dozens or hundreds of these items, a different approach is likely better.
Combinable<T> can be used to hold a thread-local value as well, but the lifetimes of the individual objects are tied to the lifetime of the combinable<T> item, and much of the initialization is automated. You access the thread-local value simply by calling the combinable::local method, which returns a reference to the local object. Here's an example using task_group, but this can be done with Win32 threads as well:
combinable<int> values;
auto task = [&](){
values.local() = GetCurrentThreadId();
printf("hello from thread: %d\n",values.local());
};
task_group tasks;
tasks.run(task);
//run a copy of the task on the main thread
task();
tasks.wait();
I mentioned that thread-local initialization can also be achieved with combinable. If, for example, you need to initialize a library call on each thread on which it is used, you can create a class that performs the initialization in its constructor. Then, on the first use per thread, the library call will be made, but it will be skipped on subsequent uses. Here's an example:
class ThreadInitializationClass
{
public:
ThreadInitializationClass(){
ThreadInitializationRoutine();
};
};
...
//a combinable object will initialize these
combinable<ThreadInitializationClass> libraryInitializationToken;
...
//initialize the library if it hasn't been already on this thread
ThreadInitializationClass& threadInit = libraryInitializationToken.local();
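The once-per-thread behavior can be checked with a small portable sketch. This one uses the C++11 thread_local keyword rather than combinable<T>, so it is an analogue and not the PPL mechanism; ThreadInitializationRoutine here is a hypothetical stub that only counts its calls, and EnsureThreadInitialized and CountInitializations are names I made up.

```cpp
#include <atomic>
#include <thread>
#include <vector>

// Counts how many times the per-thread setup has run (for demonstration).
std::atomic<int> initializationCount(0);

// Hypothetical stand-in for a library's per-thread setup call.
void ThreadInitializationRoutine() {
    ++initializationCount;
}

class ThreadInitializationClass {
public:
    ThreadInitializationClass() { ThreadInitializationRoutine(); }
};

// The first call on each thread constructs the thread_local object and so
// runs the setup; later calls on the same thread reuse it.
ThreadInitializationClass& EnsureThreadInitialized() {
    thread_local ThreadInitializationClass token;
    return token;
}

// Starts threadCount threads that each ask for initialization several
// times, and reports how often the setup routine actually ran.
int CountInitializations(int threadCount, int callsPerThread) {
    initializationCount = 0;
    std::vector<std::thread> workers;
    for (int t = 0; t < threadCount; ++t) {
        workers.emplace_back([callsPerThread] {
            for (int i = 0; i < callsPerThread; ++i)
                EnsureThreadInitialized();
        });
    }
    for (auto& worker : workers) worker.join();
    return initializationCount;
}
```

The design difference from combinable<T> is lifetime: a thread_local object lives as long as its thread, whereas the copies held by a combinable object are tied to that object.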
Performing Reductions in a Parallel Loop
Another major scenario for the combinable object is to perform thread-local reductions, or thread-local accumulations. Specifically, you can avoid a particular type of race condition when parallelizing loops or in recursive parallel traversals with combinable. Here's an incredibly naïve example that's not intended to show speed-ups. The following code shows a simple loop that looks like it can be parallelized with parallel_for_each, except for access to the sum variable:
int sum = 0;
for (vector<int>::iterator it = myVec.begin(); it != myVec.end(); ++it) {
int element = *it;
SomeFineGrainComputation(element);
sum += element;
}
Now, rather than placing a lock in our parallel_for_each, which destroys any chance we had of speed-ups, we can use a combinable object to calculate the thread-local sums:
combinable<int> localSums;
parallel_for_each(myVec.begin(), myVec.end(), [&localSums] (int element) {
SomeFineGrainComputation(element);
localSums.local() += element;
});
We've now successfully avoided the race condition, but we have a collection of thread-local sums stored in the localSums object, and we still need to extract the final value. We can do this with the combine method, which takes a binary functor like the following:
int sum = localSums.combine(std::plus<int>());
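The same local-accumulate-then-combine structure can be written by hand in portable standard C++, which makes it clear what the combinable object is doing for you. This is a sketch under my own assumptions, not the PPL: ParallelSum is a hypothetical helper that gives each worker thread its own slot and a contiguous chunk of the input.

```cpp
#include <algorithm>
#include <cstddef>
#include <functional>
#include <numeric>
#include <thread>
#include <vector>

// Each worker accumulates into a private variable and publishes it once
// into its own slot; the slots are combined after all workers finish.
int ParallelSum(const std::vector<int>& values, std::size_t workerCount) {
    if (workerCount == 0) workerCount = 1;
    std::vector<int> localSums(workerCount, 0);
    std::vector<std::thread> workers;
    const std::size_t chunk =
        (values.size() + workerCount - 1) / workerCount;
    for (std::size_t w = 0; w < workerCount; ++w) {
        const std::size_t begin = std::min(w * chunk, values.size());
        const std::size_t end = std::min(begin + chunk, values.size());
        workers.emplace_back([&values, &localSums, w, begin, end] {
            int sum = 0;  // no sharing inside the loop, so no lock needed
            for (std::size_t i = begin; i < end; ++i) sum += values[i];
            localSums[w] = sum;
        });
    }
    for (auto& worker : workers) worker.join();
    // The combine step: a serial reduction over the per-worker results.
    return std::accumulate(localSums.begin(), localSums.end(), 0,
                           std::plus<int>());
}
```

The combine step runs over one value per worker rather than one per element, so its serial cost is negligible next to the loop.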
The third use case for combinable<T>, which involves using the combine_each method, is when you need to visit each of the thread-local copies and perform some operation on them (like cleanup or error checking). Another, more interesting example is when your combinable object is a combinable<list<T>>, and in your threads you are building up std::lists or std::sets. In the case of std::lists, they can easily be spliced together with list::splice; with std::sets, they can be inserted with set::insert.
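Here is a minimal sketch of the list-splicing case in portable standard C++. It does not use combinable or combine_each; instead a vector of per-worker lists stands in for the thread-local copies, and CollectEvens is a hypothetical function of mine. The point it illustrates is that each worker builds its own std::list without synchronization, and the final merge moves nodes rather than copying them.

```cpp
#include <cassert>
#include <cstddef>
#include <list>
#include <thread>
#include <vector>

// Worker w examines w, w + workerCount, w + 2 * workerCount, ... below
// 'limit' and keeps the even numbers in its own private list.
std::list<int> CollectEvens(int limit, int workerCount) {
    assert(workerCount > 0);
    std::vector<std::list<int>> locals(
        static_cast<std::size_t>(workerCount));
    std::vector<std::thread> workers;
    for (int w = 0; w < workerCount; ++w) {
        workers.emplace_back([&locals, w, limit, workerCount] {
            for (int n = w; n < limit; n += workerCount)
                if (n % 2 == 0) locals[w].push_back(n);
        });
    }
    for (auto& worker : workers) worker.join();

    // Visit each per-worker list and splice it onto the result; splicing
    // a whole list relinks nodes and leaves the source list empty.
    std::list<int> merged;
    for (auto& local : locals)
        merged.splice(merged.end(), local);
    return merged;
}
```

The merged list is grouped by worker, not globally ordered, so sort it afterward if order matters.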
Four: Converting an Existing Background Thread to an Agent or a Task
Suppose you already have a background or worker thread in your application. There are some very good reasons why you might want to convert that background thread to a task from the PPL or to an agent, and doing so is relatively straightforward. Some of the major advantages of doing this include the following:
Composability and performance. If your worker threads are compute intensive and you are considering using additional threads in the PPL or Agents Library, converting your background thread to a worker task allows it to cooperate with the other tasks in the runtime and avoid oversubscription on the system.
Cancellation and exception handling. If you want to be able to easily cancel work on a thread or have a well-described mechanism for handling exceptions, a task_group has these capabilities built in.
Control flow and state management. If you need to manage the state of your thread (started or completed, for example) or have an object whose state is effectively inseparable from the worker thread, implementing an agent might be useful.
Task_group Offers Cancellation and Exception Handling
In the first scenario, we explored what it takes to schedule work with a task_group: essentially packaging your work into a functor (using a lambda, an std::bind or a custom function object) and scheduling it with the task_group::run method. What I didn't describe was the cancellation and exception-handling semantics, which are, in fact, related.
Figure 5 Implementation of MyAgentClass
class MyAgentClass : public agent{
public:
MyAgentClass (){
}
AgentsWidget widget;
void run(){
//run is started asynchronously when agent::start is called
//...
//set status to complete
agent::done();
}
};
First, I'll explain the straightforward semantics. If your code makes a call to task_group::cancel or a task throws an uncaught exception, cancellation is in effect for that task_group. When cancellation is in effect, tasks that haven't been started on that task_group won't be started, which allows scheduled work to easily and quickly be canceled on a task_group. Cancellation doesn't interrupt tasks that are running or blocked, so a running task can query the cancellation status with the task_group::is_canceling method or by the helper function is_current_task_group_canceling. Here's a brief example:
task_group tasks;
tasks.run([](){
...
if(is_current_task_group_canceling())
{
//cleanup then return
...
return;
}
});
tasks.cancel();
tasks.wait();
Exception handling impacts cancellation because an uncaught exception in a task_group triggers cancellation on that task_group. If there is an uncaught exception, the task_group will actually use std::exception_ptr to package up the exception on the thread it was thrown on. Later, when task_group::wait is called, the exception is rethrown on the thread that called wait.
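The capture-and-rethrow mechanism can be shown in a few lines of portable standard C++. This sketch uses a plain std::thread rather than a task_group, so it illustrates only the std::exception_ptr part of the story; RunAndRethrow and FailingTask are hypothetical names.

```cpp
#include <exception>
#include <stdexcept>
#include <thread>

// Hypothetical task that fails.
void FailingTask() {
    throw std::runtime_error("task failed");
}

// Runs 'work' on another thread. An exception that escapes the work is
// captured on that thread and rethrown on the thread that waits for it.
template <typename Work>
void RunAndRethrow(Work work) {
    std::exception_ptr captured;
    std::thread worker([&captured, &work] {
        try {
            work();
        } catch (...) {
            captured = std::current_exception();  // package the exception
        }
    });
    worker.join();  // plays the role of task_group::wait
    if (captured) std::rethrow_exception(captured);
}
```

The caller can then use an ordinary try/catch around RunAndRethrow(FailingTask), exactly as it would around serial code.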
Implementing an Asynchronous Agent
The Agents Library offers an alternative to using a task_group: replacing a thread with the agent base class. If your thread has a lot of thread-specific state and objects, an agent might be a better fit for the scenario. The abstract agent class is an implementation of the actor pattern; the intended usage is to implement your own class derived from agent and then encapsulate any state that your actor (or thread) may have into that agent. If there are fields that are intended to be publicly accessible, the guidance is to expose them as message blocks or sources and targets and use message passing to communicate with the agent.
Implementing an agent requires deriving a class from the agent base class and then overriding the virtual method run. The agent can then be started by calling agent::start, which spawns the run method as a task, much like a thread. The advantage is that thread-local state can now be stored in the class. This allows for easier synchronization of state between threads, particularly if the state is stored in a message block. Figure 5 shows an example of an implementation that has a publicly exposed member variable of type AgentsWidget.
Note that I've set the agent's status to done as the run method is exiting. This allows the agent to not only be started but also be waited on. Furthermore, the agent's current status can be queried by a call to agent::status. Starting and waiting on our agent class is straightforward, as the following code shows:
MyAgentClass MyAgent;
//start the agent
MyAgent.start();
//do something else
...
//wait for the agent to finish
MyAgent.wait(&MyAgent);
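To show the start, run, wait, and status life cycle outside the Agents Library, here is a deliberately small approximation in portable standard C++. It is not how the agent class is implemented: SimpleAgent, AgentStatus, and SummingAgent are hypothetical names, the base class marks the agent done when Run returns (the real agent expects run to call done itself), and the work runs on a dedicated thread rather than as a runtime task.

```cpp
#include <atomic>
#include <cassert>
#include <thread>

enum class AgentStatus { Created, Started, Done };

// Hypothetical, simplified agent base class.
class SimpleAgent {
public:
    // Callers must Wait() before destroying a started agent; destroying a
    // joinable std::thread would terminate the program.
    virtual ~SimpleAgent() = default;

    void Start() {
        assert(status_.load() == AgentStatus::Created);  // start only once
        status_ = AgentStatus::Started;
        thread_ = std::thread([this] {
            Run();
            status_ = AgentStatus::Done;
        });
    }

    void Wait() {
        if (thread_.joinable()) thread_.join();
    }

    AgentStatus Status() const { return status_.load(); }

protected:
    virtual void Run() = 0;

private:
    std::atomic<AgentStatus> status_{AgentStatus::Created};
    std::thread thread_;
};

// State that used to live in thread-local variables or thread parameters
// becomes ordinary member data of the derived class.
class SummingAgent : public SimpleAgent {
public:
    explicit SummingAgent(int n) : n_(n), result_(0) {}
    int Result() const { return result_; }  // read only after Wait()

protected:
    void Run() override {
        int sum = 0;
        for (int i = 1; i <= n_; ++i) sum += i;
        result_ = sum;
    }

private:
    int n_;
    int result_;
};
```

Result is a plain member here for brevity; following the guidance above, state that other threads read while the agent is still running would be better exposed through a message block.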
Bonus Item: Sorting in Parallel with parallel_sort
Finally, I'd like to suggest another potentially easy point of parallelization, this time not from the PPL or the Agents Library but from our sample pack available at code.msdn.microsoft.com/concrtextras. Parallel quicksort is one of the examples we use for explaining how to parallelize recursive divide-and-conquer algorithms with tasks, and the sample pack contains an implementation of parallel quicksort. Parallel sort can show speed-ups if you're sorting a large number of items where the comparison operation is somewhat expensive, as with strings. It probably won't show speed-ups for small numbers of items or when sorting built-in types like integers and doubles. Here's an example of how it can be used:
//from the sample pack
#include "parallel_algorithms.h"
using namespace concurrency_extras;
int main()
{
vector<string> strings;
//populate the strings
...
parallel_sort(strings.begin(),strings.end());
}
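For readers curious about the recursive divide-and-conquer shape, here is a minimal sketch in portable standard C++. It is not the sample pack's parallel_sort, whose implementation I'm not reproducing; ParallelQuickSort and SortStrings are hypothetical names, std::async stands in for tasks, and the depth limit of 3 and the cutoff of 1024 elements are arbitrary assumptions rather than tuned values.

```cpp
#include <algorithm>
#include <future>
#include <iterator>
#include <string>
#include <vector>

// Partition around a pivot, sort the left part on another thread and the
// right part on this one. Small or deeply nested ranges fall back to
// std::sort. Requires random-access iterators.
template <typename It>
void ParallelQuickSort(It first, It last, int depth = 3) {
    typedef typename std::iterator_traits<It>::value_type Value;
    const auto count = std::distance(first, last);
    if (depth <= 0 || count < 1024) {
        std::sort(first, last);
        return;
    }
    const Value pivot = *std::next(first, count / 2);  // copy the pivot
    // Three-way split: [first, lessEnd) < pivot, [lessEnd, equalEnd) equal
    // to the pivot, [equalEnd, last) > pivot.
    It lessEnd = std::partition(first, last,
        [&pivot](const Value& v) { return v < pivot; });
    It equalEnd = std::partition(lessEnd, last,
        [&pivot](const Value& v) { return !(pivot < v); });
    // The two outer ranges are disjoint, so they can be sorted in parallel.
    auto left = std::async(std::launch::async, [first, lessEnd, depth] {
        ParallelQuickSort(first, lessEnd, depth - 1);
    });
    ParallelQuickSort(equalEnd, last, depth - 1);
    left.get();  // wait for the other half and propagate any exception
}

// Usage mirroring the example above.
void SortStrings(std::vector<std::string>& strings) {
    ParallelQuickSort(strings.begin(), strings.end());
}
```

The depth limit keeps the number of threads bounded; a task-based runtime handles that concern for you, which is one reason to prefer it over a hand-rolled version like this.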
Wrapping Up
I hope this column helps expand the horizons of how the parallel libraries in Visual Studio 2010 will apply to your projects, beyond simply using parallel_for or tasks to speed up compute-intensive loops. You'll find many other instructive examples in our documentation on MSDN (msdn.microsoft.com/library/dd504870(VS.100).aspx) and in our sample pack (code.msdn.microsoft.com/concrtextras) that help illustrate the parallel libraries and how they can be used. I encourage you to check them out.
Rick Molloy is a program manager on the Parallel Computing Platform team at Microsoft.