Información general sobre BlockingCollection
BlockingCollection<T> es una clase de colección segura para subprocesos que proporciona las siguientes características:
Una implementación del patrón productor-consumidor.
Agregar y quitar elementos en varios subprocesos de forma simultánea.
Capacidad máxima opcional.
Operaciones de inserción y eliminación que se bloquean cuando la colección está vacía o completa.
Inserción y eliminación de operaciones "try" que no se bloquean o que se bloquean en un período de tiempo especificado.
Encapsula cualquier tipo de colección que implementa IProducerConsumerCollection<T>.
Cancelación con tokens de cancelación.
Dos tipos de enumeraciones con
foreach
(For Each
en Visual Basic):Enumeración de solo lectura.
Enumeración que quita los elementos que se enumeran.
Compatibilidad con límites y bloqueos
BlockingCollection<T> admite límites y bloqueos. Los límites implican que puede establecer la capacidad máxima de la colección. Los límites son importantes en ciertos escenarios, porque le permiten controlar el tamaño máximo de la colección en memoria y evitan que los subprocesos de producción vayan demasiado por delante de los subprocesos de consumo.
Varios subprocesos o tareas pueden agregar elementos a la colección de forma simultánea y, si la colección alcanza su capacidad máxima especificada, los subprocesos de producción se bloquearán hasta que se quite un elemento. Varios consumidores pueden quitar elementos de forma simultánea y, si la colección está vacía, los subprocesos de consumo se bloquearán hasta que un productor agregue un elemento. Un subproceso de producción puede llamar a CompleteAdding para indicar que no se agregarán más elementos. Los consumidores supervisan la propiedad IsCompleted para saber cuándo está vacía la colección y no se agregarán más elementos. En el ejemplo siguiente, se muestra una clase BlockingCollection sencilla con una capacidad limitada de 100. Una tarea de producción agrega elementos a la colección siempre que alguna condición externa sea true y después llama a CompleteAdding. La tarea de consumidor toma elementos hasta que la propiedad IsCompleted es true.
// A bounded collection. It can hold no more
// than 100 items at once.
BlockingCollection<Data> dataItems = new BlockingCollection<Data>(100);
// A simple blocking consumer with no cancellation.
Task.Run(() =>
{
while (!dataItems.IsCompleted)
{
Data data = null;
// Blocks if dataItems.Count == 0.
// IOE means that Take() was called on a completed collection.
// Some other thread can call CompleteAdding after we pass the
// IsCompleted check but before we call Take.
// In this example, we can simply catch the exception since the
// loop will break on the next iteration.
try
{
data = dataItems.Take();
}
catch (InvalidOperationException) { }
if (data != null)
{
Process(data);
}
}
Console.WriteLine("\r\nNo more items to take.");
});
// A simple blocking producer with no cancellation.
Task.Run(() =>
{
while (moreItemsToAdd)
{
Data data = GetData();
// Blocks if numbers.Count == dataItems.BoundedCapacity
dataItems.Add(data);
}
// Let consumer know we are done.
dataItems.CompleteAdding();
});
' A bounded collection. It can hold no more
' than 100 items at once.
Dim dataItems = New BlockingCollection(Of Data)(100)
' A simple blocking consumer with no cancellation.
Task.Factory.StartNew(Sub()
While dataItems.IsCompleted = False
Dim dataItem As Data = Nothing
Try
dataItem = dataItems.Take()
Catch e As InvalidOperationException
' IOE means that Take() was called on a completed collection.
' In this example, we can simply catch the exception since the
' loop will break on the next iteration.
End Try
If (dataItem IsNot Nothing) Then
Process(dataItem)
End If
End While
Console.WriteLine(vbCrLf & "No more items to take.")
End Sub)
' A simple blocking producer with no cancellation.
Task.Factory.StartNew(Sub()
While moreItemsToAdd = True
Dim item As Data = GetData()
' Blocks if dataItems.Count = dataItems.BoundedCapacity.
dataItems.Add(item)
End While
' Let consumer know we are done.
dataItems.CompleteAdding()
End Sub)
Para obtener un ejemplo completo, vea Cómo: Agregar y tomar elementos de forma individual en una clase BlockingCollection.
Operaciones de bloqueo cronometradas
En las operaciones de bloqueo cronometradas TryAdd y TryTake en colecciones limitadas, el método intenta agregar o quitar un elemento. Si un elemento está disponible, se coloca en la variable que ha pasado la referencia y el método devuelve true. Si no se recupera ningún elemento después de un período de tiempo de espera especificado, el método devuelve false. El subproceso es libre para realizar otro trabajo útil antes de intentar acceder de nuevo a la colección. Para obtener un ejemplo de acceso de bloqueo cronometrado, vea el segundo ejemplo de Cómo: Agregar y tomar elementos de forma individual en una clase BlockingCollection.
Cancelar operaciones de agregar y tomar
Las operaciones de agregar y tomar se realizan normalmente en un bucle. Puede cancelar un bucle al pasar CancellationToken a los métodos TryAdd o TryTake, y después comprobar el valor de la propiedad IsCancellationRequested del token en cada iteración. Si el valor es true, puede decidir si responde a la solicitud de cancelación limpiando los recursos y saliendo del bucle. En el ejemplo siguiente, se muestra una sobrecarga de TryAdd que toma un token de cancelación y el código que lo usa:
do
{
// Cancellation causes OCE. We know how to handle it.
try
{
success = bc.TryAdd(itemToAdd, 2, ct);
}
catch (OperationCanceledException)
{
bc.CompleteAdding();
break;
}
//...
} while (moreItems == true);
Do While moreItems = True
' Cancellation causes OCE. We know how to handle it.
Try
success = bc.TryAdd(itemToAdd, 2, ct)
Catch ex As OperationCanceledException
bc.CompleteAdding()
Exit Do
End Try
Loop
Para obtener un ejemplo de cómo agregar compatibilidad con la cancelación, vea el segundo ejemplo de Cómo: Agregar y tomar elementos de forma individual en una clase BlockingCollection.
Especificar el tipo de colección
Cuando se crea una BlockingCollection<T>, puede especificar no solo la capacidad limitada, sino también el tipo de colección que se usará. Por ejemplo, puede especificar un objeto ConcurrentQueue<T> para un comportamiento FIFO (primero en entrar, primero en salir) o un objeto ConcurrentStack<T> para un comportamiento LIFO (último en entrar, primero en salir). Puede usar cualquier clase de colección que implemente la interfaz IProducerConsumerCollection<T>. El tipo de colección predeterminado para BlockingCollection<T> es ConcurrentQueue<T>. En el ejemplo de código siguiente, se muestra cómo crear una BlockingCollection<T> de cadenas que tiene una capacidad de 1000 y usa un objeto ConcurrentBag<T>:
Dim bc = New BlockingCollection(Of String)(New ConcurrentBag(Of String()), 1000)
BlockingCollection<string> bc = new BlockingCollection<string>(new ConcurrentBag<string>(), 1000 );
Para obtener más información, vea Cómo: Agregar la funcionalidad de límite y bloqueo a una colección.
Compatibilidad con IEnumerable
BlockingCollection<T> proporciona un método GetConsumingEnumerable que permite a los consumidores usar foreach
(For Each
en Visual Basic) para quitar elementos hasta que la colección está completa, lo que significa que está vacía y no se agregan más elementos. Para obtener más información, vea Cómo: Usar ForEach para quitar elementos de BlockingCollection.
Usar muchas BlockingCollections como una
Para escenarios en los que un consumidor necesita tomar elementos de varias colecciones de forma simultánea, se pueden crear matrices de BlockingCollection<T> y usar los métodos estáticos, como TakeFromAny y AddToAny, que agregarán a cualquiera de las colecciones de la matriz o tomarán desde ellas. Si se bloquea una colección, el método intenta otra de forma inmediata hasta que encuentra una que pueda realizar la operación. Para obtener más información, vea Cómo: Usar matrices de colecciones de bloqueo en una canalización.