다음을 통해 공유


데이터 흐름(작업 병렬 라이브러리)

TPL(작업 병렬 라이브러리)은 동시성 사용 애플리케이션의 견고성을 높이는 데 도움이 되는 데이터 흐름 구성 요소를 제공합니다. 이러한 데이터 흐름 구성 요소를 통칭하여 TPL 데이터 흐름 라이브러리라고 합니다. 이 데이터 흐름 모델은 정교하지 않은 데이터 흐름 및 파이프라인 작업을 위해 in-process 메시지 전달을 제공하여 행위자 기반 프로그래밍을 촉진합니다. 데이터 흐름 구성 요소는 TPL의 형식 및 예약 인프라를 바탕으로 빌드되며 비동기 프로그래밍에 대한 C#, Visual Basic 및 F# 언어 지원과 통합됩니다. 이러한 데이터 흐름 구성 요소는 비동기적으로 서로 통신해야 하는 여러 작업이 있는 경우나 데이터를 사용할 수 있게 될 때 해당 데이터를 처리하려는 경우에 유용합니다. 예를 들어 웹 카메라에서 이미지 데이터를 처리하는 애플리케이션의 경우, 데이터 흐름 모델을 사용함으로써 애플리케이션은 이미지 프레임을 사용할 수 있게 될 때 해당 이미지 프레임을 처리할 수 있습니다. 애플리케이션이 명도를 보정하거나 적목 현상을 줄이는 등의 작업을 수행하여 이미지 프레임을 개선하는 경우 데이터 흐름 구성 요소의 파이프라인을 만들 수 있습니다. 파이프라인의 각 단계에서는 TPL이 제공하는 기능과 같은 좀더 정교하지 않은 병렬 처리 기능을 사용하여 이미지를 변환할 수도 있습니다.

이 문서에서는 TPL 데이터 흐름 라이브러리에 대한 개요를 제공합니다. 여기에서는 프로그래밍 모델, 미리 정의된 데이터 흐름 블록 형식 및 애플리케이션의 특정 요구 사항을 충족하도록 데이터 흐름 블록을 구성하는 방법을 설명합니다.

참고 항목

TPL 데이터 흐름 라이브러리(System.Threading.Tasks.Dataflow 네임스페이스)는 .NET과 함께 배포되지 않습니다. Visual Studio에서 System.Threading.Tasks.Dataflow 네임스페이스를 설치하려면 프로젝트를 열고, 프로젝트 메뉴에서 NuGet 패키지 관리를 선택한 후, System.Threading.Tasks.Dataflow 패키지를 온라인으로 검색합니다. 또는 .NET Core CLI를 사용하여 설치하려면 dotnet add package System.Threading.Tasks.Dataflow를 실행합니다.

프로그래밍 모델

TPL 데이터 흐름 라이브러리는 처리량이 많고 대기 시간이 짧으며 CPU 및 I/O를 많이 사용하는 애플리케이션의 메시지 전달 및 병렬화를 위한 기반을 제공합니다. 또한 데이터가 버퍼링되고 시스템에서 이동하는 방식을 명시적으로 제어할 수 있도록 합니다. 데이터 흐름 프로그래밍 모델을 보다 정확히 이해하려면 디스크에서 이미지를 비동기적으로 로드하고 해당 이미지의 합성을 만드는 애플리케이션을 고려해 보십시오. 기존의 프로그래밍 모델에서는 대개 콜백과 동기화 개체(예: 잠금)를 사용하여 작업을 조정하고 공유 데이터에 액세스해야 합니다. 데이터 흐름 프로그래밍 모델을 사용하면 디스크에서 이미지를 읽을 때 해당 이미지를 처리하는 데이터 흐름 개체를 만들 수 있습니다. 데이터 흐름 모델에서는 데이터를 사용할 수 있게 될 때 데이터가 처리되는 방법과 데이터 간의 종속성도 선언합니다. 런타임에서 데이터 간의 종속성을 관리하기 때문에 대개 공유 데이터에 대한 액세스를 동기화할 필요가 없습니다. 또한 런타임에서 데이터의 비동기 도착을 기준으로 작업을 예약하기 때문에 데이터 흐름은 내부 스레드를 효율적으로 관리하여 응답성과 처리량을 개선할 수 있습니다. 데이터 흐름 프로그래밍 모델을 사용하여 Windows Forms 애플리케이션에서 이미지 처리를 구현하는 예제는 연습: Windows Forms 애플리케이션에서 데이터 흐름 사용을 참조하세요.

소스 및 대상

TPL 데이터 흐름 라이브러리는 데이터를 버퍼링하고 처리하는 데이터 구조인 데이터 흐름 블록으로 구성되어 있습니다. TPL은 세 가지 데이터 흐름 블록인 소스 블록, 대상 블록전파자 블록을 정의합니다. 소스 블록은 데이터의 소스 역할을 하며 읽을 수 있습니다. 대상 블록은 데이터의 수신자 역할을 하며 쓸 수 있습니다. 전파자 블록은 소스 블록과 대상 블록 역할을 하며 읽고 쓸 수 있습니다. TPL은 소스를 나타내는 System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> 인터페이스, 대상을 나타내는 System.Threading.Tasks.Dataflow.ITargetBlock<TInput> 및 전파자를 나타내는 System.Threading.Tasks.Dataflow.IPropagatorBlock<TInput,TOutput>을 정의합니다. IPropagatorBlock<TInput,TOutput>ISourceBlock<TOutput>ITargetBlock<TInput> 모두에서 상속합니다.

TPL 데이터 흐름 라이브러리는 ISourceBlock<TOutput>, ITargetBlock<TInput>IPropagatorBlock<TInput,TOutput> 인터페이스를 구현하는 몇 가지 미리 정의된 데이터 흐름 블록 형식을 제공합니다. 이러한 데이터 흐름 블록 형식은 이 문서의 미리 정의된 데이터 흐름 블록 형식 섹션에 설명되어 있습니다.

블록 연결

데이터 흐름 블록의 선형 시퀀스인 파이프라인이나 데이터 흐름 블록의 그래프인 네트워크를 만들기 위해 데이터 흐름 블록을 연결할 수 있습니다. 파이프라인은 네트워크의 한 형태입니다. 파이프라인 또는 네트워크에서 소스는 데이터를 사용할 수 있게 되면 대상에 데이터를 비동기적으로 전파합니다. ISourceBlock<TOutput>.LinkTo 메서드는 소스 데이터 흐름 블록을 대상 블록에 연결합니다. 소스는 0개 이상의 대상에 연결될 수 있으며, 대상은 0개 이상의 소스에서 연결될 수 있습니다. 파이프라인 또는 네트워크에서 데이터 흐름 블록을 동시에 추가하거나 제거할 수 있습니다. 미리 정의된 데이터 흐름 블록 형식은 연결 및 연결 해제의 모든 스레드로부터의 안전성 측면을 처리합니다.

기본 파이프라인을 구성하도록 데이터 흐름 블록을 연결하는 예제는 연습: 데이터 흐름 파이프라인 만들기를 참조하세요. 더 복잡한 네트워크를 구성하도록 데이터 흐름 블록을 연결하는 예제는 연습: Windows Forms 애플리케이션에서 데이터 흐름 사용을 참조하세요. 소스가 대상에 메시지를 제공한 후 소스에서 대상의 연결을 해제하는 예제는 방법: 데이터 흐름 블록 링크 끊기를 참조하세요.

필터링

ISourceBlock<TOutput>.LinkTo 메서드를 호출하여 소스를 대상에 연결하는 경우 대상 블록이 메시지의 값에 따라 해당 메시지를 수락할지 아니면 거부할지를 결정하는 대리자를 제공할 수 있습니다. 이 필터링 메커니즘은 데이터 흐름 블록이 특정 값만 받도록 보장하는 유용한 방법입니다. 대부분의 미리 정의된 데이터 흐름 블록 형식의 경우 소스 블록이 여러 대상 블록에 연결되어 있으면 대상 블록이 메시지를 거부할 때 소스는 그 다음 대상에 해당 메시지를 제공합니다. 소스가 대상에 메시지를 제공하는 순서는 소스에 의해 정의되며 소스의 형식에 따라 달라질 수 있습니다. 대부분의 소스 블록 형식은 한 대상이 메시지를 수락할 후 해당 메시지의 제공을 중지합니다. 이 규칙의 한 가지 예외는 일부 대상이 메시지를 거부하는 경우에도 모든 대상에 각 메시지를 제공하는 BroadcastBlock<T> 클래스입니다. 필터링을 사용하여 특정 메시지만 처리하는 예제는 연습: Windows Forms 애플리케이션에서 데이터 흐름 사용을 참조하세요.

Important

미리 정의된 각 소스 데이터 흐름 블록 형식은 메시지가 수신된 순서대로 전파되도록 보장하기 때문에 각 메시지를 소스 블록에서 읽어온 후에야 소스 블록이 다음 메시지를 처리할 수 있습니다. 따라서 필터링을 사용하여 여러 대상을 소스에 연결하는 경우 적어도 하나의 대상 블록이 각 메시지를 받는지 확인해야 합니다. 이렇게 하지 않으면 애플리케이션에서 교착 상태가 발생할 수도 있습니다.

메시지 전달

데이터 흐름 프로그래밍 모델은 프로그램의 개별 구성 요소가 메시지를 전달하여 서로 통신하는 메시지 전달 개념과 관련됩니다. 애플리케이션 구성 요소 간에 메시지를 전파하는 한 가지 방법은 Post(동기) 및 SendAsync(비동기) 메서드를 호출하여 대상 데이터 흐름 블록에 메시지를 보내고 Receive, ReceiveAsyncTryReceive 메서드를 호출하여 원본 블록으로부터 메시지를 가져옵니다. 입력 데이터를 헤드 노드(대상 블록)에 보내고 출력 데이터를 파이프라인의 터미널 노드나 네트워크의 터미널 노드(하나 이상의 소스 블록)에서 받는 방법으로 이러한 메서드를 데이터 흐름 파이프라인 또는 네트워크와 결합할 수 있습니다. 또한 Choose 메서드를 사용하여 제공된 소스 중에서 사용 가능한 데이터가 있는 첫 번째 소스에서 읽고 해당 데이터에 대한 작업을 수행할 수도 있습니다.

소스 블록은 ITargetBlock<TInput>.OfferMessage 메서드를 호출하여 대상 블록에 데이터를 제공합니다. 대상 블록은 세 가지 방법 중 하나로 제공된 메시지에 응답합니다. 즉, 메시지를 수락하거나, 메시지를 거부하거나, 메시지를 연기할 수 있습니다. 대상이 메시지를 수락하면 OfferMessage 메서드가 Accepted를 반환하고, 대상이 메시지를 거부하면 OfferMessage 메서드가 Declined를 반환합니다. 대상이 소스에서 더 이상 메시지를 받지 않도록 요구하는 경우에는 OfferMessageDecliningPermanently를 반환합니다. 미리 정의된 소스 블록 형식은 이러한 반환 값이 수신된 후 연결된 대상에 메시지를 보내지 않으며 해당 대상의 연결을 자동으로 해제합니다.

대상 블록이 나중에 사용할 수 있도록 메시지를 연기하면 OfferMessage 메서드가 Postponed를 반환합니다. 메시지를 연기하는 대상 블록은 나중에 ISourceBlock<TOutput>.ReserveMessage 메서드를 호출하여 제공된 메시지를 예약하려고 시도할 수 있습니다. 이 시점에서 해당 메시지는 여전히 사용 가능하여 대상 블록이 사용할 수 있거나, 다른 대상이 사용하고 있는 상태입니다. 대상 블록은 나중에 메시지가 필요하거나 더 이상 필요하지 않은 경우 각각 ISourceBlock<TOutput>.ConsumeMessage 또는 ReleaseReservation 메서드를 호출합니다. 메시지 예약은 non-greedy 모드에서 작동하는 데이터 흐름 블록 형식에서 주로 사용됩니다. non-greedy 모드는 이 문서의 뒷부분에 설명되어 있습니다. 연기된 메시지를 예약하는 대신 대상 블록은 ISourceBlock<TOutput>.ConsumeMessage 메서드를 사용하여 연기된 메시지를 직접 사용하려고 시도할 수도 있습니다.

데이터 흐름 블록 완료

데이터 흐름 블록은 완료 개념도 지원합니다. 완료된 상태에 있는 데이터 흐름 블록은 더 이상 작업을 수행하지 않습니다. 각 데이터 흐름 블록에는 블록의 완료 상태를 나타내는 완료 작업이라고 하는 System.Threading.Tasks.Task 개체가 연결되어 있습니다. 완료 작업을 사용하여 Task 개체가 완료될 때까지 기다릴 수 있기 때문에 데이터 흐름 네트워크의 터미널 노드 중 하나 이상이 완료될 때까지 기다릴 수 있습니다. IDataflowBlock 인터페이스는 데이터 흐름 블록에 완료 요청을 알리는 Complete 메서드와 데이터 흐름 블록에 대한 완료 작업을 반환하는 Completion 속성을 정의합니다. ISourceBlock<TOutput>ITargetBlock<TInput> 모두 IDataflowBlock 인터페이스를 상속합니다.

데이터 흐름 블록이 오류 없이 완료되었는지, 하나 이상의 오류가 발생했는지, 아니면 취소되었는지를 확인하는 두 가지 방법이 있습니다. 첫 번째 방법은 try-catch(Visual Basic에서는 Try-Catch) 블록에서 완료 작업에 대해 Task.Wait 메서드를 호출하는 것입니다. 다음 예제에서는 입력 값이 0보다 작은 경우 ActionBlock<TInput>을 throw하는 ArgumentOutOfRangeException 개체를 만듭니다. 이 예제에서 완료 작업에 대해 AggregateException를 호출할 때 Wait이 throw됩니다. ArgumentOutOfRangeExceptionInnerExceptions 개체의 AggregateException 속성을 통해 액세스됩니다.

// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
   Console.WriteLine("n = {0}", n);
   if (n < 0)
   {
      throw new ArgumentOutOfRangeException();
   }
});

// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();

// Wait for completion in a try/catch block.
try
{
   throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
   // If an unhandled exception occurs during dataflow processing, all
   // exceptions are propagated through an AggregateException object.
   ae.Handle(e =>
   {
      Console.WriteLine("Encountered {0}: {1}",
         e.GetType().Name, e.Message);
      return true;
   });
}

/* Output:
n = 0
n = -1
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
 of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
                                                      Console.WriteLine("n = {0}", n)
                                                      If n < 0 Then
                                                          Throw New ArgumentOutOfRangeException()
                                                      End If
                                                  End Sub)

' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()

' Wait for completion in a try/catch block.
Try
    throwIfNegative.Completion.Wait()
Catch ae As AggregateException
    ' If an unhandled exception occurs during dataflow processing, all
    ' exceptions are propagated through an AggregateException object.
    ae.Handle(Function(e)
                  Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
                  Return True
              End Function)
End Try

'          Output:
'         n = 0
'         n = -1
'         Encountered ArgumentOutOfRangeException: Specified argument was out of the range
'          of valid values.
'         

이 예제에서는 예외가 예외 데이터 흐름 블록의 대리자에서 처리되지 않는 경우를 보여 줍니다. 이러한 블록의 본문에서 예외를 처리하는 것이 좋습니다. 그러나 이렇게 할 수 없는 경우 블록은 메시지가 취소된 것처럼 동작하고 들어오는 메시지를 처리하지 않습니다.

데이터 흐름 블록이 명시적으로 취소되는 경우 AggregateException 개체는 OperationCanceledException 속성에 InnerExceptions을 포함합니다. 데이터 흐름 취소에 대한 자세한 내용은 취소 사용 섹션을 참조하세요.

데이터 흐름 블록의 완료 상태를 확인하는 두 번째 방법은 완료 작업의 연속을 사용하거나, C# 및 Visual Basic의 비동기 언어 기능을 사용하여 완료 작업을 비동기적으로 기다리는 것입니다. Task.ContinueWith 메서드에 제공하는 대리자는 선행 작업을 나타내는 Task 개체를 사용합니다. Completion 속성의 경우 연속의 대리자는 완료 작업 자체를 사용합니다. 다음 예제는 ContinueWith 메서드를 사용하여 전반적인 데이터 흐름 작업의 상태를 출력하는 연속 작업을 만드는 점을 제외하고 이전 예제와 유사합니다.

// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
   Console.WriteLine("n = {0}", n);
   if (n < 0)
   {
      throw new ArgumentOutOfRangeException();
   }
});

// Create a continuation task that prints the overall
// task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(task =>
{
   Console.WriteLine("The status of the completion task is '{0}'.",
      task.Status);
});

// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();

// Wait for completion in a try/catch block.
try
{
   throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
   // If an unhandled exception occurs during dataflow processing, all
   // exceptions are propagated through an AggregateException object.
   ae.Handle(e =>
   {
      Console.WriteLine("Encountered {0}: {1}",
         e.GetType().Name, e.Message);
      return true;
   });
}

/* Output:
n = 0
n = -1
The status of the completion task is 'Faulted'.
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
 of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
                                                      Console.WriteLine("n = {0}", n)
                                                      If n < 0 Then
                                                          Throw New ArgumentOutOfRangeException()
                                                      End If
                                                  End Sub)

' Create a continuation task that prints the overall 
' task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(Sub(task) Console.WriteLine("The status of the completion task is '{0}'.", task.Status))

' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()

' Wait for completion in a try/catch block.
Try
    throwIfNegative.Completion.Wait()
Catch ae As AggregateException
    ' If an unhandled exception occurs during dataflow processing, all
    ' exceptions are propagated through an AggregateException object.
    ae.Handle(Function(e)
                  Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
                  Return True
              End Function)
End Try

'          Output:
'         n = 0
'         n = -1
'         The status of the completion task is 'Faulted'.
'         Encountered ArgumentOutOfRangeException: Specified argument was out of the range
'          of valid values.
'         

연속 작업의 본문에서 IsCanceled와 같은 속성을 사용하여 데이터 흐름 블록의 완료 상태에 대한 추가 정보를 확인할 수도 있습니다. 연속 작업과 연속 작업의 취소 및 오류 처리와 관련된 방법에 대한 자세한 내용은 연속 작업을 사용하여 작업 연결, 작업 취소예외 처리를 참조하세요.

미리 정의된 데이터 흐름 블록 형식

TPL 데이터 흐름 라이브러리는 몇 가지 미리 정의된 데이터 흐름 블록 형식을 제공합니다. 이러한 형식은 버퍼링 블록, 실행 블록그룹 블록이라는 세 가지 범주로 나뉩니다. 다음 단원에서는 이러한 범주를 구성하는 블록 형식에 대해 설명합니다.

버퍼링 블록

버퍼링 블록은 데이터 소비자가 사용할 데이터를 포함합니다. TPL 데이터 흐름 라이브러리는 세 가지 버퍼링 블록 형식인 System.Threading.Tasks.Dataflow.BufferBlock<T>, System.Threading.Tasks.Dataflow.BroadcastBlock<T>System.Threading.Tasks.Dataflow.WriteOnceBlock<T>을 제공합니다.

BufferBlock<T>

BufferBlock<T> 클래스는 일반적인 용도의 비동기 메시징 구조를 나타냅니다. 이 클래스는 여러 소스가 기록하거나 여러 대상이 읽을 수 있는 메시지의 FIFO(선입 선출) 큐를 저장합니다. 대상이 BufferBlock<T> 개체에서 메시지를 받으면 해당 메시지가 메시지 큐에서 제거됩니다. 따라서 BufferBlock<T> 개체에 대상이 여러 개 있을 수 있지만 한 대상만 각 메시지를 받습니다. BufferBlock<T> 클래스는 여러 메시지를 다른 구성 요소에 전달하려고 할 때 유용하고 해당 구성 요소는 각 메시지를 수신해야 합니다.

다음 기본 예제에서는 몇 가지 Int32 값을 BufferBlock<T> 개체에 게시한 다음 이 개체에서 해당 값을 다시 읽습니다.

// Create a BufferBlock<int> object.
var bufferBlock = new BufferBlock<int>();

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
   bufferBlock.Post(i);
}

// Receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(bufferBlock.Receive());
}

/* Output:
   0
   1
   2
 */
' Create a BufferBlock<int> object.
Dim bufferBlock = New BufferBlock(Of Integer)()

' Post several messages to the block.
For i As Integer = 0 To 2
    bufferBlock.Post(i)
Next i

' Receive the messages back from the block.
For i As Integer = 0 To 2
    Console.WriteLine(bufferBlock.Receive())
Next i

'          Output:
'            0
'            1
'            2
'          

BufferBlock<T> 개체에 메시지를 쓰거나 이 개체에서 메시지를 읽는 방법을 보여주는 전체 예제는 방법: 데이터 흐름 블록에 메시지 쓰기 및 데이터 흐름 블록에서 메시지 읽기를 참조하세요.

BroadcastBlock<T>

BroadcastBlock<T> 클래스는 여러 메시지를 다른 구성 요소에 전달해야 하지만 해당 구성 요소에 최신 값만 필요한 경우에 유용합니다. 이 클래스는 여러 구성 요소에 메시지를 브로드캐스트하려고 할 때도 유용합니다.

다음 기본 예제에서는 Double 값을 BroadcastBlock<T> 개체에 게시한 다음 이 개체에서 해당 값을 여러 번 읽습니다. 값을 읽은 후 BroadcastBlock<T> 개체에서 제거하지 않기 때문에 동일한 값을 매번 사용할 수 있습니다.

// Create a BroadcastBlock<double> object.
var broadcastBlock = new BroadcastBlock<double>(null);

// Post a message to the block.
broadcastBlock.Post(Math.PI);

// Receive the messages back from the block several times.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(broadcastBlock.Receive());
}

/* Output:
   3.14159265358979
   3.14159265358979
   3.14159265358979
 */
' Create a BroadcastBlock<double> object.
Dim broadcastBlock = New BroadcastBlock(Of Double)(Nothing)

' Post a message to the block.
broadcastBlock.Post(Math.PI)

' Receive the messages back from the block several times.
For i As Integer = 0 To 2
    Console.WriteLine(broadcastBlock.Receive())
Next i

'          Output:
'            3.14159265358979
'            3.14159265358979
'            3.14159265358979
'          

BroadcastBlock<T>을 사용하여 메시지를 여러 대상 블록에 브로드캐스트하는 방법을 보여주는 전체 예제는 방법: 데이터 흐름 블록의 작업 스케줄러 지정을 참조하세요.

WriteOnceBlock<T>

WriteOnceBlock<T> 클래스는 BroadcastBlock<T> 개체에 한 번만 쓸 수 있는 점을 제외하고 WriteOnceBlock<T> 클래스와 유사합니다. WriteOnceBlock<T> 개체가 생성 시가 아니라 값을 받은 후에 변경할 수 없게 되는 점을 제외하면 WriteOnceBlock<T>을 C# readonly(Visual Basic에서는 ReadOnly) 키워드와 유사한 것으로 간주할 수 있습니다. BroadcastBlock<T> 클래스처럼 대상이 WriteOnceBlock<T> 개체에서 메시지를 수신할 때 해당 메시지는 해당 개체에서 제거되지 않습니다. 따라서 여러 대상이 하나의 메시지 복사본을 수신합니다. WriteOnceBlock<T> 클래스는 여러 메시지 중 첫 메시지만 전파하려는 경우에 유용합니다.

다음 기본 예제에서는 여러 String 값을 WriteOnceBlock<T> 개체에 게시한 다음 이 개체에서 해당 값을 다시 읽습니다. WriteOnceBlock<T> 개체에 한 번만 쓸 수 있기 때문에 WriteOnceBlock<T> 개체는 메시지를 받은 후 이후 메시지를 삭제합니다.

// Create a WriteOnceBlock<string> object.
var writeOnceBlock = new WriteOnceBlock<string>(null);

// Post several messages to the block in parallel. The first
// message to be received is written to the block.
// Subsequent messages are discarded.
Parallel.Invoke(
   () => writeOnceBlock.Post("Message 1"),
   () => writeOnceBlock.Post("Message 2"),
   () => writeOnceBlock.Post("Message 3"));

// Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive());

/* Sample output:
   Message 2
 */
' Create a WriteOnceBlock<string> object.
Dim writeOnceBlock = New WriteOnceBlock(Of String)(Nothing)

' Post several messages to the block in parallel. The first 
' message to be received is written to the block. 
' Subsequent messages are discarded.
Parallel.Invoke(Function() writeOnceBlock.Post("Message 1"), Function() writeOnceBlock.Post("Message 2"), Function() writeOnceBlock.Post("Message 3"))

' Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive())

'          Sample output:
'            Message 2
'          

WriteOnceBlock<T>을 사용하여 완료되는 첫 번째 작업의 값을 받는 방법을 보여주는 전체 예제는 방법: 데이터 흐름 블록 링크 끊기를 참조하세요.

실행 블록

실행 블록은 수신된 데이터의 각 조각에 대해 사용자가 제공한 대리자를 호출합니다. TPL 데이터 흐름 라이브러리는 세 가지 실행 블록 형식인 ActionBlock<TInput>, System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput>System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput>을 제공합니다.

ActionBlock<T>

ActionBlock<TInput> 클래스는 데이터를 받을 때 대리자를 호출하는 대상 블록입니다. ActionBlock<TInput> 개체를 데이터를 사용할 수 있게 될 때 비동기적으로 실행되는 대리자로 간주할 수 있습니다. ActionBlock<TInput> 개체에 제공하는 대리자는 Action<T> 형식이나 System.Func<TInput, Task> 형식일 수 있습니다. ActionBlock<TInput>과 함께 Action<T> 개체를 사용하는 경우 각 입력 요소의 처리는 대리자가 반환될 때 완료된 것으로 간주됩니다. ActionBlock<TInput>와 함께 System.Func<TInput, Task> 개체를 사용하는 경우 각 입력 요소의 처리는 반환된 Task 개체가 완료되는 경우에만 완료된 것으로 간주됩니다. 이 두 가지 메커니즘을 사용하여 각 입력 요소의 동기적 처리와 비동기적 처리 모두에 ActionBlock<TInput>을 사용할 수 있습니다.

다음 기본 예제에서는 여러 Int32 값을 ActionBlock<TInput> 개체에 게시합니다. ActionBlock<TInput> 개체는 해당 값을 콘솔에 출력합니다. 그런 다음 이 예제에서는 블록을 완료된 상태로 설정하고 모든 데이터 흐름 작업이 완료될 때까지 대기합니다.

// Create an ActionBlock<int> object that prints values
// to the console.
var actionBlock = new ActionBlock<int>(n => Console.WriteLine(n));

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
   actionBlock.Post(i * 10);
}

// Set the block to the completed state and wait for all
// tasks to finish.
actionBlock.Complete();
actionBlock.Completion.Wait();

/* Output:
   0
   10
   20
 */
' Create an ActionBlock<int> object that prints values
' to the console.
Dim actionBlock = New ActionBlock(Of Integer)(Function(n) WriteLine(n))

' Post several messages to the block.
For i As Integer = 0 To 2
    actionBlock.Post(i * 10)
Next i

' Set the block to the completed state and wait for all 
' tasks to finish.
actionBlock.Complete()
actionBlock.Completion.Wait()

'          Output:
'            0
'            10
'            20
'          

ActionBlock<TInput> 클래스와 함께 대리자를 사용하는 방법을 보여주는 전체 예제는 방법: 데이터 흐름 블록에서 데이터를 받을 경우 작업 수행을 참조하세요.

TransformBlock<TInput, TOutput>

TransformBlock<TInput,TOutput> 클래스는 소스와 대상 역할을 모두 수행하는 점을 제외하고 ActionBlock<TInput> 클래스와 유사합니다. TransformBlock<TInput,TOutput> 개체에 전달하는 대리자는 TOutput 형식의 값을 반환합니다. TransformBlock<TInput,TOutput> 개체에 제공하는 대리자는 System.Func<TInput, TOutput> 형식이나 System.Func<TInput, Task<TOutput>> 형식일 수 있습니다. TransformBlock<TInput,TOutput>와 함께 System.Func<TInput, TOutput> 개체를 사용하는 경우 각 입력 요소의 처리는 대리자가 반환될 때 완료된 것으로 간주됩니다. TransformBlock<TInput,TOutput>와 함께 사용된 System.Func<TInput, Task<TOutput>> 개체를 사용하는 경우 각 입력 요소의 처리는 반환된 Task<TResult> 개체가 완료되는 경우에만 완료된 것으로 간주됩니다. ActionBlock<TInput>과 마찬가지로, 이 두 가지 메커니즘을 사용함으로써 각 입력 요소의 동기적 처리와 비동기적 처리 모두에 TransformBlock<TInput,TOutput>을 사용할 수 있습니다.

다음 기본 예제에서는 입력의 제곱근을 계산하는 TransformBlock<TInput,TOutput> 개체를 만듭니다. TransformBlock<TInput,TOutput> 개체는 Int32 값을 입력으로 사용하고 Double 값을 출력으로 생성합니다.

// Create a TransformBlock<int, double> object that
// computes the square root of its input.
var transformBlock = new TransformBlock<int, double>(n => Math.Sqrt(n));

// Post several messages to the block.
transformBlock.Post(10);
transformBlock.Post(20);
transformBlock.Post(30);

// Read the output messages from the block.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(transformBlock.Receive());
}

/* Output:
   3.16227766016838
   4.47213595499958
   5.47722557505166
 */
' Create a TransformBlock<int, double> object that 
' computes the square root of its input.
Dim transformBlock = New TransformBlock(Of Integer, Double)(Function(n) Math.Sqrt(n))

' Post several messages to the block.
transformBlock.Post(10)
transformBlock.Post(20)
transformBlock.Post(30)

' Read the output messages from the block.
For i As Integer = 0 To 2
    Console.WriteLine(transformBlock.Receive())
Next i

'          Output:
'            3.16227766016838
'            4.47213595499958
'            5.47722557505166
'          

Windows Forms 애플리케이션에서 이미지 처리를 수행하는 데이터 흐름 네트워크에서 TransformBlock<TInput,TOutput>을 사용하는 전체 예제는 연습: Windows Forms 애플리케이션에서 데이터 흐름 사용을 참조하세요.

TransformManyBlock<TInput, TOutput>

TransformManyBlock<TInput,TOutput> 클래스는 TransformBlock<TInput,TOutput>이 각 입력 값에 대해 하나의 출력 값이 아니라 각 입력 값에 대해 0개 이상의 출력 값을 생성하는 점을 제외하고 TransformManyBlock<TInput,TOutput> 클래스와 유사합니다. TransformManyBlock<TInput,TOutput> 개체에 제공하는 대리자는 System.Func<TInput, IEnumerable<TOutput>> 형식이나 System.Func<TInput, Task<IEnumerable<TOutput>>> 형식일 수 있습니다. TransformManyBlock<TInput,TOutput>와 함께 System.Func<TInput, IEnumerable<TOutput>> 개체를 사용하는 경우 각 입력 요소의 처리는 대리자가 반환될 때 완료된 것으로 간주됩니다. TransformManyBlock<TInput,TOutput>와 함께 System.Func<TInput, Task<IEnumerable<TOutput>>> 개체를 사용하는 경우 각 입력 요소의 처리는 반환된 System.Threading.Tasks.Task<IEnumerable<TOutput>> 개체가 완료되는 경우에만 완료된 것으로 간주됩니다.

다음 기본 예제에서는 문자열을 개별 문자 시퀀스로 분할하는 TransformManyBlock<TInput,TOutput> 개체를 만듭니다. TransformManyBlock<TInput,TOutput> 개체는 String 값을 입력으로 사용하고 Char 값을 출력으로 생성합니다.

// Create a TransformManyBlock<string, char> object that splits
// a string into its individual characters.
var transformManyBlock = new TransformManyBlock<string, char>(
   s => s.ToCharArray());

// Post two messages to the first block.
transformManyBlock.Post("Hello");
transformManyBlock.Post("World");

// Receive all output values from the block.
for (int i = 0; i < ("Hello" + "World").Length; i++)
{
   Console.WriteLine(transformManyBlock.Receive());
}

/* Output:
   H
   e
   l
   l
   o
   W
   o
   r
   l
   d
 */
' Create a TransformManyBlock<string, char> object that splits
' a string into its individual characters.
Dim transformManyBlock = New TransformManyBlock(Of String, Char)(Function(s) s.ToCharArray())

' Post two messages to the first block.
transformManyBlock.Post("Hello")
transformManyBlock.Post("World")

' Receive all output values from the block.
For i As Integer = 0 To ("Hello" & "World").Length - 1
    Console.WriteLine(transformManyBlock.Receive())
Next i

'          Output:
'            H
'            e
'            l
'            l
'            o
'            W
'            o
'            r
'            l
'            d
'          

TransformManyBlock<TInput,TOutput>을 사용하여 데이터 흐름 파이프라인의 각 입력에 대한 여러 독립적 출력을 생성하는 전체 예제는 연습: 데이터 흐름 파이프라인 만들기를 참조하세요.

병렬 처리 수준

모든 ActionBlock<TInput>, TransformBlock<TInput,TOutput>TransformManyBlock<TInput,TOutput> 개체는 블록이 입력 메시지를 처리할 준비가 될 때까지 해당 메시지를 버퍼링합니다. 기본적으로 이러한 클래스 받은 순서대로 한 번에 하나씩 메시지를 처리합니다. 또한 ActionBlock<TInput>, TransformBlock<TInput,TOutput>TransformManyBlock<TInput,TOutput> 개체가 동시에 여러 메시지를 처리할 수 있도록 하기 위해 병렬 처리 수준을 지정할 수도 있습니다. 동시 실행에 대한 자세한 내용은 이 문서 뒷부분의 병렬 처리 수준 지정 단원을 참조하십시오. 실행 데이터 흐름 블록이 한 번에 두 개 이상의 메시지를 처리할 수 있도록 병렬 처리 수준을 설정하는 예제는 방법: 데이터 흐름 블록의 병렬 처리 수준 지정을 참조하세요.

대리자 형식 요약

다음 표에서는 ActionBlock<TInput>, TransformBlock<TInput,TOutput>TransformManyBlock<TInput,TOutput> 개체에 제공할 수 있는 대리자 형식을 요약합니다. 또한 대리자 형식이 동기적으로 작동하는지, 아니면 비동기적으로 작동하는지를 지정합니다.

Type 동기 대리자 형식 비동기 대리자 형식
ActionBlock<TInput> System.Action System.Func<TInput, Task>
TransformBlock<TInput,TOutput> System.Func<TInput, TOutput> System.Func<TInput, Task<TOutput>>
TransformManyBlock<TInput,TOutput> System.Func<TInput, IEnumerable<TOutput>> System.Func<TInput, Task<IEnumerable<TOutput>>>

실행 블록 형식으로 작업할 때 람다 식을 사용할 수도 있습니다. 실행 블록과 함께 람다 식을 사용하는 방법을 보여 주는 예제는 방법: 데이터 흐름 블록에서 데이터를 받을 경우 작업 수행을 참조하세요.

그룹 블록

그룹 블록은 다양한 제약 조건에서 하나 이상의 소스로부터 데이터를 결합합니다. TPL 데이터 흐름 라이브러리는 세가지 조인 블록 형식인 BatchBlock<T>, JoinBlock<T1,T2>BatchedJoinBlock<T1,T2>을 제공합니다.

BatchBlock<T>

BatchBlock<T> 클래스는 배치라고 하는 입력 데이터 집합을 출력 데이터의 배열로 결합합니다. BatchBlock<T> 개체를 만들 때 각 배치의 크기를 지정합니다. BatchBlock<T> 개체는 지정된 수의 입력 요소를 받을 때 해당 요소가 포함된 배열을 비동기적으로 전파합니다. BatchBlock<T> 개체가 완료된 상태로 설정되었지만 배치를 형성하는 데 충분한 요소를 포함하지 않은 경우에는 나머지 입력 요소가 포함된 마지막 배열을 전파합니다.

BatchBlock<T> 클래스는 greedy 또는 non-greedy 모드로 작동합니다. 기본값인 greedy 모드에서는 BatchBlock<T> 개체가 제공되는 모든 메시지를 수락하고 지정된 수의 요소를 받은 후 배열을 전파합니다. non-greedy 모드에서는 BatchBlock<T> 개체가 충분한 소스가 배치를 형성하기 위한 메시지를 블록에 제공할 때까지 모든 들어오는 메시지를 연기합니다. 일반적으로 greedy 모드는 필요한 처리 오버헤드가 적기 때문에 non-greedy 모드보다 효율적으로 수행됩니다. 그러나 원자 방식으로 여러 소스에서 사용을 조정해야 하는 경우 non-greedy 모드를 사용할 수 있습니다. GreedyFalse 생성자의 dataflowBlockOptions 매개 변수에서 BatchBlock<T>로 설정하여 non-greedy 모드를 지정합니다.

다음 기본 예제에서는 몇 가지 Int32 값을 배치에 10개의 요소를 포함하는 BatchBlock<T> 개체에 게시합니다. BatchBlock<T>에서 모든 값이 전파되도록 보장하기 위해서 이 예제에서는 Complete 메서드를 호출합니다. Complete 메서드는 BatchBlock<T> 개체를 완료된 상태로 설정하므로 BatchBlock<T> 개체가 남아 있는 요소를 모두 마지막 배치로 전파합니다.

// Create a BatchBlock<int> object that holds ten
// elements per batch.
var batchBlock = new BatchBlock<int>(10);

// Post several values to the block.
for (int i = 0; i < 13; i++)
{
   batchBlock.Post(i);
}
// Set the block to the completed state. This causes
// the block to propagate out any remaining
// values as a final batch.
batchBlock.Complete();

// Print the sum of both batches.

Console.WriteLine("The sum of the elements in batch 1 is {0}.",
   batchBlock.Receive().Sum());

Console.WriteLine("The sum of the elements in batch 2 is {0}.",
   batchBlock.Receive().Sum());

/* Output:
   The sum of the elements in batch 1 is 45.
   The sum of the elements in batch 2 is 33.
 */
' Create a BatchBlock<int> object that holds ten
' elements per batch.
Dim batchBlock = New BatchBlock(Of Integer)(10)

' Post several values to the block.
For i As Integer = 0 To 12
    batchBlock.Post(i)
Next i
' Set the block to the completed state. This causes
' the block to propagate out any remaining
' values as a final batch.
batchBlock.Complete()

' Print the sum of both batches.

Console.WriteLine("The sum of the elements in batch 1 is {0}.", batchBlock.Receive().Sum())

Console.WriteLine("The sum of the elements in batch 2 is {0}.", batchBlock.Receive().Sum())

'          Output:
'            The sum of the elements in batch 1 is 45.
'            The sum of the elements in batch 2 is 33.
'          

BatchBlock<T>을 사용하여 데이터베이스 삽입 작업의 효율성을 높이는 전체 예제는 연습: BatchBlock 및 BatchedJoinBlock을 사용하여 효율성 향상을 참조하세요.

JoinBlock<T1, T2, ...>

JoinBlock<T1,T2>JoinBlock<T1,T2,T3> 클래스는 입력 요소를 수집하고 해당 요소가 포함된 System.Tuple<T1,T2> 또는 System.Tuple<T1,T2,T3> 개체를 전파합니다. JoinBlock<T1,T2>JoinBlock<T1,T2,T3> 클래스는 ITargetBlock<TInput>에서 상속하지 않습니다. 대신에 Target1을 구현하는 Target2, Target3ITargetBlock<TInput>을 제공합니다.

마찬가지로 BatchBlock<T>, JoinBlock<T1,T2>JoinBlock<T1,T2,T3>은 greedy 또는 non-greedy 모드에서 작동합니다. 기본값인 greedy 모드에서는 JoinBlock<T1,T2> 또는 JoinBlock<T1,T2,T3> 개체가 제공되는 모든 메시지를 수락하고 각 대상이 적어도 하나의 메시지를 받은 후 튜플을 전파합니다. non-greedy 모드에서는 JoinBlock<T1,T2> 또는 JoinBlock<T1,T2,T3> 개체가 모든 대상에 튜플을 만드는 데 필요한 데이터가 제공될 때까지 들어오는 모든 메시지를 연기합니다. 이 시점에서 블록은 소스로부터 모든 필요한 항목을 원자적으로 검색하는 2단계 커밋 프로토콜에 참여합니다. 이러한 연기를 통해 다른 엔터티가 그 동안 데이터를 사용할 수 있으므로 전반적인 시스템 작업이 진행될 수 있습니다.

다음 기본 예제에서는 JoinBlock<T1,T2,T3> 개체가 값을 계산하기 위해 여러 데이터를 필요로 하는 경우를 보여 줍니다. 이 예제에서는 산술 연산을 수행하기 위해 두 JoinBlock<T1,T2,T3> 값과 Int32 값을 필요로 하는 Char 개체를 만듭니다.

// Create a JoinBlock<int, int, char> object that requires
// two numbers and an operator.
var joinBlock = new JoinBlock<int, int, char>();

// Post two values to each target of the join.

joinBlock.Target1.Post(3);
joinBlock.Target1.Post(6);

joinBlock.Target2.Post(5);
joinBlock.Target2.Post(4);

joinBlock.Target3.Post('+');
joinBlock.Target3.Post('-');

// Receive each group of values and apply the operator part
// to the number parts.

for (int i = 0; i < 2; i++)
{
   var data = joinBlock.Receive();
   switch (data.Item3)
   {
      case '+':
         Console.WriteLine("{0} + {1} = {2}",
            data.Item1, data.Item2, data.Item1 + data.Item2);
         break;
      case '-':
         Console.WriteLine("{0} - {1} = {2}",
            data.Item1, data.Item2, data.Item1 - data.Item2);
         break;
      default:
         Console.WriteLine("Unknown operator '{0}'.", data.Item3);
         break;
   }
}

/* Output:
   3 + 5 = 8
   6 - 4 = 2
 */
' Create a JoinBlock<int, int, char> object that requires
' two numbers and an operator.
Dim joinBlock = New JoinBlock(Of Integer, Integer, Char)()

' Post two values to each target of the join.

joinBlock.Target1.Post(3)
joinBlock.Target1.Post(6)

joinBlock.Target2.Post(5)
joinBlock.Target2.Post(4)

joinBlock.Target3.Post("+"c)
joinBlock.Target3.Post("-"c)

' Receive each group of values and apply the operator part
' to the number parts.

For i As Integer = 0 To 1
    Dim data = joinBlock.Receive()
    Select Case data.Item3
        Case "+"c
            Console.WriteLine("{0} + {1} = {2}", data.Item1, data.Item2, data.Item1 + data.Item2)
        Case "-"c
            Console.WriteLine("{0} - {1} = {2}", data.Item1, data.Item2, data.Item1 - data.Item2)
        Case Else
            Console.WriteLine("Unknown operator '{0}'.", data.Item3)
    End Select
Next i

'          Output:
'            3 + 5 = 8
'            6 - 4 = 2
'          

non-greedy 모드에서 JoinBlock<T1,T2> 개체를 사용하여 리소스를 협조적으로 공유하는 전체 예제는 방법: JoinBlock을 사용하여 여러 소스에서 데이터 읽기를 참조하세요.

BatchedJoinBlock<T1, T2, ...>

BatchedJoinBlock<T1,T2>BatchedJoinBlock<T1,T2,T3> 클래스는 입력 요소의 배치를 수집하고 해당 요소가 포함된 System.Tuple(IList(T1), IList(T2)) 또는 System.Tuple(IList(T1), IList(T2), IList(T3)) 개체를 전파합니다. BatchedJoinBlock<T1,T2>BatchBlock<T>JoinBlock<T1,T2>이 결합된 것으로 간주할 수 있습니다. BatchedJoinBlock<T1,T2> 개체를 만들 때 각 배치의 크기를 지정합니다. BatchedJoinBlock<T1,T2>Target1을 구현하는 Target2ITargetBlock<TInput> 속성도 제공합니다. 모든 대상에서 지정된 수의 입력 요소를 받으면 BatchedJoinBlock<T1,T2> 개체는 해당 요소가 포함된 System.Tuple(IList(T1), IList(T2)) 개체를 비동기적으로 전파합니다.

다음 기본 예제에서는 결과, BatchedJoinBlock<T1,T2> 값 및 Int32 개체인 오류를 포함하는 Exception 개체를 만듭니다. 이 예제에서는 여러 작업을 수행하고 결과를 Target1 개체의 Target2 속성에 쓰고 오류를 BatchedJoinBlock<T1,T2> 속성에 씁니다. 성공한 작업의 수와 실패한 작업의 수를 미리 알 수 없기 때문에 IList<T> 개체는 각 대상이 0개 이상의 값을 받을 수 있도록 합니다.

// For demonstration, create a Func<int, int> that
// returns its argument, or throws ArgumentOutOfRangeException
// if the argument is less than zero.
Func<int, int> DoWork = n =>
{
   if (n < 0)
      throw new ArgumentOutOfRangeException();
   return n;
};

// Create a BatchedJoinBlock<int, Exception> object that holds
// seven elements per batch.
var batchedJoinBlock = new BatchedJoinBlock<int, Exception>(7);

// Post several items to the block.
foreach (int i in new int[] { 5, 6, -7, -22, 13, 55, 0 })
{
   try
   {
      // Post the result of the worker to the
      // first target of the block.
      batchedJoinBlock.Target1.Post(DoWork(i));
   }
   catch (ArgumentOutOfRangeException e)
   {
      // If an error occurred, post the Exception to the
      // second target of the block.
      batchedJoinBlock.Target2.Post(e);
   }
}

// Read the results from the block.
var results = batchedJoinBlock.Receive();

// Print the results to the console.

// Print the results.
foreach (int n in results.Item1)
{
   Console.WriteLine(n);
}
// Print failures.
foreach (Exception e in results.Item2)
{
   Console.WriteLine(e.Message);
}

/* Output:
   5
   6
   13
   55
   0
   Specified argument was out of the range of valid values.
   Specified argument was out of the range of valid values.
 */
' For demonstration, create a Func<int, int> that 
' returns its argument, or throws ArgumentOutOfRangeException
' if the argument is less than zero.
Dim DoWork As Func(Of Integer, Integer) = Function(n)
                                              If n < 0 Then
                                                  Throw New ArgumentOutOfRangeException()
                                              End If
                                              Return n
                                          End Function

' Create a BatchedJoinBlock<int, Exception> object that holds 
' seven elements per batch.
Dim batchedJoinBlock = New BatchedJoinBlock(Of Integer, Exception)(7)

' Post several items to the block.
For Each i As Integer In New Integer() {5, 6, -7, -22, 13, 55, 0}
    Try
        ' Post the result of the worker to the 
        ' first target of the block.
        batchedJoinBlock.Target1.Post(DoWork(i))
    Catch e As ArgumentOutOfRangeException
        ' If an error occurred, post the Exception to the 
        ' second target of the block.
        batchedJoinBlock.Target2.Post(e)
    End Try
Next i

' Read the results from the block.
Dim results = batchedJoinBlock.Receive()

' Print the results to the console.

' Print the results.
For Each n As Integer In results.Item1
    Console.WriteLine(n)
Next n
' Print failures.
For Each e As Exception In results.Item2
    Console.WriteLine(e.Message)
Next e

'          Output:
'            5
'            6
'            13
'            55
'            0
'            Specified argument was out of the range of valid values.
'            Specified argument was out of the range of valid values.
'          

BatchedJoinBlock<T1,T2>을 사용하여 결과와 프로그램이 데이터베이스에서 읽는 동안 발생하는 모든 예외를 캡처하는 전체 예제는 연습: BatchBlock 및 BatchedJoinBlock을 사용하여 효율성 향상을 참조하세요.

데이터 흐름 블록 동작 구성

System.Threading.Tasks.Dataflow.DataflowBlockOptions 개체를 데이터 흐름 블록 형식의 생성자에 제공하여 추가 옵션을 사용하도록 설정할 수 있습니다. 이러한 옵션은 기본 작업과 병렬 처리 수준을 관리하는 스케줄러와 같은 동작을 제어합니다. DataflowBlockOptions에는 데이터 흐름 블록 형식과 관련된 동작을 지정하는 파생 형식도 있습니다. 다음 표에서는 각 데이터 흐름 블록 형식과 관련된 옵션 형식을 요약합니다.

데이터 흐름 블록 형식 DataflowBlockOptions 형식
BufferBlock<T> DataflowBlockOptions
BroadcastBlock<T> DataflowBlockOptions
WriteOnceBlock<T> DataflowBlockOptions
ActionBlock<TInput> ExecutionDataflowBlockOptions
TransformBlock<TInput,TOutput> ExecutionDataflowBlockOptions
TransformManyBlock<TInput,TOutput> ExecutionDataflowBlockOptions
BatchBlock<T> GroupingDataflowBlockOptions
JoinBlock<T1,T2> GroupingDataflowBlockOptions
BatchedJoinBlock<T1,T2> GroupingDataflowBlockOptions

다음 단원에서는 System.Threading.Tasks.Dataflow.DataflowBlockOptions, System.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptionsSystem.Threading.Tasks.Dataflow.GroupingDataflowBlockOptions 클래스를 통해 사용 가능한 중요한 데이터 흐름 블록 옵션에 대한 추가 정보를 제공합니다.

작업 Scheduler 지정

모든 미리 정의된 데이터 흐름 블록은 TPL 작업 예약 메커니즘을 사용하여 대상에 데이터를 전파하고, 소스에서 데이터를 받고, 데이터를 사용할 수 있게 되면 사용자 정의 대리자를 실행하는 등의 작업을 수행합니다. TaskScheduler는 작업을 스레드의 큐에 대기시키는 작업 스케줄러를 나타내는 추상 클래스입니다. 기본 작업 스케줄러인 DefaultThreadPool 클래스를 사용하여 작업을 큐에 대기시키고 실행합니다. 데이터 흐름 블록 개체를 생성할 때 TaskScheduler 속성을 설정하여 기본 작업 스케줄러를 재정의할 수 있습니다.

동일한 작업 스케줄러가 여러 데이터 흐름 블록을 관리하는 경우 해당 블록 전체에 정책을 적용할 수 있습니다. 예들 들어 여러 데이터 흐름 블록이 동일한 ConcurrentExclusiveSchedulerPair 개체의 단독 스케줄러를 대상으로 하도록 각각 구성된 경우 해당 블록에서 실행되는 모든 작업이 serialize됩니다. 이와 마찬가지로 이러한 블록이 동일한 ConcurrentExclusiveSchedulerPair 개체의 동시 스케줄러를 대상으로 하도록 구성되어 있고 이 스케줄러가 최대 동시성 수준을 갖도록 구성된 경우 이러한 블록의 모든 작업은 해당 동시 작업 수로 제한됩니다. ConcurrentExclusiveSchedulerPair 클래스를 사용하여 읽기 작업은 병렬로 발생하지만 쓰기 작업은 다른 모든 작업과 독립적으로 발생하도록 설정하는 예제는 방법: 데이터 흐름 블록의 작업 스케줄러 지정을 참조하세요. TPL의 작업 스케줄러에 대한 자세한 내용은 TaskScheduler 클래스 항목을 참조하세요.

병렬 처리 수준 지정

기본적으로 TPL 데이터 흐름 라이브러리가 제공하는 세 가지 실행 블록 형식인 ActionBlock<TInput>, TransformBlock<TInput,TOutput>TransformManyBlock<TInput,TOutput>은 한 번에 하나의 메시지를 처리합니다. 또한 이러한 데이터 흐름 블록 형식은 수신되는 순서대로 메시지를 처리합니다. 이러한 데이터 흐름 블록이 메시지를 동시에 처리하도록 설정하려면 데이터 흐름 블록 개체를 생성할 때 ExecutionDataflowBlockOptions.MaxDegreeOfParallelism 속성을 설정합니다.

MaxDegreeOfParallelism의 기본값은 1이며, 이 경우 데이터 흐름 블록이 한 번에 하나의 메시지를 처리하도록 보장됩니다. 이 속성을 1보다 큰 값으로 설정하면 데이터 흐름 블록이 여러 메시지를 동시에 처리할 수 있습니다. 이 속성을 DataflowBlockOptions.Unbounded로 설정하면 내부 작업 스케줄러가 최대 동시성 수준을 관리할 수 있습니다.

Important

최대 병렬 처리 수준을 1보다 크게 지정하는 경우 여러 메시지가 동시에 처리되므로 메시지가 수신된 순서대로 처리되지 않을 수 있습니다. 그러나 메시지가 블록에서 출력되는 순서는 메시지가 수신되는 순서와 동일합니다.

MaxDegreeOfParallelism 속성이 최대 병렬 처리 수준을 나타내기 때문에 데이터 흐름 블록은 지정한 것보다 낮은 병렬 처리 수준으로 실행될 수 있습니다. 데이터 흐름 블록은 기능적 요구 사항을 충족하기 위해서나 사용 가능한 시스템 리소스가 부족하기 때문에 보다 낮은 병렬 처리 수준을 사용할 수도 있습니다. 데이터 흐름 블록은 지정한 것보다 많은 병렬 처리를 선택하지 않습니다.

MaxDegreeOfParallelism 속성의 값은 각 데이터 흐름 블록 개체에서 단독으로 사용됩니다. 예를 들어 4개의 데이터 흐름 블록이 각각 최대 병렬 처리 수준으로 1을 지정하는 경우 4개의 데이터 흐름 블록 개체는 모두 잠재적으로 병렬로 실행될 수 있습니다.

긴 작업이 병렬로 발생할 수 있도록 최대 병렬 처리 수준을 설정하는 예제는 방법: 데이터 흐름 블록의 병렬 처리 수준 지정을 참조하세요.

작업당 메시지 수 지정

미리 정의된 데이터 흐름 블록 형식은 작업을 사용하여 여러 입력 요소를 처리합니다. 이는 데이터를 처리하는 데 필요한 작업 개체의 수를 최소화하는 데 도움이 되므로 애플리케이션이 보다 효율적으로 실행될 수 있습니다. 그러나 하나의 데이터 흐름 블록 집합에 있는 작업이 데이터를 처리하고 있는 경우 다른 데이터 흐름 블록의 작업은 메시지를 큐에 대기시켜서 처리 시간 동안 기다려야 할 수 있습니다. 데이터 흐름 작업 간의 공정성을 높이려면 MaxMessagesPerTask 속성을 설정합니다. MaxMessagesPerTask가 기본값인 DataflowBlockOptions.Unbounded로 설정된 경우 데이터 흐름 블록에서 사용되는 작업은 사용할 수 있는 만큼의 메시지를 처리합니다. MaxMessagesPerTaskUnbounded 이외의 값으로 설정된 경우 데이터 흐름 블록은 Task 개체당 해당 메시지 수 이하를 처리합니다. MaxMessagesPerTask 속성을 설정하여 작업 간의 공정성을 높일 수 있지만, 시스템에서 필요한 것보다 많은 작업을 만들어 성능이 저하될 수 있는 문제가 발생할 수 있습니다.

취소 사용

TPL은 작업이 협조적 방식으로 취소를 조정할 수 있도록 하는 메커니즘을 제공합니다. 데이터 흐름 블록이 이 취소 메커니즘에 참여할 수 있도록 하려면 CancellationToken 속성을 설정합니다. 이 CancellationToken 개체가 취소된 상태로 설정된 경우 이 토큰을 모니터링하는 모든 데이터 흐름 블록은 동시 항목의 실행을 완료하지만 후속 항목의 처리를 시작하지 않습니다. 또한 이러한 데이터 흐름 블록은 버퍼링된 메시지를 모두 지우고, 모든 소스 및 대상 블록에 대한 연결을 해제하고, 취소된 상태로 전환합니다. 취소된 상태로 전환함으로써 Completion 속성의 Status 속성은 처리 중에 예외가 발생하지 않는 한 Canceled로 설정됩니다. 이 경우 StatusFaulted로 설정됩니다.

Windows Forms 애플리케이션에서 취소를 사용하는 방법을 보여 주는 예제는 방법: 데이터 흐름 블록 취소를 참조하세요. TPL의 취소에 대한 자세한 내용은 작업 취소를 참조하세요.

Greedy 및 Non-Greedy 동작 지정

몇 가지 그룹 데이터 흐름 블록 형식은 greedy 또는 non-greedy 모드로 작동할 수 있습니다. 기본적으로 미리 정의된 데이터 흐름 블록 형식은 greedy 모드로 작동합니다.

JoinBlock<T1,T2>과 같은 조인 블록 형식의 경우 greedy 모드는 조인할 데이터를 아직 사용할 수 없는 경우에도 블록이 해당 데이터를 즉시 수락함을 의미합니다. non-greedy 모드는 각 대상에서 조인을 완료하기 위해 메시지를 사용할 수 있을 때까지 블록이 들어오는 모든 메시지를 연기함을 의미합니다. 연기된 메시지 중에서 하나라도 더 이상 사용할 수 없는 경우 조인 블록은 연기된 메시지를 모두 해제하고 프로세스를 다시 시작합니다. BatchBlock<T> 클래스의 경우, non-greedy 모드에서 BatchBlock<T> 개체가 개별 소스들에서 배치를 완료하는 데 충분한 메시지를 사용할 수 있을 때까지 들어오는 메시지를 모두 연기하는 점을 제외하고 greedy 및 non-greedy 동작이 유사합니다.

데이터 흐름 블록에 대한 non-greedy 모드를 지정하려면 GreedyFalse로 설정합니다. non-greedy 모드를 사용하여 여러 조인 블록이 데이터 소스를 보다 효율적으로 공유할 수 있도록 하는 방법을 보여 주는 예제는 방법: JoinBlock을 사용하여 여러 소스에서 데이터 읽기를 참조하세요.

사용자 지정 데이터 흐름 블록

TPL 데이터 흐름 라이브러리가 미리 정의된 블록 형식을 다양하게 제공하지만 사용자 지정 동작을 수행하는 추가 블록 형식을 만들 수 있습니다. ISourceBlock<TOutput> 또는 ITargetBlock<TInput> 인터페이스를 직접 구현하거나 Encapsulate 메서드를 사용하여 기존 블록 형식의 동작을 캡슐화하는 복합 블록을 작성합니다. 사용자 지정 데이터 흐름 블록 기능을 구현하는 방법을 보여 주는 예제는 연습: 사용자 지정 데이터 흐름 블록 형식 만들기를 참조하세요.

제목 설명
방법: 데이터 흐름 블록에 메시지 쓰기 및 테이터 흐름 블록에서 메시지 읽기 BufferBlock<T> 개체에서 메시지를 쓰고 읽는 방법을 보여 줍니다.
방법: 공급자-소비자 데이터 흐름 패턴 구현 데이터 흐름 모델을 사용하여 생산자가 메시지를 데이터 흐름 블록에 보내고 소비자가 해당 블록에서 메시지를 읽는 생산자-소비자 패턴을 구현하는 방법을 설명합니다.
방법: 데이터 흐름 블록에서 데이터를 받을 경우 작업 수행 실행 데이터 흐름 블록 형식인 ActionBlock<TInput>, TransformBlock<TInput,TOutput>TransformManyBlock<TInput,TOutput>에 대리자를 제공하는 방법을 설명합니다.
연습: 데이터 흐름 파이프라인 만들기 웹에서 텍스트를 다운로드하고 해당 텍스트에 대한 작업을 수행하는 데이터 흐름 파이프라인을 만드는 방법을 설명합니다.
방법: 데이터 흐름 블록 링크 끊기 LinkTo 메서드를 사용하여 소스가 대상에 메시지를 제공한 후 소스에서 대상 블록의 연결을 해제하는 방법을 보여줍니다.
연습: Windows Forms 애플리케이션에서 데이터 흐름 사용 Windows Forms 애플리케이션에서 이미지 처리를 수행하는 데이터 흐름 블록의 네트워크를 만드는 방법을 보여 줍니다.
방법: 데이터 흐름 블록 취소 Windows Forms 애플리케이션에서 취소를 사용하는 방법을 보여 줍니다.
방법: JoinBlock을 사용하여 여러 소스에서 데이터 읽기 JoinBlock<T1,T2> 클래스를 사용하여 여러 소스에서 데이터를 사용할 수 있을 때 작업을 수행하는 방법과 non-greedy 모드를 사용하여 여러 조인 블록이 데이터 소스를 보다 효율적으로 공유할 수 있도록 하는 방법을 설명합니다.
방법: 데이터 흐름 블록의 병렬 처리 수준 지정 MaxDegreeOfParallelism 속성을 설정하여 실행 데이터 흐름 블록이 한 번에 둘 이상의 메시지를 처리할 수 있도록 하는 방법을 설명합니다.
방법: 데이터 흐름 블록의 작업 스케줄러 지정 애플리케이션에서 데이터 흐름을 사용하는 경우 특정 작업 스케줄러를 연결하는 방법을 보여 줍니다.
연습: BatchBlock 및 BatchedJoinBlock을 사용하여 효율성 향상 BatchBlock<T> 클래스를 사용하여 데이터베이스 삽입 작업의 효율성을 높이는 방법과 BatchedJoinBlock<T1,T2> 클래스를 사용하여 결과와 프로그램이 데이터베이스에서 읽는 동안 발생하는 모든 예외를 캡처하는 방법을 설명합니다.
연습: 사용자 지정 데이터 흐름 블록 형식 만들기 사용자 지정 동작을 구현하는 데이터 흐름 블록 형식을 만드는 두 가지 방법을 보여 줍니다.
TPL(작업 병렬 라이브러리) .NET Framework 애플리케이션에서 병렬 및 동시 프로그래밍을 단순화하는 라이브러리인 TPL을 소개합니다.