SO WIRD'S GEMACHT: Synchronisieren eines Producer- und Consumerthreads (C#- und Visual Basic)

Dieser Dokumentation für die Vorschau nur ist und in späteren Versionen geändert. Leere Themen wurden als Platzhalter eingefügt.]

Das folgende Beispiel veranschaulicht Threadsynchronisierung zwischen dem primären Thread und zwei Arbeitsthreads mithilfe der lock (c#) oder SyncLock (Visual Basic)-Schlüsselwort, und die AutoResetEvent und ManualResetEvent Klassen. Weitere Informationen finden Sie unter Lock-Anweisung (C#-Referenz) oder SyncLock-Anweisung.

Im Beispiel werden zwei Hilfs- bzw. Arbeitsthreads erstellt. Ein Thread erstellt Elemente und speichert diese in einer generischen Warteschlange, die nicht threadsicher ist. Weitere Informationen finden Sie unter Queue<T>. Der andere Thread verwendet Elemente aus dieser Warteschlange. Darüber hinaus zeigt der primäre Thread in regelmäßigen Abständen den Inhalt der Warteschlange, sodass drei Threads auf die Warteschlange zugreifen. Die lock oder SyncLock Schlüsselwort wird zum Synchronisieren der Zugriff auf die Warteschlange um sicherzustellen, dass der Status der Warteschlange nicht beschädigt ist.

Zusätzlich zu den gleichzeitigen Zugriff, die die lock oder SyncLock Schlüsselwort verwendet einfach verhindern, wird zusätzliche Synchronisierung von zwei Ereignisobjekte bereitgestellt. Eine dient Arbeitsthreads beenden signalisieren und das andere wird vom Producer-Thread verwendet, um dem Consumerthread zu signalisieren, wenn ein neues Element zur Warteschlange hinzugefügt wurde. Diese zwei Ereignisobjekte werden in einer Klasse mit dem Namen SyncEvents gekapselt. Dadurch werden die Ereignisse an die Objekte übergeben, die den Consumer- und Producer-Threads einfach darstellen. SyncEvents-Klasse wird wie folgt definiert:

                        Public
                        Class SyncEvents
    SubNew()
        _newItemEvent = New AutoResetEvent(False)
        _exitThreadEvent = New ManualResetEvent(False)
        _eventArray(0) = _newItemEvent
        _eventArray(1) = _exitThreadEvent
    EndSubPublicReadOnlyProperty ExitThreadEvent() As EventWaitHandle
        GetReturn _exitThreadEvent
        EndGetEndPropertyPublicReadOnlyProperty NewItemEvent() As EventWaitHandle
        GetReturn _newItemEvent
        EndGetEndPropertyPublicReadOnlyProperty EventArray() As WaitHandle()
        GetReturn _eventArray
        EndGetEndPropertyPrivate _newItemEvent As EventWaitHandle
    Private _exitThreadEvent As EventWaitHandle
    Private _eventArray(0 To 1) As WaitHandle
EndClass
                        public
                        class SyncEvents
{
    public SyncEvents()
    {

        _newItemEvent = new AutoResetEvent(false);
        _exitThreadEvent = new ManualResetEvent(false);
        _eventArray = new WaitHandle[2];
        _eventArray[0] = _newItemEvent;
        _eventArray[1] = _exitThreadEvent;
    }

    public EventWaitHandle ExitThreadEvent
    {
        get { return _exitThreadEvent; }
    }
    public EventWaitHandle NewItemEvent
    {
        get { return _newItemEvent; }
    }
    public WaitHandle[] EventArray
    {
        get { return _eventArray; }
    }

    private EventWaitHandle _newItemEvent;
    private EventWaitHandle _exitThreadEvent;
    private WaitHandle[] _eventArray;
}

Die AutoResetEvent-Klasse wird für die "Neues Element" verwendetEreignis da dieses Ereignis automatisch jedes Mal zurückgesetzt, der der Consumerthread auf dieses Ereignis reagiert werden soll. Alternativ wird die Klasse ManualResetEvent für "Beenden" verwendetEreignis da Sie mehrere Threads zu reagieren, wenn dieses Ereignis signalisiert wird. Wenn AutoResetEvent stattdessen verwendet wird, würde das Ereignis nach nur ein Thread auf das Ereignis reagiert auf einen nicht signalisierten Zustand zurückgesetzt. Der andere Thread würde nicht reagieren, und in diesem Fall würde nicht beenden.

SyncEvents-Klasse erstellt zwei Ereignisse und speichert diese in zwei unterschiedlichen Formularen: als EventWaitHandle, die die Basisklasse für AutoResetEvent und ManualResetEventund in einem Array basierend auf WaitHandle ist. Wie Sie in der Consumer-Thread-Diskussion sehen werden, ist dieses Array notwendig, damit der Verbraucherthread entweder Ereignis reagieren kann.

Die Consumer und Producer-Threads werden durch Klassen, die mit dem Namen Consumer und Producer dargestellt. Beide dieser Definieren einer Methode namens „ ThreadRun. Diese Methoden werden als die Einstiegspunkte für die Arbeitsthreads verwendet, die die Main-Methode erstellt.

Die definierte von der Klasse ThreadRunProducer-Methode ähnelt dies:

                        ' Producer.ThreadRun
                        Public
                        Sub ThreadRun()
    Dim count AsInteger = 0
    Dim r AsNew Random()

    DoWhileNot _syncEvents.ExitThreadEvent.WaitOne(0, False)
        SyncLockCType(_queue, ICollection).SyncRoot
            DoWhile _queue.Count < 20
                _queue.Enqueue(r.Next(0, 100))
                _syncEvents.NewItemEvent.Set()
                count += 1
            LoopEndSyncLockLoop
    Console.WriteLine("Producer thread: produced {0} items", count)
EndSub
                        // Producer.ThreadRun
                        public
                        void ThreadRun()
{
    int count = 0;
    Random r = new Random();
    while (!_syncEvents.ExitThreadEvent.WaitOne(0, false))
    {
        lock (((ICollection)_queue).SyncRoot)
        {
            while (_queue.Count < 20)
            {
                _queue.Enqueue(r.Next(0,100));
                _syncEvents.NewItemEvent.Set();
                count++;
            }
        }
    }
    Console.WriteLine("Producer thread: produced {0} items", count);
}

Diese Methode durchläuft, bis die "Thread beenden"Ereignis wird signalisiert. Status der dieses Ereignis wird mithilfe der definiert durch die Klasse WaitOneExitThreadEvent -Eigenschaft mit der Methode SyncEvents getestet. In diesem Fall wird der Zustand des Ereignisses überprüft, ohne den aktuellen Thread blockiert, da das erste Argument wird verwendet mit WaitOne NULL ist, der angibt, dass die-Methode sofort zurückgeben soll. Wenn WaitOnetrue zurückgibt, wird das betreffende Ereignis gerade signalisiert. Wenn also die ThreadRun-Methode zurückgibt, hat die Wirkung von den Arbeitsthread Ausführen dieser Methode beendet.

Bis die "Thread beenden"Ereignis wird signalisiert, die Producer.ThreadStart-Methode versucht, 20 Elemente in der Warteschlange zu behalten. Ein Element ist nur eine ganze Zahl zwischen 0 und 100. Die Auflistung muss gesperrt werden, bevor Sie zu verhindern, dass der Consumer und der primären Threads Zugriff auf die Auflistung zur gleichen Zeit neue Elemente hinzufügen. Dies erfolgt mithilfe der lock (c#) oder SyncLock (Visual Basic)-Schlüsselwort. Das lock oder SyncLock übergebene Argument ist das SyncRoot Feld durch die ICollection-Schnittstelle verfügbar gemacht werden. Dieses Feld dient speziell zum Synchronisieren von Threads den Zugriff. Exklusiver Zugriff auf die Auflistung ist für alle Anweisungen, die im Code enthalten sind erteilt folgenden lock oder SyncLock blockieren. Für jedes neue Element, das der Producer der Warteschlange, einen Aufruf der Set-Methode für die "Neues Element" hinzugefügtEreignis erfolgt. Dies signalisiert dem Consumerthread aus den angehaltenen Zustand um das neue Element zu verarbeiten.

Das Consumer -Objekt definiert auch eine Methode namens „ ThreadRun. Wie der Producer-Version von ThreadRunwird diese Methode von einem von der Main-Methode erstellten Arbeitsthread ausgeführt. Allerdings muss die Consumer-Version von ThreadRun auf zwei Ereignisse reagieren. Consumer.ThreadRun-Methode ähnelt dies:

                        ' Consumer.ThreadRun
                        Public
                        Sub ThreadRun()
    Dim count AsInteger = 0

    DoWhile WaitHandle.WaitAny(_syncEvents.EventArray) <> 1
        SyncLockCType(_queue, ICollection).SyncRoot
            Dim item AsInteger = _queue.Dequeue()
        EndSyncLock
        count += 1
    Loop
    Console.WriteLine("Consumer Thread: consumed {0} items", count)
EndSub
                        // Consumer.ThreadRun
                        public
                        void ThreadRun()
{
    int count = 0;
    while (WaitHandle.WaitAny(_syncEvents.EventArray) != 1)
    {
        lock (((ICollection)_queue).SyncRoot)
        {
            int item = _queue.Dequeue();
        }
        count++;
    } 
    Console.WriteLine("Consumer Thread: consumed {0} items", count);
}

Diese Methode verwendet WaitAny dem Consumerthread blockiert, bis eines der Wait-Handles im bereitgestellten Array signalisiert wird. In diesem Fall sind zwei Handles im Array, eine Worker-Threads beendet und eine, die angibt, dass ein neues Element der Auflistung hinzugefügt wurde. WaitAny gibt den Index des Ereignisses, das signalisiert wurde zurück. "Neues Element"Ereignis ist der erste im Array, so dass ein Index 0 (null) ein neues Element angibt. Überprüfen Sie in diesem Fall für einen Index von 1, womit den "Thread beenden"Ereignis und dieses wird verwendet, um zu bestimmen, ob diese Methode fortgesetzt wird, um Elemente zu nutzen. Wenn die "Neues Element"Ereignis wurde signalisiert, Sie erhalten Sie exklusiven Zugriff auf die Auflistung mit lock oder SyncLock und können das neue Element nutzen. Da in diesem Beispiel wird erzeugt und Tausende von Elementen in Anspruch nimmt, werden Sie nicht jedes Element angezeigt verbraucht. Verwenden Sie Main stattdessen regelmäßig den Inhalt der Warteschlange anzuzeigen, wie gezeigt wird.

Main -Methode beginnt durch Erstellen der Warteschlange, deren Inhalt produziert und verbraucht werden, und eine Instanz von SyncEvents, die Sie zuvor sich ansehen:

                        Dim queue AsNew Queue(OfInteger)
Dim syncEvents AsNew SyncEvents()
Queue<int> queue = new Queue<int>();
SyncEvents syncEvents = new SyncEvents();

Main konfiguriert anschließend die Producer und Consumer Objekte für die Verwendung mit Arbeitsthreads. Diese Schritt nicht, jedoch erstellen oder die tatsächlichen Arbeitsthreads zu starten:

                        Dim producer AsNew Producer(queue, syncEvents)
Dim consumer AsNew Consumer(queue, syncEvents)
Dim producerThread AsNew Thread(AddressOf producer.ThreadRun)
Dim consumerThread AsNew Thread(AddressOf consumer.ThreadRun)
Producer producer = new Producer(queue, syncEvents);
Consumer consumer = new Consumer(queue, syncEvents);
Thread producerThread = new Thread(producer.ThreadRun);
Thread consumerThread = new Thread(consumer.ThreadRun);

Beachten Sie, dass die Warteschlange und das Ereignisobjekt Synchronisierung der Consumer und Producer Threads als Konstruktorargumente übergeben werden. Dadurch werden beide Objekte, die die freigegebenen Ressourcen, die Sie Ihren jeweiligen Aufgaben ausführen müssen. Zwei neue Thread Objekte werden dann erstellt, mithilfe der ThreadRun-Methode für jedes Objekt als Argument. Jeder Arbeitsthread, wenn er gestartet wird, wird dieses Argument als Einstiegspunkt für den Thread verwenden.

Nächste Main startet die beiden Arbeitsthreads mit einem Aufruf der Start-Methode, wie diese:

producerThread.Start()
consumerThread.Start()
producerThread.Start();
consumerThread.Start();

Zu diesem Zeitpunkt die beiden neuen Arbeitsthreads werden erstellt und beginnen asynchrone Ausführung unabhängig von der primären Thread, der gegenwärtig die Main-Methode ausgeführt wird. Der nächste Aspekt Main in der Tat verfügt ist den primären Thread mit einem Aufruf der Methode Sleep unterbrechen. Die Methode unterbricht den derzeit ausgeführten Thread für eine bestimmte Anzahl von Millisekunden. Sobald dieses Intervalls wird Main reaktiviert, an welche Stelle Sie den Inhalt der Warteschlange anzeigt. Main wiederholt diese vier Iterationen, wie diese:

                        For i AsInteger = 0 To 3
    Thread.Sleep(2500)
    ShowQueueContents(queue)
Next
                        for (int i=0; i<4; i++)
{
    Thread.Sleep(2500);
    ShowQueueContents(queue);
}

Main schließlich signalisiert Arbeitsthreads beenden durch Aufrufen der Set-Methode der "Thread beenden"Ereignis und dann auf Aufrufe die Join-Methode für jeden Arbeitsthread für den primären Thread blockiert, bis jeder Arbeitsthread auf das Ereignis zu reagieren und wird beendet.

Es ist ein letzten Beispiel für die Threadsynchronisierung: ShowQueueContents-Methode. Diese Methode, wie die Consumer und Producer-Threads werden lock (c#) oder SyncLock (Visual Basic) für den exklusiven Zugriff auf die Warteschlange verwendet. In diesem Fall ist jedoch exklusiver Zugriff sehr wichtig, da ShowQueueContents all der Auflistung auflistet. Zum Aufzählen über eine Auflistung ist eine Operation, die besonders anfällig für Beschädigungen von Daten durch asynchrone Vorgänge ist, weil er umfasst den Inhalt der gesamten Auflistung durchlaufen ist.

Beachten Sie, ShowQueueContents, da Sie von Main aufgerufen wird vom primären Thread ausgeführt wird. Dies bedeutet, dass diese Methode, wenn er exklusiven Zugriff auf die Warteschlange Element erreicht die Producer und Consumer-Threads von Zugriff auf die Warteschlange blockiert. ShowQueueContents sperrt die Warteschlange und listet den Inhalt:

                        Private
                        Sub ShowQueueContents(ByVal q As Queue(OfInteger))
    SyncLockCType(q, ICollection).SyncRoot
        ForEach item AsIntegerIn q
            Console.Write("{0} ", item)
        NextEndSyncLock
    Console.WriteLine()
EndSub
                        private
                        static
                        void ShowQueueContents(Queue<int> q)
{
    lock (((ICollection)q).SyncRoot)
    {
        foreach (int item in q)
        {
            Console.Write("{0} ", item);
        }
    }
    Console.WriteLine();
}

Das vollständige Beispiel folgt.

Beispiel

                        Imports System.Threading

Module Module1
    PublicClass SyncEvents
        SubNew()
            _newItemEvent = New AutoResetEvent(False)
            _exitThreadEvent = New ManualResetEvent(False)
            _eventArray(0) = _newItemEvent
            _eventArray(1) = _exitThreadEvent
        EndSubPublicReadOnlyProperty ExitThreadEvent() As EventWaitHandle
            GetReturn _exitThreadEvent
            EndGetEndPropertyPublicReadOnlyProperty NewItemEvent() As EventWaitHandle
            GetReturn _newItemEvent
            EndGetEndPropertyPublicReadOnlyProperty EventArray() As WaitHandle()
            GetReturn _eventArray
            EndGetEndPropertyPrivate _newItemEvent As EventWaitHandle
        Private _exitThreadEvent As EventWaitHandle
        Private _eventArray(0 To 1) As WaitHandle
    EndClassPublicClass Producer
        SubNew(ByVal q As Queue(OfInteger), ByVal e As SyncEvents)
            _queue = q
            _syncEvents = e
        EndSub
        ' Producer.ThreadRunPublicSub ThreadRun()
            Dim count AsInteger = 0
            Dim r AsNew Random()

            DoWhileNot _syncEvents.ExitThreadEvent.WaitOne(0, False)
                SyncLockCType(_queue, ICollection).SyncRoot
                    DoWhile _queue.Count < 20
                        _queue.Enqueue(r.Next(0, 100))
                        _syncEvents.NewItemEvent.Set()
                        count += 1
                    LoopEndSyncLockLoop
            Console.WriteLine("Producer thread: produced {0} items", count)
        EndSubPrivate _queue As Queue(OfInteger)
        Private _syncEvents As SyncEvents
    EndClassPublicClass Consumer
        SubNew(ByVal q As Queue(OfInteger), ByVal e As SyncEvents)
            _queue = q
            _syncEvents = e
        EndSub
        ' Consumer.ThreadRunPublicSub ThreadRun()
            Dim count AsInteger = 0

            DoWhile WaitHandle.WaitAny(_syncEvents.EventArray) <> 1
                SyncLockCType(_queue, ICollection).SyncRoot
                    Dim item AsInteger = _queue.Dequeue()
                EndSyncLock
                count += 1
            Loop
            Console.WriteLine("Consumer Thread: consumed {0} items", count)
        EndSubPrivate _queue As Queue(OfInteger)
        Private _syncEvents As SyncEvents
    EndClassPrivateSub ShowQueueContents(ByVal q As Queue(OfInteger))
        SyncLockCType(q, ICollection).SyncRoot
            ForEach item AsIntegerIn q
                Console.Write("{0} ", item)
            NextEndSyncLock
        Console.WriteLine()
    EndSubSub Main()
        Dim queue AsNew Queue(OfInteger)
        Dim syncEvents AsNew SyncEvents()

        Console.WriteLine("Configuring worker threads...")
        Dim producer AsNew Producer(queue, syncEvents)
        Dim consumer AsNew Consumer(queue, syncEvents)
        Dim producerThread AsNew Thread(AddressOf producer.ThreadRun)
        Dim consumerThread AsNew Thread(AddressOf consumer.ThreadRun)

        Console.WriteLine("Launching producer and consumer threads...")
        producerThread.Start()
        consumerThread.Start()

        For i AsInteger = 0 To 3
            Thread.Sleep(2500)
            ShowQueueContents(queue)
        Next

        Console.WriteLine("Signaling threads to terminate...")
        syncEvents.ExitThreadEvent.Set()

        producerThread.Join()
        consumerThread.Join()
    EndSubEndModule
                        using System;
using System.Threading;
using System.Collections;
using System.Collections.Generic;

publicclass SyncEvents
{
    public SyncEvents()
    {

        _newItemEvent = new AutoResetEvent(false);
        _exitThreadEvent = new ManualResetEvent(false);
        _eventArray = new WaitHandle[2];
        _eventArray[0] = _newItemEvent;
        _eventArray[1] = _exitThreadEvent;
    }

    public EventWaitHandle ExitThreadEvent
    {
        get { return _exitThreadEvent; }
    }
    public EventWaitHandle NewItemEvent
    {
        get { return _newItemEvent; }
    }
    public WaitHandle[] EventArray
    {
        get { return _eventArray; }
    }

    private EventWaitHandle _newItemEvent;
    private EventWaitHandle _exitThreadEvent;
    private WaitHandle[] _eventArray;
}
publicclass Producer 
{
    public Producer(Queue<int> q, SyncEvents e)
    {
        _queue = q;
        _syncEvents = e;
    }
    // Producer.ThreadRunpublicvoid ThreadRun()
    {
        int count = 0;
        Random r = new Random();
        while (!_syncEvents.ExitThreadEvent.WaitOne(0, false))
        {
            lock (((ICollection)_queue).SyncRoot)
            {
                while (_queue.Count < 20)
                {
                    _queue.Enqueue(r.Next(0,100));
                    _syncEvents.NewItemEvent.Set();
                    count++;
                }
            }
        }
        Console.WriteLine("Producer thread: produced {0} items", count);
    }
    private Queue<int> _queue;
    private SyncEvents _syncEvents;
}

publicclass Consumer
{
    public Consumer(Queue<int> q, SyncEvents e)
    {
        _queue = q;
        _syncEvents = e;
    }
    // Consumer.ThreadRunpublicvoid ThreadRun()
    {
        int count = 0;
        while (WaitHandle.WaitAny(_syncEvents.EventArray) != 1)
        {
            lock (((ICollection)_queue).SyncRoot)
            {
                int item = _queue.Dequeue();
            }
            count++;
        } 
        Console.WriteLine("Consumer Thread: consumed {0} items", count);
    }
    private Queue<int> _queue;
    private SyncEvents _syncEvents;
}

publicclass ThreadSyncSample
{
    privatestaticvoid ShowQueueContents(Queue<int> q)
    {
        lock (((ICollection)q).SyncRoot)
        {
            foreach (int item in q)
            {
                Console.Write("{0} ", item);
            }
        }
        Console.WriteLine();
    }

    staticvoid Main()
    {
        Queue<int> queue = new Queue<int>();
        SyncEvents syncEvents = new SyncEvents();

        Console.WriteLine("Configuring worker threads...");
        Producer producer = new Producer(queue, syncEvents);
        Consumer consumer = new Consumer(queue, syncEvents);
        Thread producerThread = new Thread(producer.ThreadRun);
        Thread consumerThread = new Thread(consumer.ThreadRun);

        Console.WriteLine("Launching producer and consumer threads...");        
        producerThread.Start();
        consumerThread.Start();

        for (int i=0; i<4; i++)
        {
            Thread.Sleep(2500);
            ShowQueueContents(queue);
        }

        Console.WriteLine("Signaling threads to terminate...");
        syncEvents.ExitThreadEvent.Set();

        producerThread.Join();
        consumerThread.Join();
    }

}
Configuring worker threads... Launching producer and consumer threads... 22 92 64 70 13 59 9 2 43 52 91 98 50 96 46 22 40 94 24 87 79 54 5 39 21 29 77 77 1 68 69 81 4 75 43 70 87 72 59 0 69 98 54 92 16 84 61 30 45 50 17 86 16 59 20 73 43 21 38 46 84 59 11 87 77 5 53 65 7 16 66 26 79 74 26 37 56 92 Signalling threads to terminate... Consumer Thread: consumed 1053771 items Producer thread: produced 1053791 items 

Siehe auch

Referenz

Threadsynchronisierung (C#- und Visual Basic)

Thread

SyncLock-Anweisung

Lock-Anweisung (C#-Referenz)

AutoResetEvent

ManualResetEvent

Set

Join

WaitOne

WaitAll

Queue

ICollection

Start

Sleep

WaitHandle

EventWaitHandle