如何:在数据流块中指定任务计划程序
本文档演示在应用程序中使用数据流时如何关联特定任务计划程序。 示例在 Windows 窗体应用程序中使用 System.Threading.Tasks.ConcurrentExclusiveSchedulerPair 类来显示读取器任务处于活动状态的时间和编写器任务处于活动状态的时间。 它还使用 TaskScheduler.FromCurrentSynchronizationContext 方法使数据流块能够在用户界面线程上运行。
注意
TPL 数据流库(System.Threading.Tasks.Dataflow 命名空间)不随 .NET 一起分发。 若要在 Visual Studio 中安装 System.Threading.Tasks.Dataflow 命名空间,请打开项目,选择“项目”菜单中的“管理 NuGet 包”,再在线搜索 System.Threading.Tasks.Dataflow
包。 或者,若要使用 .NET Core CLI 进行安装,请运行 dotnet add package System.Threading.Tasks.Dataflow
。
创建 Windows 窗体应用程序
创建一个 Visual C# 或 Visual Basic“Windows 窗体应用程序”项目。 在以下步骤中,该项目命名为
WriterReadersWinForms
。在主窗体的窗体设计器中,Form1.cs(对于 Visual Basic 则为 Form1.vb)添加了四个 CheckBox 控件。 将
checkBox1
、checkBox2
、checkBox3
、checkBox4
的 Text 属性分别设置为“读取器 1”Text、“读取器 2”checkBox1
、“读取器 3”checkBox2
和“编写器”checkBox3
。 将每个控件的 Enabled 属性设置为False
。
添加数据流功能
本节介绍如何创建参与应用程序的数据流块以及如何将每个数据流块与任务计划程序关联。
在应用程序中添加数据流功能
在项目中,添加对 System.Threading.Tasks.Dataflow.dll 的引用。
确保 Form1.cs(对于 Visual Basic 则为 Form1.vb)包含以下
using
指令(Visual Basic 中为Imports
)。using System; using System.Linq; using System.Threading; using System.Threading.Tasks; using System.Threading.Tasks.Dataflow; using System.Windows.Forms;
Imports System.Threading Imports System.Threading.Tasks Imports System.Threading.Tasks.Dataflow
将 BroadcastBlock<T> 数据成员添加到
Form1
类。// Broadcasts values to an ActionBlock<int> object that is associated // with each check box. BroadcastBlock<int> broadcaster = new BroadcastBlock<int>(null);
' Broadcasts values to an ActionBlock<int> object that is associated ' with each check box. Private broadcaster As New BroadcastBlock(Of Integer)(Nothing)
在
Form1
构造函数中,在调用InitializeComponent
后,创建一个 ActionBlock<TInput> 对象,该对象将切换 CheckBox 对象的状态。// Create an ActionBlock<CheckBox> object that toggles the state // of CheckBox objects. // Specifying the current synchronization context enables the // action to run on the user-interface thread. var toggleCheckBox = new ActionBlock<CheckBox>(checkBox => { checkBox.Checked = !checkBox.Checked; }, new ExecutionDataflowBlockOptions { TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext() });
' Create an ActionBlock<CheckBox> object that toggles the state ' of CheckBox objects. ' Specifying the current synchronization context enables the ' action to run on the user-interface thread. Dim toggleCheckBox = New ActionBlock(Of CheckBox)(Sub(checkBox) checkBox.Checked = Not checkBox.Checked, New ExecutionDataflowBlockOptions With {.TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext()})
在
Form1
构造函数中,创建一个 ConcurrentExclusiveSchedulerPair 对象和四个 ActionBlock<TInput> 对象,每个 ActionBlock<TInput> 对象对应一个 CheckBox 对象。 对每个 ActionBlock<TInput> 对象,指定一个 ExecutionDataflowBlockOptions 对象,该对象将读取器的 TaskScheduler 属性设置为 ConcurrentScheduler 属性,将编写器的设置为 ExclusiveScheduler 属性。// Create a ConcurrentExclusiveSchedulerPair object. // Readers will run on the concurrent part of the scheduler pair. // The writer will run on the exclusive part of the scheduler pair. var taskSchedulerPair = new ConcurrentExclusiveSchedulerPair(); // Create an ActionBlock<int> object for each reader CheckBox object. // Each ActionBlock<int> object represents an action that can read // from a resource in parallel to other readers. // Specifying the concurrent part of the scheduler pair enables the // reader to run in parallel to other actions that are managed by // that scheduler. var readerActions = from checkBox in new CheckBox[] {checkBox1, checkBox2, checkBox3} select new ActionBlock<int>(milliseconds => { // Toggle the check box to the checked state. toggleCheckBox.Post(checkBox); // Perform the read action. For demonstration, suspend the current // thread to simulate a lengthy read operation. Thread.Sleep(milliseconds); // Toggle the check box to the unchecked state. toggleCheckBox.Post(checkBox); }, new ExecutionDataflowBlockOptions { TaskScheduler = taskSchedulerPair.ConcurrentScheduler }); // Create an ActionBlock<int> object for the writer CheckBox object. // This ActionBlock<int> object represents an action that writes to // a resource, but cannot run in parallel to readers. // Specifying the exclusive part of the scheduler pair enables the // writer to run in exclusively with respect to other actions that are // managed by the scheduler pair. var writerAction = new ActionBlock<int>(milliseconds => { // Toggle the check box to the checked state. toggleCheckBox.Post(checkBox4); // Perform the write action. For demonstration, suspend the current // thread to simulate a lengthy write operation. Thread.Sleep(milliseconds); // Toggle the check box to the unchecked state. toggleCheckBox.Post(checkBox4); }, new ExecutionDataflowBlockOptions { TaskScheduler = taskSchedulerPair.ExclusiveScheduler }); // Link the broadcaster to each reader and writer block. // The BroadcastBlock<T> class propagates values that it // receives to all connected targets. foreach (var readerAction in readerActions) { broadcaster.LinkTo(readerAction); } broadcaster.LinkTo(writerAction);
' Create a ConcurrentExclusiveSchedulerPair object. ' Readers will run on the concurrent part of the scheduler pair. ' The writer will run on the exclusive part of the scheduler pair. Dim taskSchedulerPair = New ConcurrentExclusiveSchedulerPair() ' Create an ActionBlock<int> object for each reader CheckBox object. ' Each ActionBlock<int> object represents an action that can read ' from a resource in parallel to other readers. ' Specifying the concurrent part of the scheduler pair enables the ' reader to run in parallel to other actions that are managed by ' that scheduler. Dim readerActions = From checkBox In New CheckBox() {checkBox1, checkBox2, checkBox3} _ Select New ActionBlock(Of Integer)(Sub(milliseconds) ' Toggle the check box to the checked state. ' Perform the read action. For demonstration, suspend the current ' thread to simulate a lengthy read operation. ' Toggle the check box to the unchecked state. toggleCheckBox.Post(checkBox) Thread.Sleep(milliseconds) toggleCheckBox.Post(checkBox) End Sub, New ExecutionDataflowBlockOptions With {.TaskScheduler = taskSchedulerPair.ConcurrentScheduler}) ' Create an ActionBlock<int> object for the writer CheckBox object. ' This ActionBlock<int> object represents an action that writes to ' a resource, but cannot run in parallel to readers. ' Specifying the exclusive part of the scheduler pair enables the ' writer to run in exclusively with respect to other actions that are ' managed by the scheduler pair. Dim writerAction = New ActionBlock(Of Integer)(Sub(milliseconds) ' Toggle the check box to the checked state. ' Perform the write action. For demonstration, suspend the current ' thread to simulate a lengthy write operation. ' Toggle the check box to the unchecked state. toggleCheckBox.Post(checkBox4) Thread.Sleep(milliseconds) toggleCheckBox.Post(checkBox4) End Sub, New ExecutionDataflowBlockOptions With {.TaskScheduler = taskSchedulerPair.ExclusiveScheduler}) ' Link the broadcaster to each reader and writer block. ' The BroadcastBlock<T> class propagates values that it ' receives to all connected targets. For Each readerAction In readerActions broadcaster.LinkTo(readerAction) Next readerAction broadcaster.LinkTo(writerAction)
在
Form1
构造函数中,启动 Timer 对象。// Start the timer. timer1.Start();
' Start the timer. timer1.Start()
在主窗体的窗体设计器中,为计时器的 Tick 事件创建事件处理程序。
实现计时器的 Tick 事件。
// Event handler for the timer. private void timer1_Tick(object sender, EventArgs e) { // Post a value to the broadcaster. The broadcaster // sends this message to each target. broadcaster.Post(1000); }
' Event handler for the timer. Private Sub timer1_Tick(ByVal sender As Object, ByVal e As EventArgs) Handles timer1.Tick ' Post a value to the broadcaster. The broadcaster ' sends this message to each target. broadcaster.Post(1000) End Sub
因为 toggleCheckBox
数据流块是在用户界面上操作,所以此操作要在用户界面线程上执行,这一点很重要。 为此,在构造期间,此对象提供一个将 TaskScheduler 属性设置为 TaskScheduler.FromCurrentSynchronizationContext 的 ExecutionDataflowBlockOptions 对象。 FromCurrentSynchronizationContext 方法会创建一个在当前同步上下文中执行工作的 TaskScheduler 对象。 因为 Form1
构造函数是从用户界面线程中调用的,所以 toggleCheckBox
数据流块的操作也会在用户线程中运行。
对于在同一 ConcurrentExclusiveSchedulerPair 对象上运行的所有其他数据流块,此示例还使用了 ConcurrentExclusiveSchedulerPair 类使某些数据流块能够并发操作,而使其他数据流块能够单独执行。 当多个数据流块共享资源,但有些需要对该资源独占访问时,这种技术很有用,因为它不需要手动同步对该资源的访问。 手动同步的消除会使代码更高效。
示例
下面的示例演示 Form1.cs(对于 Visual Basic 则为 Form1.vb)的完整代码。
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Windows.Forms;
namespace WriterReadersWinForms
{
public partial class Form1 : Form
{
// Broadcasts values to an ActionBlock<int> object that is associated
// with each check box.
BroadcastBlock<int> broadcaster = new BroadcastBlock<int>(null);
public Form1()
{
InitializeComponent();
// Create an ActionBlock<CheckBox> object that toggles the state
// of CheckBox objects.
// Specifying the current synchronization context enables the
// action to run on the user-interface thread.
var toggleCheckBox = new ActionBlock<CheckBox>(checkBox =>
{
checkBox.Checked = !checkBox.Checked;
},
new ExecutionDataflowBlockOptions
{
TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext()
});
// Create a ConcurrentExclusiveSchedulerPair object.
// Readers will run on the concurrent part of the scheduler pair.
// The writer will run on the exclusive part of the scheduler pair.
var taskSchedulerPair = new ConcurrentExclusiveSchedulerPair();
// Create an ActionBlock<int> object for each reader CheckBox object.
// Each ActionBlock<int> object represents an action that can read
// from a resource in parallel to other readers.
// Specifying the concurrent part of the scheduler pair enables the
// reader to run in parallel to other actions that are managed by
// that scheduler.
var readerActions =
from checkBox in new CheckBox[] {checkBox1, checkBox2, checkBox3}
select new ActionBlock<int>(milliseconds =>
{
// Toggle the check box to the checked state.
toggleCheckBox.Post(checkBox);
// Perform the read action. For demonstration, suspend the current
// thread to simulate a lengthy read operation.
Thread.Sleep(milliseconds);
// Toggle the check box to the unchecked state.
toggleCheckBox.Post(checkBox);
},
new ExecutionDataflowBlockOptions
{
TaskScheduler = taskSchedulerPair.ConcurrentScheduler
});
// Create an ActionBlock<int> object for the writer CheckBox object.
// This ActionBlock<int> object represents an action that writes to
// a resource, but cannot run in parallel to readers.
// Specifying the exclusive part of the scheduler pair enables the
// writer to run in exclusively with respect to other actions that are
// managed by the scheduler pair.
var writerAction = new ActionBlock<int>(milliseconds =>
{
// Toggle the check box to the checked state.
toggleCheckBox.Post(checkBox4);
// Perform the write action. For demonstration, suspend the current
// thread to simulate a lengthy write operation.
Thread.Sleep(milliseconds);
// Toggle the check box to the unchecked state.
toggleCheckBox.Post(checkBox4);
},
new ExecutionDataflowBlockOptions
{
TaskScheduler = taskSchedulerPair.ExclusiveScheduler
});
// Link the broadcaster to each reader and writer block.
// The BroadcastBlock<T> class propagates values that it
// receives to all connected targets.
foreach (var readerAction in readerActions)
{
broadcaster.LinkTo(readerAction);
}
broadcaster.LinkTo(writerAction);
// Start the timer.
timer1.Start();
}
// Event handler for the timer.
private void timer1_Tick(object sender, EventArgs e)
{
// Post a value to the broadcaster. The broadcaster
// sends this message to each target.
broadcaster.Post(1000);
}
}
}
Imports System.Threading
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow
Namespace WriterReadersWinForms
Partial Public Class Form1
Inherits Form
' Broadcasts values to an ActionBlock<int> object that is associated
' with each check box.
Private broadcaster As New BroadcastBlock(Of Integer)(Nothing)
Public Sub New()
InitializeComponent()
' Create an ActionBlock<CheckBox> object that toggles the state
' of CheckBox objects.
' Specifying the current synchronization context enables the
' action to run on the user-interface thread.
Dim toggleCheckBox = New ActionBlock(Of CheckBox)(Sub(checkBox) checkBox.Checked = Not checkBox.Checked, New ExecutionDataflowBlockOptions With {.TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext()})
' Create a ConcurrentExclusiveSchedulerPair object.
' Readers will run on the concurrent part of the scheduler pair.
' The writer will run on the exclusive part of the scheduler pair.
Dim taskSchedulerPair = New ConcurrentExclusiveSchedulerPair()
' Create an ActionBlock<int> object for each reader CheckBox object.
' Each ActionBlock<int> object represents an action that can read
' from a resource in parallel to other readers.
' Specifying the concurrent part of the scheduler pair enables the
' reader to run in parallel to other actions that are managed by
' that scheduler.
Dim readerActions = From checkBox In New CheckBox() {checkBox1, checkBox2, checkBox3} _
Select New ActionBlock(Of Integer)(Sub(milliseconds)
' Toggle the check box to the checked state.
' Perform the read action. For demonstration, suspend the current
' thread to simulate a lengthy read operation.
' Toggle the check box to the unchecked state.
toggleCheckBox.Post(checkBox)
Thread.Sleep(milliseconds)
toggleCheckBox.Post(checkBox)
End Sub, New ExecutionDataflowBlockOptions With {.TaskScheduler = taskSchedulerPair.ConcurrentScheduler})
' Create an ActionBlock<int> object for the writer CheckBox object.
' This ActionBlock<int> object represents an action that writes to
' a resource, but cannot run in parallel to readers.
' Specifying the exclusive part of the scheduler pair enables the
' writer to run in exclusively with respect to other actions that are
' managed by the scheduler pair.
Dim writerAction = New ActionBlock(Of Integer)(Sub(milliseconds)
' Toggle the check box to the checked state.
' Perform the write action. For demonstration, suspend the current
' thread to simulate a lengthy write operation.
' Toggle the check box to the unchecked state.
toggleCheckBox.Post(checkBox4)
Thread.Sleep(milliseconds)
toggleCheckBox.Post(checkBox4)
End Sub, New ExecutionDataflowBlockOptions With {.TaskScheduler = taskSchedulerPair.ExclusiveScheduler})
' Link the broadcaster to each reader and writer block.
' The BroadcastBlock<T> class propagates values that it
' receives to all connected targets.
For Each readerAction In readerActions
broadcaster.LinkTo(readerAction)
Next readerAction
broadcaster.LinkTo(writerAction)
' Start the timer.
timer1.Start()
End Sub
' Event handler for the timer.
Private Sub timer1_Tick(ByVal sender As Object, ByVal e As EventArgs) Handles timer1.Tick
' Post a value to the broadcaster. The broadcaster
' sends this message to each target.
broadcaster.Post(1000)
End Sub
End Class
End Namespace