Compartir a través de


TPL

Escalabilidad horizontal para la ejecución paralela de tareas

Jesus Aguilar

Descargar el ejemplo de código

La biblioteca TPL (Task Parallel Library, biblioteca de procesamiento paralelo basado en tareas), presentada en Microsoft .NET Framework 4, permite que los desarrolladores de aplicaciones creen soluciones que sacan partido del poder del procesamiento en paralelo de los equipos de varios núcleos. En muchos casos, sin embargo, la capacidad de escalar verticalmente (al agregar más núcleos) está limitada por varios factores, los que incluyen limitaciones de costo y de hospedaje. En tales casos, si se necesita escalar, conviene distribuir el procesamiento en una matriz de servidores; un ejemplo de esto es el hospedaje en nube. En este artículo, describiré los aspectos claves (junto con una implementación) de una solución conceptual para lograr esto usando muchas de las nuevas características de .NET Framework 4.5.

Supuestos básicos

El enfoque que describiré exige varias tecnologías que van más allá del alcance de TPL, e incluyen:

  • Biblioteca TPL
  • Windows Communication Foundation (WCF)
  • Managed Extensibility Framework (MEF)

Observe que solo analizaré estos marcos en el contexto del problema que pretendo solucionar. Aquí se da por entendido que tiene un buen dominio de estas tecnologías.

Cliente de tareas remotas, coordinador de tareas y nodos de ejecución de tareas

El cliente de tareas remotas es la capa del lado cliente que oculta la complejidad que resulta de la semántica asociada al uso de un entorno distribuido. El cliente de tareas remotas interactúa directamente con el coordinador de tareas, que en ese momento se convierte en el punto de entrada a la infraestructura subyacente. En un nivel alto, el coordinador de tareas tiene los siguientes atributos:

  1. Es el único punto de contacto con los clientes.
  2. Expone los servicios necesarios para solicitar la ejecución de tareas en la plataforma escalable, además de la cancelación de una tarea dada.
  3. Manipula la limitación y puesta en cola de las solicitudes de ejecución de tareas, lo que aporta a la operación saludable del entorno.

Los nodos de ejecución de tareas son los hosts de los procesos en los cuales se ejecutarán las tareas. Las implementaciones que efectivamente se generan de las tareas que ejecutará TPL residen en los nodos de ejecución de tareas.

Estos son algunos aspectos claves de esas capas lógicas y del flujo de información:

  1. El cliente de tareas remotas solicita la ejecución de una o más tareas.
  2. El coordinador de tareas envía la solicitud a los nodos de ejecución de tareas.
  3. Los nodos de ejecución de tareas ejecutan las tareas y actualizan el estado de cada solicitud en el coordinador de tareas.
  4. El coordinador de tareas actualiza al cliente con los resultados de la ejecución de cada solicitud.
  5. Los nodos de ejecución de tareas residen detrás de un equilibrador de carga, de manera que se pueden agregar más nodos cuando sea necesario, lo que brinda la capacidad de escalar horizontalmente.

En la Ilustración 1 se describen las capas lógicas y el flujo de la información.

Scaling Tasks HorizontallyIlustración 1 Escalado horizontal de las tareas

Observe cómo los nodos de ejecución actualizan al coordinador de tareas, que luego actualiza a su vez al cliente de tareas remotas. Voy a describir una implementación basada en una comunicación bidireccional entre el cliente y el coordinador de tareas y, además, del coordinador de tareas y los nodos de ejecución de tareas. En términos de WCF, esto implica el uso de un canal dúplex que permite que los nodos de ejecución de tareas devuelvan la llamada al coordinador de tareas y, posteriormente, que el coordinador de tareas haga lo mismo para actualizar al cliente. Mostraré el uso de WebSockets para lograr esta comunicacional bidireccional. El transporte de WebSockets está implementado como un nuevo enlace en .NET Framework 4.5 y está disponible para Windows 8. Encontrará más información acerca del enlace en bit.ly/SOLNiU.

Cliente y coordinador de tareas

Ahora que entiende las tres principales capas lógicas: el cliente de tareas remotas, el coordinador de tareas y el nodo de ejecución de tareas, comencemos por analizar la implementación del cliente de tareas remotas. Observe que, cuando uso el término “cliente” a lo largo del artículo, me refiero al cliente de tareas remotas.

Tal como mencioné anteriormente, la proposición de valor del cliente es la capacidad de ocultar la complejidad de los componentes subyacentes. Una manera de lograr esto es al proporcionar una API que dé la impresión de una ejecución local de las tareas, pese a que se puedan estar ejecutando en otros lugares. En el código de la Ilustración 2 aparecen los métodos públicos de la clase RemoteTaskClient.

Ilustración 2 Métodos públicos de la clase RemoteTaskClient

public class RemoteTaskClient<TResult> : IDisposable
{
  public void AddRequest(string typeName, 
    string[] parameters, CancellationToken tk)
  {...}
  public void AddRequest(string typeName, string[] parameters)
  {...}
  public Task<TResult>[] SubmitRequests()
  {...}
  public RemoteTaskClient(string taskCoodinatorEndpointAddress)
  {...}
  public void Dispose()
  {...}
}

Puede usar el método AddRequest para agregar solicitudes de ejecución remota. Para cada solicitud deberá especificar el typeName (el tipo de la implementación propiamente tal, que contiene el delegado que la infraestructura ejecutará en forma remota como una tarea de TPL) y los parámetros pertinentes. Después, podrá enviar las solicitudes mediante el método SubmitRequest. El resultado de enviar una solicitud es una matriz de tareas de TPL, una para cada solicitud. Este sistema le permitirá administrar las tareas de TPL restantes igual como si fueran tareas locales. Por ejemplo, puede enviar varias solicitudes y esperar que se completen:

using (var c = new RemoteTaskClient<int>("..."))
  {
    c.AddRequest("...", null);
    c.AddRequest("...", null);
    var ts = c.SubmitRequests();
    Task.WaitAll(ts);
    foreach (var t in ts)
      Console.WriteLine(t.Result);
  }

Antes de profundizar más en los detalles de la implementación de RemoteTaskClient, observemos las operaciones de servicio y los contratos de datos que expone el coordinador de tareas. Si entiende estos contratos antes de revisar la implementación de RemoteTaskClient, obtendrá contexto adicional, ya que la implementación del cliente se vale de estos servicios.

En el código que aparece en la Ilustración 3 se pueden apreciar las operaciones de servicio que expone el coordinador de tareas al cliente. Mediante la operación SubmitRequest, el cliente tiene la capacidad de solicitar la ejecución de una o más tareas de TPL. El cliente también puede solicitar la cancelación de una tarea TPL específica que no esté completa, mediante la operación CancelTask. Observe que la operación UpdateStatus es una devolución de llamada. El coordinador de tareas actualiza el estado en el cliente mediante la implementación del lado cliente de este contrato de devolución de llamada.

Ilustración 3 Operaciones de servicio

[ServiceContract(CallbackContract = typeof(ITaskUpdateCallback))]
  public interface ITaskCoordinator
  {
    [OperationContract(IsOneWay = true)]
    void SubmitRequest(List<STask> stask);
    [OperationContract]
    bool CancelTask(string Id);       
  }
  public interface ITaskUpdateCallback
  {
    [OperationContract (IsOneWay = true)]
    void UpdateStatus(string id, STaskStatus status, string result);
  }

Echemos una mirada al contrato de datos que representa la solicitud de ejecución de tarea. Esta es la entidad de datos que el cliente enviará al coordinador de tareas que, como respuesta, enviará la solicitud al nodo de ejecución de tareas, donde se producirá la ejecución propiamente tal. La clase STask, que se muestra en la Ilustración 4, modela una solicitud de ejecución de tarea. Al usar las propiedades STaskTypeName y STaskParameters, el cliente puede configurar el tipo de tarea que desea ejecutar, con los parámetros pertinentes. El coordinador de tareas usará el identificador de propiedad como un identificador único que se emplea en las capas lógicas para correlacionar la solicitud con la tarea de TPL concreta que se ejecuta en el sistema.

Ilustración 4 Clase STask

[DataContract]
  public class STask
  {
    [DataMember]
    public string Id
    { get; set; }
    [DataMember]
    public string STaskTypeName
    { get; set; }
    [DataMember]
    public string[] STaskParameters
    { get; set; }
  }

Regresemos ahora a RemoteTaskClient y veamos cómo pretendo correlacionar la tarea de TPL local con el resultado de la ejecución en los nodos de ejecución de tareas. TPL tiene una clase conveniente, TaskCompletionSource<TResult>, que me resulta útil para crear una tarea de TPL y controlar su ciclo de vida. Este mecanismo me permite señalar cuándo se completa, cancela u ocurre un error en una tarea dada. Esto implica que cada solicitud que va al nodo de ejecución de tareas (a través del coordinador de tareas) se debe correlacionar con una instancia de TaskCompletionSource. Para lograr esto, implementé la clase ClientRequestInfo, que se muestra en la Ilustración 5.

Ilustración 5 Clase ClientRequestInfo

internal class ClientRequestInfo<TResult>
{
  internal STask TaskExecutionRequest
  { get; set; }
  internal TaskCompletionSource<TResult> CompletionSource
  { get; set; }
  internal ClientRequestInfo(string typeName, string[] args)
  {
    TaskExecutionRequest = new STask()
      {Id = Guid.NewGuid().ToString(), STaskTypeName =typeName,
        STaskParameters = args };
    CompletionSource = new TaskCompletionSource<TResult>();
  }
}

En la Ilustración 6 se muestra la implementación del constructor de esta clase.

Ilustración 6 Constructor de la clase ClientRequestInfo

ITaskCoordinator _client;           
ConcurrentDictionary<string, ClientRequestInfo<TResult>> 
  _requests = new ConcurrentDictionary<string, 
  ClientRequestInfo<TResult>>();
  public RemoteTaskClient(string taskCoordinatorEndpointAddress)
  {           
    var factory = new DuplexChannelFactory<ITaskCoordinator>
       (new InstanceContext(new CallbackHandler<TResult>(_requests)),
       new NetHttpBinding(),
       new EndpointAddress(taskCoordinatorEndpointAddress));
    _client = factory.CreateChannel();
    ((IClientChannel)_client).Open();
  }

Observe que aquí abro un canal dúplex hacia el coordinador de tareas y creo una instancia de devolución de llamadas del tipo CallbackHandler. CallbackHandler recibe _requests como parámetro, el cual contiene instancias de ClientRequestInfo. La razón detrás de esto es que el diccionario _requests contiene todas las instancias de las solicitudes de los clientes (y las instancias de TaskCompletionSource asociadas a estas) y CallbackHandler procesará las actualizaciones del coordinador de tareas. Como el diccionario _requests se va a actualizar con varias solicitudes de servicio, debo garantizar la seguridad de los subprocesos. Por esta razón se crea como una instancia de ConcurrentDictionary.

En la Ilustración 7 se puede apreciar la implementación de la clase CallbackHandler.

Ilustración 7 Clase CallbackHandler

[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant)]
public class CallbackHandler<TResult> : ITaskUpdateCallback
{
  ConcurrentDictionary<string, ClientRequestInfo<TResult>> _requests;
  public void UpdateStatus(string id, STaskStatus status, Object result)
    {
      ClientRequestInfo<TResult> info;
      if (_requests.TryRemove(id, out info))
      {                           
        switch (status)
        {
          case STaskStatus.
            Completed: info.CompletionSource.SetResult(
              (TResult)result);
            break;
          case STaskStatus.Canceled: 
            info.CompletionSource.SetCanceled();
            break;
          case STaskStatus.Faulted: 
            info.CompletionSource.SetException(
              (Exception)result);
            break;
        }
      }
}
  internal CallbackHandler(ConcurrentDictionary<string,
    ClientRequestInfo<TResult>> requests)
  {
    requests = requests;
  }
}

A continuación, pasemos a la implementación de los métodos AddRequest y SubmitRequest, que aparecen en la Ilustración 8.

Ilustración 8 Métodos AddRequest y SubmitRequest

public void AddRequest(string typeName, string[] parameters, 
  CancellationToken tk)
{
  var info = new ClientRequestInfo<TResult>(typeName, args);
  _buffer.Add(info);
  tk.Register(()=> _client.CancelTask(info.TaskExecutionRequest.Id));
}
public void AddRequest(string typeName, string[] parameters)
  {
    _buffer.Add(new ClientRequestInfo<TResult>(typeName, parameters));
  }
public Task<TResult>[] SubmitRequests()
  {
    if (_buffer.Count == 0)
      return null;
    var req = _buffer.Select((r) =>
    {
      _requests.TryAdd(r.TaskExecutionRequest.Id, r);
      return r.TaskExecutionRequest;                
    });
    _client.SubmitRequest(req.ToList<STask>());
    var ret =  _buffer.Select(r =>
      r.CompletionSource.Task).ToArray<Task<TResult>>();
    _buffer.Clear();
    return ret;
  }

Seguimiento de solicitudes de los clientes

Como pudo apreciar en la sección pasada, el cliente interactúa únicamente con el coordinador de tareas y este es el responsable de manipular las solicitudes del cliente y actualizar posteriormente al cliente con los resultados de la ejecución de la tarea de TPL. Igual que ocurre con el cliente, para esto hay que conservar la solicitud original de alguna forma. También hay que realizar un seguimiento de la instancia de devolución de llamada correspondiente (que permite la comunicación con el cliente); el canal hacia los nodos de ejecución de tareas asociados con la conexión (necesario, como verá más adelante, en las situaciones de cancelación); un identificador único que agrupe todas las solicitudes de ejecución de tareas asociadas con una misma llamada a un nodo de ejecución de tareas (para definir si el canal ya no es necesario); además del estado y el resultado de la ejecución. En la Ilustración 9 aparece la definición de la clase STaskInfo, la entidad que contiene toda esta información. Además, usaré una instancia única de ConcurrentDictionary<TKey,TValue> como mecanismo de persistencia.

Ilustración 9 Clases STaskInfo y CoordinatorContext

public class STaskInfo
  {
    public string ExecutionRequestId
    { get; set; }
    public STask ClientRequest
    { get; set; }
    public ITaskUpdateCallback CallbackChannel
    { get; private set; }
    public ITaskExecutionNode ExecutionRequestChannel
    { get; set; }
    public STaskInfo(ITaskUpdateCallback callback)
    {
      CallbackChannel = callback;
    }
  }
public static class CoordinatorContext
{
...
private static readonly ConcurrentDictionary<string, STaskInfo> 
  _submissionTracker =
  new ConcurrentDictionary<string, STaskInfo>();
...
}

Por último, observe que _submissionTracker está contenido en la clase CoordinatorContext. Usaré esta clase para implementar la funcionalidad principal del coordinador de tareas.

Control de las solicitudes de los clientes

El coordinador de tareas es el único punto de entrada de los clientes, lo que significa que debe ser capaz de controlar todas las solicitudes posibles de los clientes y evitar, al mismo tiempo, que los nodos de ejecución de tareas se saturen (en términos de recursos). Esto no es tan fácil como se podría creer. Para explicar mejor las potenciales dificultades, veamos una solución simplista:

  1. El coordinador de tareas expone la operación del servicio a través de la cual el cliente envía las solicitudes de ejecución de tarea.
  2. El coordinador de tareas envía estas solicitudes a los nodos de ejecución de tareas para ejecutarlas y realiza un seguimiento de estas; es decir, conserva el estado.

En la Ilustración 10 se observa una implementación básica de este proceso de envío.

Ilustración 10 Implementación del proceso de envío

public class TaskCoordinatorService : ITaskCoordinator
{
...
  public void SubmitRequest(List<STask> stasks)
  {    
    CoordinatorContext.SendTasksToTaskHandler(stasks);
  }
...
}
public static class CoordinatorContext
{
...
  internal static void SendTaskRequestToTaskExecutionNode(List<STask> stasks)
  {
  var clientFactory = //Client factory creation logic.. 
  var channel = clientFactory.CreateChannel();
  foreach (var stask in stasks)
    _submissionTracker.TryAdd(stask.Id, stask);
  try
  {
    ((IClientChannel)channel).Open();
    channel.Start(stasks);
  }
  catch (CommunicationException ex)
  {   
    // Error handling and logging ...
  }
  finally
  {
    if (((IClientChannel)channel).State != CommunicationState.Faulted)               
       ((IClientChannel)channel).Close();
  }
}
...
}

Pero esta implementación simplista no funcionará muy bien en algunas situaciones:

  • Si el cliente envía una gran cantidad de tareas en una sola solicitud, todas ellas irán a dar a un solo nodo de ejecución de tareas, lo que resulta en el uso desigual de los recursos disponibles (si suponemos que hay más de un nodo de ejecución de tareas disponible).
  • En las situaciones de demanda máxima, el sistema puede agotar los recursos disponibles en los nodos de ejecución de tareas si la cantidad de tareas de TPL que se ejecuta supera la capacidad de los recursos. Esto puede ocurrir cuando lo que se ejecuta como una tarea de TPL está enlazado a un recurso determinado (por ejemplo, memoria), lo que en el caso de demanda máxima puede aumentar el riesgo de que el sistema deje de responder.

Limitadores

Una forma de enfrentar estos problemas es “administrar” de alguna forma las solicitudes de ejecución de tareas, a medida que atraviesan el sistema. En este contexto, podemos entender los coordinadores de tareas como controladores limitantes. Pero antes de analizar el proceso de limitación, revisemos la semántica de los limitadores que emplearé, en conjunto con el proceso de limitación, para mitigar esos riesgos.

El primer caso se puede mitigar al poner un tope al número de solicitudes de ejecución de tarea que puede enviar el coordinador de tareas a los nodos de ejecución de tareas dentro de una misma solicitud. Denominaré a este limitador maxSTasks­PerRequest. Así, el algoritmo del equilibrador de carga podrá cumplir su función de equilibrar la carga entre los nodos de ejecución disponibles.

El segundo caso es más difícil. Una solución posible es poner un límite al número de tareas que pueden ejecutar los nodos de ejecución de tareas. Me referiré a esta limitación como maxNumberOfTasks.

Además de esta limitación, la solución podría mejorar con una limitación adicional que limite el número de tareas que se ejecutan, según su tipo. Para explicar la utilidad de esto, veamos un caso donde se implementaron dos tipos de tareas en los nodos de ejecución de tareas, T1 y T2. T1 está limitada por la CPU y T2 está limitada por la E/S de disco. En este caso, es más probable que la capacidad de proceso de un cliente que envía solicitudes de ejecución de tareas del tipo T1 se vea afectada por otras tareas activas que están limitadas del mismo modo; así que mientras más tareas del tipo T1, mayor será su impacto. Como las tareas del tipo T2 están limitadas por un recurso diferente, el impacto que tienen en las tareas del tipo T1 no es el mismo. Al contar con la posibilidad de limitar la ejecución de las tareas según su tipo, puedo controlar la cantidad de tareas del tipo T1 que se pueden ejecutar en un momento dado, lo que me permite maximizar los recursos de procesamiento y, por ende, la capacidad de proceso general. Me referiré a esta limitación como maxNumberOfTasksByType.

Colas y limitaciones

Ahora que ya conoce la semántica de las limitaciones y como estas pueden resultar útiles para mantener la operación de los nodos de ejecución de tareas en buenas condiciones, echemos un vistazo a lo que pasa cuando se alcanza el límite especificado por las limitaciones, es decir, al proceso de limitación propiamente tal.

Una opción posible sería generar simplemente una excepción. Sin embargo, esto podría afectar la capacidad de proceso general de la solución, puesto que el cliente deberá gastar recursos adicionales en comprobar un error específico y después volver a enviar las solicitudes hasta que el coordinador de tareas pueda controlarlas en forma satisfactoria. Una alternativa sería usar colas del lado servidor para suspender temporalmente las solicitudes del cliente, junto con un proceso parecido a un monitor (un proceso de envío) que, en intervalos regulares, lea las solicitudes de la cola y las envíe a los nodos de ejecución de tareas. Usaré el proceso de envío para realizar la limitación propiamente tal, ya que el remitente para leer de la cola de solicitud tiene en cuenta las siguientes reglas:

  1. Poner tope al número de solicitudes que se pueden retirar de la cola: maxSTasksPerRequest.
  2. Si se alcanza el límite maxNumberOfTasks, detener las solicitudes de retiro de cola y conservar la cola tal cual está.
  3. Si se alcanza el límite maxNumberOfTasksByType, retirar la solicitud de la cola y luego ponerla en la cola de solicitudes. Al volver a poner la solicitud en la cola, se pueden seguir procesando las tareas de otros tipos. Esta estrategia proporciona igualdad de oportunidades para la ejecución de todas las tareas que se encuentran en la cola. En algunos casos, sin embargo, le podría convenir el uso de una cola de prioridad en vez de esto. Encontrará una buena referencia en bit.ly/NF0xQq.

En la Ilustración 11 se muestra este proceso.

The Submission Process
Ilustración 11 Proceso de envío

Comenzaré la descripción de la implementación de este proceso con el código (ver Ilustración 12) de la operación de servicio SubmitRequest, que pone las solicitudes en la cola de solicitudes, a medida que va recibiendo las solicitudes del cliente.

Ilustración 12 Operación de servicio SubmitRequest

[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant)]
public class TaskCoordinatorService : ITaskCoordinator
  {
    public void SubmitRequest(List<STask> stasks)
    {           
      CoordinatorContext.EnqueueRequestsInRequestQ(stasks);
    }   
    ...
}
public static class CoordinatorContext
{
...
internal static void EnqueueRequestsInRequestQ(List<STask> stasks)
  {
    var callback =
      OperationContext.Current.GetCallbackChannel<ITaskUpdateCallback>();
    foreach (var stask in stasks)               
      _requestQ.Enqueue(new STaskInfo(callback) { ClientRequest = stask });
  }
...
}

A continuación, veamos la implementación del proceso del remitente, que aparece en la Ilustración 13.

Ilustración 13 Implementación del remitente

public static class CoordinatorContext
{
  ...
static CoordinatorContext()
  {
    Submitter(...);
  }
  private static async void Submitter(int interval)
  {
    while (true)
    {
      await Task.Delay(interval);
      SendTaskRequestToTaskExecutionNode(
      GetTasksFromRequestQ());
    }
  }
  ...
}

En la Ilustración 12 y en la Ilustración 13 se puede apreciar cómo la operación de servicio pone una solicitud en la cola de solicitudes (escritura) y cómo la tarea remitente la saca de la cola de solicitudes (lectura). En esta situación, debemos asegurarnos de que la estructura de datos subyacente, es decir, la cola, sea segura para los subprocesos. Afortunadamente, existe una clase preparada precisamente para esto: ConcurrentQueue<T>. Por lo tanto, usaré una única instancia de este tipo como el repositorio subyacente para las solicitudes.

public static class CoordinatorContext
{
  ...
private static readonly ConcurrentQueue<STaskInfo> _requestQ =
  new ConcurrentQueue<STaskInfo>();
  ...
}

Ahora, revisemos la implementación del método GetTasksFromRequestQ, que lee las tareas cuando concluye el intervalo de ejecución. Es en este método que se produce el proceso de limitación y donde se aplican los limitadores que describí anteriormente. En la Ilustración 14 se muestra una implementación básica de este proceso.

Ilustración 14 Implementación de GetTasksFromRequestQ

public static class CoordinatorContext
{
  ...internal static List<STaskInfo> GetTasksFromRequestQ()
{
  var ret = new List<STaskInfo>();
  var maxSTasksPerRequest = //From a configuration
  var maxNumberOfTasks = //From a configuration
  var count =  // Count of submitted or executing tasks
  var countByType = // Enumerable of count by type
  for (int i = 0; i < maxSTasksPerRequest; i++)
  {
    STaskInfo info;
    if (count + i == maxNumberOfTasks || !_requestQ.TryDequeue(out info))
      return ret;
    var countTT = // Count of submitted or executing tasks of
                  // the type of the current item
    if (countTT == GetMaxNumberOfTasksByType(info.ClientRequest.STaskTypeName))
    { _requestQ.Enqueue(info); }
    else ret.Add(info);
  }
  return ret;
}
}
private static int GetMaxNumberOfTasksByType(string taskTypeName)
{
  // Logic to read from a configuration repository the value by task type name
}
...
}

El objetivo de la implementación en la Ilustración 14 es obtener los números que permitan que el proceso pueda evaluar las condiciones de limitación. En la Ilustración 15 vemos las consultas LINQ posibles que se pueden ejecutar frente a _submissionTracker, además de una lista que contiene los elementos devueltos (ret) para obtener estos valores. Observe que este sistema puede funcionar a costa del rendimiento. De ser así, podría implementar, como alternativa, un conjunto de contadores seguros para subprocesos que aumentan o disminuyen en la medida que se agregan o retiran elementos de la instancia de seguimiento de envío y usar esos contadores en vez de consultar el diccionario concurrente en forma directa.

Ilustración 15 Valores de limitación

var countByType = (from t in _submissionTracker.Values
                   group t by t.ClientRequest.STaskTypeName into g
                   select new
                   {
                      TypeName =  g.Key,
                      Count = g.Count()
                   });
var count = countByType.Sum(c => c.Count);
var countTT = (from tt in countByType
               where tt.TypeName == info.ClientRequest.STaskTypeName
               select tt.Count).SingleOrDefault()+ 
                   ret.Where((rt) => rt.ClientRequest.STaskTypeName == 
                   info.ClientRequest.STaskTypeName)
                   .Count();

Envío de solicitudes a los nodos de tarea y resultados de control

Hasta ahora analicé la forma en que el coordinador de tareas controla las solicitudes. Veamos cómo el coordinador de tareas envía la solicitud a los nodos de ejecución de tareas, al tomar en cuenta, esta vez, el proceso de limitación. Para proporcionar un contexto más adecuado, revisemos primero las operaciones de servicio que exponen los nodos de ejecución de tareas (a través del equilibrador de carga):

[ServiceContract( CallbackContract = typeof(ITaskUpdateCallback))]
  public interface ITaskExecutionNode
  {
    [OperationContract]
    void Start(List<STask> stask);
    [OperationContract]
    void Cancel(string Id);
  }

Como lo sugiere el nombre, la finalidad de estas operaciones es iniciar una lista de solicitudes de ejecución de tareas y solicitar la cancelación de una tarea puntual. El contrato del servicio aprovecha el mismo contrato de devolución de llamada para actualizar al coordinador de tareas mediante una implementación del contrato.

En la Ilustración 16 se muestra una implementación actualizada del método SendTaskToTaskExecutionNode, donde el coordinador de tareas almacena las instancias de STaskInfo en _submissionTracker y llama a las operaciones de servicio Start en los nodos de ejecución de tareas.

Ilustración 16 Método SendTaskToTaskExecutionNode y métodos auxiliares

internal static void SendTaskRequestToTaskExecutionNode(List<STaskInfo> staskInfos)
  {
  if (staskInfos.Count() == 0)
    return;
  var channel = new DuplexChannelFactory<ITaskExecutionNode>(
                new InstanceContext(new CallbackHandler()),
                new NetHttpBinding(), new EndpointAddress(“http://.../”))
                .CreateChannel();
  try
  {
    var requestId = Guid.NewGuid().ToString();
    var reqs = staskInfos.Select(s => AddRequestToTracker(requestId,s, channel))
      .Where(s => s != null);
    ((IChannel)channel).Open();
    channel.Start(reqs.ToList<STask>());
  }
  catch (CommunicationException ex)
  {
    foreach (var stask in staskInfos)
      HandleClientUpdate(stask.ClientRequest.Id, STaskStatus.Faulted, ex);
  }
  }
private static STask AddRequestToTracker(string requestId,
  STaskInfo info, ITaskExecutionNode channel)
{
  info.ExecutionRequestId = requestId;
  info.ExecutionRequestChannel = channel;
  if (_submissionTracker.TryAdd(info.ClientRequest.Id, info))
    return info.ClientRequest;
  HandleClientUpdate(info.ClientRequest.Id, STaskStatus.Faulted,
    new Exception(“Failed to add “));
  return null;
}

Observe que el método SendTaskToTaskExecutionNode crea una instancia de una devolución de llamada para controlar el resultado de la ejecución de la tarea en un nodo de ejecución de tareas:

[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant)]
  public class CallbackHandler : ITaskUpdateCallback
  {
    public void UpdateStatus(string id, STaskStatus status, string result)
    {
      CoordinatorContext.HandleClientUpdate (id, status, result);
    }
  }

Para controlar la operación de devolución de llamada, CallbackHandler llama al método HandleClientUpdate. Este método recupera y retira la instancia STaskInfo de submitterTracker y realiza una devolución de llamada al cliente para actualizar el resultado. Además, si esta es la última solicitud del grupo, cierra el canal entre el coordinador de tareas y el nodo de ejecución de tareas. En la Ilustración 17 podemos ver la implementación del método HandleClientUpdate.

Ilustración 17 Método HandleClientUpdate y métodos auxiliares

internal async static void HandleClientUpdate(
  string staskId, STaskStatus status, object result)
  {
    STaskInfo info;
    if (!_submissionTracker.TryGetValue(staskId, out info))
      throw new Exception(“Could not get task from the tracker”);
try
  {
    await Task.Run(() =>
      info.CallbackChannel.UpdateStatus(info.ClientRequest.Id, status, result));
    RemoveComplete(info.ClientRequest.Id);           
  }
catch(AggregateException ex)
  {
  // ...
  }
  }
private static void RemoveComplete(string staskId)
  {
    STaskInfo info;
    if (!_submissionTracker.TryRemove(staskId, out info))
      throw new Exception(“Failed to be removed from the tracking collection”);
    if (_submissionTracker.Values.Where((t) => t.ExecutionRequestId ==
      info.ExecutionRequestId).Count() == 0)
      CloseTaskRequestChannel((IChannel)info.ExecutionRequestChannel);
  }
  private static void CloseTaskRequestChannel(IChannel channel)
  {
    if (channel != null && channel.State != CommunicationState.Faulted)
      channel.Close();
  }

Implementador de tarea

En el código del cliente, typeName es uno de los parámetros necesarios cuando se agregan solicitudes. En última instancia, este valor llega al nodo de ejecución de tareas. El valor de typeName es el nombre del tipo de la implementación de una interfaz que expone un delegado de función que encapsula la funcionalidad que se pretende ejecutar como una tarea paralela y que reside en todos los nodos de ejecución de tareas. A esta interfaz le daré el nombre IRunnableTask. Al implementar esta interfaz, deberíamos recibir como parámetros del cliente un token de cancelación y una matriz de parámetros. El delegado también debería devolver el resultado de la tarea. Esta es la interfaz:

public interface IRunnableTask
{
  Func<Object> Run(CancellationToken ct, params string[] taskArgs );
}

Inicio de una tarea en un nodo de ejecución de tareas

En un nivel superior, el nodo de ejecución de tareas es el responsable de “transformar” una solicitud de ejecución de tareas en una tarea propiamente tal, que TPL pueda ejecutar; es decir, iniciar una tarea de TPL. En la Ilustración 18 vemos una implementación básica de este proceso; la voy a analizar a continuación.

Ilustración 18 Inicio de una tarea

[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant)]
public class TaskExecutionNodeHandler : ITaskExecutionNode
{              
  public void Start(List<STask> stasks)
  {
    var callback =
      OperationContext.Current.GetCallbackChannel<ITaskUpdateCallback>();
    foreach (var t in stasks)
      TaskExecutionContext.Start(t,callback);           
  }
...
}
public static class TaskExecutionContext
{
...
internal static void Start(STask stask, ITaskUpdateCallback callback)
{
  try
  {
    // Step 1.a
    var rtasks = CompositionUtil.ContainerInstance.GetExports<IRunnableTask>();
    // Step 1.b
    var rtask = from t in rtasks
                where t.Value.GetType().FullName == stask.STaskTypeName
                select t.Value;
    // Step 2
    var cs = new CancellationTokenSource();
    var ct = cs.Token;
    TaskExecutionContext._cancellationSources.TryAdd(stask.Id, cs);
    // Step 3 
    Task<Object>
      .Run(rtask.First().Run(ct, stask.STaskParameters), ct)
      .ContinueWith(tes => UpdateStatus(tes, stask, callback));
  }
  catch (Exception ex)
  {
    ...
  }
}
...
}

Paso 1 (a y b): En esta etapa, el nodo de ejecución de tareas tiene que crear una instancia de IRunnableTask que devolverá un delegado, que a su vez se ejecutará como una tarea con el tipo solicitado por el cliente. Para esto, sacaré partido de MEF y de una característica nueva de .NET Framework 4.5 que permite una configuración sin atributos. En el código que aparece en la Ilustración 19, se crea una sola instancia del contenedor que exporta todas las implementaciones de IRunnableTask ubicadas en el directorio “extensions”. Para obtener más información acerca de MEF y del método de configuración sin atributos, consulte el artículo de MSDN Magazine de junio de 2012 en msdn.microsoft.com/magazine/jj133818.

Ilustración 19 Creación de un contenedor

internal static  class CompositionUtil
{
  private readonly static Lazy<CompositionContainer> _container =
    new Lazy<CompositionContainer>(() =>
    {
      var builder = new RegistrationBuilder();
      builder.ForTypesDerivedFrom<IRunnableTask>()                         
             .Export<IRunnableTask>()
             .SetCreationPolicy(CreationPolicy.NonShared);
      var cat = new DirectoryCatalog(“extensions”, builder);
      return new CompositionContainer(cat, true, null);
    }
  ,true);
  internal static CompositionContainer ContainerInstance
  {
    get { return _container.Value; }
  } 
}

Regresemos al código de la Ilustración 18. Este usa el contenedor para obtener las exportaciones del tipo IRunnableTask y luego selecciona la instancia con el tipo que tiene el mismo nombre que la solicitud del cliente. Observe que el supuesto fundamental aquí es que solo existe una instancia de tarea que corresponde al tipo solicitado por el cliente. Esta es la razón por la cual uso la primera instancia que devuelve la consulta LINQ.

Paso 2: Antes de crear realmente la tarea de TPL, el código crea un origen de tokens de cancelación y un token de cancelación. Llevaré la cuenta del origen de cancelación en una instancia única de ConcurrentDictionary<TKey,TValue>. De esta forma, el nodo de ejecución de tareas usará esta lista de orígenes de cancelación cuando un cliente solicite una cancelación. Esta es la definición de esta instancia:

public static class TaskExecutionContext
{
...
private readonly static ConcurrentDictionary<string, 
  CancellationTokenSource> _cancellationSources =
  new ConcurrentDictionary<string, CancellationTokenSource>();
...
}

Paso 3: Llegado a este punto, ejecuto la tarea con el token de cancelación que acabo de crear. Posteriormente, la tarea recibe una tarea de continuación. Esta última es necesaria, ya que el coordinador de tareas se debe actualizar mediante una llamada de servicio, con lo que esta se ejecuta una vez que se complete la tarea de TPL (ya sea con o sin errores). Como se puede apreciar en la Ilustración 20, encapsulé el proceso de actualización del coordinador de tareas en un delegado que recibe un parámetro de la tarea de TPL, la solicitud de ejecución de tarea y una instancia de devolución de llamada al coordinador de tareas.

Ilustración 20 Encapsulación del proceso de actualización

private static Action<Task<Object>, STask, 
  ITaskUpdateCallback> UpdateStatus = (t, st, cb) =>
  {
    try
    {
      STaskStatus s;
      Object r = null;
      switch (t.Status)
      {
        case TaskStatus.Canceled: s = STaskStatus.Canceled;
          break;
        case TaskStatus.Faulted:
          s = STaskStatus.Faulted;
          r = t.Exception.Flatten();
          break;
        case TaskStatus.RanToCompletion:
          s = STaskStatus.Completed;
          r = t.Result;
          break;
        default:
          s = STaskStatus.Faulted;
          r = new Exception("Invalid Status");
          break;
      }
      CancellationTokenSource cs;
      TaskExecutionContext._cancellationSources.TryRemove(st.Id, out cs);
      cb.UpdateStatus(st.Id, s, r);
    }
    catch (Exception ex)
  {
  // Error handling
  }
};

Solicitud y control de una cancelación

TPL proporciona un mecanismo para implementar la cancelación de tareas. Para esto, el delegado que encapsula el proceso que se ejecuta como una tarea de TPL debe responder a la solicitud de cancelación y terminar la ejecución. Para obtener más información sobre la cancelación de tareas, consulte el artículo de la Biblioteca de MSDN “Cancelación de tareas” en bit.ly/NYVTO0.

Uno de los parámetros de la interfaz IRunnableTask es el token de cancelación. El nodo de ejecución de tareas creará un token para cada tarea y es responsabilidad del consumidor que implementa la interfaz definir el momento en que se determina si existe una solicitud de cancelación y terminar el proceso correctamente. En el código de la Ilustración 21, se muestra una tarea sencilla que calcula la cantidad de números pares de un intervalo, mientras que al mismo tiempo se revisa si se solicitó una cancelación.

Ilustración 21 Revisión de cancelación

public class MySimpleCTask : IRunnableTask
{
  public Func<Object> Run(Nullable<CancellationToken> ct, 
    params string[] taskArgs)
  {
    var j = int.Parse(taskArgs[0]);
    var z = 0;
    return (() =>
  {
      for (int i = 0; i < j; i++)
      {
        if (i % 2 != 0)
        {
          z++;
          ct.Value.ThrowIfCancellationRequested();
        }
      }
      return z;
   });
  }
}

Como pudimos apreciar al analizar el cliente, podemos agregar una solicitud con un token de cancelación y el cliente realizará internamente la suscripción necesaria. Por lo tanto, cuando se genera una cancelación, se envía una solicitud de cancelación al coordinador de tareas. Al recibir la solicitud de cancelación, el coordinador de tareas revisa si la solicitud se envió a un nodo de ejecución de tareas y envía una solicitud de cancelación. Entonces, el nodo de ejecución de tareas busca el origen de cancelación que corresponde a la tarea solicitada por el identificador del cliente. El envío de la solicitud de cancelación al nodo de ejecución de tareas es relativamente sencillo; solo hay que ubicar el canal que corresponde a la solicitud donde el coordinador de tareas envió inicialmente la solicitud de ejecución de tarea. Estos canales se deben mantener abiertos para las devoluciones de llamadas que actualizan el estado de la solicitud de ejecución.

En la Ilustración 22 vemos la implementación de las operaciones de servicio del coordinador de tareas.

Ilustración 22 Implementación de las operaciones de servicio en el coordinador de tareas

public class TaskCoordinatorService : ITaskCoordinator
{
...
public bool CancelTask(string Id)
  {
    return CoordinatorContext.CancelTask(Id);
  }
  ...}
public static class CoordinatorContext
{
...
internal static bool CancelTask(string Id)
{
STaskInfo info;
if(_submissionTracker.TryGetValue(
  Id, out info) && info.ExecutionRequestChannel != null)
{
  info.ExecutionRequestChannel.Cancel(Id);
  return true;
}
return false;
}
  ...
}

En la Ilustración 23, finalmente, se aprecia la implementación de las operaciones de servicio en los nodos de ejecución de tareas.

Ilustración 23 Implementación de las operaciones de servicio en los nodos de ejecución de tareas

class CancellationHandler : ICancellationHandler
  {
  public void Cancel(STask stask)
  {
    TaskExecutionContext.CanceTask(stask.Id);
  }
  }
public static class TaskExecutionContext
{
...
  internal static void CancelTask(string Id)
  {
    CancellationTokenSource tknSrc;
    if (_cancellationSources.TryGetValue(Id, out tknSrc))
      tknSrc.Cancel(); }
...
}

Escalabilidad del coordinador de tareas y otras cuestiones

Conviene tomar en cuenta que esta implementación supone que el coordinador de tareas se ejecuta en un solo nodo, pero es totalmente factible escalar el coordinador de tareas horizontalmente (esto exigiría, como mínimo, los siguientes cambios):

  • Se debe agregar un equilibrador de carga para obtener acceso al coordinador de tareas.
  • Como describí anteriormente, la clave para el método de limitación es tener un recuento preciso del número de tareas en ejecución, en total y por tipo. En un contexto con más de un nodo que funciona como coordinador de tareas, estos contadores se deben mantener en forma central (por ejemplo, en una base de datos) y se debe conservar la capacidad de actualizarlos o leerlos en forma sincronizada (evitando condiciones de carrera, bloqueos, etc.).

Para terminar, permítame hacer una observación. Igual que con cualquier método de desarrollo, se debe evaluar el riesgo y el valor frente a otras alternativas que puedan satisfacer las necesidades y que estén disponibles en forma comercial. Por ejemplo, podría evaluar tecnologías como Microsoft HPC Server como una solución viable para muchas de las situaciones que se pueden abordar con las técnicas descritas en este artículo.

Optimización de recursos

La biblioteca TPL proporciona la infraestructura necesaria para lograr la utilización más óptima de los recursos de la CPU con un solo equipo de varios núcleos, y también resulta útil para implementar un sistema que sea escalable más allá de los límites de un equipo. Esto puede ser útil para las situaciones de automatización de carga de trabajo y procesamiento por lotes, donde el paralelismo no solo se necesita para los servidores de un solo núcleo, sino que también para varios servidores.

Para lograr esta escalabilidad horizontal, se deben tomar en cuenta varios puntos relacionados con la arquitectura. Algunas de las más fundamentales son: la necesidad de equilibrar la carga entre los recursos existentes mientras se tiene la posibilidad de agregar más recursos a la granja existente y la posibilidad de limitar los recursos a partir de la semántica de las tareas que se deben ejecutar. Las herramientas y tecnologías de desarrollo de Microsoft entregan los elementos de construcción necesarios para implementar una arquitectura que toma en cuenta estos aspectos claves.

Jesus Aguilar trabaja en Microsoft en el área de soporte técnico Premier para desarrolladores como gerente superior de desarrollo de aplicaciones.

Gracias a los siguientes expertos técnicos por su ayuda en la revisión de este artículo: Ryan Berry, Steve Case, Rick Claude y Piyush Joshi