Megosztás a következőn keresztül:


Útmutató: Művelet végrehajtása, amikor egy adatfolyam-blokk adatokat fogad

A végrehajtási adatfolyam-blokktípusok meghívnak egy felhasználó által megadott meghatalmazottat, amikor adatokat fogadnak. A System.Threading.Tasks.Dataflow.ActionBlock<TInput>, System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput>és System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput> az osztályok végrehajtási adatfolyamblokk-típusok. A delegate kulcsszót (Sub Visual Basicben), a Action<T>, Func<T,TResult> vagy a lambda kifejezést akkor használhatja, ha egy végrehajtási adatfolyamblokkhoz munkafüggvényt ad meg. Ez a dokumentum azt ismerteti, hogyan használhatók Func<T,TResult> és használhatók lambda kifejezések a végrehajtási blokkokban végzett műveletek végrehajtásához.

Megjegyzés:

A TPL-adatfolyamtár (a System.Threading.Tasks.Dataflow névtér) nincs elosztva a .NET-tel. Ha telepíteni szeretné a System.Threading.Tasks.Dataflow névteret a Visual Studióban, nyissa meg a projektet, válassza NuGet-csomagok kezelése a Project menüjében, és keressen online a System.Threading.Tasks.Dataflow csomagra. Másik lehetőségként a .NET Core CLI használatával telepítheti, futtassa a dotnet add package System.Threading.Tasks.Dataflow.

Példa

Az alábbi példa adatfolyam használatával olvas be egy fájlt a lemezről, és kiszámítja a fájlban lévő bájtok számát, amelyek nullával egyenlők. A fájl olvasására és a nulla bájtok számának kiszámítására a TransformBlock<TInput,TOutput>-t használja, és a nulla bájtok számának konzolra való nyomtatására a ActionBlock<TInput>-t. Az TransformBlock<TInput,TOutput> objektum egy Func<T,TResult> olyan objektumot határoz meg, amely akkor hajtja végre a munkát, amikor a blokkok adatokat fogadnak. Az ActionBlock<TInput> objektum lambda kifejezéssel nyomtatja ki a konzolra az olvasott nulla bájtok számát.

using System;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to provide delegates to exectution dataflow blocks.
class DataflowExecutionBlocks
{
   // Computes the number of zero bytes that the provided file
   // contains.
   static int CountBytes(string path)
   {
      byte[] buffer = new byte[1024];
      int totalZeroBytesRead = 0;
      using (var fileStream = File.OpenRead(path))
      {
         int bytesRead = 0;
         do
         {
            bytesRead = fileStream.Read(buffer, 0, buffer.Length);
            totalZeroBytesRead += buffer.Count(b => b == 0);
         } while (bytesRead > 0);
      }

      return totalZeroBytesRead;
   }

   static void Main(string[] args)
   {
      // Create a temporary file on disk.
      string tempFile = Path.GetTempFileName();

      // Write random data to the temporary file.
      using (var fileStream = File.OpenWrite(tempFile))
      {
         Random rand = new Random();
         byte[] buffer = new byte[1024];
         for (int i = 0; i < 512; i++)
         {
            rand.NextBytes(buffer);
            fileStream.Write(buffer, 0, buffer.Length);
         }
      }

      // Create an ActionBlock<int> object that prints to the console
      // the number of bytes read.
      var printResult = new ActionBlock<int>(zeroBytesRead =>
      {
         Console.WriteLine($"{Path.GetFileName(tempFile)} contains {zeroBytesRead} zero bytes.");
      });

      // Create a TransformBlock<string, int> object that calls the
      // CountBytes function and returns its result.
      var countBytes = new TransformBlock<string, int>(
         new Func<string, int>(CountBytes));

      // Link the TransformBlock<string, int> object to the
      // ActionBlock<int> object.
      countBytes.LinkTo(printResult);

      // Create a continuation task that completes the ActionBlock<int>
      // object when the TransformBlock<string, int> finishes.
      countBytes.Completion.ContinueWith(delegate { printResult.Complete(); });

      // Post the path to the temporary file to the
      // TransformBlock<string, int> object.
      countBytes.Post(tempFile);

      // Requests completion of the TransformBlock<string, int> object.
      countBytes.Complete();

      // Wait for the ActionBlock<int> object to print the message.
      printResult.Completion.Wait();

      // Delete the temporary file.
      File.Delete(tempFile);
   }
}

/* Sample output:
tmp4FBE.tmp contains 2081 zero bytes.
*/
Imports System.IO
Imports System.Linq
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to provide delegates to exectution dataflow blocks.
Friend Class DataflowExecutionBlocks
    ' Computes the number of zero bytes that the provided file
    ' contains.
    Private Shared Function CountBytes(ByVal path As String) As Integer
        Dim buffer(1023) As Byte
        Dim totalZeroBytesRead As Integer = 0
        Using fileStream = File.OpenRead(path)
            Dim bytesRead As Integer = 0
            Do
                bytesRead = fileStream.Read(buffer, 0, buffer.Length)
                totalZeroBytesRead += buffer.Count(Function(b) b = 0)
            Loop While bytesRead > 0
        End Using

        Return totalZeroBytesRead
    End Function

    Shared Sub Main(ByVal args() As String)
        ' Create a temporary file on disk.
        Dim tempFile As String = Path.GetTempFileName()

        ' Write random data to the temporary file.
        Using fileStream = File.OpenWrite(tempFile)
            Dim rand As New Random()
            Dim buffer(1023) As Byte
            For i As Integer = 0 To 511
                rand.NextBytes(buffer)
                fileStream.Write(buffer, 0, buffer.Length)
            Next i
        End Using

        ' Create an ActionBlock<int> object that prints to the console 
        ' the number of bytes read.
        Dim printResult = New ActionBlock(Of Integer)(Sub(zeroBytesRead) Console.WriteLine("{0} contains {1} zero bytes.", Path.GetFileName(tempFile), zeroBytesRead))

        ' Create a TransformBlock<string, int> object that calls the 
        ' CountBytes function and returns its result.
        Dim countBytes = New TransformBlock(Of String, Integer)(New Func(Of String, Integer)(AddressOf DataflowExecutionBlocks.CountBytes))

        ' Link the TransformBlock<string, int> object to the 
        ' ActionBlock<int> object.
        countBytes.LinkTo(printResult)

        ' Create a continuation task that completes the ActionBlock<int>
        ' object when the TransformBlock<string, int> finishes.
        countBytes.Completion.ContinueWith(Sub() printResult.Complete())

        ' Post the path to the temporary file to the 
        ' TransformBlock<string, int> object.
        countBytes.Post(tempFile)

        ' Requests completion of the TransformBlock<string, int> object.
        countBytes.Complete()

        ' Wait for the ActionBlock<int> object to print the message.
        printResult.Completion.Wait()

        ' Delete the temporary file.
        File.Delete(tempFile)
    End Sub
End Class

' Sample output:
'tmp4FBE.tmp contains 2081 zero bytes.
'

Bár lambda kifejezést adhat meg egy TransformBlock<TInput,TOutput> objektumhoz, ez a példa a Func<T,TResult> használatát teszi lehetővé, hogy más kódok a CountBytes metódust használhassák. Az ActionBlock<TInput> objektum lambda kifejezést használ, mert az elvégzendő munka erre a feladatra vonatkozik, és más kódból valószínűleg nem hasznos. A lambdakifejezések működésével kapcsolatos további információkért lásd a Lambda-kifejezéseket a PLINQ-ban és a TPL-ben.

Az adatfolyam-dokumentumban található Delegálástípusok összegzése című szakasz összefoglalja azokat a delegálási típusokat, amelyeket megadhat a , TransformBlock<TInput,TOutput>és TransformManyBlock<TInput,TOutput> az objektumok számáraActionBlock<TInput>. A tábla azt is meghatározza, hogy a delegálás típusa szinkronban vagy aszinkron módon működik-e.

Robusztus programozás

Ez a példa egy típusmegbízottot Func<T,TResult> biztosít az objektumhoz az TransformBlock<TInput,TOutput> adatfolyam-blokk feladatának szinkron végrehajtásához. Ha engedélyezni szeretné, hogy az adatfolyamblokk aszinkron módon viselkedjen, adjon meg egy delegáltat Func<T, Task<TResult>> az adatfolyamblokkhoz. Ha egy adatfolyamblokk aszinkron módon viselkedik, az adatfolyamblokk feladata csak akkor fejeződik be, ha a visszaadott Task<TResult> objektum befejeződik. Az alábbi példa módosítja a CountBytes metódust, és az aszinkron és várakozási operátorok (Async és Await in Visual Basic) használatával aszinkron módon kiszámítja a megadott fájlban nulla bájtok teljes számát. A ReadAsync metódus aszinkron módon hajtja végre a fájlolvasási műveleteket.

// Asynchronously computes the number of zero bytes that the provided file
// contains.
static async Task<int> CountBytesAsync(string path)
{
   byte[] buffer = new byte[1024];
   int totalZeroBytesRead = 0;
   using (var fileStream = new FileStream(
      path, FileMode.Open, FileAccess.Read, FileShare.Read, 0x1000, true))
   {
      int bytesRead = 0;
      do
      {
         // Asynchronously read from the file stream.
         bytesRead = await fileStream.ReadAsync(buffer, 0, buffer.Length);
         totalZeroBytesRead += buffer.Count(b => b == 0);
      } while (bytesRead > 0);
   }

   return totalZeroBytesRead;
}
' Asynchronously computes the number of zero bytes that the provided file 
' contains.
Private Shared async Function CountBytesAsync(ByVal path As String) As Task(Of Integer)
    Dim buffer(1023) As Byte
    Dim totalZeroBytesRead As Integer = 0
    Using fileStream = New FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read, &H1000, True)
        Dim bytesRead As Integer = 0
        Do
            ' Asynchronously read from the file stream.
            bytesRead = await fileStream.ReadAsync(buffer, 0, buffer.Length)
            totalZeroBytesRead += buffer.Count(Function(b) b = 0)
        Loop While bytesRead > 0
    End Using

    Return totalZeroBytesRead
End Function

Aszinkron lambdakifejezésekkel is végrehajthat műveletet egy végrehajtási adatfolyam-blokkban. Az alábbi példa módosítja az TransformBlock<TInput,TOutput> előző példában használt objektumot, hogy egy lambda kifejezéssel végezze el a munkát aszinkron módon.

// Create a TransformBlock<string, int> object that calls the
// CountBytes function and returns its result.
var countBytesAsync = new TransformBlock<string, int>(async path =>
{
   byte[] buffer = new byte[1024];
   int totalZeroBytesRead = 0;
   using (var fileStream = new FileStream(
      path, FileMode.Open, FileAccess.Read, FileShare.Read, 0x1000, true))
   {
      int bytesRead = 0;
      do
      {
         // Asynchronously read from the file stream.
         bytesRead = await fileStream.ReadAsync(buffer, 0, buffer.Length);
         totalZeroBytesRead += buffer.Count(b => b == 0);
      } while (bytesRead > 0);
   }

   return totalZeroBytesRead;
});
' Create a TransformBlock<string, int> object that calls the 
' CountBytes function and returns its result.
Dim countBytesAsync = New TransformBlock(Of String, Integer)(async Function(path)
                                                                 ' Asynchronously read from the file stream.
                                                                 Dim buffer(1023) As Byte
                                                                 Dim totalZeroBytesRead As Integer = 0
                                                                 Using fileStream = New FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read, &H1000, True)
                                                                     Dim bytesRead As Integer = 0
                                                                     Do
                                                                         bytesRead = await fileStream.ReadAsync(buffer, 0, buffer.Length)
                                                                         totalZeroBytesRead += buffer.Count(Function(b) b = 0)
                                                                     Loop While bytesRead > 0
                                                                 End Using
                                                                 Return totalZeroBytesRead
                                                             End Function)

Lásd még