Procedimiento Toma de medidas cuando un bloque de flujos de datos recibe datos

Los tipos Bloque de flujo de datos de ejecución llaman a un delegado proporcionado por el usuario cuando reciben datos. Las clases System.Threading.Tasks.Dataflow.ActionBlock<TInput>, System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput> y System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput> son tipos de bloques de flujo de datos de ejecución. Puede usar la palabra clave delegate (Sub en Visual Basic), Action<T>, Func<T,TResult> o una expresión lambda cuando proporcione una función de trabajo a un bloque de flujo de datos de ejecución. Este documento describe cómo usar Func<T,TResult> y expresiones lambda para realizar la acción en bloques de ejecución.


La biblioteca de flujos de datos TPL (el espacio de nombres System.Threading.Tasks.Dataflow) no se distribuye con .NET. Para instalar el espacio de nombres System.Threading.Tasks.Dataflow en Visual Studio, abra el proyecto, seleccione Administrar paquetes NuGet en el menú Proyecto y busque en línea el paquete System.Threading.Tasks.Dataflow. Como alternativa, para realizar la instalación con la CLI de .Net Core, ejecute dotnet add package System.Threading.Tasks.Dataflow.


En el ejemplo siguiente se usa el flujo de datos para leer un archivo del disco y se calcula el número de bytes en el archivo que son iguales a cero. Usa TransformBlock<TInput,TOutput> para leer el archivo y calcular el número de cero bytes, y ActionBlock<TInput> para imprimir el número de cero bytes en la consola. El objeto TransformBlock<TInput,TOutput> especifica un objeto Func<T,TResult> para realizar el trabajo cuando los bloques reciben datos. El objeto ActionBlock<TInput> usa una expresión lambda para imprimir en la consola el número de cero bytes que se lee.

using System;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to provide delegates to exectution dataflow blocks.
class DataflowExecutionBlocks
   // Computes the number of zero bytes that the provided file
   // contains.
   static int CountBytes(string path)
      byte[] buffer = new byte[1024];
      int totalZeroBytesRead = 0;
      using (var fileStream = File.OpenRead(path))
         int bytesRead = 0;
            bytesRead = fileStream.Read(buffer, 0, buffer.Length);
            totalZeroBytesRead += buffer.Count(b => b == 0);
         } while (bytesRead > 0);

      return totalZeroBytesRead;

   static void Main(string[] args)
      // Create a temporary file on disk.
      string tempFile = Path.GetTempFileName();

      // Write random data to the temporary file.
      using (var fileStream = File.OpenWrite(tempFile))
         Random rand = new Random();
         byte[] buffer = new byte[1024];
         for (int i = 0; i < 512; i++)
            fileStream.Write(buffer, 0, buffer.Length);

      // Create an ActionBlock<int> object that prints to the console
      // the number of bytes read.
      var printResult = new ActionBlock<int>(zeroBytesRead =>
         Console.WriteLine("{0} contains {1} zero bytes.",
            Path.GetFileName(tempFile), zeroBytesRead);

      // Create a TransformBlock<string, int> object that calls the
      // CountBytes function and returns its result.
      var countBytes = new TransformBlock<string, int>(
         new Func<string, int>(CountBytes));

      // Link the TransformBlock<string, int> object to the
      // ActionBlock<int> object.

      // Create a continuation task that completes the ActionBlock<int>
      // object when the TransformBlock<string, int> finishes.
      countBytes.Completion.ContinueWith(delegate { printResult.Complete(); });

      // Post the path to the temporary file to the
      // TransformBlock<string, int> object.

      // Requests completion of the TransformBlock<string, int> object.

      // Wait for the ActionBlock<int> object to print the message.

      // Delete the temporary file.

/* Sample output:
tmp4FBE.tmp contains 2081 zero bytes.
Imports System.IO
Imports System.Linq
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to provide delegates to exectution dataflow blocks.
Friend Class DataflowExecutionBlocks
    ' Computes the number of zero bytes that the provided file
    ' contains.
    Private Shared Function CountBytes(ByVal path As String) As Integer
        Dim buffer(1023) As Byte
        Dim totalZeroBytesRead As Integer = 0
        Using fileStream = File.OpenRead(path)
            Dim bytesRead As Integer = 0
                bytesRead = fileStream.Read(buffer, 0, buffer.Length)
                totalZeroBytesRead += buffer.Count(Function(b) b = 0)
            Loop While bytesRead > 0
        End Using

        Return totalZeroBytesRead
    End Function

    Shared Sub Main(ByVal args() As String)
        ' Create a temporary file on disk.
        Dim tempFile As String = Path.GetTempFileName()

        ' Write random data to the temporary file.
        Using fileStream = File.OpenWrite(tempFile)
            Dim rand As New Random()
            Dim buffer(1023) As Byte
            For i As Integer = 0 To 511
                fileStream.Write(buffer, 0, buffer.Length)
            Next i
        End Using

        ' Create an ActionBlock<int> object that prints to the console 
        ' the number of bytes read.
        Dim printResult = New ActionBlock(Of Integer)(Sub(zeroBytesRead) Console.WriteLine("{0} contains {1} zero bytes.", Path.GetFileName(tempFile), zeroBytesRead))

        ' Create a TransformBlock<string, int> object that calls the 
        ' CountBytes function and returns its result.
        Dim countBytes = New TransformBlock(Of String, Integer)(New Func(Of String, Integer)(AddressOf DataflowExecutionBlocks.CountBytes))

        ' Link the TransformBlock<string, int> object to the 
        ' ActionBlock<int> object.

        ' Create a continuation task that completes the ActionBlock<int>
        ' object when the TransformBlock<string, int> finishes.
        countBytes.Completion.ContinueWith(Sub() printResult.Complete())

        ' Post the path to the temporary file to the 
        ' TransformBlock<string, int> object.

        ' Requests completion of the TransformBlock<string, int> object.

        ' Wait for the ActionBlock<int> object to print the message.

        ' Delete the temporary file.
    End Sub
End Class

' Sample output:
'tmp4FBE.tmp contains 2081 zero bytes.

Aunque puede proporcionar una expresión lambda para un objeto TransformBlock<TInput,TOutput>, este ejemplo utiliza Func<T,TResult> para habilitar otro código para usar el método CountBytes. El objeto ActionBlock<TInput> utiliza una expresión lambda porque el trabajo que se debe realizar es específico de esta tarea y es probable que no sea útil desde otro código. Para información sobre cómo funcionan las expresiones lambda en la biblioteca TPL, vea Expresiones lambda en PLINQ y TPL.

En la sección Resumen de tipos de delegado del documento Flujo de datos se resumen los tipos de delegado que puede proporcionar a los objetos ActionBlock<TInput>, TransformBlock<TInput,TOutput> y TransformManyBlock<TInput,TOutput>. La tabla también especifica si el tipo de delegado funciona de forma sincrónica o asincrónica.

Programación sólida

En este ejemplo se proporcionada un delegado de tipo Func<T,TResult> para el objeto TransformBlock<TInput,TOutput>, con el fin de realizar la tarea del bloque de flujo de datos de forma sincrónica. Para que el bloque de flujo de datos se comporte de forma asincrónica, proporcione un delegado de tipo Func<T, Task<TResult>> al bloque de flujo de datos. Cuando un bloque de flujo de datos se comporta de forma asincrónica, la tarea del bloque de flujo de datos se completa solo cuando el objeto Task<TResult> devuelto finaliza. En el ejemplo siguiente se modifica el método CountBytes y se usan los operadores async y await (Async y Await en Visual Basic) para calcular de forma asincrónica el número total de bytes que son cero en el archivo proporcionado. El método ReadAsync realiza las operaciones de lectura de archivo de forma asincrónica.

// Asynchronously computes the number of zero bytes that the provided file
// contains.
static async Task<int> CountBytesAsync(string path)
   byte[] buffer = new byte[1024];
   int totalZeroBytesRead = 0;
   using (var fileStream = new FileStream(
      path, FileMode.Open, FileAccess.Read, FileShare.Read, 0x1000, true))
      int bytesRead = 0;
         // Asynchronously read from the file stream.
         bytesRead = await fileStream.ReadAsync(buffer, 0, buffer.Length);
         totalZeroBytesRead += buffer.Count(b => b == 0);
      } while (bytesRead > 0);

   return totalZeroBytesRead;
' Asynchronously computes the number of zero bytes that the provided file 
' contains.
Private Shared async Function CountBytesAsync(ByVal path As String) As Task(Of Integer)
    Dim buffer(1023) As Byte
    Dim totalZeroBytesRead As Integer = 0
    Using fileStream = New FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read, &H1000, True)
        Dim bytesRead As Integer = 0
            ' Asynchronously read from the file stream.
            bytesRead = await fileStream.ReadAsync(buffer, 0, buffer.Length)
            totalZeroBytesRead += buffer.Count(Function(b) b = 0)
        Loop While bytesRead > 0
    End Using

    Return totalZeroBytesRead
End Function

También puede utilizar expresiones lambda asincrónicas para realizar la acción en un bloque de flujo de datos de ejecución. En el ejemplo siguiente se modifica el objeto TransformBlock<TInput,TOutput> que se utiliza en el ejemplo anterior para que utilice una expresión lambda para realizar el trabajo de forma asincrónica.

// Create a TransformBlock<string, int> object that calls the
// CountBytes function and returns its result.
var countBytesAsync = new TransformBlock<string, int>(async path =>
   byte[] buffer = new byte[1024];
   int totalZeroBytesRead = 0;
   using (var fileStream = new FileStream(
      path, FileMode.Open, FileAccess.Read, FileShare.Read, 0x1000, true))
      int bytesRead = 0;
         // Asynchronously read from the file stream.
         bytesRead = await fileStream.ReadAsync(buffer, 0, buffer.Length);
         totalZeroBytesRead += buffer.Count(b => b == 0);
      } while (bytesRead > 0);

   return totalZeroBytesRead;
' Create a TransformBlock<string, int> object that calls the 
' CountBytes function and returns its result.
Dim countBytesAsync = New TransformBlock(Of String, Integer)(async Function(path)
                                                                 ' Asynchronously read from the file stream.
                                                                 Dim buffer(1023) As Byte
                                                                 Dim totalZeroBytesRead As Integer = 0
                                                                 Using fileStream = New FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read, &H1000, True)
                                                                     Dim bytesRead As Integer = 0
                                                                         bytesRead = await fileStream.ReadAsync(buffer, 0, buffer.Length)
                                                                         totalZeroBytesRead += buffer.Count(Function(b) b = 0)
                                                                     Loop While bytesRead > 0
                                                                 End Using
                                                                 Return totalZeroBytesRead
                                                             End Function)

Vea también