
Concurrent Affairs

Simplified APM With The AsyncEnumerator

Jeffrey Richter

Contents

Using My AsyncEnumerator Class
AsyncEnumerator's Architecture
Manipulating the Wait and Inbox Counters
Thread-Safe Morphing
Tune In for Even More

In my last column (msdn2.microsoft.com/magazine/cc163323), I introduced some concepts about how new C# language features (anonymous methods, lambda expressions, and iterators) could be used to simplify asynchronous programming. I ended that column showing how asynchronous programming could be accomplished using a synchronous programming model by way of a C# iterator. The example iterator is shown in Figure 1. However, you can't just use a normal C# foreach statement to execute the iterator's code because the code would always be executed by the thread calling foreach, and execution would run continuously without pausing for the asynchronous I/O operations to complete.

Figure 1 C# Iterator

private static IEnumerator<Int32> ApmPatternWithIterator(
   AsyncEnumerator ae, String pathname) {
   using (FileStream fs = new FileStream(pathname, FileMode.Open,
      FileAccess.Read, FileShare.Read, 8192, FileOptions.Asynchronous)) {

      Byte[] data = new Byte[fs.Length];
      fs.BeginRead(data, 0, data.Length, ae.End(), null);
      yield return 1;

      Int32 bytesRead = fs.EndRead(ae.DequeueAsyncResult());
      ProcessData(data);
   }
}

In this column, I introduce my AsyncEnumerator class, which intelligently drives an iterator so that different thread pool threads can execute the code at different times, and I ensure that the iterator only advances after asynchronous I/O operations complete. I am also going to explain the architecture of my AsyncEnumerator class and how it works.

Using My AsyncEnumerator Class

Here is the definition of my AsyncEnumerator class:

public class AsyncEnumerator {
   // Methods called by code outside of the iterator
   public AsyncEnumerator();
   public void Execute(IEnumerator<Int32> enumerator);

   // Methods called by code inside the iterator
   public AsyncCallback End();
   public IAsyncResult DequeueAsyncResult();
}

Using my AsyncEnumerator class is quite simple. Let's talk first about how to implement your iterator member and then I'll explain how to invoke it.

Define your iterator member to accept any parameters you want and add an additional parameter that is a reference to an AsyncEnumerator object. Your iterator member must be prototyped as returning a collection of Int32s; in other words, it must have a return type of IEnumerator<Int32>.

Then, inside your iterator member, start each asynchronous operation by calling the appropriate BeginXxx method. As you know, when you call a BeginXxx method, it is important that you pass it the name of a method that should be called when the asynchronous operation completes.

Instead of defining your own method, call the AsyncEnumerator object's End method. The End method returns an AsyncCallback delegate that identifies a private method defined inside my AsyncEnumerator class. So, as each asynchronous operation completes, code in an AsyncEnumerator object is notified. This code then places the IAsyncResult object for the completed operation into a List<IAsyncResult> object, which I refer to as the inbox.

In your code, after calling a BeginXxx method, place a yield return statement that returns the number of asynchronous operations that you've queued up. In Figure 1, since I called just one BeginXxx method (BeginRead), I yield return 1. The yield return statement suspends your iterator method and stops any more of its code from executing.

I'll show other examples later where I yield return values other than 1, but, for many applications, returning 1 will be appropriate. The number you specify in a yield return statement tells the AsyncEnumerator object how many asynchronous operations need to complete before resuming your iterator. So, while your iterator is suspended, any asynchronous operations you initiated start completing. As each completes, an entry is added to the AsyncEnumerator's inbox. When the count of items in the inbox is equal to the number you specified in your yield return statement, the AsyncEnumerator object resumes your iterator and allows its code to continue executing.

After the yield return statement, you call the corresponding EndXxx method. When you call an EndXxx method, you need to pass it an IAsyncResult object. Fortunately, the AsyncEnumerator object has been queuing these up in its inbox for you, so you just need to dequeue them by calling the AsyncEnumerator object's DequeueAsyncResult method. Items are dequeued from the inbox using a first-in-first-out (FIFO) algorithm.

Once the EndXxx method returns, you can process the result of the asynchronous operation directly inside the iterator! This is very cool, as you now have a synchronous programming model even though you're performing asynchronous operations.

In many scenarios, it is useful to now queue up yet another asynchronous operation by calling an additional BeginXxx method. In the iterator, you can call another BeginXxx method and then have another yield return 1 statement followed by calling the corresponding EndXxx method. This allows you to easily execute a sequence of asynchronous operations, one after another, without blocking any threads. Plus, you get to write all the code using a synchronous programming model!

As an example, the HtmlToFile iterator shown in Figure 2 reads HTML from a Web server in 1 kilobyte (KB) chunks and saves it to a local file. Invoking the iterator is very simple: you construct an AsyncEnumerator object, and then you call its Execute method, passing it the enumerator returned by calling your iterator member. You can pass arguments to your iterator member, and you'll also need to pass a reference to the AsyncEnumerator object into your iterator so that the iterator can call the End and DequeueAsyncResult methods. Here is code that invokes the HtmlToFile iterator:

AsyncEnumerator ae = new AsyncEnumerator();
ae.Execute(HtmlToFile(
  ae, 
  "https://www.Wintellect.com/", 
  "LocalFile.html")); 

Figure 2 HtmlToFile Iterator

private static IEnumerator<Int32> HtmlToFile(AsyncEnumerator ae, 
   String url, String file) {

   // Issue asynchronous web request operation
   WebRequest webRequest = WebRequest.Create(url);
   webRequest.BeginGetResponse(ae.End(), null);
   yield return 1;

   WebResponse webResponse;
   try {
      webResponse = webRequest.EndGetResponse(ae.DequeueAsyncResult());
   }
   catch (WebException e) {
      Console.WriteLine("Failed to contact server: {0}", e.Message);
      yield break;
   }

   using (webResponse) {
      Stream webResponseStream = webResponse.GetResponseStream();

      // Read the stream data and write it to a file in 1KB chunks
      Byte[] data = new Byte[1024];
      using (FileStream fs = new FileStream(file, FileMode.Create, 
         FileAccess.Write, FileShare.None, data.Length, 
         FileOptions.Asynchronous)) {

         // See support.microsoft.com/kb/156932 
         fs.SetLength(webResponse.ContentLength);

         while (true) {
            // Issue asynchronous web response stream read operation
            webResponseStream.BeginRead(data, 0, data.Length, 
               ae.End(), null);
            yield return 1;

            // Get result of web response stream read operation
            Int32 bytesRead = webResponseStream.EndRead(
               ae.DequeueAsyncResult());
            if (bytesRead == 0) break; // Stream end: close file & exit

            // Issue asynchronous file write operation
            fs.BeginWrite(data, 0, bytesRead, ae.End(), null);
            yield return 1;

            // Get result of file write operation
            fs.EndWrite(ae.DequeueAsyncResult());
         }
      }
   }
}

My AsyncEnumerator's Execute method will not return until the iterator member either exits or executes a yield break statement. While the main thread is blocked, thread pool threads will continue to execute the iterator as asynchronous operations complete. All of this means that each part of your iterator's code could theoretically be executed by a different thread, and therefore your iterator code should not depend on thread-specific state such as thread local storage, culture, UI culture, Principal, or priority.

By the way, having the thread that calls Execute block until the iterator is finished running is not something that I'm in love with. But it makes explaining how my AsyncEnumerator class works easier and can simplify experimentation and debugging. However, I want to point out that there is a way to get my AsyncEnumerator to not block while executing the iterator, and this is important in order to obtain scalability and responsiveness if you want to invoke an iterator from a GUI thread in a Windows® Forms or a Windows Presentation Foundation (WPF) application. In the next installment of this column, I'll show you how to execute your iterator member asynchronously.

AsyncEnumerator's Architecture

My AsyncEnumerator class defines the private fields shown in Figure 3. The m_enumerator field refers to your iterator member; the constructor initializes this field to null, and Execute sets it to the value passed to it. The m_inbox field refers to a collection of IAsyncResult objects. The m_waitAndInboxCounts field is a structure that contains two 16-bit integer values; I'll explain how these values get manipulated later in this column.

Figure 3 AsyncEnumerator Fields

public partial class AsyncEnumerator {
   // Refers to the iterator member's code
   private IEnumerator<Int32> m_enumerator;

   // Collection of completed async operations (the inbox)
   private List<IAsyncResult> m_inbox = 
      new List<IAsyncResult>();

   // Structure with Wait & Inbox counters
   private WaitAndInboxCounts m_waitAndInboxCounts;
}

As I discussed in my last column, an iterator is really an easy way to get the compiler to create a class that implements the IEnumerator<T> interface. Normally, the way you advance through an IEnumerator<T> object is with the foreach statement. However, there is nothing stopping you from explicitly calling the IEnumerator<T> MoveNext method and Current property in code, which is exactly what my AsyncEnumerator class does.
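To make the idea of driving an iterator by hand concrete, here is a minimal sketch that uses no asynchronous operations at all. The ManualIteration class, its CountTo iterator, and the log list are hypothetical names introduced only for this illustration; they are not part of the AsyncEnumerator class.

```csharp
using System;
using System.Collections.Generic;

public static class ManualIteration {
   // A trivial iterator; each yield return suspends it
   public static IEnumerator<Int32> CountTo(Int32 limit, List<String> log) {
      try {
         for (Int32 n = 1; n <= limit; n++) {
            log.Add("yielding " + n);
            yield return n;
         }
      }
      finally {
         log.Add("finally");
      }
   }

   // Drives the iterator explicitly instead of using foreach
   public static List<String> Drive(Int32 limit) {
      List<String> log = new List<String>();
      IEnumerator<Int32> e = CountTo(limit, log);

      // MoveNext runs the iterator's code up to its next yield return
      while (e.MoveNext()) {
         // Current is the value given to the last yield return
         log.Add("caller saw " + e.Current);
      }

      // Dispose runs any finally blocks that have not run yet
      // (for example, if the caller had stopped iterating early)
      e.Dispose();
      return log;
   }
}
```

Calling ManualIteration.Drive(2) produces the entries "yielding 1", "caller saw 1", "yielding 2", "caller saw 2", and "finally", in that order. The caller regains control at every yield return, and that is the hook an object needs in order to decide when, and on which thread, the iterator continues.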

AsyncEnumerator's Execute method internally calls a private method called ResumeIterator (shown in Figure 4). This method is responsible for starting the iterator and also for resuming it from a suspended state.

When a thread calls ResumeIterator, it calls the IEnumerator<T> MoveNext method, which wakes up and runs the iterator. If the iterator exits or executes a yield break statement, then the IEnumerator<T> MoveNext method returns false, indicating that it has no more work to do. When my ResumeIterator method's continueIterating variable is set to false, it exits the loop and then calls the IEnumerator<T> object's Dispose method, allowing it to clean up. My ResumeIterator method then returns because the iterator has completed all its work.

Figure 4 ResumeIterator

private void ResumeIterator() {
   Boolean continueIterating;

   // While there are more operations to perform...
   while (continueIterating = m_enumerator.MoveNext()) {

      // Get the value returned from the enumerator
      UInt16 numberOpsToWaitFor = checked((UInt16) m_enumerator.Current);

      // If inbox has fewer items than requested, keep iterator suspended
      if (!m_waitAndInboxCounts.AtomicSetWait(numberOpsToWaitFor)) break;

      // Inbox has enough items, loop to resume the iterator
   }

   // The iterator is suspended, just return
   if (continueIterating) return;

   // The iterator has exited, execute the iterator's finally code
   m_enumerator.Dispose();
}

On the other hand, if the enumerator object's MoveNext method returns true, then it has initiated some asynchronous operations and has suspended its execution by executing a yield return statement. My ResumeIterator method now needs to know what number the IEnumerator<T> object specified in its yield return statement. This number indicates the number of asynchronous operations that should complete before resuming the iterator. To get the number, ResumeIterator queries the enumerator object's Current property; this property returns the last value specified via a yield return statement.

ResumeIterator then calls the WaitAndInboxCounts AtomicSetWait method, which sets the number of items that the iterator should wait for. If AtomicSetWait sees that the number of items in the inbox is less than this wait count, then AtomicSetWait returns false, causing ResumeIterator to return because this thread has nothing else to do. If AtomicSetWait sees that the number of items in the inbox is greater than or equal to the wait count, then AtomicSetWait returns true and ResumeIterator loops around, calling the enumerator object's MoveNext method again and allowing the iterator to resume execution so it can process the completed operations.

Inside the iterator, all BeginXxx methods pass the same method for the AsyncCallback method. This private method, called EnqueueAsyncResult, is acquired by calling the AsyncEnumerator object's End method and is implemented like this:

private void EnqueueAsyncResult(IAsyncResult result) {
   // Add this item to the inbox
   lock (m_inbox) { m_inbox.Add(result); }

   // Add 1 to inbox count. If inbox has enough items 
   // in it, this thread calls ResumeIterator
   if (m_waitAndInboxCounts.AtomicIncrementInbox()) 
      ResumeIterator();
}

In a thread-safe way, this method adds the completed operation's IAsyncResult object to the inbox and then calls the WaitAndInboxCounts AtomicIncrementInbox method, which adds one to the number of items in the inbox. If AtomicIncrementInbox sees that the number of items in the inbox is less than the wait count, then AtomicIncrementInbox returns false, which causes the thread pool thread to return to the pool so it can be used to perform other work.

If AtomicIncrementInbox sees that the number of items in the inbox is equal to the wait count, then AtomicIncrementInbox returns true and EnqueueAsyncResult calls my AsyncEnumerator's ResumeIterator method, which in turn calls the enumerator object's MoveNext method, resuming the iterator's execution so it can then process the completed operations.

Manipulating the Wait and Inbox Counters

Each AsyncEnumerator object maintains a 16-bit Wait count and a 16-bit Inbox count; of course, both counts have a range from 0 to 65535. The Wait count indicates the number of asynchronous operations that must complete before the enumerator object is resumed. An iterator sets this value by way of its yield return statement. The Inbox count maintains the number of asynchronous operations that have completed. The Wait and Inbox counts must be modified and checked using thread-safe techniques since multiple threads could be executing asynchronous operation completion code concurrently while the iterator is also executing its yield return statement.

The fastest way to update these counts in a thread-safe manner is to put them both in a single Int32 and then to use interlocked methods to update the Int32. To make the code more readable and maintainable, I decided to define a WaitAndInboxCounts structure to manage these two counts (see Figure 5). The structure defines a private Int32 field and offers internal members to manipulate the Int32 in a thread-safe way. The AsyncEnumerator code calls these members.
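As a rough illustration of the packing idea on its own, the following sketch combines two 16-bit counts into one Int32 and splits them apart again. PackedCounts and its three methods are hypothetical names used only here; the layout (Wait in the high 16 bits, Inbox in the low 16 bits) follows the description in this column.

```csharp
using System;

public static class PackedCounts {
   // Wait goes in the high 16 bits, Inbox in the low 16 bits
   public static Int32 Pack(UInt16 wait, UInt16 inbox) {
      return (wait << 16) | inbox;
   }

   // Extract the high 16 bits; unchecked because the shifted
   // value is negative when the Int32's top bit is set
   public static UInt16 Wait(Int32 packed) {
      return unchecked((UInt16) (packed >> 16));
   }

   // Extract the low 16 bits by truncating
   public static UInt16 Inbox(Int32 packed) {
      return unchecked((UInt16) packed);
   }
}
```

For example, PackedCounts.Pack(2, 1) yields 0x00020001: a Wait count of 2 and an Inbox count of 1. Because both counts live in one Int32, a single interlocked operation can replace both of them at once.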

Figure 5 Thread-Safe Wait and Inbox Counts

private struct WaitAndInboxCounts {
   private const UInt16 c_MaxWait = 0xFFFF;
   // Wait=High 16 bits, Inbox=low-16 bits
   private Int32 m_waitAndInboxCounts;    

   private UInt16 Wait {
      get { return (UInt16) (m_waitAndInboxCounts >> 16); }
      set { m_waitAndInboxCounts = (Int32) ((value << 16) | Inbox); }
   }
   private UInt16 Inbox {
      get { return (UInt16) m_waitAndInboxCounts; }
      set { m_waitAndInboxCounts = 
         (Int32)((m_waitAndInboxCounts & 
            0xFFFF0000)|value); }
   }

   private WaitAndInboxCounts(Int32 waic) { m_waitAndInboxCounts = waic; }
   private Int32 ToInt32() { return m_waitAndInboxCounts; }

   internal void Initialize() { Wait = c_MaxWait; }

   internal Boolean AtomicSetWait(UInt16 numberOpsToWaitFor) {
      return InterlockedEx.Morph<Boolean, UInt16>(
         ref m_waitAndInboxCounts, 
         numberOpsToWaitFor, SetWait);
   }

   private static Int32 SetWait(Int32 i, UInt16 numberOpsToWaitFor, 
      out Boolean shouldMoveNext) {
      WaitAndInboxCounts waic = new WaitAndInboxCounts(i);
      // Set the number of items to wait for
      waic.Wait = numberOpsToWaitFor;  
      shouldMoveNext = (waic.Inbox >= waic.Wait);

      // Does the inbox contain enough items to MoveNext?
      if (shouldMoveNext) {         
         // Subtract the number of items from the inbox 
         waic.Inbox -= waic.Wait;   
         // The next wait is indefinite 
         waic.Wait = c_MaxWait;     
      }
      return waic.ToInt32();
   }

   internal Boolean AtomicIncrementInbox() {
      return InterlockedEx.Morph<Boolean, Object>(
         ref m_waitAndInboxCounts, 
         null, IncrementInbox);
   }

   private static Int32 IncrementInbox(Int32 i, Object argument, 
      out Boolean shouldMoveNext) {
      WaitAndInboxCounts waic = new WaitAndInboxCounts(i);
      // Add 1 to the inbox count
      waic.Inbox++;                 
      shouldMoveNext = (waic.Inbox == waic.Wait);

      // Does the inbox contain enough items to MoveNext?
      if (shouldMoveNext) {         
         // Subtract the number of items from the inbox 
         waic.Inbox -= waic.Wait;   
         // The next wait is indefinite 
         waic.Wait = c_MaxWait;     
      }
      return waic.ToInt32();
   }
}

The two internal methods available are AtomicSetWait and AtomicIncrementInbox. Earlier in this column, I explained what these methods do. Now I'd like to explain how they do it, as this is a useful technique that could be used in many scenarios where you need to manipulate multiple values in an atomic fashion.

As I'm sure many of you are aware, the Microsoft® .NET Framework Class Library's Interlocked class offers several methods to manipulate an Int32 value in a thread-safe way. There are methods to increment an Int32 by 1, decrement an Int32 by 1, add a positive or negative value to an Int32, change an Int32 to a new value, and check whether an Int32 is a specific value and, if so, change it to another value. In short, the interlocked methods are great, but they offer just a few ways to manipulate an Int32.

Thread-Safe Morphing

In my very first Concurrent Affairs column (msdn.microsoft.com/msdnmag/issues/05/10/ConcurrentAffairs), I showed a well-known pattern for how to use the Interlocked class's CompareExchange method to manipulate an Int32 value in richer ways. Specifically, I showed how to perform an atomic bitwise AND, OR, or XOR operation on the Int32. However, it is easy to extend this pattern to perform atomic multiply, divide, min, and max operations, and many, many more.
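For readers who have not seen the pattern, here is a minimal sketch of it applied to an atomic max operation. The InterlockedPatterns class and its Max method are hypothetical names chosen for this illustration, not code from the earlier column; only Interlocked.CompareExchange and Math.Max are Framework methods.

```csharp
using System;
using System.Threading;

public static class InterlockedPatterns {
   // Atomically performs target = Math.Max(target, value)
   // and returns the value that target held beforehand
   public static Int32 Max(ref Int32 target, Int32 value) {
      Int32 i, j = target;
      do {
         i = j;                               // Value we believe is in target
         Int32 desired = Math.Max(i, value);  // New value computed from it

         // Store desired only if target still equals i;
         // j receives whatever target held at that moment
         j = Interlocked.CompareExchange(ref target, desired, i);
      } while (i != j);   // Another thread changed target, so try again
      return j;
   }
}
```

The loop repeats only when another thread changed target between the read and the CompareExchange call, so in the uncontended case it runs exactly once.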

In fact, what I decided to do was to extend the pattern by turning it into a generic method called Morph<TResult, TArgument>, which looks like this:

public static TResult Morph<TResult, TArgument>(ref Int32 target,
   TArgument argument, Morpher<TResult, TArgument> morpher) {

   TResult morphResult;
   Int32 i, j = target;
   do {
      i = j;
      j = Interlocked.CompareExchange(ref target, 
         morpher(i, argument, out morphResult), i);
   } while (i != j);
   return morphResult;
}

When you call this method, you pass it (by reference) the Int32 whose value you wish to manipulate atomically; you can also pass an additional argument to be used by your morpher. Finally, you pass a method whose signature matches the Morpher delegate, defined as the following:

delegate Int32 Morpher<TResult, TArgument>(Int32 startValue,
   TArgument argument, out TResult morphResult); 

Your Morpher method is passed the initial value in the target Int32 and the additional argument passed to Morph. The Morpher method applies any algorithm to the Int32 value and returns the desired new value. If the original Int32 value was not modified by another thread while the algorithm was executing its code, then the value in target is assigned the value returned by the Morpher method. If the original Int32 value was changed by another thread while the algorithm was executing its code, then the Morph method calls the Morpher method again, passing the new Int32 value (as modified by the other thread).

You'll notice that the Morpher method must also set an out parameter, morphResult. When Morph successfully modifies the target, this out parameter is the value that the Morph method returns to its caller. The beautiful thing about all of this is that the code inside the Morpher method does not have to be thread safe at all. It can be thousands of lines long and can perform several manipulations; the Morph method is responsible for ensuring that the original Int32 value is manipulated in a thread-safe way.
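The sketch below shows one way a caller might use this infrastructure for something other than the Wait and Inbox counts. Morph and the Morpher delegate are repeated from this column so that the example stands alone (the delegate is marked public here so it can be used from a public method). The BoundedCounter class, its AddIfRoom morpher, and the limit of 10 are hypothetical and exist only for illustration.

```csharp
using System;
using System.Threading;

public delegate Int32 Morpher<TResult, TArgument>(Int32 startValue,
   TArgument argument, out TResult morphResult);

public static class InterlockedEx {
   // The Morph method as shown in this column
   public static TResult Morph<TResult, TArgument>(ref Int32 target,
      TArgument argument, Morpher<TResult, TArgument> morpher) {

      TResult morphResult;
      Int32 i, j = target;
      do {
         i = j;
         j = Interlocked.CompareExchange(ref target,
            morpher(i, argument, out morphResult), i);
      } while (i != j);
      return morphResult;
   }
}

public static class BoundedCounter {
   private const Int32 c_Limit = 10;   // Hypothetical cap for this sketch

   // Hypothetical morpher: adds amount unless the total would exceed
   // c_Limit. It needs no synchronization of its own; it may be called
   // more than once if another thread changes the counter meanwhile.
   private static Int32 AddIfRoom(Int32 current, Int32 amount,
      out Boolean added) {
      added = (current + amount <= c_Limit);
      return added ? current + amount : current;
   }

   // Returns true if amount was added atomically, false if there was no room
   public static Boolean TryAdd(ref Int32 counter, Int32 amount) {
      return InterlockedEx.Morph<Boolean, Int32>(
         ref counter, amount, AddIfRoom);
   }
}
```

Starting from a counter of 0, TryAdd(ref counter, 7) returns true and leaves 7, a following TryAdd(ref counter, 4) returns false and leaves 7, and TryAdd(ref counter, 3) returns true and leaves 10. The test and the update happen as one atomic step, which none of the built-in interlocked methods can express directly.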

So now, with the Morph method and the Morpher delegate defined, I can use this infrastructure to implement the WaitAndInboxCounts AtomicSetWait and AtomicIncrementInbox methods.

Internally, AtomicSetWait calls Morph and passes it a Morpher method that sets the new Wait count to the value returned by the iterator's yield return statement. If the Inbox count is less than the new Wait count, then the new Wait count is set atomically. Also, false is returned to Morph's caller telling the caller that the enumerator object's MoveNext method should not be called since the iterator indicates that it wants to wait for more items to appear in the inbox.

If the Morpher method sees that the Inbox count is greater than or equal to the new Wait count, then the Morpher method subtracts the Wait count from the Inbox count (since it's logically removing these items from the Inbox) and the Wait count is set to its maximum (65535) so that items being added to the inbox (by other threads) won't cause the iterator to resume execution. When all of this executes successfully, the Morph method will return true to its caller and this thread will call the enumerator object's MoveNext method, resuming the iterator so that it can process the completed items that it was waiting for.

Internally, AtomicIncrementInbox calls Morph, passing it a Morpher method that adds 1 to the Inbox count. If the new Inbox count is not equal to the current Wait count, then the new Inbox count is set atomically and false is returned to Morph's caller, telling the caller that the enumerator object's MoveNext method should not be called since the iterator indicates that it wants to wait for more items to appear in the inbox.

As with AtomicSetWait, if the Morpher method sees that the Inbox count is equal to the current Wait count, then the Morpher method subtracts the Wait count from the Inbox count (since it's logically removing these items from the Inbox) and the Wait count is set to its maximum so that items being added to the inbox (by other threads) won't cause the iterator to resume execution. When all of this executes successfully, the Morph method will return true to its caller and this thread will call the enumerator object's MoveNext method, resuming the iterator so that it can process the completed items that it was waiting for.
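The two state transitions can be traced with a small sketch. It restates the logic of the SetWait and IncrementInbox morphers from Figure 5 on two plain UInt16 variables, with no atomicity at all, purely so the values can be followed step by step. The CountsTrace class is a hypothetical name and is not part of the AsyncEnumerator.

```csharp
using System;

public static class CountsTrace {
   private const UInt16 c_MaxWait = 0xFFFF;

   // Non-atomic restatement of the SetWait logic, for tracing only
   public static Boolean SetWait(ref UInt16 wait, ref UInt16 inbox,
      UInt16 numberOpsToWaitFor) {
      wait = numberOpsToWaitFor;
      if (inbox < wait) return false;  // Not enough items; stay suspended
      inbox -= wait;                   // Logically remove the items
      wait = c_MaxWait;                // The next wait is indefinite
      return true;                     // Caller should call MoveNext
   }

   // Non-atomic restatement of the IncrementInbox logic, for tracing only
   public static Boolean IncrementInbox(ref UInt16 wait, ref UInt16 inbox) {
      inbox++;
      if (inbox != wait) return false; // Still waiting (or nobody waiting)
      inbox -= wait;                   // Logically remove the items
      wait = c_MaxWait;                // The next wait is indefinite
      return true;                     // Caller should call MoveNext
   }
}
```

Start with Wait = 65535 and Inbox = 0. If the iterator yields 2 first, SetWait returns false and leaves Wait = 2; the first completion raises Inbox to 1 and returns false; the second raises Inbox to 2, matches the Wait count, resets the counts to Wait = 65535 and Inbox = 0, and returns true. If instead an operation completes before the iterator reaches its yield return 1, IncrementInbox returns false with Inbox = 1, and the later SetWait call finds the item already there and returns true. Either way, exactly one call returns true, so exactly one thread resumes the iterator.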

The WaitAndInboxCounts structure defines some additional private members that are used internally to manipulate the Int32 value in a non-thread-safe way. The private constructor effectively casts an Int32 to a WaitAndInboxCounts structure. The private ToInt32 method returns the Int32 value inside of a WaitAndInboxCounts structure. Also, there are private Wait and Inbox properties to get and set the Int32 field's high and low 16-bit values, respectively.

Tune In for Even More

Many developers understand how valuable asynchronous programming is in allowing you to implement high-performance, scalable applications and components. However, in practice, many programmers have found the asynchronous programming model to be so difficult to use that they have avoided it, accepting poor performance in their applications. It is my hope that my AsyncEnumerator class, along with C# iterators, makes asynchronous programming so easy that programmers will embrace it, thereby improving the performance and scalability of their applications by reducing the number of threads and context switches.

In my next column, I will show additional features that my AsyncEnumerator class has to offer, and I will also show various usage patterns that it supports. Until next time …

Jeffrey Richter is a cofounder of Wintellect (www.Wintellect.com), an architecture review, consulting, and training firm. He is the author of several books, including CLR via C#. Jeffrey is also a contributing editor to MSDN Magazine and has been consulting with Microsoft since 1990. Send your questions and comments for Jeffrey to mmsync@microsoft.com.

Insights: AsyncEnumeration in Context

Using C# iterators to perform asynchronous processing is one part of a much larger set of challenges related to concurrent programming. Microsoft currently has several development initiatives that seek to address these challenges.

One of these is the Concurrency and Coordination Runtime (CCR), a more generic framework that can be used to address a variety of both asynchronous- and concurrency-related problems.

CCR includes an implementation for performing asynchronous enumeration using C# iterators. For example, the following demonstrates how you could asynchronously process a file using the iterators in the CCR:

IEnumerator<ITask> CcrIterator(string pathname) {
  var resultPort = new Port<IAsyncResult>();
  using (FileStream fs = new FileStream(pathname, 
    FileMode.Open, FileAccess.Read, FileShare.Read, 
    8192, FileOptions.Asynchronous)) {
      Byte[] data = new Byte[fs.Length];
      fs.BeginRead(data, 0, data.Length, resultPort.Post, null);    
      yield return resultPort;
      IAsyncResult ar = resultPort;
      Int32 bytesRead = fs.EndRead(ar);
      ProcessData(data);
  }
}

In addition to supporting iterators, CCR also provides extended support for exception handling and coordination primitives such as arbiters. For more information on CCR, check out the CCR User Guide (msdn2.microsoft.com/library/bb905447), the Concurrent Affairs column from September 2006 (msdn2.microsoft.com/magazine/cc163556), and CCR on Channel 9 (channel9.msdn.com/showpost.aspx?postid=219308).

—Howard Dierking, Editor-in-Chief