September 2010

Volume 25 Number 09

Async Agents - Actor-Based Programming with the Asynchronous Agents Library

By Michael Chu | September 2010

With multi-core processors now commonplace in the market, from servers to desktops to laptops, the parallelization of code has never been more important. To address this vital area, Visual Studio 2010 introduces several new ways to help C++ developers take advantage of these capabilities with a new parallel runtime and new parallel programming models. However, one main hurdle remains for developers: deciding which programming model is correct for their applications. The right model may significantly exploit underlying parallelism, but may also require rethinking how your program is structured and actually executes.

The most common parallel programming models today involve general-purpose, concurrency-aware containers and algorithms, such as those that parallelize loop iterations. While these traditional techniques can be a powerful way to scale applications to take advantage of a multi-core machine, they don’t address one of the other major factors affecting parallel performance: the growing impact of latency. As parallelization techniques speed up computations and spread them across multiple cores, Amdahl’s law (wikipedia.org/wiki/Amdahl's_law) shows us that the performance improvement is limited by the slowest portion of the execution. In many cases, that is an increasing percentage of time spent waiting on data from I/O such as disks or networks.

Actor-based programming models deal quite well with problems such as latency and were first introduced in the early 1970s to exploit the resources of highly parallel computers with hundreds or thousands of independent processors. The fundamental concept behind an actor model is to treat the components of an application as individual actors that can interact with the world by sending, receiving and processing messages.

More recently, with the abundance of multi-core processors, the actor model has resurfaced as an effective method for hiding latencies and achieving efficient parallel execution. Visual Studio 2010 introduces the Asynchronous Agents Library (AAL), an exciting new actor-based model with message-passing interfaces, where the agents are the actors. The AAL enables developers to design applications in a more dataflow-centric manner. Such a design typically makes productive use of the time otherwise spent waiting for data.

In this article, we’ll provide an overview of the AAL and demonstrate how you can take advantage of it in your applications.

The Concurrency Runtime

The foundation for concurrency support in Visual Studio 2010 and AAL is the new Concurrency Runtime, which is shipped as part of the C Runtime (CRT) in Visual Studio 2010. The Concurrency Runtime offers a cooperative task scheduler and a resource manager that has a deep understanding of the underlying resources of the machine. This allows the runtime to execute tasks in a load-balanced fashion across a multi-core machine.

Figure 1 shows an outline of the support in Visual Studio 2010 for concurrency in native code. The Scheduler is the main component that determines when and where tasks execute. It leverages information gathered by the Resource Manager to best utilize the execution resources. Applications and libraries themselves mainly interact with the Concurrency Runtime through the two programming models that sit on top of the scheduler, the AAL and the Parallel Patterns Library (PPL), although they can also directly interact with the runtime itself.

image: The Concurrency Runtime

Figure 1 The Concurrency Runtime

The PPL offers the more traditional parallelization techniques, such as the parallel_for and parallel_for_each constructs, runtime-aware locks, and concurrent data structures such as queues and vectors. While not the focus of this article, the PPL is a powerful tool that developers can use in conjunction with all the new methods introduced in the AAL. For more information on the PPL, see the February 2009 installment of the Windows With C++ column (msdn.microsoft.com/magazine/dd434652).
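For instance, here is a minimal PPL sketch (assuming <ppl.h> and the Concurrency and std namespaces are available; the data and the doubling operation are illustrative, not from this article’s example):

vector<int> values(1000, 1);
// Each iteration runs as a runtime task, load-balanced across cores.
parallel_for_each(values.begin(), values.end(), [](int& n) {
  n *= 2;
});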

In contrast, the AAL provides the ability to parallelize applications at a higher level and from a different perspective than traditional techniques. Developers need to think about applications from the perspective of the data to be processed, and consider how the processing of the data can be separated into components or stages that can execute in parallel.

The AAL provides two main components: a message-passing framework and asynchronous agents.

The message-passing framework includes a set of message blocks, which can receive, process and propagate messages. By chaining together message blocks, pipelines of work can be created that can execute simultaneously.

Asynchronous agents are the actors that interact with the world by receiving messages, performing local work on their own maintained state, and sending messages.

Together, these two components allow developers to exploit parallelism in terms of the flow of data rather than the flow of control, and to better tolerate latencies by utilizing parallel resources more efficiently.

Message-Passing Framework

The first important component of the AAL is the message-passing framework, a set of constructs to help develop dataflow networks to pipeline work. Pipelining work is a fundamental piece of the dataflow model, as it allows streaming data to be processed in parallel whenever the data is ready by breaking up the work into multiple independent stages. When the processing of data in one stage finishes, that stage can pass the data off to the next stage while the first looks for new data on which to work.

As an example, consider an e-mail application that formats outgoing messages and censors them for inappropriate content. The code for this type of operation is shown here:

std::for_each(reader.begin(), reader.end(),
  [&](const string& word) {
    auto w1 = censor(word);
    auto w2 = format(w1);
    writer.write_word(w2);
  });

For each word in the e-mail, the application needs to check if it exists in a dictionary of censored words, replacing it if it does. The code then formats each word according to a set of guidelines.

There’s a significant amount of inherent parallelism within such a scenario. However, traditional techniques for parallelism fall short. For example, a simple approach would be to use a parallel_for_each algorithm across the strings in the text to censor and then format them.

The first main deterrent to such a solution is that it must read the entire file before an iterator can properly divide up the work. Forcing the entire file to be read makes the process I/O-bound and can diminish parallelization gains. Of course, you could use a smart iterator to overlap the processing of words with reading the input.

The second major issue with a traditional parallelization approach is ordering. Obviously, in the case of an e-mail message, parallel processing of the text must maintain the order of the text, or the meaning of the message is totally lost. To maintain that ordering, a parallel_for_each technique would incur significant synchronization and buffering overhead, overhead that the AAL handles automatically.

By processing the message in a pipeline, you can avoid these two issues while still taking advantage of parallelization. Consider Figure 2, where a simple pipeline was created. In this example, the main tasks of the application—censoring and formatting—are separated into two stages. The first stage takes a string and looks it up in a dictionary of censored words. If a match is found, the censor block substitutes the string with a different word from the dictionary. Otherwise, it outputs the original string unchanged. Similarly, in the second stage, the format block takes in each word and properly formats it for a certain style.

image: E-mail Processing Pipeline

Figure 2 E-mail Processing Pipeline

This example can benefit from the dataflow approach in several ways. First, because it removes the requirement to read the entire message before processing, the strings in the message can immediately start streaming through the censoring and formatting stages. Second, the pipeline processing allows one string to be processed by the format block while the next string is being processed by the censor block. Finally, because strings are processed in the order they appear in the original text, no additional synchronization needs to be done.

Message Blocks

Message blocks receive, process, store and propagate messages. They come in one of three forms: sources, targets and propagators. Sources can only propagate messages, while targets can receive, store and process them. The majority of blocks are propagators, which are both sources and targets. In other words, they have the ability to receive, store and process messages, as well as to turn around and send those messages out.

The AAL contains a set of message block primitives that cover the majority of use cases for developers. Figure 3 shows a brief overview of all message blocks included in the AAL. However, the model remains open, so if your application requires a message block with a specific behavior, you can write a custom block yourself that can interact with all the predefined blocks. Each block has its own unique characteristics for processing, storing and propagating messages.

Figure 3 AAL Message Blocks

Message Block | Purpose
unbounded_buffer<Type> | Stores an unbounded number of messages and propagates them to its targets.
overwrite_buffer<Type> | Stores a single message, which is overwritten each time a new message is propagated to it, and broadcasts it to its targets.
single_assignment<Type> | Stores a single message, which is write-once, and broadcasts it to its targets.
transformer<Input,Output> | Takes a message of type Input and runs a user-provided function to transform it into a message of type Output. This transformed message is propagated to its targets.
call<Type> | Takes a message and runs a user-provided function with that message’s payload as an argument. This block is purely a message target.
timer<Type> | Propagates a message to its target after a user-defined amount of time. This can be repeating or non-repeating. This block is purely a message source.
choice<Type1,Type2,...> | Takes messages from multiple sources of multiple types and accepts only the message from the first block that propagated to the choice.
join<Type> | Takes messages from multiple sources and combines them into a single output message. Asynchronously waits for messages to be ready from each source input.
multitype_join<Type1,Type2,...> | Takes messages from multiple sources of multiple types and combines them. Asynchronously waits for messages to be ready from each source input.

One of the main benefits of the message block primitives supplied by the AAL is that they’re composable. Therefore, you can combine them, based on the desired behavior. For example, you can easily create a block that adds together multiple inputs by attaching a transformer block to the end of a join block. When the join block succeeds in retrieving messages from each of its sources, it can pass them to the transformer, which sums the message payloads.
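Here’s a minimal sketch of that adder composition, assuming the Concurrency and std namespaces from agents.h (the block names a, b, both and adder are ours):

single_assignment<int> a, b;
join<int> both(2);
a.link_target(&both);
b.link_target(&both);

// The join's output is a vector<int> with one payload per source;
// the transformer sums the payloads, completing the adder.
transformer<vector<int>, int> adder([](const vector<int>& v) -> int {
  return v[0] + v[1];
});
both.link_target(&adder);

send(&a, 3);
send(&b, 4);
int sum = receive(adder); // sum == 7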

You could also connect a repeating timer block as a source of a join block. This would result in a block that throttles messages, only letting them through whenever the timer block fires its message. These two composable blocks are illustrated in Figure 4.

image: Composing Adder and Message Throttling Blocks from Primitives

Figure 4 Composing Adder and Message Throttling Blocks from Primitives
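The throttling composition can be sketched the same way. Note that this sketch assumes the join emits payloads in the order its sources were linked, with the data source first:

unbounded_buffer<int> data;
timer<int> tick(100, 0, NULL, true); // offer a 0 payload every 100 ms
join<int> gate(2);
data.link_target(&gate);
tick.link_target(&gate);

// Forward only the data payload; one data message passes per tick.
transformer<vector<int>, int> throttled([](const vector<int>& v) -> int {
  return v[0];
});
gate.link_target(&throttled);
tick.start(); // the timer must be started before it fires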

Creating a Message-Passing Pipeline

Now let’s take a look at the code to create the message-block pipeline shown earlier. We can implement the pipeline’s two stages as transformer message blocks, as shown in Figure 5. The purpose of a transformer block is to take a message of a certain type and execute a user-defined function on that message, which can modify the message’s payload or even completely change the type of the message. For example, the censor block takes as input a message containing a string and needs to process it.

image: A Message Block Pipeline

Figure 5 A Message Block Pipeline

The code for creating and connecting the message blocks is shown in Figure 6. This code begins with the instantiation of the two transformer message blocks. The C++0x lambda parameter on the censor block constructor defines the transformation function, which looks up the message’s stored input string in a dictionary to see if it should be changed to a different string. The resulting string is returned, and within the censor block it’s then wrapped in a message and propagated out of the block. A similar path is taken for the format transformer block, except its output is a string that has been changed by a format function.

Figure 6 Simple Message Pipeline

dictionary dict;
transformer<string, string> 
  censor([&dict](const string& s) -> string {
  string result = s;
  auto iter = dict.find(s);
  if (iter != dict.end()) {
    result = iter->second;
  }
  return result;
});
transformer<string, string> 
  format([](const string& s) -> string {
  string result = s;
  for (string::size_type i = 0; i < s.size(); i++) {
    result[i] = (char)Format(s[i]);
  }
  return result;
});
censor.link_target(&format);
asend(&censor, string("foo"));
string newStr = receive(format);
printf("%s\n", newStr.c_str());

Following the instantiation of the two blocks, the next line links the two blocks together by calling the link_target method on the censor block. Every source and propagator block has a link_target method that’s used to determine to which message blocks the source should propagate its messages.

After the censor and format blocks have been linked together, any message propagated into the censor block will go through its transform function and the resulting message will implicitly be passed on to the format block for processing. If a message block is a source or propagator yet has no connected targets, the message block can store the message in a block-specific manner until either a target is linked, or the message is retrieved.

The last three lines of the example code show the process of initiating messages into a block and retrieving a message out of a block. There are two message initiation APIs in the AAL: send and asend. These input a message into a block synchronously and asynchronously, respectively.

The main difference is that when a send call returns, it’s guaranteed to have already pushed its message into and through the block to which the message is being sent. The asend call can return immediately and will allow the Concurrency Runtime to schedule its propagation. Similarly, there are two message retrieval APIs in the AAL: receive and try_receive. The receive method will block until a message arrives, whereas the try_receive will return immediately if it’s unable to retrieve a message.
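For example, the four APIs can be contrasted with an unbounded_buffer (a minimal sketch; the buffer and values are illustrative):

unbounded_buffer<int> buf;

send(&buf, 1);  // blocks until buf has accepted the message
asend(&buf, 2); // returns immediately; propagation is scheduled

int x = receive(buf);      // blocks until a message is available
int y = 0;
if (try_receive(buf, y)) { // returns false instead of blocking
  // y now holds the next message from buf
}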

In Figure 6, the string “foo” is sent asynchronously to the censor block. The censor block takes the message, checks whether its string is in the dictionary of censored words, and then propagates the resulting string in a message. This message is then passed to the format block, which takes the string, capitalizes each letter and, because it has no targets, holds on to the message. When receive is called, it grabs the message from the format block. Thus, assuming “foo” was not in the dictionary, the output of this example would be “FOO.” While this example only pushes a single string through the network, you can see how a stream of input strings forms a pipeline of execution.

Looking at this messaging example, notice the distinct lack of references to messages themselves. A message is simply an envelope that wraps the data you want to pass around your dataflow network. The message passing itself is handled through a process of offering and accepting. When a message block receives a message, it has the ability to store that message in any way it wants. If it later wishes to send a message out, it offers the message to each of its connected targets. To actually take the message away, the receiver must accept the offered message to complete the transaction. This entire process of message passing between blocks is scheduled and handled by tasks that are scheduled and executed by the Concurrency Runtime.

Message-Block Propagation

Now that you’ve seen how message blocks are created and tied together, and how messages can be initiated and retrieved from each of them, let’s take a brief look at how messages are passed between blocks and how the Concurrency Runtime fits at the heart of the AAL.

This information is not necessary for using message blocks or the AAL, but can help give a deeper understanding of how the message-passing protocols work and how you can take advantage of them. For the rest of this section, I’ll discuss propagator blocks, because they’re both sources and targets. Obviously, a pure source or pure target block would simply be a subset of the propagator block implementation.

Internally, each propagator block has an input queue for messages and another block-specific storage container for messages. Other blocks that are linked to this propagator block send messages that are stored into the input queue.

For example, in Figure 7, the censor transformer block has an input queue that’s currently storing a message with a string str6 in it. The actual transformer itself contains two messages: str4 and str5. Because this is a transformer, its block-specific storage is another queue. Different block types can have different storage containers. For example, the overwrite_buffer block only stores a single message that would always get overwritten.

image: Message-Passing Protocol

Figure 7 Message-Passing Protocol

When a message is presented to a block from one of its linked sources (or from the send/asend APIs), the block first checks a filter function to determine whether to accept the message. A filter is an optional function, passed into the constructor of a target or propagator block, that returns a Boolean determining whether a message offered from a source should be accepted. If the block accepts the message, the message is placed into its input queue. If the message is declined, the source continues to its next target to offer the message.
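For example, here’s a hedged sketch of a filtered block, assuming the optional filter overload of the call constructor. This call block declines empty strings, so a source will offer them to its next target instead:

call<string> printer(
  [](const string& s) {
    printf("%s\n", s.c_str());
  },
  // Filter: accept only non-empty strings.
  [](const string& s) -> bool {
    return !s.empty();
  });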

Once a message is placed in the input queue, the source block it came from no longer holds on to the message. However, the accepting block does not yet have the message ready for propagation. Thus, messages can buffer up in the input queue while they await processing.

When a message arrives in an input queue on a message block, a lightweight task (LWT) is scheduled within the Concurrency Runtime scheduler. The purpose of this LWT is twofold. First, it must move messages from the input queue to the internal storage of the block (which we refer to as message processing). Second, it must also try to propagate messages to any targets (which we refer to as message propagation).

For example, in Figure 7, there were messages in the input queue that prompted the LWT to be scheduled. The LWT then processed the message by first executing the transformer’s user-provided function on the message, checking it in the censored string dictionary, then moving the message to the storage buffer for the block.

After transferring it into a storage buffer, the LWT begins the propagation step where messages are sent to the target format block. In this case, because message str4 was at the head of the transformer, it’s propagated to the format block first, and then the next message, str5, is propagated. The same entire process occurs on the format block.

Message processing can differ, depending on the type of message block. For example, an unbounded_buffer has a simple processing step of moving a message to its storage buffer. The transformer processes messages by calling its user-defined function on each message before moving it to a storage buffer. Other blocks can be more complex, such as the join, which must combine multiple messages from different sources and store them in a buffer in preparation for propagation.

For performance efficiency, the AAL is intelligent in its creation of LWTs so that only one is scheduled at a time for each message block. If further messages arrive in an input queue while the processing LWT is active, it will continue to pick up and process those messages. Thus, in Figure 7, if the transformer’s LWT is still processing when message str7 enters the input queue, it will pick up and process this message rather than starting a new processing and propagation task.

The fact that each message block has its own LWT that handles processing and propagation is central to the design, which allows the message-passing framework to pipeline work in a dataflow manner. Because each message block does its processing and propagation of its messages in its own LWT, the AAL is able to decouple the blocks from one another and allow parallel work to be executed across multiple blocks. Each LWT must simply propagate its messages into its target blocks’ input queues, and each target will simply schedule an LWT to handle its own inputs. Using a single LWT to process and propagate ensures that message ordering is maintained for the message blocks.

Asynchronous Agents

The second main component of the AAL is the asynchronous agent. Asynchronous agents are coarse-grained application components that are meant to asynchronously deal with larger computing tasks and I/O. Agents are expected to communicate with other agents and initiate lower-level parallelism. They’re isolated because their view of the world is entirely contained within their class, and they can communicate with other application components by using message passing. Agents themselves are scheduled as tasks within the Concurrency Runtime. This allows them to block and yield cooperatively with other work executing at the same time.

An asynchronous agent has a set lifecycle, as shown in Figure 8. The lifecycle can be monitored and waited on. States in green signify running states, while states in red are the terminal states. Developers can create their own agents by deriving from the base agent class.

image: The Asynchronous Agent Lifecycle

Figure 8 The Asynchronous Agent Lifecycle

Three base class functions—start, cancel and done—transition the agent between its different states. Once constructed, agents are in the created state. Starting an agent is similar to starting a thread. They will not execute anything until the start method is called on them. At that time, the agent is scheduled to execute and the agent moves into the runnable state.

When the Concurrency Runtime picks up the agent, it moves into the started state and continues to run until the user calls the done method, indicating its work has completed. Any time after the agent has been scheduled but not yet started, a call to cancel will transition the agent to a canceled state and it will never execute.
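A minimal sketch of this lifecycle (HelloAgent is our own illustrative name):

class HelloAgent : public agent {
public:
  virtual void run() {
    printf("hello from an agent\n");
    done();          // running -> done
  }
};

HelloAgent hello;    // created
hello.start();       // created -> runnable; the runtime will pick it up
agent::wait(&hello); // blocks until hello is done or canceled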

Let’s look back at the e-mail filtering example, where the pipelined message blocks introduced dataflow into the application and improved its ability to process words in parallel. However, the example did not show how to handle the I/O of dealing with the e-mails themselves and breaking them into streams of strings for the pipeline to process. Also, once the strings have passed through the pipeline, they must be gathered so that the text can be rewritten in its newly censored and formatted state. This is where agents come into play, helping to tolerate the latencies of I/O.

For example, consider the end of our e-mail pipeline. At this point, strings are being output by the format block and need to be written to files in a mailbox. Figure 9 shows how an output agent can capture strings and create output e-mail messages. The run function of the WriterAgent receives messages from the format block in a loop.

image: An Agent Capturing the Output of the Format Block

Figure 9 An Agent Capturing the Output of the Format Block

While the majority of the processing done in this application uses dataflow, the WriterAgent shows how some control flow can be introduced into the program. For example, the WriterAgent must behave differently depending on the input string it receives; when an end-of-file message arrives, it must know to cease operation. The code for the WriterAgent is in Figure 10.

Figure 10 WriterAgent

class WriterAgent : public agent {
public:
  WriterAgent(ISource<string> * src) : m_source(src) {
  }
  ~WriterAgent() {
    agent::wait(this);
  }
  virtual void run() {
    FILE *stream;
    fopen_s( &stream, ... );
    string s;
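    // Sentinel check: the reader sends "eof", which the format stage
    // capitalizes to "EOF" before it reaches this agent.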
    string eof("EOF");
    while (!feof(stream) && ((s=receive(m_source)) != eof)) {
      write_string(stream, s);
    }
    fclose(stream);
    done();
  }
private:
  ISource<string> * m_source;
};

There are a few interesting portions of this code to note. First, within the destructor, a call is made to the static function agent::wait. This function can be called with a pointer to any agent and will block until the agent enters one of the terminal states: done or canceled. While calling wait in the destructor is not necessary for all agents, in most cases it should be done, as it ensures the agent is no longer executing any code when it is destroyed.

Second, an interesting aspect of this code is the run method itself. This method defines the main execution of the agent. In this code, the agent is dealing with writing out the strings it reads from its source (in our example, the format block).

Finally, note the last line of the run method, which is a call to the agent function done. The call to the done method moves the agent from the running state to the done state. In most cases, done should be called at the end of the run method. However, in some circumstances an application may want an agent to set up state, such as a dataflow network, that should remain alive past the lifetime of the run method; such an agent can defer the call to done.

Tying Everything Together

Now that we’ve created a messaging pipeline to filter and format strings, and an output agent to process them, we can add an input agent that has very similar behavior to the output agent. Figure 11 shows an example of how this application fits together.

image: Agents Used to Process E-mail Messages

Figure 11 Agents Used to Process E-mail Messages

One of the benefits of agent processing is the ability to use asynchronous actors in the application. Thus, when data arrives for processing, the input agent asynchronously starts sending strings through the pipeline, and the output agent can likewise read and output files. These actors can start and stop processing independently of one another, driven entirely by the data. Such behavior works beautifully in many scenarios, especially those involving latency-driven and asynchronous I/O, like the e-mail processing example.

In this example, I added a second agent, a ReaderAgent, which acts similarly to the WriterAgent, except that it handles the input side: reading the e-mails and sending strings into the network. The code for the ReaderAgent is in Figure 12.

Figure 12 ReaderAgent

class ReaderAgent : public agent {
public:
  ReaderAgent(ITarget<string> * target) : m_target(target) {
  }
  ~ReaderAgent() {
    agent::wait(this);
  }
  virtual void run() {
    FILE *stream;       
    fopen_s( &stream, ...);
    while (!feof(stream)) {
      asend(m_target, read_word(stream));
    }
    fclose( stream );
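    // Sentinel marking the end of input; the format stage capitalizes
    // it to "EOF", which the WriterAgent's loop checks for.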
    asend(m_target, string("eof"));
    done();
  }
private:
  ITarget<string> * m_target;
};

Now that we have both a ReaderAgent and a WriterAgent to asynchronously handle the I/O for the program, we simply need to link them up to the transformer blocks in the network to begin processing. This can be done easily after linking the two blocks together:

censor.link_target(&format);
ReaderAgent r(&censor);
r.start();
WriterAgent w(&format);
w.start();

The ReaderAgent is created with a reference to the censor block so it can properly send messages to it, while the WriterAgent is created with a reference to the format block so it can retrieve messages. Each agent is started with its start API, which schedules the agent for execution within the Concurrency Runtime. Because each agent calls agent::wait(this) in its own destructor, execution will wait until both agents have reached their done state before they are destroyed.

Syncing Up

This article was written to give you a glimpse into some of the new possibilities for actor-based programming and dataflow pipelining built into Visual Studio 2010. We encourage you to try it out.

If you want to dig deeper, there are plenty of other features we weren’t able to cover in this article: custom message block creation, filtering messages, and much more.


Michael Chu is a software development engineer in the Parallel Computing Platform group at Microsoft. He works on the Concurrency Runtime team.

Krishnan Varadarajan is a software development engineer in the Parallel Computing Platform group at Microsoft. He works on the Concurrency Runtime team.

Thanks to the following technical experts for reviewing this article: Concurrency Runtime team