Teilen über


Vorgehensweise: Ausführen einer Aktion, wenn ein Datenflussblock Daten empfängt

Ausführungsdatenflussblocktypen rufen eine vom Benutzer bereitgestellte Stellvertretung auf, wenn sie Daten empfangen. Die System.Threading.Tasks.Dataflow.ActionBlock<TInput>, System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput> und System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput> Klassen sind Ausführungsdatenflussblocktypen. Sie können das delegate Schlüsselwort (Sub in Visual Basic), Action<T>, oder Func<T,TResult>einen Lambda-Ausdruck verwenden, wenn Sie eine Arbeitsfunktion für einen Ausführungsdatenflussblock bereitstellen. In diesem Dokument wird beschrieben, wie Sie Func<T,TResult> und Lambda-Ausdrücke verwenden, um Aktionen in Ausführungsblöcken auszuführen.

Hinweis

Die TPL-Datenflussbibliothek (der System.Threading.Tasks.Dataflow Namespace) ist in .NET 6 und höheren Versionen enthalten. Für .NET Framework- und .NET Standard-Projekte müssen Sie das 📦 NuGet-Paket "System.Threading.Tasks.Dataflow" installieren.

Example

Im folgenden Beispiel wird der Datenfluss verwendet, um eine Datei vom Datenträger zu lesen und die Anzahl der Bytes in dieser Datei zu berechnen, die gleich Null sind. Es verwendet TransformBlock<TInput,TOutput>, um die Datei zu lesen und die Anzahl der Nullbytes zu berechnen. Mit ActionBlock<TInput> wird dann die Anzahl der Nullbytes auf der Konsole ausgegeben. Das TransformBlock<TInput,TOutput> Objekt gibt ein Func<T,TResult> Objekt an, das ausgeführt werden soll, wenn die Blöcke Daten empfangen. Das ActionBlock<TInput> Objekt verwendet einen Lambda-Ausdruck, um in der Konsole die Anzahl der gelesenen Nullbytes zu drucken.

using System;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to provide delegates to exectution dataflow blocks.
class DataflowExecutionBlocks
{
   // Computes the number of zero bytes that the provided file
   // contains.
   static int CountBytes(string path)
   {
      byte[] buffer = new byte[1024];
      int totalZeroBytesRead = 0;
      using (var fileStream = File.OpenRead(path))
      {
         int bytesRead = 0;
         do
         {
            bytesRead = fileStream.Read(buffer, 0, buffer.Length);
            totalZeroBytesRead += buffer.Count(b => b == 0);
         } while (bytesRead > 0);
      }

      return totalZeroBytesRead;
   }

   static void Main(string[] args)
   {
      // Create a temporary file on disk.
      string tempFile = Path.GetTempFileName();

      // Write random data to the temporary file.
      using (var fileStream = File.OpenWrite(tempFile))
      {
         Random rand = new Random();
         byte[] buffer = new byte[1024];
         for (int i = 0; i < 512; i++)
         {
            rand.NextBytes(buffer);
            fileStream.Write(buffer, 0, buffer.Length);
         }
      }

      // Create an ActionBlock<int> object that prints to the console
      // the number of bytes read.
      var printResult = new ActionBlock<int>(zeroBytesRead =>
      {
         Console.WriteLine($"{Path.GetFileName(tempFile)} contains {zeroBytesRead} zero bytes.");
      });

      // Create a TransformBlock<string, int> object that calls the
      // CountBytes function and returns its result.
      var countBytes = new TransformBlock<string, int>(
         new Func<string, int>(CountBytes));

      // Link the TransformBlock<string, int> object to the
      // ActionBlock<int> object.
      countBytes.LinkTo(printResult);

      // Create a continuation task that completes the ActionBlock<int>
      // object when the TransformBlock<string, int> finishes.
      countBytes.Completion.ContinueWith(delegate { printResult.Complete(); });

      // Post the path to the temporary file to the
      // TransformBlock<string, int> object.
      countBytes.Post(tempFile);

      // Requests completion of the TransformBlock<string, int> object.
      countBytes.Complete();

      // Wait for the ActionBlock<int> object to print the message.
      printResult.Completion.Wait();

      // Delete the temporary file.
      File.Delete(tempFile);
   }
}

/* Sample output:
tmp4FBE.tmp contains 2081 zero bytes.
*/
Imports System.IO
Imports System.Linq
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to provide delegates to exectution dataflow blocks.
Friend Class DataflowExecutionBlocks
    ' Computes the number of zero bytes that the provided file
    ' contains.
    Private Shared Function CountBytes(ByVal path As String) As Integer
        Dim buffer(1023) As Byte
        Dim totalZeroBytesRead As Integer = 0
        Using fileStream = File.OpenRead(path)
            Dim bytesRead As Integer = 0
            Do
                bytesRead = fileStream.Read(buffer, 0, buffer.Length)
                totalZeroBytesRead += buffer.Count(Function(b) b = 0)
            Loop While bytesRead > 0
        End Using

        Return totalZeroBytesRead
    End Function

    Shared Sub Main(ByVal args() As String)
        ' Create a temporary file on disk.
        Dim tempFile As String = Path.GetTempFileName()

        ' Write random data to the temporary file.
        Using fileStream = File.OpenWrite(tempFile)
            Dim rand As New Random()
            Dim buffer(1023) As Byte
            For i As Integer = 0 To 511
                rand.NextBytes(buffer)
                fileStream.Write(buffer, 0, buffer.Length)
            Next i
        End Using

        ' Create an ActionBlock<int> object that prints to the console 
        ' the number of bytes read.
        Dim printResult = New ActionBlock(Of Integer)(Sub(zeroBytesRead) Console.WriteLine("{0} contains {1} zero bytes.", Path.GetFileName(tempFile), zeroBytesRead))

        ' Create a TransformBlock<string, int> object that calls the 
        ' CountBytes function and returns its result.
        Dim countBytes = New TransformBlock(Of String, Integer)(New Func(Of String, Integer)(AddressOf DataflowExecutionBlocks.CountBytes))

        ' Link the TransformBlock<string, int> object to the 
        ' ActionBlock<int> object.
        countBytes.LinkTo(printResult)

        ' Create a continuation task that completes the ActionBlock<int>
        ' object when the TransformBlock<string, int> finishes.
        countBytes.Completion.ContinueWith(Sub() printResult.Complete())

        ' Post the path to the temporary file to the 
        ' TransformBlock<string, int> object.
        countBytes.Post(tempFile)

        ' Requests completion of the TransformBlock<string, int> object.
        countBytes.Complete()

        ' Wait for the ActionBlock<int> object to print the message.
        printResult.Completion.Wait()

        ' Delete the temporary file.
        File.Delete(tempFile)
    End Sub
End Class

' Sample output:
'tmp4FBE.tmp contains 2081 zero bytes.
'

Obwohl Sie einen Lambda-Ausdruck für ein TransformBlock<TInput,TOutput> Objekt bereitstellen können, wird in diesem Beispiel verwendet Func<T,TResult> , um anderen Code für die Verwendung der CountBytes Methode zu aktivieren. Das ActionBlock<TInput> Objekt verwendet einen Lambda-Ausdruck, da die auszuführende Arbeit spezifisch für diese Aufgabe ist und wahrscheinlich in anderem Code nicht nützlich ist. Weitere Informationen zur Funktionsweise von Lambda-Ausdrücken in der Task Parallel Library finden Sie unter Lambda-Ausdrücke in PLINQ und TPL.

Der Abschnitt "Zusammenfassung der Delegattypen" im Dataflow-Dokument fasst die Delegattypen zusammen, die Sie für ActionBlock<TInput>, TransformBlock<TInput,TOutput>und TransformManyBlock<TInput,TOutput> Objekte bereitstellen können. Die Tabelle gibt außerdem an, ob der Delegattyp synchron oder asynchron arbeitet.

Robuste Programmierung

In diesem Beispiel wird ein Delegat vom Typ Func<T,TResult> an das TransformBlock<TInput,TOutput> Objekt bereitgestellt, um die Aufgabe des Datenflussblocks synchron auszuführen. Um das asynchrone Verhalten des Datenflussblocks zu ermöglichen, stellen Sie einen Delegat des Typs Func<T, Task<TResult>> für den Datenflussblock bereit. Wenn sich ein Datenflussblock asynchron verhält, ist die Aufgabe des Datenflussblocks nur abgeschlossen, wenn das zurückgegebene Task<TResult> Objekt abgeschlossen ist. Im folgenden Beispiel wird die CountBytes-Methode geändert, und die Async- und Await-Operatoren (Async und Await in Visual Basic) werden verwendet, um asynchron die Gesamtanzahl der null Bytes in der bereitgestellten Datei zu berechnen. Die ReadAsync Methode führt Dateilesevorgänge asynchron aus.

// Asynchronously computes the number of zero bytes that the provided file
// contains.
static async Task<int> CountBytesAsync(string path)
{
   byte[] buffer = new byte[1024];
   int totalZeroBytesRead = 0;
   using (var fileStream = new FileStream(
      path, FileMode.Open, FileAccess.Read, FileShare.Read, 0x1000, true))
   {
      int bytesRead = 0;
      do
      {
         // Asynchronously read from the file stream.
         bytesRead = await fileStream.ReadAsync(buffer, 0, buffer.Length);
         totalZeroBytesRead += buffer.Count(b => b == 0);
      } while (bytesRead > 0);
   }

   return totalZeroBytesRead;
}
' Asynchronously computes the number of zero bytes that the provided file 
' contains.
Private Shared async Function CountBytesAsync(ByVal path As String) As Task(Of Integer)
    Dim buffer(1023) As Byte
    Dim totalZeroBytesRead As Integer = 0
    Using fileStream = New FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read, &H1000, True)
        Dim bytesRead As Integer = 0
        Do
            ' Asynchronously read from the file stream.
            bytesRead = await fileStream.ReadAsync(buffer, 0, buffer.Length)
            totalZeroBytesRead += buffer.Count(Function(b) b = 0)
        Loop While bytesRead > 0
    End Using

    Return totalZeroBytesRead
End Function

Sie können auch asynchrone Lambda-Ausdrücke verwenden, um Aktionen in einem Ausführungsdatenflussblock auszuführen. Im folgenden Beispiel wird das TransformBlock<TInput,TOutput> Objekt geändert, das im vorherigen Beispiel verwendet wird, sodass ein Lambda-Ausdruck zum asynchronen Ausführen der Arbeit verwendet wird.

// Create a TransformBlock<string, int> object that calls the
// CountBytes function and returns its result.
var countBytesAsync = new TransformBlock<string, int>(async path =>
{
   byte[] buffer = new byte[1024];
   int totalZeroBytesRead = 0;
   using (var fileStream = new FileStream(
      path, FileMode.Open, FileAccess.Read, FileShare.Read, 0x1000, true))
   {
      int bytesRead = 0;
      do
      {
         // Asynchronously read from the file stream.
         bytesRead = await fileStream.ReadAsync(buffer, 0, buffer.Length);
         totalZeroBytesRead += buffer.Count(b => b == 0);
      } while (bytesRead > 0);
   }

   return totalZeroBytesRead;
});
' Create a TransformBlock<string, int> object that calls the 
' CountBytes function and returns its result.
Dim countBytesAsync = New TransformBlock(Of String, Integer)(async Function(path)
                                                                 ' Asynchronously read from the file stream.
                                                                 Dim buffer(1023) As Byte
                                                                 Dim totalZeroBytesRead As Integer = 0
                                                                 Using fileStream = New FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read, &H1000, True)
                                                                     Dim bytesRead As Integer = 0
                                                                     Do
                                                                         bytesRead = await fileStream.ReadAsync(buffer, 0, buffer.Length)
                                                                         totalZeroBytesRead += buffer.Count(Function(b) b = 0)
                                                                     Loop While bytesRead > 0
                                                                 End Using
                                                                 Return totalZeroBytesRead
                                                             End Function)

Siehe auch