方法:データフロー ブロックでデータを受信したときにアクションを実行する

"実行データフロー ブロック" の型は、データを受信したときに、ユーザーが指定したデリゲートを呼び出します。 System.Threading.Tasks.Dataflow.ActionBlock<TInput>System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput>、および System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput> クラスは、実行データフロー ブロックの種類です。 実行データフロー ブロックに処理関数を提供するときに、delegate キーワード (Visual Basic では Sub)、Action<T>Func<T,TResult>、またはラムダ式を使用することができます。 このドキュメントでは、Func<T,TResult> とラムダ式を使用して、実行ブロックでアクションを実行する方法について説明します。

注意

TPL データフロー ライブラリ (System.Threading.Tasks.Dataflow 名前空間) は、.NET と一緒には配布されません。 Visual Studio に System.Threading.Tasks.Dataflow 名前空間をインストールするには、プロジェクトを開き、[プロジェクト] メニューの [NuGet パッケージの管理] をクリックし、System.Threading.Tasks.Dataflow パッケージをオンラインで検索します。 または、.NET Core CLI を使ってインストールするには、dotnet add package System.Threading.Tasks.Dataflow を実行します。

次の例では、データフローを使用してディスクからファイルを読み取り、ファイル内のバイト数がゼロに等しい件数を計算します。 TransformBlock<TInput,TOutput> を使用してファイルを読み取り、ゼロ バイトの数を計算し、ActionBlock<TInput> を使用してゼロ バイト数をコンソールに出力します。 TransformBlock<TInput,TOutput> オブジェクトは、ブロックがデータを受け取ったときに処理を実行する Func<T,TResult> オブジェクトを指定します。 ActionBlock<TInput> オブジェクトは、ラムダ式を使用して、読み取ったゼロ バイトの件数をコンソールに出力します。

using System;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to provide delegates to exectution dataflow blocks.
class DataflowExecutionBlocks
{
   // Computes the number of zero bytes that the provided file
   // contains.
   static int CountBytes(string path)
   {
      byte[] buffer = new byte[1024];
      int totalZeroBytesRead = 0;
      using (var fileStream = File.OpenRead(path))
      {
         int bytesRead = 0;
         do
         {
            bytesRead = fileStream.Read(buffer, 0, buffer.Length);
            totalZeroBytesRead += buffer.Count(b => b == 0);
         } while (bytesRead > 0);
      }

      return totalZeroBytesRead;
   }

   static void Main(string[] args)
   {
      // Create a temporary file on disk.
      string tempFile = Path.GetTempFileName();

      // Write random data to the temporary file.
      using (var fileStream = File.OpenWrite(tempFile))
      {
         Random rand = new Random();
         byte[] buffer = new byte[1024];
         for (int i = 0; i < 512; i++)
         {
            rand.NextBytes(buffer);
            fileStream.Write(buffer, 0, buffer.Length);
         }
      }

      // Create an ActionBlock<int> object that prints to the console
      // the number of bytes read.
      var printResult = new ActionBlock<int>(zeroBytesRead =>
      {
         Console.WriteLine("{0} contains {1} zero bytes.",
            Path.GetFileName(tempFile), zeroBytesRead);
      });

      // Create a TransformBlock<string, int> object that calls the
      // CountBytes function and returns its result.
      var countBytes = new TransformBlock<string, int>(
         new Func<string, int>(CountBytes));

      // Link the TransformBlock<string, int> object to the
      // ActionBlock<int> object.
      countBytes.LinkTo(printResult);

      // Create a continuation task that completes the ActionBlock<int>
      // object when the TransformBlock<string, int> finishes.
      countBytes.Completion.ContinueWith(delegate { printResult.Complete(); });

      // Post the path to the temporary file to the
      // TransformBlock<string, int> object.
      countBytes.Post(tempFile);

      // Requests completion of the TransformBlock<string, int> object.
      countBytes.Complete();

      // Wait for the ActionBlock<int> object to print the message.
      printResult.Completion.Wait();

      // Delete the temporary file.
      File.Delete(tempFile);
   }
}

/* Sample output:
tmp4FBE.tmp contains 2081 zero bytes.
*/
Imports System.IO
Imports System.Linq
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to provide delegates to exectution dataflow blocks.
Friend Class DataflowExecutionBlocks
    ' Computes the number of zero bytes that the provided file
    ' contains.
    Private Shared Function CountBytes(ByVal path As String) As Integer
        Dim buffer(1023) As Byte
        Dim totalZeroBytesRead As Integer = 0
        Using fileStream = File.OpenRead(path)
            Dim bytesRead As Integer = 0
            Do
                bytesRead = fileStream.Read(buffer, 0, buffer.Length)
                totalZeroBytesRead += buffer.Count(Function(b) b = 0)
            Loop While bytesRead > 0
        End Using

        Return totalZeroBytesRead
    End Function

    Shared Sub Main(ByVal args() As String)
        ' Create a temporary file on disk.
        Dim tempFile As String = Path.GetTempFileName()

        ' Write random data to the temporary file.
        Using fileStream = File.OpenWrite(tempFile)
            Dim rand As New Random()
            Dim buffer(1023) As Byte
            For i As Integer = 0 To 511
                rand.NextBytes(buffer)
                fileStream.Write(buffer, 0, buffer.Length)
            Next i
        End Using

        ' Create an ActionBlock<int> object that prints to the console 
        ' the number of bytes read.
        Dim printResult = New ActionBlock(Of Integer)(Sub(zeroBytesRead) Console.WriteLine("{0} contains {1} zero bytes.", Path.GetFileName(tempFile), zeroBytesRead))

        ' Create a TransformBlock<string, int> object that calls the 
        ' CountBytes function and returns its result.
        Dim countBytes = New TransformBlock(Of String, Integer)(New Func(Of String, Integer)(AddressOf DataflowExecutionBlocks.CountBytes))

        ' Link the TransformBlock<string, int> object to the 
        ' ActionBlock<int> object.
        countBytes.LinkTo(printResult)

        ' Create a continuation task that completes the ActionBlock<int>
        ' object when the TransformBlock<string, int> finishes.
        countBytes.Completion.ContinueWith(Sub() printResult.Complete())

        ' Post the path to the temporary file to the 
        ' TransformBlock<string, int> object.
        countBytes.Post(tempFile)

        ' Requests completion of the TransformBlock<string, int> object.
        countBytes.Complete()

        ' Wait for the ActionBlock<int> object to print the message.
        printResult.Completion.Wait()

        ' Delete the temporary file.
        File.Delete(tempFile)
    End Sub
End Class

' Sample output:
'tmp4FBE.tmp contains 2081 zero bytes.
'

TransformBlock<TInput,TOutput> オブジェクトにラムダ式を使用することはできますが、この例では Func<T,TResult> を使用して他のコードで CountBytes メソッドを使用できるようにしています。 実行される処理がこのタスク固有であり、他のコードでは役立ちそうにないため、ActionBlock<TInput> オブジェクトではラムダ式を使用しています。 タスク並列ライブラリでのラムダ式の動作の詳細については、「PLINQ および TPL のラムダ式」を参照してください。

データフローに関するドキュメントの「デリゲート型の概要」セクションには、ActionBlock<TInput>TransformBlock<TInput,TOutput>TransformManyBlock<TInput,TOutput> オブジェクトに提供できるデリゲート型がまとめられています。 表では、デリゲート型が同期的または非同期的に動作するかどうかについても示しています。

信頼性の高いプログラミング

この例では、データフロー ブロックのタスクを同期的に実行するために、TransformBlock<TInput,TOutput> オブジェクトに Func<T,TResult> 型のデリゲートを提供しています。 データフロー ブロックが非同期的に動作できるようにするには、型 Func<T, Task<TResult>> のデリゲートをデータフロー ブロックに提供します。 データフロー ブロックが非同期的に動作している場合、返された Task<TResult> オブジェクトが完了したときのみ、データフロー ブロックのタスクが完了します。 次の例では、CountBytes メソッドを変更して、async 演算子と await 演算子 (Visual Basic では AsyncAwait) を使用して、提供されたファイル内のゼロ バイトの件数の合計を非同期的に計算します。 ReadAsync メソッドは、ファイルの読み取り操作を非同期的に実行します。

// Asynchronously computes the number of zero bytes that the provided file
// contains.
static async Task<int> CountBytesAsync(string path)
{
   byte[] buffer = new byte[1024];
   int totalZeroBytesRead = 0;
   using (var fileStream = new FileStream(
      path, FileMode.Open, FileAccess.Read, FileShare.Read, 0x1000, true))
   {
      int bytesRead = 0;
      do
      {
         // Asynchronously read from the file stream.
         bytesRead = await fileStream.ReadAsync(buffer, 0, buffer.Length);
         totalZeroBytesRead += buffer.Count(b => b == 0);
      } while (bytesRead > 0);
   }

   return totalZeroBytesRead;
}
' Asynchronously computes the number of zero bytes that the provided file 
' contains.
Private Shared async Function CountBytesAsync(ByVal path As String) As Task(Of Integer)
    Dim buffer(1023) As Byte
    Dim totalZeroBytesRead As Integer = 0
    Using fileStream = New FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read, &H1000, True)
        Dim bytesRead As Integer = 0
        Do
            ' Asynchronously read from the file stream.
            bytesRead = await fileStream.ReadAsync(buffer, 0, buffer.Length)
            totalZeroBytesRead += buffer.Count(Function(b) b = 0)
        Loop While bytesRead > 0
    End Using

    Return totalZeroBytesRead
End Function

実行データフロー ブロックでアクションを実行するために、非同期ラムダ式を使用することもできます。 次の例では、前の例で使用される TransformBlock<TInput,TOutput> オブジェクトを変更して、ラムダ式を使用して処理を非同期的に実行できるようにしています。

// Create a TransformBlock<string, int> object that calls the
// CountBytes function and returns its result.
var countBytesAsync = new TransformBlock<string, int>(async path =>
{
   byte[] buffer = new byte[1024];
   int totalZeroBytesRead = 0;
   using (var fileStream = new FileStream(
      path, FileMode.Open, FileAccess.Read, FileShare.Read, 0x1000, true))
   {
      int bytesRead = 0;
      do
      {
         // Asynchronously read from the file stream.
         bytesRead = await fileStream.ReadAsync(buffer, 0, buffer.Length);
         totalZeroBytesRead += buffer.Count(b => b == 0);
      } while (bytesRead > 0);
   }

   return totalZeroBytesRead;
});
' Create a TransformBlock<string, int> object that calls the 
' CountBytes function and returns its result.
Dim countBytesAsync = New TransformBlock(Of String, Integer)(async Function(path)
                                                                 ' Asynchronously read from the file stream.
                                                                 Dim buffer(1023) As Byte
                                                                 Dim totalZeroBytesRead As Integer = 0
                                                                 Using fileStream = New FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read, &H1000, True)
                                                                     Dim bytesRead As Integer = 0
                                                                     Do
                                                                         bytesRead = await fileStream.ReadAsync(buffer, 0, buffer.Length)
                                                                         totalZeroBytesRead += buffer.Count(Function(b) b = 0)
                                                                     Loop While bytesRead > 0
                                                                 End Using
                                                                 Return totalZeroBytesRead
                                                             End Function)

関連項目