次の方法で共有


.NET の問題

ThreadPool の順序実行

Stephen Toub

質問: 私のシステムのコンポーネントの多くは非同期的に実行する必要があり、その意味で、Microsoft .NET Framework ThreadPool が適切なソリューションだと思います。しかし、各コンポーネントの作業項目は順序に従って処理される必要があるため、作業項目が 2 つ同時に実行されないようにする必要があると思うのですが、これは特別な場合に限られるのでしょうか。もっとも、複数のコンポーネントは同時に実行できるため、問題ありません。実際、それが要求されるのです。何かアドバイスはありますか。

回答: これは、あなたが考えているほど限定的な問題ではありません。たとえばメッセージ渡しの場合など、さまざまな重要シナリオで発生します。パイプラインの複数のステージを常にアクティブにすることにより、並列実行のメリットを得られるパイプライン実装を考えてみましょう。

たとえば、ファイルからデータを読み取り、圧縮し、暗号化し、新しいファイルとして書き出すパイプラインがあるとします。圧縮は暗号化と同時に実行できますが、一方の出力が一方の入力になるため、同じデータに対して 2 つの処理を同時に実行することはできません。圧縮ルーチンでデータを圧縮し、それを暗号化のために暗号化ルーチンに送ります。その時点で、圧縮ルーチンは次のデータを処理できます。

多くの圧縮アルゴリズムや暗号化アルゴリズムで管理される状態は、以後のデータの圧縮方法や暗号化方法に影響を与えるため、順序の管理が重要となります (この例では、ファイルの処理方法については気にしないでください。結果を復号化し展開することで、元のファイルのすべてのデータを正しい順序で復元することが目的です)。

さまざまな解決方法が考えられます。1 つ目は、単純に各コンポーネントに 1 つのスレッドを指定する方法です。この DedicatedThread には、実行する作業項目の先入れ先出し (FIFO) キューと、そのキューをサービスする 1 つのスレッドが含まれます。コンポーネントに実行する必要がある作業項目がある場合、その作業項目がキューにダンプされ、最終的にスレッドによって作業項目が取得されて実行されます。スレッドは 1 つしかないため、一度に実行される項目は 1 つのみです。また、FIFO キューが使用されるため、作業項目は生成された順序で処理されます。

2009 年 1 月の「.NET の問題」コラムで解説した例と同じように、ここでは単純な WorkItem クラスを使用して、実行する作業を指定します。図 1 を参照してください。この WorkItem クラスを使用する DedicatedThread の実装を図 2 に示します。実装の大部分は、簡単な BlockingQueue<T> 実装で行われています (.NET Framework 4.0 には、このような実装にもっと適した BlockingCollection<T> タイプが含まれています)。DedicatedThread のコンストラクタにより、単純に BlockingQueue<T> インスタンスが作成され、スレッドが準備されます。スレッドではキューに作業項目が入力されるのを常に待ち受けており、入力されると、作業項目を実行します。

図 1 作業項目を取得する

internal class WorkItem {
  public WaitCallback Callback;
  public object State;
  public ExecutionContext Context;

  private static ContextCallback _contextCallback = s => {
    var item = (WorkItem)s;
    item.Callback(item.State);
 };

  public void Execute() {
    if (Context != null) 
      ExecutionContext.Run(Context, _contextCallback, this);
    else Callback(State);
  }
}

図 2 DedicatedThread の実装

public class DedicatedThread {
  private BlockingQueue<WorkItem> _workItems = 
    new BlockingQueue<WorkItem>();

  public DedicatedThread() {
    new Thread(() => {
      while (true) { workItems.Dequeue().Execute(); }
    }) { IsBackground = true }.Start();
  }

  public void QueueUserWorkItem(WaitCallback callback, object state) {
    _workItems.Enqueue(new WorkItem { 
      Callback = callback, State = state, 
      Context = ExecutionContext.Capture() });
  }

  private class BlockingQueue<T> {
    private Queue<T> _queue = new Queue<T>();
    private Semaphore _gate = new Semaphore(0, Int32.MaxValue);

    public void Enqueue(T item) {
      lock (_queue) _queue.Enqueue(item);
      _gate.Release();
    }

    public T Dequeue() {
      _gate.WaitOne();
      lock (_queue) return _queue.Dequeue();
    }
  }
}

これにより、このシナリオで求められる基本的な機能が得られ、要件も満たされるかもしれませんが、いくつか重大な欠点があります。まず、スレッドは 1 つのコンポーネントにつき 1 つ予約されます。コンポーネントが 1 つまたは 2 つの場合、問題となることはありません。ただし、コンポーネントが多数ある場合には、スレッドの数が膨大になる可能性があります。そのため、パフォーマンスが低下するおそれがあります。

また、この実装はそれほど堅牢ではありません。たとえば、コンポーネントを破棄する場合、どうなるでしょうか。どのようにして、スレッドのブロッキングを停止するように指定するのでしょうか。さらに、作業項目から例外がスローされた場合、どうなるでしょうか。

余談ですが、この解決方法が Windows の一般的なメッセージ ポンプで使用されるものと同じであることは興味深いことです。メッセージ ポンプはメッセージの到着をループ処理で待ち受け、到着したメッセージをディスパッチ (処理) した後、元に戻って次のメッセージを待ち受けます。特定のウィンドウのメッセージは、1 つのスレッドで処理されます。この類似性について、図 3 のコードで説明します。このコードは、図 2 のコードと非常によく似た動作を実行します。Control を作成する新しいスレッドが準備され、ハンドルが初期化されます。次に、Application.Run を使用してメッセージ ループが実行されます。作業項目をこのスレッドのキューに入れるには、単純に Control の BeginInvoke メソッドを使用します。この方法をお勧めするわけではありませんが、前に示した DedicatedThread による解決方法と基本的な概念は同じであることに注目してください。

図 3 UI メッセージ ループとの類似性

public class WindowsFormsDedicatedThread {
  private Control _control;

  public WindowsFormsDedicatedThread() {
    using (var mre = new ManualResetEvent(false)) {
      new Thread(() => {
        _control = new Control();
        var forceHandleCreation = _control.Handle;
        mre.Set();
        Application.Run();
      }) { IsBackground = true }.Start();
      mre.WaitOne();
    }
  }

  public void QueueUserWorkItem(WaitCallback callback, object state) {
    _control.BeginInvoke(callback, state);
  }
} 

2 つ目の解決方法では、実行に ThreadPool を使用します。専用キューをサービスする新しいカスタム スレッドを各コンポーネントに準備するのではなく、各コンポーネントでキューのみを維持します。その際、同じキューの 2 つの要素が同時に処理されないようにします。この方法では、必要なスレッド数の制御、スレッドの挿入と削除の処理、信頼性に関する問題の処理を ThreadPool 自体で実行でき、さらに新規スレッドを準備するという手間も省けます。

この解決方法の実装を、図 4 に示します。FifoExecution クラスでは、処理対象の作業項目のキューと、作業項目を処理するために要求が ThreadPool に発行されたかどうかを表すブール値という、2 つのフィールドのみが管理されます。どちらのフィールドも、作業項目リストでのロックにより保護されます。その他の部分では、2 つのメソッドのみが実装されています。

図 4 FifoExecution を実装する

public class FifoExecution {
  private Queue<WorkItem> _workItems = new Queue<WorkItem>();
  private bool _delegateQueuedOrRunning = false;

  public void QueueUserWorkItem(WaitCallback callback, object state) {
    var item = new WorkItem { 
      Callback = callback, State = state, 
      Context = ExecutionContext.Capture() };
    lock (_workItems) {
      _workItems.Enqueue(item);
      if (!_delegateQueuedOrRunning) {
        _delegateQueuedOrRunning = true;
        ThreadPool.UnsafeQueueUserWorkItem(ProcessQueuedItems, null);
      }
    }
  }

  private void ProcessQueuedItems(object ignored) {
    while (true) {
      WorkItem item;
      lock (_workItems) {
        if (_workItems.Count == 0) {
          _delegateQueuedOrRunning = false;
          break;
        }
        item = _workItems.Dequeue();
      }
      try { item.Execute(); }
      catch {
        ThreadPool.UnsafeQueueUserWorkItem(ProcessQueuedItems,
          null);
        throw;
      }
    }
  }
}

1 つ目のメソッドは QueueUserWorkItem です。このメソッドのシグネチャは、ThreadPool により公開されるシグネチャと一致します (ThreadPool により、WaitCallback のみを受け入れる便利なオーバーロードも得られ、このオーバーロードを選択して追加することができます)。このメソッドでは、まず保存する WorkItem を作成した後、ロックを取得します。WorkItem の作成中は、共有状態にアクセスしません。そのため、ロックを可能な限り小さく維持するために、ロックを取得する前に項目の取得が実行されます。ロックの適用後、作成された作業項目が作業項目キューに入れられます。

次に、キュー内の作業項目を処理する要求が ThreadPool に行われたかどうかを確認し、もし行われていなければ、その要求を行います (さらに、今後に備えてそれを記録します)。この ThreadPool に対する要求は、単純に ThreadPool のスレッドの 1 つを使用して、ProcessQueuedItems メソッドを実行するというものです。

ThreadPool スレッドにより呼び出された ProcessQueuedItems は、ループに入ります。ProcessQueuedItems はこのループ内でロックを取得し、ロックを維持しながら、処理が必要な作業項目が存在するかどうかを確認します。作業項目が存在しない場合、要求フラグをリセットし (以後にキューに入れられた項目が再びプールからの処理を要求するように設定します)、終了します。処理する作業項目が存在する場合、次の作業項目を取得し、ロックを解除して、処理を実行し、すべての動作をもう一度最初から開始します。キュー内に未処理の項目がなくなるまで、ループ処理を続けます。

これは単純ですが、強力な実装です。コンポーネントでは、FifoExecution インスタンスを作成し、それを使用して作業項目をスケジュールできるようになります。各 FifoExecution インスタンスで、キューに入れられた作業項目は一度に 1 つのみ、キューに入れられた順序に従って実行できます。さらに、別々の FifoExecution インスタンスの作業項目は同時に実行できます。最も大きな利点は、スレッド管理の手間がなくなり、面倒な (しかし非常に重要な) スレッド管理という作業を ThreadPool に任せられることです。

すべてのコンポーネントのプールが常に作業項目でいっぱいになるという、極端な状況の場合、元の DedicatedThread の実装と同じように、1 つのコンポーネントに 1 つのスレッドという状態に近づいていきます。しかし、このような状態は、ThreadPool によってそれが適切だと判断された場合にのみ発生します。コンポーネントのプールの飽和状態が収まると、要求されるスレッドは大幅に減少します。

また、例外に応じて ThreadPool に適切な処理を実行させるなどの利点もあります。DedicatedThread 実装の場合、項目が例外をスローした場合にはどうなるでしょうか。スレッドは停止しますが、アプリケーションの構成によっては、プロセスが停止しない場合があります。その場合、DedicatedThread のキューに作業項目の入力が開始されますが、その作業項目が実行されることはありません。FifoExecution の場合、ThreadPool は単純にスレッドを追加して、停止したスレッドの影響を埋め合わせます。

図 5 に、FifoExecution クラスを利用した単純なデモ アプリケーションを示します。このアプリケーションでは、パイプラインに 3 つのステージがあります。各ステージでは、現在処理中のデータの ID が書き出されます (単純なループ繰り返しです)。次に、処理を実行し (ここでは Thread.SpinWait で表されています)、データを次のステージに渡します (これも単純なループ繰り返しです)。結果を別々に出力して見やすくするために、ステップごとに異なる番号のタブを使って情報を出力します。図 6 に示す出力内容でわかるように、各ステージ (列) で作業の順序が正しく維持されています。

図 5 FifoExecution のデモンストレーション

static void Main(string[] args) {
  var stage1 = new FifoExecution();
  var stage2 = new FifoExecution();
  var stage3 = new FifoExecution();

  for (int i = 0; i < 100; i++) {
    stage1.QueueUserWorkItem(one => {
      Console.WriteLine("" + one);
      Thread.SpinWait(100000000);

      stage2.QueueUserWorkItem(two => {
        Console.WriteLine("\t\t" + two);
        Thread.SpinWait(100000000);

        stage3.QueueUserWorkItem(three => {
          Console.WriteLine("\t\t\t\t" + three);
          Thread.SpinWait(100000000);
        }, two);
      }, one);
    }, i);
  }

   Console.ReadLine();
}

fig06.gif

図 6 デモ アプリケーションの出力

パイプラインのステージ間に公平性がないことも、興味深い点です。たとえば、図 6 では stage1 が 21 回目の反復に達していますが、stage2 はまだ 13 回目であり、stage3 は 9 回目です。このような結果になる大きな原因は、ProcessQueuedItems の実装方法にあります。このサンプル アプリケーションでは stage1 に 100 個の作業項目を非常に速くプッシュするため、stage1 を処理するプールからのスレッドが ProcessQueuedItems ループを独占し、stage1 の作業がなくなるまでループから戻らないと考えられます。そのため、他のステージと比較して不公平な偏りが生じます。アプリケーションでこのような挙動が見られる場合、問題となります。ProcessQueuedItems の実装を次のように変更することで、ステージ間の公平性を高めることができます。

private void ProcessQueuedItems(object ignored) {
  WorkItem item;
  lock (_workItems) {
    if (_workItems.Count == 0) {
      _delegateQueuedOrRunning = false;
      return;
    }
    item = _workItems.Dequeue();
  }
  try { item.Execute(); }
  finally {
    ThreadPool.UnsafeQueueUserWorkItem(ProcessQueuedItems,
      null);
  }
}

このコードの場合、ProcessQueuedItems は処理する項目が存在してもループせず、自分自身を再帰的に ThreadPool のキューに入力します。これにより、ProcessQueuedItems 自身の優先順位が他のステージの項目よりも下になります。この変更により、図 5 に示すアプリケーションからの出力は、図 7 のようになります。この新しい出力結果では、stage2 と stage3 の扱いが公平性を増していることがわかります (まだステージ間に若干の差がありますが、これはパイプラインですから、この程度の差は当然生じます)。

fig07.gif

図 7 より公平なスケジュールを使用した新しい出力

言うまでもなく、公平性を増すとその代償が生じます。それぞれの作業項目がスケジューラを経由することになるため、コストが増加します。このことが、アプリケーションで許容されるトレードオフかどうかを判断する必要があります。たとえば、作業項目で実行する作業が完全に実体的である場合、このオーバーヘッドは無視でき、意識されることもないでしょう。

これは、機能を追加する ThreadPool を基にすることで、自分でカスタム スレッド プールを作成することなくシステムを構築できることを示す 1 つの例に過ぎません。その他の例については、MSDN マガジンの「.NET の問題」の以前のコラムを参照してください。

ご意見やご質問は、netqa@microsoft.com まで英語でお送りください。

Stephen Toub は、Microsoft の並列コンピューティング プラットフォーム チームのシニア プログラム マネージャです。また、MSDN Magazine の寄稿編集者でもあります。