方法 : producer スレッドと consumer スレッドを同期する (C# プログラミング ガイド)
更新 : 2007 年 11 月
次の例は lock キーワードと AutoResetEvent クラスおよび ManualResetEvent クラスを使用して、プライマリ スレッドと 2 つのワーカー スレッドの間でスレッドの同期を行う方法を示しています。詳細については、「lock ステートメント (C# リファレンス)」を参照してください。
この例では、補助的なスレッドであるワーカー スレッドを 2 つ作成します。一方のスレッドでは要素を生成し、スレッド セーフではないジェネリック キューに格納します。詳細については、「Queue<T>」を参照してください。もう一方のスレッドでは、このキューの項目を使用します。また、プライマリ スレッドでは定期的にキューの内容を表示して、3 つのスレッドがキューにアクセスできるようにします。さらに、lock キーワードを使用してキューへのアクセスを同期し、キューの状態が破損しないようにします。
lock キーワードを使用して同時アクセスを回避するだけでなく、2 つのイベント オブジェクトによってさらに確実な同期を実現します。一方のイベント オブジェクトはワーカー スレッドに対して終了を通知するために使用され、もう一方は、新しい項目がキューに追加された際に producer スレッドから consumer スレッドに通知するために使用されます。この 2 つのイベント オブジェクトは SyncEvents というクラスにカプセル化されます。カプセル化によって、consumer スレッドを表すオブジェクトおよび producer スレッドを表すオブジェクトにこれらのイベントを容易に渡すことができます。SyncEvents クラスは次のように定義されています。
public class SyncEvents
{
public SyncEvents()
{
_newItemEvent = new AutoResetEvent(false);
_exitThreadEvent = new ManualResetEvent(false);
_eventArray = new WaitHandle[2];
_eventArray[0] = _newItemEvent;
_eventArray[1] = _exitThreadEvent;
}
public EventWaitHandle ExitThreadEvent
{
get { return _exitThreadEvent; }
}
public EventWaitHandle NewItemEvent
{
get { return _newItemEvent; }
}
public WaitHandle[] EventArray
{
get { return _eventArray; }
}
private EventWaitHandle _newItemEvent;
private EventWaitHandle _exitThreadEvent;
private WaitHandle[] _eventArray;
}
AutoResetEvent クラスは "新しい項目" イベントに対して使用されます。consumer スレッドがこのイベントに応答するたびに、このイベントを自動的にリセットする必要があるためです。また、ManualResetEvent クラスは "終了" イベントに対して使用されます。このイベントがシグナル状態になったときに複数のスレッドが応答するようにする必要があるためです。代わりに AutoResetEvent を使用した場合、1 つのスレッドだけがこのイベントに応答した後、このイベントは非シグナル状態に戻ります。他のスレッドは応答せず、この場合は終了しません。
SyncEvents クラスは 2 つのイベントを作成し、2 つの異なった形式で格納します。1 つは、AutoResetEvent および ManualResetEvent 双方の基本クラスである EventWaitHandle として格納し、もう 1 つは WaitHandle に基づく配列内に格納します。後ほど consumer スレッドの説明で示すように、consumer スレッドがいずれのイベントにも応答できるようにするには、この配列が必要です。
consumer スレッドと producer スレッドは、それぞれ Consumer クラスと Producer クラスによって表されます。これらのクラスはいずれも ThreadRun というメソッドを定義します。これらのメソッドは、Main メソッドが作成するワーカー スレッドのエントリ ポイントとして使用されます。
Producer クラスが定義する ThreadRun メソッドは次のようになります。
// Producer.ThreadRun
public void ThreadRun()
{
int count = 0;
Random r = new Random();
while (!_syncEvents.ExitThreadEvent.WaitOne(0, false))
{
lock (((ICollection)_queue).SyncRoot)
{
while (_queue.Count < 20)
{
_queue.Enqueue(r.Next(0,100));
_syncEvents.NewItemEvent.Set();
count++;
}
}
}
Console.WriteLine("Producer thread: produced {0} items", count);
}
このメソッドは、"スレッドの終了" イベントがシグナル状態になるまでループします。このイベントの状態は、SyncEvents クラスによって定義される ExitThreadEvent プロパティを使用して、WaitOne メソッドによりテストされます。この場合、WaitOne によって使用される最初の引数がゼロ (メソッドからすぐに制御が戻ることを示す) であるため、現在のスレッドをブロックせずにイベントの状態がチェックされます。WaitOne が true を返す場合、当該のイベントは現在シグナル状態になっています。この場合、ThreadRun メソッドから制御が戻るため、ワーカー スレッドはこのメソッドの実行を終了します。
"スレッドの終了" イベントがシグナル状態になるまで、Producer.ThreadStart メソッドはキュー内に 20 の項目を維持しようとします。項目は 0 から 100 までの整数のいずれかです。consumer スレッドとプライマリ スレッドが同時にコレクションにアクセスしないようにするには、新しい項目を追加する前にコレクションをロックする必要があります。これは、lock キーワードを使用して行います。lock に渡される引数は、ICollection インターフェイスを介して公開される SyncRoot フィールドです。このフィールドは、スレッド アクセスを同期するために提供されている専用のフィールドです。lock に続くコード ブロックに含まれるすべての命令に対して、コレクションへの排他アクセスが付与されます。producer がキューに新しい項目を追加するたびに、"新しい項目" イベントの Set メソッドが呼び出されます。これは consumer スレッドに対し、中断状態から抜け出して新しい項目を処理するように通知します。
Consumer オブジェクトも ThreadRun というメソッドを定義します。producer バージョンの ThreadRun と同様に、このメソッドは Main メソッドが作成するワーカー スレッドによって実行されます。ただし、consumer バージョンの ThreadStart は 2 つのイベントに応答する必要があります。Consumer.ThreadRun メソッドは次のようになります。
// Consumer.ThreadRun
public void ThreadRun()
{
int count = 0;
while (WaitHandle.WaitAny(_syncEvents.EventArray) != 1)
{
lock (((ICollection)_queue).SyncRoot)
{
int item = _queue.Dequeue();
}
count++;
}
Console.WriteLine("Consumer Thread: consumed {0} items", count);
}
このメソッドは WaitAny を使用して、指定した配列内の待機ハンドルのいずれかがシグナル状態になるまで consumer スレッドをブロックします。この場合、配列には 2 つのハンドルがあり、1 つはワーカー スレッドを終了するためのもので、もう 1 つは新しい項目がコレクションに追加されたことを示すためのものです。WaitAny は、シグナル状態になったイベントのインデックスを返します。"新しい項目" イベントは配列の最初にあるため、ゼロというインデックスは新しい項目を示します。この場合、1 のインデックスをチェックします。このインデックスは "スレッドの終了" イベントを示し、このメソッドが項目を使用し続けているかどうかを確認するために使用されます。"新しい項目" イベントがシグナル状態になっている場合、lock を使用してコレクションへの排他アクセスを取得し、新しい項目を使用できます。この例では、何千という項目を生成したり使用したりするため、使用される各項目は表示されません。その代わりに Main を使用してキューの内容を定期的に表示できます。これについては次に説明します。
Main メソッドでは、次に示すように、まず、その内容が生成および使用されるキューと、既に説明した SyncEvents のインスタンスを作成します。
Queue<int> queue = new Queue<int>();
SyncEvents syncEvents = new SyncEvents();
次に Main は、ワーカー スレッドで使用する Producer オブジェクトと Consumer オブジェクトを構成します。ただし、このステップでは実際のワーカー スレッドを作成したり起動したりすることはありません。
Producer producer = new Producer(queue, syncEvents);
Consumer consumer = new Consumer(queue, syncEvents);
Thread producerThread = new Thread(producer.ThreadRun);
Thread consumerThread = new Thread(consumer.ThreadRun);
キューおよび同期イベント オブジェクトは、Consumer スレッドと Producer スレッドの両方にコンストラクタ引数として渡されます。これによって、両方のオブジェクトにはそれぞれのタスクを実行するうえで必要な共有リソースが提供されます。次に、各オブジェクトの ThreadRun メソッドを引数として使用して、2 つの新しい Thread オブジェクトが作成されます。各ワーカー スレッドは開始時に、スレッドのエントリ ポイントとしてこの引数を使用します。
次に、Main は次に示すように Start メソッドを呼び出して、2 つのワーカー スレッドを起動します。
producerThread.Start();
consumerThread.Start();
この時点で、2 つの新しいワーカー スレッドが作成され、現在 Main メソッドを実行しているプライマリ スレッドとは無関係に、非同期の実行が開始されます。実際、次に Main は、Sleep メソッドを呼び出してプライマリ スレッドを中断します。このメソッドは、指定した時間 (ミリ秒単位) だけ現在実行中のスレッドを中断します。指定した時間が経過すると、Main が再度アクティブになり、この時点でキューの内容を表示します。Main は次に示すように、この処理を 4 回繰り返します。
for (int i=0; i<4; i++)
{
Thread.Sleep(2500);
ShowQueueContents(queue);
}
最後に、Main は、"スレッドの終了" イベントの Set メソッドを呼び出して、ワーカー スレッドに対し終了を通知します。次に、各ワーカー スレッドで Join メソッドを呼び出し、各ワーカー スレッドがイベントに応答して終了するまでプライマリ スレッドをブロックします。
スレッドの同期の最後の例として、ShowQueueContents メソッドについて説明します。このメソッドは、consumer スレッドや producer スレッドと同様、lock を使用してキューへの排他アクセスを取得します。ただしこの場合、ShowQueueContents はすべてのコレクションを列挙するので、排他アクセスがとりわけ重要になります。コレクションの列挙ではコレクション全体の内容を走査する必要があるため、非同期操作によるデータ破損が特に生じやすくなるからです。
ShowQueueContents は Main によって呼び出されるため、このメソッドがプライマリ スレッドによって実行されることに注意してください。つまり、このメソッドが項目のキューへの排他アクセスを取得した時点で、実際に producer スレッドと consumer スレッドの両方がキューへのアクセスをブロックされることになります。ShowQueueContents は、次のようにキューをロックして内容を列挙します。
private static void ShowQueueContents(Queue<int> q)
{
lock (((ICollection)q).SyncRoot)
{
foreach (int item in q)
{
Console.Write("{0} ", item);
}
}
Console.WriteLine();
}
完全な例を次に示します。
使用例
using System;
using System.Threading;
using System.Collections;
using System.Collections.Generic;
public class SyncEvents
{
public SyncEvents()
{
_newItemEvent = new AutoResetEvent(false);
_exitThreadEvent = new ManualResetEvent(false);
_eventArray = new WaitHandle[2];
_eventArray[0] = _newItemEvent;
_eventArray[1] = _exitThreadEvent;
}
public EventWaitHandle ExitThreadEvent
{
get { return _exitThreadEvent; }
}
public EventWaitHandle NewItemEvent
{
get { return _newItemEvent; }
}
public WaitHandle[] EventArray
{
get { return _eventArray; }
}
private EventWaitHandle _newItemEvent;
private EventWaitHandle _exitThreadEvent;
private WaitHandle[] _eventArray;
}
public class Producer
{
public Producer(Queue<int> q, SyncEvents e)
{
_queue = q;
_syncEvents = e;
}
// Producer.ThreadRun
public void ThreadRun()
{
int count = 0;
Random r = new Random();
while (!_syncEvents.ExitThreadEvent.WaitOne(0, false))
{
lock (((ICollection)_queue).SyncRoot)
{
while (_queue.Count < 20)
{
_queue.Enqueue(r.Next(0,100));
_syncEvents.NewItemEvent.Set();
count++;
}
}
}
Console.WriteLine("Producer thread: produced {0} items", count);
}
private Queue<int> _queue;
private SyncEvents _syncEvents;
}
public class Consumer
{
public Consumer(Queue<int> q, SyncEvents e)
{
_queue = q;
_syncEvents = e;
}
// Consumer.ThreadRun
public void ThreadRun()
{
int count = 0;
while (WaitHandle.WaitAny(_syncEvents.EventArray) != 1)
{
lock (((ICollection)_queue).SyncRoot)
{
int item = _queue.Dequeue();
}
count++;
}
Console.WriteLine("Consumer Thread: consumed {0} items", count);
}
private Queue<int> _queue;
private SyncEvents _syncEvents;
}
public class ThreadSyncSample
{
private static void ShowQueueContents(Queue<int> q)
{
lock (((ICollection)q).SyncRoot)
{
foreach (int item in q)
{
Console.Write("{0} ", item);
}
}
Console.WriteLine();
}
static void Main()
{
Queue<int> queue = new Queue<int>();
SyncEvents syncEvents = new SyncEvents();
Console.WriteLine("Configuring worker threads...");
Producer producer = new Producer(queue, syncEvents);
Consumer consumer = new Consumer(queue, syncEvents);
Thread producerThread = new Thread(producer.ThreadRun);
Thread consumerThread = new Thread(consumer.ThreadRun);
Console.WriteLine("Launching producer and consumer threads...");
producerThread.Start();
consumerThread.Start();
for (int i=0; i<4; i++)
{
Thread.Sleep(2500);
ShowQueueContents(queue);
}
Console.WriteLine("Signaling threads to terminate...");
syncEvents.ExitThreadEvent.Set();
producerThread.Join();
consumerThread.Join();
}
}
Configuring worker threads...
Launching producer and consumer threads...
22 92 64 70 13 59 9 2 43 52 91 98 50 96 46 22 40 94 24 87
79 54 5 39 21 29 77 77 1 68 69 81 4 75 43 70 87 72 59
0 69 98 54 92 16 84 61 30 45 50 17 86 16 59 20 73 43 21
38 46 84 59 11 87 77 5 53 65 7 16 66 26 79 74 26 37 56 92
Signalling threads to terminate...
Consumer Thread: consumed 1053771 items
Producer thread: produced 1053791 items