任务并行库(TPL)提供数据流组件,以帮助提高启用并发的应用程序的可靠性。 这些数据流组件统称为 TPL 数据流库。 此数据流模型通过为粗粒度数据流和管道任务提供进程内消息传递来提升基于执行组件的编程。 数据流组件基于 TPL 的类型和计划基础结构构建,并与 C#、Visual Basic 和 F# 语言支持集成,用于异步编程。 如果需要多个操作彼此进行异步通信,或者希望在数据可用时就进行处理,这些数据流组件非常有用。 例如,请考虑处理来自 Web 相机的图像数据的应用程序。 通过使用数据流模型,应用程序可以在图像帧可用时进行处理。 例如,如果应用程序通过执行浅色更正或红眼减少来增强图像帧,则可以创建数据流组件的 管道 。 管道的每个阶段都可以使用更粗粒度的并行功能(例如 TPL 提供的功能)来转换图像。
本文档概述了 TPL 数据流库。 其中介绍了编程模型、预定义的数据流块类型,以及如何配置数据流块以满足应用程序的特定要求。
注释
TPL 数据流库( System.Threading.Tasks.Dataflow 命名空间)未随 .NET 一起分发。 若要在 Visual Studio 中安装System.Threading.Tasks.Dataflow命名空间,请打开项目,从“项目”菜单中选择“管理 NuGet 包”,然后联机搜索包System.Threading.Tasks.Dataflow
。 或者,若要使用 .NET Core CLI 安装它,请运行 dotnet add package System.Threading.Tasks.Dataflow
。
编程模型
TPL 数据流库为消息传递和并行化 CPU 密集型和 I/O 密集型应用程序提供了基础,这些应用程序具有高吞吐量和低延迟。 它还为您提供了对数据缓冲和在系统中传输方式的显式控制。 为了更好地了解数据流编程模型,请考虑一个应用程序,该程序从磁盘异步加载图像并创建这些图像的合成图像。 传统的编程模型通常要求使用回调和同步对象(如锁)来协调任务并访问共享数据。 通过使用数据流编程模型,可以创建在从磁盘读取图像时处理图像的数据流对象。 在数据流模型中,声明数据在可用时如何处理数据,以及数据之间的任何依赖关系。 由于运行时管理数据之间的依赖关系,因此通常可以避免同步对共享数据的访问。 此外,由于运行时根据数据的异步到达计划工作,数据流可以通过有效地管理基础线程来提高响应能力和吞吐量。 有关使用数据流编程模型在 Windows 窗体应用程序中实现图像处理的示例,请参阅 演练:在 Windows 窗体应用程序中使用数据流。
源和目标
TPL 数据流库由 数据流块组成,这些块是缓冲和处理数据的数据结构。 TPL 定义了三种类型的数据流块: 源块、 目标块和 传播器块。 源块充当数据源,可从中读取。 目标块作为数据接收方,可以写入。 传播器块既充当源块又充当目标块,并且可以从中读取和写入。 TPL 定义 System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> 表示源、 System.Threading.Tasks.Dataflow.ITargetBlock<TInput> 表示目标以及 System.Threading.Tasks.Dataflow.IPropagatorBlock<TInput,TOutput> 表示传播器的接口。 IPropagatorBlock<TInput,TOutput> 继承自两者 ISourceBlock<TOutput>,以及 ITargetBlock<TInput>。
TPL 数据流库提供了若干预定义的数据流块类型,这些块实现了ISourceBlock<TOutput>、ITargetBlock<TInput> 和 IPropagatorBlock<TInput,TOutput> 接口。 这些数据流块类型在本文档的 “预定义数据流块类型”部分中进行了介绍。
连接块
可以将数据流块连接起来形成管道,即数据流块的线性序列,或者形成网络,即数据流块的图形结构。 管道是一种网络形式。 在管道或网络中,源在数据可用时以异步方式将数据传播到目标。 该方法 ISourceBlock<TOutput>.LinkTo 将源数据流块链接到目标块。 源可以链接到零个或多个目标;目标可以从零个或多个源进行链接。 可以同时向管道或网络添加或删除数据流块。 预定义的数据流块类型处理链接和取消链接的所有线程安全方面。
有关连接数据流块以形成基本管道的示例,请参阅 演练:创建数据流管道。 有关连接数据流块以形成更复杂的网络的示例,请参阅 演练:在 Windows 窗体应用程序中使用数据流。 有关在源提供目标消息后从源取消链接目标的示例,请参阅 “如何:取消链接数据流块”。
筛选
当调用 ISourceBlock<TOutput>.LinkTo 方法将源链接到目标时,可以提供一个委托,该委托根据消息的值来确定目标块是接受还是拒绝该消息。 此筛选机制是保证数据流块仅接收特定值的有用方法。 对于大多数预定义的数据流块类型,如果源块连接到多个目标块,当目标块拒绝消息时,源会将该消息提供给下一个目标。 源向目标提供消息的顺序由源定义,根据源的类型而异。 大多数源块类型在一个目标接受该消息后停止提供消息。 此规则的一个例外是 BroadcastBlock<T> 类,该类向所有目标提供每条消息,即使某些目标拒绝该邮件。 有关使用筛选仅处理某些消息的示例,请参阅 演练:在 Windows 窗体应用程序中使用数据流。
重要
由于每个预定义的源数据流块类型都保证消息按接收顺序传播出来,因此每个消息都必须从源块中读取,然后源块才能处理下一条消息。 因此,使用筛选将多个目标连接到源时,请确保至少有一个目标块接收每个消息。 否则,您的应用程序可能发生死锁。
消息传递
数据流编程模型与 消息传递的概念相关,其中程序的独立组件通过发送消息相互通信。 在应用程序组件之间传播消息的一种方法是调用Post(同步)和SendAsync(异步)方法来将消息发送到目标数据流块,以及ReceiveReceiveAsyncTryReceive用于从源块接收消息的方法。 可以将这些方法与数据流管道或网络相结合,方法是将输入数据发送到头节点(目标块),以及从管道的终端节点或网络的终端节点(一个或多个源块)接收输出数据。 可以使用方法 Choose 从提供的第一个源中读取数据,并对该数据执行操作。
源块通过调用 ITargetBlock<TInput>.OfferMessage 该方法向目标块提供数据。 目标块通过以下三种方式之一响应提供的消息:它可以接受消息、拒绝消息或推迟消息。 当目标接受消息时,该方法 OfferMessage 返回 Accepted。 当目标拒绝消息时,该方法 OfferMessage 返回 Declined。 当目标要求它不再从源接收任何消息时, OfferMessage 返回 DecliningPermanently。 在收到此类返回值后,预定义的源块类型不会向链接目标提供消息,并且它们会自动从此类目标取消链接。
当目标块推迟消息供以后使用时,该方法 OfferMessage 将 Postponed返回。 推迟消息的目标块可以稍后调用 ISourceBlock<TOutput>.ReserveMessage 方法以尝试保留提供的消息。 此时,消息仍可用,可由目标块使用,或者消息已被另一个目标占用。 当目标块稍后需要消息或不再需要消息时,它将分别调用 ISourceBlock<TOutput>.ConsumeMessage 或 ReleaseReservation 方法。 消息预留通常由以非贪婪模式运行的数据流块类型使用。 本文档稍后将介绍非贪婪模式。 除了保留推迟的消息,目标块也可以使用 ISourceBlock<TOutput>.ConsumeMessage 方法来尝试直接使用推迟的消息。
数据流块完成
数据流块还支持 完成的概念。 处于已完成状态的数据流块不会执行任何进一步的工作。 每个数据流块都有一个关联的 System.Threading.Tasks.Task 对象,称为 完成任务,表示块的完成状态。 由于您可以通过使用完成任务的操作来等待 Task 对象完成,您也可以等待数据流网络中的一个或多个终末节点的完成。 该 IDataflowBlock 接口定义 Complete 方法,该方法通知数据流块请求完成,以及 Completion 返回数据流块完成任务的属性。 同时ISourceBlock<TOutput>继承ITargetBlock<TInput>IDataflowBlock接口。
有两种方法可以确定数据流块是完成而不出错、遇到一个或多个错误还是已取消。 第一种方法是在 Task.Waittry
- 块(在 Visual Basic 中为 catch
Try
-)中对完成任务调用 Catch
方法。 以下示例创建一个 ActionBlock<TInput> 对象,该对象在输入值小于零时引发 ArgumentOutOfRangeException 。 当此示例在完成任务后调用 AggregateException 时,将引发 Wait。 通过 ArgumentOutOfRangeException 对象的 InnerExceptions 属性访问 AggregateException 。
// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
Console.WriteLine($"n = {n}");
if (n < 0)
{
throw new ArgumentOutOfRangeException();
}
});
// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();
// Wait for completion in a try/catch block.
try
{
throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
// If an unhandled exception occurs during dataflow processing, all
// exceptions are propagated through an AggregateException object.
ae.Handle(e =>
{
Console.WriteLine($"Encountered {e.GetType().Name}: {e.Message}");
return true;
});
}
/* Output:
n = 0
n = -1
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
Console.WriteLine("n = {0}", n)
If n < 0 Then
Throw New ArgumentOutOfRangeException()
End If
End Sub)
' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()
' Wait for completion in a try/catch block.
Try
throwIfNegative.Completion.Wait()
Catch ae As AggregateException
' If an unhandled exception occurs during dataflow processing, all
' exceptions are propagated through an AggregateException object.
ae.Handle(Function(e)
Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
Return True
End Function)
End Try
' Output:
' n = 0
' n = -1
' Encountered ArgumentOutOfRangeException: Specified argument was out of the range
' of valid values.
'
此示例演示在执行数据流块的委托中异常变成不可处理的情况。 建议您在这样的块主体中处理异常。 然而,你如果没能这么做,块就表现得好像是它被取消了,而且不会处理传入消息。
当数据流块显式取消时,AggregateException 对象将在 OperationCanceledException 属性中包含 InnerExceptions。 有关数据流取消的详细信息,请参阅 “启用取消 ”部分。
确定数据流块完成状态的第二种方法是使用完成任务的延续,或使用 C# 和 Visual Basic 的异步语言功能异步等待完成任务。 您提供给 Task.ContinueWith 方法的委托采用表示前面任务的 Task 对象。 就 Completion 属性来说,延续的委托自行采用完成任务。 以下示例与上一个示例类似,只不过它还使用ContinueWith 方法创建一个延续任务,以打印数据流操作的状态。
// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
Console.WriteLine($"n = {n}");
if (n < 0)
{
throw new ArgumentOutOfRangeException();
}
});
// Create a continuation task that prints the overall
// task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(task =>
{
Console.WriteLine($"The status of the completion task is '{task.Status}'.");
});
// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();
// Wait for completion in a try/catch block.
try
{
throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
// If an unhandled exception occurs during dataflow processing, all
// exceptions are propagated through an AggregateException object.
ae.Handle(e =>
{
Console.WriteLine($"Encountered {e.GetType().Name}: {e.Message}");
return true;
});
}
/* Output:
n = 0
n = -1
The status of the completion task is 'Faulted'.
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
Console.WriteLine("n = {0}", n)
If n < 0 Then
Throw New ArgumentOutOfRangeException()
End If
End Sub)
' Create a continuation task that prints the overall
' task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(Sub(task) Console.WriteLine("The status of the completion task is '{0}'.", task.Status))
' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()
' Wait for completion in a try/catch block.
Try
throwIfNegative.Completion.Wait()
Catch ae As AggregateException
' If an unhandled exception occurs during dataflow processing, all
' exceptions are propagated through an AggregateException object.
ae.Handle(Function(e)
Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
Return True
End Function)
End Try
' Output:
' n = 0
' n = -1
' The status of the completion task is 'Faulted'.
' Encountered ArgumentOutOfRangeException: Specified argument was out of the range
' of valid values.
'
还可以使用延续任务正文等 IsCanceled 属性来确定有关数据流块完成状态的其他信息。 有关延续任务及其与取消和错误处理的关系的更多信息,请参阅 通过使用延续任务来链接任务、任务取消和异常处理。
预定义数据流块类型
TPL 数据流库提供多个预定义的数据流块类型。 这些类型分为三个类别: 缓冲块、 执行块和 分组块。 以下部分介绍构成这些类别的块类型。
缓冲块
缓冲块保存数据供数据使用者使用。 TPL 数据流库提供三种缓冲块类型: System.Threading.Tasks.Dataflow.BufferBlock<T>、 System.Threading.Tasks.Dataflow.BroadcastBlock<T>和 System.Threading.Tasks.Dataflow.WriteOnceBlock<T>。
BufferBlock<T>
该 BufferBlock<T> 类表示常规用途异步消息传送结构。 该类存储一个先进先出(FIFO)消息队列,这些消息可以由多个源写入,也可以由多个目标读取。 当目标从 BufferBlock<T> 对象接收消息时,该消息将从消息队列中删除。 因此,尽管一个 BufferBlock<T> 对象可以有多个目标,但只有一个目标将接收每个消息。 如果要将多个消息传递给另一个组件,并且该组件必须接收每个消息,该 BufferBlock<T> 类非常有用。
下面的基本示例将多个 Int32 值发布到对象 BufferBlock<T> ,然后从该对象中读取这些值。
// Create a BufferBlock<int> object.
var bufferBlock = new BufferBlock<int>();
// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
bufferBlock.Post(i);
}
// Receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
Console.WriteLine(bufferBlock.Receive());
}
/* Output:
0
1
2
*/
' Create a BufferBlock<int> object.
Dim bufferBlock = New BufferBlock(Of Integer)()
' Post several messages to the block.
For i As Integer = 0 To 2
bufferBlock.Post(i)
Next i
' Receive the messages back from the block.
For i As Integer = 0 To 2
Console.WriteLine(bufferBlock.Receive())
Next i
' Output:
' 0
' 1
' 2
'
有关演示如何向对象写入和读取消息 BufferBlock<T> 的完整示例,请参阅 “如何:向数据流块写入消息”和“从数据流块读取消息”。
BroadcastBlock<T>
当必须将多个消息传递给另一个组件时,该 BroadcastBlock<T> 类非常有用,但该组件只需要最新的值。 需向多个组件广播消息时,此类也很有用。
以下基本示例将值 Double 发布到对象 BroadcastBlock<T> ,然后多次从该对象中读取该值。 由于值在读取后不会从 BroadcastBlock<T> 对象中删除,因此每次都提供相同的值。
// Create a BroadcastBlock<double> object.
var broadcastBlock = new BroadcastBlock<double>(null);
// Post a message to the block.
broadcastBlock.Post(Math.PI);
// Receive the messages back from the block several times.
for (int i = 0; i < 3; i++)
{
Console.WriteLine(broadcastBlock.Receive());
}
/* Output:
3.14159265358979
3.14159265358979
3.14159265358979
*/
' Create a BroadcastBlock<double> object.
Dim broadcastBlock = New BroadcastBlock(Of Double)(Nothing)
' Post a message to the block.
broadcastBlock.Post(Math.PI)
' Receive the messages back from the block several times.
For i As Integer = 0 To 2
Console.WriteLine(broadcastBlock.Receive())
Next i
' Output:
' 3.14159265358979
' 3.14159265358979
' 3.14159265358979
'
有关演示如何用于 BroadcastBlock<T> 将消息广播到多个目标块的完整示例,请参阅 “如何:在数据流块中指定任务计划程序”。
WriteOnceBlock<T>
该 WriteOnceBlock<T> 类类似于 BroadcastBlock<T> 类,只是 WriteOnceBlock<T> 对象只能写入一次。 可以将 WriteOnceBlock<T> 视为类似于 C# 的 readonly(在 Visual Basic 中为 ReadOnly)关键字,只是 WriteOnceBlock<T> 对象是在接收到值之后而不是在构造时变得不可变的。 BroadcastBlock<T>与类一样,当目标从WriteOnceBlock<T>对象接收消息时,该消息不会从该对象中删除。 因此,多个目标接收消息的副本。 如果只想传播多个消息中的第一个消息,则 WriteOnceBlock<T> 类非常有用。
下面的基本示例将多个 String 值发布到对象 WriteOnceBlock<T> ,然后从该对象中读取该值。 WriteOnceBlock<T> 由于对象只能写入一次,因此WriteOnceBlock<T> 对象在收到消息后会放弃后续消息。
// Create a WriteOnceBlock<string> object.
var writeOnceBlock = new WriteOnceBlock<string>(null);
// Post several messages to the block in parallel. The first
// message to be received is written to the block.
// Subsequent messages are discarded.
Parallel.Invoke(
() => writeOnceBlock.Post("Message 1"),
() => writeOnceBlock.Post("Message 2"),
() => writeOnceBlock.Post("Message 3"));
// Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive());
/* Sample output:
Message 2
*/
' Create a WriteOnceBlock<string> object.
Dim writeOnceBlock = New WriteOnceBlock(Of String)(Nothing)
' Post several messages to the block in parallel. The first
' message to be received is written to the block.
' Subsequent messages are discarded.
Parallel.Invoke(Function() writeOnceBlock.Post("Message 1"), Function() writeOnceBlock.Post("Message 2"), Function() writeOnceBlock.Post("Message 3"))
' Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive())
' Sample output:
' Message 2
'
有关展示了如何使用 WriteOnceBlock<T> 接收完成的第一项操作值的完整示例,请参阅如何:取消链接数据流块。
执行块
执行块为每条接收数据调用用户提供的委托。 TPL 数据流库提供三种执行块类型: ActionBlock<TInput>、 System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput>和 System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput>。
ActionBlock<T>
ActionBlock<TInput> 类在接收数据时是调用委托的目标块。 将 ActionBlock<TInput> 对象视为数据可用时异步运行的委托。 提供给ActionBlock<TInput>对象的委托可以是Action<T>类型或System.Func<TInput, Task>
类型。 当通过 ActionBlock<TInput> 使用 Action<T> 对象时,每个输入元素的处理在委托返回时视为已完成。 使用ActionBlock<TInput>对象与System.Func<TInput, Task>
时,仅当返回的Task对象完成时,才会将每个输入元素的处理视为已完成。 通过使用这两种机制,可以同时用于 ActionBlock<TInput> 每个输入元素的同步处理和异步处理。
以下基本示例将多个 Int32 值发布到对象 ActionBlock<TInput> 。 对象 ActionBlock<TInput> 将这些值打印到控制台。 然后,此示例将块设置为已完成状态,并等待所有数据流任务完成。
// Create an ActionBlock<int> object that prints values
// to the console.
var actionBlock = new ActionBlock<int>(n => Console.WriteLine(n));
// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
actionBlock.Post(i * 10);
}
// Set the block to the completed state and wait for all
// tasks to finish.
actionBlock.Complete();
actionBlock.Completion.Wait();
/* Output:
0
10
20
*/
' Create an ActionBlock<int> object that prints values
' to the console.
Dim actionBlock = New ActionBlock(Of Integer)(Function(n) WriteLine(n))
' Post several messages to the block.
For i As Integer = 0 To 2
actionBlock.Post(i * 10)
Next i
' Set the block to the completed state and wait for all
' tasks to finish.
actionBlock.Complete()
actionBlock.Completion.Wait()
' Output:
' 0
' 10
' 20
'
有关展示了如何结合使用委托和 ActionBlock<TInput> 类的完整示例,请参阅如何:在数据流块收到数据时执行操作。
TransformBlock<TInput、TOutput>
该 TransformBlock<TInput,TOutput> 类类似于 ActionBlock<TInput> 类,只不过它既充当源又充当目标。 传递给 TransformBlock<TInput,TOutput> 对象的委托返回类型为 TOutput
的值。 提供给TransformBlock<TInput,TOutput>对象的委托可以是System.Func<TInput, TOutput>
类型或System.Func<TInput, Task<TOutput>>
类型。 当您将TransformBlock<TInput,TOutput>对象与System.Func<TInput, TOutput>
一起使用时,每个输入元素的处理被视为在委托返回后完成。 使用一个与TransformBlock<TInput,TOutput>一起使用的System.Func<TInput, Task<TOutput>>
对象时,仅当返回的Task<TResult>对象完成时,才会将每个输入元素的处理视为完成。 与ActionBlock<TInput>一样,通过使用这两种机制,您可以将TransformBlock<TInput,TOutput>用于每个输入元素的同步和异步处理。
以下基本示例创建一个 TransformBlock<TInput,TOutput> 对象,该对象计算其输入的平方根。 该 TransformBlock<TInput,TOutput> 对象采用 Int32 值作为输入并生成 Double 值作为输出。
// Create a TransformBlock<int, double> object that
// computes the square root of its input.
var transformBlock = new TransformBlock<int, double>(n => Math.Sqrt(n));
// Post several messages to the block.
transformBlock.Post(10);
transformBlock.Post(20);
transformBlock.Post(30);
// Read the output messages from the block.
for (int i = 0; i < 3; i++)
{
Console.WriteLine(transformBlock.Receive());
}
/* Output:
3.16227766016838
4.47213595499958
5.47722557505166
*/
' Create a TransformBlock<int, double> object that
' computes the square root of its input.
Dim transformBlock = New TransformBlock(Of Integer, Double)(Function(n) Math.Sqrt(n))
' Post several messages to the block.
transformBlock.Post(10)
transformBlock.Post(20)
transformBlock.Post(30)
' Read the output messages from the block.
For i As Integer = 0 To 2
Console.WriteLine(transformBlock.Receive())
Next i
' Output:
' 3.16227766016838
' 4.47213595499958
' 5.47722557505166
'
有关展示了如何在数据流块网络中使用 TransformBlock<TInput,TOutput> 在 Windows 窗体应用中执行图像处理的完整示例,请参阅演练:在 Windows 窗体应用中使用数据流。
TransformManyBlock<TInput、TOutput>
该 TransformManyBlock<TInput,TOutput> 类类似于 TransformBlock<TInput,TOutput> 类,但为每个 TransformManyBlock<TInput,TOutput> 输入值生成零个或多个输出值,而不是为每个输入值生成一个输出值。 提供给TransformManyBlock<TInput,TOutput>对象的委托可以是System.Func<TInput, IEnumerable<TOutput>>
类型或System.Func<TInput, Task<IEnumerable<TOutput>>>
类型。 当您将TransformManyBlock<TInput,TOutput>对象与System.Func<TInput, IEnumerable<TOutput>>
一起使用时,每个输入元素的处理被视为在委托返回后完成。 使用 TransformManyBlock<TInput,TOutput> 对象和 System.Func<TInput, Task<IEnumerable<TOutput>>>
时,只有当返回的 System.Threading.Tasks.Task<IEnumerable<TOutput>>
对象完成后,才会将每个输入元素的处理视为已完成。
以下基本示例创建一个 TransformManyBlock<TInput,TOutput> 对象,该对象将字符串拆分为其各个字符序列。 该 TransformManyBlock<TInput,TOutput> 对象采用 String 值作为输入并生成 Char 值作为输出。
// Create a TransformManyBlock<string, char> object that splits
// a string into its individual characters.
var transformManyBlock = new TransformManyBlock<string, char>(
s => s.ToCharArray());
// Post two messages to the first block.
transformManyBlock.Post("Hello");
transformManyBlock.Post("World");
// Receive all output values from the block.
for (int i = 0; i < ("Hello" + "World").Length; i++)
{
Console.WriteLine(transformManyBlock.Receive());
}
/* Output:
H
e
l
l
o
W
o
r
l
d
*/
' Create a TransformManyBlock<string, char> object that splits
' a string into its individual characters.
Dim transformManyBlock = New TransformManyBlock(Of String, Char)(Function(s) s.ToCharArray())
' Post two messages to the first block.
transformManyBlock.Post("Hello")
transformManyBlock.Post("World")
' Receive all output values from the block.
For i As Integer = 0 To ("Hello" & "World").Length - 1
Console.WriteLine(transformManyBlock.Receive())
Next i
' Output:
' H
' e
' l
' l
' o
' W
' o
' r
' l
' d
'
有关用于 TransformManyBlock<TInput,TOutput> 为数据流管道中的每个输入生成多个独立输出的完整示例,请参阅 演练:创建数据流管道。
并行度
每个 ActionBlock<TInput>、TransformBlock<TInput,TOutput> 和 TransformManyBlock<TInput,TOutput> 对象都会缓冲输入消息,直到块准备好处理它们。 默认情况下,这些类按接收消息的顺序处理消息,一次一条消息。 还可以指定并行度,以便 ActionBlock<TInput>、TransformBlock<TInput,TOutput> 和 TransformManyBlock<TInput,TOutput> 对象可以同时处理多个消息。 有关并发执行的详细信息,请参阅本文档后面的“指定并行度”部分。 有关设置并行度以使执行数据流块一次处理多个消息的示例,请参阅 如何:在数据流块中指定并行度。
委托类型概述
下表汇总了可以提供给 ActionBlock<TInput>、TransformBlock<TInput,TOutput> 和 TransformManyBlock<TInput,TOutput> 对象的委托类型。 此表还指定委托类型是同步还是异步运行。
类型 | 同步委托类型 | 异步委托类型 |
---|---|---|
ActionBlock<TInput> | System.Action |
System.Func<TInput, Task> |
TransformBlock<TInput,TOutput> | System.Func<TInput, TOutput> |
System.Func<TInput, Task<TOutput>> |
TransformManyBlock<TInput,TOutput> | System.Func<TInput, IEnumerable<TOutput>> |
System.Func<TInput, Task<IEnumerable<TOutput>>> |
使用执行块类型时,还可以使用 lambda 表达式。 有关如何将 lambda 表达式与执行代码块配合使用的示例,请参阅 “如何:在数据流块接收数据时执行操作”。
分组块
分组块在各种约束下合并一个或多个源的数据。 TPL 数据流库提供三种联接块类型: BatchBlock<T>、 JoinBlock<T1,T2>和 BatchedJoinBlock<T1,T2>。
BatchBlock<T>
该 BatchBlock<T> 类将一组输入数据(称为批处理)合并到输出数据的数组中。 创建BatchBlock<T>对象时,请指定每个批次的大小。 BatchBlock<T>当对象接收指定的输入元素计数时,它会异步传播包含这些元素的数组。 如果对象 BatchBlock<T> 设置为已完成状态,但不包含足够的元素来形成批处理,则会传播包含剩余输入元素的最终数组。
该 BatchBlock<T> 类以 贪婪 或 非贪婪 模式运行。 在贪婪模式(默认值)中,对象 BatchBlock<T> 接受它提供的每个消息,并在收到指定元素计数后传播数组。 在非贪婪模式下, BatchBlock<T> 对象会推迟所有传入消息,直到有足够的源向块提供消息以形成批处理。 贪婪模式的性能通常优于非贪婪模式,因为它需要更少的处理开销。 但是,当必须以原子方式协调来自多个源的消耗时,可以使用非贪婪模式。 在Greedy构造函数的False
参数中,将dataflowBlockOptions
设置为BatchBlock<T>以指定非贪婪模式。
以下基本示例将多个 Int32 值发布到一个 BatchBlock<T> 对象,该对象包含一个批处理中的十个元素。 为了保证所有值都从中 BatchBlock<T>传播出来,此示例调用该方法 Complete 。 该方法 Complete 将 BatchBlock<T> 对象设置为已完成状态,因此,该 BatchBlock<T> 对象会将所有剩余元素作为最终批处理传播出来。
// Create a BatchBlock<int> object that holds ten
// elements per batch.
var batchBlock = new BatchBlock<int>(10);
// Post several values to the block.
for (int i = 0; i < 13; i++)
{
batchBlock.Post(i);
}
// Set the block to the completed state. This causes
// the block to propagate out any remaining
// values as a final batch.
batchBlock.Complete();
// Print the sum of both batches.
Console.WriteLine($"The sum of the elements in batch 1 is {batchBlock.Receive().Sum()}.");
Console.WriteLine($"The sum of the elements in batch 2 is {batchBlock.Receive().Sum()}.");
/* Output:
The sum of the elements in batch 1 is 45.
The sum of the elements in batch 2 is 33.
*/
' Create a BatchBlock<int> object that holds ten
' elements per batch.
Dim batchBlock = New BatchBlock(Of Integer)(10)
' Post several values to the block.
For i As Integer = 0 To 12
batchBlock.Post(i)
Next i
' Set the block to the completed state. This causes
' the block to propagate out any remaining
' values as a final batch.
batchBlock.Complete()
' Print the sum of both batches.
Console.WriteLine("The sum of the elements in batch 1 is {0}.", batchBlock.Receive().Sum())
Console.WriteLine("The sum of the elements in batch 2 is {0}.", batchBlock.Receive().Sum())
' Output:
' The sum of the elements in batch 1 is 45.
' The sum of the elements in batch 2 is 33.
'
有关使用 BatchBlock<T> 提高数据库插入作业效率的完整示例,请参阅 演练:使用 BatchBlock 和 BatchedJoinBlock 提高效率。
JoinBlock<T1、T2、...>
JoinBlock<T1,T2>和JoinBlock<T1,T2,T3>类收集输入元素,并传播包含这些元素的System.Tuple<T1,T2>或System.Tuple<T1,T2,T3>对象。 JoinBlock<T1,T2>类和JoinBlock<T1,T2,T3>类不继承自ITargetBlock<TInput>。 相反,它们提供Target1、Target2和Target3这些实现ITargetBlock<TInput>的属性。
类似于 BatchBlock<T>,JoinBlock<T1,T2> 和 JoinBlock<T1,T2,T3> 可以在贪婪或非贪婪模式下运行。 在默认贪婪模式下,JoinBlock<T1,T2> 或 JoinBlock<T1,T2,T3> 对象在其每个目标接收至少一条消息之后接受提供的每条消息并传播元组。 在非贪婪模式下,JoinBlock<T1,T2>或JoinBlock<T1,T2,T3>对象会推迟所有传入消息,直到所有目标已经被提供用于创建元组所需的数据。 此时,块参与两阶段提交协议,以原子方式从源中检索所有必需的项。 这种推迟使另一个实体能够同时使用数据,使整个系统能够向前推进。
以下基本示例演示了一 JoinBlock<T1,T2,T3> 个对象需要多个数据来计算值的情况。 此示例创建一个对象,该对象需要两JoinBlock<T1,T2,T3>个Int32值和一个Char值来执行算术运算。
// Create a JoinBlock<int, int, char> object that requires
// two numbers and an operator.
var joinBlock = new JoinBlock<int, int, char>();
// Post two values to each target of the join.
joinBlock.Target1.Post(3);
joinBlock.Target1.Post(6);
joinBlock.Target2.Post(5);
joinBlock.Target2.Post(4);
joinBlock.Target3.Post('+');
joinBlock.Target3.Post('-');
// Receive each group of values and apply the operator part
// to the number parts.
for (int i = 0; i < 2; i++)
{
var data = joinBlock.Receive();
switch (data.Item3)
{
case '+':
Console.WriteLine($"{data.Item1} + {data.Item2} = {data.Item1 + data.Item2}");
break;
case '-':
Console.WriteLine($"{data.Item1} - {data.Item2} = {data.Item1 - data.Item2}");
break;
default:
Console.WriteLine($"Unknown operator '{data.Item3}'.");
break;
}
}
/* Output:
3 + 5 = 8
6 - 4 = 2
*/
' Create a JoinBlock<int, int, char> object that requires
' two numbers and an operator.
Dim joinBlock = New JoinBlock(Of Integer, Integer, Char)()
' Post two values to each target of the join.
joinBlock.Target1.Post(3)
joinBlock.Target1.Post(6)
joinBlock.Target2.Post(5)
joinBlock.Target2.Post(4)
joinBlock.Target3.Post("+"c)
joinBlock.Target3.Post("-"c)
' Receive each group of values and apply the operator part
' to the number parts.
For i As Integer = 0 To 1
Dim data = joinBlock.Receive()
Select Case data.Item3
Case "+"c
Console.WriteLine("{0} + {1} = {2}", data.Item1, data.Item2, data.Item1 + data.Item2)
Case "-"c
Console.WriteLine("{0} - {1} = {2}", data.Item1, data.Item2, data.Item1 - data.Item2)
Case Else
Console.WriteLine("Unknown operator '{0}'.", data.Item3)
End Select
Next i
' Output:
' 3 + 5 = 8
' 6 - 4 = 2
'
有关使用 JoinBlock<T1,T2> 非贪婪模式下的对象协作共享资源的完整示例,请参阅 How to: Use JoinBlock to Read Data From Multiple Sources.
BatchedJoinBlock<T1、T2、...>
这些 BatchedJoinBlock<T1,T2> 和 BatchedJoinBlock<T1,T2,T3> 类收集一批输入元素,并传播出 System.Tuple(IList(T1), IList(T2))
或 System.Tuple(IList(T1), IList(T2), IList(T3))
包含这些元素的对象。 请把BatchedJoinBlock<T1,T2>当作BatchBlock<T>和JoinBlock<T1,T2>的组合。 指定创建 BatchedJoinBlock<T1,T2> 对象时每个批的大小。
BatchedJoinBlock<T1,T2>还提供实现Target1的属性Target2和 ITargetBlock<TInput>。 当从所有目标接收指定输入元素计数时,该 BatchedJoinBlock<T1,T2> 对象会异步传播 System.Tuple(IList(T1), IList(T2))
包含这些元素的对象。
以下基本示例创建一个BatchedJoinBlock<T1,T2>对象,该对象保存结果、Int32值和Exception对象形式的错误。 此示例执行多个操作,并将结果写入 Target1 属性,将错误写入 Target2 属性,属于 BatchedJoinBlock<T1,T2> 对象。 由于成功和失败的作计数事先未知,因此 IList<T> 对象使每个目标都能接收零个或多个值。
// For demonstration, create a Func<int, int> that
// returns its argument, or throws ArgumentOutOfRangeException
// if the argument is less than zero.
Func<int, int> DoWork = n =>
{
if (n < 0)
throw new ArgumentOutOfRangeException();
return n;
};
// Create a BatchedJoinBlock<int, Exception> object that holds
// seven elements per batch.
var batchedJoinBlock = new BatchedJoinBlock<int, Exception>(7);
// Post several items to the block.
foreach (int i in new int[] { 5, 6, -7, -22, 13, 55, 0 })
{
try
{
// Post the result of the worker to the
// first target of the block.
batchedJoinBlock.Target1.Post(DoWork(i));
}
catch (ArgumentOutOfRangeException e)
{
// If an error occurred, post the Exception to the
// second target of the block.
batchedJoinBlock.Target2.Post(e);
}
}
// Read the results from the block.
var results = batchedJoinBlock.Receive();
// Print the results to the console.
// Print the results.
foreach (int n in results.Item1)
{
Console.WriteLine(n);
}
// Print failures.
foreach (Exception e in results.Item2)
{
Console.WriteLine(e.Message);
}
/* Output:
5
6
13
55
0
Specified argument was out of the range of valid values.
Specified argument was out of the range of valid values.
*/
' For demonstration, create a Func<int, int> that
' returns its argument, or throws ArgumentOutOfRangeException
' if the argument is less than zero.
Dim DoWork As Func(Of Integer, Integer) = Function(n)
If n < 0 Then
Throw New ArgumentOutOfRangeException()
End If
Return n
End Function
' Create a BatchedJoinBlock<int, Exception> object that holds
' seven elements per batch.
Dim batchedJoinBlock = New BatchedJoinBlock(Of Integer, Exception)(7)
' Post several items to the block.
For Each i As Integer In New Integer() {5, 6, -7, -22, 13, 55, 0}
Try
' Post the result of the worker to the
' first target of the block.
batchedJoinBlock.Target1.Post(DoWork(i))
Catch e As ArgumentOutOfRangeException
' If an error occurred, post the Exception to the
' second target of the block.
batchedJoinBlock.Target2.Post(e)
End Try
Next i
' Read the results from the block.
Dim results = batchedJoinBlock.Receive()
' Print the results to the console.
' Print the results.
For Each n As Integer In results.Item1
Console.WriteLine(n)
Next n
' Print failures.
For Each e As Exception In results.Item2
Console.WriteLine(e.Message)
Next e
' Output:
' 5
' 6
' 13
' 55
' 0
' Specified argument was out of the range of valid values.
' Specified argument was out of the range of valid values.
'
有关使用 BatchedJoinBlock<T1,T2> 捕获程序从数据库读取时产生的结果和任何异常的完整示例,请参阅 演练:使用 BatchBlock 和 BatchedJoinBlock 提高效率。
配置数据流块行为
可以通过向数据流块类型的构造函数提供 System.Threading.Tasks.Dataflow.DataflowBlockOptions 对象来启用其他选项。 这些选项控制诸如管理底层任务的调度程序和并行度等行为。 DataflowBlockOptions此外,还具有派生类型,用于指定特定于某些数据流块类型的行为。 下表总结了哪些选项类型与每个数据流块类型相关联。
以下各节提供了有关可通过System.Threading.Tasks.Dataflow.DataflowBlockOptions和System.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptionsSystem.Threading.Tasks.Dataflow.GroupingDataflowBlockOptions类提供的数据流块选项的重要类型的其他信息。
任务调度器的指定
每个预定义的数据流块都使用 TPL 任务计划机制执行活动,例如将数据传播到目标、从源接收数据,以及在数据可用时运行用户定义的委托。 TaskScheduler 是一个抽象类,表示将任务排入线程的任务计划程序。 默认任务计划程序 Default使用 ThreadPool 类对工作进行排队和执行。 可以通过在构造数据流块对象时设置 TaskScheduler 属性来替代默认任务计划程序。
当同一任务计划程序管理多个数据流块时,它可以跨这些块强制实施策略。 例如,如果每个数据流块都配置为面向同一 ConcurrentExclusiveSchedulerPair 对象的独占计划程序,则会序列化跨这些块运行的所有工作。 同样,如果将这些块配置为面向同一个ConcurrentExclusiveSchedulerPair对象的并发调度器,并且该调度器配置了最大并发数,则这些块的所有工作仅限于该数量的并发操作。 有关展示了如何使用 ConcurrentExclusiveSchedulerPair 类让读取操作并行执行(但写入操作独立于其他所有操作)的示例,请参阅如何:在数据流块中指定任务计划程序。 有关 TPL 中的任务计划程序的详细信息,请参阅 TaskScheduler 类主题。
指定并行度
默认情况下,TPL 数据流库提供的三种执行块类型ActionBlock<TInput>、TransformBlock<TInput,TOutput> 和 TransformManyBlock<TInput,TOutput>一次处理一条消息。 这些数据流块类型还会按照接收消息的顺序处理消息。 若要启用这些数据流块以并发处理消息,在构造数据流块对象时设置 ExecutionDataflowBlockOptions.MaxDegreeOfParallelism 属性。
默认值 MaxDegreeOfParallelism 为 1,它保证数据流块一次处理一条消息。 将此属性设置为大于 1 的值可使数据流块并发处理多个消息。 将此属性设置为 DataflowBlockOptions.Unbounded 使基础任务计划程序能够管理最大并发度。
重要
指定大于 1 的最大并行度时,将同时处理多个消息,因此可能不会按照接收消息的顺序进行处理。 但是,消息从块输出的顺序与接收消息的顺序相同。
由于该 MaxDegreeOfParallelism 属性表示最大并行度,因此数据流块的执行程度可能比指定的并行度要小。 数据流块可能会使用较少程度的并行度来满足其功能要求,或者因为缺少可用的系统资源。 数据流块永远不会选择比指定更多的并行度。
该属性的值 MaxDegreeOfParallelism 对于每个数据流块对象都是独占的。 例如,如果每个数据流块对象为最大并行度指定 1 个,则所有四个数据流块对象都可以并行运行。
有关设置最大并行度以启用并行冗长操作的示例,请参阅如何:指定数据流块中的并行度。
指定每个任务的消息数
预定义的数据流块类型使用任务来处理多个输入元素。 这有助于最大程度地减少处理数据所需的任务对象数,从而使应用程序能够更高效地运行。 但是,当一组数据流处理块的任务在处理数据时,其他数据流处理块的任务可能需要通过消息排队来等待处理时间。 若要在数据流任务之间实现更好的公平性,请设置 MaxMessagesPerTask 属性。 当 MaxMessagesPerTask 设置为 DataflowBlockOptions.Unbounded默认值时,数据流块使用的任务将处理尽可能多的消息。 如果 MaxMessagesPerTask 设置为非 Unbounded 的值,则数据流块在每个 Task 对象中最多处理该数量的消息。 尽管设置 MaxMessagesPerTask 属性可以提高任务之间的公平性,但它可能会导致系统创建比必要的任务多,这可以减少性能。
启用取消功能
TPL 提供了一种机制,能使任务以一种合作的方式协调取消。 若要使数据流块能够参与此取消机制,请设置 CancellationToken 该属性。 当此 CancellationToken 对象设置为已取消状态时,监视此令牌的所有数据流块将完成当前项的执行,但不开始处理后续项。 这些数据流块还清除任何缓冲的消息、释放与任何源和目标块的连接,以及转换为已取消的状态。 通过转换为已取消状态,该 Completion 属性将 Status 属性设置为 Canceled,除非在处理过程中发生异常。 在这种情况下, Status 设置为 Faulted.
有关演示如何在 Windows 窗体应用程序中使用取消的示例,请参阅 “如何:取消数据流块”。 有关 TPL 中的取消的详细信息,请参阅 任务取消。
指定贪婪与非贪婪行为
多个分组数据流块类型可以在 贪婪 或 非贪婪 模式下运行。 默认情况下,预定义的数据流块类型以贪婪模式运行。
对于联接块类型(例如 JoinBlock<T1,T2>),贪婪模式意味着即使用于联接的相应数据尚未可用,该块也会立即接受数据。 非贪婪模式意味着块推迟所有传入的消息,直到在其每个目标上有一个可完成联接。 如果任何推迟的消息不再可用,则联接块会释放所有已推迟的消息并重启进程。 BatchBlock<T>对于类来说,贪婪和非贪婪行为是类似的,但非贪婪模式下,BatchBlock<T>对象会推迟所有传入消息,直到从不同的源获取足够的消息才能完成批处理。
若要为数据流块指定非贪婪模式,请设置为 GreedyFalse
。 有关演示如何使用非贪婪模式使多个联接块更有效地共享数据源的示例,请参阅 How to: Use JoinBlock to Read Data From Multiple Sources.
自定义数据流块
尽管 TPL 数据流库提供了许多预定义的块类型,但可以创建执行自定义行为的附加块类型。 直接实现ISourceBlock<TOutput>或ITargetBlock<TInput>接口,或者使用Encapsulate方法生成封装现有块类型行为的复杂块。 有关如何实现自定义数据流块功能的示例,请参阅 演练:创建自定义数据流块类型。
相关主题
标题 | DESCRIPTION |
---|---|
如何:向数据流块写入和读取消息 | 演示如何向对象写入消息并从中 BufferBlock<T> 读取消息。 |
如何:实现 Producer-Consumer 数据流模式 | 介绍如何使用数据流模型实现生成者-使用者模式,其中生成者将消息发送到数据流块,使用者从该块读取消息。 |
如何在数据流块接收数据时执行操作 | 说明如何为ActionBlock<TInput>、TransformBlock<TInput,TOutput>和TransformManyBlock<TInput,TOutput>等执行数据流块类型提供委托。 |
演练:创建数据流管道 | 介绍如何创建一个数据流管道,该管道从 Web 下载文本并对该文本进行操作。 |
如何:取消链接数据流块 | 演示如何在源向目标提供消息后,使用 LinkTo 方法将目标块从源中取消链接。 |
演练:在 Windows 窗体应用程序中使用数据流 | 演示如何创建在 Windows 窗体应用程序中执行图像处理的数据流块网络。 |
如何:取消数据流块 | 演示如何在 Windows 窗体应用程序中使用取消操作。 |
如何:使用 JoinBlock 从多个源读取数据 | 介绍如何在多个源提供数据时使用该 JoinBlock<T1,T2> 类来执行作,以及如何使用非贪婪模式使多个联接块更有效地共享数据源。 |
如何:在数据流块中指定并行度 | 介绍如何设置 MaxDegreeOfParallelism 属性以启用执行数据流块以一次处理多个消息。 |
如何:在数据流块中指定任务计划程序 | 演示如何在应用程序中使用数据流时关联特定任务计划程序。 |
演练:使用 BatchBlock 和 BatchedJoinBlock 提高效率 | 介绍如何使用 BatchBlock<T> 类来提高数据库插入作的效率,以及如何使用 BatchedJoinBlock<T1,T2> 该类捕获在程序从数据库读取时发生的结果和任何异常。 |
演练:创建自定义数据流块类型 | 演示了创建实现自定义行为的数据流块类型的两种方法。 |
任务并行库 (TPL) | 介绍 TPL,该库简化了 .NET Framework 应用程序中的并行和并发编程。 |