方法:データフロー ブロックのタスク スケジューラを指定する

このドキュメントでは、アプリケーションでデータ フローを使用する場合に特定のタスク スケジューラを関連付ける方法を示します。 この例では、Windows フォーム アプリケーションの System.Threading.Tasks.ConcurrentExclusiveSchedulerPair クラスを使用して、リーダー タスクがアクティブである場合と、ライター タスクがアクティブである場合を示します。 また、TaskScheduler.FromCurrentSynchronizationContext メソッドを使用してデータ フロー ブロックを有効にし、ユーザー インターフェイス スレッドで実行できるようにします。

注意

TPL データフロー ライブラリ (System.Threading.Tasks.Dataflow 名前空間) は、.NET と一緒には配布されません。 Visual Studio に System.Threading.Tasks.Dataflow 名前空間をインストールするには、プロジェクトを開き、[プロジェクト] メニューの [NuGet パッケージの管理] をクリックし、System.Threading.Tasks.Dataflow パッケージをオンラインで検索します。 または、.NET Core CLI を使ってインストールするには、dotnet add package System.Threading.Tasks.Dataflow を実行します。

Windows フォーム アプリケーションを作成するには

  1. Visual C# または Visual Basic の Windows フォーム アプリケーション プロジェクトを作成します。 以降の手順では、プロジェクトの名前は WriterReadersWinForms とします。

  2. メイン フォーム Form1.cs (Visual Basic の Form1.vb) のフォーム デザイナーで、4 つの CheckBox コントロールを追加します。 Text プロパティを、checkBox1 に対しては「リーダー 1」に、checkBox2 に対しては「リーダー 2」に、checkBox3 に対しては「リーダー 3」に、そして checkBox4 に対しては「ライター」に設定します。 コントロールごとに、Enabled プロパティを False に設定します。

  3. フォームに Timer コントロールを追加します。 Interval プロパティを 2500に設定します。

データ フロー機能の追加

このセクションでは、アプリケーションに参加するデータ フロー ブロックを作成する方法と、各データ フロー ブロックをタスク スケジューラとを関連付ける方法について説明します。

アプリケーションにデータ フロー機能を追加するには

  1. プロジェクトで、System.Threading.Tasks.Dataflow.dll への参照を追加します。

  2. Form1.cs (Visual Basic の Form1.vb) に次の using ステートメント (Visual Basic では Imports) が含まれていることを確認します。

    using System;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;
    using System.Windows.Forms;
    
    Imports System.Threading
    Imports System.Threading.Tasks
    Imports System.Threading.Tasks.Dataflow
    
    
  3. BroadcastBlock<T> データ メンバーを Form1 クラスに追加します。

    // Broadcasts values to an ActionBlock<int> object that is associated
    // with each check box.
    BroadcastBlock<int> broadcaster = new BroadcastBlock<int>(null);
    
    ' Broadcasts values to an ActionBlock<int> object that is associated
    ' with each check box.
    Private broadcaster As New BroadcastBlock(Of Integer)(Nothing)
    
  4. Form1 コンストラクターで、InitializeComponent の呼び出しの後に、CheckBox オブジェクトの状態を切り替える ActionBlock<TInput> オブジェクトを作成します。

    // Create an ActionBlock<CheckBox> object that toggles the state
    // of CheckBox objects.
    // Specifying the current synchronization context enables the
    // action to run on the user-interface thread.
    var toggleCheckBox = new ActionBlock<CheckBox>(checkBox =>
    {
       checkBox.Checked = !checkBox.Checked;
    },
    new ExecutionDataflowBlockOptions
    {
       TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext()
    });
    
    ' Create an ActionBlock<CheckBox> object that toggles the state
    ' of CheckBox objects.
    ' Specifying the current synchronization context enables the 
    ' action to run on the user-interface thread.
    Dim toggleCheckBox = New ActionBlock(Of CheckBox)(Sub(checkBox) checkBox.Checked = Not checkBox.Checked, New ExecutionDataflowBlockOptions With {.TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext()})
    
  5. Form1 コンストラクターで、1 つの ConcurrentExclusiveSchedulerPair オブジェクトと 4 つの ActionBlock<TInput> オブジェクト (各 CheckBox オブジェクトに 1 つの ActionBlock<TInput> オブジェクト) を作成します。 それぞれの ActionBlock<TInput> オブジェクトに対して ExecutionDataflowBlockOptions オブジェクトを指定します。このオブジェクトの TaskScheduler プロパティは、リーダーの場合は ConcurrentScheduler プロパティに設定し、ライターの場合は ExclusiveScheduler プロパティに設定します。

    // Create a ConcurrentExclusiveSchedulerPair object.
    // Readers will run on the concurrent part of the scheduler pair.
    // The writer will run on the exclusive part of the scheduler pair.
    var taskSchedulerPair = new ConcurrentExclusiveSchedulerPair();
    
    // Create an ActionBlock<int> object for each reader CheckBox object.
    // Each ActionBlock<int> object represents an action that can read
    // from a resource in parallel to other readers.
    // Specifying the concurrent part of the scheduler pair enables the
    // reader to run in parallel to other actions that are managed by
    // that scheduler.
    var readerActions =
       from checkBox in new CheckBox[] {checkBox1, checkBox2, checkBox3}
       select new ActionBlock<int>(milliseconds =>
       {
          // Toggle the check box to the checked state.
          toggleCheckBox.Post(checkBox);
    
          // Perform the read action. For demonstration, suspend the current
          // thread to simulate a lengthy read operation.
          Thread.Sleep(milliseconds);
    
          // Toggle the check box to the unchecked state.
          toggleCheckBox.Post(checkBox);
       },
       new ExecutionDataflowBlockOptions
       {
          TaskScheduler = taskSchedulerPair.ConcurrentScheduler
       });
    
    // Create an ActionBlock<int> object for the writer CheckBox object.
    // This ActionBlock<int> object represents an action that writes to
    // a resource, but cannot run in parallel to readers.
    // Specifying the exclusive part of the scheduler pair enables the
    // writer to run in exclusively with respect to other actions that are
    // managed by the scheduler pair.
    var writerAction = new ActionBlock<int>(milliseconds =>
    {
       // Toggle the check box to the checked state.
       toggleCheckBox.Post(checkBox4);
    
       // Perform the write action. For demonstration, suspend the current
       // thread to simulate a lengthy write operation.
       Thread.Sleep(milliseconds);
    
       // Toggle the check box to the unchecked state.
       toggleCheckBox.Post(checkBox4);
    },
    new ExecutionDataflowBlockOptions
    {
       TaskScheduler = taskSchedulerPair.ExclusiveScheduler
    });
    
    // Link the broadcaster to each reader and writer block.
    // The BroadcastBlock<T> class propagates values that it
    // receives to all connected targets.
    foreach (var readerAction in readerActions)
    {
       broadcaster.LinkTo(readerAction);
    }
    broadcaster.LinkTo(writerAction);
    
    ' Create a ConcurrentExclusiveSchedulerPair object.
    ' Readers will run on the concurrent part of the scheduler pair.
    ' The writer will run on the exclusive part of the scheduler pair.
    Dim taskSchedulerPair = New ConcurrentExclusiveSchedulerPair()
    
    ' Create an ActionBlock<int> object for each reader CheckBox object.
    ' Each ActionBlock<int> object represents an action that can read 
    ' from a resource in parallel to other readers.
    ' Specifying the concurrent part of the scheduler pair enables the 
    ' reader to run in parallel to other actions that are managed by 
    ' that scheduler.
    Dim readerActions = From checkBox In New CheckBox() {checkBox1, checkBox2, checkBox3} _
                        Select New ActionBlock(Of Integer)(Sub(milliseconds)
                                               ' Toggle the check box to the checked state.
                                               ' Perform the read action. For demonstration, suspend the current
                                               ' thread to simulate a lengthy read operation.
                                               ' Toggle the check box to the unchecked state.
                                               toggleCheckBox.Post(checkBox)
                                                               Thread.Sleep(milliseconds)
                                                               toggleCheckBox.Post(checkBox)
                                                           End Sub, New ExecutionDataflowBlockOptions With {.TaskScheduler = taskSchedulerPair.ConcurrentScheduler})
    
    ' Create an ActionBlock<int> object for the writer CheckBox object.
    ' This ActionBlock<int> object represents an action that writes to 
    ' a resource, but cannot run in parallel to readers.
    ' Specifying the exclusive part of the scheduler pair enables the 
    ' writer to run in exclusively with respect to other actions that are 
    ' managed by the scheduler pair.
    Dim writerAction = New ActionBlock(Of Integer)(Sub(milliseconds)
                                                       ' Toggle the check box to the checked state.
                                                       ' Perform the write action. For demonstration, suspend the current
                                                       ' thread to simulate a lengthy write operation.
                                                       ' Toggle the check box to the unchecked state.
                                                       toggleCheckBox.Post(checkBox4)
                                                       Thread.Sleep(milliseconds)
                                                       toggleCheckBox.Post(checkBox4)
                                                   End Sub, New ExecutionDataflowBlockOptions With {.TaskScheduler = taskSchedulerPair.ExclusiveScheduler})
    
    ' Link the broadcaster to each reader and writer block.
    ' The BroadcastBlock<T> class propagates values that it 
    ' receives to all connected targets.
    For Each readerAction In readerActions
        broadcaster.LinkTo(readerAction)
    Next readerAction
    broadcaster.LinkTo(writerAction)
    
  6. Form1 コンストラクターで、Timer オブジェクトを開始します。

    // Start the timer.
    timer1.Start();
    
    ' Start the timer.
    timer1.Start()
    
  7. メイン フォームのフォーム デザイナーで、タイマーの Tick イベントのイベント ハンドラーを作成します。

  8. タイマーに Tick イベントを実装します。

    // Event handler for the timer.
    private void timer1_Tick(object sender, EventArgs e)
    {
       // Post a value to the broadcaster. The broadcaster
       // sends this message to each target.
       broadcaster.Post(1000);
    }
    
    ' Event handler for the timer.
    Private Sub timer1_Tick(ByVal sender As Object, ByVal e As EventArgs) Handles timer1.Tick
        ' Post a value to the broadcaster. The broadcaster
        ' sends this message to each target. 
        broadcaster.Post(1000)
    End Sub
    

toggleCheckBox データ フロー ブロックはユーザー インターフェイスで機能するので、この操作をユーザー インターフェイス スレッドで実行することが重要です。 これを実現するため、構築時にこのオブジェクトは TaskScheduler プロパティが TaskScheduler.FromCurrentSynchronizationContext に設定された ExecutionDataflowBlockOptions オブジェクトを提供します。 FromCurrentSynchronizationContext メソッドは、現行の同期コンテキストで作業を実行する TaskScheduler オブジェクトを作成します。 Form1 コンストラクターはユーザー インターフェイス スレッドから呼び出されるので、toggleCheckBox データ フロー ブロックに対するアクションも、ユーザー インターフェイス スレッドで実行されます。

この例では、ConcurrentExclusiveSchedulerPair クラスを使用していくつかのデータ フロー ブロックが同時に実行できるようにし、別の 1 つのデータ フロー ブロックが、同じ ConcurrentExclusiveSchedulerPair オブジェクトで実行するその他すべてのデータ フロー ブロックに対して排他的に実行するようにします。 この手法は、複数のデータ フロー ブロックが 1 つのリソースを共有し、一部のデータ フロー ブロックがそのリソースへの排他アクセスを必要とする場合に、手動でそのリソースへのアクセスを同期する必要がなくなるため、便利です。 手動で同期する必要がなくなることで、コードの効率性が向上する可能性があります。

次の例は、Form1.cs (Visual Basic の Form1.vb) のコード全体を示しています。

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Windows.Forms;

namespace WriterReadersWinForms
{
   public partial class Form1 : Form
   {
      // Broadcasts values to an ActionBlock<int> object that is associated
      // with each check box.
      BroadcastBlock<int> broadcaster = new BroadcastBlock<int>(null);

      public Form1()
      {
         InitializeComponent();

         // Create an ActionBlock<CheckBox> object that toggles the state
         // of CheckBox objects.
         // Specifying the current synchronization context enables the
         // action to run on the user-interface thread.
         var toggleCheckBox = new ActionBlock<CheckBox>(checkBox =>
         {
            checkBox.Checked = !checkBox.Checked;
         },
         new ExecutionDataflowBlockOptions
         {
            TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext()
         });

         // Create a ConcurrentExclusiveSchedulerPair object.
         // Readers will run on the concurrent part of the scheduler pair.
         // The writer will run on the exclusive part of the scheduler pair.
         var taskSchedulerPair = new ConcurrentExclusiveSchedulerPair();

         // Create an ActionBlock<int> object for each reader CheckBox object.
         // Each ActionBlock<int> object represents an action that can read
         // from a resource in parallel to other readers.
         // Specifying the concurrent part of the scheduler pair enables the
         // reader to run in parallel to other actions that are managed by
         // that scheduler.
         var readerActions =
            from checkBox in new CheckBox[] {checkBox1, checkBox2, checkBox3}
            select new ActionBlock<int>(milliseconds =>
            {
               // Toggle the check box to the checked state.
               toggleCheckBox.Post(checkBox);

               // Perform the read action. For demonstration, suspend the current
               // thread to simulate a lengthy read operation.
               Thread.Sleep(milliseconds);

               // Toggle the check box to the unchecked state.
               toggleCheckBox.Post(checkBox);
            },
            new ExecutionDataflowBlockOptions
            {
               TaskScheduler = taskSchedulerPair.ConcurrentScheduler
            });

         // Create an ActionBlock<int> object for the writer CheckBox object.
         // This ActionBlock<int> object represents an action that writes to
         // a resource, but cannot run in parallel to readers.
         // Specifying the exclusive part of the scheduler pair enables the
         // writer to run in exclusively with respect to other actions that are
         // managed by the scheduler pair.
         var writerAction = new ActionBlock<int>(milliseconds =>
         {
            // Toggle the check box to the checked state.
            toggleCheckBox.Post(checkBox4);

            // Perform the write action. For demonstration, suspend the current
            // thread to simulate a lengthy write operation.
            Thread.Sleep(milliseconds);

            // Toggle the check box to the unchecked state.
            toggleCheckBox.Post(checkBox4);
         },
         new ExecutionDataflowBlockOptions
         {
            TaskScheduler = taskSchedulerPair.ExclusiveScheduler
         });

         // Link the broadcaster to each reader and writer block.
         // The BroadcastBlock<T> class propagates values that it
         // receives to all connected targets.
         foreach (var readerAction in readerActions)
         {
            broadcaster.LinkTo(readerAction);
         }
         broadcaster.LinkTo(writerAction);

         // Start the timer.
         timer1.Start();
      }

      // Event handler for the timer.
      private void timer1_Tick(object sender, EventArgs e)
      {
         // Post a value to the broadcaster. The broadcaster
         // sends this message to each target.
         broadcaster.Post(1000);
      }
   }
}
Imports System.Threading
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow


Namespace WriterReadersWinForms
    Partial Public Class Form1
        Inherits Form
        ' Broadcasts values to an ActionBlock<int> object that is associated
        ' with each check box.
        Private broadcaster As New BroadcastBlock(Of Integer)(Nothing)

        Public Sub New()
            InitializeComponent()

            ' Create an ActionBlock<CheckBox> object that toggles the state
            ' of CheckBox objects.
            ' Specifying the current synchronization context enables the 
            ' action to run on the user-interface thread.
            Dim toggleCheckBox = New ActionBlock(Of CheckBox)(Sub(checkBox) checkBox.Checked = Not checkBox.Checked, New ExecutionDataflowBlockOptions With {.TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext()})

            ' Create a ConcurrentExclusiveSchedulerPair object.
            ' Readers will run on the concurrent part of the scheduler pair.
            ' The writer will run on the exclusive part of the scheduler pair.
            Dim taskSchedulerPair = New ConcurrentExclusiveSchedulerPair()

            ' Create an ActionBlock<int> object for each reader CheckBox object.
            ' Each ActionBlock<int> object represents an action that can read 
            ' from a resource in parallel to other readers.
            ' Specifying the concurrent part of the scheduler pair enables the 
            ' reader to run in parallel to other actions that are managed by 
            ' that scheduler.
            Dim readerActions = From checkBox In New CheckBox() {checkBox1, checkBox2, checkBox3} _
                                Select New ActionBlock(Of Integer)(Sub(milliseconds)
                                                       ' Toggle the check box to the checked state.
                                                       ' Perform the read action. For demonstration, suspend the current
                                                       ' thread to simulate a lengthy read operation.
                                                       ' Toggle the check box to the unchecked state.
                                                       toggleCheckBox.Post(checkBox)
                                                                       Thread.Sleep(milliseconds)
                                                                       toggleCheckBox.Post(checkBox)
                                                                   End Sub, New ExecutionDataflowBlockOptions With {.TaskScheduler = taskSchedulerPair.ConcurrentScheduler})

            ' Create an ActionBlock<int> object for the writer CheckBox object.
            ' This ActionBlock<int> object represents an action that writes to 
            ' a resource, but cannot run in parallel to readers.
            ' Specifying the exclusive part of the scheduler pair enables the 
            ' writer to run in exclusively with respect to other actions that are 
            ' managed by the scheduler pair.
            Dim writerAction = New ActionBlock(Of Integer)(Sub(milliseconds)
                                                               ' Toggle the check box to the checked state.
                                                               ' Perform the write action. For demonstration, suspend the current
                                                               ' thread to simulate a lengthy write operation.
                                                               ' Toggle the check box to the unchecked state.
                                                               toggleCheckBox.Post(checkBox4)
                                                               Thread.Sleep(milliseconds)
                                                               toggleCheckBox.Post(checkBox4)
                                                           End Sub, New ExecutionDataflowBlockOptions With {.TaskScheduler = taskSchedulerPair.ExclusiveScheduler})

            ' Link the broadcaster to each reader and writer block.
            ' The BroadcastBlock<T> class propagates values that it 
            ' receives to all connected targets.
            For Each readerAction In readerActions
                broadcaster.LinkTo(readerAction)
            Next readerAction
            broadcaster.LinkTo(writerAction)

            ' Start the timer.
            timer1.Start()
        End Sub

        ' Event handler for the timer.
        Private Sub timer1_Tick(ByVal sender As Object, ByVal e As EventArgs) Handles timer1.Tick
            ' Post a value to the broadcaster. The broadcaster
            ' sends this message to each target. 
            broadcaster.Post(1000)
        End Sub
    End Class
End Namespace

関連項目