October 2012

Volume 27 Number 10

TPL - Horizontal Scalability for Parallel Execution of Tasks

By Jesus Aguilar | October 2012

The Task Parallel Library (TPL), introduced in the Microsoft .NET Framework 4, empowers application developers to create solutions that leverage the power of parallel processing in a multicore computer. In many scenarios, however, the ability to scale vertically (by adding more cores) is constrained by a number of factors, including cost and hosting limitations. In such cases, if scalability is required, it’s desirable to distribute the processing across an array of servers; cloud hosting is an example of this. In this article I’ll describe the key aspects (including an implementation) of a conceptual solution to accomplish this using many of the new features of the .NET Framework 4.5.

Basic Assumptions

The approach I’ll describe relies on the following technologies:

  • Task Parallel Library (TPL)
  • Windows Communication Foundation (WCF)
  • Managed Extensibility Framework (MEF)

Note that I’ll discuss these only in the context of the problem I’m trying to solve. I’m assuming you have a good understanding of these technologies.

Remote Task Client, Task Coordinator and Task Execution Nodes

The remote task client is the client-side layer that will hide the complexity resulting from the semantics of using a distributed environment. The remote task client interacts directly with the task coordinator, which then becomes the entry point to the underlying infrastructure. At a high level, the task coordinator has the following attributes:

  1. It’s the only point of contact with the clients.
  2. It exposes the necessary services to request the execution of tasks on the scalable platform, as well as the cancellation of a given task.
  3. It handles the throttling and queuing of the task execution requests, which supports the healthy operation of the environment.

The task execution nodes are the hosts of the processes in which the tasks will run. The actual implementations of the tasks that will be executed by the TPL reside in the task execution nodes.

Here are the key aspects of these logical layers and the flow of information:

  1. The remote task client requests the execution of one or more tasks.
  2. The task coordinator submits the request to the task execution nodes.
  3. The task execution nodes execute the tasks and update the status of each request in the task coordinator.
  4. The task coordinator updates the client with the results of the execution for each request.
  5. The task execution nodes reside behind a load balancer so more nodes can be added as needed, providing the ability to scale horizontally.

Figure 1 depicts the logical layers and the flow of information.

Figure 1 Scaling Tasks Horizontally

Note how the task execution nodes update the task coordinator, which then updates the remote task client. I’m going to describe an implementation based on bi-directional communication between the client and the task coordinator and between the task coordinator and the task execution nodes. In WCF terms, this implies the use of a duplex channel that allows the task execution nodes to call back the task coordinator and, subsequently, the task coordinator to do the same to update the client. I’ll showcase the use of WebSockets to achieve this bi-directional communication approach. The WebSockets transport is implemented as a new binding in the .NET Framework 4.5 and is available for Windows 8. You’ll find more information about the binding at bit.ly/SOLNiU.
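
For reference, here’s a minimal hosting sketch (my own snippet, not part of the article’s implementation) that exposes the task coordinator over NetHttpBinding. With a duplex contract, NetHttpBinding uses WebSockets automatically (its default transport usage is WhenDuplex); setting TransportUsage to Always simply makes the choice explicit:

// Assumes the TaskCoordinatorService and ITaskCoordinator types shown later;
// requires System.ServiceModel and System.ServiceModel.Channels.
// The address is a placeholder.
var binding = new NetHttpBinding();
binding.WebSocketSettings.TransportUsage = WebSocketTransportUsage.Always;
var host = new ServiceHost(typeof(TaskCoordinatorService),
  new Uri("http://localhost:8080/TaskCoordinator"));
host.AddServiceEndpoint(typeof(ITaskCoordinator), binding, "");
host.Open();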

The Client and the Task Coordinator

Now that you understand the three main logical layers—remote task client, task coordinator and task execution nodes—let’s start by discussing the implementation of the remote task client. Note that when I use the term “client” throughout this article, I’m referring to the remote task client.

As I mentioned earlier, the value proposition of the client is the ability to hide the complexity of the underlying components. One way it achieves this is by providing an API that gives the impression of local execution of tasks, despite the fact they might be executing elsewhere. The code in Figure 2 shows the public methods of the RemoteTaskClient class.

Figure 2 Public Methods of the Class RemoteTaskClient

public class RemoteTaskClient<TResult> : IDisposable
{
  public void AddRequest(string typeName, 
    string[] parameters, CancellationToken tk)
  {...}
  public void AddRequest(string typeName, string[] parameters)
  {...}
  public Task<TResult>[] SubmitRequests()
  {...}
  public RemoteTaskClient(string taskCoordinatorEndpointAddress)
  {...}
  public void Dispose()
  {...}
}

You can use the AddRequest method to add requests for remote execution. For each request you need to specify the typeName (the type of the actual implementation containing the delegate that the infrastructure will run remotely as a TPL task) and the associated parameters. Then you can submit the requests via the SubmitRequests method. The result is an array of TPL tasks, one for each request, that you can manage just as you would local TPL tasks. For example, you can submit various requests and wait for them to complete, like so:

using (var c = new RemoteTaskClient<int>("..."))
  {
    c.AddRequest("...", null);
    c.AddRequest("...", null);
    var ts = c.SubmitRequests();
    Task.WaitAll(ts);
    foreach (var t in ts)
      Console.WriteLine(t.Result);
  }

Before going into the details of the implementation of the RemoteTaskClient, let’s look at the service operations and the data contracts that the task coordinator exposes. Understanding these contracts before reviewing the implementation of the RemoteTaskClient will give you additional context because the implementation of the client relies on these services.

The code in Figure 3 shows the service operations the task coordinator exposes to the client. Through the SubmitRequest operation, the client has the ability to request the execution of one or more TPL tasks. The client can also request the cancellation of a particular TPL task that isn’t complete, via the CancelTask operation. Note that the UpdateStatus operation is a callback. It’s through a client-side implementation of this callback contract that the task coordinator will update the status at the client.

Figure 3 Service Operations

[ServiceContract(CallbackContract = typeof(ITaskUpdateCallback))]
public interface ITaskCoordinator
{
  [OperationContract(IsOneWay = true)]
  void SubmitRequest(List<STask> stasks);
  [OperationContract]
  bool CancelTask(string Id);
}
public interface ITaskUpdateCallback
{
  [OperationContract(IsOneWay = true)]
  void UpdateStatus(string id, STaskStatus status, object result);
}

Let’s look at the data contract that represents the task execution request. This is the data entity the client will send to the task coordinator, which in turn will submit the request to the task execution node where the actual execution will occur. The class STask, shown in Figure 4, models a task execution request. Using the properties STaskTypeName and STaskParameters, the client can set the type of task it wants to execute, with the relevant parameters. The task coordinator will use the property Id as a unique identifier that the logical layers can use to correlate the request with the actual TPL task running in the system.

Figure 4 The STask Class

[DataContract]
  public class STask
  {
    [DataMember]
    public string Id
    { get; set; }
    [DataMember]
    public string STaskTypeName
    { get; set; }
    [DataMember]
    public string[] STaskParameters
    { get; set; }
  }

Now let’s go back to the RemoteTaskClient and discuss how I’m planning to correlate the local TPL task with the result of the execution in the task execution nodes. The TPL has a convenient class, TaskCompletionSource<TResult>, that I can use to create a TPL task and control its lifecycle. This mechanism lets me signal when a given task is completed, canceled or faulted. The implication here is that each request that goes to a task execution node (via the task coordinator) must be correlated to an instance of TaskCompletionSource. For this, I implemented the class ClientRequestInfo, shown in Figure 5.

Figure 5 The Class ClientRequestInfo

internal class ClientRequestInfo<TResult>
{
  internal STask TaskExecutionRequest
  { get; set; }
  internal TaskCompletionSource<TResult> CompletionSource
  { get; set; }
  internal ClientRequestInfo(string typeName, string[] args)
  {
    TaskExecutionRequest = new STask()
      { Id = Guid.NewGuid().ToString(), STaskTypeName = typeName,
        STaskParameters = args };
    CompletionSource = new TaskCompletionSource<TResult>();
  }
}
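
If you haven’t used TaskCompletionSource<TResult> before, this minimal standalone sketch (not part of the solution) shows how it decouples a TPL task from the code that eventually completes it:

// The consumer treats tcs.Task as an ordinary TPL task, while another
// component (here, the callback from the task coordinator) completes it.
var tcs = new TaskCompletionSource<int>();
Task<int> task = tcs.Task;      // Hand this task to the caller
// Later, when the remote result arrives, exactly one of:
tcs.SetResult(42);              // Completed
// tcs.SetCanceled();           // Canceled
// tcs.SetException(ex);        // Faulted
Console.WriteLine(task.Result); // Prints 42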

Figure 6 shows the implementation of the constructor of the RemoteTaskClient.

Figure 6 The ClientRequestInfo Constructor

ITaskCoordinator _client;           
ConcurrentDictionary<string, ClientRequestInfo<TResult>> 
  _requests = new ConcurrentDictionary<string, 
  ClientRequestInfo<TResult>>();
  public RemoteTaskClient(string taskCoordinatorEndpointAddress)
  {           
    var factory = new DuplexChannelFactory<ITaskCoordinator>
       (new InstanceContext(new CallbackHandler<TResult>(_requests)),
       new NetHttpBinding(),
       new EndpointAddress(taskCoordinatorEndpointAddress));
    _client = factory.CreateChannel();
    ((IClientChannel)_client).Open();
  }

Notice I’m opening a duplex channel to the task coordinator and creating a callback instance of the type CallbackHandler. CallbackHandler receives as a parameter _requests, which contains instances of ClientRequestInfo. The rationale is that the _requests dictionary holds all the active instances of the client requests (and the instances of TaskCompletionSource that are associated with them), and the CallbackHandler will handle the updates from the task coordinator. Because multiple service requests will be updating the _requests dictionary, I need to guarantee thread-safety, hence the need for creating this as an instance of ConcurrentDictionary.

Figure 7 shows the implementation of the CallbackHandler class.

Figure 7 The CallbackHandler Class

[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant)]
public class CallbackHandler<TResult> : ITaskUpdateCallback
{
  ConcurrentDictionary<string, ClientRequestInfo<TResult>> _requests;
  public void UpdateStatus(string id, STaskStatus status, Object result)
  {
    ClientRequestInfo<TResult> info;
    if (_requests.TryRemove(id, out info))
    {
      switch (status)
      {
        case STaskStatus.Completed:
          info.CompletionSource.SetResult((TResult)result);
          break;
        case STaskStatus.Canceled:
          info.CompletionSource.SetCanceled();
          break;
        case STaskStatus.Faulted:
          info.CompletionSource.SetException((Exception)result);
          break;
      }
    }
  }
  internal CallbackHandler(ConcurrentDictionary<string,
    ClientRequestInfo<TResult>> requests)
  {
    _requests = requests;
  }
}

Next, let’s look at the implementation of the AddRequest and SubmitRequests methods, as shown in Figure 8.

Figure 8 The AddRequest and SubmitRequests Methods

public void AddRequest(string typeName, string[] parameters,
  CancellationToken tk)
{
  var info = new ClientRequestInfo<TResult>(typeName, parameters);
  _buffer.Add(info);
  tk.Register(() => _client.CancelTask(info.TaskExecutionRequest.Id));
}
public void AddRequest(string typeName, string[] parameters)
  {
    _buffer.Add(new ClientRequestInfo<TResult>(typeName, parameters));
  }
public Task<TResult>[] SubmitRequests()
  {
    if (_buffer.Count == 0)
      return null;
    var req = _buffer.Select((r) =>
    {
      _requests.TryAdd(r.TaskExecutionRequest.Id, r);
      return r.TaskExecutionRequest;                
    });
    _client.SubmitRequest(req.ToList<STask>());
    var ret =  _buffer.Select(r =>
      r.CompletionSource.Task).ToArray<Task<TResult>>();
    _buffer.Clear();
    return ret;
  }

Tracking Client Requests

As you saw in the previous section, the client interacts solely with the task coordinator, and it’s the responsibility of the task coordinator to handle the requests from the client and subsequently update the client with the results of the execution of the TPL task. As with the client, this requires persisting the original request in some form. It also requires keeping track of:

  • The corresponding callback instance, which allows communication with the client.
  • The channel to the task execution node associated with the request (needed, as you’ll see later, in cancellation scenarios).
  • A unique identifier that groups all the task execution requests associated with a single call to a task execution node (to determine when the channel is no longer needed).
  • The status and result of the execution.

Figure 9 shows the definition of the STaskInfo class, the entity that will hold this information. In addition, I’ll use a single instance of the ConcurrentDictionary<TKey,TValue> as the persistence mechanism.

Figure 9 The STaskInfo and CoordinatorContext Classes

public class STaskInfo
  {
    public string ExecutionRequestId
    { get; set; }
    public STask ClientRequest
    { get; set; }
    public ITaskUpdateCallback CallbackChannel
    { get; private set; }
    public ITaskExecutionNode ExecutionRequestChannel
    { get; set; }
    public STaskInfo(ITaskUpdateCallback callback)
    {
      CallbackChannel = callback;
    }
  }
public static class CoordinatorContext
{
...
private static readonly ConcurrentDictionary<string, STaskInfo> 
  _submissionTracker =
  new ConcurrentDictionary<string, STaskInfo>();
...
}

Finally, note that _submissionTracker is contained in the CoordinatorContext class. I’ll use this class to implement the main functionality of the task coordinator.

Handling Client Requests

The task coordinator is the only point of entry for the clients, which means it must be able to handle as many client requests as possible while keeping the task execution nodes from becoming saturated (in terms of resources). This isn’t as easy as it might seem. To better explain the potential challenges, let’s look at a simplistic solution:

  1. The task coordinator exposes the service operation through which the clients submit task execution requests.
  2. The task coordinator submits these requests to the task execution nodes for their execution and keeps track of these requests—that is, it persists the state.

Figure 10 shows a basic implementation of this submission process.

Figure 10 Implementing the Submission Process

public class TaskCoordinatorService : ITaskCoordinator
{
...
  public void SubmitRequest(List<STask> stasks)
  {
    CoordinatorContext.SendTaskRequestToTaskExecutionNode(stasks);
  }
...
}
public static class CoordinatorContext
{
...
  internal static void SendTaskRequestToTaskExecutionNode(List<STask> stasks)
  {
    var clientFactory = // Client factory creation logic...
    var channel = clientFactory.CreateChannel();
    // Simplified tracking; the final implementation (Figure 16)
    // stores STaskInfo instances instead
    foreach (var stask in stasks)
      _submissionTracker.TryAdd(stask.Id, stask);
    try
    {
      ((IClientChannel)channel).Open();
      channel.Start(stasks);
    }
    catch (CommunicationException ex)
    {
      // Error handling and logging ...
    }
    finally
    {
      if (((IClientChannel)channel).State != CommunicationState.Faulted)
        ((IClientChannel)channel).Close();
    }
  }
...
}

However, this simplistic implementation wouldn’t work very well in some scenarios:

  • If the client submits a large number of tasks in a single request, all of them will end up in a single task execution node, resulting in an uneven utilization of the available resources (assuming there’s more than one task execution node available).
  • In peak load scenarios, the system might exhaust the available resources in the task execution nodes if the number of executing TPL tasks exceeds what those resources can handle. This can happen when the work being executed as a TPL task is bound to a particular resource, such as memory, increasing the risk under peak load of making the system unresponsive.

The Throttles

A way to address such challenges is to somehow “manage” the task execution requests as they go through the system. In this context, you can think of the task coordinator as a throttling controller. Before I discuss the throttling process, however, let’s review the semantics of the throttles that, in conjunction with the throttling process, I’ll use to mitigate these risks.

The first scenario can be mitigated by capping the number of task execution requests the task coordinator can submit to the task execution nodes in a single request. I’ll call this throttle maxSTasksPerRequest. Using this approach, the load balancer algorithm will be able to do its job of balancing the load across the available task execution nodes.

The second scenario is more challenging. A plausible solution is to cap the number of tasks the task execution nodes will execute at a particular number. I’ll refer to this throttle as maxNumberOfTasks.

In addition to this throttle, the solution could benefit from having another throttle that limits the number of tasks being executed based on their type. To explain why this is useful, let’s consider a scenario in which the task execution nodes have two types of tasks deployed, T1 and T2. T1 is CPU-bound and T2 is disk I/O-bound. In this scenario, the throughput of a client submitting requests for the execution of T1 tasks is more likely to be affected by active tasks that are bound by the same type of constraint—so the higher the number of T1 tasks the greater the impact. Because T2 tasks are bound by a different constraint, the impact they have on T1 tasks isn’t the same. Having the ability to limit the execution of tasks by type means I can control how many T1 tasks can be running at any particular time, allowing me to maximize CPU resources and, as a result, the overall throughput. I will refer to this throttle as maxNumberOfTasksByType.
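
To make the three throttles concrete, here’s a rough sketch of how they might be surfaced from configuration. The helper class, key names and use of appSettings are my assumptions; the article only states that the values come from a configuration:

// Hypothetical helper (requires a reference to System.Configuration and
// using System.Configuration;). Key names are illustrative only.
internal static class ThrottleSettings
{
  internal static int MaxSTasksPerRequest =
    int.Parse(ConfigurationManager.AppSettings["maxSTasksPerRequest"]);
  internal static int MaxNumberOfTasks =
    int.Parse(ConfigurationManager.AppSettings["maxNumberOfTasks"]);
  internal static int GetMaxNumberOfTasksByType(string taskTypeName)
  {
    // One entry per task type, e.g. "maxTasks:MyTasks.MySimpleCTask"
    var value = ConfigurationManager.AppSettings["maxTasks:" + taskTypeName];
    return value == null ? int.MaxValue : int.Parse(value);
  }
}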

Queuing and Throttling

Now that you understand the semantics of the throttles and how throttles can be effective for maintaining the healthy operation of the task execution nodes, let’s take a look at what happens when the limit specified by the throttles is reached—that is, the actual throttling process.

One option is to simply throw an exception. However, this would affect the overall throughput of the solution, because the client would incur the overhead of checking for a specific error or fault and then resubmitting the requests until the task coordinator could handle them successfully. An alternative is to use server-side queuing to temporarily hold the requests from the client, plus a monitor-like process (a submitter) that, at regular intervals, reads requests from the queue and submits them to the task execution nodes. The submitter performs the actual throttling, reading from the request queue according to the following rules:

  1. Cap the number of requests that can be dequeued at maxSTasksPerRequest.
  2. If the throttle maxNumberOfTasks has been reached, stop dequeuing; the remaining requests stay in the queue.
  3. If the throttle maxNumberOfTasksByType has been reached for the type of the current request, dequeue the request and then enqueue it at the back of the queue. Re-enqueuing the request allows processing to continue for tasks of other types, giving all the tasks in the queue an equal opportunity to execute. In some cases, however, you might consider using a priority queue instead. You’ll find a good reference at bit.ly/NF0xQq.

Figure 11 illustrates this process.

Figure 11 The Submission Process

I’ll start describing the implementation of this process with the code (see Figure 12) for the SubmitRequest service operation, which enqueues requests in the request queue as it receives them from the client.

Figure 12 The SubmitRequest Service Operation

[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant)]
public class TaskCoordinatorService : ITaskCoordinator
  {
    public void SubmitRequest(List<STask> stasks)
    {           
      CoordinatorContext.EnqueueRequestsInRequestQ(stasks);
    }   
    ...
}
public static class CoordinatorContext
{
...
internal static void EnqueueRequestsInRequestQ(List<STask> stasks)
  {
    var callback =
      OperationContext.Current.GetCallbackChannel<ITaskUpdateCallback>();
    foreach (var stask in stasks)               
      _requestQ.Enqueue(new STaskInfo(callback) { ClientRequest = stask });
  }
...
}

Next, let’s look at the implementation of the submitter process, shown in Figure 13.

Figure 13 Submitter Implementation

public static class CoordinatorContext
{
  ...
  static CoordinatorContext()
  {
    Submitter(...);
  }
  private static async void Submitter(int interval)
  {
    while (true)
    {
      await Task.Delay(interval);
      SendTaskRequestToTaskExecutionNode(GetTasksFromRequestQ());
    }
  }
  ...
}

In Figure 12 and Figure 13, you can see the service operation enqueuing (writing) a request in the request queue and the submitter task dequeuing (reading) from the request queue. In this scenario, you need to ensure that the underlying data structure—the queue—is thread-safe. Fortunately, there’s a class geared precisely for this, the ConcurrentQueue<T>. So I’ll use a single instance of this type as the underlying repository for the requests.

public static class CoordinatorContext
{
  ...
private static readonly ConcurrentQueue<STaskInfo> _requestQ =
  new ConcurrentQueue<STaskInfo>();
  ...
}

Now, let’s review the implementation of the method GetTasksFromRequestQ, which reads the tasks when the execution interval elapses. It’s in this method that the throttling process occurs and where the throttles I described earlier apply. Figure 14 shows an implementation of this process.

Figure 14 GetTasksFromRequestQ Implementation

public static class CoordinatorContext
{
  ...
  internal static List<STaskInfo> GetTasksFromRequestQ()
  {
    var ret = new List<STaskInfo>();
    var maxSTasksPerRequest = // From a configuration
    var maxNumberOfTasks = // From a configuration
    var count = // Count of submitted or executing tasks
    var countByType = // Enumerable of count by type
    for (int i = 0; i < maxSTasksPerRequest; i++)
    {
      STaskInfo info;
      if (count + i == maxNumberOfTasks || !_requestQ.TryDequeue(out info))
        return ret;
      var countTT = // Count of submitted or executing tasks of
                    // the type of the current item
      if (countTT == GetMaxNumberOfTasksByType(info.ClientRequest.STaskTypeName))
      { _requestQ.Enqueue(info); }
      else ret.Add(info);
    }
    return ret;
  }
  private static int GetMaxNumberOfTasksByType(string taskTypeName)
  {
    // Logic to read from a configuration repository the value by task type name
  }
  ...
}

The goal of the implementation in Figure 14 is to obtain the numbers that allow the process to assess the throttling conditions. Figure 15 shows plausible LINQ queries that can be executed against the _submissionTracker, as well as against the list containing the return items (ret), to obtain these values. Note that this approach is simple but can come at the expense of performance. If so, as an alternative you could implement a set of thread-safe counters that increment or decrement as items are added or removed from the submission tracker instance and use those counters instead of querying the concurrent dictionary directly, as shown in the sketch after Figure 15.

Figure 15 The Throttling Values

var countByType = (from t in _submissionTracker.Values
                   group t by t.ClientRequest.STaskTypeName into g
                   select new
                   {
                     TypeName = g.Key,
                     Count = g.Count()
                   });
var count = countByType.Sum(c => c.Count);
var countTT = (from tt in countByType
               where tt.TypeName == info.ClientRequest.STaskTypeName
               select tt.Count).SingleOrDefault() +
              ret.Count(rt => rt.ClientRequest.STaskTypeName ==
                info.ClientRequest.STaskTypeName);
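
Here’s a rough sketch of that counter-based alternative (the class and method names are my own); the counters would be updated wherever items enter or leave the submission tracker:

// Hypothetical alternative to querying _submissionTracker with LINQ;
// requires System.Threading and System.Collections.Concurrent.
internal static class TaskCounters
{
  private static int _total;
  private static readonly ConcurrentDictionary<string, int> _byType =
    new ConcurrentDictionary<string, int>();
  internal static void OnAdded(string taskTypeName)
  {
    Interlocked.Increment(ref _total);
    _byType.AddOrUpdate(taskTypeName, 1, (t, c) => c + 1);
  }
  internal static void OnRemoved(string taskTypeName)
  {
    Interlocked.Decrement(ref _total);
    _byType.AddOrUpdate(taskTypeName, 0, (t, c) => c - 1);
  }
  internal static int Total { get { return Volatile.Read(ref _total); } }
  internal static int ByType(string taskTypeName)
  {
    int c;
    return _byType.TryGetValue(taskTypeName, out c) ? c : 0;
  }
}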

Sending Requests to the Task Execution Nodes and Handling Results

So far I’ve discussed how the task coordinator manages the requests. Let’s look at how the task coordinator submits the request to the task execution nodes, now considering the throttling process. To provide better context, let’s first review the service operations that the task execution nodes expose (through the load balancer):

[ServiceContract(CallbackContract = typeof(ITaskUpdateCallback))]
public interface ITaskExecutionNode
{
  [OperationContract]
  void Start(List<STask> stasks);
  [OperationContract]
  void Cancel(string Id);
}

As their names suggest, the purposes of these operations are to start a list of task execution requests and to request the cancellation of a particular task. The contract leverages the same callback contract, ITaskUpdateCallback; the task execution nodes use an implementation of this contract to update the task coordinator.

Figure 16 shows an updated implementation of the method SendTaskRequestToTaskExecutionNode, in which the task coordinator stores the STaskInfo instances in the _submissionTracker and calls the Start service operation on a task execution node.

Figure 16 The SendTaskRequestToTaskExecutionNode and Supporting Methods

internal static void SendTaskRequestToTaskExecutionNode(List<STaskInfo> staskInfos)
{
  if (staskInfos.Count() == 0)
    return;
  var channel = new DuplexChannelFactory<ITaskExecutionNode>(
    new InstanceContext(new CallbackHandler()),
    new NetHttpBinding(), new EndpointAddress("https://.../"))
    .CreateChannel();
  try
  {
    var requestId = Guid.NewGuid().ToString();
    var reqs = staskInfos.Select(s => AddRequestToTracker(requestId, s, channel))
      .Where(s => s != null);
    ((IChannel)channel).Open();
    channel.Start(reqs.ToList<STask>());
  }
  catch (CommunicationException ex)
  {
    foreach (var stask in staskInfos)
      HandleClientUpdate(stask.ClientRequest.Id, STaskStatus.Faulted, ex);
  }
}
private static STask AddRequestToTracker(string requestId,
  STaskInfo info, ITaskExecutionNode channel)
{
  info.ExecutionRequestId = requestId;
  info.ExecutionRequestChannel = channel;
  if (_submissionTracker.TryAdd(info.ClientRequest.Id, info))
    return info.ClientRequest;
  HandleClientUpdate(info.ClientRequest.Id, STaskStatus.Faulted,
    new Exception("Failed to add request to the tracker"));
  return null;
}

Note that the method SendTaskRequestToTaskExecutionNode creates a callback instance to handle the result of the execution of the task in a task execution node:

[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant)]
public class CallbackHandler : ITaskUpdateCallback
{
  public void UpdateStatus(string id, STaskStatus status, object result)
  {
    CoordinatorContext.HandleClientUpdate(id, status, result);
  }
}

The CallbackHandler handles the callback operation by calling the HandleClientUpdate method. This method retrieves and removes the corresponding STaskInfo instance from the _submissionTracker and performs a callback to the client to update the result. In addition, if this is the last request in the group, it closes the channel between the task coordinator and the task execution node. Figure 17 shows the implementation of the HandleClientUpdate method.

Figure 17 HandleClientUpdate and Supporting Methods

internal async static void HandleClientUpdate(
  string staskId, STaskStatus status, object result)
{
  STaskInfo info;
  if (!_submissionTracker.TryGetValue(staskId, out info))
    throw new Exception("Could not get task from the tracker");
  try
  {
    await Task.Run(() =>
      info.CallbackChannel.UpdateStatus(info.ClientRequest.Id, status, result));
    RemoveComplete(info.ClientRequest.Id);
  }
  catch (Exception ex) // Await rethrows the original exception
  {
    // ...
  }
}
private static void RemoveComplete(string staskId)
{
  STaskInfo info;
  if (!_submissionTracker.TryRemove(staskId, out info))
    throw new Exception("Failed to be removed from the tracking collection");
  if (_submissionTracker.Values.Where((t) => t.ExecutionRequestId ==
    info.ExecutionRequestId).Count() == 0)
    CloseTaskRequestChannel((IChannel)info.ExecutionRequestChannel);
}
private static void CloseTaskRequestChannel(IChannel channel)
{
  if (channel != null && channel.State != CommunicationState.Faulted)
    channel.Close();
}

Task Implementer

In the client code, typeName is one of the required parameters when adding requests, and this value eventually makes it to the task execution node. The value of typeName is the full type name of an implementation of an interface, deployed to all the task execution nodes, that exposes a function delegate encapsulating the functionality intended to run as a parallel task. I’ll call this interface IRunnableTask. Implementers of this interface should expect to receive as parameters a cancellation token and an array of parameters from the client. The delegate should return the result of the task. Here’s the interface:

public interface IRunnableTask
{
  Func<Object> Run(CancellationToken ct, params string[] taskArgs );
}
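
To connect this back to the client API, here’s a minimal sketch of an implementer (the type and namespace are my own invention). The typeName the client passes to AddRequest must match the implementation’s full type name, because the execution node selects the export by Type.FullName:

// Hypothetical task deployed to the "extensions" directory of each node
namespace MyTasks
{
  public class EchoTask : IRunnableTask
  {
    public Func<Object> Run(CancellationToken ct, params string[] taskArgs)
    {
      // Return the delegate the node will run as a TPL task
      return () => "Echo: " + taskArgs[0];
    }
  }
}
// On the client: c.AddRequest("MyTasks.EchoTask", new[] { "hello" });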

Starting a Task in a Task Execution Node

At a high level, a task execution node is responsible for “transforming” a task execution request into an actual task that the TPL can execute—that is, starting a TPL task. Figure 18 shows an implementation of this process, which I’ll discuss.

Figure 18 Starting a Task

[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant)]
public class TaskExecutionNodeHandler : ITaskExecutionNode
{              
  public void Start(List<STask> stasks)
  {
    var callback =
      OperationContext.Current.GetCallbackChannel<ITaskUpdateCallback>();
    foreach (var t in stasks)
      TaskExecutionContext.Start(t,callback);           
  }
...
}
public static class TaskExecutionContext
{
...
internal static void Start(STask stask, ITaskUpdateCallback callback)
{
  try
  {
    // Step 1.a
    var rtasks = CompositionUtil.ContainerInstance.GetExports<IRunnableTask>();
    // Step 1.b
    var rtask = from t in rtasks
                where t.Value.GetType().FullName == stask.STaskTypeName
                select t.Value;
    // Step 2
    var cs = new CancellationTokenSource();
    var ct = cs.Token;
    TaskExecutionContext._cancellationSources.TryAdd(stask.Id, cs);
    // Step 3
    Task.Run(rtask.First().Run(ct, stask.STaskParameters), ct)
      .ContinueWith(tes => UpdateStatus(tes, stask, callback));
  }
  catch (Exception ex)
  {
    ...
  }
}
...
}

Step 1 (a and b): At this stage, the task execution node needs to create an instance of IRunnableTask, which will return a delegate that will run as a task of the type requested by the client. For this, I leverage MEF and a new feature in the .NET Framework 4.5 that allows an attribute-free configuration approach. The code in Figure 19 creates a single container instance that exports all the implementations of IRunnableTask located in the directory “extensions.” For more information about MEF and the attribute-free configuration approach, please see the June 2012 MSDN Magazine article, “An Attribute-Free Approach to Configuring MEF” at msdn.microsoft.com/magazine/jj133818.

Figure 19 Creating a Container

internal static class CompositionUtil
{
  private readonly static Lazy<CompositionContainer> _container =
    new Lazy<CompositionContainer>(() =>
    {
      var builder = new RegistrationBuilder();
      builder.ForTypesDerivedFrom<IRunnableTask>()
             .Export<IRunnableTask>()
             .SetCreationPolicy(CreationPolicy.NonShared);
      var cat = new DirectoryCatalog("extensions", builder);
      return new CompositionContainer(cat, true, null);
    }
  , true);
  internal static CompositionContainer ContainerInstance
  {
    get { return _container.Value; }
  }
}

Now let’s go back to the code in Figure 18. The code uses the container to get the exports of the type IRunnableTask, then selects the instance with the type name that matches the client request. Note that I make the key assumption that there’s only one task instance that corresponds to the type requested by the client. This is the reason I use the first instance that the LINQ query returns.

Step 2: Before actually creating the TPL task, the code creates a CancellationTokenSource and a CancellationToken. I’ll keep track of the cancellation source in a single instance of a ConcurrentDictionary<TKey,TValue>. The task execution node will use this list of cancellation sources when a client requests a cancellation. Here’s the definition of this instance:

public static class TaskExecutionContext
{
...
private readonly static ConcurrentDictionary<string, 
  CancellationTokenSource> _cancellationSources =
  new ConcurrentDictionary<string, CancellationTokenSource>();
...
}

Step 3: At this point, I run the task with the cancellation token I just created. The task is followed by a continuation task. The need for a continuation task arises because the task coordinator must be updated, via a service call, with the result of the execution once the TPL task has completed (successfully or with a fault). As Figure 20 shows, I encapsulate the process of updating the task coordinator in a delegate that receives as parameters the TPL task, the task execution request and a callback instance to the task coordinator.

Figure 20 Encapsulating the Update Process

private static Action<Task<Object>, STask, 
  ITaskUpdateCallback> UpdateStatus = (t, st, cb) =>
  {
    try
    {
      STaskStatus s;
      Object r = null;
      switch (t.Status)
      {
        case TaskStatus.Canceled: s = STaskStatus.Canceled;
          break;
        case TaskStatus.Faulted:
          s = STaskStatus.Faulted;
          r = t.Exception.Flatten();
          break;
        case TaskStatus.RanToCompletion:
          s = STaskStatus.Completed;
          r = t.Result;
          break;
        default:
          s = STaskStatus.Faulted;
          r = new Exception("Invalid Status");
          break;
      }
      CancellationTokenSource cs;
      TaskExecutionContext._cancellationSources.TryRemove(st.Id, out cs);
      cb.UpdateStatus(st.Id, s, r);
    }
    catch (Exception ex)
    {
      // Error handling
    }
  };

Requesting and Handling a Cancellation

The TPL provides a mechanism for implementing task cancellation. For this, the delegate that encapsulates the actual process that’s running as a TPL task needs to respond to the cancellation request and terminate the execution. For more information about task cancellation, see the MSDN Library article, “Task Cancellation,” at bit.ly/NYVTO0.

One of the parameters in the IRunnableTask interface is a cancellation token. The task execution node will create a token for each task and it’s up to the implementer of the interface to determine when to check for a cancellation request and terminate the process gracefully. The code in Figure 21 shows a simple task that calculates the number of even numbers in a range, while checking if a cancellation has been requested.

Figure 21 Checking for a Cancellation

public class MySimpleCTask : IRunnableTask
{
  public Func<Object> Run(CancellationToken ct,
    params string[] taskArgs)
  {
    var j = int.Parse(taskArgs[0]);
    var z = 0;
    return (() =>
    {
      for (int i = 0; i < j; i++)
      {
        if (i % 2 == 0)
        {
          z++;
          ct.ThrowIfCancellationRequested();
        }
      }
      return z;
    });
  }
}

As you saw when I discussed the client, you can add a request with a cancellation token, and internally the client performs the necessary subscription. So when a cancellation is raised, a cancel request is sent to the task coordinator. Upon receiving the cancellation request, the task coordinator checks whether the request has been submitted to a task execution node and, if so, sends a cancellation request. The task execution node then looks for the cancellation source that corresponds to the Id of the task the client requested. Submitting the cancellation request to the task execution node is relatively simple: you just need to locate the channel over which the task coordinator initially submitted the task execution request. These channels must be kept open for the callbacks that update the status of the execution request.
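
Putting it together from the client’s perspective, here’s a hedged end-to-end sketch of cancellation (the endpoint is a placeholder, and MyTasks.MySimpleCTask assumes the task from Figure 21 deployed under that namespace):

var cts = new CancellationTokenSource();
using (var c = new RemoteTaskClient<int>("http://..."))
{
  // AddRequest registers a callback on the token that calls CancelTask
  c.AddRequest("MyTasks.MySimpleCTask", new[] { "1000000" }, cts.Token);
  var ts = c.SubmitRequests();
  cts.Cancel(); // Flows to the task coordinator, then to the execution node
  try { Task.WaitAll(ts); }
  catch (AggregateException)
  {
    // Canceled tasks surface here as TaskCanceledException
  }
}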

Figure 22 shows the implementation of the service operations in the task coordinator.

Figure 22 Implementing the Service Operations in the Task Coordinator

public class TaskCoordinatorService : ITaskCoordinator
{
...
  public bool CancelTask(string Id)
  {
    return CoordinatorContext.CancelTask(Id);
  }
...
}
public static class CoordinatorContext
{
...
  internal static bool CancelTask(string Id)
  {
    STaskInfo info;
    if (_submissionTracker.TryGetValue(
      Id, out info) && info.ExecutionRequestChannel != null)
    {
      info.ExecutionRequestChannel.Cancel(Id);
      return true;
    }
    return false;
  }
...
}

Finally, Figure 23 shows the implementation of the service operations in the task execution nodes.

Figure 23 Implementing the Service Operations in the Task Execution Nodes

class CancellationHandler : ICancellationHandler
{
  public void Cancel(STask stask)
  {
    TaskExecutionContext.CancelTask(stask.Id);
  }
}
public static class TaskExecutionContext
{
...
  internal static void CancelTask(string Id)
  {
    CancellationTokenSource tknSrc;
    if (_cancellationSources.TryGetValue(Id, out tknSrc))
      tknSrc.Cancel();
  }
...
}

Scalability of the Task Coordinator and Other Considerations

It’s worth noting that this implementation assumes the task coordinator runs on a single node; however, it’s quite possible to scale out the task coordinator, which would require, at a minimum, the following changes:

  • A load balancer for accessing the task coordinator would need to be introduced.
  • As I described earlier, the key to the throttling approach is having an accurate count of the number of tasks that are running, in total and by type. In a scenario with more than one node running as task coordinators, these counters will need to be maintained centrally (for example, in a database) while still being able to be updated or read in a synchronized way (avoiding race conditions, deadlocks and so forth).

Finally, let me note that as with any development approach, the risk and value need to be weighed against other alternatives that might meet your needs and that are available off the shelf. For instance, you might want to consider technologies such as the Microsoft HPC server as a plausible solution for many scenarios that you otherwise might think of addressing based on the approach described in this article.

Optimizing Resources

The TPL provides the necessary infrastructure to achieve optimal utilization of CPU resources within a single multicore computer, and it’s also useful for implementing an approach that scales across computer boundaries. This can be helpful for workload automation and batch-processing scenarios where parallelism is required not only on a single multicore server, but across multiple servers as well.

To achieve this horizontal scalability, several architectural considerations need to be taken into account. Key among them: the need to balance the load across the existing resources while having the ability to add more resources to the existing farm, and the ability to throttle the resources according to the semantics of the tasks that need to be executed. Microsoft development tools and technologies provide the necessary building blocks to implement an architecture that takes into account these key considerations.


Jesus Aguilar works for Microsoft in the area of Premier Support for Developers as a senior application development manager.

Thanks to the following technical experts for reviewing this article: Ryan Berry, Steve Case, Rick Claude and Piyush Joshi