TPL

Escalabilidade horizontal para execução paralela de tarefas

Jesus Aguilar

Baixar o código de exemplo

A TPL (Biblioteca paralela de tarefas), apresentada no Microsoft .NET Framework 4, capacita os desenvolvedores de aplicativos para que criem soluções que aproveitam o poder do processamento paralelo em um computador com vários núcleos. Em muitos cenários, no entanto, a capacidade de escalar verticalmente (adicionando mais núcleos) é restrita por um número de fatores, incluindo custo e limitações de hospedagem. Em tais casos, se a escalabilidade for exigida, será desejável distribuir o processamento por uma matriz de servidores; a hospedagem em nuvem é um exemplo disso. Neste artigo, descreverei os principais aspectos (incluindo uma implementação) de uma solução conceitual para realizar isso usando muitos dos novos recursos do .NET Framework 4.5.

Suposições básicas

A abordagem que descreverei requer várias tecnologias além da TPL, que inclui:

  • TPL (Task Parallel Library)
  • WCF (Windows Communication Foundation)
  • MEF (Managed Extensibility Framework)

Observe que irei discuti-los apenas no contexto do problema que estou tentando resolver. Estou supondo que você tem um bom entendimento dessas tecnologias.

Cliente de tarefa remota, Coordenador de tarefa e Nós de execução de tarefa

O cliente de tarefa remota é a camada do lado do cliente que ocultará a complexidade resultante da semântica de se usar um ambiente distribuído. O cliente de tarefa remota interage diretamente com o coordenador de tarefa que, então, se torna o ponto de entrada da infraestrutura básica. Em alto nível, o coordenador de tarefa tem os seguintes atributos:

  1. É o único ponto de contato com os clientes.
  2. Ele expõe os serviços necessários para solicitar a execução de tarefas na plataforma escalável, além do cancelamento de uma determinada tarefa.
  3. Ele manipula as limitações e o enfileiramento das solicitações de execução de tarefas, que oferece suporte à operação íntegra do ambiente.

Os nós de execução de tarefa são os hosts dos processos nos quais as tarefas serão executadas. As implementações reais das tarefas que serão executadas pela TPL residem nos nós de execução de tarefa.

Aqui estão os aspectos principais dessas camadas lógicas e o fluxo de informações:

  1. O cliente de tarefa remota solicita a execução de uma ou mais tarefas.
  2. O coordenador de tarefa envia a solicitação para os nós de execução de tarefa.
  3. Os nós de execução de tarefa executam as tarefas e atualizam o status de cada solicitação no coordenador de tarefa.
  4. O coordenador de tarefa atualiza o cliente com os resultados da execução de cada solicitação.
  5. Os nós de execução de tarefa residem atrás de um balanceador de carga de modo que mais nós possam ser adicionados conforme necessário, fornecendo a capacidade de escalonar horizontalmente.

A Figura 1 representa as camadas lógicas e o fluxo de informações.


Figura 1 Escalonando as tarefas horizontalmente

Observe como os nós de execução de tarefa atualizam o coordenador de tarefa, que por sua vez atualiza o cliente de tarefa remota. Vou descrever uma implementação baseada em comunicação bidirecional entre o cliente e o coordenador de tarefa e entre o coordenador de tarefa e os nós de execução de tarefa. Em termos de WCF, isso implica o uso de um canal duplex que permite aos nós de execução de tarefa chamar de volta o coordenador de tarefa e, subsequentemente, o coordenador de tarefa fazer o mesmo para atualizar o cliente. Demonstrarei o uso de Websockets para se obter essa abordagem de comunicação bidirecional. O transporte de WebSockets é implementado como uma nova associação no .NET Framework 4.5 e está disponível para o Windows 8. Mais informações sobre a associação podem ser encontradas em bit.ly/SOLNiU.

O cliente e o coordenador de tarefa

Agora que você entende as três principais camadas lógicas, cliente de tarefa remota, coordenador de tarefa e nós de execução de tarefa, vamos começar discutindo a implementação do cliente de tarefa remota. Observe que quando uso o termo “cliente” ao longo deste artigo, estou me referindo ao cliente de tarefa remota.

Como mencionei anteriormente, a proposição de valor do cliente é a capacidade de ocultar a complexidade dos componente básicos. Uma maneira que isso é alcançado é fornecendo uma API que dê a impressão de execução local de tarefas, apesar do fato de que elas possam estar em execução em outro lugar. O código da Figura 2 mostra os métodos públicos da classe RemoteTaskClient.

Figura 2 Métodos públicos da classe RemoteTaskClient

public class RemoteTaskClient<TResult> : IDisposable
{
  public void AddRequest(string typeName, 
    string[] parameters, CancellationToken tk)
  {...}
  public void AddRequest(string typeName, string[] parameters)
  {...}
  public Task<TResult>[] SubmitRequests()
  {...}
  public RemoteTaskClient(string taskCoodinatorEndpointAddress)
  {...}
  public void Dispose()
  {...}
}

É possível usar o método AddRequest para adicionar solicitações para execução remota. Para cada solicitação é necessário especificar o typeName (que é o tipo da implementação real que contém o delegado que a infraestrutura executará remotamente com uma tarefa TPL) e os parâmetros associados. Em seguida, você pode enviar as solicitações por meio do método SubmitRequest. O resultado de enviar uma solicitação é uma matriz de tarefas TPL, uma para cada solicitação. Essa abordagem permitirá que você gerencie as tarefas TPL resultantes como faria se fossem locais. Por exemplo, você pode enviar várias solicitações e esperar que elas sejam concluídas, como a seguir:

using (var c = new RemoteTaskClient<int>("..."))
  {
    c.AddRequest("...", null);
    c.AddRequest("...", null);
    var ts = c.SubmitRequests();
    Task.WaitAll(ts);
    foreach (var t in ts)
      Console.WriteLine(t.Result);
  }

Antes de entrar nos detalhes da implementação da RemoteTaskClient, vamos dar uma olhada nas operações de serviço e nos contratos de dados que o coordenador de tarefa expõe. Entender esses contratos antes de examinar a implementação da RemoteTaskClient lhe dará contexto adicional, pois a implementação do cliente conta com esses serviços.

O código na Figura 3 mostra as operações de serviço que o coordenador de tarefa expõe para o cliente. Por meio da operação SubmitRequest, o cliente tem a capacidade de solicitar a execução de uma ou mais tarefas TPL. O cliente também pode solicitar o cancelamento de uma tarefa TPL específica que não esteja concluída, por meio da operação CancelTask. Observe que a operação UpdateStatus é um retorno de chamada. É por meio de uma implementação do lado do cliente desse contrato de retorno de chamada que o coordenador de tarefa atualizará o status no cliente.

Figura 3 Operações de serviço

[ServiceContract(CallbackContract = typeof(ITaskUpdateCallback))]
  public interface ITaskCoordinator
  {
    [OperationContract(IsOneWay = true)]
    void SubmitRequest(List<STask> stask);
    [OperationContract]
    bool CancelTask(string Id);       
  }
  public interface ITaskUpdateCallback
  {
    [OperationContract (IsOneWay = true)]
    void UpdateStatus(string id, STaskStatus status, string result);
  }

Vamos dar uma olhada no contrato de dados que representa a solicitação de execução de tarefa. Essa é a entidade de dados que o cliente enviará para o coordenador de tarefa, que por sua vez enviará a solicitação ao nó de execução de tarefa onde a execução real ocorrerá. A classe STask, mostrada na Figura 4, modela uma solicitação de execução de tarefa. Usando as propriedades STaskTypeName e STaskParameters, o cliente pode definir o tipo de tarefa que deseja executar, com os parâmetros relevantes. O coordenador de tarefa usará a Id da propriedade como um identificador exclusivo que as camadas lógicas podem usar para correlacionar a solicitação com a tarefa TPL real em execução no sistema.

Figura 4 A classe STask

[DataContract]
  public class STask
  {
    [DataMember]
    public string Id
    { get; set; }
    [DataMember]
    public string STaskTypeName
    { get; set; }
    [DataMember]
    public string[] STaskParameters
    { get; set; }
  }

Vamos agora voltar à RemoteTaskClient e discutir como estou planejando correlacionar a tarefa TPL local com o resultado da execução nos nós de execução de tarefa. A TPL tem uma classe conveniente, TaskCompletionSource<TResult>, que posso usar para criar uma tarefa TPL e controlar seu ciclo de vida. Esse mecanismo me permite sinalizar quando uma determinada tarefa é concluída, cancelada ou apresentou falha. A implicação aqui é que cada solicitação que vai para um nó de execução de tarefa (por meio do coordenador de tarefa) deve ser correlacionada a uma instância da TaskCompletionSource. Para isso, implementei a classe ClientRequestInfo, mostrada na Figura 5.

Figura 5 A classe ClientRequestInfo

internal class ClientRequestInfo<TResult>
{
  internal STask TaskExecutionRequest
  { get; set; }
  internal TaskCompletionSource<TResult> CompletionSource
  { get; set; }
  internal ClientRequestInfo(string typeName, string[] args)
  {
    TaskExecutionRequest = new STask()
      {Id = Guid.NewGuid().ToString(), STaskTypeName =typeName,
        STaskParameters = args };
    CompletionSource = new TaskCompletionSource<TResult>();
  }
}

A Figura 6 mostra a implementação do construtor dessa classe.

Figura 6 O construtor ClientRequestInfo

ITaskCoordinator _client;           
ConcurrentDictionary<string, ClientRequestInfo<TResult>> 
  _requests = new ConcurrentDictionary<string, 
  ClientRequestInfo<TResult>>();
  public RemoteTaskClient(string taskCoordinatorEndpointAddress)
  {           
    var factory = new DuplexChannelFactory<ITaskCoordinator>
       (new InstanceContext(new CallbackHandler<TResult>(_requests)),
       new NetHttpBinding(),
       new EndpointAddress(taskCoordinatorEndpointAddress));
    _client = factory.CreateChannel();
    ((IClientChannel)_client).Open();
  }

Observe que estou abrindo um canal duplex para o coordenador de tarefa e criando uma instância de retorno de chamada do tipo CallbackHandler. CallbackHandler recebe como parâmetro _requests, que contém instâncias de ClientRequestInfo. O racional é que o dicionário _requests contém todas as instâncias ativas das solicitações do cliente (e as instâncias de TaskCompletionSource que estão associadas a elas), e a CallbackHandler manipulará as atualizações do coordenador de tarefa. Como diversas solicitações de serviço irão atualizar o dicionário _requests, preciso garantir a segurança do thread, daí a necessidade de que isso seja criado como uma instância de ConcurrentDictionary.

A Figura 7 mostra a implementação da classe CallbackHandler.

Figura 7 A classe CallbackHandler

[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant)]
public class CallbackHandler<TResult> : ITaskUpdateCallback
{
  ConcurrentDictionary<string, ClientRequestInfo<TResult>> _requests;
  public void UpdateStatus(string id, STaskStatus status, Object result)
    {
      ClientRequestInfo<TResult> info;
      if (_requests.TryRemove(id, out info))
      {                           
        switch (status)
        {
          case STaskStatus.
            Completed: info.CompletionSource.SetResult(
              (TResult)result);
            break;
          case STaskStatus.Canceled: 
            info.CompletionSource.SetCanceled();
            break;
          case STaskStatus.Faulted: 
            info.CompletionSource.SetException(
              (Exception)result);
            break;
        }
      }
}
  internal CallbackHandler(ConcurrentDictionary<string,
    ClientRequestInfo<TResult>> requests)
  {
    requests = requests;
  }
}

A seguir, vamos dar uma olhada na implementação dos métodos AddRequest e SubmitRequest, como mostrado na Figura 8.

Figura 8 Métodos AddRequest e SubmitRequest

public void AddRequest(string typeName, string[] parameters, 
  CancellationToken tk)
{
  var info = new ClientRequestInfo<TResult>(typeName, args);
  _buffer.Add(info);
  tk.Register(()=> _client.CancelTask(info.TaskExecutionRequest.Id));
}
public void AddRequest(string typeName, string[] parameters)
  {
    _buffer.Add(new ClientRequestInfo<TResult>(typeName, parameters));
  }
public Task<TResult>[] SubmitRequests()
  {
    if (_buffer.Count == 0)
      return null;
    var req = _buffer.Select((r) =>
    {
      _requests.TryAdd(r.TaskExecutionRequest.Id, r);
      return r.TaskExecutionRequest;                
    });
    _client.SubmitRequest(req.ToList<STask>());
    var ret =  _buffer.Select(r =>
      r.CompletionSource.Task).ToArray<Task<TResult>>();
    _buffer.Clear();
    return ret;
  }

Controlando solicitações de cliente

Como visto na seção anterior, o cliente interage unicamente com o coordenador de tarefa e é responsabilidade do coordenador de tarefa manipular as solicitações do cliente e subsequentemente atualizar o cliente com os resultados da execução da tarefa TPL. Como com o cliente, isso requer que a solicitação original seja persistida de alguma forma. Também requer controlar a instância de retorno de chamada correspondente (que permite comunicação com o cliente); o canal para os nós de execução de tarefa associado com a conexão (necessário, como será visto posteriormente, em cenários de cancelamento); um identificador exclusivo que agrupa todas as solicitações de execução de tarefa associadas com uma única chamada para um nó de execução de tarefa (para determinar quando o canal não é mais necessário); além do status e do resultado da execução. A Figura 9 mostra a definição da classe STaskInfo, a entidade que conterá essas informações. Além disso, usarei uma instância única de ConcurrentDictionary<TKey,TValue> como o mecanismo de persistência.

Figura 9 As classes STaskInfo e CoordinatorContext

public class STaskInfo
  {
    public string ExecutionRequestId
    { get; set; }
    public STask ClientRequest
    { get; set; }
    public ITaskUpdateCallback CallbackChannel
    { get; private set; }
    public ITaskExecutionNode ExecutionRequestChannel
    { get; set; }
    public STaskInfo(ITaskUpdateCallback callback)
    {
      CallbackChannel = callback;
    }
  }
public static class CoordinatorContext
{
...
private static readonly ConcurrentDictionary<string, STaskInfo> 
  _submissionTracker =
  new ConcurrentDictionary<string, STaskInfo>();
...
}

Finalmente, observe que _submissionTracker está contido na classe CoordinatorContext. Usarei essa classe para implementar a principal funcionalidade do coordenador de tarefa.

Manipulando solicitações de cliente

O coordenador de tarefa é o único ponto de entrada dos clientes, o que significa que ele deve ser capaz de manipular tantas solicitações de cliente quanto possível enquanto impede os nós de execução de tarefa de se tornarem saturados (em termos de recursos). Não é tão fácil como parece. Para explicar melhor os potenciais desafios, vamos dar uma olhada em uma solução simples:

  1. O coordenador de tarefa expõe a operação de serviço pela qual os clientes enviam as solicitações de execução de tarefa.
  2. O coordenador de tarefa envia essas solicitações para os nós de execução de tarefa para sua execução e controla essas solicitações, isto é, ele persiste o estado.

A Figura 10 mostra uma implementação básica desse processo de envio.

Figura 10 Implementando o processo de envio

public class TaskCoordinatorService : ITaskCoordinator
{
...
  public void SubmitRequest(List<STask> stasks)
  {    
    CoordinatorContext.SendTasksToTaskHandler(stasks);
  }
...
}
public static class CoordinatorContext
{
...
  internal static void SendTaskRequestToTaskExecutionNode(List<STask> stasks)
  {
  var clientFactory = //Client factory creation logic.. 
  var channel = clientFactory.CreateChannel();
  foreach (var stask in stasks)
    _submissionTracker.TryAdd(stask.Id, stask);
  try
  {
    ((IClientChannel)channel).Open();
    channel.Start(stasks);
  }
  catch (CommunicationException ex)
  {   
    // Error handling and logging ...
  }
  finally
  {
    if (((IClientChannel)channel).State != CommunicationState.Faulted)               
       ((IClientChannel)channel).Close();
  }
}
...
}

No entanto, essa implementação simples não funcionaria muito bem em alguns cenários:

  • Se o cliente envia um grande número de tarefas em uma única solicitação, todas elas terminarão em um único nó de execução de tarefa, resultando em uma utilização desigual dos recursos disponíveis (supondo que haja mais de um nó de execução de tarefa disponível).
  • Em cenários de pico de carga, o sistema pode esgotar os recursos disponíveis nos nós de execução de tarefa se o número de tarefas TPL em execução exceder o que esses recursos podem manipular. Esse pode ser o caso quando o que está sendo executado como uma tarefa TPL estiver associado a um recurso específico (como memória) que em casos de pico podem aumentar o risco de tornar o sistema não responsivo.

Os limitadores

Uma maneira de abordar tais desafios é “gerenciar” de algum modo as solicitações de execução de tarefa à medida que passam pelo sistema. Nesse contexto, você pode pensar no coordenador de tarefa como um controlador de limitação. Antes de discutir o processo de limitação, no entanto, vamos examinar a semântica dos limitadores que, junto com os processos de limitação, usarei para atenuar esses riscos.

O primeiro cenário pode ser atenuado limitando o número de solicitações de execução de tarefa que o coordenador de tarefa pode enviar para os nós de execução de tarefa em uma única solicitação. Chamarei esse limitador de maxSTasks­PerRequest. Usando essa abordagem, o algoritmo do balanceador de carga poderá fazer seu trabalho de balanceamento da carga pelos nós de execução de tarefa disponíveis.

O segundo cenário é mais desafiador. Uma solução plausível é limitar o número de tarefas que os nós de execução de tarefa executarão a um número específico. Vou chamar esse limitador de maxNumberOfTasks.

Além desse limitador, a solução poderia se beneficiar de ter outro limitador que limite o número de tarefas sendo executadas com base em seu tipo. Para explicar por que isso é útil, vamos considerar um cenário em que os nós de execução de tarefa têm dois tipos de tarefas implantadas, T1 e T2. T1 está associada à CPU e T2 está associada à E/S de disco. Nesse cenário, a produtividade de um cliente que envia solicitações para a execução de tarefas T1 é mais provável de ser afetada por tarefas ativas que são ligadas pelo mesmo tipo de restrição. Quanto maior o número de tarefas T1, maior é o impacto. Como as tarefas T2 são ligadas por uma restrição diferente, o impacto que elas têm nas tarefas T1 não é o mesmo. Ter a capacidade de limitar a execução de tarefas por tipo significa que posso controlar quantas tarefas T1 podem estar em execução a qualquer momento, permitindo que eu maximize os recursos de CPU e, como resultado, a produtividade geral. Vou chamar esse limitador de maxNumberOfTasksByType.

Enfileirando e limitando

Agora que você entende a semântica dos controladores e como eles podem ser efetivos para manter a operação íntegra dos nós de execução de tarefa, vamos dar uma olhada no que acontece quando o limite especificado pelos limitadores é atingido, isto é, o processo de limitação real.

Uma opção é simplesmente gerar uma exceção. No entanto, isso afetaria a produtividade geral da solução, pois o cliente incorreria na sobrecarga de verificar um erro específico ou uma falha e, então, enviar novamente as solicitações até que o coordenador de tarefa pudesse manipulá-las com êxito. Uma alternativa seria usar enfileiramento do lado do servidor para guardar as solicitações do cliente e um processo parecido com monitor (um processo emissor) que, a intervalos regulares, leria as solicitações da fila e as enviaria para os nós de execução de tarefa. Usarei o processo emissor para executar a limitação real, pois o emissor lê da fila de solicitações considerando as seguintes regras:

  1. Limitar o número de solicitações que podem ser retiradas da fila a maxSTasksPerRequest.
  2. Se o maxNumberOfTasks do limitador for atingido, pare de retirar da fila as solicitações e a fila de solicitações permanecerá como está.
  3. Se o maxNumberOfTasksByType do limitador for atingido, retire da fila e, em seguida, coloque a solicitação de volta na fila de solicitações. Enfileirar a solicitação novamente permite a continuação do processamento de tarefas de outros tipos. Essa estratégia fornece oportunidade igual de execução a todas as tarefas na fila. Em alguns casos, no entanto, em vez disso, você deve considerar o uso de uma fila de prioridades. Você encontrará uma boa referência em bit.ly/NF0xQq.

A Figura 11 ilustra esse processo.


Figura 11 O processo de envio

Começarei a descrever a implementação desse processo mostrando o código (consulte a Figura 12) da operação de serviço SubmitRequest que enfileira as solicitações na fila de solicitações à medida que recebe as solicitações do cliente.

Figura 12 A operação de serviço SubmitRequest

[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant)]
public class TaskCoordinatorService : ITaskCoordinator
  {
    public void SubmitRequest(List<STask> stasks)
    {           
      CoordinatorContext.EnqueueRequestsInRequestQ(stasks);
    }   
    ...
}
public static class CoordinatorContext
{
...
internal static void EnqueueRequestsInRequestQ(List<STask> stasks)
  {
    var callback =
      OperationContext.Current.GetCallbackChannel<ITaskUpdateCallback>();
    foreach (var stask in stasks)               
      _requestQ.Enqueue(new STaskInfo(callback) { ClientRequest = stask });
  }
...
}

A seguir, vamos dar uma olhada na implementação do processo emissor, mostrado na Figura 13.

Figura 13 Implementação do emissor

public static class CoordinatorContext
{
  ...
static CoordinatorContext()
  {
    Submitter(...);
  }
  private static async void Submitter(int interval)
  {
    while (true)
    {
      await Task.Delay(interval);
      SendTaskRequestToTaskExecutionNode(
      GetTasksFromRequestQ());
    }
  }
  ...
}

Na Figura 12 e na Figura 13, é possível ver a operação de serviço enfileirando (gravando) uma solicitação na fila de solicitações e a tarefa emissora retirando (lendo) da fila de solicitações. Nesse cenário, você precisa garantir que a estrutura de dados subjacente, a fila, seja thread-safe. Felizmente, há uma classe destinada exatamente para isso, a ConcurrentQueue<T>. Portanto, usarei uma instância única desse tipo como o repositório básico das solicitações.

public static class CoordinatorContext
{
  ...
private static readonly ConcurrentQueue<STaskInfo> _requestQ =
  new ConcurrentQueue<STaskInfo>();
  ...
}

Agora, vamos examinar a implementação do método GetTasksFromRequestQ, que lê as tarefas quando o intervalo de execução expira. É nesse método que o processo de limitação ocorre e onde os limitadores que descrevi anteriormente se aplicam. A Figura 14 mostra uma implementação desse processo.

Figura 14 Implementação de GetTasksFromRequestQ

public static class CoordinatorContext
{
  ...internal static List<STaskInfo> GetTasksFromRequestQ()
{
  var ret = new List<STaskInfo>();
  var maxSTasksPerRequest = //From a configuration
  var maxNumberOfTasks = //From a configuration
  var count =  // Count of submitted or executing tasks
  var countByType = // Enumerable of count by type
  for (int i = 0; i < maxSTasksPerRequest; i++)
  {
    STaskInfo info;
    if (count + i == maxNumberOfTasks || !_requestQ.TryDequeue(out info))
      return ret;
    var countTT = // Count of submitted or executing tasks of
                  // the type of the current item
    if (countTT == GetMaxNumberOfTasksByType(info.ClientRequest.STaskTypeName))
    { _requestQ.Enqueue(info); }
    else ret.Add(info);
  }
  return ret;
}
}
private static int GetMaxNumberOfTasksByType(string taskTypeName)
{
  // Logic to read from a configuration repository the value by task type name
}
...
}

O objetivo da implementação na Figura 14 é obter os números que permitem ao processo avaliar as condições de limitação. A Figura 15 mostra as consultas LINQ plausíveis que podem ser executadas no _submissionTracker, além de uma lista contendo os itens de retorno (ret) para obter esses valores. Observe que essa abordagem pode ser bem sucedida às custas do desempenho. Se esse for o caso, como alternativa você poderia implementar um conjunto de contadores thread-safe que incrementam ou decrementam à medida que os itens são adicionados ou removidos da instância do controlador de envio e usar esses contadores em vez de consultar o dicionário concorrente diretamente.

Figura 15 Os valores de limitação

var countByType = (from t in _submissionTracker.Values
                   group t by t.ClientRequest.STaskTypeName into g
                   select new
                   {
                      TypeName =  g.Key,
                      Count = g.Count()
                   });
var count = countByType.Sum(c => c.Count);
var countTT = (from tt in countByType
               where tt.TypeName == info.ClientRequest.STaskTypeName
               select tt.Count).SingleOrDefault()+ 
                   ret.Where((rt) => rt.ClientRequest.STaskTypeName == 
                   info.ClientRequest.STaskTypeName)
                   .Count();

Enviando solicitações para os nós de execução de tarefa e manipulando resultados

Até o momento discuti como o coordenador de tarefa gerencia as solicitações. Vamos dar uma olhada em como o coordenador de tarefa envia a solicitação para os nós de execução de tarefa, considerando agora o processo de limitação. Para fornecer um melhor contexto, vamos primeiro examinar as operações de serviço que os nós de execução de tarefa expõem (por meio do balanceador de carga):

[ServiceContract( CallbackContract = typeof(ITaskUpdateCallback))]
  public interface ITaskExecutionNode
  {
    [OperationContract]
    void Start(List<STask> stask);
    [OperationContract]
    void Cancel(string Id);
  }

Como seus nomes sugerem, os propósitos dessas operações são iniciar uma lista de solicitações de execução de tarefa e solicitar cancelamento de uma tarefa específica. O contrato de serviço aproveita o mesmo contrato de retorno de chamada para atualizar o coordenador de tarefa por meio de uma implementação do contrato.

A Figura 16 mostra uma implementação atualizada do método SendTaskToTaskExecutionNode em que o coordenador de tarefa armazena as instâncias STaskInfo no _submissionTracker e chama as operações de serviço Iniciar em um nó de execução de tarefa.

Figura 16 Os métodos SendTaskToTaskExecutionNode e o de suporte

internal static void SendTaskRequestToTaskExecutionNode(List<STaskInfo> staskInfos)
  {
  if (staskInfos.Count() == 0)
    return;
  var channel = new DuplexChannelFactory<ITaskExecutionNode>(
                new InstanceContext(new CallbackHandler()),
                new NetHttpBinding(), new EndpointAddress(“http://.../”))
                .CreateChannel();
  try
  {
    var requestId = Guid.NewGuid().ToString();
    var reqs = staskInfos.Select(s => AddRequestToTracker(requestId,s, channel))
      .Where(s => s != null);
    ((IChannel)channel).Open();
    channel.Start(reqs.ToList<STask>());
  }
  catch (CommunicationException ex)
  {
    foreach (var stask in staskInfos)
      HandleClientUpdate(stask.ClientRequest.Id, STaskStatus.Faulted, ex);
  }
  }
private static STask AddRequestToTracker(string requestId,
  STaskInfo info, ITaskExecutionNode channel)
{
  info.ExecutionRequestId = requestId;
  info.ExecutionRequestChannel = channel;
  if (_submissionTracker.TryAdd(info.ClientRequest.Id, info))
    return info.ClientRequest;
  HandleClientUpdate(info.ClientRequest.Id, STaskStatus.Faulted,
    new Exception(“Failed to add “));
  return null;
}

Observe que o método SendTaskToTaskExecutionNode cria uma instância de retorno de chamada para manipular o resultado da execução da tarefa em um nó de execução de tarefa:

[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant)]
  public class CallbackHandler : ITaskUpdateCallback
  {
    public void UpdateStatus(string id, STaskStatus status, string result)
    {
      CoordinatorContext.HandleClientUpdate (id, status, result);
    }
  }

O CallbackHandler manipula a operação de retorno de chamada chamando o método HandleClientUpdate. Esse método recupera e remove a instância STaskInfo correspondente do Controlador do emissor e executa um retorno de chamada para o cliente para atualizar o resultado. Além disso, se esta for a última solicitação do grupo, ele fecha o canal entre o coordenador de tarefa e o nó de execução de tarefa. A Figura 17 mostra a implementação do método HandleClientUpdate.

Figura 17 Métodos HandleClientUpdate e o de suporte

internal async static void HandleClientUpdate(
  string staskId, STaskStatus status, object result)
  {
    STaskInfo info;
    if (!_submissionTracker.TryGetValue(staskId, out info))
      throw new Exception(“Could not get task from the tracker”);
try
  {
    await Task.Run(() =>
      info.CallbackChannel.UpdateStatus(info.ClientRequest.Id, status, result));
    RemoveComplete(info.ClientRequest.Id);           
  }
catch(AggregateException ex)
  {
  // ...
  }
  }
private static void RemoveComplete(string staskId)
  {
    STaskInfo info;
    if (!_submissionTracker.TryRemove(staskId, out info))
      throw new Exception(“Failed to be removed from the tracking collection”);
    if (_submissionTracker.Values.Where((t) => t.ExecutionRequestId ==
      info.ExecutionRequestId).Count() == 0)
      CloseTaskRequestChannel((IChannel)info.ExecutionRequestChannel);
  }
  private static void CloseTaskRequestChannel(IChannel channel)
  {
    if (channel != null && channel.State != CommunicationState.Faulted)
      channel.Close();
  }

Implementador de tarefa

No código do cliente, typeName é um dos parâmetros necessários ao adicionar solicitações. No final, esse valor chega ao nó de execução de tarefa. O valor de typeName é o nome do tipo da implementação de uma interface que expõe uma função delegate que encapsula a funcionalidade destinada para execução como uma tarefa paralela e que reside em todos os nós de execução de tarefa. Chamarei essa interface de IRunnableTask. Implementadores dessa interface devem esperar receber como parâmetros um token de cancelamento e uma matriz de parâmetros do cliente. A função delegate deve também retornar o resultado da tarefa. Esta é a interface:

public interface IRunnableTask
{
  Func<Object> Run(CancellationToken ct, params string[] taskArgs );
}

Iniciando uma tarefa em um nó de execução de tarefa

Em um alto nível, um nó de execução de tarefa é responsável por “transformar” uma solicitação de execução de tarefa em uma tarefa real que a TPL possa executar, isso é, iniciar uma tarefa TPL. A Figura 18 mostra uma implementação desse processo, que irei discutir.

Figura 18 Iniciando uma tarefa

[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant)]
public class TaskExecutionNodeHandler : ITaskExecutionNode
{              
  public void Start(List<STask> stasks)
  {
    var callback =
      OperationContext.Current.GetCallbackChannel<ITaskUpdateCallback>();
    foreach (var t in stasks)
      TaskExecutionContext.Start(t,callback);           
  }
...
}
public static class TaskExecutionContext
{
...
internal static void Start(STask stask, ITaskUpdateCallback callback)
{
  try
  {
    // Step 1.a
    var rtasks = CompositionUtil.ContainerInstance.GetExports<IRunnableTask>();
    // Step 1.b
    var rtask = from t in rtasks
                where t.Value.GetType().FullName == stask.STaskTypeName
                select t.Value;
    // Step 2
    var cs = new CancellationTokenSource();
    var ct = cs.Token;
    TaskExecutionContext._cancellationSources.TryAdd(stask.Id, cs);
    // Step 3 
    Task<Object>
      .Run(rtask.First().Run(ct, stask.STaskParameters), ct)
      .ContinueWith(tes => UpdateStatus(tes, stask, callback));
  }
  catch (Exception ex)
  {
    ...
  }
}
...
}

Etapa 1 (a e b): Neste estágio, o nó de execução de tarefa precisa criar uma instância de IRunnableTask, que retornará uma delegate que será executada como uma tarefa do tipo solicitado pelo cliente. Para isso, aproveito o MEF e um novo recurso do .NET Framework 4.5 que permite uma abordagem de configuração sem atributos. O código na Figura 19 cria uma instância única de contêiner que exporta todas as implementações de IRunnableTask localizadas no diretório “extensions.” Para obter mais informações sobre o MEF e a abordagem de configuração sem atributos, consulte o artigo da MSDN Magazine de junho de 2012, “Uma abordagem sem atributos para configurar o MEF” em msdn.microsoft.com/magazine/jj133818.

Figura 19 Criando um contêiner

internal static  class CompositionUtil
{
  private readonly static Lazy<CompositionContainer> _container =
    new Lazy<CompositionContainer>(() =>
    {
      var builder = new RegistrationBuilder();
      builder.ForTypesDerivedFrom<IRunnableTask>()                         
             .Export<IRunnableTask>()
             .SetCreationPolicy(CreationPolicy.NonShared);
      var cat = new DirectoryCatalog(“extensions”, builder);
      return new CompositionContainer(cat, true, null);
    }
  ,true);
  internal static CompositionContainer ContainerInstance
  {
    get { return _container.Value; }
  } 
}

Agora, vamos voltar para o código da Figura 18. O código usa o contêiner para obter as exportações do tipo IRunnableTask e, então, seleciona a instância com o nome do tipo que corresponde à solicitação do cliente. Observe que fiz a suposição-chave de que só há uma única instância de tarefa que corresponde ao tipo solicitado pelo cliente. Essa é a razão de usar a primeira instância que a consulta LINQ retorna.

Etapa 2: Antes de realmente criar a tarefa TPL, o código cria um Cancellation Token Source e um Cancellation Token. Controlarei o Cancellation Source em uma instância única de um ConcurrentDictionary<TKey,TValue>. O nó de execução de tarefa usará essa lista de fontes de cancelamento quando um cliente solicitar um cancelamento. Veja a definição dessa instância:

public static class TaskExecutionContext
{
...
private readonly static ConcurrentDictionary<string, 
  CancellationTokenSource> _cancellationSources =
  new ConcurrentDictionary<string, CancellationTokenSource>();
...
}

Etapa 3: Nesse ponto, executo a tarefa, com o token de cancelamento que acabei de criar. A tarefa é seguida de uma tarefa de continuação. A necessidade de tal tarefa aparece porque o coordenador de tarefa deve ser atualizado, fazendo uma chamada de serviço, com o resultado da execução depois de concluir a tarefa TPL (com êxito ou com uma falha). Como a Figura 20 mostra, encapsulo o processo de atualização do coordenador de tarefa em uma função delegate que recebe como um parâmetro a tarefa TPL, a solicitação de execução de tarefa e uma instância de retorno de chamada para o coordenador de tarefa.

Figura 20. Encapsulando o processo de atualização

private static Action<Task<Object>, STask, 
  ITaskUpdateCallback> UpdateStatus = (t, st, cb) =>
  {
    try
    {
      STaskStatus s;
      Object r = null;
      switch (t.Status)
      {
        case TaskStatus.Canceled: s = STaskStatus.Canceled;
          break;
        case TaskStatus.Faulted:
          s = STaskStatus.Faulted;
          r = t.Exception.Flatten();
          break;
        case TaskStatus.RanToCompletion:
          s = STaskStatus.Completed;
          r = t.Result;
          break;
        default:
          s = STaskStatus.Faulted;
          r = new Exception("Invalid Status");
          break;
      }
      CancellationTokenSource cs;
      TaskExecutionContext._cancellationSources.TryRemove(st.Id, out cs);
      cb.UpdateStatus(st.Id, s, r);
    }
    catch (Exception ex)
  {
  // Error handling
  }
};

Solicitando e manipulando um cancelamento

A TPL fornece um mecanismo para implementar o cancelamento de tarefa. Para isso, a delegate que encapsula o processo real sendo executado como uma tarefa TPL precisa responder à solicitação de cancelamento e terminar a execução. Para obter mais informações sobre o cancelamento de tarefa, consulte o artigo da Biblioteca MSDN, “Cancelamento de tarefa,” em bit.ly/NYVTO0.

Um dos parâmetros da interface IRunnableTask é um token de cancelamento. O nó de execução de tarefa criará um token para cada tarefa e cabe ao implementador da interface determinar quando verificar se há uma solicitação de cancelamento e terminar o processo naturalmente. O código na Figura 21 mostra uma tarefa simples que calcula o número de números pares em um intervalo, enquanto verifica se um cancelamento foi solicitado.

Figura 21 Verificando um cancelamento

public class MySimpleCTask : IRunnableTask
{
  public Func<Object> Run(Nullable<CancellationToken> ct, 
    params string[] taskArgs)
  {
    var j = int.Parse(taskArgs[0]);
    var z = 0;
    return (() =>
  {
      for (int i = 0; i < j; i++)
      {
        if (i % 2 != 0)
        {
          z++;
          ct.Value.ThrowIfCancellationRequested();
        }
      }
      return z;
   });
  }
}

Como visto quando discuti o cliente, é possível adicionar uma solicitação com um token de cancelamento e, internamente, o cliente executa a inscrição necessária. Portanto, quando um cancelamento é gerado, uma solicitação de cancelamento é enviada para o coordenador de tarefa. Depois de receber a solicitação de cancelamento, o coordenador de tarefa verifica se a solicitação foi enviada para um nó de execução de tarefa e envia uma solicitação de cancelamento. O nó de execução de tarefa procura então a fonte de cancelamento que corresponda à tarefa solicitada pela Id do cliente. Enviar a solicitação de cancelamento para o nó de execução de tarefa é relativamente simples. Você simplesmente precisa localizar o canal que corresponde à solicitação em que o coordenador de tarefa inicialmente enviou a solicitação de execução de tarefa. Esses canais precisam ser mantidos abertos para as chamadas de retorno que atualizam o status da solicitação de execução.

A Figura 22 mostra a implementação das operações de serviço no coordenador de tarefa.

Figura 22 Implementando as operações de serviço no coordenador de tarefa

public class TaskCoordinatorService : ITaskCoordinator
{
...
public bool CancelTask(string Id)
  {
    return CoordinatorContext.CancelTask(Id);
  }
  ...}
public static class CoordinatorContext
{
...
internal static bool CancelTask(string Id)
{
STaskInfo info;
if(_submissionTracker.TryGetValue(
  Id, out info) && info.ExecutionRequestChannel != null)
{
  info.ExecutionRequestChannel.Cancel(Id);
  return true;
}
return false;
}
  ...
}

Finalmente, a Figura 23 mostra a implementação das operações de serviço nos nós de execução de tarefa.

Figura 23 Implementando as operações de serviço nos nós de execução de tarefa

class CancellationHandler : ICancellationHandler
  {
  public void Cancel(STask stask)
  {
    TaskExecutionContext.CanceTask(stask.Id);
  }
  }
public static class TaskExecutionContext
{
...
  internal static void CancelTask(string Id)
  {
    CancellationTokenSource tknSrc;
    if (_cancellationSources.TryGetValue(Id, out tknSrc))
      tknSrc.Cancel(); }
...
}

Escalabilidade do coordenador de tarefa e outras considerações

Vale a pena observar que essa implementação supõe que o coordenador de tarefa é executado em um único nó, mas é bastante possível expandir o coordenador de tarefa (isso exigiria, pelo menos, as seguintes alterações):

  • Um balanceador de carga para acessar o coordenador de tarefa precisaria ser introduzido.
  • Como descrevi anteriormente, a chave para a abordagem de limitação é ter uma contagem precisa do número de tarefas em execução, no total e por tipo. Em um cenário com mais de um nó sendo executado como coordenadores de tarefa, esses contadores precisarão ser mantidos centralmente (por exemplo, em um banco de dados) e ao mesmo tempo poder serem atualizados ou lidos de forma sincronizada (evitando condições de corrida, deadlocks, etc.)

Finalmente, observe que como com qualquer abordagem de desenvolvimento, o risco e valor precisam ser pesados em relação a outras alternativas que possam atender às suas necessidades e que estão disponíveis para uso imediato. Por exemplo, você deve considerar tecnologias como o servidor Microsoft HPC como uma solução plausível para muitos cenários que você, de outro modo, talvez pense em adotar com base na abordagem descrita neste artigo.

Otimizando recursos

A TPL fornece a infraestrutura necessária para alcançar a utilização mais ideal de recursos de CPU em um único computador de vários núcleos, e também é útil para implementar uma abordagem que dimensione os limites do computador. Isso pode ser útil para cenários de automação de carga de trabalho e processamento em lotes em que o paralelismo é necessário, não apenas em um único servidor de vários núcleos, mas também por vários servidores.

Para alcançar essa escalabilidade horizontal, diversas considerações de arquitetura precisam ser levadas em conta. Dentre elas, as principais são: a necessidade de balancear a carga pelos recursos existentes tendo, ao mesmo tempo, a capacidade de adicionar mais recursos ao farm existente, e a capacidade de limitar os recursos de acordo com a semântica das tarefas que precisam ser executadas. As ferramentas e tecnologias de desenvolvimento da Microsoft fornecem os blocos de construção necessários para implementar uma arquitetura que leva em conta essas considerações importantes.

Jesus Aguilar trabalha para a Microsoft na área de Premier Support para desenvolvedores como gerente de desenvolvimento de aplicativos sênior.

Agradecemos aos seguintes especialistas técnicos pela revisão deste artigo: Ryan Berry, Steve Case, Rick Claude e Piyush Joshi