Compartir a través de


Asincronía en .NET 4.5: Introducción a Dataflow

  1. Introducción
  2. Async/Await
  3. Task Parallel Library
  4. Introducción a Dataflow
Introducción a Dataflow

A partir del .NET Framework 4.5 hay un nuevo namespace, que viene con la Task Parallel Library: System.Threading.Tasks.Dataflow. Este namespace permite la programación según el modelo de actores independientes y facilita la creación de redes de mensajería. Es una forma de paralelizar trabajos que requieren una utilización alta del CPU o del I/O, para conseguir un rendimiento alto con una latencia baja.

Las Partes Integrantes

Una red de mensajería se forma por actores que se comunican entre ellos. Cada actor tiene su propia tarea y decide de forma independiente cuando ejecutarla y a quien debería avisar una vez que haya terminado de ejecutar una tarea.

El namespace de Dataflow distingue entre tres tipos de actores – o bloques:

· “Buffering Blocks” – bloques que acumulan mensajes

· “Execution Blocks” – bloques que ejecutan tareas

· “Grouping Blocks” – bloques que agrupan mensajes

En seguida os voy a presentar un ejemplo de cada tipo. Una lista completa de todos los bloques que vienen por defecto podéis encontrar aquí: https://msdn.microsoft.com/es-ES/library/hh194722.aspx

Por supuesto, también es posible implementar su propio bloque: https://msdn.microsoft.com/es-es/library/hh228606.aspx

BufferBlock<T>

Como indica el nombre, este bloque acumula mensajes. Se pueden meter mensajes en el buffer desde varias fuentes a la vez, y también se pueden enviar los mensajes a varios destinatarios a la vez. La forma en que los mensajes pasan por el buffer es “FIFO” (“First In First Out”) – el mensaje que entra primero sale primero.

Importante: Si hay varios destinatarios posibles, cada mensaje solo se puede leer por uno de ellos. Para distribuir mensajes a todos los destinatarios a la vez deberíamos usar el BroadcastBlock<T>

Un ejemplo simple de un BufferBlock<T>:

// Crear una instancia de BufferBlock<int>. var bufferBlock = new BufferBlock<int>();   // Postear un par de mensajes. for (int i = 0; i < 3; i++) {    bufferBlock.Post(i); }   // Recibir los mensajes del bloque. for (int i = 0; i < 3; i++) {    Console.WriteLine(bufferBlock.Receive()); }   /* La salida:    0    1    2  */ 

Este bloque permanece al grupo de los bloques que ejecutan tareas. El ActionBlock<T> ejecuta un delegate (una tarea) cuando reciba un mensaje. Según el delegate que se usa, Action o System.Func<TInput, Task>, las tareas se ejecutan de forma síncrona o asíncrona.

En el siguiente ejemplo creamos un ActionBlock<T> , posteamos tres mensajes y esperamos a que termine la ejecución.

// Crear una instancia de ActionBlock<int> que imprime 

// valores en la consola. 

var actionBlock = new ActionBlock<int>(n => Console.WriteLine(n));

 

// Postear un par de mensajes.

for (int i = 0; i < 3; i++)

{

   actionBlock.Post(i * 10);

}

 

// Poner el bloque en el estado “completed” y esperar

// a que terminen todas las tareas.

actionBlock.Complete();

actionBlock.Completion.Wait();

 

/* La salida:

   0

   10

   20

 */

 

BatchBlock<T>

El objetivo de la última categoría de bloques es agrupar mensajes. El BatchBlock<T> espera a un número de mensajes determinado y entonces los convierte en un array. Si se completa el bloque antes de que le hayan llegado el numero de mensajes necesario, el ultimó array contendrá menos valores – los mensajes que le han llegado hasta el momento.

En el siguiente ejemplo, creamos un BatchBlock<T>, le mandamos trece mensajes y calculamos la suma de cada grupo.

// Crear un BatchBlock<int> que agrupa hasta // diez mensajes.  var batchBlock = new BatchBlock<int>(10);   // Postear un par de mensajes. for (int i = 0; i < 13; i++) {    batchBlock.Post(i); } // Indicar que el bloque haya terminado. // Eso causa que el bloque cree un ultimo // array con los mensajes que queden. batchBlock.Complete();   // Imprimir la suma de los dos grupos.   Console.WriteLine("La suma de los elementos del grupo 1 es {0}.",    batchBlock.Receive().Sum());   Console.WriteLine("La suma de los elementos del grupo 2 es {0}.",    batchBlock.Receive().Sum());   /* La salida:    The sum of the elements in batch 1 is 45.    The sum of the elements in batch 2 is 33.  */   

Ejemplo de una red de mensajería simple

Para no solo ver los bloques uno a uno, si no para hacernos una idea de como funcionan juntos, aquí un ejemplo de una red simple: Creamos números, como lo mensajes del input, y los mandamos a nuestro red de agentes.

Cuando llegue el primer mensaje (el primer numero) los agentes empiezan su trabajo: Calculan la potencia de cada número y agrupan los resultados en arrays. En cada paso un agente independiente imprime el progreso, para que veamos como progresan los mensajes en la red.

Aquí un diagrama que ilustra nuestra red:

clip_image002

Para que la distribución no pase demasiado rápido simulamos carga del CPU con Task.Delay(). Aquí el código comentado:

 

// El buffer inicial, que acumula los mensajes que creamos. BufferBlock<int> buffer = new BufferBlock<int>();   // Un bloque que manda el mismo mensaje por una parte // al siguiente bloque de la red y por otra parte // al agente que imprime el progreso en la consola. BroadcastBlock<int> bcBlock1 = new BroadcastBlock<int>(x => x); ActionBlock<int> reportBlock = new ActionBlock<int>(x => {     Console.WriteLine("-- INPUT --> {0}", x); });   // El bloque que transforma nuestros mensajes: // Coge los números que recibe y calcula su potencia. TransformBlock<int, int> powBlock = newTransformBlock<int, int>(async x => {     awaitTask.Delay(x * 50);     return x * x; });   // Otro conjunto de bloques para imprimir el progreso en la // consola y para avanzar la propagación de los mensajes en la red. BroadcastBlock<int> bcBlock2 = new BroadcastBlock<int>(x => x); ActionBlock<int> afterPowReportBlock = new ActionBlock<int>(x => {     Console.WriteLine("Potencia: {0}", x); });   // El último bloque en nuestra red. Espera a cinco mensajes // y los agrupo en un array. BatchBlock<int> intBatcherBlock = new BatchBlock<int>(5);   // Como ya hemos llegado al final de nuestra red, ya no hace // falta mandar el mensaje a más de un bloque. Por eso solo // creamos un bloque que imprime el resultado final en la consola. ActionBlock<int[]> reportArrayBlock = new ActionBlock<int[]>(x => {     StringBuilder sb = newStringBuilder();     sb.Append("## grupo llenado ## Valores : ");     foreach (int i in x)         sb.Append(i).Append(" ");     Console.WriteLine(sb.ToString()); });   // Ahora tenemos que enlazar los bloques. Este paso define // cómo se comunican entre ellos y realmente es el paso // en que creamos una red. buffer.LinkTo(bcBlock1); bcBlock1.LinkTo(reportBlock); bcBlock1.LinkTo(powBlock);   powBlock.LinkTo(bcBlock2); bcBlock2.LinkTo(afterPowReportBlock); bcBlock2.LinkTo(intBatcherBlock);   intBatcherBlock.LinkTo(reportArrayBlock);   // Ya que tenemos la red preparado, solo nos queda generar // los números y postearlos en el buffer. Lo demás pasará // automáticamente, y en la consola podremos ver como // se propagan los mensajes en nuestra red. awaitTask.Run(async () => {     for (int i = 1; i <= 10; ++i)     {         buffer.Post(i);         awaitTask.Delay(150);     } });

Conclusión

Espero que os haya gustado la introducción del namespace Dataflow. Si os interesa el tema y si queréis saber más, en el siguiente enlace podéis leer más sobre el tema: https://msdn.microsoft.com/es-es/library/hh228603.aspx

Espero que os sirva de ayuda.

- Helge Mahrt