Compartir a través de


Flujo de datos (biblioteca paralela de tareas)

La biblioteca paralela de tareas (TPL) proporciona componentes de flujo de datos para ayudar a aumentar la solidez de las aplicaciones habilitadas para simultaneidad. Estos componentes de flujo de datos se conocen colectivamente como biblioteca de flujos de datos de TPL. Este modelo de flujo de datos promueve la programación basada en actores al proporcionar el paso de mensajes en proceso para tareas de flujo de datos general y canalización. Los componentes de flujo de datos se basan en los tipos y la infraestructura de programación de TPL y se integran con la compatibilidad del lenguaje C#, Visual Basic y F# para la programación asincrónica. Estos componentes de flujo de datos son útiles cuando tiene varias operaciones que deben comunicarse entre sí de forma asincrónica o cuando desea procesar datos a medida que esté disponible. Por ejemplo, considere una aplicación que procesa los datos de imagen de una cámara web. Mediante el modelo de flujo de datos, la aplicación puede procesar fotogramas de imagen a medida que estén disponibles. Si la aplicación mejora fotogramas de imagen, por ejemplo, corrigiendo la luz o reduciendo ojos rojos, puede crear una canalización de los componentes de flujo de datos. Cada fase de la canalización puede utilizar más funcionalidad de paralelismo de grano grueso, como la funcionalidad proporcionada por la biblioteca TPL, para transformar la imagen.

En este documento se proporciona información general sobre la biblioteca de flujos de datos de TPL. Describe el modelo de programación, los tipos de bloques de flujo de datos predefinidos y cómo configurar bloques de flujo de datos para satisfacer los requisitos específicos de las aplicaciones.

Nota:

La biblioteca TPL Dataflow (el espacio de nombres System.Threading.Tasks.Dataflow) no se distribuye con .NET. Para instalar el System.Threading.Tasks.Dataflow espacio de nombres en Visual Studio, abra su proyecto, elija Administrar paquetes NuGet en el menú Proyecto y busque en línea el paquete System.Threading.Tasks.Dataflow. Como otra opción, para instalarlo mediante la CLI de .NET Core, ejecute dotnet add package System.Threading.Tasks.Dataflow.

Modelo de programación

La biblioteca de flujos de datos de TPL proporciona una base para el paso de mensajes y paralelización de aplicaciones con uso intensivo de CPU e E/S que tienen un alto rendimiento y una latencia baja. También proporciona un control explícito sobre cómo se almacena en búfer los datos y se mueven alrededor del sistema. Para comprender mejor el modelo de programación de flujos de datos, considere la posibilidad de una aplicación que cargue imágenes de forma asincrónica desde el disco y cree una composición de esas imágenes. Los modelos de programación tradicionales suelen requerir que use callbacks y objetos de sincronización, como bloqueos de seguridad, para coordinar tareas y acceder a datos compartidos. Mediante el modelo de programación de flujos de datos, puede crear objetos de flujo de datos que procesen imágenes a medida que se leen desde el disco. En el modelo de flujo de datos, se declara cómo se controlan los datos cuando están disponibles y también las dependencias entre los datos. Dado que el tiempo de ejecución administra las dependencias entre datos, a menudo puede evitar el requisito de sincronizar el acceso a los datos compartidos. Además, dado que las programaciones en tiempo de ejecución funcionan en función de la llegada asincrónica de datos, el flujo de datos puede mejorar la capacidad de respuesta y el rendimiento mediante la administración eficaz de los subprocesos subyacentes. Para obtener un ejemplo que usa el modelo de programación de flujo de datos para implementar el procesamiento de imágenes en una aplicación de Windows Forms, vea Tutorial: Uso del flujo de datos en una aplicación de Windows Forms.

Orígenes y destinos

La biblioteca de flujos de datos TPL consta de bloques de flujo de datos, que son estructuras de datos que almacenan en búfer y procesan datos. El TPL define tres tipos de bloques de flujo de datos: bloques de origen, bloques de destino y bloques de propagador. Un bloque de origen actúa como un origen de datos y se puede leer desde él. Un bloque de destino actúa como un receptor de datos y se puede escribir en él. Un bloque de propagador actúa tanto como un bloque de origen como uno de destino, y se puede leer y escribir. El TPL define la System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> interfaz para representar orígenes, System.Threading.Tasks.Dataflow.ITargetBlock<TInput> representar destinos y System.Threading.Tasks.Dataflow.IPropagatorBlock<TInput,TOutput> representar propagadores. IPropagatorBlock<TInput,TOutput> hereda de ISourceBlock<TOutput>, y ITargetBlock<TInput>.

La biblioteca de flujos de datos TPL proporciona varios tipos de bloques de flujo de datos predefinidos que implementan las interfaces ISourceBlock<TOutput>, ITargetBlock<TInput> y IPropagatorBlock<TInput,TOutput>. Estos tipos de bloques de flujo de datos se describen en este documento en la sección Tipos de bloques de flujo de datos predefinidos.

Conectar bloques

Puede conectar bloques de flujo de datos para formar canalizaciones, que son secuencias lineales de bloques de flujo de datos, o redes, que son grafos de bloques de flujo de datos. Una canalización es una forma de red. En una canalización o red, los orígenes propagan de forma asincrónica los datos a los destinos a medida que los datos están disponibles. El ISourceBlock<TOutput>.LinkTo método vincula un bloque de flujo de datos de origen a un bloque de destino. Un origen se puede vincular a cero o más destinos; Los destinos se pueden vincular desde cero o más orígenes. Puede agregar o quitar bloques de flujo de datos a una canalización o red simultáneamente. Los tipos de bloques de flujo de datos predefinidos controlan todos los aspectos de la seguridad para subprocesos de vinculación y desvinculación.

Para obtener un ejemplo que conecta bloques de flujo de datos para formar una canalización básica, consulte Tutorial: Creación de una canalización de flujo de datos. Para obtener un ejemplo que conecta bloques de flujo de datos para formar una red más compleja, consulte Tutorial: Uso del flujo de datos en una aplicación de Windows Forms. Para obtener un ejemplo que desvincule un destino de un origen después de que el origen ofrezca al destino un mensaje, vea Cómo: Desvincular bloques de flujo de datos.

Filtros

Al llamar al ISourceBlock<TOutput>.LinkTo método para vincular un origen a un destino, puede proporcionar un delegado que determine si el bloque de destino acepta o rechaza un mensaje basado en el valor de ese mensaje. Este mecanismo de filtrado es una manera útil de garantizar que un bloque de flujo de datos recibe solo determinados valores. Para la mayoría de los tipos de bloques de flujo de datos predefinidos, si un bloque de origen está conectado a varios bloques de destino, cuando un bloque de destino rechaza un mensaje, el origen ofrece ese mensaje al siguiente destino. El orden en el que un origen ofrece mensajes a destinos se define mediante el origen y puede variar según el tipo del origen. La mayoría de los tipos de bloques de origen dejan de ofrecer un mensaje después de que un destino acepte ese mensaje. Una excepción a esta regla es la BroadcastBlock<T> clase , que ofrece cada mensaje a todos los destinos, incluso si algunos destinos rechazan el mensaje. Para obtener un ejemplo que usa el filtrado para procesar solo determinados mensajes, vea Tutorial: Uso del flujo de datos en una aplicación de Windows Forms.

Importante

Dado que cada tipo de bloque de flujo de datos de origen predefinido garantiza que los mensajes se propagan en el orden en que se reciben, todos los mensajes deben leerse desde el bloque de origen antes de que el bloque de origen pueda procesar el siguiente mensaje. Por lo tanto, al usar el filtrado para conectar varios destinos a un origen, asegúrese de que al menos un bloque de destino recibe cada mensaje. De lo contrario, la aplicación podría quedar en un estado de interbloqueo.

Paso de mensajes

El modelo de programación de flujos de datos está relacionado con el concepto de paso de mensajes, donde los componentes independientes de un programa se comunican entre sí mediante el envío de mensajes. Una manera de propagar mensajes entre los componentes de la aplicación es llamar a los Post métodos (sincrónicos) y SendAsync (asincrónicos) para enviar mensajes a bloques de flujo de datos de destino y los Receivemétodos , ReceiveAsyncy TryReceive para recibir mensajes de bloques de origen. Puede combinar estos métodos con canalizaciones de flujo de datos o redes mediante el envío de datos de entrada al nodo principal (un bloque de destino) y mediante la recepción de datos de salida desde el nodo terminal de la canalización o los nodos de terminal de la red (uno o varios bloques de origen). También puede usar el método Choose para leer desde el primero de los orígenes proporcionados que tiene datos disponibles y ejecutar una acción en esos datos.

Los bloques de origen ofrecen datos a bloques de destino llamando al ITargetBlock<TInput>.OfferMessage método . El bloque de destino responde a un mensaje ofrecido de una de estas tres maneras: puede aceptar el mensaje, rechazar el mensaje o posponer el mensaje. Cuando el destino acepta el mensaje, el OfferMessage método devuelve Accepted. Cuando el destino rechaza el mensaje, el OfferMessage método devuelve Declined. Cuando el destino requiere que ya no reciba ningún mensaje del origen, OfferMessage devuelve DecliningPermanently. Los tipos de bloque de origen predefinidos no ofrecen mensajes a destinos vinculados después de recibir este valor devuelto y desvinculan automáticamente de dichos destinos.

Cuando un bloque de destino pospone el mensaje para su uso posterior, el OfferMessage método devuelve Postponed. Un bloque de destino que pospone un mensaje puede llamar posteriormente al ISourceBlock<TOutput>.ReserveMessage método para intentar reservar el mensaje ofrecido. En este momento, el mensaje sigue estando disponible y se puede usar en el bloque de destino, o bien otro destino ha tomado el mensaje. Cuando el bloque de destino posteriormente necesita el mensaje o ya no lo necesita, llama al método ISourceBlock<TOutput>.ConsumeMessage o ReleaseReservation, respectivamente. La reserva de mensajes la utilizan normalmente los tipos de bloques de flujo de datos que trabajan en modo no expansivo. El modo no codicioso se explica más adelante en este documento. En lugar de reservar un mensaje pospuesto, un bloque de destino también puede usar el ISourceBlock<TOutput>.ConsumeMessage método para intentar consumir directamente el mensaje pospuesto.

Finalización del bloque de flujo de datos

Los bloques de flujo de datos también admiten el concepto de finalización. Un bloque de flujo de datos que está en estado completado no realiza ningún trabajo adicional. Cada bloque de flujo de datos tiene un objeto asociado System.Threading.Tasks.Task , conocido como tarea de finalización, que representa el estado de finalización del bloque. Dado que puede esperar a que un Task objeto finalice, mediante tareas de finalización, puede esperar a que finalicen uno o varios nodos de terminal de una red de flujo de datos. La IDataflowBlock interfaz define el Complete método , que informa al bloque de flujo de datos de una solicitud para que se complete y la Completion propiedad , que devuelve la tarea de finalización del bloque de flujo de datos. Tanto ISourceBlock<TOutput> como ITargetBlock<TInput> heredan la interfaz IDataflowBlock.

Hay dos maneras de determinar si un bloque de flujo de datos se completó sin error, encontró uno o varios errores o se canceló. La primera manera es llamar al método Task.Wait en la tarea de finalización dentro de un bloque try-catch (Try-Catch en Visual Basic). En el ejemplo siguiente se crea un ActionBlock<TInput> objeto que produce ArgumentOutOfRangeException si su valor de entrada es menor que cero. AggregateException se lanza cuando este ejemplo invoca Wait en la tarea de finalización. El ArgumentOutOfRangeException se accede a través de la propiedad InnerExceptions del objeto AggregateException.

// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
   Console.WriteLine($"n = {n}");
   if (n < 0)
   {
      throw new ArgumentOutOfRangeException();
   }
});

// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();

// Wait for completion in a try/catch block.
try
{
   throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
   // If an unhandled exception occurs during dataflow processing, all
   // exceptions are propagated through an AggregateException object.
   ae.Handle(e =>
   {
      Console.WriteLine($"Encountered {e.GetType().Name}: {e.Message}");
      return true;
   });
}

/* Output:
n = 0
n = -1
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
 of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
                                                      Console.WriteLine("n = {0}", n)
                                                      If n < 0 Then
                                                          Throw New ArgumentOutOfRangeException()
                                                      End If
                                                  End Sub)

' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()

' Wait for completion in a try/catch block.
Try
    throwIfNegative.Completion.Wait()
Catch ae As AggregateException
    ' If an unhandled exception occurs during dataflow processing, all
    ' exceptions are propagated through an AggregateException object.
    ae.Handle(Function(e)
                  Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
                  Return True
              End Function)
End Try

'          Output:
'         n = 0
'         n = -1
'         Encountered ArgumentOutOfRangeException: Specified argument was out of the range
'          of valid values.
'         

En este ejemplo se muestra el caso en el que una excepción no está controlada en el delegado de un bloque de flujo de datos de ejecución. Se recomienda controlar las excepciones en los cuerpos de estos bloques. Sin embargo, si no puede hacerlo, el bloque se comporta como si se cancelara y no procesara los mensajes entrantes.

Cuando se cancela explícitamente un bloque de flujo de datos, el AggregateException objeto contiene OperationCanceledException en la InnerExceptions propiedad . Para obtener más información sobre la cancelación del flujo de datos, consulte la sección Habilitación de la cancelación .

La segunda manera de determinar el estado de finalización de un bloque de flujo de datos es usar una continuación de la tarea de finalización o usar las características de lenguaje asincrónico de C# y Visual Basic para esperar de forma asincrónica la tarea de finalización. El delegado que proporcione en el método Task.ContinueWith acepta un objeto Task que representa la tarea antecedente. En el caso de la propiedad Completion, el delegado de continuación toma la propia tarea de finalización. En el ejemplo siguiente se parece al anterior, salvo que también usa el ContinueWith método para crear una tarea de continuación que imprima el estado de la operación de flujo de datos global.

// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
   Console.WriteLine($"n = {n}");
   if (n < 0)
   {
      throw new ArgumentOutOfRangeException();
   }
});

// Create a continuation task that prints the overall
// task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(task =>
{
   Console.WriteLine($"The status of the completion task is '{task.Status}'.");
});

// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();

// Wait for completion in a try/catch block.
try
{
   throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
   // If an unhandled exception occurs during dataflow processing, all
   // exceptions are propagated through an AggregateException object.
   ae.Handle(e =>
   {
      Console.WriteLine($"Encountered {e.GetType().Name}: {e.Message}");
      return true;
   });
}

/* Output:
n = 0
n = -1
The status of the completion task is 'Faulted'.
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
 of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
                                                      Console.WriteLine("n = {0}", n)
                                                      If n < 0 Then
                                                          Throw New ArgumentOutOfRangeException()
                                                      End If
                                                  End Sub)

' Create a continuation task that prints the overall 
' task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(Sub(task) Console.WriteLine("The status of the completion task is '{0}'.", task.Status))

' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()

' Wait for completion in a try/catch block.
Try
    throwIfNegative.Completion.Wait()
Catch ae As AggregateException
    ' If an unhandled exception occurs during dataflow processing, all
    ' exceptions are propagated through an AggregateException object.
    ae.Handle(Function(e)
                  Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
                  Return True
              End Function)
End Try

'          Output:
'         n = 0
'         n = -1
'         The status of the completion task is 'Faulted'.
'         Encountered ArgumentOutOfRangeException: Specified argument was out of the range
'          of valid values.
'         

También puede usar propiedades como IsCanceled en el cuerpo de la tarea de continuación para determinar información adicional sobre el estado de finalización de un bloque de flujo de datos. Para obtener más información sobre las tareas de continuación y cómo se relacionan con la cancelación y el control de errores, vea Encadenar tareas mediante tareas de continuación, cancelación de tareas y control de excepciones.

Tipos de bloques de flujo de datos predefinidos

La biblioteca de flujos de datos de TPL proporciona varios tipos de bloques de flujo de datos predefinidos. Estos tipos se dividen en tres categorías: bloques de almacenamiento en búfer, bloques de ejecución y bloques de agrupación. En las secciones siguientes se describen los tipos de bloque que componen estas categorías.

Bloques de almacenamiento en búfer

Los bloques de almacenamiento en búfer contienen datos para su uso por parte de los consumidores de datos. La biblioteca de flujos de datos de TPL proporciona tres tipos de bloques de almacenamiento en búfer: System.Threading.Tasks.Dataflow.BufferBlock<T>, System.Threading.Tasks.Dataflow.BroadcastBlock<T>y System.Threading.Tasks.Dataflow.WriteOnceBlock<T>.

BufferBlock<T>

La BufferBlock<T> clase representa una estructura de mensajería asincrónica de uso general. Esta clase almacena una cola FIFO (primero en entrar, primero en salir) de mensajes donde varios orígenes pueden escribir o de los que varios destinos pueden leer. Cuando un destino recibe un mensaje de un BufferBlock<T> objeto , ese mensaje se quita de la cola de mensajes. Por lo tanto, aunque un BufferBlock<T> objeto puede tener varios destinos, solo un destino recibirá cada mensaje. La BufferBlock<T> clase es útil cuando desea pasar varios mensajes a otro componente y ese componente debe recibir cada mensaje.

En el siguiente ejemplo básico se escriben varios Int32 valores en un BufferBlock<T> objeto y, a continuación, se leen esos valores de nuevo desde ese objeto.

// Create a BufferBlock<int> object.
var bufferBlock = new BufferBlock<int>();

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
   bufferBlock.Post(i);
}

// Receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(bufferBlock.Receive());
}

/* Output:
   0
   1
   2
 */
' Create a BufferBlock<int> object.
Dim bufferBlock = New BufferBlock(Of Integer)()

' Post several messages to the block.
For i As Integer = 0 To 2
    bufferBlock.Post(i)
Next i

' Receive the messages back from the block.
For i As Integer = 0 To 2
    Console.WriteLine(bufferBlock.Receive())
Next i

'          Output:
'            0
'            1
'            2
'          

Para obtener un ejemplo completo que muestra cómo escribir y leer mensajes desde un BufferBlock<T> objeto , vea How to: Write Messages to and Read Messages from a Dataflow Block.

BroadcastBlock<T>

La BroadcastBlock<T> clase es útil cuando debe pasar varios mensajes a otro componente, pero ese componente solo necesita el valor más reciente. Esta clase también es útil cuando se quiere difundir un mensaje a varios componentes.

En el siguiente ejemplo básico se publica un Double valor en un BroadcastBlock<T> objeto y, a continuación, se lee ese valor de nuevo desde ese objeto varias veces. Dado que los valores no se quitan de BroadcastBlock<T> los objetos después de leerlos, el mismo valor está disponible cada vez.

// Create a BroadcastBlock<double> object.
var broadcastBlock = new BroadcastBlock<double>(null);

// Post a message to the block.
broadcastBlock.Post(Math.PI);

// Receive the messages back from the block several times.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(broadcastBlock.Receive());
}

/* Output:
   3.14159265358979
   3.14159265358979
   3.14159265358979
 */
' Create a BroadcastBlock<double> object.
Dim broadcastBlock = New BroadcastBlock(Of Double)(Nothing)

' Post a message to the block.
broadcastBlock.Post(Math.PI)

' Receive the messages back from the block several times.
For i As Integer = 0 To 2
    Console.WriteLine(broadcastBlock.Receive())
Next i

'          Output:
'            3.14159265358979
'            3.14159265358979
'            3.14159265358979
'          

Para obtener un ejemplo completo que muestra cómo usar BroadcastBlock<T> para difundir un mensaje a varios bloques de destino, vea How to: Specify a Task Scheduler in a Dataflow Block.

WriteOnceBlock<T>

La WriteOnceBlock<T> clase es similar a la BroadcastBlock<T> clase , salvo que un WriteOnceBlock<T> objeto solo se puede escribir en una sola vez. Puede considerar WriteOnceBlock<T> que es similar a la palabra clave Readonly de C# (ReadOnly en Visual Basic), excepto que un WriteOnceBlock<T> objeto se vuelve inmutable después de recibir un valor en lugar de en la construcción. Al igual que la BroadcastBlock<T> clase , cuando un destino recibe un mensaje de un WriteOnceBlock<T> objeto , ese mensaje no se quita de ese objeto. Por lo tanto, varios destinos reciben una copia del mensaje. La WriteOnceBlock<T> clase es útil cuando se desea propagar solo el primero de varios mensajes.

En el siguiente ejemplo básico se publican varios String valores en un WriteOnceBlock<T> objeto y, a continuación, se vuelve a leer el valor desde ese objeto. Dado que un WriteOnceBlock<T> objeto solo se puede escribir en una sola vez, después de que un WriteOnceBlock<T> objeto reciba un mensaje, descarta los mensajes posteriores.

// Create a WriteOnceBlock<string> object.
var writeOnceBlock = new WriteOnceBlock<string>(null);

// Post several messages to the block in parallel. The first
// message to be received is written to the block.
// Subsequent messages are discarded.
Parallel.Invoke(
   () => writeOnceBlock.Post("Message 1"),
   () => writeOnceBlock.Post("Message 2"),
   () => writeOnceBlock.Post("Message 3"));

// Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive());

/* Sample output:
   Message 2
 */
' Create a WriteOnceBlock<string> object.
Dim writeOnceBlock = New WriteOnceBlock(Of String)(Nothing)

' Post several messages to the block in parallel. The first 
' message to be received is written to the block. 
' Subsequent messages are discarded.
Parallel.Invoke(Function() writeOnceBlock.Post("Message 1"), Function() writeOnceBlock.Post("Message 2"), Function() writeOnceBlock.Post("Message 3"))

' Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive())

'          Sample output:
'            Message 2
'          

Para obtener un ejemplo completo que muestra cómo usar WriteOnceBlock<T> para recibir el valor de la primera operación que finaliza, vea Cómo: Desvincular bloques de flujo de datos.

Bloques de ejecución

Los bloques de ejecución llaman a un delegado proporcionado por el usuario para cada fragmento de datos recibidos. La biblioteca de flujos de datos de TPL proporciona tres tipos de bloques de ejecución: ActionBlock<TInput>, System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput>y System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput>.

ActionBlock<T>

La ActionBlock<TInput> clase es un bloque de destino que llama a un delegado cuando recibe datos. Piense en un ActionBlock<TInput> objeto como delegado que se ejecuta de forma asincrónica cuando los datos están disponibles. El delegado que se proporciona a un objeto ActionBlock<TInput> puede ser de tipo Action<T> o tipo System.Func<TInput, Task>. Cuando se usa un ActionBlock<TInput> objeto con Action<T>, el procesamiento de cada elemento de entrada se considera completado cuando el delegado devuelve. Cuando se usa un ActionBlock<TInput> objeto con System.Func<TInput, Task>, el procesamiento de cada elemento de entrada solo se considera completado cuando se completa el objeto devuelto Task . Mediante estos dos mecanismos, puede usar ActionBlock<TInput> para el procesamiento sincrónico y asincrónico de cada elemento de entrada.

En el siguiente ejemplo básico se envían varios Int32 valores a un objeto ActionBlock<TInput>. El ActionBlock<TInput> objeto imprime esos valores en la consola. A continuación, este ejemplo establece el bloque en el estado completado y espera a que finalicen todas las tareas de flujo de datos.

// Create an ActionBlock<int> object that prints values
// to the console.
var actionBlock = new ActionBlock<int>(n => Console.WriteLine(n));

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
   actionBlock.Post(i * 10);
}

// Set the block to the completed state and wait for all
// tasks to finish.
actionBlock.Complete();
actionBlock.Completion.Wait();

/* Output:
   0
   10
   20
 */
' Create an ActionBlock<int> object that prints values
' to the console.
Dim actionBlock = New ActionBlock(Of Integer)(Function(n) WriteLine(n))

' Post several messages to the block.
For i As Integer = 0 To 2
    actionBlock.Post(i * 10)
Next i

' Set the block to the completed state and wait for all 
' tasks to finish.
actionBlock.Complete()
actionBlock.Completion.Wait()

'          Output:
'            0
'            10
'            20
'          

Para obtener ejemplos completos que muestran cómo usar delegados con la ActionBlock<TInput> clase , vea Cómo: Realizar acción cuando un bloque de flujo de datos recibe datos.

TransformBlock<TInput, TOutput>

La TransformBlock<TInput,TOutput> clase es similar a la ActionBlock<TInput> clase, excepto que actúa como origen y como destino. El delegado que pasa a un objeto TransformBlock<TInput,TOutput> devuelve un valor de tipo TOutput. El delegado que se proporciona a un objeto TransformBlock<TInput,TOutput> puede ser de tipo System.Func<TInput, TOutput> o tipo System.Func<TInput, Task<TOutput>>. Cuando se usa un TransformBlock<TInput,TOutput> objeto con System.Func<TInput, TOutput>, el procesamiento de cada elemento de entrada se considera completado cuando el delegado devuelve. Cuando se usa un TransformBlock<TInput,TOutput> objeto usado con System.Func<TInput, Task<TOutput>>, el procesamiento de cada elemento de entrada solo se considera completado cuando se completa el objeto devuelto Task<TResult> . Al igual que con ActionBlock<TInput>, mediante estos dos mecanismos, puede usar TransformBlock<TInput,TOutput> para el procesamiento sincrónico y asincrónico de cada elemento de entrada.

En el ejemplo básico siguiente se crea un TransformBlock<TInput,TOutput> objeto que calcula la raíz cuadrada de su entrada. El TransformBlock<TInput,TOutput> objeto toma Int32 valores como entrada y genera Double valores como salida.

// Create a TransformBlock<int, double> object that
// computes the square root of its input.
var transformBlock = new TransformBlock<int, double>(n => Math.Sqrt(n));

// Post several messages to the block.
transformBlock.Post(10);
transformBlock.Post(20);
transformBlock.Post(30);

// Read the output messages from the block.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(transformBlock.Receive());
}

/* Output:
   3.16227766016838
   4.47213595499958
   5.47722557505166
 */
' Create a TransformBlock<int, double> object that 
' computes the square root of its input.
Dim transformBlock = New TransformBlock(Of Integer, Double)(Function(n) Math.Sqrt(n))

' Post several messages to the block.
transformBlock.Post(10)
transformBlock.Post(20)
transformBlock.Post(30)

' Read the output messages from the block.
For i As Integer = 0 To 2
    Console.WriteLine(transformBlock.Receive())
Next i

'          Output:
'            3.16227766016838
'            4.47213595499958
'            5.47722557505166
'          

Para obtener ejemplos completos que usan TransformBlock<TInput,TOutput> en una red de bloques de flujo de datos que realizan el procesamiento de imágenes en una aplicación de Windows Forms, consulte Tutorial: Uso del flujo de datos en una aplicación de Windows Forms.

TransformManyBlock<TInput, TOutput>

La TransformManyBlock<TInput,TOutput> clase es similar a la TransformBlock<TInput,TOutput> clase, excepto que TransformManyBlock<TInput,TOutput> genera cero o más valores de salida para cada valor de entrada, en lugar de solo un valor de salida para cada valor de entrada. El delegado que se proporciona a un objeto TransformManyBlock<TInput,TOutput> puede ser de tipo System.Func<TInput, IEnumerable<TOutput>> o tipo System.Func<TInput, Task<IEnumerable<TOutput>>>. Cuando se usa un TransformManyBlock<TInput,TOutput> objeto con System.Func<TInput, IEnumerable<TOutput>>, el procesamiento de cada elemento de entrada se considera completado cuando el delegado devuelve. Cuando se usa un TransformManyBlock<TInput,TOutput> objeto con System.Func<TInput, Task<IEnumerable<TOutput>>>, el procesamiento de cada elemento de entrada se considera completo solo cuando se completa el objeto devuelto System.Threading.Tasks.Task<IEnumerable<TOutput>> .

En el ejemplo básico siguiente se crea un TransformManyBlock<TInput,TOutput> objeto que divide las cadenas en sus secuencias de caracteres individuales. El TransformManyBlock<TInput,TOutput> objeto toma String valores como entrada y genera Char valores como salida.

// Create a TransformManyBlock<string, char> object that splits
// a string into its individual characters.
var transformManyBlock = new TransformManyBlock<string, char>(
   s => s.ToCharArray());

// Post two messages to the first block.
transformManyBlock.Post("Hello");
transformManyBlock.Post("World");

// Receive all output values from the block.
for (int i = 0; i < ("Hello" + "World").Length; i++)
{
   Console.WriteLine(transformManyBlock.Receive());
}

/* Output:
   H
   e
   l
   l
   o
   W
   o
   r
   l
   d
 */
' Create a TransformManyBlock<string, char> object that splits
' a string into its individual characters.
Dim transformManyBlock = New TransformManyBlock(Of String, Char)(Function(s) s.ToCharArray())

' Post two messages to the first block.
transformManyBlock.Post("Hello")
transformManyBlock.Post("World")

' Receive all output values from the block.
For i As Integer = 0 To ("Hello" & "World").Length - 1
    Console.WriteLine(transformManyBlock.Receive())
Next i

'          Output:
'            H
'            e
'            l
'            l
'            o
'            W
'            o
'            r
'            l
'            d
'          

Para obtener ejemplos completos que usan TransformManyBlock<TInput,TOutput> para generar varias salidas independientes para cada entrada en una canalización de flujo de datos, consulte Tutorial: Creación de una canalización de flujo de datos.

Grado de paralelismo

Cada ActionBlock<TInput>, TransformBlock<TInput,TOutput> y TransformManyBlock<TInput,TOutput> almacenan en búfer los mensajes de entrada hasta que el bloque esté listo para procesarlos. De forma predeterminada, estas clases procesan los mensajes en el orden en que se reciben, un mensaje cada vez. También puede especificar el grado de paralelismo para permitir que los objetos ActionBlock<TInput>, TransformBlock<TInput,TOutput> y TransformManyBlock<TInput,TOutput> procesen varios mensajes simultáneamente. Para obtener más información sobre la ejecución simultánea, vea la sección Especificación del grado de paralelismo más adelante en este documento. Para obtener un ejemplo que establece el grado de paralelismo para permitir que un bloque de flujo de datos de ejecución procese más de un mensaje a la vez, vea How to: Specify the Degree of Parallelism in a Dataflow Block.

Resumen de tipos delegados

En la tabla siguiente se resumen los tipos de delegado que puede proporcionar a los objetos ActionBlock<TInput>, TransformBlock<TInput,TOutput> y TransformManyBlock<TInput,TOutput>. Esta tabla también especifica si el tipo de delegado funciona de forma sincrónica o asincrónica.

Tipo Tipo de delegado sincrónico Tipo de delegado asincrónico
ActionBlock<TInput> System.Action System.Func<TInput, Task>
TransformBlock<TInput,TOutput> System.Func<TInput, TOutput> System.Func<TInput, Task<TOutput>>
TransformManyBlock<TInput,TOutput> System.Func<TInput, IEnumerable<TOutput>> System.Func<TInput, Task<IEnumerable<TOutput>>>

También puede usar expresiones lambda al trabajar con tipos de bloques de ejecución. Para obtener un ejemplo que muestra cómo usar una expresión lambda con un bloque de ejecución, vea Cómo: Realizar acción cuando un bloque de flujo de datos recibe datos.

Agrupación de bloques

Los bloques de agrupación combinan datos de uno o varios orígenes y bajo varias restricciones. La biblioteca de flujos de datos de TPL proporciona tres tipos de bloque de combinación: BatchBlock<T>, JoinBlock<T1,T2>y BatchedJoinBlock<T1,T2>.

BatchBlock<T>

La BatchBlock<T> clase combina conjuntos de datos de entrada, que se conocen como lotes, en matrices de datos de salida. Especifique el tamaño de cada lote al crear un BatchBlock<T> objeto. Cuando el BatchBlock<T> objeto recibe el recuento especificado de elementos de entrada, propaga de forma asincrónica una matriz que contiene esos elementos. Si un BatchBlock<T> objeto se establece en el estado completado, pero no contiene suficientes elementos para formar un lote, propaga una matriz final que contiene los elementos de entrada restantes.

La BatchBlock<T> clase funciona en modo codicioso o no codicioso. En modo codicioso, que es el valor predeterminado, un BatchBlock<T> objeto acepta todos los mensajes que se le ofrecen y propaga una matriz después de recibir la cantidad especificada de elementos. En modo no expansivo, un objeto BatchBlock<T> pospone todos los mensajes entrantes hasta que haya suficientes orígenes que proporcionen mensajes al bloque para formar un lote. Normalmente, el modo codicioso funciona mejor que el modo no codicioso porque requiere menos carga de procesamiento. Sin embargo, se puede usar el modo no expansivo cuando se debe coordinar el consumo de varios orígenes en modo atómico. Especifique el modo no expansivo estableciendo Greedy en False en el parámetro dataflowBlockOptions del constructor BatchBlock<T>.

En el siguiente ejemplo básico se exponen varios valores Int32 a un objeto BatchBlock<T> que contiene diez elementos en un lote. Para garantizar que todos los valores se propagan fuera de BatchBlock<T>, en este ejemplo se llama al Complete método . El Complete método establece el BatchBlock<T> objeto en el estado completado y, por lo tanto, el BatchBlock<T> objeto propaga los elementos restantes como un lote final.

// Create a BatchBlock<int> object that holds ten
// elements per batch.
var batchBlock = new BatchBlock<int>(10);

// Post several values to the block.
for (int i = 0; i < 13; i++)
{
   batchBlock.Post(i);
}
// Set the block to the completed state. This causes
// the block to propagate out any remaining
// values as a final batch.
batchBlock.Complete();

// Print the sum of both batches.

Console.WriteLine($"The sum of the elements in batch 1 is {batchBlock.Receive().Sum()}.");

Console.WriteLine($"The sum of the elements in batch 2 is {batchBlock.Receive().Sum()}.");

/* Output:
   The sum of the elements in batch 1 is 45.
   The sum of the elements in batch 2 is 33.
 */
' Create a BatchBlock<int> object that holds ten
' elements per batch.
Dim batchBlock = New BatchBlock(Of Integer)(10)

' Post several values to the block.
For i As Integer = 0 To 12
    batchBlock.Post(i)
Next i
' Set the block to the completed state. This causes
' the block to propagate out any remaining
' values as a final batch.
batchBlock.Complete()

' Print the sum of both batches.

Console.WriteLine("The sum of the elements in batch 1 is {0}.", batchBlock.Receive().Sum())

Console.WriteLine("The sum of the elements in batch 2 is {0}.", batchBlock.Receive().Sum())

'          Output:
'            The sum of the elements in batch 1 is 45.
'            The sum of the elements in batch 2 is 33.
'          

Para obtener un ejemplo completo que usa BatchBlock<T> para mejorar la eficacia de las operaciones de inserción de base de datos, vea Tutorial: Uso de BatchBlock y BatchedJoinBlock para mejorar la eficiencia.

JoinBlock<T1, T2, ...>

Las clases JoinBlock<T1,T2> y JoinBlock<T1,T2,T3> recopilan elementos de entrada y propagan objetos System.Tuple<T1,T2> o System.Tuple<T1,T2,T3> que contienen esos elementos. Las JoinBlock<T1,T2> clases y JoinBlock<T1,T2,T3> no heredan de ITargetBlock<TInput>. En su lugar, proporcionan propiedades, Target1, Target2y Target3, que implementan ITargetBlock<TInput>.

Al igual que BatchBlock<T>, JoinBlock<T1,T2> y JoinBlock<T1,T2,T3> operan en modo codicioso o no codicioso. En modo expansivo, que es el valor predeterminado, un objeto JoinBlock<T1,T2> o JoinBlock<T1,T2,T3> acepta cada mensaje que se proporciona y propaga una tupla después de que cada uno de sus destinos reciba por lo menos un mensaje. En modo no expansivo, un objeto JoinBlock<T1,T2> o JoinBlock<T1,T2,T3> pospone todos los mensajes entrantes hasta que todos los destinos han proporcionado los datos necesarios para crear una tupla. En este punto, el bloque se involucra en un protocolo de confirmación en dos fases para recuperar atómicamente todos los elementos necesarios de los orígenes. Esta posposición permite que otra entidad consuma los datos mientras tanto, para permitir que el sistema general avance.

En el ejemplo básico siguiente se muestra un caso en el que un JoinBlock<T1,T2,T3> objeto requiere varios datos para calcular un valor. En este ejemplo se crea un JoinBlock<T1,T2,T3> objeto que requiere dos Int32 valores y un Char valor para realizar una operación aritmética.

// Create a JoinBlock<int, int, char> object that requires
// two numbers and an operator.
var joinBlock = new JoinBlock<int, int, char>();

// Post two values to each target of the join.

joinBlock.Target1.Post(3);
joinBlock.Target1.Post(6);

joinBlock.Target2.Post(5);
joinBlock.Target2.Post(4);

joinBlock.Target3.Post('+');
joinBlock.Target3.Post('-');

// Receive each group of values and apply the operator part
// to the number parts.

for (int i = 0; i < 2; i++)
{
   var data = joinBlock.Receive();
   switch (data.Item3)
   {
      case '+':
         Console.WriteLine($"{data.Item1} + {data.Item2} = {data.Item1 + data.Item2}");
         break;
      case '-':
         Console.WriteLine($"{data.Item1} - {data.Item2} = {data.Item1 - data.Item2}");
         break;
      default:
         Console.WriteLine($"Unknown operator '{data.Item3}'.");
         break;
   }
}

/* Output:
   3 + 5 = 8
   6 - 4 = 2
 */
' Create a JoinBlock<int, int, char> object that requires
' two numbers and an operator.
Dim joinBlock = New JoinBlock(Of Integer, Integer, Char)()

' Post two values to each target of the join.

joinBlock.Target1.Post(3)
joinBlock.Target1.Post(6)

joinBlock.Target2.Post(5)
joinBlock.Target2.Post(4)

joinBlock.Target3.Post("+"c)
joinBlock.Target3.Post("-"c)

' Receive each group of values and apply the operator part
' to the number parts.

For i As Integer = 0 To 1
    Dim data = joinBlock.Receive()
    Select Case data.Item3
        Case "+"c
            Console.WriteLine("{0} + {1} = {2}", data.Item1, data.Item2, data.Item1 + data.Item2)
        Case "-"c
            Console.WriteLine("{0} - {1} = {2}", data.Item1, data.Item2, data.Item1 - data.Item2)
        Case Else
            Console.WriteLine("Unknown operator '{0}'.", data.Item3)
    End Select
Next i

'          Output:
'            3 + 5 = 8
'            6 - 4 = 2
'          

Para obtener un ejemplo completo que usa JoinBlock<T1,T2> objetos en modo no codicioso para compartir de forma cooperativa un recurso, vea Cómo: Usar JoinBlock para leer datos de varios orígenes.

BatchedJoinBlock<T1, T2, ...>

Las clases BatchedJoinBlock<T1,T2> y BatchedJoinBlock<T1,T2,T3> recopilan lotes de elementos de entrada y propagan objetos System.Tuple(IList(T1), IList(T2)) o System.Tuple(IList(T1), IList(T2), IList(T3)) que contienen esos elementos. Piense en BatchedJoinBlock<T1,T2> como una combinación de BatchBlock<T> y JoinBlock<T1,T2>. Especifique el tamaño de cada lote al crear un BatchedJoinBlock<T1,T2> objeto. BatchedJoinBlock<T1,T2> también proporciona propiedades y Target1Target2, que implementan ITargetBlock<TInput>. Cuando se recibe el recuento especificado de elementos de entrada de todos los destinos, el BatchedJoinBlock<T1,T2> objeto propaga de forma asincrónica un System.Tuple(IList(T1), IList(T2)) objeto que contiene esos elementos.

En el ejemplo básico siguiente se crea un objeto BatchedJoinBlock<T1,T2> que contiene resultados, valores Int32 y errores que son objetos Exception. En este ejemplo, se realizan varias operaciones y se escriben los resultados en la propiedad Target1, y los errores en la propiedad Target2 del objeto BatchedJoinBlock<T1,T2>. Dado que el recuento de operaciones correctas y con errores es desconocido de antemano, los IList<T> objetos permiten que cada destino reciba cero o más valores.

// For demonstration, create a Func<int, int> that
// returns its argument, or throws ArgumentOutOfRangeException
// if the argument is less than zero.
Func<int, int> DoWork = n =>
{
   if (n < 0)
      throw new ArgumentOutOfRangeException();
   return n;
};

// Create a BatchedJoinBlock<int, Exception> object that holds
// seven elements per batch.
var batchedJoinBlock = new BatchedJoinBlock<int, Exception>(7);

// Post several items to the block.
foreach (int i in new int[] { 5, 6, -7, -22, 13, 55, 0 })
{
   try
   {
      // Post the result of the worker to the
      // first target of the block.
      batchedJoinBlock.Target1.Post(DoWork(i));
   }
   catch (ArgumentOutOfRangeException e)
   {
      // If an error occurred, post the Exception to the
      // second target of the block.
      batchedJoinBlock.Target2.Post(e);
   }
}

// Read the results from the block.
var results = batchedJoinBlock.Receive();

// Print the results to the console.

// Print the results.
foreach (int n in results.Item1)
{
   Console.WriteLine(n);
}
// Print failures.
foreach (Exception e in results.Item2)
{
   Console.WriteLine(e.Message);
}

/* Output:
   5
   6
   13
   55
   0
   Specified argument was out of the range of valid values.
   Specified argument was out of the range of valid values.
 */
' For demonstration, create a Func<int, int> that 
' returns its argument, or throws ArgumentOutOfRangeException
' if the argument is less than zero.
Dim DoWork As Func(Of Integer, Integer) = Function(n)
                                              If n < 0 Then
                                                  Throw New ArgumentOutOfRangeException()
                                              End If
                                              Return n
                                          End Function

' Create a BatchedJoinBlock<int, Exception> object that holds 
' seven elements per batch.
Dim batchedJoinBlock = New BatchedJoinBlock(Of Integer, Exception)(7)

' Post several items to the block.
For Each i As Integer In New Integer() {5, 6, -7, -22, 13, 55, 0}
    Try
        ' Post the result of the worker to the 
        ' first target of the block.
        batchedJoinBlock.Target1.Post(DoWork(i))
    Catch e As ArgumentOutOfRangeException
        ' If an error occurred, post the Exception to the 
        ' second target of the block.
        batchedJoinBlock.Target2.Post(e)
    End Try
Next i

' Read the results from the block.
Dim results = batchedJoinBlock.Receive()

' Print the results to the console.

' Print the results.
For Each n As Integer In results.Item1
    Console.WriteLine(n)
Next n
' Print failures.
For Each e As Exception In results.Item2
    Console.WriteLine(e.Message)
Next e

'          Output:
'            5
'            6
'            13
'            55
'            0
'            Specified argument was out of the range of valid values.
'            Specified argument was out of the range of valid values.
'          

Para obtener un ejemplo completo que usa BatchedJoinBlock<T1,T2> para capturar los resultados y las excepciones que se producen mientras el programa lee desde una base de datos, vea Tutorial: Uso de BatchBlock y BatchedJoinBlock para mejorar la eficiencia.

Configuración del comportamiento del bloque de flujo de datos

Puede habilitar opciones adicionales proporcionando un System.Threading.Tasks.Dataflow.DataflowBlockOptions objeto al constructor de tipos de bloques de flujo de datos. Estas opciones controlan el comportamiento del programador que administra la tarea subyacente y el grado de paralelismo. DataflowBlockOptions También tiene tipos derivados que especifican el comportamiento específico de determinados tipos de bloques de flujo de datos. En la tabla siguiente se resumen los tipos de opciones asociados a cada tipo de bloque de flujo de datos.

Tipo de bloque de flujo de datos DataflowBlockOptions tipo
BufferBlock<T> DataflowBlockOptions
BroadcastBlock<T> DataflowBlockOptions
WriteOnceBlock<T> DataflowBlockOptions
ActionBlock<TInput> ExecutionDataflowBlockOptions
TransformBlock<TInput,TOutput> ExecutionDataflowBlockOptions
TransformManyBlock<TInput,TOutput> ExecutionDataflowBlockOptions
BatchBlock<T> GroupingDataflowBlockOptions
JoinBlock<T1,T2> GroupingDataflowBlockOptions
BatchedJoinBlock<T1,T2> GroupingDataflowBlockOptions

En las secciones siguientes se proporciona información adicional sobre los tipos importantes de opciones de bloque de flujo de datos disponibles a través de las clases System.Threading.Tasks.Dataflow.DataflowBlockOptions, System.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptions y System.Threading.Tasks.Dataflow.GroupingDataflowBlockOptions.

Especificar el programador de tareas

Cada bloque de flujo de datos predefinido usa el mecanismo de programación de tareas TPL para realizar actividades como propagar datos a un destino, recibir datos de un origen y ejecutar delegados definidos por el usuario cuando los datos estén disponibles. TaskScheduler es una clase abstracta que representa un planificador de tareas que ordena las tareas en hilos. El programador de tareas predeterminado, Default, usa la clase ThreadPool para poner en cola y ejecutar el trabajo. Puede invalidar el programador de tareas predeterminado estableciendo la TaskScheduler propiedad al construir un objeto de bloque de flujo de datos.

Cuando el mismo programador de tareas administra varios bloques de flujo de datos, puede aplicar directivas entre ellos. Por ejemplo, si varios bloques de flujo de datos están configurados para tener como destino el programador exclusivo del mismo ConcurrentExclusiveSchedulerPair objeto, todo el trabajo que se ejecuta en estos bloques se serializa. Del mismo modo, si estos bloques están configurados para tener como destino el programador simultáneo del mismo ConcurrentExclusiveSchedulerPair objeto y ese programador está configurado para tener un nivel de simultaneidad máximo, todo el trabajo de estos bloques se limita a ese número de operaciones simultáneas. Para obtener un ejemplo en donde se usa la clase ConcurrentExclusiveSchedulerPair para permitir que las operaciones de lectura se produzcan en paralelo, pero las operaciones de escritura sean exclusivas del resto de operaciones, vea Procedimiento: Especificación de un Programador de tareas en un bloque de flujo de datos. Para obtener más información sobre los programadores de tareas en el TPL, consulte el tema de la clase TaskScheduler.

Especificar el grado de paralelismo

De forma predeterminada, los tres tipos de bloque de ejecución que proporciona la biblioteca de flujos de datos de TPL, ActionBlock<TInput>, TransformBlock<TInput,TOutput>y TransformManyBlock<TInput,TOutput>, procesan un mensaje cada vez. Estos tipos de bloques de flujo de datos también procesan los mensajes en el orden en que se reciben. Para habilitar estos bloques de flujo de datos para procesar mensajes simultáneamente, establezca la ExecutionDataflowBlockOptions.MaxDegreeOfParallelism propiedad al construir el objeto de bloque de flujo de datos.

El valor predeterminado de MaxDegreeOfParallelism es 1, lo que garantiza que el bloque de flujo de datos procesa un mensaje cada vez. Establecer esta propiedad en un valor mayor que 1 permite que el bloque de flujo de datos procese varios mensajes simultáneamente. Establecer esta propiedad en DataflowBlockOptions.Unbounded permite al programador de tareas subyacente administrar el grado máximo de simultaneidad.

Importante

Cuando se especifica un grado máximo de paralelismo mayor que 1, se procesan simultáneamente varios mensajes y, por tanto, es posible que los mensajes no se procesen en el orden en que se reciben. El orden en el que se generan los mensajes del bloque es, sin embargo, el mismo en el que se reciben.

Dado que la MaxDegreeOfParallelism propiedad representa el grado máximo de paralelismo, el bloque de flujo de datos puede ejecutarse con un menor grado de paralelismo que el especificado. El bloque de flujo de datos puede usar un menor grado de paralelismo para cumplir sus requisitos funcionales o porque faltan recursos del sistema disponibles. Un bloque de flujo de datos nunca elige más paralelismo que el especificado.

El valor de la MaxDegreeOfParallelism propiedad es exclusivo de cada objeto de bloque de flujo de datos. Por ejemplo, si cuatro objetos de bloque de flujo de datos especifican 1 para el grado máximo de paralelismo, los cuatro objetos de bloque de flujo de datos pueden ejecutarse en paralelo.

Para obtener un ejemplo que establece el grado máximo de paralelismo para permitir que se produzcan operaciones largas en paralelo, vea How to: Specify the Degree of Parallelism in a Dataflow Block.

Especificar el número de mensajes por tarea

Los tipos de bloques de flujo de datos predefinidos usan tareas para procesar varios elementos de entrada. Esto ayuda a minimizar el número de objetos de tarea necesarios para procesar datos, lo que permite a las aplicaciones ejecutarse de forma más eficaz. Sin embargo, cuando las tareas de un conjunto de bloques de flujo de datos están procesando datos, es posible que las tareas de otros bloques de flujo de datos tengan que esperar el tiempo de procesamiento en la cola mensajes. Para habilitar una mejor equidad entre las tareas de flujo de datos, establezca la MaxMessagesPerTask propiedad . Cuando MaxMessagesPerTask se establece en DataflowBlockOptions.Unbounded, que es el valor por defecto, la tarea usada por un bloque de flujo de datos procesa tantos mensajes como estén disponibles. Cuando MaxMessagesPerTask se establece en un valor distinto de Unbounded, el bloque de flujo de datos procesa como máximo este número de mensajes por objeto Task. Aunque establecer la propiedad puede aumentar la MaxMessagesPerTask equidad entre las tareas, puede hacer que el sistema cree más tareas de las necesarias, lo que puede reducir el rendimiento.

Habilitación de la cancelación

La biblioteca TPL proporciona un mecanismo que habilita las tareas para coordinar la cancelación de manera cooperativa. Para permitir que los bloques de flujo de datos participen en este mecanismo de cancelación, establezca la CancellationToken propiedad . Cuando este CancellationToken objeto se establece en el estado cancelado, todos los bloques de flujo de datos que supervisan este token finalizan la ejecución de su elemento actual, pero no comienzan a procesar elementos posteriores. Estos bloques de flujo de datos también borran los mensajes almacenados en búfer, liberan conexiones a los bloques de origen y destino y pasan al estado cancelado. Al pasar al estado cancelado, la Completion propiedad tiene la Status propiedad establecida en Canceled, a menos que se produzca una excepción durante el procesamiento. En ese caso, Status se establece en Faulted.

Para obtener un ejemplo que muestra cómo usar la cancelación en una aplicación de Windows Forms, vea Cómo: Cancelar un bloque de flujo de datos. Para obtener más información sobre la cancelación en el TPL, vea Cancelación de tareas.

Especificar comportamiento codicioso frente a no codicioso

Varios tipos de bloques de flujo de datos de agrupación pueden funcionar en modo expansivo o no expansivo . Por defecto, los tipos de bloques de flujo de datos predefinidos funcionan en modo codicioso.

En el caso de tipos de bloque de combinación como JoinBlock<T1,T2>, el modo ávido significa que el bloque acepta inmediatamente datos incluso si los datos correspondientes con los que debe unirse aún no están disponibles. El modo no codicioso significa que el bloque pospone todos los mensajes entrantes hasta que uno esté disponible en cada uno de sus destinos para completar la unión. Si alguno de los mensajes aplazados ya no está disponible, el bloque de unión libera todos los mensajes aplazados y reinicia el proceso. En el caso de la clase BatchBlock<T>, el comportamiento expansivo y no expansivo es similar, excepto que en modo no expansivo, un objeto BatchBlock<T> pospone todos los mensajes entrantes hasta que haya suficientes mensajes disponibles desde fuentes distintas para completar un lote.

Para especificar el modo no codicioso para un bloque de flujo de datos, establezca Greedy en False. Para obtener un ejemplo que muestra cómo usar el modo no codicioso para permitir que varios bloques de combinación compartan un origen de datos de forma más eficaz, consulte Cómo: Usar JoinBlock para Leer Datos de Múltiples Fuentes.

Bloques de flujo de datos personalizados

Aunque la biblioteca de flujos de datos de TPL proporciona muchos tipos de bloques predefinidos, puede crear tipos de bloques adicionales que realicen un comportamiento personalizado. Implemente las ISourceBlock<TOutput> interfaces o ITargetBlock<TInput> directamente o use el Encapsulate método para crear un bloque complejo que encapsula el comportamiento de los tipos de bloques existentes. Para obtener ejemplos que muestran cómo implementar la funcionalidad de bloque de flujo de datos personalizado, consulte Tutorial: Creación de un tipo de bloque de flujo de datos personalizado.

Título Descripción
Cómo: Escribir mensajes en y leer mensajes desde un bloque de flujo de datos Muestra cómo escribir y leer mensajes de un BufferBlock<T> objeto .
Cómo: Implementar un patrón de flujo de datos de Producer-Consumer Describe cómo usar el modelo de flujo de datos para implementar un patrón productor-consumidor, donde el productor envía mensajes a un bloque de flujo de datos y el consumidor lee los mensajes de ese bloque.
Cómo: Realizar una acción cuando un bloque de flujo de datos recibe datos Describe cómo proporcionar delegados a los tipos de bloque de flujo de datos de ejecución, ActionBlock<TInput>, TransformBlock<TInput,TOutput>y TransformManyBlock<TInput,TOutput>.
Tutorial: Creación de una canalización de flujo de datos Describe cómo crear una canalización de flujo de datos que descargue texto de la web y realice operaciones en ese texto.
Cómo: Desvincular bloques de flujo de datos Muestra cómo usar el LinkTo método para desvincular un bloque de destino de su origen después de que el origen ofrezca un mensaje al destino.
Tutorial: Uso del flujo de datos en una aplicación de Windows Forms Muestra cómo crear una red de bloques de flujo de datos que realizan el procesamiento de imágenes en una aplicación de Windows Forms.
Cómo: Cancelar un bloque de flujo de datos Muestra cómo usar la cancelación en una aplicación de Windows Forms.
Cómo: Usar JoinBlock para leer datos de varios orígenes Explica cómo usar la JoinBlock<T1,T2> clase para realizar una operación cuando los datos están disponibles desde varios orígenes y cómo usar el modo no expansivo para permitir que varios bloques de combinación compartan un origen de datos de forma más eficaz.
Cómo: Especificar el grado de paralelismo en un bloque de flujo de datos Describe cómo establecer la MaxDegreeOfParallelism propiedad para habilitar un bloque de flujo de datos de ejecución para procesar más de un mensaje a la vez.
Cómo: Especificar un programador de tareas en un bloque de flujo de datos Muestra cómo asociar un programador de tareas específico al usar el flujo de datos en la aplicación.
Tutorial: Uso de BatchBlock y BatchedJoinBlock para mejorar la eficacia Describe cómo usar la BatchBlock<T> clase para mejorar la eficacia de las operaciones de inserción de base de datos y cómo usar la BatchedJoinBlock<T1,T2> clase para capturar los resultados y las excepciones que se producen mientras el programa lee de una base de datos.
Tutorial: Creación de un tipo de bloque de flujo de datos personalizado Muestra dos maneras de crear un tipo de bloque de flujo de datos que implemente el comportamiento personalizado.
Biblioteca paralela de tareas (TPL) Presenta el TPL, una biblioteca que simplifica la programación paralela y simultánea en aplicaciones de .NET Framework.