データフロー (タスク並列ライブラリ)
タスク並列ライブラリ (TPL) で提供されるデータ フロー コンポーネントは、コンカレンシー対応アプリケーションの堅牢性の強化に役立てることができます。 これらのデータ フロー コンポーネントは TPL データ フロー ライブラリと総称されます。 データ フロー モデルは、粒度の粗いデータ フローおよびパイプライン処理タスクのためのインプロセス メッセージ パッシングを提供し、アクター ベースのプログラミング モデルを推進します。 データ フロー コンポーネントは、TPL の種類とスケジュール インフラストラクチャの上でビルドされ、非同期プログラミングをサポートするために C#、Visual Basic、および F# 言語と統合されています。 相互に非同期通信を行う必要がある複数の操作を行う場合、またはデータが使用可能になったときにデータを処理する場合に、これらのデータ フロー コンポーネントは役立ちます。 たとえば、Web カメラからのイメージ データを処理するアプリケーションを考えてみます。 データ フロー モデルを使用すると、イメージ フレームが使用可能になったときに、それをアプリケーションで処理できます。 たとえば、アプリケーションが輝度修正や赤目補正などを実行してイメージ フレームを向上させる場合、データ フロー コンポーネントのパイプラインを作成できます。 パイプラインの各ステージは、イメージを変換するために、TPL が提供する機能のような、粒度の粗い並列機能を使用する場合があります。
ここでは、TPL データ フロー ライブラリの概要を示します。 プログラミング モデル、定義済みのデータ フロー ブロックの型、およびアプリケーションの特定の要件を満たすためのデータ フロー ブロックの構成方法を説明します。
注意
TPL データフロー ライブラリ (System.Threading.Tasks.Dataflow 名前空間) は、.NET と一緒には配布されません。 Visual Studio に System.Threading.Tasks.Dataflow 名前空間をインストールするには、プロジェクトを開き、[プロジェクト] メニューの [NuGet パッケージの管理] をクリックし、System.Threading.Tasks.Dataflow
パッケージをオンラインで検索します。 または、.NET Core CLI を使ってインストールするには、dotnet add package System.Threading.Tasks.Dataflow
を実行します。
プログラミング モデル
TPL データ フロー ライブラリはメッセージ パッシングおよび、並列化され、CPU 負荷が高く、I/O 負荷が高く、スループットが高く、待機時間が短いアプリケーションのための基盤を提供します。 また、システム上でのデータのバッファリング方法や移動について、明示的に制御できます。 データ フロー プログラミング モデルの理解を深めるため、非同期的にディスクからイメージを読み込み、それらの合成イメージを作成するアプリケーションを考えてみます。 従来のプログラミング モデルでは、通常、タスクを協調させ、共有データにアクセスするには、コールバックおよびロックなどの同期オブジェクトを使用する必要があります。 データ フロー プログラミング モデルを使用すると、イメージがディスクから読み込まれたときにそれを処理する、データ フロー オブジェクトを作成できます。 データ フロー モデルでは、データが使用可能になったときの処理方法と、データ間の依存関係を宣言します。 ランタイムがデータ間の依存関係を管理するため、通常は共有データへのアクセスの同期要件を回避できます。 さらに、ランタイムのスケジュールは非同期のデータの到着に基づいて動作するため、基になるスレッドを効率的に管理することによって、データ フローの応答性とスループットが向上します。 Windows フォーム アプリケーションでイメージ処理を実装するためにデータフロー プログラミング モデルを使う例については、「チュートリアル:Windows フォーム アプリケーションでのデータフローの使用」を参照してください。
ソースとターゲット
TPL データ フロー ライブラリは、データのバッファリングと処理を行うデータ構造体であるデータ フロー ブロックで構成されます。 TPL はソース ブロック、ターゲット ブロック、および伝達子ブロックの 3 種類のデータ フロー ブロックを定義します。 ソース ブロックはデータのソースとして機能し、それを読み取ることができます。 ターゲット ブロックはデータのレシーバーとして機能し、それに書き込むことができます。 伝達子ブロックは、ソース ブロックとターゲット ブロックのどちらとしても機能し、読み取りも書き込みもできます。 TPL は System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> インターフェイスを定義してソースを表し、System.Threading.Tasks.Dataflow.ITargetBlock<TInput> によってターゲットを表し、System.Threading.Tasks.Dataflow.IPropagatorBlock<TInput,TOutput> によって伝達子を表します。 IPropagatorBlock<TInput,TOutput> は、ISourceBlock<TOutput> と ITargetBlock<TInput> の両方から継承します。
TPL データ フロー ライブラリは、ISourceBlock<TOutput>、ITargetBlock<TInput>、および IPropagatorBlock<TInput,TOutput> のインターフェイスを実装する、複数の定義済みのデータ フロー ブロックの型を提供します。 これらのデータ フロー ブロックの型については、このドキュメントの「定義済みのデータ フロー ブロックの型」のセクションで説明します。
ブロックの接続
データ フロー ブロックを接続して、データ フロー ブロックのリニア シーケンスであるパイプラインを作成するか、またはデータ フロー ブロックのグラフであるネットワークを作成できます。 パイプラインは、ネットワークの 1 つの形態です。 パイプラインまたはネットワークでは、データが使用可能になると、ソースはターゲットに非同期的にデータを伝達します。 ISourceBlock<TOutput>.LinkTo メソッドは、ターゲット ブロックにソースのデータ フロー ブロックをリンクします。 ソースは、ターゲットにリンクしないか、または複数のターゲットにリンクできます。ターゲットはソースからリンクされないか、または複数のソースからリンクできます。 パイプラインまたはネットワークとの間でデータ フロー ブロックを同時に追加または削除できます。 定義済みのデータ フロー ブロックの型は、リンクとリンク解除のすべてのスレッド セーフな側面を処理します。
データフロー ブロックを接続して基本的なパイプラインを作成する例については、「チュートリアル:データフロー パイプラインの作成」を参照してください。 データフロー ブロックを接続して、より複雑なネットワークを作成する例については、「チュートリアル:Windows フォーム アプリケーションでのデータフローの使用」を参照してください。 ソースがターゲットにメッセージを提供した後でソースからターゲットのリンクを解除する例については、「方法:データ フロー ブロックのリンクを解除する」を参照してください。
フィルター処理
ISourceBlock<TOutput>.LinkTo メソッドを呼び出してソースをターゲットにリンクする場合、デリゲートを指定して、ターゲット ブロックがメッセージの値に基づいて、メッセージを受け入れるか拒否するかを決めることができます。 このフィルター機構は、データ フロー ブロックが特定の値のみを確実に受信するために便利な方法です。 定義済みのデータ フロー ブロックの型のほとんどの場合に、ソース ブロックが複数のターゲット ブロックに接続されている場合は、ターゲット ブロックがメッセージを拒否すると、ソースはそのメッセージを次のターゲットに提供します。 ソースがターゲットにメッセージを提供する順序はソースによって定義され、ソースの種類によって異なる場合があります。 1 つのターゲットがメッセージを受け入れると、ほとんどのソース ブロックの型がメッセージの提供を停止します。 この規則の 1 つの例外は、BroadcastBlock<T> クラスです。いくつかのターゲットがメッセージを拒否した場合でも、各メッセージをすべてのターゲットに提供します。 フィルター処理を使って、特定のメッセージのみを処理する例については、「チュートリアル:Windows フォーム アプリケーションでのデータフローの使用」を参照してください。
重要
それぞれの定義済みのソースのデータ フロー ブロックの型は、メッセージを受信した順にそれが伝達されることを保証するため、ソース ブロックは各メッセージを読み取ってから、次のメッセージを処理する必要があります。 したがって、フィルター処理を使用して複数のターゲットをソースに接続する場合、各メッセージを少なくとも 1 つのターゲット ブロックが受信するようにします。 そうしない場合、アプリケーションでデッドロックが発生する可能性があります。
メッセージ パッシング
データ フロー プログラミング モデルは、プログラムの独立したコンポーネントがメッセージの送信によって相互に通信する、メッセージ パッシングの概念に関連しています。 アプリケーション コンポーネントの間でメッセージを伝達する方法の 1 つは、Post (同期) および SendAsync (非同期) メソッドを呼び出してターゲット データフロー ブロックにメッセージを送信し、Receive、ReceiveAsync および、TryReceive メソッドを呼び出してソース ブロックからメッセージを受け取ることです。 入力データをヘッド ノード (ターゲット ブロック) に送信し、出力データをパイプラインのターミナル ノードまたはネットワーク (1 つ以上のソース ブロック) のターミナル ノードから受信することにより、これらのメソッドとデータ フロー パイプラインまたはネットワークを結合することができます。 Choose メソッドを使用して、データが使用可能な、指定されたソースの先頭から読み取り、そのデータにアクションを実行することもできます。
ソース ブロックは、ITargetBlock<TInput>.OfferMessage メソッドを呼び出して、ターゲット ブロックにデータを提供します。 ターゲット ブロックは、提供されたメッセージに、3 つの方法のうちの 1 つで応答します。メッセージを受け入れるか、メッセージを拒否するか、またはメッセージを延期できます。 ターゲットがメッセージを受け入れると、OfferMessage メソッドは Accepted を返します。 ターゲットがメッセージを拒否すると、OfferMessage メソッドは Declined を返します。 ターゲットがソースからメッセージを受け取らないことを要求すると、OfferMessage は DecliningPermanently を返します。 その戻り値を受信した後は、定義済みのソース ブロックの型は、リンクされたターゲットにメッセージを提供せず、自動的にそのターゲットからリンク解除します。
後で使用するために、ターゲット ブロックがメッセージを延期すると、OfferMessage メソッドは Postponed を返します。 メッセージを延期したターゲット ブロックは、後から ISourceBlock<TOutput>.ReserveMessage メソッドを呼び出して、提供されたメッセージの予約を試みることができます。 この時点で、メッセージはまだ使用できてターゲット ブロックから使用できるか、または他のターゲットに取得されています。 ターゲット ブロックがメッセージを後から必要とする場合、またはメッセージを必要としない場合には、それぞれ ISourceBlock<TOutput>.ConsumeMessage または ReleaseReservation メソッドを呼び出します。 通常は、メッセージの予約は、最短一致のモードで動作するデータ フロー ブロックの型で使用されます。 最短一致のモードについては、このドキュメントの後で説明します。 延期されたメッセージを予約せずに、ターゲット ブロックは ISourceBlock<TOutput>.ConsumeMessage メソッドを使用して、直接延期されたメッセージを使用しようとすることもできます。
データ フロー ブロックの完了
データ フロー ブロックは、完了の概念をサポートしています。 完了した状態にあるデータ フロー ブロックは、処理を実行しません。 各データ フロー ブロックには、ブロックの完了ステータスを表す "完了タスク" と呼ばれる System.Threading.Tasks.Task オブジェクトが関連付けられています。 Task オブジェクトの終了を待機できるため、完了タスクを使用して、データ フロー ネットワークの 1 つ以上のターミナル ノードが終了するまで待機できます。 IDataflowBlock インターフェイスは、データ フロー ブロックに完了の要求を通知する Complete メソッドを定義し、またデータ フロー ブロックの完了タスクを返す Completion プロパティを定義します。 ISourceBlock<TOutput> と ITargetBlock<TInput> はどちらも IDataflowBlock インターフェイスを継承します。
データ フロー ブロックがエラーなしで完了したか、1 つのエラーが発生したか、複数のエラーが発生したか、取り消されたかを知る 2 つの方法があります。 最初の方法は、try
-catch
ブロック (Visual Basic では Try
-Catch
) の完了タスクで Task.Wait メソッドを呼び出すことです。 次の例では、入力値が 0 未満の場合に ActionBlock<TInput> をスローする ArgumentOutOfRangeException オブジェクトを作成します。 この例では、完了タスクで AggregateException を呼び出すと、Wait がスローされます。 ArgumentOutOfRangeException には InnerExceptions オブジェクトの AggregateException プロパティを使用してアクセスします。
// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
Console.WriteLine("n = {0}", n);
if (n < 0)
{
throw new ArgumentOutOfRangeException();
}
});
// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();
// Wait for completion in a try/catch block.
try
{
throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
// If an unhandled exception occurs during dataflow processing, all
// exceptions are propagated through an AggregateException object.
ae.Handle(e =>
{
Console.WriteLine("Encountered {0}: {1}",
e.GetType().Name, e.Message);
return true;
});
}
/* Output:
n = 0
n = -1
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
Console.WriteLine("n = {0}", n)
If n < 0 Then
Throw New ArgumentOutOfRangeException()
End If
End Sub)
' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()
' Wait for completion in a try/catch block.
Try
throwIfNegative.Completion.Wait()
Catch ae As AggregateException
' If an unhandled exception occurs during dataflow processing, all
' exceptions are propagated through an AggregateException object.
ae.Handle(Function(e)
Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
Return True
End Function)
End Try
' Output:
' n = 0
' n = -1
' Encountered ArgumentOutOfRangeException: Specified argument was out of the range
' of valid values.
'
この例は、例外が実行データ フロー ブロックのデリゲートで処理されない場合を示します。 そのようなブロックの本体で例外を処理することをお勧めします。 そのようにできない場合は、ブロックはそれが取り消されたように動作し、受信メッセージを処理しません。
データ フロー ブロックが明示的に取り消されると、AggregateException オブジェクトには OperationCanceledException プロパティの InnerExceptions が含まれます。 データ フローの取り消しの詳細については、「取り消しの有効化」セクションを参照してください。
データ フロー ブロックの完了ステータスを判定する 2 番目の方法は、完了タスクの継続を使用するか、または C# と Visual Basic の非同期言語機能を使用して、完了したタスクを非同期的に待つことです。 Task.ContinueWith メソッドに提供するデリゲートは、継続元タスクを表す Task オブジェクトを受け取ります。 Completion プロパティの場合は、継続のデリゲートは完了タスクを受け取ります。 次の例は前の例に似ていますが、ContinueWith メソッドを使用してデータ フロー操作全体のステータスを出力する継続タスクも作成している点が異なります。
// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
Console.WriteLine("n = {0}", n);
if (n < 0)
{
throw new ArgumentOutOfRangeException();
}
});
// Create a continuation task that prints the overall
// task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(task =>
{
Console.WriteLine("The status of the completion task is '{0}'.",
task.Status);
});
// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();
// Wait for completion in a try/catch block.
try
{
throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
// If an unhandled exception occurs during dataflow processing, all
// exceptions are propagated through an AggregateException object.
ae.Handle(e =>
{
Console.WriteLine("Encountered {0}: {1}",
e.GetType().Name, e.Message);
return true;
});
}
/* Output:
n = 0
n = -1
The status of the completion task is 'Faulted'.
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
Console.WriteLine("n = {0}", n)
If n < 0 Then
Throw New ArgumentOutOfRangeException()
End If
End Sub)
' Create a continuation task that prints the overall
' task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(Sub(task) Console.WriteLine("The status of the completion task is '{0}'.", task.Status))
' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()
' Wait for completion in a try/catch block.
Try
throwIfNegative.Completion.Wait()
Catch ae As AggregateException
' If an unhandled exception occurs during dataflow processing, all
' exceptions are propagated through an AggregateException object.
ae.Handle(Function(e)
Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
Return True
End Function)
End Try
' Output:
' n = 0
' n = -1
' The status of the completion task is 'Faulted'.
' Encountered ArgumentOutOfRangeException: Specified argument was out of the range
' of valid values.
'
また、継続タスクの本体で IsCanceled などのプロパティを使用して、データ フロー ブロックの完了ステータスに関する追加情報を決定することもできます。 継続タスク、および継続タスクと取り消し処理やエラー処理との関連の詳細については、「継続タスクを使用したタスクの連結」、「タスクのキャンセル」、および例外処理に関するページを参照してください。
定義済みのデータ フロー ブロックの型
TPL データ フロー ライブラリは、いくつかの定義済みのデータ フロー ブロックの型を提供します。 これらの型は、バッファリング ブロック、実行ブロック、グループ化ブロックの 3 種類に分けられます。 次のセクションでは、これらの種類を構成するブロックの型について説明します。
バッファリング ブロック
バッファリング ブロックはデータ コンシューマーが使用するデータを保持します。 TPL データ フロー ライブラリは System.Threading.Tasks.Dataflow.BufferBlock<T>、System.Threading.Tasks.Dataflow.BroadcastBlock<T> と System.Threading.Tasks.Dataflow.WriteOnceBlock<T> の 3 つのバッファリング ブロックの型を提供します。
BufferBlock<T>
BufferBlock<T> クラスは、汎用的な非同期メッセージング構造体を表します。 このクラスでは、複数のソースが書き込むことができるメッセージ、または複数のターゲットが読み取ることができるメッセージの先入れ先出し (FIFO) のキューを格納します。 ターゲットが BufferBlock<T> オブジェクトからメッセージを受信すると、そのメッセージはメッセージ キューから削除されます。 そのため、BufferBlock<T> オブジェクトには複数のターゲットを設定できますが、各メッセージを受信するターゲットは 1 つだけです。 BufferBlock<T> クラスは、複数のメッセージを別のコンポーネントに渡し、そのコンポーネントで各メッセージを受信する必要がある場合に便利です。
次の基本的な例は、Int32 オブジェクトに複数の BufferBlock<T> の値をポストし、その値をそのオブジェクトから読み込みます。
// Create a BufferBlock<int> object.
var bufferBlock = new BufferBlock<int>();
// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
bufferBlock.Post(i);
}
// Receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
Console.WriteLine(bufferBlock.Receive());
}
/* Output:
0
1
2
*/
' Create a BufferBlock<int> object.
Dim bufferBlock = New BufferBlock(Of Integer)()
' Post several messages to the block.
For i As Integer = 0 To 2
bufferBlock.Post(i)
Next i
' Receive the messages back from the block.
For i As Integer = 0 To 2
Console.WriteLine(bufferBlock.Receive())
Next i
' Output:
' 0
' 1
' 2
'
BufferBlock<T> オブジェクトへのメッセージの書き込み方法や、オブジェクトからのメッセージの読み取り方法の完全な例については、「方法: データフロー ブロックに対してメッセージの読み取りと書き込みを行う」をご覧ください。
BroadcastBlock<T>
BroadcastBlock<T> クラスは、複数のメッセージを別のコンポーネントに渡すときに、そのコンポーネントで必要になるのが最新の値のみである場合に便利です。 また、このクラスは、メッセージを複数のコンポーネントにブロードキャストする場合にも便利です。
次の基本的な例は、Double の値を BroadcastBlock<T> オブジェクトにポストし、その値をそのオブジェクトから複数回読み込みます。 値は読み取られた後も BroadcastBlock<T> オブジェクトから削除されないため、毎回同じ値を使用できます。
// Create a BroadcastBlock<double> object.
var broadcastBlock = new BroadcastBlock<double>(null);
// Post a message to the block.
broadcastBlock.Post(Math.PI);
// Receive the messages back from the block several times.
for (int i = 0; i < 3; i++)
{
Console.WriteLine(broadcastBlock.Receive());
}
/* Output:
3.14159265358979
3.14159265358979
3.14159265358979
*/
' Create a BroadcastBlock<double> object.
Dim broadcastBlock = New BroadcastBlock(Of Double)(Nothing)
' Post a message to the block.
broadcastBlock.Post(Math.PI)
' Receive the messages back from the block several times.
For i As Integer = 0 To 2
Console.WriteLine(broadcastBlock.Receive())
Next i
' Output:
' 3.14159265358979
' 3.14159265358979
' 3.14159265358979
'
BroadcastBlock<T> を使って複数のターゲット ブロックにメッセージをブロードキャストする方法を示す例について詳しくは、「方法: データフロー ブロックのタスク スケジューラを指定する」をご覧ください。
WriteOnceBlock<T>
WriteOnceBlock<T> クラスは BroadcastBlock<T> クラスに似ていますが、WriteOnceBlock<T> オブジェクトに 1 回しか書き込むことができない点が異なります。 WriteOnceBlock<T> は C# の readonly (Visual Basic では ReadOnly) キーワードと似ていると考えることができますが、構築時でなく、値を読み取った後は、WriteOnceBlock<T> オブジェクトを変更できなくなる点が異なります。 BroadcastBlock<T> クラスと同様に、ターゲットが WriteOnceBlock<T> オブジェクトからメッセージを受信しても、そのメッセージはそのオブジェクトから削除されません。 そのため、複数のターゲットがメッセージのコピーを受信します。 WriteOnceBlock<T> のクラスは、複数のメッセージの最初のメッセージだけを伝達する場合に便利です。
次の基本的な例は、String オブジェクトに複数の WriteOnceBlock<T> の値をポストし、その値をそのオブジェクトから読み込みます。 WriteOnceBlock<T> オブジェクトには 1 回だけ書き込むことができるため、WriteOnceBlock<T> オブジェクトは 1 つのメッセージを受信した後は、それ以降のメッセージを破棄します。
// Create a WriteOnceBlock<string> object.
var writeOnceBlock = new WriteOnceBlock<string>(null);
// Post several messages to the block in parallel. The first
// message to be received is written to the block.
// Subsequent messages are discarded.
Parallel.Invoke(
() => writeOnceBlock.Post("Message 1"),
() => writeOnceBlock.Post("Message 2"),
() => writeOnceBlock.Post("Message 3"));
// Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive());
/* Sample output:
Message 2
*/
' Create a WriteOnceBlock<string> object.
Dim writeOnceBlock = New WriteOnceBlock(Of String)(Nothing)
' Post several messages to the block in parallel. The first
' message to be received is written to the block.
' Subsequent messages are discarded.
Parallel.Invoke(Function() writeOnceBlock.Post("Message 1"), Function() writeOnceBlock.Post("Message 2"), Function() writeOnceBlock.Post("Message 3"))
' Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive())
' Sample output:
' Message 2
'
WriteOnceBlock<T> を使って終了した最初の操作の値を受け取る方法の例について詳しくは、「方法: データフロー ブロックのリンクを解除する」をご覧ください。
実行ブロック
実行ブロックは、受け取ったデータのそれぞれに、ユーザーが指定したデリゲートを呼び出します。 TPL データ フロー ライブラリは ActionBlock<TInput>、System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput> と System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput> の 3 つの実行ブロックの型を提供します。
ActionBlock<T>
ActionBlock<TInput> クラスは、データを受け取るとデリゲートを呼び出すターゲット ブロックです。 ActionBlock<TInput> オブジェクトは、データが使用可能になったときに非同期的に実行できるデリゲートと考えることができます。 ActionBlock<TInput> オブジェクトに提供するデリゲートは、Action<T> 型またはSystem.Func<TInput, Task>
型を使用できます。 ActionBlock<TInput> オブジェクトを Action<T> と共に使用すると、各入力要素の処理はデリゲートが返されたときに完了したと見なされます。 ActionBlock<TInput> オブジェクトを System.Func<TInput, Task>
と共に使用すると、各入力要素の処理は返された Task オブジェクトが終了した場合にのみ、完了したと見なされます。 この 2 つの方法を使って、ActionBlock<TInput> を使用して各入力要素を同期的にも非同期的にも処理することができます。
次の基本的な例では、Int32 オブジェクトに複数の ActionBlock<TInput> の値をポストします。 ActionBlock<TInput> オブジェクトは、コンソールにそれらの値を出力します。 次にこの例では、ブロックを完了した状態に設定し、すべてのデータ フロー タスクの終了を待機します。
// Create an ActionBlock<int> object that prints values
// to the console.
var actionBlock = new ActionBlock<int>(n => Console.WriteLine(n));
// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
actionBlock.Post(i * 10);
}
// Set the block to the completed state and wait for all
// tasks to finish.
actionBlock.Complete();
actionBlock.Completion.Wait();
/* Output:
0
10
20
*/
' Create an ActionBlock<int> object that prints values
' to the console.
Dim actionBlock = New ActionBlock(Of Integer)(Function(n) WriteLine(n))
' Post several messages to the block.
For i As Integer = 0 To 2
actionBlock.Post(i * 10)
Next i
' Set the block to the completed state and wait for all
' tasks to finish.
actionBlock.Complete()
actionBlock.Completion.Wait()
' Output:
' 0
' 10
' 20
'
ActionBlock<TInput> クラスでデリゲートを使う方法を示す完全な例については、「方法: データフロー ブロックでデータを受信したときにアクションを実行する」をご覧ください。
TransformBlock<TInput, TOutput>
TransformBlock<TInput,TOutput> クラスは ActionBlock<TInput> クラスに似ていますが、ソースとしてもターゲットとしても動作する点が異なります。 TransformBlock<TInput,TOutput> オブジェクトに渡すデリゲートは TOutput
型の値を返します。 TransformBlock<TInput,TOutput> オブジェクトに提供するデリゲートは、System.Func<TInput, TOutput>
型またはSystem.Func<TInput, Task<TOutput>>
型を使用できます。 TransformBlock<TInput,TOutput> オブジェクトを System.Func<TInput, TOutput>
と共に使用すると、各入力要素の処理はデリゲートが返されたときに完了したと見なされます。 TransformBlock<TInput,TOutput> オブジェクトを System.Func<TInput, Task<TOutput>>
と共に使用すると、各入力要素の処理は返された Task<TResult> オブジェクトが終了した場合にのみ、完了したと見なされます。 ActionBlock<TInput> と同様に、この 2 つの方法を使って、TransformBlock<TInput,TOutput> を使用して各入力要素を同期的にも非同期的にも処理することができます。
次の基本的な例では、入力の平方根を計算する TransformBlock<TInput,TOutput> オブジェクトを作成します。 TransformBlock<TInput,TOutput> オブジェクトは、入力として Int32 の値を受け取り、出力として Double の値を生成します。
// Create a TransformBlock<int, double> object that
// computes the square root of its input.
var transformBlock = new TransformBlock<int, double>(n => Math.Sqrt(n));
// Post several messages to the block.
transformBlock.Post(10);
transformBlock.Post(20);
transformBlock.Post(30);
// Read the output messages from the block.
for (int i = 0; i < 3; i++)
{
Console.WriteLine(transformBlock.Receive());
}
/* Output:
3.16227766016838
4.47213595499958
5.47722557505166
*/
' Create a TransformBlock<int, double> object that
' computes the square root of its input.
Dim transformBlock = New TransformBlock(Of Integer, Double)(Function(n) Math.Sqrt(n))
' Post several messages to the block.
transformBlock.Post(10)
transformBlock.Post(20)
transformBlock.Post(30)
' Read the output messages from the block.
For i As Integer = 0 To 2
Console.WriteLine(transformBlock.Receive())
Next i
' Output:
' 3.16227766016838
' 4.47213595499958
' 5.47722557505166
'
Windows フォーム アプリケーションでイメージ処理を実行するデータフロー ブロックのネットワークで TransformBlock<TInput,TOutput> を使う例について詳しくは、「チュートリアル: Windows フォーム アプリケーションでのデータフローの使用」をご覧ください。
TransformManyBlock<TInput, TOutput>
TransformManyBlock<TInput,TOutput> クラスは TransformBlock<TInput,TOutput> クラスに似ていますが、各入力値に 1 つの出力値を生成するのでなく、TransformManyBlock<TInput,TOutput> は各入力値に出力値を生成しないか、または 1 つ以上の出力値を生成する点が異なります。 TransformManyBlock<TInput,TOutput> オブジェクトに提供するデリゲートは、System.Func<TInput, IEnumerable<TOutput>>
型またはSystem.Func<TInput, Task<IEnumerable<TOutput>>>
型を使用できます。 TransformManyBlock<TInput,TOutput> オブジェクトを System.Func<TInput, IEnumerable<TOutput>>
と共に使用すると、各入力要素の処理はデリゲートが返されたときに完了したと見なされます。 TransformManyBlock<TInput,TOutput> オブジェクトを System.Func<TInput, Task<IEnumerable<TOutput>>>
と共に使用すると、各入力要素の処理は返された System.Threading.Tasks.Task<IEnumerable<TOutput>>
オブジェクトが終了した場合にのみ、完了したと見なされます。
次の基本的な例は、文字列を個別の文字のシーケンスに分割する TransformManyBlock<TInput,TOutput> オブジェクトを作成します。 TransformManyBlock<TInput,TOutput> オブジェクトは、入力として String の値を受け取り、出力として Char の値を生成します。
// Create a TransformManyBlock<string, char> object that splits
// a string into its individual characters.
var transformManyBlock = new TransformManyBlock<string, char>(
s => s.ToCharArray());
// Post two messages to the first block.
transformManyBlock.Post("Hello");
transformManyBlock.Post("World");
// Receive all output values from the block.
for (int i = 0; i < ("Hello" + "World").Length; i++)
{
Console.WriteLine(transformManyBlock.Receive());
}
/* Output:
H
e
l
l
o
W
o
r
l
d
*/
' Create a TransformManyBlock<string, char> object that splits
' a string into its individual characters.
Dim transformManyBlock = New TransformManyBlock(Of String, Char)(Function(s) s.ToCharArray())
' Post two messages to the first block.
transformManyBlock.Post("Hello")
transformManyBlock.Post("World")
' Receive all output values from the block.
For i As Integer = 0 To ("Hello" & "World").Length - 1
Console.WriteLine(transformManyBlock.Receive())
Next i
' Output:
' H
' e
' l
' l
' o
' W
' o
' r
' l
' d
'
TransformManyBlock<TInput,TOutput> を使ってデータフロー パイプラインの各入力に複数の独立した出力を生成する方法について詳しくは、「チュートリアル: データフロー パイプラインの作成」をご覧ください。
並列化の次数
すべての ActionBlock<TInput>, TransformBlock<TInput,TOutput> と TransformManyBlock<TInput,TOutput> オブジェクトは、ブロックが入力メッセージを処理できるようになるまで、入力メッセージをバッファリングします。 既定では、これらのクラスはメッセージを受信した順序で、一度に 1 つずつ処理します。 ActionBlock<TInput>、TransformBlock<TInput,TOutput> と TransformManyBlock<TInput,TOutput> のオブジェクトを有効化し、並列化の次数を指定して、複数のメッセージを同時に処理することもできます。 同時実行に関する詳細については、このドキュメントの後の「並列化の次数の指定」のセクションを参照してください。 並列化の次数を設定して実行データフロー ブロックを有効化し、同時に複数のメッセージを処理する例については、「方法:データ フロー ブロックで並列処理の範囲を指定する」を参照してください。
デリゲート型の概要
次の表に、ActionBlock<TInput>、TransformBlock<TInput,TOutput>、および TransformManyBlock<TInput,TOutput> オブジェクトに指定できるデリゲート型の概要を示します。 このテーブルでは、デリゲート型が同期的または非同期的に動作するかどうかについても示しています。
種類 | 同期的なデリゲート型 | 非同期的なデリゲート型 |
---|---|---|
ActionBlock<TInput> | System.Action |
System.Func<TInput, Task> |
TransformBlock<TInput,TOutput> | System.Func<TInput, TOutput> |
System.Func<TInput, Task<TOutput>> |
TransformManyBlock<TInput,TOutput> | System.Func<TInput, IEnumerable<TOutput>> |
System.Func<TInput, Task<IEnumerable<TOutput>>> |
実行ブロックの型を使用する場合にラムダ式を使用することもできます。 実行ブロックと共にラムダ式を使用する方法を示した例については、「方法:データフロー ブロックでデータを受信したときにアクションを実行する」を参照してください。
グループ化ブロック
グループ化ブロックは、さまざまな制約の下で 1 つ以上のソースからデータをまとめます。 TPL データ フロー ライブラリは BatchBlock<T>、JoinBlock<T1,T2> と BatchedJoinBlock<T1,T2> の 3 つの結合ブロックの型を提供します。
BatchBlock<T>
BatchBlock<T> クラスは、バッチと呼ばれる入力データのセットを、出力データの配列に結合します。 BatchBlock<T> オブジェクトを作成するときに、各バッチのサイズを指定します。 BatchBlock<T> オブジェクトは、指定した数の入力要素を受け取ると、その要素を含む配列を非同期的に伝達します。 BatchBlock<T> オブジェクトが完了状態に設定されているが、バッチを形成するために十分な要素を含んでいない場合には、残りの入力要素を含む最終的な配列を伝達します。
BatchBlock<T> クラスは、"最長一致" モードまたは "最短一致" モードのどちらかで動作します。 既定では最長一致モードで、BatchBlock<T> オブジェクトは提供されたすべてのメッセージを受け取り、指定された数の要素を受け取った後に、配列を伝達します。 最短一致モードでは、BatchBlock<T> オブジェクトは、バッチを作成するために十分なソースがブロックへのメッセージを提供するまで、すべての受信メッセージを延期します。 最長一致モードでは通常、最短一致モードよりも処理のオーバーヘッドが低いため、パフォーマンスがよくなります。 ただし、アトミックな方法で複数のソースの使用量を調整する必要がある場合、最短一致モードを使用できます。 最短一致モードを指定するには、Greedy コンストラクターの False
パラメーターの dataflowBlockOptions
へ BatchBlock<T> を設定します。
次の基本的な例では、複数の Int32 の値を、バッチに 10 個の要素を持つ BatchBlock<T> オブジェクトにポストします。 BatchBlock<T> からすべての値が伝達されることを保証するために、この例では Complete メソッドを呼び出します。 Complete メソッドは BatchBlock<T> オブジェクトを完了状態に設定するため、BatchBlock<T> オブジェクトは残りのすべての要素を最終バッチとして伝達します。
// Create a BatchBlock<int> object that holds ten
// elements per batch.
var batchBlock = new BatchBlock<int>(10);
// Post several values to the block.
for (int i = 0; i < 13; i++)
{
batchBlock.Post(i);
}
// Set the block to the completed state. This causes
// the block to propagate out any remaining
// values as a final batch.
batchBlock.Complete();
// Print the sum of both batches.
Console.WriteLine("The sum of the elements in batch 1 is {0}.",
batchBlock.Receive().Sum());
Console.WriteLine("The sum of the elements in batch 2 is {0}.",
batchBlock.Receive().Sum());
/* Output:
The sum of the elements in batch 1 is 45.
The sum of the elements in batch 2 is 33.
*/
' Create a BatchBlock<int> object that holds ten
' elements per batch.
Dim batchBlock = New BatchBlock(Of Integer)(10)
' Post several values to the block.
For i As Integer = 0 To 12
batchBlock.Post(i)
Next i
' Set the block to the completed state. This causes
' the block to propagate out any remaining
' values as a final batch.
batchBlock.Complete()
' Print the sum of both batches.
Console.WriteLine("The sum of the elements in batch 1 is {0}.", batchBlock.Receive().Sum())
Console.WriteLine("The sum of the elements in batch 2 is {0}.", batchBlock.Receive().Sum())
' Output:
' The sum of the elements in batch 1 is 45.
' The sum of the elements in batch 2 is 33.
'
BatchBlock<T> を使ってデータベース挿入操作の効率を向上させる完全な例については、「チュートリアル: BatchBlock および BatchedJoinBlock を使用した効率の向上」をご覧ください。
JoinBlock<T1, T2, ...>
JoinBlock<T1,T2> と JoinBlock<T1,T2,T3> クラスは、入力要素を収集して、それらの要素を含む System.Tuple<T1,T2> または System.Tuple<T1,T2,T3> オブジェクトを伝達します。 JoinBlock<T1,T2> クラスと JoinBlock<T1,T2,T3> クラスは、ITargetBlock<TInput> を継承しません。 代わりに、Target1 を実装する Target2、Target3、および ITargetBlock<TInput> のプロパティを提供します。
BatchBlock<T> と同様に、JoinBlock<T1,T2> および JoinBlock<T1,T2,T3> は、最長一致モードまたは最短一致モードのどちらかで動作します。 既定では最長一致モードで、JoinBlock<T1,T2> または JoinBlock<T1,T2,T3> オブジェクトは提供されたすべてのメッセージを受け取り、各ターゲットが少なくとも 1 つのメッセージを受け取った後で、タプルを伝達します。 最短一致モードでは、JoinBlock<T1,T2> または JoinBlock<T1,T2,T3> オブジェクトは、すべてのターゲットがタプルを作成するために必要なデータを提供されるまで、すべての受信メッセージを延期します。 この時点で、ブロックは 2 フェーズ コミット プロトコルによって、ソースからすべての必要な項目をアトミックに取得します。 この延期により、他のエンティティは当面の間データを使用でき、システム全体が進行できます。
次の基本的な例は、値を計算するために JoinBlock<T1,T2,T3> オブジェクトが複数のデータを必要とするケースを示します。 この例では、算術演算を実行するために 2 つの JoinBlock<T1,T2,T3> の値と 1 つの Int32 の値を必要とする Char オブジェクトを作成します。
// Create a JoinBlock<int, int, char> object that requires
// two numbers and an operator.
var joinBlock = new JoinBlock<int, int, char>();
// Post two values to each target of the join.
joinBlock.Target1.Post(3);
joinBlock.Target1.Post(6);
joinBlock.Target2.Post(5);
joinBlock.Target2.Post(4);
joinBlock.Target3.Post('+');
joinBlock.Target3.Post('-');
// Receive each group of values and apply the operator part
// to the number parts.
for (int i = 0; i < 2; i++)
{
var data = joinBlock.Receive();
switch (data.Item3)
{
case '+':
Console.WriteLine("{0} + {1} = {2}",
data.Item1, data.Item2, data.Item1 + data.Item2);
break;
case '-':
Console.WriteLine("{0} - {1} = {2}",
data.Item1, data.Item2, data.Item1 - data.Item2);
break;
default:
Console.WriteLine("Unknown operator '{0}'.", data.Item3);
break;
}
}
/* Output:
3 + 5 = 8
6 - 4 = 2
*/
' Create a JoinBlock<int, int, char> object that requires
' two numbers and an operator.
Dim joinBlock = New JoinBlock(Of Integer, Integer, Char)()
' Post two values to each target of the join.
joinBlock.Target1.Post(3)
joinBlock.Target1.Post(6)
joinBlock.Target2.Post(5)
joinBlock.Target2.Post(4)
joinBlock.Target3.Post("+"c)
joinBlock.Target3.Post("-"c)
' Receive each group of values and apply the operator part
' to the number parts.
For i As Integer = 0 To 1
Dim data = joinBlock.Receive()
Select Case data.Item3
Case "+"c
Console.WriteLine("{0} + {1} = {2}", data.Item1, data.Item2, data.Item1 + data.Item2)
Case "-"c
Console.WriteLine("{0} - {1} = {2}", data.Item1, data.Item2, data.Item1 - data.Item2)
Case Else
Console.WriteLine("Unknown operator '{0}'.", data.Item3)
End Select
Next i
' Output:
' 3 + 5 = 8
' 6 - 4 = 2
'
最短一致モードで JoinBlock<T1,T2> オブジェクトを使って協調的にリソースを共有する例について詳しくは、「方法: JoinBlock を使用して複数のソースからデータを読み込む」をご覧ください。
BatchedJoinBlock<T1, T2, ...>
BatchedJoinBlock<T1,T2> と BatchedJoinBlock<T1,T2,T3> クラスは、入力要素のバッチを収集して、それらの要素を含む System.Tuple(IList(T1), IList(T2))
または System.Tuple(IList(T1), IList(T2), IList(T3))
オブジェクトを伝達します。 BatchedJoinBlock<T1,T2> は BatchBlock<T> と JoinBlock<T1,T2> の組み合わせであると考えることができます。 BatchedJoinBlock<T1,T2> オブジェクトを作成するときに、各バッチのサイズを指定します。 BatchedJoinBlock<T1,T2> は、Target1 を実装する Target2 および ITargetBlock<TInput> のプロパティを提供します。 すべてのターゲットから指定した数の入力要素を受け取ると、BatchedJoinBlock<T1,T2> オブジェクトは、それらの要素を含む System.Tuple(IList(T1), IList(T2))
オブジェクトを非同期的に伝達します。
次の基本的な例では、結果を保持する BatchedJoinBlock<T1,T2> オブジェクト、Int32 の値、エラーである Exception オブジェクトを作成します。 この例では複数の操作を実行し、結果を Target1 オブジェクトの Target2 プロパティに書き込み、エラーを BatchedJoinBlock<T1,T2> プロパティに書き込みます。 操作の成功と失敗の数はあらかじめ不明であるため、IList<T> オブジェクトは、各ターゲット値を受け取らないことも、複数の値を受け取ることも可能です。
// For demonstration, create a Func<int, int> that
// returns its argument, or throws ArgumentOutOfRangeException
// if the argument is less than zero.
Func<int, int> DoWork = n =>
{
if (n < 0)
throw new ArgumentOutOfRangeException();
return n;
};
// Create a BatchedJoinBlock<int, Exception> object that holds
// seven elements per batch.
var batchedJoinBlock = new BatchedJoinBlock<int, Exception>(7);
// Post several items to the block.
foreach (int i in new int[] { 5, 6, -7, -22, 13, 55, 0 })
{
try
{
// Post the result of the worker to the
// first target of the block.
batchedJoinBlock.Target1.Post(DoWork(i));
}
catch (ArgumentOutOfRangeException e)
{
// If an error occurred, post the Exception to the
// second target of the block.
batchedJoinBlock.Target2.Post(e);
}
}
// Read the results from the block.
var results = batchedJoinBlock.Receive();
// Print the results to the console.
// Print the results.
foreach (int n in results.Item1)
{
Console.WriteLine(n);
}
// Print failures.
foreach (Exception e in results.Item2)
{
Console.WriteLine(e.Message);
}
/* Output:
5
6
13
55
0
Specified argument was out of the range of valid values.
Specified argument was out of the range of valid values.
*/
' For demonstration, create a Func<int, int> that
' returns its argument, or throws ArgumentOutOfRangeException
' if the argument is less than zero.
Dim DoWork As Func(Of Integer, Integer) = Function(n)
If n < 0 Then
Throw New ArgumentOutOfRangeException()
End If
Return n
End Function
' Create a BatchedJoinBlock<int, Exception> object that holds
' seven elements per batch.
Dim batchedJoinBlock = New BatchedJoinBlock(Of Integer, Exception)(7)
' Post several items to the block.
For Each i As Integer In New Integer() {5, 6, -7, -22, 13, 55, 0}
Try
' Post the result of the worker to the
' first target of the block.
batchedJoinBlock.Target1.Post(DoWork(i))
Catch e As ArgumentOutOfRangeException
' If an error occurred, post the Exception to the
' second target of the block.
batchedJoinBlock.Target2.Post(e)
End Try
Next i
' Read the results from the block.
Dim results = batchedJoinBlock.Receive()
' Print the results to the console.
' Print the results.
For Each n As Integer In results.Item1
Console.WriteLine(n)
Next n
' Print failures.
For Each e As Exception In results.Item2
Console.WriteLine(e.Message)
Next e
' Output:
' 5
' 6
' 13
' 55
' 0
' Specified argument was out of the range of valid values.
' Specified argument was out of the range of valid values.
'
BatchedJoinBlock<T1,T2> を使用して、プログラムがデータベースを読み取る間にその結果と発生する例外の両方をキャプチャする例について詳しくは、「チュートリアル: BatchBlock および BatchedJoinBlock を使用した効率の向上」をご覧ください。
データ フロー ブロックの動作の構成
データ フロー ブロックの型のコンストラクターに System.Threading.Tasks.Dataflow.DataflowBlockOptions のオブジェクトを提供することによって、追加のオプションを有効にできます。 これらのオプションは、基になるタスクと並列化の次数を管理するスケジューラなどの動作を制御します。 DataflowBlockOptions には、特定のデータ フロー ブロックの型に固有の動作を指定する派生型があります。 各データ フロー ブロックの型に関連付けられているオプション型の概要を次の表に示します。
以下のセクションでは、System.Threading.Tasks.Dataflow.DataflowBlockOptions、System.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptions と System.Threading.Tasks.Dataflow.GroupingDataflowBlockOptions クラスを通じて利用できる、重要なデータ フロー ブロックのオプションに関する追加情報を提供します。
タスク スケジューラの指定
すべての定義済みのデータ フロー ブロックは TPL タスク スケジューリング メカニズムを使って、ターゲットへのデータの伝達、ソースからのデータの受け取り、データが使用可能になったときのユーザー定義のデリゲートの実行、などのアクティビティを実行します。 TaskScheduler は、タスクをスレッドのキューに置くタスク スケジューラを表す抽象クラスです。 既定のタスク スケジューラである Default は、ThreadPool クラスを使用して作業をキューに置き、実行します。 データ フロー ブロック オブジェクトを構成する場合、TaskScheduler プロパティを設定して、既定のタスク スケジューラをオーバーライドできます。
同じタスク スケジューラが複数のデータ フロー ブロックを管理する場合、それらにポリシーを強制的に適用できます。 たとえば、複数のデータ フロー ブロックが、それぞれ同じ ConcurrentExclusiveSchedulerPair オブジェクトの排他スケジューラを対象とするように構成されている場合、これらのブロックにわたって実行されるすべての操作がシリアル化されます。 同様に、これらのブロックが同じ ConcurrentExclusiveSchedulerPair オブジェクトのコンカレント スケジューラをターゲットとして構成され、そのスケジューラが最大コンカレンシー レベルに構成されている場合、これらのブロックのすべての操作は、そのコンカレント操作の数に制限されます。 ConcurrentExclusiveSchedulerPair クラスを使って、読み取り操作は並列で実行可能とするものの、書き込み操作はすべての操作で排他的に行う例について詳しくは、「方法: データフロー ブロックのタスク スケジューラを指定する」をご覧ください。 TPL のタスク スケジューラの詳細については、TaskScheduler クラスに関するトピックを参照してください。
並列化の次数の指定
既定では、TPL データ フロー ライブラリが提供する 3 つの実行ブロックの型、ActionBlock<TInput>、TransformBlock<TInput,TOutput> および TransformManyBlock<TInput,TOutput> は、一度に 1 つのメッセージを処理します。 これらのデータ フロー ブロックの型は、メッセージを受け取った順番でそれを処理します。 データ フロー ブロックのオブジェクトを構築するときに、これらのデータ フロー ブロックがメッセージを同時に操作できるようにするには、ExecutionDataflowBlockOptions.MaxDegreeOfParallelism のプロパティを設定します。
MaxDegreeOfParallelism の既定値は 1 で、これはデータ フロー ブロックが一度に 1 つのメッセージを処理することを保証します。 このプロパティに 1 を超える値に設定すると、データ フロー ブロックは複数のメッセージを同時に処理できます。 このプロパティを DataflowBlockOptions.Unbounded に設定すると、基になるタスク スケジューラは最大のコンカレンシーの程度を管理することができます。
重要
並列処理の最大範囲に 1 を超える数を指定すると、複数のメッセージを同時に処理するため、メッセージが受信した順序で処理されない場合があります。 ただし、ブロックからのメッセージが出力される順序は、メッセージが受信された順序と同じです。
MaxDegreeOfParallelism プロパティは並列処理の最大範囲を表すため、データ フロー ブロックは、指定より低い並列化の度合いで実行される場合があります。 機能要件を満たすため、または使用可能なシステム リソースの不足のため、データ フロー ブロックは、より低い並列化の度合いを使う場合があります。 データ フロー ブロックは、指定より大きな並列化を選択することはありません。
MaxDegreeOfParallelism プロパティの値は、各データ フロー ブロックのオブジェクトに排他的です。 たとえば、4 つのデータ フロー ブロックのオブジェクトが並列処理の最大範囲にそれぞれ 1 を指定した場合、4 つすべてのデータ フロー ブロック オブジェクトを並列に実行できる場合もあります。
並列処理の最大範囲を設定して時間のかかる操作を並列に行う例については、「方法:データ フロー ブロックで並列処理の範囲を指定する」を参照してください。
タスクごとのメッセージ数の指定
定義済みのデータ フロー ブロックの型は、複数の入力要素を処理するタスクを使用します。 これによって、データを処理するために必要なタスクのオブジェクトの数を最小限に抑えることができ、アプリケーションを効率的に実行できます。 ただし、一連のデータ フロー ブロックのタスクがデータを処理する場合、他のデータ フロー ブロックのタスクはメッセージをキューに置いて処理の時間を待機する必要がある場合があります。 データ フロー タスク間での高い公平性を有効にするには、MaxMessagesPerTask プロパティを設定します。 MaxMessagesPerTask が既定値である DataflowBlockOptions.Unbounded に設定されている場合、データ フロー ブロックが使用するタスクは、可能な限り多くのメッセージを処理します。 MaxMessagesPerTask が Unbounded 以外の値に設定されている場合、データ フロー ブロックは、Task オブジェクトごとに最大でこの数のメッセージを処理します。 MaxMessagesPerTask プロパティを設定すると、タスク間の公平性を向上できますが、システムが必要以上のタスクを作成する場合があり、その場合にはパフォーマンスが低下する場合があります。
取り消しの有効化
TPL はタスクが協調的な方法によって取り消しの調整をできる機構を提供します。 データ フロー ブロックがこの取り消し機構に参加するためには、CancellationToken プロパティを設定します。 CancellationToken オブジェクトを取り消し状態に設定すると、このトークンを監視するすべてのデータ フロー ブロックは現在の項目の実行を終了しますが、後続の項目を開始しません。 また、これらのデータ フロー ブロックはすべてのバッファリングされたメッセージをクリアし、すべてのソースとターゲット ブロックへの接続を解放し、取り消し状態に遷移します。 取り消し状態に遷移すると、処理中に例外が発生しない限り、Completion プロパティは Status プロパティを Canceled に設定します。 この場合、Status は Faulted に設定されます。
Windows フォーム アプリケーションで取り消しを使う方法を示した例については、「方法:データフロー ブロックをキャンセルする」をご覧ください。 TPL での取り消し処理の詳細については、「タスクのキャンセル」を参照してください。
最長一致と最短一致の動作の指定
複数のグループ化データ フロー ブロックの型は最長一致モードまたは最短一致モードのどちらかで動作します。 既定では、定義済みのデータ フロー ブロックの型は、最長一致モードで動作します。
JoinBlock<T1,T2> などの結合ブロックでは、最長一致モードは、対応する結合データが使用できない場合でも、ブロックが直ちにデータを受け入れることを意味します。 最短一致モードは、ターゲットのそれぞれで結合が完了できるように使用可能になるまで、ブロックがすべての受信メッセージを延期することを意味します。 延期されたメッセージのいずれかが使用できなくなった場合、結合ブロックは延期されたすべてのメッセージを解放し、プロセスを再起動します。 BatchBlock<T> クラスにおいては、最長一致と最短一致の動作は似ていますが、最短一致の場合には、バッチを完了するために十分なメッセージを別のソースから使用できるようになるまで BatchBlock<T> オブジェクトがすべての受信メッセージを延期する点が異なります。
データ フロー ブロックに最短一致モードを指定するには、Greedy を False
に設定します。 最短一致モードを使い、複数の結合ブロックを有効にして、データ ソースをより効率的に共有する例については、「方法:JoinBlock を使用して複数のソースからデータを読み込む」を参照してください。
カスタム データ フロー ブロック
TPL データ フロー ライブラリは多くの定義済みブロックの型を提供しますが、カスタム動作を実行する追加のブロックの型を作成できます。 ISourceBlock<TOutput> または ITargetBlock<TInput> インターフェイスを直接実装するか、または Encapsulate メソッドを使用して、既存のブロックの型の動作をカプセル化する複雑なブロックをビルドします。 カスタム データフロー ブロック機能の実装方法を示した例については、「チュートリアル:カスタム データフロー ブロックの型の作成」を参照してください。
関連トピック
Title | 説明 |
---|---|
方法: データフロー ブロックに対してメッセージの読み取りと書き込みを行う | BufferBlock<T> オブジェクトにメッセージを書き込む方法とメッセージを読み取る方法を示します。 |
方法: プロデューサー/コンシューマーのデータフロー パターンを実装する | データ フロー モデルを使ってプロデューサー/コンシューマー パターンを実装し、プロデューサーがデータ フロー ブロックにメッセージを送信して、コンシューマーがそのブロックからメッセージを読み取る方法を示します。 |
方法: データフロー ブロックでデータを受信したときにアクションを実行する | デリゲートを実行データ フロー ブロックの型である、ActionBlock<TInput>、TransformBlock<TInput,TOutput> と TransformManyBlock<TInput,TOutput> に提供する方法について説明します。 |
チュートリアル: データフロー パイプラインの作成 | Web からテキストをダウンロードし、そのテキストの操作を実行する、データ フロー パイプラインを作成する方法について説明します。 |
方法: データフロー ブロックのリンクを解除する | LinkTo メソッドを使って、ソースがターゲットにメッセージを提供した後に、ターゲット ブロックをソースからリンク解除する方法を示します。 |
チュートリアル: Windows フォーム アプリケーションでのデータフローの使用 | Windows フォーム アプリケーションでイメージ処理を実行する、データ フロー ブロックのネットワークを作成する方法を示します。 |
方法: データフロー ブロックをキャンセルする | Windows フォーム アプリケーションで取り消しを使う方法を説明します。 |
方法: JoinBlock を使用して複数のソースからデータを読み込む | データが複数のソースから使用できるようになった場合に JoinBlock<T1,T2> クラスを使って操作を実行する方法、および最短一致モードを使って、複数の結合ブロックがデータ ソースをより効率的に共有できる方法について説明します。 |
方法: データフロー ブロックで並列処理の範囲を指定する | MaxDegreeOfParallelism プロパティを設定して、実行データ フロー ブロックが一度に 1 つ以上のメッセージを処理できる方法について説明します。 |
方法: データフロー ブロックのタスク スケジューラを指定する | アプリケーションでデータ フローを使用する場合に特定のタスク スケジューラを関連付ける方法を示します。 |
チュートリアル: BatchBlock および BatchedJoinBlock を使用した効率の向上 | BatchBlock<T> クラスを使用してデータベースの挿入操作の効率を向上する方法、および BatchedJoinBlock<T1,T2> クラスを使用して、プログラムがデータベースを読み取る間にその結果と発生する例外の両方をキャプチャする方法について説明します。 |
チュートリアル: カスタム データフロー ブロックの型の作成 | カスタム動作を実装するデータ フロー ブロックの型を作成する 2 とおりの方法を示します。 |
タスク並列ライブラリ (TPL) | .NET Framework アプリケーションの並列処理と並列プログラミングを簡略化するライブラリである TPL を紹介します。 |
.NET