共用方式為


HOW TO:同步處理產生者和消費者執行緒 (C# 程式設計手冊)

更新:2007 年 11 月

下列程式碼範例示範了使用 lock 關鍵字以及 AutoResetEventManualResetEvent 類別,進行主執行緒和兩個背景工作執行緒 (Worker Thread) 之間的執行緒同步處理。如需詳細資訊,請參閱 lock 陳述式 (C# 參考)

這個範例會建立兩個輔助或背景工作執行緒。其中一個執行緒會產生項目並將其存放在不是安全執行緒的泛型佇列中。如需詳細資訊,請參閱 Queue<T>。另一個執行緒會使用這個佇列中的項目。此外,主執行緒也會定期顯示佇列的內容,因此會有三個執行緒存取佇列。lock 關建字是用來同步處理對佇列的存取,以確保佇列的狀態不會損毀。

使用 lock 關鍵字除了單純避免同時進行存取以外,兩種事件物件也提供額外的同步處理功能。其中一種物件是用來發出結束背景工作執行緒的信號,產生者執行緒則使用另一種物件在新物件加入佇列時,發出信號通知消費者執行緒。這兩種事件物件會封裝在名為 SyncEvents 的類別中。這樣可以輕鬆地將事件傳遞至代表消費者和產生者執行緒的物件。SyncEvents 類別的定義如下:

public class SyncEvents
{
    public SyncEvents()
    {

        _newItemEvent = new AutoResetEvent(false);
        _exitThreadEvent = new ManualResetEvent(false);
        _eventArray = new WaitHandle[2];
        _eventArray[0] = _newItemEvent;
        _eventArray[1] = _exitThreadEvent;
    }

    public EventWaitHandle ExitThreadEvent
    {
        get { return _exitThreadEvent; }
    }
    public EventWaitHandle NewItemEvent
    {
        get { return _newItemEvent; }
    }
    public WaitHandle[] EventArray
    {
        get { return _eventArray; }
    }

    private EventWaitHandle _newItemEvent;
    private EventWaitHandle _exitThreadEvent;
    private WaitHandle[] _eventArray;
}

AutoResetEvent 類別是用在 "new item" 事件上,因為想要在每次消費者執行緒回應這個事件時就自動重設這項事件。此外,ManualResetEvent 類別也會用在 "exit" 事件上,因為想要讓多個執行緒在這個事件收到信號時做出回應。如果使用 AutoResetEvent 取代的話,在只有一個執行緒回應事件後,事件就會還原成未收到信號的狀態。其他的執行緒並不會回應,所以在上述情形中執行緒會無法結束。

SyncEvents 類別會建立兩個事件並使用兩種不同格式存放:當做 EventWaitHandle (是 AutoResetEventManualResetEvent 的基底類別),以及根據 WaitHandle 存放在陣列中。您會在消費者執行緒的討論中發現,需要使用這個陣列才能讓消費者執行緒回應每個事件。

消費者和產生者執行緒是由名為 Consumer 和 Producer 的類別代表。這都會定義名為 ThreadRun 的方法。這些方法是用來當做 Main 方法建立之背景工作執行緒的進入點。

Producer 類別定義的 ThreadRun 方法類似於:

// Producer.ThreadRun
public void ThreadRun()
{
    int count = 0;
    Random r = new Random();
    while (!_syncEvents.ExitThreadEvent.WaitOne(0, false))
    {
        lock (((ICollection)_queue).SyncRoot)
        {
            while (_queue.Count < 20)
            {
                _queue.Enqueue(r.Next(0,100));
                _syncEvents.NewItemEvent.Set();
                count++;
            }
        }
    }
    Console.WriteLine("Producer thread: produced {0} items", count);
}

這個方法會一直迴圈直到 "exit thread" 事件變成收到信號的狀態為止。這個事件的狀態會藉由 SyncEvents 類別定義的 ExitThreadEvent 屬性,搭配 WaitOne 方法進行測試。在上述情形中,因為搭配 WaitOne 使用的第一個引數為零,表示方法應該會立即傳回,所以會在不封鎖目前執行緒的情況下檢查事件狀態。如果 WaitOne 傳回 true,則討論中的事件目前就會是收到信號的狀態。如果是的話,ThreadRun 方法就會回傳導致結束執行此方法的背景工作執行緒。

在 "exit thread" 事件收到信號之前,Producer.ThreadStart 方法會一直嘗試在佇列中保持 20 個項目。項目只是介於 0 到 100 的整數。在加入新項目之前必須先鎖定集合,以避免消費者和主執行緒同時存取集合。這可以使用 lock 關鍵字來完成。傳遞至 lock 的引數是透過 ICollection 介面公開 (Expose) 的 SyncRoot 欄位。這個欄位是特別為同步處理執行緒存取而提供。對集合的專有存取權,會授與在 lock 之後的程式碼區塊中所包含的任何指令。針對產生者加入佇列的每個新項目,都會呼叫 "new item" 事件上的 Set 方法。這會對消費者執行緒發出脫離暫停狀態的信號以便處理新項目。

Consumer 物件也會定義名為 ThreadRun 的方法。就像是 ThreadRun 的產生者版本,會由 Main 方法建立之背景工作執行緒執行這個方法。然而,ThreadStart 的消費者版本就必須回應兩個事件。Consumer.ThreadRun 方法類似於:

// Consumer.ThreadRun
public void ThreadRun()
{
    int count = 0;
    while (WaitHandle.WaitAny(_syncEvents.EventArray) != 1)
    {
        lock (((ICollection)_queue).SyncRoot)
        {
            int item = _queue.Dequeue();
        }
        count++;
    } 
    Console.WriteLine("Consumer Thread: consumed {0} items", count);
}

這個方法會使用 WaitAny 封鎖消費者執行緒,直到所提供之陣列中的任何等候處理常式變成收到信號的狀態為止。在上述情形中陣列內有兩個處理常式,其中一個會結束背景工作執行緒,另一個則表示集合中已加入新項目。WaitAny 會傳回變成收到信號之狀態的事件索引。"new item" 事件是陣列中的第一個項目,所以索引值零表示新項目。在上述情形中請檢查索引值 1,其表示用來判斷此方法是否要繼續使用項目的 "exit thread" 事件。如果 "new item" 事件收到信號,就會藉由 lock 取得集合的專有存取權然後使用新項目。因為這個範例會產生並使用數以千計的項目,所以不會顯示每個使用過的項目。而是使用 Main 定期顯示佇列的內容 (之後將會示範)。

Main 方法就會開始建立將產生和使用其內容的佇列,以及稍早查看過的 SyncEvents 執行個體:

Queue<int> queue = new Queue<int>();
SyncEvents syncEvents = new SyncEvents();

下一步 Main 會設定 Producer 和 Consumer 物件以搭配背景工作執行緒使用。然而,這個步驟不會建立或啟動實際的背景工作執行緒:

Producer producer = new Producer(queue, syncEvents);
Consumer consumer = new Consumer(queue, syncEvents);
Thread producerThread = new Thread(producer.ThreadRun);
Thread consumerThread = new Thread(consumer.ThreadRun);

請注意佇列和同步處理事件物件會同時傳遞給 Consumer 和 Producer 執行緒當做建構函式引數。這會提供所有物件在執行相對工作時所需的共用資源。然後就會建立兩個新的 Thread 物件,並使用 ThreadRun 方法當做每個物件的引數。每個背景工作執行緒在啟動時,將使用這個引數當做執行緒的進入點。

下一步 Main 會搭配呼叫 Start 方法啟動兩個背景工作執行緒,如下所示:

producerThread.Start();
consumerThread.Start();

此時已建立兩個新的背景工作執行緒並開始同步處理執行,不受目前執行 Main 方法的主執行緒影響。事實上,Main 執行的下一步是呼叫 Sleep 方法暫停主執行緒。方法會將目前執行的執行緒暫停指定之毫秒數。當經過此間隔後就會重新啟動 Main,此時就會顯示佇列的內容。Main 會反覆執行這個步驟四次,如下所示:

for (int i=0; i<4; i++)
{
    Thread.Sleep(2500);
    ShowQueueContents(queue);
}

最後 Main 會藉由叫用 "exit thread" 事件的 Set 方法發出信號以結束背景工作執行緒,然後呼叫每個背景工作執行緒的 Join 方法以封鎖主執行緒,直到每個背景工作執行緒回應事件然後結束為止。

有個執行緒同步處理的最後範例:ShowQueueContents 方法。這個方法就像消費者和產生者執行緒,會使用 lock 取得佇列的專有存取權。然而在上述情形中專有存取權特別的重要,因為 ShowQueueContents 會列舉所有集合。列舉集合是特別容易經由非同步作業而發生資料損毀的作業,因為其牽涉到跨越整個集合的內容。

請注意 ShowQueueContents,因為是由 Main 呼叫,所以是由主執行緒執行。這表示當這個方法達到項目佇列的專有存取權時,會同時封鎖產生者和消費者執行緒對佇列的存取權。ShowQueueContents 會鎖定佇列並列舉內容:

private static void ShowQueueContents(Queue<int> q)
{
    lock (((ICollection)q).SyncRoot)
    {
        foreach (int item in q)
        {
            Console.Write("{0} ", item);
        }
    }
    Console.WriteLine();
}

完整的程式碼範例如下所示。

範例

using System;
using System.Threading;
using System.Collections;
using System.Collections.Generic;

public class SyncEvents
{
    public SyncEvents()
    {

        _newItemEvent = new AutoResetEvent(false);
        _exitThreadEvent = new ManualResetEvent(false);
        _eventArray = new WaitHandle[2];
        _eventArray[0] = _newItemEvent;
        _eventArray[1] = _exitThreadEvent;
    }

    public EventWaitHandle ExitThreadEvent
    {
        get { return _exitThreadEvent; }
    }
    public EventWaitHandle NewItemEvent
    {
        get { return _newItemEvent; }
    }
    public WaitHandle[] EventArray
    {
        get { return _eventArray; }
    }

    private EventWaitHandle _newItemEvent;
    private EventWaitHandle _exitThreadEvent;
    private WaitHandle[] _eventArray;
}
public class Producer 
{
    public Producer(Queue<int> q, SyncEvents e)
    {
        _queue = q;
        _syncEvents = e;
    }
    // Producer.ThreadRun
    public void ThreadRun()
    {
        int count = 0;
        Random r = new Random();
        while (!_syncEvents.ExitThreadEvent.WaitOne(0, false))
        {
            lock (((ICollection)_queue).SyncRoot)
            {
                while (_queue.Count < 20)
                {
                    _queue.Enqueue(r.Next(0,100));
                    _syncEvents.NewItemEvent.Set();
                    count++;
                }
            }
        }
        Console.WriteLine("Producer thread: produced {0} items", count);
    }
    private Queue<int> _queue;
    private SyncEvents _syncEvents;
}

public class Consumer
{
    public Consumer(Queue<int> q, SyncEvents e)
    {
        _queue = q;
        _syncEvents = e;
    }
    // Consumer.ThreadRun
    public void ThreadRun()
    {
        int count = 0;
        while (WaitHandle.WaitAny(_syncEvents.EventArray) != 1)
        {
            lock (((ICollection)_queue).SyncRoot)
            {
                int item = _queue.Dequeue();
            }
            count++;
        } 
        Console.WriteLine("Consumer Thread: consumed {0} items", count);
    }
    private Queue<int> _queue;
    private SyncEvents _syncEvents;
}

public class ThreadSyncSample
{
    private static void ShowQueueContents(Queue<int> q)
    {
        lock (((ICollection)q).SyncRoot)
        {
            foreach (int item in q)
            {
                Console.Write("{0} ", item);
            }
        }
        Console.WriteLine();
    }

    static void Main()
    {
        Queue<int> queue = new Queue<int>();
        SyncEvents syncEvents = new SyncEvents();

        Console.WriteLine("Configuring worker threads...");
        Producer producer = new Producer(queue, syncEvents);
        Consumer consumer = new Consumer(queue, syncEvents);
        Thread producerThread = new Thread(producer.ThreadRun);
        Thread consumerThread = new Thread(consumer.ThreadRun);

        Console.WriteLine("Launching producer and consumer threads...");        
        producerThread.Start();
        consumerThread.Start();

        for (int i=0; i<4; i++)
        {
            Thread.Sleep(2500);
            ShowQueueContents(queue);
        }

        Console.WriteLine("Signaling threads to terminate...");
        syncEvents.ExitThreadEvent.Set();

        producerThread.Join();
        consumerThread.Join();
    }

}
Configuring worker threads...
Launching producer and consumer threads...
22 92 64 70 13 59 9 2 43 52 91 98 50 96 46 22 40 94 24 87
79 54 5 39 21 29 77 77 1 68 69 81 4 75 43 70 87 72 59
0 69 98 54 92 16 84 61 30 45 50 17 86 16 59 20 73 43 21
38 46 84 59 11 87 77 5 53 65 7 16 66 26 79 74 26 37 56 92
Signalling threads to terminate...
Consumer Thread: consumed 1053771 items
Producer thread: produced 1053791 items

請參閱

工作

監視同步處理技術範例

等候同步處理技術範例

概念

C# 程式設計手冊

參考

執行緒同步處理 (C# 程式設計手冊)

Thread

lock 陳述式 (C# 參考)

AutoResetEvent

ManualResetEvent

Set

Join

WaitOne

WaitAll

Queue

ICollection

Start

Sleep

WaitHandle

EventWaitHandle