方法:データフロー ブロックのタスク スケジューラを指定する
このドキュメントでは、アプリケーションでデータ フローを使用する場合に特定のタスク スケジューラを関連付ける方法を示します。 この例では、Windows フォーム アプリケーションの System.Threading.Tasks.ConcurrentExclusiveSchedulerPair クラスを使用して、リーダー タスクがアクティブである場合と、ライター タスクがアクティブである場合を示します。 また、TaskScheduler.FromCurrentSynchronizationContext メソッドを使用してデータ フロー ブロックを有効にし、ユーザー インターフェイス スレッドで実行できるようにします。
注意
TPL データフロー ライブラリ (System.Threading.Tasks.Dataflow 名前空間) は、.NET と一緒には配布されません。 Visual Studio に System.Threading.Tasks.Dataflow 名前空間をインストールするには、プロジェクトを開き、[プロジェクト] メニューの [NuGet パッケージの管理] をクリックし、System.Threading.Tasks.Dataflow
パッケージをオンラインで検索します。 または、.NET Core CLI を使ってインストールするには、dotnet add package System.Threading.Tasks.Dataflow
を実行します。
Windows フォーム アプリケーションを作成するには
Visual C# または Visual Basic の Windows フォーム アプリケーション プロジェクトを作成します。 以降の手順では、プロジェクトの名前は
WriterReadersWinForms
とします。メイン フォーム Form1.cs (Visual Basic の Form1.vb) のフォーム デザイナーで、4 つの CheckBox コントロールを追加します。 Text プロパティを、
checkBox1
に対しては「リーダー 1」に、checkBox2
に対しては「リーダー 2」に、checkBox3
に対しては「リーダー 3」に、そしてcheckBox4
に対しては「ライター」に設定します。 コントロールごとに、Enabled プロパティをFalse
に設定します。
データ フロー機能の追加
このセクションでは、アプリケーションに参加するデータ フロー ブロックを作成する方法と、各データ フロー ブロックをタスク スケジューラとを関連付ける方法について説明します。
アプリケーションにデータ フロー機能を追加するには
プロジェクトで、System.Threading.Tasks.Dataflow.dll への参照を追加します。
Form1.cs (Visual Basic の場合は Form1.vb) に次の
using
ディレクティブ (Visual Basic ではImports
) が含まれていることを確認します。using System; using System.Linq; using System.Threading; using System.Threading.Tasks; using System.Threading.Tasks.Dataflow; using System.Windows.Forms;
Imports System.Threading Imports System.Threading.Tasks Imports System.Threading.Tasks.Dataflow
BroadcastBlock<T> データ メンバーを
Form1
クラスに追加します。// Broadcasts values to an ActionBlock<int> object that is associated // with each check box. BroadcastBlock<int> broadcaster = new BroadcastBlock<int>(null);
' Broadcasts values to an ActionBlock<int> object that is associated ' with each check box. Private broadcaster As New BroadcastBlock(Of Integer)(Nothing)
Form1
コンストラクターで、InitializeComponent
の呼び出しの後に、CheckBox オブジェクトの状態を切り替える ActionBlock<TInput> オブジェクトを作成します。// Create an ActionBlock<CheckBox> object that toggles the state // of CheckBox objects. // Specifying the current synchronization context enables the // action to run on the user-interface thread. var toggleCheckBox = new ActionBlock<CheckBox>(checkBox => { checkBox.Checked = !checkBox.Checked; }, new ExecutionDataflowBlockOptions { TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext() });
' Create an ActionBlock<CheckBox> object that toggles the state ' of CheckBox objects. ' Specifying the current synchronization context enables the ' action to run on the user-interface thread. Dim toggleCheckBox = New ActionBlock(Of CheckBox)(Sub(checkBox) checkBox.Checked = Not checkBox.Checked, New ExecutionDataflowBlockOptions With {.TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext()})
Form1
コンストラクターで、1 つの ConcurrentExclusiveSchedulerPair オブジェクトと 4 つの ActionBlock<TInput> オブジェクト (各 CheckBox オブジェクトに 1 つの ActionBlock<TInput> オブジェクト) を作成します。 それぞれの ActionBlock<TInput> オブジェクトに対して ExecutionDataflowBlockOptions オブジェクトを指定します。このオブジェクトの TaskScheduler プロパティは、リーダーの場合は ConcurrentScheduler プロパティに設定し、ライターの場合は ExclusiveScheduler プロパティに設定します。// Create a ConcurrentExclusiveSchedulerPair object. // Readers will run on the concurrent part of the scheduler pair. // The writer will run on the exclusive part of the scheduler pair. var taskSchedulerPair = new ConcurrentExclusiveSchedulerPair(); // Create an ActionBlock<int> object for each reader CheckBox object. // Each ActionBlock<int> object represents an action that can read // from a resource in parallel to other readers. // Specifying the concurrent part of the scheduler pair enables the // reader to run in parallel to other actions that are managed by // that scheduler. var readerActions = from checkBox in new CheckBox[] {checkBox1, checkBox2, checkBox3} select new ActionBlock<int>(milliseconds => { // Toggle the check box to the checked state. toggleCheckBox.Post(checkBox); // Perform the read action. For demonstration, suspend the current // thread to simulate a lengthy read operation. Thread.Sleep(milliseconds); // Toggle the check box to the unchecked state. toggleCheckBox.Post(checkBox); }, new ExecutionDataflowBlockOptions { TaskScheduler = taskSchedulerPair.ConcurrentScheduler }); // Create an ActionBlock<int> object for the writer CheckBox object. // This ActionBlock<int> object represents an action that writes to // a resource, but cannot run in parallel to readers. // Specifying the exclusive part of the scheduler pair enables the // writer to run in exclusively with respect to other actions that are // managed by the scheduler pair. var writerAction = new ActionBlock<int>(milliseconds => { // Toggle the check box to the checked state. toggleCheckBox.Post(checkBox4); // Perform the write action. For demonstration, suspend the current // thread to simulate a lengthy write operation. Thread.Sleep(milliseconds); // Toggle the check box to the unchecked state. toggleCheckBox.Post(checkBox4); }, new ExecutionDataflowBlockOptions { TaskScheduler = taskSchedulerPair.ExclusiveScheduler }); // Link the broadcaster to each reader and writer block. // The BroadcastBlock<T> class propagates values that it // receives to all connected targets. foreach (var readerAction in readerActions) { broadcaster.LinkTo(readerAction); } broadcaster.LinkTo(writerAction);
' Create a ConcurrentExclusiveSchedulerPair object. ' Readers will run on the concurrent part of the scheduler pair. ' The writer will run on the exclusive part of the scheduler pair. Dim taskSchedulerPair = New ConcurrentExclusiveSchedulerPair() ' Create an ActionBlock<int> object for each reader CheckBox object. ' Each ActionBlock<int> object represents an action that can read ' from a resource in parallel to other readers. ' Specifying the concurrent part of the scheduler pair enables the ' reader to run in parallel to other actions that are managed by ' that scheduler. Dim readerActions = From checkBox In New CheckBox() {checkBox1, checkBox2, checkBox3} _ Select New ActionBlock(Of Integer)(Sub(milliseconds) ' Toggle the check box to the checked state. ' Perform the read action. For demonstration, suspend the current ' thread to simulate a lengthy read operation. ' Toggle the check box to the unchecked state. toggleCheckBox.Post(checkBox) Thread.Sleep(milliseconds) toggleCheckBox.Post(checkBox) End Sub, New ExecutionDataflowBlockOptions With {.TaskScheduler = taskSchedulerPair.ConcurrentScheduler}) ' Create an ActionBlock<int> object for the writer CheckBox object. ' This ActionBlock<int> object represents an action that writes to ' a resource, but cannot run in parallel to readers. ' Specifying the exclusive part of the scheduler pair enables the ' writer to run in exclusively with respect to other actions that are ' managed by the scheduler pair. Dim writerAction = New ActionBlock(Of Integer)(Sub(milliseconds) ' Toggle the check box to the checked state. ' Perform the write action. For demonstration, suspend the current ' thread to simulate a lengthy write operation. ' Toggle the check box to the unchecked state. toggleCheckBox.Post(checkBox4) Thread.Sleep(milliseconds) toggleCheckBox.Post(checkBox4) End Sub, New ExecutionDataflowBlockOptions With {.TaskScheduler = taskSchedulerPair.ExclusiveScheduler}) ' Link the broadcaster to each reader and writer block. ' The BroadcastBlock<T> class propagates values that it ' receives to all connected targets. For Each readerAction In readerActions broadcaster.LinkTo(readerAction) Next readerAction broadcaster.LinkTo(writerAction)
Form1
コンストラクターで、Timer オブジェクトを開始します。// Start the timer. timer1.Start();
' Start the timer. timer1.Start()
メイン フォームのフォーム デザイナーで、タイマーの Tick イベントのイベント ハンドラーを作成します。
タイマーに Tick イベントを実装します。
// Event handler for the timer. private void timer1_Tick(object sender, EventArgs e) { // Post a value to the broadcaster. The broadcaster // sends this message to each target. broadcaster.Post(1000); }
' Event handler for the timer. Private Sub timer1_Tick(ByVal sender As Object, ByVal e As EventArgs) Handles timer1.Tick ' Post a value to the broadcaster. The broadcaster ' sends this message to each target. broadcaster.Post(1000) End Sub
toggleCheckBox
データ フロー ブロックはユーザー インターフェイスで機能するので、この操作をユーザー インターフェイス スレッドで実行することが重要です。 これを実現するため、構築時にこのオブジェクトは TaskScheduler プロパティが TaskScheduler.FromCurrentSynchronizationContext に設定された ExecutionDataflowBlockOptions オブジェクトを提供します。 FromCurrentSynchronizationContext メソッドは、現行の同期コンテキストで作業を実行する TaskScheduler オブジェクトを作成します。 Form1
コンストラクターはユーザー インターフェイス スレッドから呼び出されるので、toggleCheckBox
データ フロー ブロックに対するアクションも、ユーザー インターフェイス スレッドで実行されます。
この例では、ConcurrentExclusiveSchedulerPair クラスを使用していくつかのデータ フロー ブロックが同時に実行できるようにし、別の 1 つのデータ フロー ブロックが、同じ ConcurrentExclusiveSchedulerPair オブジェクトで実行するその他すべてのデータ フロー ブロックに対して排他的に実行するようにします。 この手法は、複数のデータ フロー ブロックが 1 つのリソースを共有し、一部のデータ フロー ブロックがそのリソースへの排他アクセスを必要とする場合に、手動でそのリソースへのアクセスを同期する必要がなくなるため、便利です。 手動で同期する必要がなくなることで、コードの効率性が向上する可能性があります。
例
次の例は、Form1.cs (Visual Basic の Form1.vb) のコード全体を示しています。
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Windows.Forms;
namespace WriterReadersWinForms
{
public partial class Form1 : Form
{
// Broadcasts values to an ActionBlock<int> object that is associated
// with each check box.
BroadcastBlock<int> broadcaster = new BroadcastBlock<int>(null);
public Form1()
{
InitializeComponent();
// Create an ActionBlock<CheckBox> object that toggles the state
// of CheckBox objects.
// Specifying the current synchronization context enables the
// action to run on the user-interface thread.
var toggleCheckBox = new ActionBlock<CheckBox>(checkBox =>
{
checkBox.Checked = !checkBox.Checked;
},
new ExecutionDataflowBlockOptions
{
TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext()
});
// Create a ConcurrentExclusiveSchedulerPair object.
// Readers will run on the concurrent part of the scheduler pair.
// The writer will run on the exclusive part of the scheduler pair.
var taskSchedulerPair = new ConcurrentExclusiveSchedulerPair();
// Create an ActionBlock<int> object for each reader CheckBox object.
// Each ActionBlock<int> object represents an action that can read
// from a resource in parallel to other readers.
// Specifying the concurrent part of the scheduler pair enables the
// reader to run in parallel to other actions that are managed by
// that scheduler.
var readerActions =
from checkBox in new CheckBox[] {checkBox1, checkBox2, checkBox3}
select new ActionBlock<int>(milliseconds =>
{
// Toggle the check box to the checked state.
toggleCheckBox.Post(checkBox);
// Perform the read action. For demonstration, suspend the current
// thread to simulate a lengthy read operation.
Thread.Sleep(milliseconds);
// Toggle the check box to the unchecked state.
toggleCheckBox.Post(checkBox);
},
new ExecutionDataflowBlockOptions
{
TaskScheduler = taskSchedulerPair.ConcurrentScheduler
});
// Create an ActionBlock<int> object for the writer CheckBox object.
// This ActionBlock<int> object represents an action that writes to
// a resource, but cannot run in parallel to readers.
// Specifying the exclusive part of the scheduler pair enables the
// writer to run in exclusively with respect to other actions that are
// managed by the scheduler pair.
var writerAction = new ActionBlock<int>(milliseconds =>
{
// Toggle the check box to the checked state.
toggleCheckBox.Post(checkBox4);
// Perform the write action. For demonstration, suspend the current
// thread to simulate a lengthy write operation.
Thread.Sleep(milliseconds);
// Toggle the check box to the unchecked state.
toggleCheckBox.Post(checkBox4);
},
new ExecutionDataflowBlockOptions
{
TaskScheduler = taskSchedulerPair.ExclusiveScheduler
});
// Link the broadcaster to each reader and writer block.
// The BroadcastBlock<T> class propagates values that it
// receives to all connected targets.
foreach (var readerAction in readerActions)
{
broadcaster.LinkTo(readerAction);
}
broadcaster.LinkTo(writerAction);
// Start the timer.
timer1.Start();
}
// Event handler for the timer.
private void timer1_Tick(object sender, EventArgs e)
{
// Post a value to the broadcaster. The broadcaster
// sends this message to each target.
broadcaster.Post(1000);
}
}
}
Imports System.Threading
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow
Namespace WriterReadersWinForms
Partial Public Class Form1
Inherits Form
' Broadcasts values to an ActionBlock<int> object that is associated
' with each check box.
Private broadcaster As New BroadcastBlock(Of Integer)(Nothing)
Public Sub New()
InitializeComponent()
' Create an ActionBlock<CheckBox> object that toggles the state
' of CheckBox objects.
' Specifying the current synchronization context enables the
' action to run on the user-interface thread.
Dim toggleCheckBox = New ActionBlock(Of CheckBox)(Sub(checkBox) checkBox.Checked = Not checkBox.Checked, New ExecutionDataflowBlockOptions With {.TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext()})
' Create a ConcurrentExclusiveSchedulerPair object.
' Readers will run on the concurrent part of the scheduler pair.
' The writer will run on the exclusive part of the scheduler pair.
Dim taskSchedulerPair = New ConcurrentExclusiveSchedulerPair()
' Create an ActionBlock<int> object for each reader CheckBox object.
' Each ActionBlock<int> object represents an action that can read
' from a resource in parallel to other readers.
' Specifying the concurrent part of the scheduler pair enables the
' reader to run in parallel to other actions that are managed by
' that scheduler.
Dim readerActions = From checkBox In New CheckBox() {checkBox1, checkBox2, checkBox3} _
Select New ActionBlock(Of Integer)(Sub(milliseconds)
' Toggle the check box to the checked state.
' Perform the read action. For demonstration, suspend the current
' thread to simulate a lengthy read operation.
' Toggle the check box to the unchecked state.
toggleCheckBox.Post(checkBox)
Thread.Sleep(milliseconds)
toggleCheckBox.Post(checkBox)
End Sub, New ExecutionDataflowBlockOptions With {.TaskScheduler = taskSchedulerPair.ConcurrentScheduler})
' Create an ActionBlock<int> object for the writer CheckBox object.
' This ActionBlock<int> object represents an action that writes to
' a resource, but cannot run in parallel to readers.
' Specifying the exclusive part of the scheduler pair enables the
' writer to run in exclusively with respect to other actions that are
' managed by the scheduler pair.
Dim writerAction = New ActionBlock(Of Integer)(Sub(milliseconds)
' Toggle the check box to the checked state.
' Perform the write action. For demonstration, suspend the current
' thread to simulate a lengthy write operation.
' Toggle the check box to the unchecked state.
toggleCheckBox.Post(checkBox4)
Thread.Sleep(milliseconds)
toggleCheckBox.Post(checkBox4)
End Sub, New ExecutionDataflowBlockOptions With {.TaskScheduler = taskSchedulerPair.ExclusiveScheduler})
' Link the broadcaster to each reader and writer block.
' The BroadcastBlock<T> class propagates values that it
' receives to all connected targets.
For Each readerAction In readerActions
broadcaster.LinkTo(readerAction)
Next readerAction
broadcaster.LinkTo(writerAction)
' Start the timer.
timer1.Start()
End Sub
' Event handler for the timer.
Private Sub timer1_Tick(ByVal sender As Object, ByVal e As EventArgs) Handles timer1.Tick
' Post a value to the broadcaster. The broadcaster
' sends this message to each target.
broadcaster.Post(1000)
End Sub
End Class
End Namespace
関連項目
.NET