Partager via


Procédure : effectuer une action lorsqu’un bloc de flux de données reçoit des données

Les types de blocs de flux de données d’exécution appellent un délégué fourni par l’utilisateur lorsqu’ils reçoivent des données. Les System.Threading.Tasks.Dataflow.ActionBlock<TInput>classes et System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput> les System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput>classes sont des types de blocs de flux de données d’exécution. Vous pouvez utiliser le delegate mot clé (Sub en Visual Basic), Action<T>ou Func<T,TResult>une expression lambda lorsque vous fournissez une fonction de travail à un bloc de flux de données d’exécution. Ce document explique comment utiliser Func<T,TResult> et utiliser des expressions lambda pour effectuer une action dans des blocs d’exécution.

Remarque

La bibliothèque de flux de données TPL (l’espace System.Threading.Tasks.Dataflow de noms) n’est pas distribuée avec .NET. Pour installer l’espace System.Threading.Tasks.Dataflow de noms dans Visual Studio, ouvrez votre projet, choisissez Gérer les packages NuGet dans le menu Projet et recherchez en ligne le System.Threading.Tasks.Dataflow package. Sinon, pour l’installer à l’aide de l’interface CLI .NET Core, exécutez dotnet add package System.Threading.Tasks.Dataflow.

Exemple :

L’exemple suivant utilise le flux de données pour lire un fichier à partir du disque et calcule le nombre d’octets dans ce fichier égal à zéro. Il utilise TransformBlock<TInput,TOutput> pour lire le fichier et calculer le nombre d'octets nuls, et ActionBlock<TInput> pour imprimer le nombre d'octets nuls dans la console. L’objet TransformBlock<TInput,TOutput> spécifie un Func<T,TResult> objet à exécuter lorsque les blocs reçoivent des données. L’objet ActionBlock<TInput> utilise une expression lambda pour imprimer dans la console le nombre de zéro octets lus.

using System;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to provide delegates to exectution dataflow blocks.
class DataflowExecutionBlocks
{
   // Computes the number of zero bytes that the provided file
   // contains.
   static int CountBytes(string path)
   {
      byte[] buffer = new byte[1024];
      int totalZeroBytesRead = 0;
      using (var fileStream = File.OpenRead(path))
      {
         int bytesRead = 0;
         do
         {
            bytesRead = fileStream.Read(buffer, 0, buffer.Length);
            totalZeroBytesRead += buffer.Count(b => b == 0);
         } while (bytesRead > 0);
      }

      return totalZeroBytesRead;
   }

   static void Main(string[] args)
   {
      // Create a temporary file on disk.
      string tempFile = Path.GetTempFileName();

      // Write random data to the temporary file.
      using (var fileStream = File.OpenWrite(tempFile))
      {
         Random rand = new Random();
         byte[] buffer = new byte[1024];
         for (int i = 0; i < 512; i++)
         {
            rand.NextBytes(buffer);
            fileStream.Write(buffer, 0, buffer.Length);
         }
      }

      // Create an ActionBlock<int> object that prints to the console
      // the number of bytes read.
      var printResult = new ActionBlock<int>(zeroBytesRead =>
      {
         Console.WriteLine($"{Path.GetFileName(tempFile)} contains {zeroBytesRead} zero bytes.");
      });

      // Create a TransformBlock<string, int> object that calls the
      // CountBytes function and returns its result.
      var countBytes = new TransformBlock<string, int>(
         new Func<string, int>(CountBytes));

      // Link the TransformBlock<string, int> object to the
      // ActionBlock<int> object.
      countBytes.LinkTo(printResult);

      // Create a continuation task that completes the ActionBlock<int>
      // object when the TransformBlock<string, int> finishes.
      countBytes.Completion.ContinueWith(delegate { printResult.Complete(); });

      // Post the path to the temporary file to the
      // TransformBlock<string, int> object.
      countBytes.Post(tempFile);

      // Requests completion of the TransformBlock<string, int> object.
      countBytes.Complete();

      // Wait for the ActionBlock<int> object to print the message.
      printResult.Completion.Wait();

      // Delete the temporary file.
      File.Delete(tempFile);
   }
}

/* Sample output:
tmp4FBE.tmp contains 2081 zero bytes.
*/
Imports System.IO
Imports System.Linq
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to provide delegates to exectution dataflow blocks.
Friend Class DataflowExecutionBlocks
    ' Computes the number of zero bytes that the provided file
    ' contains.
    Private Shared Function CountBytes(ByVal path As String) As Integer
        Dim buffer(1023) As Byte
        Dim totalZeroBytesRead As Integer = 0
        Using fileStream = File.OpenRead(path)
            Dim bytesRead As Integer = 0
            Do
                bytesRead = fileStream.Read(buffer, 0, buffer.Length)
                totalZeroBytesRead += buffer.Count(Function(b) b = 0)
            Loop While bytesRead > 0
        End Using

        Return totalZeroBytesRead
    End Function

    Shared Sub Main(ByVal args() As String)
        ' Create a temporary file on disk.
        Dim tempFile As String = Path.GetTempFileName()

        ' Write random data to the temporary file.
        Using fileStream = File.OpenWrite(tempFile)
            Dim rand As New Random()
            Dim buffer(1023) As Byte
            For i As Integer = 0 To 511
                rand.NextBytes(buffer)
                fileStream.Write(buffer, 0, buffer.Length)
            Next i
        End Using

        ' Create an ActionBlock<int> object that prints to the console 
        ' the number of bytes read.
        Dim printResult = New ActionBlock(Of Integer)(Sub(zeroBytesRead) Console.WriteLine("{0} contains {1} zero bytes.", Path.GetFileName(tempFile), zeroBytesRead))

        ' Create a TransformBlock<string, int> object that calls the 
        ' CountBytes function and returns its result.
        Dim countBytes = New TransformBlock(Of String, Integer)(New Func(Of String, Integer)(AddressOf DataflowExecutionBlocks.CountBytes))

        ' Link the TransformBlock<string, int> object to the 
        ' ActionBlock<int> object.
        countBytes.LinkTo(printResult)

        ' Create a continuation task that completes the ActionBlock<int>
        ' object when the TransformBlock<string, int> finishes.
        countBytes.Completion.ContinueWith(Sub() printResult.Complete())

        ' Post the path to the temporary file to the 
        ' TransformBlock<string, int> object.
        countBytes.Post(tempFile)

        ' Requests completion of the TransformBlock<string, int> object.
        countBytes.Complete()

        ' Wait for the ActionBlock<int> object to print the message.
        printResult.Completion.Wait()

        ' Delete the temporary file.
        File.Delete(tempFile)
    End Sub
End Class

' Sample output:
'tmp4FBE.tmp contains 2081 zero bytes.
'

Bien que vous puissiez fournir une expression lambda à un TransformBlock<TInput,TOutput> objet, cet exemple utilise Func<T,TResult> pour permettre à d’autres codes d’utiliser la CountBytes méthode. L’objet ActionBlock<TInput> utilise une expression lambda, car le travail à effectuer est spécifique à cette tâche et n’est probablement pas utile à partir d’un autre code. Pour plus d’informations sur le fonctionnement des expressions lambda dans la bibliothèque parallèle de tâches, consultez Expressions lambda dans PLINQ et TPL.

La section Résumé des types délégués dans le document Dataflow récapitule les types délégués que vous pouvez fournir aux objets ActionBlock<TInput>, TransformBlock<TInput,TOutput>, et TransformManyBlock<TInput,TOutput>. La table spécifie également si le type délégué fonctionne de manière synchrone ou asynchrone.

Programmation robuste

Cet exemple fournit un délégué de type Func<T,TResult> à l’objet TransformBlock<TInput,TOutput> pour effectuer la tâche du bloc de flux de données de manière synchrone. Pour permettre au bloc de flux de données de se comporter de manière asynchrone, fournissez un délégué de type Func<T, Task<TResult>> au bloc de flux de données. Lorsqu’un bloc de flux de données se comporte de façon asynchrone, la tâche du bloc de flux de données est terminée uniquement lorsque l’objet retourné Task<TResult> se termine. L’exemple suivant modifie la CountBytes méthode et utilise les opérateurs asynchrones et await (Async et Await en Visual Basic) pour calculer de manière asynchrone le nombre total d’octets qui sont zéro dans le fichier fourni. La ReadAsync méthode effectue des opérations de lecture de fichier de manière asynchrone.

// Asynchronously computes the number of zero bytes that the provided file
// contains.
static async Task<int> CountBytesAsync(string path)
{
   byte[] buffer = new byte[1024];
   int totalZeroBytesRead = 0;
   using (var fileStream = new FileStream(
      path, FileMode.Open, FileAccess.Read, FileShare.Read, 0x1000, true))
   {
      int bytesRead = 0;
      do
      {
         // Asynchronously read from the file stream.
         bytesRead = await fileStream.ReadAsync(buffer, 0, buffer.Length);
         totalZeroBytesRead += buffer.Count(b => b == 0);
      } while (bytesRead > 0);
   }

   return totalZeroBytesRead;
}
' Asynchronously computes the number of zero bytes that the provided file 
' contains.
Private Shared async Function CountBytesAsync(ByVal path As String) As Task(Of Integer)
    Dim buffer(1023) As Byte
    Dim totalZeroBytesRead As Integer = 0
    Using fileStream = New FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read, &H1000, True)
        Dim bytesRead As Integer = 0
        Do
            ' Asynchronously read from the file stream.
            bytesRead = await fileStream.ReadAsync(buffer, 0, buffer.Length)
            totalZeroBytesRead += buffer.Count(Function(b) b = 0)
        Loop While bytesRead > 0
    End Using

    Return totalZeroBytesRead
End Function

Vous pouvez également utiliser des expressions lambda asynchrones pour effectuer une action dans un bloc de flux de données d’exécution. L’exemple suivant modifie l’objet TransformBlock<TInput,TOutput> utilisé dans l’exemple précédent afin qu’il utilise une expression lambda pour effectuer le travail de manière asynchrone.

// Create a TransformBlock<string, int> object that calls the
// CountBytes function and returns its result.
var countBytesAsync = new TransformBlock<string, int>(async path =>
{
   byte[] buffer = new byte[1024];
   int totalZeroBytesRead = 0;
   using (var fileStream = new FileStream(
      path, FileMode.Open, FileAccess.Read, FileShare.Read, 0x1000, true))
   {
      int bytesRead = 0;
      do
      {
         // Asynchronously read from the file stream.
         bytesRead = await fileStream.ReadAsync(buffer, 0, buffer.Length);
         totalZeroBytesRead += buffer.Count(b => b == 0);
      } while (bytesRead > 0);
   }

   return totalZeroBytesRead;
});
' Create a TransformBlock<string, int> object that calls the 
' CountBytes function and returns its result.
Dim countBytesAsync = New TransformBlock(Of String, Integer)(async Function(path)
                                                                 ' Asynchronously read from the file stream.
                                                                 Dim buffer(1023) As Byte
                                                                 Dim totalZeroBytesRead As Integer = 0
                                                                 Using fileStream = New FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read, &H1000, True)
                                                                     Dim bytesRead As Integer = 0
                                                                     Do
                                                                         bytesRead = await fileStream.ReadAsync(buffer, 0, buffer.Length)
                                                                         totalZeroBytesRead += buffer.Count(Function(b) b = 0)
                                                                     Loop While bytesRead > 0
                                                                 End Using
                                                                 Return totalZeroBytesRead
                                                             End Function)

Voir aussi