다음을 통해 공유


안내: 데이터 플로우 파이프라인 만들기

DataflowBlock.Receive, DataflowBlock.ReceiveAsyncDataflowBlock.TryReceive 메서드를 사용하여 원본 블록에서 메시지를 받을 수 있지만 메시지 블록을 연결하여 데이터 흐름 파이프라인형성할 수도 있습니다. 데이터 흐름 파이프라인은 일련의 구성 요소이거나 데이터 흐름 블록을, 각각은 더 큰 목표에 기여하는 특정 작업을 수행합니다. 데이터 흐름 파이프라인의 모든 데이터 흐름 블록은 다른 데이터 흐름 블록에서 메시지를 받을 때 작업을 수행합니다. 이와 유사한 것은 자동차 제조를 위한 조립 라인입니다. 각 차량이 어셈블리 라인을 통과하면 한 스테이션이 프레임을 어셈블하고, 다음 스테이션은 엔진을 설치하는 등의 작업을 합니다. 어셈블리 라인을 사용하면 여러 차량을 동시에 조립할 수 있으므로 전체 차량을 한 번에 하나씩 조립하는 것보다 더 나은 처리량을 제공합니다.

이 문서에서는 웹 사이트에서 호머의 일리아드 책을 다운로드하고 텍스트를 검색하여 첫 번째 단어의 문자를 뒤집는 단어와 개별 단어를 일치시키는 데이터 흐름 파이프라인을 보여 줍니다. 이 문서에서 데이터 흐름 파이프라인의 형성은 다음 단계로 구성됩니다.

  1. 파이프라인에 참여하는 데이터 흐름 블록을 만듭니다.

  2. 각 데이터 흐름 블록을 파이프라인의 다음 블록에 연결합니다. 각 블록은 파이프라인에서 이전 블록의 출력을 입력으로 받습니다.

  3. 각 데이터 흐름 블록에 대해 이전 블록이 완료된 후 다음 블록을 완료된 상태로 설정하는 연속 작업을 만듭니다.

  4. 파이프라인의 헤더에 데이터를 보냅니다.

  5. 파이프라인의 시작 지점을 완료로 표시합니다.

  6. 파이프라인이 모든 작업을 완료할 때까지 기다립니다.

필수 조건

이 연습을 시작하기 전에 데이터 흐름 읽어 보세요.

콘솔 애플리케이션 만들기

Visual Studio에서 Visual C# 또는 Visual Basic 콘솔 애플리케이션 프로젝트를 만듭니다. System.Threading.Tasks.Dataflow NuGet 패키지를 설치합니다.

비고

TPL 데이터 흐름 라이브러리(System.Threading.Tasks.Dataflow 네임스페이스)는 .NET과 함께 배포되지 않습니다. Visual Studio에서 System.Threading.Tasks.Dataflow 네임스페이스를 설치하려면, 프로젝트를 열고 프로젝트 메뉴에서 NuGet 패키지 관리를 선택한 다음, System.Threading.Tasks.Dataflow 패키지를 온라인으로 검색합니다. 대안으로, .NET Core CLI 을 사용하여을 설치하려면, dotnet add package System.Threading.Tasks.Dataflow을 실행하십시오.

프로젝트에 다음 코드를 추가하여 기본 애플리케이션을 만듭니다.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class Program
{
   static void Main()
   {
   }
}
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web 
' and finds all reversed words that appear in that book.
Module DataflowReversedWords

    Sub Main()
    End Sub

End Module

데이터 흐름 블록 만들기

Main 메서드에 다음 코드를 추가하여 파이프라인에 참여하는 데이터 흐름 블록을 만듭니다. 다음 표에는 파이프라인의 각 멤버의 역할이 요약됩니다.

//
// Create the members of the pipeline.
//

// Downloads the requested resource as a string.
var downloadString = new TransformBlock<string, string>(async uri =>
{
   Console.WriteLine($"Downloading '{uri}'...");

   return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
});

// Separates the specified text into an array of words.
var createWordList = new TransformBlock<string, string[]>(text =>
{
   Console.WriteLine("Creating word list...");

   // Remove common punctuation by replacing all non-letter characters
   // with a space character.
   char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
   text = new string(tokens);

   // Separate the text into an array of words.
   return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
});

// Removes short words and duplicates.
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
   Console.WriteLine("Filtering word list...");

   return words
      .Where(word => word.Length > 3)
      .Distinct()
      .ToArray();
});

// Finds all words in the specified collection whose reverse also
// exists in the collection.
var findReversedWords = new TransformManyBlock<string[], string>(words =>
{
   Console.WriteLine("Finding reversed words...");

   var wordsSet = new HashSet<string>(words);

   return from word in words.AsParallel()
          let reverse = new string(word.Reverse().ToArray())
          where word != reverse && wordsSet.Contains(reverse)
          select word;
});

// Prints the provided reversed words to the console.
var printReversedWords = new ActionBlock<string>(reversedWord =>
{
   Console.WriteLine($"Found reversed words {reversedWord}/{new string(reversedWord.Reverse().ToArray())}");
});
'
' Create the members of the pipeline.
' 

' Downloads the requested resource as a string.
Dim downloadString = New TransformBlock(Of String, String)(
    Async Function(uri)
        Console.WriteLine("Downloading '{0}'...", uri)

        Return Await New HttpClient().GetStringAsync(uri)
    End Function)

' Separates the specified text into an array of words.
Dim createWordList = New TransformBlock(Of String, String())(
   Function(text)
       Console.WriteLine("Creating word list...")

     ' Remove common punctuation by replacing all non-letter characters 
     ' with a space character.
     Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
       text = New String(tokens)

     ' Separate the text into an array of words.
     Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
   End Function)

' Removes short words and duplicates.
Dim filterWordList = New TransformBlock(Of String(), String())(
   Function(words)
       Console.WriteLine("Filtering word list...")

       Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
   End Function)

' Finds all words in the specified collection whose reverse also 
' exists in the collection.
Dim findReversedWords = New TransformManyBlock(Of String(), String)(
   Function(words)

       Dim wordsSet = New HashSet(Of String)(words)

       Return From word In words.AsParallel()
              Let reverse = New String(word.Reverse().ToArray())
              Where word <> reverse AndAlso wordsSet.Contains(reverse)
              Select word
   End Function)

' Prints the provided reversed words to the console.    
Dim printReversedWords = New ActionBlock(Of String)(
   Sub(reversedWord)
       Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
   End Sub)
회원 유형 설명
downloadString TransformBlock<TInput,TOutput> 웹에서 책 텍스트를 다운로드합니다.
createWordList TransformBlock<TInput,TOutput> 책 텍스트를 단어 배열로 구분합니다.
filterWordList TransformBlock<TInput,TOutput> 단어 배열에서 짧은 단어와 중복을 제거합니다.
findReversedWords TransformManyBlock<TInput,TOutput> 필터링된 단어 배열 컬렉션에서 역방향이 단어 배열에서도 발생하는 모든 단어를 찾습니다.
printReversedWords ActionBlock<TInput> 콘솔에 단어와 해당 역방향 단어를 표시합니다.

이 예제의 데이터 흐름 파이프라인에서 여러 단계를 한 단계로 결합할 수 있지만 이 예제에서는 더 큰 작업을 수행하기 위해 여러 개의 독립적인 데이터 흐름 작업을 구성하는 개념을 보여 줍니다. 이 예제에서는 TransformBlock<TInput,TOutput> 사용하여 파이프라인의 각 멤버가 입력 데이터에 대한 작업을 수행하고 파이프라인의 다음 단계로 결과를 보낼 수 있도록 합니다. 파이프라인의 findReversedWords 멤버는 각 입력에 대해 여러 개의 독립적인 출력을 생성하기 때문에 TransformManyBlock<TInput,TOutput> 개체입니다. 파이프라인의 tail(printReversedWords)은 입력에 대한 작업을 수행하고 결과를 생성하지 않으므로 ActionBlock<TInput> 개체입니다.

파이프라인 구성

다음 코드를 추가하여 각 블록을 파이프라인의 다음 블록에 연결합니다.

LinkTo 메서드를 호출하여 원본 데이터 흐름 블록을 대상 데이터 흐름 블록에 연결하면 원본 데이터 흐름 블록은 데이터를 사용할 수 있게 되면 대상 블록에 데이터를 전파합니다. DataflowLinkOptions을 true로 설정하여 PropagateCompletion를 제공하는 경우, 파이프라인에서 한 블록을 성공적으로 완료하거나 완료하지 못하더라도, 다음 블록이 완료됩니다.

//
// Connect the dataflow blocks to form a pipeline.
//

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

downloadString.LinkTo(createWordList, linkOptions);
createWordList.LinkTo(filterWordList, linkOptions);
filterWordList.LinkTo(findReversedWords, linkOptions);
findReversedWords.LinkTo(printReversedWords, linkOptions);
'
' Connect the dataflow blocks to form a pipeline.
'

Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}

downloadString.LinkTo(createWordList, linkOptions)
createWordList.LinkTo(filterWordList, linkOptions)
filterWordList.LinkTo(findReversedWords, linkOptions)
findReversedWords.LinkTo(printReversedWords, linkOptions)

파이프라인에 데이터 게시

다음 코드를 추가하여 책 URL 호머의 일리아드을 데이터 흐름 파이프라인의 헤드에 게시하도록 합니다.

// Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");
' Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")

이 예제에서는 DataflowBlock.Post 사용하여 파이프라인의 헤드에 데이터를 동기적으로 보냅니다. 데이터 흐름 노드에 데이터를 비동기적으로 보내야 하는 경우 DataflowBlock.SendAsync 메서드를 사용합니다.

파이프라인 작업 완료

다음 코드를 추가하여 파이프라인의 헤드를 완료됨으로 표시합니다. 파이프라인의 헤드는 버퍼링된 모든 메시지를 처리한 후 완료를 전파합니다.

// Mark the head of the pipeline as complete.
downloadString.Complete();
' Mark the head of the pipeline as complete.
downloadString.Complete()

이 예제에서는 처리할 데이터 흐름 파이프라인을 통해 하나의 URL을 보냅니다. 파이프라인을 통해 둘 이상의 입력을 보내는 경우 모든 입력을 제출한 후 IDataflowBlock.Complete 메서드를 호출합니다. 애플리케이션에 데이터를 더 이상 사용할 수 없는 잘 정의된 지점이 없거나 애플리케이션이 파이프라인이 완료되기를 기다릴 필요가 없는 경우 이 단계를 생략할 수 있습니다.

파이프라인이 완료 될 때까지 대기 중

다음 코드를 추가하여 파이프라인이 완료되기를 기다립니다. 파이프라인의 끝 부분이 완료되면 전체 작업이 완료됩니다.

// Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait();
' Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait()

스레드 또는 여러 스레드에서 동시에 데이터 흐름이 완료될 때까지 기다릴 수 있습니다.

전체 예제

다음 예제에서는 이 워크스루의 전체 코드를 보여 줍니다.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class DataflowReversedWords
{
   static void Main()
   {
      //
      // Create the members of the pipeline.
      //

      // Downloads the requested resource as a string.
      var downloadString = new TransformBlock<string, string>(async uri =>
      {
         Console.WriteLine($"Downloading '{uri}'...");

         return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
      });

      // Separates the specified text into an array of words.
      var createWordList = new TransformBlock<string, string[]>(text =>
      {
         Console.WriteLine("Creating word list...");

         // Remove common punctuation by replacing all non-letter characters
         // with a space character.
         char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
         text = new string(tokens);

         // Separate the text into an array of words.
         return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
      });

      // Removes short words and duplicates.
      var filterWordList = new TransformBlock<string[], string[]>(words =>
      {
         Console.WriteLine("Filtering word list...");

         return words
            .Where(word => word.Length > 3)
            .Distinct()
            .ToArray();
      });

      // Finds all words in the specified collection whose reverse also
      // exists in the collection.
      var findReversedWords = new TransformManyBlock<string[], string>(words =>
      {
         Console.WriteLine("Finding reversed words...");

         var wordsSet = new HashSet<string>(words);

         return from word in words.AsParallel()
                let reverse = new string(word.Reverse().ToArray())
                where word != reverse && wordsSet.Contains(reverse)
                select word;
      });

      // Prints the provided reversed words to the console.
      var printReversedWords = new ActionBlock<string>(reversedWord =>
      {
         Console.WriteLine($"Found reversed words {reversedWord}/{new string(reversedWord.Reverse().ToArray())}");
      });

      //
      // Connect the dataflow blocks to form a pipeline.
      //

      var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

      downloadString.LinkTo(createWordList, linkOptions);
      createWordList.LinkTo(filterWordList, linkOptions);
      filterWordList.LinkTo(findReversedWords, linkOptions);
      findReversedWords.LinkTo(printReversedWords, linkOptions);

      // Process "The Iliad of Homer" by Homer.
      downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");

      // Mark the head of the pipeline as complete.
      downloadString.Complete();

      // Wait for the last block in the pipeline to process all messages.
      printReversedWords.Completion.Wait();
   }
}
/* Sample output:
   Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
   Creating word list...
   Filtering word list...
   Finding reversed words...
   Found reversed words doom/mood
   Found reversed words draw/ward
   Found reversed words aera/area
   Found reversed words seat/taes
   Found reversed words live/evil
   Found reversed words port/trop
   Found reversed words sleek/keels
   Found reversed words area/aera
   Found reversed words tops/spot
   Found reversed words evil/live
   Found reversed words mood/doom
   Found reversed words speed/deeps
   Found reversed words moor/room
   Found reversed words trop/port
   Found reversed words spot/tops
   Found reversed words spots/stops
   Found reversed words stops/spots
   Found reversed words reed/deer
   Found reversed words keels/sleek
   Found reversed words deeps/speed
   Found reversed words deer/reed
   Found reversed words taes/seat
   Found reversed words room/moor
   Found reversed words ward/draw
*/
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web 
' and finds all reversed words that appear in that book.
Module DataflowReversedWords

    Sub Main()
        '
        ' Create the members of the pipeline.
        ' 

        ' Downloads the requested resource as a string.
        Dim downloadString = New TransformBlock(Of String, String)(
            Async Function(uri)
                Console.WriteLine("Downloading '{0}'...", uri)

                Return Await New HttpClient().GetStringAsync(uri)
            End Function)

        ' Separates the specified text into an array of words.
        Dim createWordList = New TransformBlock(Of String, String())(
           Function(text)
               Console.WriteLine("Creating word list...")

             ' Remove common punctuation by replacing all non-letter characters 
             ' with a space character.
             Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
               text = New String(tokens)

             ' Separate the text into an array of words.
             Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
           End Function)

        ' Removes short words and duplicates.
        Dim filterWordList = New TransformBlock(Of String(), String())(
           Function(words)
               Console.WriteLine("Filtering word list...")

               Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
           End Function)

        ' Finds all words in the specified collection whose reverse also 
        ' exists in the collection.
        Dim findReversedWords = New TransformManyBlock(Of String(), String)(
           Function(words)

               Dim wordsSet = New HashSet(Of String)(words)

               Return From word In words.AsParallel()
                      Let reverse = New String(word.Reverse().ToArray())
                      Where word <> reverse AndAlso wordsSet.Contains(reverse)
                      Select word
           End Function)

        ' Prints the provided reversed words to the console.    
        Dim printReversedWords = New ActionBlock(Of String)(
           Sub(reversedWord)
               Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
           End Sub)

        '
        ' Connect the dataflow blocks to form a pipeline.
        '

        Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}

        downloadString.LinkTo(createWordList, linkOptions)
        createWordList.LinkTo(filterWordList, linkOptions)
        filterWordList.LinkTo(findReversedWords, linkOptions)
        findReversedWords.LinkTo(printReversedWords, linkOptions)

        ' Process "The Iliad of Homer" by Homer.
        downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")

        ' Mark the head of the pipeline as complete.
        downloadString.Complete()

        ' Wait for the last block in the pipeline to process all messages.
        printReversedWords.Completion.Wait()
    End Sub

End Module

' Sample output:
'Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
'Creating word list...
'Filtering word list...
'Finding reversed words...
'Found reversed words aera/area
'Found reversed words doom/mood
'Found reversed words draw/ward
'Found reversed words live/evil
'Found reversed words seat/taes
'Found reversed words area/aera
'Found reversed words port/trop
'Found reversed words sleek/keels
'Found reversed words tops/spot
'Found reversed words evil/live
'Found reversed words speed/deeps
'Found reversed words mood/doom
'Found reversed words moor/room
'Found reversed words spot/tops
'Found reversed words spots/stops
'Found reversed words trop/port
'Found reversed words stops/spots
'Found reversed words reed/deer
'Found reversed words deeps/speed
'Found reversed words deer/reed
'Found reversed words taes/seat
'Found reversed words keels/sleek
'Found reversed words room/moor
'Found reversed words ward/draw

다음 단계

이 예제에서는 하나의 URL을 전송하여 데이터 흐름 파이프라인을 통해 처리합니다. 파이프라인을 통해 둘 이상의 입력 값을 보내는 경우 부품이 자동차 팩터리를 통해 이동하는 방식과 유사한 병렬 처리 형식을 애플리케이션에 도입할 수 있습니다. 파이프라인의 첫 번째 멤버가 결과를 두 번째 멤버로 보내면 두 번째 멤버가 첫 번째 결과를 처리할 때 다른 항목을 병렬로 처리할 수 있습니다.

데이터 흐름 파이프라인을 사용하여 달성되는 병렬 처리는 일반적으로 더 적은 수의 더 큰 작업으로 구성되기 때문에 거친 병렬 처리 알려져 있습니다. 또한 데이터 흐름 파이프라인에서 더 작고 짧은 실행 작업의 보다 세분화된 병렬 처리를 사용할 수 있습니다. 이 예제에서 파이프라인의 findReversedWords 멤버는 PLINQ 사용하여 작업 목록의 여러 항목을 병렬로 처리합니다. 거친 파이프라인에서 세분화된 병렬 처리를 사용하면 전체 처리량을 향상시킬 수 있습니다.

원본 데이터 흐름 블록을 여러 대상 블록에 연결하여 데이터 흐름 네트워크만들 수도 있습니다. 오버로드된 버전의 LinkTo 메서드는 대상 블록이 해당 값에 따라 각 메시지를 수락하는지 여부를 정의하는 Predicate<T> 개체를 사용합니다. 원본 역할을 하는 대부분의 데이터 흐름 블록 형식은 블록 중 하나가 해당 메시지를 수락할 때까지 연결된 모든 대상 블록에 메시지를 연결한 순서대로 제공합니다. 이 필터링 메커니즘을 사용하여 특정 데이터를 한 경로와 다른 경로를 통해 다른 데이터를 통과하는 연결된 데이터 흐름 블록의 시스템을 만들 수 있습니다. 필터링을 사용하여 데이터 흐름 네트워크를 만드는 예제에 대해서는 연습: Windows Forms 애플리케이션에서 데이터 흐름 사용을 참조하세요.

참고하십시오

  • 데이터 흐름