Compartir a través de


Importante neto

Acceso A una operación por turnos para el ThreadPool

Stephen Toub

 

P ESTOY utilizando actualmente el ThreadPool de Microsoft .NET Framework y ha ejecutar en una situación no estoy seguro de cómo resolver. Inicie fuera con un gran lote de elementos de trabajo que obtener en la cola y, a continuación, llega un segundo (lote más pequeño) tras el primer lote inicia su procesamiento. Inicialmente, parte del trabajo en el lote grande se distribuirá a todos los subprocesos de trabajo de la ThreadPool. Sin embargo, cuando llega el lote de segundo, desea que ser justo, con cada lote obtener atiende igualmente, en vez del primer lote obtener atención completa debido a que llega en primer lugar el la distribución.

 

Cuando uno de los lotes finaliza, me gustaría que uno sigue necesitando el procesamiento, a continuación, la atención de todos los subprocesos de trabajo. ¿Hay algo que pueda hacer a esta funcionalidad lotes encima de la ThreadPool?

A en columnas anteriores, he ha mostrado cómo a distintos tipos de funcionalidad encima de la ThreadPool de .NET existentes. En el número de octubre de 2004 de MSDN Magazine, he mostrado cómo agregar compatibilidad con esta forma hasta la ThreadPool de en espera de los elementos de trabajo en cola (consulte" ThreadPoolWait y HandleLeakTracker "). En el número de noviembre de 2004, he mostrado cómo agregar compatibilidad con las prioridades de elemento de trabajo (consulte" ThreadPoolPriority y MethodImplAttribute "). Y en el número de marzo de 2006, he mostrado cómo agregar compatibilidad de cancelación (consulte" Grupo de subprocesos abortable "). En el futuro, le poder mirar el problema de 2009 de enero al y decir que hay he mostrado cómo agregar compatibilidad con programación de una operación por turnos encima de la ThreadPool.

El problema que pretende para solucionar primero requiere un conocimiento de cómo el ThreadPool envía el trabajo. Internamente, mantiene una cola de los elementos de trabajo que se haya en la cola a ella. Cuando un subproceso en el grupo está disponible para ejecutar el trabajo, se vuelve a la cola de trabajo y extrae desactivar el elemento siguiente. El orden en el que tiene lugar este procesamiento es sin documentar y debe más definitivamente no se utilizó al (como podría y es probable que se cambiar en versiones futuras).

Hoy en día se implementa de manera muy sencilla: una cola de primero en primer lugar fuera (FIFO). Por lo tanto, el trabajo de primero pondrán en cola será el primer trabajo seleccionarse por un subproceso. En el escenario lotes, esto significa que todo el trabajo desde el primer lote será en la cola delante de todo el trabajo del segundo lote. Y, por tanto, todo trabajo del primer lote se distribuirá antes que el lote de segundo. Para algunos escenarios, esto es bien y óptimo. Para su situación, necesita más control.

Una de las formas más fáciles obtener ese control desde el ThreadPool es sustituir su propio delegado para una el usuario realmente desea ejecutada. Como ejemplo, vamos a decir que deseaba detectar excepciones no controladas todos los fuera del trabajo en cola y provocar un evento de cada uno. Para ello, podría escribir código como en la figura 1 . A continuación, en lugar de usar ThreadPool.QueueUserWorkItem, utilice ExceptionThreadPool.QueueUserWorkItem. Aún se podría ejecutar el trabajo mediante la ThreadPool, pero de subprocesos el grupo se realmente se ejecutar el delegado en la cola en lugar de la proporciona el usuario. Invocar el delegado a continuación, se invoque el delegado proporcionado por el usuario, detectar las excepciones y provocar el evento de destino.

Figura 1 Shimming el ThreadPool

public static class ExceptionThreadPool {
  public static void QueueUserWorkItem(
      WaitCallback callback, object state) {
    ThreadPool.QueueUserWorkItem(delegate {
      try { callback(state); }
      catch (Exception exc) {
        var handler = UnhandledException;
        if (handler != null) 
          handler(null, 
            new UnhandledExceptionEventArgs(exc, false));
      }
    });
  }

  public static event 
    UnhandledExceptionEventHandler UnhandledException;
}

Tenga en cuenta que esta técnica, aunque eficaz, tienen un costo: un delegado adicional necesita que se va a asignar, un delegado adicional debe invocarse y así sucesivamente. Si el costo es prohibitivo pueden estar determinado sólo por y los escenarios, pero este tipo de capas es normalmente más rentable que escribir su propio grupo de subprocesos desde el principio.

Esto es, por supuesto, un ejemplo muy sencillo, pero puede hacer cosas más complicadas. En el ejemplo la prioridad de grupo se a la que conoce antes almacena los delegados proporcionado por el usuario en sus propias estructuras de datos. Pone en la, a continuación, cola al fondo de un delegado de sustitución que vuelve y busca las estructuras de datos para el delegado correcto que se va a ejecutar, prefieren ejecutar en primer lugar que tienen las prioridades de mayor. Puede emplear una técnica similar para solucionar el problema de lotes.

Dedique unos instantes a suponga en lugar de tener una sola cola, haya una cola por lote. Cada lote coloca trabajo en su propia cola relevante. A continuación, se utilizaría subprocesos del grupo de la operación por turnos entre todos de las colas. El delegado vuelve, cola la ThreadPool real en las estructuras de datos y comenzando por la siguiente cola examinarse, busca de trabajo. Si se encuentra algunos, ejecuta de dicha cola. Si no es así, pasa de observar la siguiente cola.

De esta manera, es posible proporcionar gran programación entre los lotes. Si hay sólo un lote, los subprocesos se van a siempre actúan trabajo desde esa cola una. Si hay varios lotes, los subprocesos visite cada una de las colas, ofreciéndoles atención aproximadamente igual. la figura 2 proporciona una descripción general de esta solución aspecto que tendrá.

netmatters,fig02.gif

La Figura 2 proceso RoundRobinThreadPool

Para obtener las cosas van, primero necesita una estructura de datos para almacenar el delegado proporcionado por el usuario. La representación se muestra en la figura 3 . Esta estructura de datos contiene tres propiedades. Los dos primeros deben resultarle familiares; son el estado que del usuario proporciona a QueueUserWorkItem, por lo por supuesto que es necesario almacenar en caché fuera. La tercera propiedad puede ser no resultan familiares, sin embargo. Asociado con cada subproceso de ejecución en .NET es un System.Threading.ExecutionContext que representa la información, como el usuario actual, cualquier estado asociado con el subproceso lógico de ejecución, información de seguridad de acceso de código y así sucesivamente. Es importante que este contexto fluyen a través de puntos asincrónicos de ejecución.

La figura 3 capturas de un elemento de trabajo

internal class WorkItem {
  public WaitCallback WaitCallback;
  public object State;
  public ExecutionContext Context;

  private static ContextCallback _contextCallback = state => {
    var item = (WorkItem)state;
    item.WaitCallback(item.State);
  };

  public void Execute() {
    if (Context != null) 
      ExecutionContext.Run(Context, _contextCallback, this);
    else WaitCallback(State);
  }
}

Por ejemplo, si se está suplantando a una identidad de Windows específica y se llama ThreadPool.QueueUserWorkItem, debe ejecutar el trabajo en la cola en esa misma identidad de Windows. Si no es así, es un problema de seguridad potencial. A partir de la 2.0 de Framework .NET, este contexto fluye automáticamente de forma predeterminada en todos los puntos de asincrónico en el código: ThreadPool.QueueUserWorkItem, crear un nuevo subproceso, delegado asincrónico invocación y así sucesivamente.

Sin embargo, está reproduciendo por un conjunto diferente de las reglas con la implementación se tratan aquí. Si cambia el orden en el que se ponen en cola los delegados del orden en que se ejecutan, ya no hay una correspondencia directa entre este flujo ExecutionContext y los elementos de trabajo proporciona al usuario. Como tal, su implementación debe correctamente la caché la ExecutionContext con el delegado proporcionado por el usuario y, a continuación, utilizar ese contexto capturado para ejecutar ese delegado.

Ahora que tiene un elemento de trabajo, echemos un vistazo a la cola en el se retiene (se muestra en la figura 4 ). La estructura de datos RoundRobinThreadPool.Queue sí es bastante sencilla. Internamente, que contiene una cola <workitem> para almacenar todos los elementos de trabajo para que, una referencia a la instancia Round­RobinThreadPool con la que está asociada, la cola y un valor de tipo Boolean que indica si se ha llamado del método Dispose en la cola. A continuación, proporciona los métodos de QueueUserWorkItem con la misma firma que el ThreadPool.

Figura 4 turnos cola

public sealed class RoundRobinThreadPool {  
  private List<Queue> _queues;
  ...

  public sealed class Queue : IDisposable {
    internal Queue(RoundRobinThreadPool pool) { _pool = pool; }

    internal Queue<WorkItem> _workItems = new Queue<WorkItem>();
    private RoundRobinThreadPool _pool;
    internal bool _disposed;

    public void QueueUserWorkItem(WaitCallback callback) { 
      QueueUserWorkItem(callback, null); 
    }

    public void QueueUserWorkItem(WaitCallback callback, object state) {
      if (_disposed) 
        throw new ObjectDisposedException(GetType().Name);
      var item = new WorkItem { 
        Context = ExecutionContext.Capture(), 
        WaitCallback = callback, State = state };
      lock (_pool._queues) _workItems.Enqueue(item);
      _pool.NotifyNewWorkItem();
    }

    public void Dispose() {
      if (!_disposed)  {
        lock (_pool._queues) {
          if (_workItems.Count == 0) 
            _pool.RemoveQueueNeedsLock(this);
        }
        _disposed = true;
      }
    }
  }
}

Cuando se llama QueueUserWorkItem, la devolución de llamada y el estado proporcionado por el usuario (junto con la ExecutionContext actual) se capturan en un elementos de trabajo. Este trabajo, a continuación, se almacena en la cola genérica, y el grupo relevante se notifica que ha llegado el nuevo trabajo. Es importante que tenga en cuenta que un bloqueo se utiliza para proteger la cola de elemento de trabajo, QueueUserWorkItem puede llamarse simultáneamente desde varios subprocesos, y debe asegurarse de que se mantienen invariables.

Demasiado, tenga en cuenta que el objeto está bloqueado es una lista de colas global proviene del grupo. ESTOY utilizando un bloqueo concreto para el curso bastante para la implementación completa. Una implementación más eficaz es probable que utilizaría más detalladas bloqueo, como el uso de bloqueos individuales por cola lugar uno para el todo todo hasta RoundRobinThreadPool. Para facilitar la implementación y la simplicidad ha decidido para el bloqueo único.

El método Dispose se utiliza cuando ya no es necesaria esta cola. En un escenario típico de lotes, se creará una cola, el coloca trabajo en cola en él y, a continuación, se se elimina la cola. Si el método Dispose simplemente quitar esta cola del grupo, es probable que se debe quitarse mientras aún hubiera los elementos de trabajo en ella que se va a procesar.

Como tal, Dispose hace dos cosas. En primer lugar, comprueba si existen los elementos de trabajo restante. Si no existe, la cola llama en el grupo para quitar a sí mismo. En segundo lugar, marca sí mismo como tener ha eliminado. Podrá ver en un momento cómo el grupo trata una situación donde entra en un grupo desechado que no se ha quitado.

Figura 5 muestra el resto de la implementación, que de la Round­RobinThreadPool clase propia. El grupo contiene cuatro campos:

  • Una lista de las colas individuales mantenidos por el grupo (que también actúa como el bloqueo mencionado anteriormente).
  • Una cola predeterminada para el grupo.
  • Entero que representa la siguiente cola que se va a buscar por el trabajo.
  • La devolución de llamada delegado que realmente está en cola el ThreadPool.

La figura 5 RoundRobinThreadPool

public sealed class RoundRobinThreadPool {
  private List<Queue> _queues;
  private Queue _defaultQueue;
  private int _nextQueue;
  private WaitCallback _callback;

  public RoundRobinThreadPool() {
    _queues = new List<Queue>();
    _callback = DequeueAndExecuteWorkItem;
    _nextQueue = 0;
    _defaultQueue = CreateQueue();
  }

  public Queue CreateQueue() {
    var createdQueue = new Queue(this);
    lock (_queues) _queues.Add(createdQueue);
    return createdQueue;
  }

  public void QueueUserWorkItem(WaitCallback callback)  { 
    QueueUserWorkItem(callback, null); 
  }

  public void QueueUserWorkItem(WaitCallback callback, object state) {
    _defaultQueue.QueueUserWorkItem(callback, state);
  }

  private void RemoveQueueNeedsLock(Queue queue) {
    int index = _queues.IndexOf(queue);
    if (_nextQueue >= index) _nextQueue--;
    _queues.RemoveAt(index);
  }

  private void NotifyNewWorkItem() { 
    ThreadPool.UnsafeQueueUserWorkItem(_callback, null); 
  }

  private void DequeueAndExecuteWorkItem(object ignored) {
    WorkItem item = null;

    lock (_queues) {
      var searchOrder = 
        Enumerable.Range(_nextQueue, _queues.Count - _nextQueue).
        Concat(Enumerable.Range(0, _nextQueue));

      foreach (int i in searchOrder) {
        var items = _queues[i]._workItems;
        if (items.Count > 0) {
          item = items.Dequeue();
          _nextQueue = i; 
          if (queue._disposed && 
              items.Count == 0) RemoveQueueNeedsLock(_queues[i]);
          break;
        }
      }
      _nextQueue = (_nextQueue + 1) % _queues.Count;
    }
    if (item != null) item.Execute();
  }

  ... // RoundRobinThreadPool.Queue and .WorkItem, already shown
}

Cuando se inicializa un RoundRobinThreadPool, todo este estado está configurado. En concreto, la cola predeterminado se inicializa mediante una llamada al método CreateQueue. Este método CreateQueue es el mismo método que se expone públicamente a permitir que un desarrollador para agregar otra cola al grupo (como cuando llega un nuevo lote de trabajo necesita su propia cola aislado). Simplemente crea una nueva instancia de Round­RobinThreadPool.Queue (el tipo se han examinado en la figura 3 ), lo agrega a la lista de las colas y se devuelve.

Para facilitar su uso, RoundRobinThreadPool expone sus propios métodos QueueUserWorkItem; estos destino simplemente la cola predeterminada que se creó cuando el grupo se crea una instancia.

A continuación examinar es el método NotifyNewWorkItem. Le recuerde que cuando se llama en una cola, QueueUserWorkItem después almacenar el elemento de trabajo, la cola llama este método NotifyNewWorkItem en el fondo. Este método sencillamente delega en la ThreadPool real, enviar un delegado que se regresar al método DequeueAndExecuteWorkItem (que se examinará en breve) que se, como su nombre implica acertadamente, dequeue y ejecutar un elemento de trabajo desde el fondo de una operación por turnos.

Observe que está llamando a NotifyNewWorkItem ThreadPool.UnsafeQueueUserWorkItem en lugar de ThreadPool.QueueUserWorkItem. El prefijo "no seguros" simplemente implica que no es fluye ExecutionContext; hacerlo no tiene una ventaja de rendimiento. Y como esta implementación ya está controlando ExecutionContext flujo manualmente, es necesario para la ThreadPool intentar hacer tan bien como.

DequeueAndExecuteWorkItem es donde tiene lugar la magia real. Este método en primer lugar genera una orden en que deben buscar las colas. El orden de búsqueda va desde la cola que se examinará hacia arriba hasta el final de la lista siguiente y, a continuación, círculos volver alrededor, comienza al principio de la lista y vaya hasta a la cola en el que inició. Para simplificar la implementación, el método enumerable.Range LINQ se sirve para generar las dos listas, a continuación, se concatenan entre sí mediante el método enumerable.Concat LINQ.

Una vez que tiene el orden de búsqueda, va idea para los elementos de trabajo. Se examina cada cola en el orden especificado y tan pronto como encuentra un elemento de trabajo, se quita el elemento de trabajo y el puntero siguiente está actualizado. El elemento de trabajo, a continuación, se invoca mediante el método Execute que se muestra en la figura 3 .

Hay una línea especialmente interesante de código aquí, y eso es el cheque para ver si la cola desde el que sólo se ha recuperado un elemento es desechado y vacíos. Si el grupo encuentra dicha una cola, sabe que alguna vez no tenga más elementos agregado a la (como se ha eliminado) y, por lo tanto, no es necesario mantenerlo alrededor de ser suyo. En ese momento, RemoveQueueNeedsLock se utiliza para quitar la cola de destino de la lista de las colas y potencialmente actualizar el puntero de cola siguiente en el caso de es ahora fuera del intervalo.

Tenga en cuenta que este método no lo utiliza internamente un bloqueo pero tiene acceso a había compartido estado; por lo tanto ha denominado el método con un sufijo "NeedsLock" para recordarle que yo mismo que debe llamarse mientras se mantiene el bloqueo. Observará que ambos llamar a sitios para RemoveQueueNeedLock: una en método Dispose de la cola y otra en DequeueAndExecuteWorkItem método el grupo, llame a mientras se mantiene el bloqueo de las colas.

Con la implementación completa, puede ahora probar esto en el código. En el ejemplo siguiente, he creado una única instancia estática de la RoundRobinThreadPool. Cuando llega un lote de trabajo que se va a procesar, se crea una nueva cola, todo el trabajo en cola a esta cola y se elimina la cola:

 

private static RoundRobinThreadPool _pool = 
  new RoundRobinThreadPool();
...
private void ProcessBatch(Batch b) {
  using(var queue = _pool.CreateQueue()) {
    foreach(var item in b) {
      queue.QueueUserWorkItem(() => ProcessItem(item));
    }
  }
}

Incluso si todo el trabajo para el primer lote a que llegue se programará en primer lugar, tan pronto como llega la segunda por lotes, se iniciará obteniendo un aproximadamente el mismo recurso compartido de los recursos de procesamiento.

Hay Accesorios más adicionales que se puede agregar a esta implementación y la implementación es probable que podría mejorarse desde una perspectiva de rendimiento. Sin embargo, con muy poco código, ha sido capaz crear una abstracción que aprovecha el ThreadPool de .NET y todas las funciones y la funcionalidad que proporciona, todavía sigue obtener compatibilidad con la fairness en función de lote que desea conocer.

Envíe sus preguntas y comentarios de Stephen para netqa@Microsoft.com .

Stephen Toub es un administrador de programas jefe en el equipo de plataforma informática paralela en Microsoft. También es un editor colaborador de MSDN Magazine.