다음을 통해 공유


데이터 흐름(작업 병렬 라이브러리)

TPL(작업 병렬 라이브러리)은 동시성 지원 애플리케이션의 견고성을 높이는 데 도움이 되는 데이터 흐름 구성 요소를 제공합니다. 이러한 데이터 흐름 구성 요소를 통칭하여 TPL 데이터 흐름 라이브러리. 이 데이터 흐름 모델은 프로세스 내 메시지 전달을 통해, 코스그래인 데이터 흐름 및 파이프라이닝 작업을 위한 행위자 기반 프로그래밍을 촉진합니다. 데이터 흐름 구성 요소는 TPL의 형식 및 일정 인프라를 기반으로 하며 비동기 프로그래밍에 대한 C#, Visual Basic 및 F# 언어 지원과 통합됩니다. 이러한 데이터 흐름 구성 요소는 비동기적으로 서로 통신해야 하는 여러 작업이 있거나 데이터를 사용할 수 있게 되면 처리하려는 경우에 유용합니다. 예를 들어 웹 카메라에서 이미지 데이터를 처리하는 애플리케이션을 고려해 보세요. 애플리케이션은 데이터 흐름 모델을 사용하여 이미지 프레임이 준비되면 즉시 처리할 수 있습니다. 예를 들어 조명 보정 또는 적목 현상 축소를 수행하여 애플리케이션이 이미지 프레임을 향상시키는 경우 데이터 흐름 구성 요소의 파이프라인 만들 수 있습니다. 파이프라인의 각 단계에서는 TPL에서 제공하는 기능과 같은 더 거친 병렬 처리 기능을 사용하여 이미지를 변환할 수 있습니다.

이 문서에서는 TPL 데이터 흐름 라이브러리의 개요를 제공합니다. 프로그래밍 모델, 미리 정의된 데이터 흐름 블록 형식 및 애플리케이션의 특정 요구 사항을 충족하도록 데이터 흐름 블록을 구성하는 방법을 설명합니다.

비고

TPL 데이터 흐름 라이브러리(System.Threading.Tasks.Dataflow 네임스페이스)는 .NET과 함께 배포되지 않습니다. Visual Studio에서 System.Threading.Tasks.Dataflow 네임스페이스를 설치하려면, 프로젝트를 열고 프로젝트 메뉴에서 NuGet 패키지 관리를 선택한 다음, System.Threading.Tasks.Dataflow 패키지를 온라인으로 검색합니다. 대안으로, .NET Core CLI 을 사용하여을 설치하려면, dotnet add package System.Threading.Tasks.Dataflow을 실행하십시오.

프로그래밍 모델

TPL 데이터 흐름 라이브러리는 처리량이 높고 대기 시간이 짧은 CPU 집약적 애플리케이션 및 I/O 집약적 애플리케이션을 메시지 전달 및 병렬 처리에 대한 기초를 제공합니다. 또한 데이터를 버퍼링하고 시스템 주위로 이동하는 방법을 명시적으로 제어할 수 있습니다. 데이터 흐름 프로그래밍 모델을 더 잘 이해하려면 디스크에서 이미지를 비동기적으로 로드하고 해당 이미지의 복합을 만드는 애플리케이션을 고려합니다. 기존 프로그래밍 모델은 일반적으로 잠금과 같은 콜백 및 동기화 개체를 사용하여 작업을 조정하고 공유 데이터에 대한 액세스를 조정해야 합니다. 데이터 흐름 프로그래밍 모델을 사용하여 이미지를 디스크에서 읽을 때 처리하는 데이터 흐름 개체를 만들 수 있습니다. 데이터 흐름 모델에서 데이터를 사용할 수 있을 때 처리되는 방법과 데이터 간의 종속성도 선언합니다. 런타임은 데이터 간의 종속성을 관리하므로 공유 데이터에 대한 액세스를 동기화해야 하는 요구 사항을 피할 수 있습니다. 또한 런타임 일정은 데이터의 비동기 도착에 따라 작동하므로 데이터 흐름은 기본 스레드를 효율적으로 관리하여 응답성과 처리량을 향상시킬 수 있습니다. 데이터 흐름 프로그래밍 모델을 사용하여 Windows Forms 애플리케이션에서 이미지 처리를 구현하는 예제는 연습: Windows Forms 애플리케이션데이터 흐름 사용을 참조하세요.

출처 및 대상

TPL 데이터 흐름 라이브러리는 데이터를 버퍼링하고 처리하는 데이터 구조인 데이터 흐름 블록으로 구성됩니다. TPL은 소스 블록, 대상 블록전파자 블록세 가지 종류의 데이터 흐름 블록을 정의합니다. 원본 블록은 데이터의 원본 역할을 하며 읽을 수 있습니다. 대상 블록은 데이터의 수신자 역할을 하며 기록될 수 있습니다. 전파자 블록은 소스 블록과 대상 블록의 역할을 하며, 읽고 쓸 수 있습니다. TPL은 System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> 인터페이스를 원본을 나타내고, System.Threading.Tasks.Dataflow.ITargetBlock<TInput>은 대상을 나타내며, System.Threading.Tasks.Dataflow.IPropagatorBlock<TInput,TOutput>는 전파자를 나타내도록 정의합니다. IPropagatorBlock<TInput,TOutput> ISourceBlock<TOutput>ITargetBlock<TInput>둘 다에서 상속됩니다.

TPL 데이터 흐름 라이브러리는 ISourceBlock<TOutput>, ITargetBlock<TInput>IPropagatorBlock<TInput,TOutput> 인터페이스를 구현하는 몇 가지 미리 정의된 데이터 흐름 블록 형식을 제공합니다. 이러한 데이터 흐름 블록 형식은 미리 정의된 데이터 흐름 블록 형식 섹션에 설명되어 있습니다.

연결 블록

데이터 흐름 블록을 연결하여 데이터 흐름 블록의 선형 시퀀스인 파이프라인 또는 데이터 흐름 블록 그래프인 네트워크 만들 수 있습니다. 파이프라인은 한 가지 형태의 네트워크입니다. 파이프라인 또는 네트워크에서 원본은 데이터를 사용할 수 있게 되면 대상에 데이터를 비동기적으로 전파합니다. ISourceBlock<TOutput>.LinkTo 메서드는 원본 데이터 흐름 블록을 대상 블록에 연결합니다. 원본은 0개 이상의 대상에 연결할 수 있습니다. 대상은 0개 이상의 원본에서 연결할 수 있습니다. 파이프라인 또는 네트워크에서 동시에 데이터 흐름 블록을 추가하거나 제거할 수 있습니다. 미리 정의된 데이터 흐름 블록 형식은 연결 및 연결 해제의 모든 스레드 안전 측면을 처리합니다.

데이터 흐름 블록을 연결하여 기본 파이프라인을 형성하는 예제는 연습: 데이터 흐름 파이프라인만들기를 참조하세요. 더 복잡한 네트워크를 형성하기 위해 데이터 흐름 블록을 연결하는 예제는 연습: Windows Forms 애플리케이션데이터 흐름 사용을 참조하세요. 원본이 대상에 메시지를 제공하면 원본에서 대상의 연결을 해제하는 예제는 방법: 데이터 흐름 블록연결 해제를 참조하세요.

필터링

ISourceBlock<TOutput>.LinkTo 메서드를 호출하여 원본을 대상에 연결하는 경우 대상 블록이 해당 메시지의 값에 따라 메시지를 수락하거나 거부하는지 여부를 결정하는 대리자를 제공할 수 있습니다. 이 필터링 메커니즘은 데이터 흐름 블록이 특정 값만 받도록 보장하는 유용한 방법입니다. 대부분의 미리 정의된 데이터 흐름 블록 형식에서 원본 블록이 여러 대상 블록에 연결된 경우 대상 블록이 메시지를 거부할 때 원본은 해당 메시지를 다음 대상에 제공합니다. 원본이 대상에 메시지를 제공하는 순서는 원본에 의해 정의되며 원본 유형에 따라 달라질 수 있습니다. 대부분의 원본 블록 형식은 한 대상이 해당 메시지를 수락한 후 메시지 제공을 중지합니다. 이 규칙의 한 가지 예외는 일부 대상이 메시지를 거부하는 경우에도 모든 대상에 각 메시지를 제공하는 BroadcastBlock<T> 클래스입니다. 필터링을 사용하여 특정 메시지만 처리하는 예제는 연습: Windows Forms 애플리케이션데이터 흐름 사용을 참조하세요.

중요합니다

미리 정의된 각 원본 데이터 흐름 블록 유형은 메시지가 수신되는 순서대로 전파되도록 보장하므로 원본 블록에서 다음 메시지를 처리하려면 먼저 모든 메시지를 원본 블록에서 읽어야 합니다. 따라서 필터링을 사용하여 여러 대상을 원본에 연결하는 경우 하나 이상의 대상 블록이 각 메시지를 수신하는지 확인합니다. 그렇지 않으면 애플리케이션이 교착 상태에 빠질 수 있습니다.

메시지 전달

데이터 흐름 프로그래밍 모델은 전달하는메시지의 개념과 관련이 있습니다. 여기서 프로그램의 독립적인 구성 요소는 메시지를 전송하여 서로 통신합니다. 애플리케이션 구성 요소 간에 메시지를 전파하는 한 가지 방법은 Post(동기) 및 SendAsync(비동기) 메서드를 호출하여 대상 데이터 흐름 블록에 메시지를 보내고 Receive, ReceiveAsyncTryReceive 메서드를 호출하여 원본 블록에서 메시지를 수신하는 것입니다. 헤드 노드(대상 블록)로 입력 데이터를 보내고 파이프라인의 터미널 노드 또는 네트워크의 터미널 노드(하나 이상의 원본 블록)에서 출력 데이터를 수신하여 이러한 메서드를 데이터 흐름 파이프라인 또는 네트워크와 결합할 수 있습니다. Choose 메서드를 사용하여 사용 가능한 데이터가 있는 제공된 원본 중 첫 번째 원본에서 읽고 해당 데이터에 대한 작업을 수행할 수도 있습니다.

원본 블록은 ITargetBlock<TInput>.OfferMessage 메서드를 호출하여 대상 블록에 데이터를 제공합니다. 대상 블록은 메시지를 수락하거나, 메시지를 거절하거나, 메시지를 연기할 수 있는 세 가지 방법 중 하나로 제공된 메시지에 응답합니다. 대상이 메시지를 수락하면 OfferMessage 메서드는 Accepted반환합니다. 대상이 메시지를 거절하면 OfferMessage 메서드는 Declined반환합니다. 대상이 더 이상 원본에서 메시지를 수신하지 않도록 요구하는 경우 OfferMessageDecliningPermanently반환합니다. 미리 정의된 소스 블록 형식은 이러한 반환 값을 받은 후 연결된 대상에 메시지를 제공하지 않으며 이러한 대상에서 자동으로 연결을 해제합니다.

대상 블록이 나중에 사용하기 위해 메시지를 연기하면 OfferMessage 메서드는 Postponed반환합니다. 메시지를 연기하는 대상 블록은 나중에 ISourceBlock<TOutput>.ReserveMessage 메서드를 호출하여 제공된 메시지를 예약하려고 할 수 있습니다. 이 시점에서 메시지는 여전히 사용할 수 있으며 대상 블록에서 사용할 수 있거나 다른 대상이 메시지를 가져갔습니다. 대상 블록에 나중에 메시지가 필요하거나 더 이상 메시지가 필요하지 않은 경우 각각 ISourceBlock<TOutput>.ConsumeMessage 또는 ReleaseReservation 메서드를 호출합니다. 메시지 예약은 일반적으로 비욕심 모드에서 작동하는 데이터 흐름 블록 형식에서 사용됩니다. 비욕심 모드는 이 문서의 뒷부분에 설명되어 있습니다. 대상 블록은 연기된 메시지를 예약하는 대신 ISourceBlock<TOutput>.ConsumeMessage 메서드를 사용하여 연기된 메시지를 직접 사용할 수도 있습니다.

데이터 흐름 블록 완성

데이터 흐름 블록은 완료개념도 지원합니다. 완료된 상태의 데이터 흐름 블록은 더 이상 작업을 수행하지 않습니다. 각 데이터 흐름 블록에는 블록의 완료 상태를 나타내는 System.Threading.Tasks.Task알려진 연결된 개체가 있습니다. Task 개체가 완료될 때까지 기다릴 수 있으므로 완료 작업을 사용하여 데이터 흐름 네트워크의 하나 이상의 터미널 노드가 완료될 때까지 기다릴 수 있습니다. IDataflowBlock 인터페이스는 데이터 흐름 블록에 완료 요청을 알리는 Complete 메서드와, 데이터 흐름 블록의 완료 작업을 반환하는 Completion 속성을 정의합니다. ISourceBlock<TOutput>ITargetBlock<TInput> 모두 IDataflowBlock 인터페이스를 상속합니다.

데이터 흐름 블록이 오류 없이 완료되었는지, 하나 이상의 오류가 발생했는지 또는 취소되었는지 확인하는 두 가지 방법이 있습니다. 첫 번째 방법은 Task.Waittry- 블록(Visual Basic의catchTry-)에서 완료 태스크에서 Catch 메서드를 호출하는 것입니다. 다음 예제에서는 입력 값이 0보다 작은 경우 ActionBlock<TInput> throw하는 ArgumentOutOfRangeException 개체를 만듭니다. 이 예제에서는 완료 태스크에서 AggregateException을 호출할 때 Wait가 발생합니다. ArgumentOutOfRangeException InnerExceptions 개체의 AggregateException 속성을 통해 액세스됩니다.

// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
   Console.WriteLine($"n = {n}");
   if (n < 0)
   {
      throw new ArgumentOutOfRangeException();
   }
});

// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();

// Wait for completion in a try/catch block.
try
{
   throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
   // If an unhandled exception occurs during dataflow processing, all
   // exceptions are propagated through an AggregateException object.
   ae.Handle(e =>
   {
      Console.WriteLine($"Encountered {e.GetType().Name}: {e.Message}");
      return true;
   });
}

/* Output:
n = 0
n = -1
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
 of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
                                                      Console.WriteLine("n = {0}", n)
                                                      If n < 0 Then
                                                          Throw New ArgumentOutOfRangeException()
                                                      End If
                                                  End Sub)

' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()

' Wait for completion in a try/catch block.
Try
    throwIfNegative.Completion.Wait()
Catch ae As AggregateException
    ' If an unhandled exception occurs during dataflow processing, all
    ' exceptions are propagated through an AggregateException object.
    ae.Handle(Function(e)
                  Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
                  Return True
              End Function)
End Try

'          Output:
'         n = 0
'         n = -1
'         Encountered ArgumentOutOfRangeException: Specified argument was out of the range
'          of valid values.
'         

이 예제에서는 실행 데이터 흐름 블록의 대리자에서 예외가 처리되지 않는 경우를 보여 줍니다. 이러한 블록의 본문에서 예외를 처리하는 것이 좋습니다. 그러나 이렇게 할 수 없는 경우 블록은 취소된 것처럼 동작하며 들어오는 메시지를 처리하지 않습니다.

데이터 흐름 블록이 명시적으로 취소되면 AggregateException 개체는 OperationCanceledException 속성에 InnerExceptions 포함합니다. 데이터플로우 취소에 대한 자세한 내용은 취소 활성화 섹션을 참고하십시오.

데이터 흐름 블록의 완료 상태를 확인하는 두 번째 방법은 완료 작업의 연속을 사용하거나 C# 및 Visual Basic의 비동기 언어 기능을 사용하여 완료 작업을 비동기적으로 기다리는 것입니다. Task.ContinueWith 메서드에 제공하는 대리자는 선행 작업을 나타내는 Task 개체를 사용합니다. Completion 속성의 경우, 연속 작업을 위한 대리자는 완료 작업 자체를 전달받습니다. 다음 예제는 ContinueWith 메서드를 사용하여 전체 데이터 흐름 작업의 상태를 출력하는 연속 작업을 만드는 것을 제외하고 이전 예제와 유사합니다.

// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
   Console.WriteLine($"n = {n}");
   if (n < 0)
   {
      throw new ArgumentOutOfRangeException();
   }
});

// Create a continuation task that prints the overall
// task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(task =>
{
   Console.WriteLine($"The status of the completion task is '{task.Status}'.");
});

// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();

// Wait for completion in a try/catch block.
try
{
   throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
   // If an unhandled exception occurs during dataflow processing, all
   // exceptions are propagated through an AggregateException object.
   ae.Handle(e =>
   {
      Console.WriteLine($"Encountered {e.GetType().Name}: {e.Message}");
      return true;
   });
}

/* Output:
n = 0
n = -1
The status of the completion task is 'Faulted'.
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
 of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
                                                      Console.WriteLine("n = {0}", n)
                                                      If n < 0 Then
                                                          Throw New ArgumentOutOfRangeException()
                                                      End If
                                                  End Sub)

' Create a continuation task that prints the overall 
' task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(Sub(task) Console.WriteLine("The status of the completion task is '{0}'.", task.Status))

' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()

' Wait for completion in a try/catch block.
Try
    throwIfNegative.Completion.Wait()
Catch ae As AggregateException
    ' If an unhandled exception occurs during dataflow processing, all
    ' exceptions are propagated through an AggregateException object.
    ae.Handle(Function(e)
                  Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
                  Return True
              End Function)
End Try

'          Output:
'         n = 0
'         n = -1
'         The status of the completion task is 'Faulted'.
'         Encountered ArgumentOutOfRangeException: Specified argument was out of the range
'          of valid values.
'         

연속 작업 본문의 IsCanceled 같은 속성을 사용하여 데이터 흐름 블록의 완료 상태에 대한 추가 정보를 확인할 수도 있습니다. 연속 작업 및 취소 및 오류 처리와 관련된 방법에 대한 자세한 내용은 연속 작업 , 작업 취소예외 처리사용하여 태스크를 연결하는참조하세요.

미리 정의된 데이터 흐름 블록 형식

TPL 데이터 흐름 라이브러리는 미리 정의된 여러 데이터 흐름 블록 형식을 제공합니다. 이러한 형식은 버퍼링 블록, 실행 블록그룹화 블록세 가지 범주로 나뉩니다. 다음 섹션에서는 이러한 범주를 구성하는 블록 형식에 대해 설명합니다.

버퍼링 블록

버퍼링 블록은 데이터 소비자가 사용할 데이터를 저장합니다. TPL 데이터 흐름 라이브러리는 System.Threading.Tasks.Dataflow.BufferBlock<T>, System.Threading.Tasks.Dataflow.BroadcastBlock<T>System.Threading.Tasks.Dataflow.WriteOnceBlock<T>세 가지 버퍼링 블록 형식을 제공합니다.

BufferBlock<T>

BufferBlock<T> 클래스는 범용 비동기 메시징 구조를 나타냅니다. 이 클래스는 여러 원본에서 쓰거나 여러 대상에서 읽을 수 있는 메시지의 FIFO(선입선출) 큐를 저장합니다. 대상이 BufferBlock<T> 개체에서 메시지를 받으면 해당 메시지는 메시지 큐에서 제거됩니다. 따라서 BufferBlock<T> 개체는 여러 대상을 가질 수 있지만, 각 메시지는 오직 하나의 대상만 수신하게 됩니다. BufferBlock<T> 클래스는 여러 메시지를 다른 구성 요소에 전달하려는 경우 유용하며 해당 구성 요소는 각 메시지를 받아야 합니다.

다음 기본 예제에서는 여러 Int32 값을 BufferBlock<T> 개체에 게시한 다음 해당 개체에서 해당 값을 다시 읽습니다.

// Create a BufferBlock<int> object.
var bufferBlock = new BufferBlock<int>();

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
   bufferBlock.Post(i);
}

// Receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(bufferBlock.Receive());
}

/* Output:
   0
   1
   2
 */
' Create a BufferBlock<int> object.
Dim bufferBlock = New BufferBlock(Of Integer)()

' Post several messages to the block.
For i As Integer = 0 To 2
    bufferBlock.Post(i)
Next i

' Receive the messages back from the block.
For i As Integer = 0 To 2
    Console.WriteLine(bufferBlock.Receive())
Next i

'          Output:
'            0
'            1
'            2
'          

BufferBlock<T> 개체에 메시지를 쓰고 메시지를 읽는 방법을 보여 주는 전체 예제는 방법: 데이터 흐름 블록메시지 쓰기 및 메시지 읽기를 참조하세요.

BroadcastBlock<T>

BroadcastBlock<T> 클래스는 여러 메시지를 다른 구성 요소에 전달해야 하지만 해당 구성 요소에는 최신 값만 필요한 경우에 유용합니다. 이 클래스는 메시지를 여러 구성 요소에 브로드캐스트하려는 경우에도 유용합니다.

다음 기본 예제에서는 Double 값을 BroadcastBlock<T> 개체에 게시한 다음 해당 개체에서 해당 값을 여러 번 다시 읽습니다. 값을 읽은 후 BroadcastBlock<T> 개체에서 제거되지 않으므로 매번 동일한 값을 사용할 수 있습니다.

// Create a BroadcastBlock<double> object.
var broadcastBlock = new BroadcastBlock<double>(null);

// Post a message to the block.
broadcastBlock.Post(Math.PI);

// Receive the messages back from the block several times.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(broadcastBlock.Receive());
}

/* Output:
   3.14159265358979
   3.14159265358979
   3.14159265358979
 */
' Create a BroadcastBlock<double> object.
Dim broadcastBlock = New BroadcastBlock(Of Double)(Nothing)

' Post a message to the block.
broadcastBlock.Post(Math.PI)

' Receive the messages back from the block several times.
For i As Integer = 0 To 2
    Console.WriteLine(broadcastBlock.Receive())
Next i

'          Output:
'            3.14159265358979
'            3.14159265358979
'            3.14159265358979
'          

BroadcastBlock<T> 사용하여 메시지를 여러 대상 블록으로 브로드캐스트하는 방법을 보여 주는 전체 예제는 방법: 데이터 흐름 블록작업 스케줄러 지정을 참조하세요.

WriteOnceBlock<T>

WriteOnceBlock<T> 클래스는 BroadcastBlock<T> 개체를 한 번만 쓸 수 있다는 점을 제외하고 WriteOnceBlock<T> 클래스와 유사합니다. WriteOnceBlock<T> C# 읽기 전용(Visual Basic에서는 ReadOnly) 키워드와 비슷하다고 생각할 수 있습니다. 단, WriteOnceBlock<T> 개체는 생성 시가 아닌 값을 받은 후에 변경할 수 없게 됩니다. BroadcastBlock<T> 클래스와 마찬가지로 대상이 WriteOnceBlock<T> 개체에서 메시지를 받으면 해당 메시지는 해당 개체에서 제거되지 않습니다. 따라서 여러 대상은 메시지의 복사본을 받습니다. WriteOnceBlock<T> 클래스는 여러 메시지 중 첫 번째 메시지만 전파하려는 경우에 유용합니다.

다음 기본 예제에서는 여러 String 값을 WriteOnceBlock<T> 개체에 게시한 다음 해당 개체에서 값을 다시 읽습니다. WriteOnceBlock<T> 개체는 한 번만 쓸 수 있으므로 WriteOnceBlock<T> 개체가 메시지를 받은 후에는 후속 메시지를 삭제합니다.

// Create a WriteOnceBlock<string> object.
var writeOnceBlock = new WriteOnceBlock<string>(null);

// Post several messages to the block in parallel. The first
// message to be received is written to the block.
// Subsequent messages are discarded.
Parallel.Invoke(
   () => writeOnceBlock.Post("Message 1"),
   () => writeOnceBlock.Post("Message 2"),
   () => writeOnceBlock.Post("Message 3"));

// Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive());

/* Sample output:
   Message 2
 */
' Create a WriteOnceBlock<string> object.
Dim writeOnceBlock = New WriteOnceBlock(Of String)(Nothing)

' Post several messages to the block in parallel. The first 
' message to be received is written to the block. 
' Subsequent messages are discarded.
Parallel.Invoke(Function() writeOnceBlock.Post("Message 1"), Function() writeOnceBlock.Post("Message 2"), Function() writeOnceBlock.Post("Message 3"))

' Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive())

'          Sample output:
'            Message 2
'          

WriteOnceBlock<T> 사용하여 완료되는 첫 번째 작업의 값을 받는 방법을 보여 주는 전체 예제는 방법: 데이터 흐름 블록연결 해제를 참조하세요.

실행 블록

실행 블록은 수신된 데이터의 각 부분에 대해 사용자가 제공한 대리자를 호출합니다. TPL 데이터 흐름 라이브러리는 ActionBlock<TInput>, System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput>System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput>세 가지 실행 블록 형식을 제공합니다.

액션블록<T>

ActionBlock<TInput> 클래스는 데이터를 받을 때 대리자를 호출하는 대상 블록입니다. ActionBlock<TInput> 개체는 데이터를 사용할 수 있게 되면 비동기적으로 실행되는 대리자로 간주합니다. ActionBlock<TInput> 개체에 제공하는 대리자는 Action<T> 형식이거나 System.Func<TInput, Task>형식일 수 있습니다. ActionBlock<TInput> 개체를 Action<T>과 함께 사용할 때, 대리자가 반환되면 각 입력 요소의 처리가 완료된 것으로 간주됩니다. ActionBlock<TInput> System.Func<TInput, Task> 개체를 사용하는 경우 반환된 Task 개체가 완료된 경우에만 각 입력 요소의 처리가 완료된 것으로 간주됩니다. 이러한 두 메커니즘을 사용하면 각 입력 요소의 동기 및 비동기 처리 모두에 ActionBlock<TInput> 사용할 수 있습니다.

다음 기본 예제에서는 여러 Int32 값을 ActionBlock<TInput> 개체에 게시합니다. ActionBlock<TInput> 개체는 해당 값을 콘솔에 출력합니다. 이 예제에서는 블록을 완료된 상태로 설정하고 모든 데이터 흐름 작업이 완료될 때까지 기다립니다.

// Create an ActionBlock<int> object that prints values
// to the console.
var actionBlock = new ActionBlock<int>(n => Console.WriteLine(n));

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
   actionBlock.Post(i * 10);
}

// Set the block to the completed state and wait for all
// tasks to finish.
actionBlock.Complete();
actionBlock.Completion.Wait();

/* Output:
   0
   10
   20
 */
' Create an ActionBlock<int> object that prints values
' to the console.
Dim actionBlock = New ActionBlock(Of Integer)(Function(n) WriteLine(n))

' Post several messages to the block.
For i As Integer = 0 To 2
    actionBlock.Post(i * 10)
Next i

' Set the block to the completed state and wait for all 
' tasks to finish.
actionBlock.Complete()
actionBlock.Completion.Wait()

'          Output:
'            0
'            10
'            20
'          

ActionBlock<TInput> 클래스에서 대리자를 사용하는 방법을 보여 주는 전체 예제는 방법: 데이터 흐름 블록이 데이터수신할 때 작업 수행을 참조하세요.

TransformBlock<TInput, TOutput>

TransformBlock<TInput,TOutput> 클래스는 원본 및 대상으로 작동한다는 점을 제외하고 ActionBlock<TInput> 클래스와 유사합니다. TransformBlock<TInput,TOutput> 개체에 전달하는 대리자는 TOutput형식의 값을 반환합니다. TransformBlock<TInput,TOutput> 개체에 제공하는 대리자는 System.Func<TInput, TOutput> 형식이거나 System.Func<TInput, Task<TOutput>>형식일 수 있습니다. TransformBlock<TInput,TOutput> System.Func<TInput, TOutput> 개체를 사용할 때, 대리자가 반환하면 각 입력 요소의 처리가 완료된 것으로 간주됩니다. TransformBlock<TInput,TOutput>사용하는 System.Func<TInput, Task<TOutput>> 개체를 사용하는 경우 반환된 Task<TResult> 개체가 완료된 경우에만 각 입력 요소의 처리가 완료된 것으로 간주됩니다. ActionBlock<TInput>마찬가지로 이러한 두 메커니즘을 사용하여 각 입력 요소의 동기 및 비동기 처리에 TransformBlock<TInput,TOutput> 사용할 수 있습니다.

다음 기본 예제에서는 입력의 제곱근을 계산하는 TransformBlock<TInput,TOutput> 개체를 만듭니다. TransformBlock<TInput,TOutput> 개체는 Int32 값을 입력으로 사용하고 Double 값을 출력으로 생성합니다.

// Create a TransformBlock<int, double> object that
// computes the square root of its input.
var transformBlock = new TransformBlock<int, double>(n => Math.Sqrt(n));

// Post several messages to the block.
transformBlock.Post(10);
transformBlock.Post(20);
transformBlock.Post(30);

// Read the output messages from the block.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(transformBlock.Receive());
}

/* Output:
   3.16227766016838
   4.47213595499958
   5.47722557505166
 */
' Create a TransformBlock<int, double> object that 
' computes the square root of its input.
Dim transformBlock = New TransformBlock(Of Integer, Double)(Function(n) Math.Sqrt(n))

' Post several messages to the block.
transformBlock.Post(10)
transformBlock.Post(20)
transformBlock.Post(30)

' Read the output messages from the block.
For i As Integer = 0 To 2
    Console.WriteLine(transformBlock.Receive())
Next i

'          Output:
'            3.16227766016838
'            4.47213595499958
'            5.47722557505166
'          

Windows Forms 애플리케이션에서 이미지 처리를 수행하는 데이터 흐름 블록 네트워크에서 TransformBlock<TInput,TOutput> 사용하는 전체 예제는 연습: Windows Forms 애플리케이션데이터 흐름 사용을 참조하세요.

TransformManyBlock<TInput, TOutput>

TransformManyBlock<TInput,TOutput> 클래스는 TransformBlock<TInput,TOutput> 각 입력 값에 대해 하나의 출력 값이 아니라 각 입력 값에 대해 0개 이상의 출력 값을 생성한다는 점을 제외하고 TransformManyBlock<TInput,TOutput> 클래스와 유사합니다. TransformManyBlock<TInput,TOutput> 개체에 제공하는 대리자는 System.Func<TInput, IEnumerable<TOutput>> 형식이거나 System.Func<TInput, Task<IEnumerable<TOutput>>>형식일 수 있습니다. TransformManyBlock<TInput,TOutput> System.Func<TInput, IEnumerable<TOutput>> 개체를 사용할 때, 대리자가 반환하면 각 입력 요소의 처리가 완료된 것으로 간주됩니다. TransformManyBlock<TInput,TOutput> System.Func<TInput, Task<IEnumerable<TOutput>>> 개체를 사용하는 경우 반환된 System.Threading.Tasks.Task<IEnumerable<TOutput>> 개체가 완료된 경우에만 각 입력 요소의 처리가 완료된 것으로 간주됩니다.

다음 기본 예제에서는 문자열을 개별 문자 시퀀스로 분할하는 TransformManyBlock<TInput,TOutput> 개체를 만듭니다. TransformManyBlock<TInput,TOutput> 개체는 String 값을 입력으로 사용하고 Char 값을 출력으로 생성합니다.

// Create a TransformManyBlock<string, char> object that splits
// a string into its individual characters.
var transformManyBlock = new TransformManyBlock<string, char>(
   s => s.ToCharArray());

// Post two messages to the first block.
transformManyBlock.Post("Hello");
transformManyBlock.Post("World");

// Receive all output values from the block.
for (int i = 0; i < ("Hello" + "World").Length; i++)
{
   Console.WriteLine(transformManyBlock.Receive());
}

/* Output:
   H
   e
   l
   l
   o
   W
   o
   r
   l
   d
 */
' Create a TransformManyBlock<string, char> object that splits
' a string into its individual characters.
Dim transformManyBlock = New TransformManyBlock(Of String, Char)(Function(s) s.ToCharArray())

' Post two messages to the first block.
transformManyBlock.Post("Hello")
transformManyBlock.Post("World")

' Receive all output values from the block.
For i As Integer = 0 To ("Hello" & "World").Length - 1
    Console.WriteLine(transformManyBlock.Receive())
Next i

'          Output:
'            H
'            e
'            l
'            l
'            o
'            W
'            o
'            r
'            l
'            d
'          

TransformManyBlock<TInput,TOutput> 사용하여 데이터 흐름 파이프라인의 각 입력에 대해 여러 독립 출력을 생성하는 전체 예제는 연습: 데이터 흐름 파이프라인만들기를 참조하세요.

병렬 처리 수준

모든 ActionBlock<TInput>, TransformBlock<TInput,TOutput>TransformManyBlock<TInput,TOutput> 개체는 블록에서 처리할 준비가 될 때까지 입력 메시지를 버퍼링합니다. 기본적으로 이러한 클래스는 메시지를 받은 순서대로 한 번에 하나의 메시지로 처리합니다. ActionBlock<TInput>, TransformBlock<TInput,TOutput>TransformManyBlock<TInput,TOutput> 개체가 여러 메시지를 동시에 처리할 수 있도록 병렬 처리 수준을 지정할 수도 있습니다. 동시 실행에 대한 자세한 내용은 이 문서의 뒷부분에 있는 병렬 처리 수준 지정 섹션을 참조하세요. 실행 데이터 흐름 블록이 한 번에 둘 이상의 메시지를 처리할 수 있도록 병렬 처리 수준을 설정하는 예제는 방법: 데이터 흐름 블록병렬 처리 수준 지정을 참조하세요.

대리자 형식 요약

다음 표에는 ActionBlock<TInput>, TransformBlock<TInput,TOutput>TransformManyBlock<TInput,TOutput> 개체에 제공할 수 있는 대리자 형식이 요약되어 있습니다. 이 표에서는 대리자 형식이 동기적으로 작동하는지 비동기적으로 작동하는지 여부도 지정합니다.

유형 동기 대리자 형식 비동기 대리자 형식
ActionBlock<TInput> System.Action System.Func<TInput, Task>
TransformBlock<TInput,TOutput> System.Func<TInput, TOutput> System.Func<TInput, Task<TOutput>>
TransformManyBlock<TInput,TOutput> System.Func<TInput, IEnumerable<TOutput>> System.Func<TInput, Task<IEnumerable<TOutput>>>

실행 블록 형식으로 작업할 때 람다 식을 사용할 수도 있습니다. 실행 블록에서 람다 식을 사용하는 방법을 보여 주는 예제는 방법: 데이터 흐름 블록이 데이터수신할 때 작업 수행을 참조하세요.

블록 그룹화

그룹화 블록은 하나 이상의 원본 및 다양한 제약 조건 하에서 데이터를 결합합니다. TPL 데이터 흐름 라이브러리는 BatchBlock<T>, JoinBlock<T1,T2>BatchedJoinBlock<T1,T2>세 가지 조인 블록 형식을 제공합니다.

배치블록<T>

BatchBlock<T> 클래스는 일괄 처리라고 하는 입력 데이터 집합을 출력 데이터 배열로 결합합니다. BatchBlock<T> 개체를 만들 때 각 일괄 처리의 크기를 지정합니다. BatchBlock<T> 개체가 지정된 입력 요소 수를 받으면 해당 요소가 포함된 배열을 비동기적으로 전파합니다. BatchBlock<T> 개체가 완료된 상태로 설정되어 있지만 일괄 처리를 구성하기에 충분한 요소가 없는 경우 나머지 입력 요소가 포함된 최종 배열을 전파합니다.

BatchBlock<T> 클래스는 욕심 또는 비욕심 모드에서 작동합니다. 기본값인 greedy 모드에서 BatchBlock<T> 개체는 제공된 모든 메시지를 수락하고 지정된 요소 수를 받은 후 배열을 전파합니다. 비욕심 모드에서 BatchBlock<T> 개체는 충분한 소스가 일괄 처리를 형성하기 위해 블록에 메시지를 제공 할 때까지 들어오는 모든 메시지를 연기합니다. Greedy 모드는 처리 오버헤드가 적기 때문에 일반적으로 비욕심 모드보다 성능이 좋습니다. 그러나 원자적으로 여러 소스에서 소비를 조정해야 하는 경우 비탐욕적 모드를 사용할 수 있습니다. Greedy 생성자의 False 매개 변수에서 dataflowBlockOptions을(를) BatchBlock<T>으로 설정하여 비탐욕적 모드를 지정합니다.

다음 기본 예제에서는 일괄 처리에 10개 요소를 포함하는 Int32 개체에 여러 BatchBlock<T> 값을 게시합니다. 모든 값이 BatchBlock<T>로 전파되도록 이 예제에서는 Complete 메서드를 호출합니다. Complete 메서드는 BatchBlock<T> 개체를 완료된 상태로 설정하므로 BatchBlock<T> 개체는 나머지 요소를 최종 일괄 처리로 전파합니다.

// Create a BatchBlock<int> object that holds ten
// elements per batch.
var batchBlock = new BatchBlock<int>(10);

// Post several values to the block.
for (int i = 0; i < 13; i++)
{
   batchBlock.Post(i);
}
// Set the block to the completed state. This causes
// the block to propagate out any remaining
// values as a final batch.
batchBlock.Complete();

// Print the sum of both batches.

Console.WriteLine($"The sum of the elements in batch 1 is {batchBlock.Receive().Sum()}.");

Console.WriteLine($"The sum of the elements in batch 2 is {batchBlock.Receive().Sum()}.");

/* Output:
   The sum of the elements in batch 1 is 45.
   The sum of the elements in batch 2 is 33.
 */
' Create a BatchBlock<int> object that holds ten
' elements per batch.
Dim batchBlock = New BatchBlock(Of Integer)(10)

' Post several values to the block.
For i As Integer = 0 To 12
    batchBlock.Post(i)
Next i
' Set the block to the completed state. This causes
' the block to propagate out any remaining
' values as a final batch.
batchBlock.Complete()

' Print the sum of both batches.

Console.WriteLine("The sum of the elements in batch 1 is {0}.", batchBlock.Receive().Sum())

Console.WriteLine("The sum of the elements in batch 2 is {0}.", batchBlock.Receive().Sum())

'          Output:
'            The sum of the elements in batch 1 is 45.
'            The sum of the elements in batch 2 is 33.
'          

BatchBlock<T>를 사용하여 데이터베이스 삽입 작업의 효율성을 개선하는 완전한 예제를 보려면 안내: BatchBlock 및 BatchedJoinBlock을 사용하여 효율성을 개선하는 방법를 참조하세요.

JoinBlock<T1, T2, ...>

JoinBlock<T1,T2>JoinBlock<T1,T2,T3> 클래스는 입력 요소를 수집하고 해당 요소를 포함하는 System.Tuple<T1,T2> 또는 System.Tuple<T1,T2,T3> 개체를 전파합니다. JoinBlock<T1,T2>JoinBlock<T1,T2,T3> 클래스는 ITargetBlock<TInput>상속되지 않습니다. 대신 Target1구현하는 속성, Target2, Target3ITargetBlock<TInput>제공합니다.

BatchBlock<T>마찬가지로 JoinBlock<T1,T2>JoinBlock<T1,T2,T3> 욕심이나 비욕심 모드에서 작동합니다. 기본값인 greedy 모드에서 JoinBlock<T1,T2> 또는 JoinBlock<T1,T2,T3> 개체는 제공되는 모든 메시지를 수락하고 각 대상이 하나 이상의 메시지를 수신한 후 튜플을 전파합니다. 비욕심 모드에서 JoinBlock<T1,T2> 또는 JoinBlock<T1,T2,T3> 개체는 모든 대상에 튜플을 만드는 데 필요한 데이터가 제공될 때까지 들어오는 모든 메시지를 연기합니다. 이 시점에서 블록은 소스에서 필요한 모든 항목을 원자성으로 검색하기 위해 2단계 커밋 프로토콜을 사용합니다. 이 연기가 있으면, 다른 엔터티가 그동안 데이터를 처리하여 전체 시스템이 앞으로 진전할 수 있도록 합니다.

다음 기본 예제에서는 값을 계산하기 위해 JoinBlock<T1,T2,T3> 개체에 여러 데이터가 필요한 경우를 보여 줍니다. 이 예제에서는 산술 연산을 수행하기 위해 두 개의 JoinBlock<T1,T2,T3> 값과 Int32 값이 필요한 Char 개체를 만듭니다.

// Create a JoinBlock<int, int, char> object that requires
// two numbers and an operator.
var joinBlock = new JoinBlock<int, int, char>();

// Post two values to each target of the join.

joinBlock.Target1.Post(3);
joinBlock.Target1.Post(6);

joinBlock.Target2.Post(5);
joinBlock.Target2.Post(4);

joinBlock.Target3.Post('+');
joinBlock.Target3.Post('-');

// Receive each group of values and apply the operator part
// to the number parts.

for (int i = 0; i < 2; i++)
{
   var data = joinBlock.Receive();
   switch (data.Item3)
   {
      case '+':
         Console.WriteLine($"{data.Item1} + {data.Item2} = {data.Item1 + data.Item2}");
         break;
      case '-':
         Console.WriteLine($"{data.Item1} - {data.Item2} = {data.Item1 - data.Item2}");
         break;
      default:
         Console.WriteLine($"Unknown operator '{data.Item3}'.");
         break;
   }
}

/* Output:
   3 + 5 = 8
   6 - 4 = 2
 */
' Create a JoinBlock<int, int, char> object that requires
' two numbers and an operator.
Dim joinBlock = New JoinBlock(Of Integer, Integer, Char)()

' Post two values to each target of the join.

joinBlock.Target1.Post(3)
joinBlock.Target1.Post(6)

joinBlock.Target2.Post(5)
joinBlock.Target2.Post(4)

joinBlock.Target3.Post("+"c)
joinBlock.Target3.Post("-"c)

' Receive each group of values and apply the operator part
' to the number parts.

For i As Integer = 0 To 1
    Dim data = joinBlock.Receive()
    Select Case data.Item3
        Case "+"c
            Console.WriteLine("{0} + {1} = {2}", data.Item1, data.Item2, data.Item1 + data.Item2)
        Case "-"c
            Console.WriteLine("{0} - {1} = {2}", data.Item1, data.Item2, data.Item1 - data.Item2)
        Case Else
            Console.WriteLine("Unknown operator '{0}'.", data.Item3)
    End Select
Next i

'          Output:
'            3 + 5 = 8
'            6 - 4 = 2
'          

비욕심 모드의 JoinBlock<T1,T2> 개체를 사용하여 리소스를 협조적으로 공유하는 전체 예제는 방법: JoinBlock을 사용하여 여러 원본에서 데이터 읽기참조하세요.

BatchedJoinBlock<T1, T2, ...>

BatchedJoinBlock<T1,T2>BatchedJoinBlock<T1,T2,T3> 클래스는 입력 요소의 일괄 처리를 수집하고 해당 요소를 포함하는 System.Tuple(IList(T1), IList(T2)) 또는 System.Tuple(IList(T1), IList(T2), IList(T3)) 개체를 전파합니다. BatchedJoinBlock<T1,T2>BatchBlock<T>JoinBlock<T1,T2>의 조합이라고 생각하세요. BatchedJoinBlock<T1,T2> 개체를 만들 때 각 일괄 처리의 크기를 지정합니다. BatchedJoinBlock<T1,T2> 또한 Target1구현하는 속성, Target2ITargetBlock<TInput>제공합니다. 모든 대상에서 지정된 입력 요소 수를 받으면 BatchedJoinBlock<T1,T2> 개체가 해당 요소가 포함된 System.Tuple(IList(T1), IList(T2)) 개체를 비동기적으로 전파합니다.

다음 기본 예제에서는 결과, BatchedJoinBlock<T1,T2> 값 및 Int32 개체인 오류를 보유하는 Exception 개체를 만듭니다. 이 예제에서는 여러 작업을 수행하고 결과를 Target1 속성에 쓰고 Target2 개체의 BatchedJoinBlock<T1,T2> 속성에 오류를 씁니다. 성공 및 실패한 작업의 수를 미리 알 수 없어서 IList<T> 개체를 사용하면 각 대상이 0개 이상의 값을 받을 수 있습니다.

// For demonstration, create a Func<int, int> that
// returns its argument, or throws ArgumentOutOfRangeException
// if the argument is less than zero.
Func<int, int> DoWork = n =>
{
   if (n < 0)
      throw new ArgumentOutOfRangeException();
   return n;
};

// Create a BatchedJoinBlock<int, Exception> object that holds
// seven elements per batch.
var batchedJoinBlock = new BatchedJoinBlock<int, Exception>(7);

// Post several items to the block.
foreach (int i in new int[] { 5, 6, -7, -22, 13, 55, 0 })
{
   try
   {
      // Post the result of the worker to the
      // first target of the block.
      batchedJoinBlock.Target1.Post(DoWork(i));
   }
   catch (ArgumentOutOfRangeException e)
   {
      // If an error occurred, post the Exception to the
      // second target of the block.
      batchedJoinBlock.Target2.Post(e);
   }
}

// Read the results from the block.
var results = batchedJoinBlock.Receive();

// Print the results to the console.

// Print the results.
foreach (int n in results.Item1)
{
   Console.WriteLine(n);
}
// Print failures.
foreach (Exception e in results.Item2)
{
   Console.WriteLine(e.Message);
}

/* Output:
   5
   6
   13
   55
   0
   Specified argument was out of the range of valid values.
   Specified argument was out of the range of valid values.
 */
' For demonstration, create a Func<int, int> that 
' returns its argument, or throws ArgumentOutOfRangeException
' if the argument is less than zero.
Dim DoWork As Func(Of Integer, Integer) = Function(n)
                                              If n < 0 Then
                                                  Throw New ArgumentOutOfRangeException()
                                              End If
                                              Return n
                                          End Function

' Create a BatchedJoinBlock<int, Exception> object that holds 
' seven elements per batch.
Dim batchedJoinBlock = New BatchedJoinBlock(Of Integer, Exception)(7)

' Post several items to the block.
For Each i As Integer In New Integer() {5, 6, -7, -22, 13, 55, 0}
    Try
        ' Post the result of the worker to the 
        ' first target of the block.
        batchedJoinBlock.Target1.Post(DoWork(i))
    Catch e As ArgumentOutOfRangeException
        ' If an error occurred, post the Exception to the 
        ' second target of the block.
        batchedJoinBlock.Target2.Post(e)
    End Try
Next i

' Read the results from the block.
Dim results = batchedJoinBlock.Receive()

' Print the results to the console.

' Print the results.
For Each n As Integer In results.Item1
    Console.WriteLine(n)
Next n
' Print failures.
For Each e As Exception In results.Item2
    Console.WriteLine(e.Message)
Next e

'          Output:
'            5
'            6
'            13
'            55
'            0
'            Specified argument was out of the range of valid values.
'            Specified argument was out of the range of valid values.
'          

BatchedJoinBlock<T1,T2> 사용하여 프로그램이 데이터베이스에서 읽는 동안 발생하는 모든 예외와 결과를 모두 캡처하는 전체 예제는 연습: BatchBlock 및 BatchedJoinBlock을 사용하여 효율성향상을 참조하세요.

데이터 흐름 블록 동작 구성

데이터 흐름 블록 형식의 생성자에 System.Threading.Tasks.Dataflow.DataflowBlockOptions 개체를 제공하여 추가 옵션을 사용하도록 설정할 수 있습니다. 이러한 옵션은 기본 작업 및 병렬 처리 수준을 관리하는 스케줄러와 같은 동작을 제어합니다. DataflowBlockOptions 특정 데이터 흐름 블록 형식과 관련된 동작을 지정하는 파생 형식도 있습니다. 다음 표에는 각 데이터 흐름 블록 형식과 연결된 옵션 유형이 요약되어 있습니다.

데이터 흐름 블록 형식 DataflowBlockOptions 형식
BufferBlock<T> DataflowBlockOptions
BroadcastBlock<T> DataflowBlockOptions
WriteOnceBlock<T> DataflowBlockOptions
ActionBlock<TInput> ExecutionDataflowBlockOptions
TransformBlock<TInput,TOutput> ExecutionDataflowBlockOptions
TransformManyBlock<TInput,TOutput> ExecutionDataflowBlockOptions
BatchBlock<T> GroupingDataflowBlockOptions
JoinBlock<T1,T2> GroupingDataflowBlockOptions
BatchedJoinBlock<T1,T2> GroupingDataflowBlockOptions

다음 섹션에서는 System.Threading.Tasks.Dataflow.DataflowBlockOptions, System.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptionsSystem.Threading.Tasks.Dataflow.GroupingDataflowBlockOptions 클래스를 통해 사용할 수 있는 중요한 종류의 데이터 흐름 블록 옵션에 대한 추가 정보를 제공합니다.

작업 스케줄러 지정

미리 정의된 모든 데이터 흐름 블록은 TPL 작업 예약 메커니즘을 사용하여 데이터를 대상으로 전파하고, 원본에서 데이터를 수신하고, 데이터를 사용할 수 있게 되면 사용자 정의 대리자를 실행하는 등의 작업을 수행합니다. TaskScheduler는 작업을 스레드에 할당하는 작업 스케줄러를 나타내는 추상 클래스입니다. Default기본 작업 스케줄러는 ThreadPool 클래스를 사용하여 작업을 큐에 대기하고 실행합니다. 데이터 흐름 블록 개체를 생성할 때 TaskScheduler 속성을 설정하여 기본 작업 스케줄러를 재정의할 수 있습니다.

동일한 작업 스케줄러가 여러 데이터 흐름 블록을 관리하는 경우 여러 데이터 흐름 블록을 통해 정책을 적용할 수 있습니다. 예를 들어 여러 데이터 흐름 블록이 각각 동일한 ConcurrentExclusiveSchedulerPair 개체의 배타적 스케줄러를 대상으로 하도록 구성된 경우 이러한 블록에서 실행되는 모든 작업이 직렬화됩니다. 마찬가지로 이러한 블록이 동일한 ConcurrentExclusiveSchedulerPair 개체의 동시 스케줄러를 대상으로 하도록 구성되고 해당 스케줄러가 최대 동시성 수준을 가지도록 구성된 경우 이러한 블록의 모든 작업은 해당 동시 작업 수로 제한됩니다. ConcurrentExclusiveSchedulerPair 클래스를 사용하여 읽기 작업이 병렬로 수행되도록 설정하지만 다른 모든 작업에서만 쓰기 작업을 수행하는 예제는 방법: 데이터 흐름 블록작업 스케줄러 지정을 참조하세요. TPL의 작업 스케줄러에 대한 자세한 내용은 TaskScheduler 클래스 항목을 참조하세요.

병렬 처리 수준 지정

기본적으로 TPL 데이터 흐름 라이브러리에서 제공하는 세 가지 실행 블록 형식은 ActionBlock<TInput>, TransformBlock<TInput,TOutput>TransformManyBlock<TInput,TOutput>한 번에 하나의 메시지를 처리합니다. 이러한 데이터 흐름 블록 형식은 메시지를 받은 순서대로 처리합니다. 이러한 데이터 흐름 블록이 메시지를 동시에 처리할 수 있도록 하려면 데이터 흐름 블록 개체를 생성할 때 ExecutionDataflowBlockOptions.MaxDegreeOfParallelism 속성을 설정합니다.

MaxDegreeOfParallelism 기본값은 1로, 데이터 흐름 블록이 한 번에 하나의 메시지를 처리합니다. 이 속성을 1보다 큰 값으로 설정하면 데이터 흐름 블록에서 여러 메시지를 동시에 처리할 수 있습니다. 이 속성을 DataflowBlockOptions.Unbounded 설정하면 기본 작업 스케줄러가 최대 동시성 수준을 관리할 수 있습니다.

중요합니다

최대 병렬 처리 수준을 1보다 크게 지정하면 여러 메시지가 동시에 처리되므로 메시지가 수신되는 순서대로 처리되지 않을 수 있습니다. 그러나 메시지가 블록에서 출력되는 순서는 메시지가 수신되는 순서와 동일합니다.

MaxDegreeOfParallelism 속성은 최대 병렬 처리 수준을 나타내므로 데이터 흐름 블록은 지정한 것보다 적은 병렬 처리로 실행될 수 있습니다. 데이터 흐름 블록은 기능 요구 사항을 충족하거나 사용 가능한 시스템 리소스가 부족하기 때문에 병렬 처리 수준을 더 적게 사용할 수 있습니다. 데이터 흐름 블록은 지정한 것보다 더 많은 병렬 처리를 선택하지 않습니다.

MaxDegreeOfParallelism 속성 값은 각 데이터 흐름 블록 개체에만 적용됩니다. 예를 들어 4개의 데이터 흐름 블록 개체가 각각 최대 병렬 처리 수준을 위해 1을 지정하는 경우 4개의 데이터 흐름 블록 개체가 모두 병렬로 실행될 수 있습니다.

긴 작업이 병렬로 수행되도록 최대 병렬 처리 수준을 설정하는 예제는 방법: 데이터 흐름 블록병렬 처리 수준 지정을 참조하세요.

작업당 메시지 수 지정

미리 정의된 데이터 흐름 블록 형식은 태스크를 사용하여 여러 입력 요소를 처리합니다. 이렇게 하면 데이터를 처리하는 데 필요한 작업 개체 수를 최소화하여 애플리케이션을 보다 효율적으로 실행할 수 있습니다. 그러나 한 데이터 흐름 블록 집합의 태스크가 데이터를 처리하는 경우 다른 데이터 흐름 블록의 태스크는 메시지를 큐에 대기하여 처리 시간을 기다려야 할 수 있습니다. 데이터 흐름 작업 간의 공정성을 향상하려면 MaxMessagesPerTask 속성을 설정합니다. MaxMessagesPerTask 기본값인 DataflowBlockOptions.Unbounded설정되면 데이터 흐름 블록에서 사용하는 작업은 사용 가능한 만큼의 메시지를 처리합니다. MaxMessagesPerTask Unbounded이외의 값으로 설정되면 데이터 흐름 블록은 Task 개체당 최대 이 메시지 수를 처리합니다. MaxMessagesPerTask 속성을 설정하면 작업 간의 공정성이 높아질 수 있지만 시스템에서 필요한 것보다 더 많은 작업을 만들 수 있으므로 성능이 저하될 수 있습니다.

취소 활성화

TPL은 태스크가 협조적인 방식으로 취소를 조정할 수 있는 메커니즘을 제공합니다. 데이터 흐름 블록이 이 취소 메커니즘에 참여할 수 있도록 하려면 CancellationToken 속성을 설정합니다. 이 CancellationToken 개체가 취소된 상태로 설정되면 이 토큰을 모니터링하는 모든 데이터 흐름이 현재 항목의 실행을 완료하지만 후속 항목 처리를 시작하지는 않습니다. 또한 이러한 데이터 흐름 블록은 버퍼링된 메시지를 지우고, 원본 및 대상 블록에 대한 연결을 해제하고, 취소된 상태로 전환합니다. 취소된 상태로 전환하면 처리 중에 예외가 발생하지 않는 한 Completion 속성에 Status 속성이 Canceled설정됩니다. 이 경우 StatusFaulted로 설정됩니다.

Windows Forms 애플리케이션에서 취소를 사용하는 방법을 보여 주는 예제는 방법: 데이터 흐름 블록취소를 참조하세요. TPL의 취소에 대한 자세한 내용은 작업 취소참조하세요.

욕심 있는 동작과 욕심 없는 동작 지정

여러 그룹화 데이터 흐름 블록 형식은 greedy 또는 비욕심 모드에서 작동할 수 있습니다. 기본적으로 미리 정의된 데이터 흐름 블록 형식은 greedy 모드에서 작동합니다.

JoinBlock<T1,T2>같은 조인 블록 형식의 경우 greedy 모드는 조인할 해당 데이터를 아직 사용할 수 없는 경우에도 블록이 데이터를 즉시 수락한다는 것을 의미합니다. 비욕심 모드는 각 대상에서 조인을 완료할 수 있을 때까지 블록이 들어오는 모든 메시지를 연기한다는 것을 의미합니다. 연기된 메시지를 더 이상 사용할 수 없는 경우 조인 블록은 연기된 모든 메시지를 해제하고 프로세스를 다시 시작합니다. BatchBlock<T> 클래스의 경우 비욕심 모드에서 BatchBlock<T> 개체는 일괄 처리를 완료하기 위해 고유 소스에서 충분히 사용할 수 있을 때까지 들어오는 모든 메시지를 연기한다는 점을 제외하고 욕심과 비욕적인 동작은 비슷합니다.

데이터 흐름 블록에 대한 비욕심 모드를 지정하려면 GreedyFalse설정합니다. 비욕심 모드를 사용하여 여러 조인 블록이 데이터 원본을 보다 효율적으로 공유할 수 있도록 하는 방법을 보여 주는 예제는 방법: JoinBlock을 사용하여 여러 원본에서 데이터 읽기참조하세요.

사용자 지정 데이터 흐름 블록

TPL 데이터 흐름 라이브러리는 미리 정의된 여러 블록 형식을 제공하지만 사용자 지정 동작을 수행하는 추가 블록 형식을 만들 수 있습니다. ISourceBlock<TOutput> 또는 ITargetBlock<TInput> 인터페이스를 직접 구현하거나 Encapsulate 메서드를 사용하여 기존 블록 형식의 동작을 캡슐화하는 복합 블록을 빌드합니다. 사용자 지정 데이터 흐름 블록 기능을 구현하는 방법을 보여 주는 예제는 연습: 사용자 지정 데이터 흐름 블록 형식만들기를 참조하세요.

제목 설명
방법: 데이터 흐름 블록에 메시지를 쓰고 메시지를 읽는 방법 BufferBlock<T> 개체에 메시지를 쓰고 메시지를 읽는 방법을 보여 줍니다.
방법: Producer-Consumer 데이터 흐름 패턴 구현 데이터 흐름 모델을 사용하여 생산자가 데이터 흐름 블록에 메시지를 보내고 소비자가 해당 블록에서 메시지를 읽는 생산자-소비자 패턴을 구현하는 방법을 설명합니다.
방법: 데이터 흐름 블록이 데이터 수신할 때 작업 수행 실행 데이터 흐름 블록 형식, ActionBlock<TInput>, TransformBlock<TInput,TOutput>TransformManyBlock<TInput,TOutput>대리자를 제공하는 방법을 설명합니다.
단계별 가이드: 데이터 흐름 파이프라인 만들기 웹에서 텍스트를 다운로드하고 해당 텍스트에 대한 작업을 수행하는 데이터 흐름 파이프라인을 만드는 방법을 설명합니다.
방법: 데이터 흐름 블록 연결 해제 소스가 대상에 메시지를 제공 한 후 LinkTo 메서드를 사용하여 원본에서 대상 블록의 연결을 해제하는 방법을 보여 줍니다.
자습서: Windows Forms 애플리케이션에서 데이터플로우 사용 Windows Forms 애플리케이션에서 이미지 처리를 수행하는 데이터 흐름 블록 네트워크를 만드는 방법을 보여 줍니다.
데이터 흐름 블록 취소 방법 Windows Forms 애플리케이션에서 취소를 사용하는 방법을 보여 줍니다.
방법: JoinBlock을 사용하여 여러 원본에서 데이터 읽기 JoinBlock<T1,T2> 클래스를 사용하여 여러 원본에서 데이터를 사용할 수 있는 경우 작업을 수행하는 방법과 비욕심 모드를 사용하여 여러 조인 블록이 데이터 원본을 보다 효율적으로 공유할 수 있도록 하는 방법을 설명합니다.
방법: 데이터 흐름 블록 병렬 처리 수준 지정 실행 데이터 흐름 블록이 한 번에 둘 이상의 메시지를 처리할 수 있도록 MaxDegreeOfParallelism 속성을 설정하는 방법을 설명합니다.
방법: 데이터 흐름 블록 작업 스케줄러 지정 애플리케이션에서 데이터 흐름을 사용할 때 특정 작업 스케줄러를 연결하는 방법을 보여 줍니다.
설명: BatchBlock 및 BatchedJoinBlock을 사용하여 효율성을 개선하는 방법 BatchBlock<T> 클래스를 사용하여 데이터베이스 삽입 작업의 효율성을 개선하는 방법과 BatchedJoinBlock<T1,T2> 클래스를 사용하여 프로그램이 데이터베이스에서 읽는 동안 발생하는 결과와 예외를 모두 캡처하는 방법을 설명합니다.
연습: 사용자 지정 데이터 흐름 블록 형식 만들기 사용자 지정 동작을 구현하는 데이터 흐름 블록 형식을 만드는 두 가지 방법을 보여 줍니다.
TPL(작업 병렬 라이브러리) .NET Framework 애플리케이션에서 병렬 및 동시 프로그래밍을 간소화하는 라이브러리인 TPL을 소개합니다.