チュートリアル: データフロー パイプラインの作成

DataflowBlock.ReceiveDataflowBlock.ReceiveAsyncDataflowBlock.TryReceive の各メソッドを使用してソース ブロックからメッセージを受信できますが、メッセージ ブロックを接続してデータフロー パイプラインを形成することもできます。 データフロー パイプラインは一連のデータフロー ブロックで構成されます。それぞれが特定のタスクを実行し、全体として 1 つの大きな目標を達成します。 データフロー パイプラインのすべてのデータフロー ブロックは、他のデータフロー ブロックからメッセージを受け取ったときに処理を実行します。 これは、自動車製造の組み立てラインに例えることができます。 各車両が組み立てラインを通過する際、あるステーションではフレームを組み立て、次のステーションではエンジンを設置するなどです。 組み立てラインでは、複数の車両を同時に組み立てることができるため、一度に車両全体を組み立てるよりスループットが向上します。

このドキュメントで説明するデータフロー パイプラインでは、『The Iliad of Homer』(ホメロスのイリアス) という英語の書籍を Web サイトからダウンロードしてから、テキストの中の単語ごとに、その単語の文字を逆に並べた単語を探します。 このドキュメントでは、データフロー パイプラインは、次の手順で構成しています。

  1. パイプラインに参加しているデータフロー ブロックを作成します。

  2. 各データフロー ブロックを、パイプラインの次のブロックに接続します。 各ブロックは、入力として、パイプラインの前のブロックの出力を受信します。

  3. 各データフロー ブロックでは、前のブロックの終了後、次のブロックで完了状態に設定する継続タスクを作成します。

  4. データをパイプラインの先頭に送信します。

  5. パイプラインの先頭を完了としてマークします。

  6. パイプラインのすべての作業が完了するまで待機します。

必須コンポーネント

このチュートリアルを開始する前に、「Dataflow (データフロー)」をお読みください。

コンソール アプリケーションの作成

Visual Studio で、Visual C# または Visual Basic のコンソール アプリケーション プロジェクトを作成します。 System.Threading.Tasks.Dataflow NuGet パッケージをインストールします。

注意

TPL データフロー ライブラリ (System.Threading.Tasks.Dataflow 名前空間) は、.NET と一緒には配布されません。 Visual Studio に System.Threading.Tasks.Dataflow 名前空間をインストールするには、プロジェクトを開き、[プロジェクト] メニューの [NuGet パッケージの管理] をクリックし、System.Threading.Tasks.Dataflow パッケージをオンラインで検索します。 または、.NET Core CLI を使ってインストールするには、dotnet add package System.Threading.Tasks.Dataflow を実行します。

次のコードをプロジェクトに追加して、基本のアプリケーションを作成します。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class Program
{
   static void Main()
   {
   }
}
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web 
' and finds all reversed words that appear in that book.
Module DataflowReversedWords

    Sub Main()
    End Sub

End Module

データフロー ブロックの作成

次のコードを Main メソッドに追加して、パイプラインに参加するデータフロー ブロックを作成します。 次の表は、パイプラインの各メンバーの役割をまとめたものです。

//
// Create the members of the pipeline.
//

// Downloads the requested resource as a string.
var downloadString = new TransformBlock<string, string>(async uri =>
{
   Console.WriteLine("Downloading '{0}'...", uri);

   return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
});

// Separates the specified text into an array of words.
var createWordList = new TransformBlock<string, string[]>(text =>
{
   Console.WriteLine("Creating word list...");

   // Remove common punctuation by replacing all non-letter characters
   // with a space character.
   char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
   text = new string(tokens);

   // Separate the text into an array of words.
   return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
});

// Removes short words and duplicates.
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
   Console.WriteLine("Filtering word list...");

   return words
      .Where(word => word.Length > 3)
      .Distinct()
      .ToArray();
});

// Finds all words in the specified collection whose reverse also
// exists in the collection.
var findReversedWords = new TransformManyBlock<string[], string>(words =>
{
   Console.WriteLine("Finding reversed words...");

   var wordsSet = new HashSet<string>(words);

   return from word in words.AsParallel()
          let reverse = new string(word.Reverse().ToArray())
          where word != reverse && wordsSet.Contains(reverse)
          select word;
});

// Prints the provided reversed words to the console.
var printReversedWords = new ActionBlock<string>(reversedWord =>
{
   Console.WriteLine("Found reversed words {0}/{1}",
      reversedWord, new string(reversedWord.Reverse().ToArray()));
});
'
' Create the members of the pipeline.
' 

' Downloads the requested resource as a string.
Dim downloadString = New TransformBlock(Of String, String)(
    Async Function(uri)
        Console.WriteLine("Downloading '{0}'...", uri)

        Return Await New HttpClient().GetStringAsync(uri)
    End Function)

' Separates the specified text into an array of words.
Dim createWordList = New TransformBlock(Of String, String())(
   Function(text)
       Console.WriteLine("Creating word list...")

     ' Remove common punctuation by replacing all non-letter characters 
     ' with a space character.
     Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
       text = New String(tokens)

     ' Separate the text into an array of words.
     Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
   End Function)

' Removes short words and duplicates.
Dim filterWordList = New TransformBlock(Of String(), String())(
   Function(words)
       Console.WriteLine("Filtering word list...")

       Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
   End Function)

' Finds all words in the specified collection whose reverse also 
' exists in the collection.
Dim findReversedWords = New TransformManyBlock(Of String(), String)(
   Function(words)

       Dim wordsSet = New HashSet(Of String)(words)

       Return From word In words.AsParallel()
              Let reverse = New String(word.Reverse().ToArray())
              Where word <> reverse AndAlso wordsSet.Contains(reverse)
              Select word
   End Function)

' Prints the provided reversed words to the console.    
Dim printReversedWords = New ActionBlock(Of String)(
   Sub(reversedWord)
       Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
   End Sub)
メンバー 種類 説明
downloadString TransformBlock<TInput,TOutput> Web から書籍のテキストをダウンロードします。
createWordList TransformBlock<TInput,TOutput> 書籍のテキストを単語の配列に区切ります。
filterWordList TransformBlock<TInput,TOutput> 短い単語や重複するものを配列から削除します。
findReversedWords TransformManyBlock<TInput,TOutput> フィルター処理した単語の配列のコレクションにある全単語のうち、逆方向の単語の配列も発生するものを検索します。
printReversedWords ActionBlock<TInput> 単語と、関連する逆方向の単語をコンソールに表示します。

この例のデータフロー パイプラインの複数の手順を 1 つの手順に結合することができますが、この例では、大規模なタスクを実行するために複数の独立したデータ フロー タスクを構成する概念を示します。 この例では、TransformBlock<TInput,TOutput> を使用して、パイプラインの各メンバーが入力データで操作を実行し、結果をパイプラインの次の手順に送ることができるようにします。 パイプラインの findReversedWords のメンバーは、各入力に複数の独立した出力を生成するため、TransformManyBlock<TInput,TOutput> オブジェクトになります。 パイプラインの末尾の printReversedWordsActionBlock<TInput> オブジェクトです。このメンバーは入力に対して操作を実行しますが、結果を生成しないからです。

パイプラインの形成

各ブロックをパイプラインの次のブロックに接続するには、次のコードを追加します。

LinkTo メソッドを呼び出してソース データフロー ブロックをターゲット データフロー ブロックに接続すると、データが使用可能になったときにソース データフロー ブロックがターゲット ブロックにデータを反映させます。 DataflowLinkOptions も指定して PropagateCompletion を true に設定した場合は、パイプライン内のブロックの 1 つが成功か不成功かを問わず完了すると、そのパイプライン内の次のブロックも完了となります。

//
// Connect the dataflow blocks to form a pipeline.
//

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

downloadString.LinkTo(createWordList, linkOptions);
createWordList.LinkTo(filterWordList, linkOptions);
filterWordList.LinkTo(findReversedWords, linkOptions);
findReversedWords.LinkTo(printReversedWords, linkOptions);
'
' Connect the dataflow blocks to form a pipeline.
'

Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}

downloadString.LinkTo(createWordList, linkOptions)
createWordList.LinkTo(filterWordList, linkOptions)
filterWordList.LinkTo(findReversedWords, linkOptions)
findReversedWords.LinkTo(printReversedWords, linkOptions)

パイプラインへのデータの送信

The Iliad of Homer』(ホメロスのイリアス) という書籍の URL をデータフロー パイプラインの先頭に送信するために、次のコードを追加します。

// Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");
' Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")

この例では、DataflowBlock.Post を使用して、同期的にパイプラインの先頭にデータを送信します。 データフロー ノードにデータを非同期的に送信する必要がある場合は、DataflowBlock.SendAsync メソッドを使用します。

パイプラインのアクティビティの完了

パイプラインの先頭を完了済みとしてマークするには、次のコードを追加します。 パイプラインの先頭は、バッファ内のメッセージをすべて処理した後に自身の完了を伝達します。

// Mark the head of the pipeline as complete.
downloadString.Complete();
' Mark the head of the pipeline as complete.
downloadString.Complete()

この例では、処理するために 1 つの URL をデータフロー パイプラインを介して送信します。 パイプラインを介して複数の入力を送信する場合は、すべての入力を送信した後に IDataflowBlock.Complete メソッドを呼び出します。 アプリケーションに、適切に定義されたポイントがない (データが使用できなくなっているか、アプリケーションがパイプラインの終了を待つ必要がない) 場合は、この手順を省略できます。

パイプラインの終了までの待機

パイプラインの終了まで待機するには、次のコードを追加します。 全体的な操作が終了となるのは、パイプラインの末尾が終了したときです。

// Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait();
' Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait()

データフローの完了は、いずれかのスレッドから、または複数のスレッドから同時に待つことができます。

完全な例

次の例は、このチュートリアルのコード全体を示しています。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class DataflowReversedWords
{
   static void Main()
   {
      //
      // Create the members of the pipeline.
      //

      // Downloads the requested resource as a string.
      var downloadString = new TransformBlock<string, string>(async uri =>
      {
         Console.WriteLine("Downloading '{0}'...", uri);

         return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
      });

      // Separates the specified text into an array of words.
      var createWordList = new TransformBlock<string, string[]>(text =>
      {
         Console.WriteLine("Creating word list...");

         // Remove common punctuation by replacing all non-letter characters
         // with a space character.
         char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
         text = new string(tokens);

         // Separate the text into an array of words.
         return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
      });

      // Removes short words and duplicates.
      var filterWordList = new TransformBlock<string[], string[]>(words =>
      {
         Console.WriteLine("Filtering word list...");

         return words
            .Where(word => word.Length > 3)
            .Distinct()
            .ToArray();
      });

      // Finds all words in the specified collection whose reverse also
      // exists in the collection.
      var findReversedWords = new TransformManyBlock<string[], string>(words =>
      {
         Console.WriteLine("Finding reversed words...");

         var wordsSet = new HashSet<string>(words);

         return from word in words.AsParallel()
                let reverse = new string(word.Reverse().ToArray())
                where word != reverse && wordsSet.Contains(reverse)
                select word;
      });

      // Prints the provided reversed words to the console.
      var printReversedWords = new ActionBlock<string>(reversedWord =>
      {
         Console.WriteLine("Found reversed words {0}/{1}",
            reversedWord, new string(reversedWord.Reverse().ToArray()));
      });

      //
      // Connect the dataflow blocks to form a pipeline.
      //

      var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

      downloadString.LinkTo(createWordList, linkOptions);
      createWordList.LinkTo(filterWordList, linkOptions);
      filterWordList.LinkTo(findReversedWords, linkOptions);
      findReversedWords.LinkTo(printReversedWords, linkOptions);

      // Process "The Iliad of Homer" by Homer.
      downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");

      // Mark the head of the pipeline as complete.
      downloadString.Complete();

      // Wait for the last block in the pipeline to process all messages.
      printReversedWords.Completion.Wait();
   }
}
/* Sample output:
   Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
   Creating word list...
   Filtering word list...
   Finding reversed words...
   Found reversed words doom/mood
   Found reversed words draw/ward
   Found reversed words aera/area
   Found reversed words seat/taes
   Found reversed words live/evil
   Found reversed words port/trop
   Found reversed words sleek/keels
   Found reversed words area/aera
   Found reversed words tops/spot
   Found reversed words evil/live
   Found reversed words mood/doom
   Found reversed words speed/deeps
   Found reversed words moor/room
   Found reversed words trop/port
   Found reversed words spot/tops
   Found reversed words spots/stops
   Found reversed words stops/spots
   Found reversed words reed/deer
   Found reversed words keels/sleek
   Found reversed words deeps/speed
   Found reversed words deer/reed
   Found reversed words taes/seat
   Found reversed words room/moor
   Found reversed words ward/draw
*/
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web 
' and finds all reversed words that appear in that book.
Module DataflowReversedWords

    Sub Main()
        '
        ' Create the members of the pipeline.
        ' 

        ' Downloads the requested resource as a string.
        Dim downloadString = New TransformBlock(Of String, String)(
            Async Function(uri)
                Console.WriteLine("Downloading '{0}'...", uri)

                Return Await New HttpClient().GetStringAsync(uri)
            End Function)

        ' Separates the specified text into an array of words.
        Dim createWordList = New TransformBlock(Of String, String())(
           Function(text)
               Console.WriteLine("Creating word list...")

             ' Remove common punctuation by replacing all non-letter characters 
             ' with a space character.
             Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
               text = New String(tokens)

             ' Separate the text into an array of words.
             Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
           End Function)

        ' Removes short words and duplicates.
        Dim filterWordList = New TransformBlock(Of String(), String())(
           Function(words)
               Console.WriteLine("Filtering word list...")

               Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
           End Function)

        ' Finds all words in the specified collection whose reverse also 
        ' exists in the collection.
        Dim findReversedWords = New TransformManyBlock(Of String(), String)(
           Function(words)

               Dim wordsSet = New HashSet(Of String)(words)

               Return From word In words.AsParallel()
                      Let reverse = New String(word.Reverse().ToArray())
                      Where word <> reverse AndAlso wordsSet.Contains(reverse)
                      Select word
           End Function)

        ' Prints the provided reversed words to the console.    
        Dim printReversedWords = New ActionBlock(Of String)(
           Sub(reversedWord)
               Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
           End Sub)

        '
        ' Connect the dataflow blocks to form a pipeline.
        '

        Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}

        downloadString.LinkTo(createWordList, linkOptions)
        createWordList.LinkTo(filterWordList, linkOptions)
        filterWordList.LinkTo(findReversedWords, linkOptions)
        findReversedWords.LinkTo(printReversedWords, linkOptions)

        ' Process "The Iliad of Homer" by Homer.
        downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")

        ' Mark the head of the pipeline as complete.
        downloadString.Complete()

        ' Wait for the last block in the pipeline to process all messages.
        printReversedWords.Completion.Wait()
    End Sub

End Module

' Sample output:
'Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
'Creating word list...
'Filtering word list...
'Finding reversed words...
'Found reversed words aera/area
'Found reversed words doom/mood
'Found reversed words draw/ward
'Found reversed words live/evil
'Found reversed words seat/taes
'Found reversed words area/aera
'Found reversed words port/trop
'Found reversed words sleek/keels
'Found reversed words tops/spot
'Found reversed words evil/live
'Found reversed words speed/deeps
'Found reversed words mood/doom
'Found reversed words moor/room
'Found reversed words spot/tops
'Found reversed words spots/stops
'Found reversed words trop/port
'Found reversed words stops/spots
'Found reversed words reed/deer
'Found reversed words deeps/speed
'Found reversed words deer/reed
'Found reversed words taes/seat
'Found reversed words keels/sleek
'Found reversed words room/moor
'Found reversed words ward/draw

次の手順

この例では、データフロー パイプラインを介して処理するために 1 つの URL を送信しています。 パイプラインを介して複数の入力値を送信する場合は、自動車工場で部品が通過する方法に似た並列処理の形式をアプリケーションに導入することができます。 パイプラインの最初のメンバーが結果を 2 番目のメンバーに送信する場合、最初のメンバーは、2 番目のメンバーが最初のメンバーの結果を処理するときに、並行して別のアイテムを処理できます。

データフロー パイプラインを使用して実現される並列処理は、粒度の粗い並列処理と呼ばれます。一般的に、少数の大きなタスクで構成されているためです。 粒度の細かい並列処理を使用する、つまりデータフロー パイプラインのタスクを小さく、実行時間を短くすることもできます。 この例では、パイプラインの findReversedWords メンバーが PLINQ を使用して作業リストの複数のアイテムを並列処理します。 粒度の粗いパイプラインで粒度の細かい並列処理を行うと、全体のスループットが向上します。

また、1 つのソース データフロー ブロックを複数のターゲット ブロックに接続してデータフロー ネットワークを作成することもできます。 オーバー ロードされたバージョンの LinkTo メソッドは、ターゲット ブロックがその値に基づいて各メッセージを受け入れるかどうかを定義する Predicate<T> オブジェクトを受け取ります。 ソースとして動作するほとんどのデータフロー ブロック型では、接続されたすべてのターゲット ブロックにメッセージを提供します。これは、いずれかのブロックがそのメッセージを受け入れるまで、ターゲット ブロックが接続された順序で行われます。 このフィルター機構を使用すると、特定のデータはあるパスを通り、その他のデータは別のパスを通るように仕向ける、接続されたデータフロー ブロックの体系を作成することができます。 フィルター処理を使用してデータフロー ネットワークを作成する例については、「チュートリアル:Windows フォーム アプリケーションでのデータフローの使用」を参照してください。

関連項目