Поделиться через


Практическое руководство. Синхронизация потока-производителя и потока-потребителя (Руководство по программированию на C#)

Обновлен: Ноябрь 2007

В следующем примере демонстрируется синхронизация потоков между основным потоком и двумя рабочими потоками при помощи ключевого слова lock и классов AutoResetEvent и ManualResetEvent. Дополнительные сведения см. в разделе Оператор lock (Справочник по C#).

В этом примере создаются два дополнительных (то есть рабочих) потока. Один поток производит элементы и сохраняет их в универсальной очереди, не являющейся потокобезопасной. Дополнительные сведения см. в разделе Queue<T>. Другой поток потребляет элементы из этой очереди. Кроме того, главный поток периодически отображает содержимое очереди, то есть к очереди получают доступ три потока. Ключевое слово lock используется для синхронизации доступа к потоку, чтобы избежать повреждения состояния очереди.

Помимо запрета одновременного доступа с помощью ключевого слова lock, два объекта событий обеспечивают дополнительную синхронизацию. Один из них используется для передачи рабочим потокам команды завершения работы, другой используется потоком-производителем для того, чтобы сообщать потоку-потребителю о добавлении в очередь нового элемента. Эти два объекта событий инкапсулированы в класс SyncEvents. Это позволяет событиям легко передавать объекты, представляющие поток-потребитель и поток-производитель. Класс SyncEvents определяется следующим образом.

public class SyncEvents
{
    public SyncEvents()
    {

        _newItemEvent = new AutoResetEvent(false);
        _exitThreadEvent = new ManualResetEvent(false);
        _eventArray = new WaitHandle[2];
        _eventArray[0] = _newItemEvent;
        _eventArray[1] = _exitThreadEvent;
    }

    public EventWaitHandle ExitThreadEvent
    {
        get { return _exitThreadEvent; }
    }
    public EventWaitHandle NewItemEvent
    {
        get { return _newItemEvent; }
    }
    public WaitHandle[] EventArray
    {
        get { return _eventArray; }
    }

    private EventWaitHandle _newItemEvent;
    private EventWaitHandle _exitThreadEvent;
    private WaitHandle[] _eventArray;
}

Класс AutoResetEvent используется для события "new item", поскольку нужно автоматические выполнять сброс этого события каждый раз, когда поток-потребитель отвечает на это событие. В другом случае, класс ManualResetEvent используется для события "exit", поскольку при создании этого события нужен ответ нескольких потоков. Если вместо этого использовался класс AutoResetEvent, это событие будет возвращаться в выключенное состояние сразу после ответа одного потока. Другие потоки не ответят, и, в данном случае, не смогут завершить работу.

Класс SyncEvents создает два события и хранит их в двух разных формах: как EventWaitHandle, (база для AutoResetEvent и ManualResetEvent) и как массив на базе WaitHandle. Как мы увидим в обсуждении потока-потребителя, этот массив необходим для того, чтобы поток-потребитель мог ответить на оба события.

Поток-потребитель и поток-производитель представлены классами Consumer и Producer. Оба класса определяют метод ThreadRun. Эти методы используются в качестве точек входа для рабочих потоков, создаваемых методом Main.

Метод ThreadRun, определенный классом Producer, выглядит следующим образом:

// Producer.ThreadRun
public void ThreadRun()
{
    int count = 0;
    Random r = new Random();
    while (!_syncEvents.ExitThreadEvent.WaitOne(0, false))
    {
        lock (((ICollection)_queue).SyncRoot)
        {
            while (_queue.Count < 20)
            {
                _queue.Enqueue(r.Next(0,100));
                _syncEvents.NewItemEvent.Set();
                count++;
            }
        }
    }
    Console.WriteLine("Producer thread: produced {0} items", count);
}

Этот метод зацикливается до возникновения события "exit thread". Состояние этого события проверяется с помощью метода WaitOne путем применения свойства ExitThreadEvent, определенного классом SyncEvents. В этом случае состояние события проверяется без блокировки текущего потока, поскольку первый аргумент, используемый с WaitOne, равен нулю, что означает немедленный возврат метода. Если WaitOne возвращает значение true, то событие возникло в настоящий момент. В этом случае метод ThreadRun возвращается, то есть рабочий процесс, в котором выполняется этот метод, завершается.

До возникновения события "exit thread" метод Producer.ThreadStart пытается сохранить 20 элементов в очереди. Элемент — это целое число от 0 до 100. Коллекцию нужно заблокировать перед добавлением новых элементов, чтобы запретить одновременный доступ к коллекции потока-потребителя и главного потока. Для этого служит ключевое слово lock. Аргумент, передаваемый для lock, является полем SyncRoot, открытым посредством интерфейса ICollection. Это поле предоставлено специально для синхронизации доступа потоков. Монопольный доступ к коллекции предоставляется для всех инструкций, содержащихся в блоке кода после ключевого слова lock. Для каждого нового элемента, добавляемого производителем в очередь, осуществляется вызов метода Set для события "new item". Таким образом поток-потребитель получает команду выхода из приостановленного состояния для обработки нового элемента.

Объект Consumer также определяет метод ThreadRun. Как и версия ThreadRun в потоке-производителе, этот метод выполняется в рабочем потоке, созданном методом Main. Однако в потоке-потребителе ThreadStart должен отвечать на два события. Метод Consumer.ThreadRun выглядит следующим образом:

// Consumer.ThreadRun
public void ThreadRun()
{
    int count = 0;
    while (WaitHandle.WaitAny(_syncEvents.EventArray) != 1)
    {
        lock (((ICollection)_queue).SyncRoot)
        {
            int item = _queue.Dequeue();
        }
        count++;
    } 
    Console.WriteLine("Consumer Thread: consumed {0} items", count);
}

Данный метод использует WaitAny для блокирования потока-потребителя до тех пор, пока не будет включен любой из дескрипторов ожидания в предоставленном массиве. В этом случае в массиве есть два дескриптора: один завершает рабочие потоки, другой указывает, что в коллекцию добавлен новый элемент. WaitAny возвращает индекс происшедшего события. Событие "new item" является первым в массиве, поэтому нулевой индекс означает новый элемент. В этом случае следует проверить наличие индекса 1, который обозначает событие "exit thread" (это позволяет определить, продолжает ли этот метод потреблять элементы). Если произошло событие "new item", включается монопольный доступ к коллекции с помощью ключевого слова lock и новый элемент потребляется. В этом примере производятся и потребляются тысячи элементов, поэтому каждый потребляемы элемент не отображается. Вместо этого используется метод Main для периодического отображения содержимого очереди, как будет показано дальше.

Метод Main сначала создает очередь, содержимое которой будет производиться и потребляться, и экземпляр SyncEvents, который был рассмотрен ранее:

Queue<int> queue = new Queue<int>();
SyncEvents syncEvents = new SyncEvents();

Затем Main настраивает объекты Producer и Consumer для использования с рабочими потоками. Однако на этом этапе еще не создаются и не запускаются рабочие потоки:

Producer producer = new Producer(queue, syncEvents);
Consumer consumer = new Consumer(queue, syncEvents);
Thread producerThread = new Thread(producer.ThreadRun);
Thread consumerThread = new Thread(consumer.ThreadRun);

Обратите внимание, что очередь и объект события синхронизации передаются потокам Consumer и Producer как аргументы конструктора. При этом оба объекта получают необходимые общие ресурсы для выполнения своих задач. Затем создаются два новых объекта Thread с помощью метода ThreadRun в качестве аргумента для каждого объекта. Каждый рабочий поток при запуска использует этот аргумент в качестве входной точки для потока.

Затем Main запускает два рабочих потока с вызовом метода Start:

producerThread.Start();
consumerThread.Start();

На этом этапе создаются два новых рабочих потока и начинается их асинхронное выполнение независимо от основного потока, который в данное время выполняет метод Main. Затем Main приостанавливает основной поток с вызовом метода Sleep. Метод приостанавливает текущий выполняемый поток на заданное количество миллисекунд. После истечения интервала метод Main заново активируется и отображает содержимое очереди. Метод Main повторяет эти действия для четырех итераций:

for (int i=0; i<4; i++)
{
    Thread.Sleep(2500);
    ShowQueueContents(queue);
}

И, наконец, Main передает рабочим потокам команду на завершения путем вызова метода Set события "exit thread", а затем вызывает для каждого рабочего потока метод Join для блокирования основного потока до тех пор, пока каждый рабочий поток не ответит на это событие и не завершит работу.

Вот еще один пример синхронизации потоков: метод ShowQueueContents. Этот метод, как потоки-производители и потоки-потребители, использует lock для получения монопольного доступ к очереди. В этом случае монопольный доступ весьма важен, поскольку ShowQueueContents перечисляется всю коллекции. Операция перечисления коллекции подвержена повреждению данных асинхронными операциями, поскольку при этой операции осуществляется обход содержимого всей коллекции.

Обратите внимание, что метод ShowQueueContents выполняется главным потоком, поскольку его вызывает метод Main. Это означает, что данный метод при получении монопольного доступа к очереди элементов блокирует доступ потока-производителя и потока-потребителя к очереди. Метод ShowQueueContents блокирует очередь и перечисляет содержимое.

private static void ShowQueueContents(Queue<int> q)
{
    lock (((ICollection)q).SyncRoot)
    {
        foreach (int item in q)
        {
            Console.Write("{0} ", item);
        }
    }
    Console.WriteLine();
}

Полный пример кода выглядит следующим образом.

Пример

using System;
using System.Threading;
using System.Collections;
using System.Collections.Generic;

public class SyncEvents
{
    public SyncEvents()
    {

        _newItemEvent = new AutoResetEvent(false);
        _exitThreadEvent = new ManualResetEvent(false);
        _eventArray = new WaitHandle[2];
        _eventArray[0] = _newItemEvent;
        _eventArray[1] = _exitThreadEvent;
    }

    public EventWaitHandle ExitThreadEvent
    {
        get { return _exitThreadEvent; }
    }
    public EventWaitHandle NewItemEvent
    {
        get { return _newItemEvent; }
    }
    public WaitHandle[] EventArray
    {
        get { return _eventArray; }
    }

    private EventWaitHandle _newItemEvent;
    private EventWaitHandle _exitThreadEvent;
    private WaitHandle[] _eventArray;
}
public class Producer 
{
    public Producer(Queue<int> q, SyncEvents e)
    {
        _queue = q;
        _syncEvents = e;
    }
    // Producer.ThreadRun
    public void ThreadRun()
    {
        int count = 0;
        Random r = new Random();
        while (!_syncEvents.ExitThreadEvent.WaitOne(0, false))
        {
            lock (((ICollection)_queue).SyncRoot)
            {
                while (_queue.Count < 20)
                {
                    _queue.Enqueue(r.Next(0,100));
                    _syncEvents.NewItemEvent.Set();
                    count++;
                }
            }
        }
        Console.WriteLine("Producer thread: produced {0} items", count);
    }
    private Queue<int> _queue;
    private SyncEvents _syncEvents;
}

public class Consumer
{
    public Consumer(Queue<int> q, SyncEvents e)
    {
        _queue = q;
        _syncEvents = e;
    }
    // Consumer.ThreadRun
    public void ThreadRun()
    {
        int count = 0;
        while (WaitHandle.WaitAny(_syncEvents.EventArray) != 1)
        {
            lock (((ICollection)_queue).SyncRoot)
            {
                int item = _queue.Dequeue();
            }
            count++;
        } 
        Console.WriteLine("Consumer Thread: consumed {0} items", count);
    }
    private Queue<int> _queue;
    private SyncEvents _syncEvents;
}

public class ThreadSyncSample
{
    private static void ShowQueueContents(Queue<int> q)
    {
        lock (((ICollection)q).SyncRoot)
        {
            foreach (int item in q)
            {
                Console.Write("{0} ", item);
            }
        }
        Console.WriteLine();
    }

    static void Main()
    {
        Queue<int> queue = new Queue<int>();
        SyncEvents syncEvents = new SyncEvents();

        Console.WriteLine("Configuring worker threads...");
        Producer producer = new Producer(queue, syncEvents);
        Consumer consumer = new Consumer(queue, syncEvents);
        Thread producerThread = new Thread(producer.ThreadRun);
        Thread consumerThread = new Thread(consumer.ThreadRun);

        Console.WriteLine("Launching producer and consumer threads...");        
        producerThread.Start();
        consumerThread.Start();

        for (int i=0; i<4; i++)
        {
            Thread.Sleep(2500);
            ShowQueueContents(queue);
        }

        Console.WriteLine("Signaling threads to terminate...");
        syncEvents.ExitThreadEvent.Set();

        producerThread.Join();
        consumerThread.Join();
    }

}
Configuring worker threads...
Launching producer and consumer threads...
22 92 64 70 13 59 9 2 43 52 91 98 50 96 46 22 40 94 24 87
79 54 5 39 21 29 77 77 1 68 69 81 4 75 43 70 87 72 59
0 69 98 54 92 16 84 61 30 45 50 17 86 16 59 20 73 43 21
38 46 84 59 11 87 77 5 53 65 7 16 66 26 79 74 26 37 56 92
Signalling threads to terminate...
Consumer Thread: consumed 1053771 items
Producer thread: produced 1053791 items

См. также

Задачи

Пример Monitor Synchronization Technology

Пример Wait Synchronization Technology

Основные понятия

Руководство по программированию в C#

Ссылки

Синхронизация потоков (Руководство по программированию на C#)

Thread

Оператор lock (Справочник по C#)

AutoResetEvent

ManualResetEvent

Set

Join

WaitOne

WaitAll

Queue

ICollection

Start

Sleep

WaitHandle

EventWaitHandle