Cara: Melakukan Tindakan Saat Blok Aliran Data Menerima Data

Jenis blok aliran data eksekusi memanggil delegasi yang disediakan pengguna saat mereka menerima data. Kelas System.Threading.Tasks.Dataflow.ActionBlock<TInput>, System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput>, dan System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput> adalah jenis blok aliran data eksekusi. Anda dapat menggunakan kata kunci delegate (Sub dalam Visual Basic), Action<T>, Func<T,TResult>, atau ekspresi lambda saat Anda menyediakan fungsi kerja ke blok aliran data eksekusi. Dokumen ini menjelaskan cara menggunakan Func<T,TResult> dan ekspresi lambda untuk melakukan tindakan dalam blok eksekusi.


Pustaka Aliran Data TPL (namespace layanan System.Threading.Tasks.Dataflow) tidak didistribusikan dengan .NET. Untuk menginstal namespace layanan System.Threading.Tasks.Dataflow di Visual Studio, buka proyek, pilih Kelola Paket NuGet dari menu Proyek, dan cari paket System.Threading.Tasks.Dataflow secara online. Atau, untuk menginstalnya menggunakan .NET Core CLI, jalankan dotnet add package System.Threading.Tasks.Dataflow.


Contoh berikut menggunakan aliran data untuk membaca file dari disk dan menghitung jumlah byte dalam file yang sama dengan nol. Ini menggunakan TransformBlock<TInput,TOutput> untuk membaca file dan menghitung jumlah nol byte, dan ActionBlock<TInput> untuk mencetak jumlah nol byte ke konsol. Objek TransformBlock<TInput,TOutput> menentukan objek Func<T,TResult> untuk melakukan pekerjaan saat blok menerima data. Objek ActionBlock<TInput> menggunakan ekspresi lambda untuk mencetak ke konsol jumlah nol byte yang dibaca.

using System;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to provide delegates to exectution dataflow blocks.
class DataflowExecutionBlocks
   // Computes the number of zero bytes that the provided file
   // contains.
   static int CountBytes(string path)
      byte[] buffer = new byte[1024];
      int totalZeroBytesRead = 0;
      using (var fileStream = File.OpenRead(path))
         int bytesRead = 0;
            bytesRead = fileStream.Read(buffer, 0, buffer.Length);
            totalZeroBytesRead += buffer.Count(b => b == 0);
         } while (bytesRead > 0);

      return totalZeroBytesRead;

   static void Main(string[] args)
      // Create a temporary file on disk.
      string tempFile = Path.GetTempFileName();

      // Write random data to the temporary file.
      using (var fileStream = File.OpenWrite(tempFile))
         Random rand = new Random();
         byte[] buffer = new byte[1024];
         for (int i = 0; i < 512; i++)
            fileStream.Write(buffer, 0, buffer.Length);

      // Create an ActionBlock<int> object that prints to the console
      // the number of bytes read.
      var printResult = new ActionBlock<int>(zeroBytesRead =>
         Console.WriteLine("{0} contains {1} zero bytes.",
            Path.GetFileName(tempFile), zeroBytesRead);

      // Create a TransformBlock<string, int> object that calls the
      // CountBytes function and returns its result.
      var countBytes = new TransformBlock<string, int>(
         new Func<string, int>(CountBytes));

      // Link the TransformBlock<string, int> object to the
      // ActionBlock<int> object.

      // Create a continuation task that completes the ActionBlock<int>
      // object when the TransformBlock<string, int> finishes.
      countBytes.Completion.ContinueWith(delegate { printResult.Complete(); });

      // Post the path to the temporary file to the
      // TransformBlock<string, int> object.

      // Requests completion of the TransformBlock<string, int> object.

      // Wait for the ActionBlock<int> object to print the message.

      // Delete the temporary file.

/* Sample output:
tmp4FBE.tmp contains 2081 zero bytes.
Imports System.IO
Imports System.Linq
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to provide delegates to exectution dataflow blocks.
Friend Class DataflowExecutionBlocks
    ' Computes the number of zero bytes that the provided file
    ' contains.
    Private Shared Function CountBytes(ByVal path As String) As Integer
        Dim buffer(1023) As Byte
        Dim totalZeroBytesRead As Integer = 0
        Using fileStream = File.OpenRead(path)
            Dim bytesRead As Integer = 0
                bytesRead = fileStream.Read(buffer, 0, buffer.Length)
                totalZeroBytesRead += buffer.Count(Function(b) b = 0)
            Loop While bytesRead > 0
        End Using

        Return totalZeroBytesRead
    End Function

    Shared Sub Main(ByVal args() As String)
        ' Create a temporary file on disk.
        Dim tempFile As String = Path.GetTempFileName()

        ' Write random data to the temporary file.
        Using fileStream = File.OpenWrite(tempFile)
            Dim rand As New Random()
            Dim buffer(1023) As Byte
            For i As Integer = 0 To 511
                fileStream.Write(buffer, 0, buffer.Length)
            Next i
        End Using

        ' Create an ActionBlock<int> object that prints to the console 
        ' the number of bytes read.
        Dim printResult = New ActionBlock(Of Integer)(Sub(zeroBytesRead) Console.WriteLine("{0} contains {1} zero bytes.", Path.GetFileName(tempFile), zeroBytesRead))

        ' Create a TransformBlock<string, int> object that calls the 
        ' CountBytes function and returns its result.
        Dim countBytes = New TransformBlock(Of String, Integer)(New Func(Of String, Integer)(AddressOf DataflowExecutionBlocks.CountBytes))

        ' Link the TransformBlock<string, int> object to the 
        ' ActionBlock<int> object.

        ' Create a continuation task that completes the ActionBlock<int>
        ' object when the TransformBlock<string, int> finishes.
        countBytes.Completion.ContinueWith(Sub() printResult.Complete())

        ' Post the path to the temporary file to the 
        ' TransformBlock<string, int> object.

        ' Requests completion of the TransformBlock<string, int> object.

        ' Wait for the ActionBlock<int> object to print the message.

        ' Delete the temporary file.
    End Sub
End Class

' Sample output:
'tmp4FBE.tmp contains 2081 zero bytes.

Meskipun Anda dapat memberikan ekspresi lambda ke objek TransformBlock<TInput,TOutput>, contoh ini akan menggunakan Func<T,TResult> untuk mengaktifkan kode lain untuk menggunakan metode CountBytes. Objek ActionBlock<TInput> menggunakan ekspresi lambda karena pekerjaan yang akan dilakukan khusus untuk tugas ini dan tidak mungkin berguna dari kode lain. Untuk informasi selengkapnya tentang cara kerja ekspresi lambda di Pustaka Paralel Tugas, lihat Ekspresi Lambda di PLINQ dan TPL.

Bagian Ringkasan Jenis Delegasi dalam dokumen Aliran Data meringkas jenis delegasi yang bisa Anda berikan ke objek ActionBlock<TInput>, TransformBlock<TInput,TOutput>, dan TransformManyBlock<TInput,TOutput>. Tabel juga menentukan apakah jenis delegasi beroperasi secara sinkron atau asinkron.

Pemrograman yang Kuat

Contoh ini menyediakan delegasi jenis Func<T,TResult> ke objek TransformBlock<TInput,TOutput> untuk melakukan tugas blok aliran data secara sinkron. Untuk memungkinkan blok aliran data agar berperilaku asinkron, berikan delegasi jenis Func<T, Task<TResult>> ke blok aliran data. Jika blok aliran data berperilaku asinkron, tugas blok aliran data selesai hanya saat objek Task<TResult> yang ditampilkan selesai. Contoh berikut mengubah metode CountBytes dan menggunakan operator asinkron dan menunggu (Asinkron dan Menunggu dalam Visual Basic) untuk menghitung jumlah total byte yang nol dalam file yang disediakan secara asinkron. Metode ReadAsync melakukan operasi baca file secara asinkron.

// Asynchronously computes the number of zero bytes that the provided file
// contains.
static async Task<int> CountBytesAsync(string path)
   byte[] buffer = new byte[1024];
   int totalZeroBytesRead = 0;
   using (var fileStream = new FileStream(
      path, FileMode.Open, FileAccess.Read, FileShare.Read, 0x1000, true))
      int bytesRead = 0;
         // Asynchronously read from the file stream.
         bytesRead = await fileStream.ReadAsync(buffer, 0, buffer.Length);
         totalZeroBytesRead += buffer.Count(b => b == 0);
      } while (bytesRead > 0);

   return totalZeroBytesRead;
' Asynchronously computes the number of zero bytes that the provided file 
' contains.
Private Shared async Function CountBytesAsync(ByVal path As String) As Task(Of Integer)
    Dim buffer(1023) As Byte
    Dim totalZeroBytesRead As Integer = 0
    Using fileStream = New FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read, &H1000, True)
        Dim bytesRead As Integer = 0
            ' Asynchronously read from the file stream.
            bytesRead = await fileStream.ReadAsync(buffer, 0, buffer.Length)
            totalZeroBytesRead += buffer.Count(Function(b) b = 0)
        Loop While bytesRead > 0
    End Using

    Return totalZeroBytesRead
End Function

Anda juga dapat menggunakan ekspresi lambda asinkron untuk melakukan tindakan dalam blok aliran data eksekusi. Contoh berikut mengubah objek TransformBlock<TInput,TOutput> yang digunakan dalam contoh sebelumnya sehingga menggunakan ekspresi lambda untuk melakukan pekerjaan secara asinkron.

// Create a TransformBlock<string, int> object that calls the
// CountBytes function and returns its result.
var countBytesAsync = new TransformBlock<string, int>(async path =>
   byte[] buffer = new byte[1024];
   int totalZeroBytesRead = 0;
   using (var fileStream = new FileStream(
      path, FileMode.Open, FileAccess.Read, FileShare.Read, 0x1000, true))
      int bytesRead = 0;
         // Asynchronously read from the file stream.
         bytesRead = await fileStream.ReadAsync(buffer, 0, buffer.Length);
         totalZeroBytesRead += buffer.Count(b => b == 0);
      } while (bytesRead > 0);

   return totalZeroBytesRead;
' Create a TransformBlock<string, int> object that calls the 
' CountBytes function and returns its result.
Dim countBytesAsync = New TransformBlock(Of String, Integer)(async Function(path)
                                                                 ' Asynchronously read from the file stream.
                                                                 Dim buffer(1023) As Byte
                                                                 Dim totalZeroBytesRead As Integer = 0
                                                                 Using fileStream = New FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read, &H1000, True)
                                                                     Dim bytesRead As Integer = 0
                                                                         bytesRead = await fileStream.ReadAsync(buffer, 0, buffer.Length)
                                                                         totalZeroBytesRead += buffer.Count(Function(b) b = 0)
                                                                     Loop While bytesRead > 0
                                                                 End Using
                                                                 Return totalZeroBytesRead
                                                             End Function)

