Freigeben über


Übersicht über BlockingCollection

BlockingCollection<T> ist eine threadsichere Auflistungsklasse, die folgende Funktionen bereitstellt:

  • Eine Implementierung des Producer-Consumer-Musters

  • Gleichzeitiges Hinzufügen und Entfernen von Elementen zu bzw. in mehreren Threads

  • Optionale maximale Kapazität

  • Einfügungs- und Löschvorgänge, die blockiert werden, wenn die Auflistung leer oder voll ist

  • Einfügen und Löschen von "try"-Vorgängen, die nicht oder nur für einen bestimmten Zeitraum blockiert werden

  • Kapselt jeden Auflistungstyp, der IProducerConsumerCollection<T> implementiert.

  • Abbruch mit Abbruchtokens

  • Zwei Arten von Enumeration mit foreach (For Each in Visual Basic):

    1. Schreibgeschützte Enumeration

    2. Enumeration, die Elemente bei der Aufzählung entfernt

Unterstützung für Begrenzungs- und Blockierungsvorgänge

BlockingCollection<T> unterstützt Begrenzungs- und Blockierungsvorgänge. Durch Begrenzung können Sie die maximale Kapazität der Auflistung festlegen. Das Begrenzen ist in bestimmten Szenarien wichtig, da so die maximale Größe der Auflistung im Arbeitsspeicher gesteuert werden kann. Außerdem wird verhindert, dass sich die Producer-Threads zu weit von den Consumer-Threads entfernen.

Der Auflistung können von zahlreichen Threads oder Aufgaben gleichzeitig Elemente hinzugefügt werden. Wenn die Auflistung dann die angegebene maximale Kapazität erreicht, werden die Producer-Threads so lange blockiert, bis ein Element entfernt wird. Mehrere Consumer können gleichzeitig Elemente entfernen. Ist die Auflistung leer, werden die Consumer-Threads so lange blockiert, bis von einem Producer ein Element hinzugefügt wird. Von einem Producer-Thread kann CompleteAdding aufgerufen werden, um anzugeben, dass keine Elemente mehr hinzugefügt werden. Die IsCompleted-Eigenschaft wird von Consumern überwacht, damit keine Elemente mehr hinzugefügt werden, sobald die Auflistung leer ist. Das folgende Beispiel zeigt eine einfache BlockingCollection mit einer begrenzten Kapazität von 100. Von einer Producer-Aufgabe werden der Auflistung nur solange Elemente hinzugefügt, wie eine externe Bedingung den Wert "true" aufweist. Anschließend wird CompleteAdding aufgerufen. Die Consumeraufgabe entnimmt Elemente, bis die IsCompleted-Eigenschaft true ist.

' A bounded collection. It can hold no more 
' than 100 items at once.
Dim dataItems = New BlockingCollection(Of Data)(100)

' A simple blocking consumer with no cancellation.
Task.Factory.StartNew(Sub()
                          While dataItems.IsCompleted = False
                              Dim dataItem As Data = Nothing
                              Try
                                  dataItem = dataItems.Take()
                              Catch e As InvalidOperationException
                                  ' IOE means that Take() was called on a completed collection.
                                  ' In this example, we can simply catch the exception since the 
                                  ' loop will break on the next iteration.
                              End Try
                              If (dataItem IsNot Nothing) Then
                                  Process(dataItem)
                              End If
                          End While
                          Console.WriteLine(vbCrLf & "No more items to take.")
                      End Sub)

' A simple blocking producer with no cancellation.
Task.Factory.StartNew(Sub()
                          While moreItemsToAdd = True
                              Dim item As Data = GetData()

                              ' Blocks if numbers.Count = dataItems.BoundedCapacity
                              dataItems.Add(item)
                          End While

                          ' Let consumer know we are done.
                          dataItems.CompleteAdding()
                      End Sub)
            // A bounded collection. It can hold no more 
            // than 100 items at once.
            BlockingCollection<Data> dataItems = new BlockingCollection<Data>(100);


            // A simple blocking consumer with no cancellation.
            Task.Factory.StartNew(() => 
            {
                while (!dataItems.IsCompleted)
                {

                    Data data = null;
                    // Blocks if number.Count == 0
                    // IOE means that Take() was called on a completed collection.
                    // Some other thread can call CompleteAdding after we pass the
                    // IsCompleted check but before we call Take. 
                    // In this example, we can simply catch the exception since the 
                    // loop will break on the next iteration.
                    try
                    {
                        data = dataItems.Take();
                    }
                    catch (InvalidOperationException) { }

                    if (data != null)
                    {
                        Process(data);
                    }
                }
                Console.WriteLine("\r\nNo more items to take.");
            });

            // A simple blocking producer with no cancellation.
            Task.Factory.StartNew(() =>
            {
                while (moreItemsToAdd)
                {
                    Data data = GetData();
                    // Blocks if numbers.Count == dataItems.BoundedCapacity
                    dataItems.Add(data);
                }
                // Let consumer know we are done.
                dataItems.CompleteAdding();
            });

Ein vollständiges Beispiel finden Sie unter Gewusst wie: Hinzufügen und Entfernen von einzelnen Elementen zu bzw. aus einer BlockingCollection.

Zeitgesteuerte Blockierungsvorgänge

In zeitgesteuerten TryAdd- und TryTake-Blockierungsvorgängen in begrenzten Auflistungen versucht die Methode, ein Element hinzuzufügen oder zu entfernen. Ist ein Element verfügbar, wird es in die Variable eingefügt, die durch einen Verweis übergeben wurde, und von der Methode wird "true" zurückgegeben. Wird nach einem angegebenen Timeout kein Element abgerufen, wird von der Methode "false" zurückgegeben. Vom Thread können dann vor dem erneuten Zugriff auf die Auflistung andere wichtige Aufgaben erledigt werden. Ein Beispiel für den zeitgesteuerten Zugriff auf Blockierungen finden Sie im zweiten Beispiel unter Gewusst wie: Hinzufügen und Entfernen von einzelnen Elementen zu bzw. aus einer BlockingCollection.

Abbrechen von Vorgängen für das Hinzufügen und Entfernen

Vorgänge für das Hinzufügen und Entfernen werden in der Regel in einer Schleife ausgeführt. Sie können eine Schleife abbrechen, indem Sie ein CancellationToken an die TryAdd- oder TryTake-Methode übergeben und dann den Wert der IsCancellationRequested-Eigenschaft des Tokens in jeder Iteration überprüfen. Ist für den Wert "true" festgelegt, muss von Ihnen auf die Abbruchanforderung reagiert werden. Bereinigen Sie hierzu alle Ressourcen, und beenden Sie die Schleife. Das folgende Beispiel zeigt eine Überladung von TryAdd, an die ein Abbruchtoken übergeben wird, und den Code, in dem das Token verwendet wird:

Do While moreItems = True
    ' Cancellation causes OCE. We know how to handle it.
    Try
        success = bc.TryAdd(itemToAdd, 2, ct)
    Catch ex As OperationCanceledException
        bc.CompleteAdding()
        Exit Do
    End Try
Loop
do
{
    // Cancellation causes OCE. We know how to handle it.
    try
    {
        success = bc.TryAdd(itemToAdd, 2, ct);
    }
    catch (OperationCanceledException)
    {
        bc.CompleteAdding();
        break;
    }
    //...
} while (moreItems == true);

Ein Beispiel für das Hinzufügen von Unterstützung für Abbruchvorgänge finden Sie im zweiten Beispiel in Gewusst wie: Hinzufügen und Entfernen von einzelnen Elementen zu bzw. aus einer BlockingCollection.

Angeben des Auflistungstyps

Beim Erstellen einer BlockingCollection<T> kann nicht nur die begrenzte Kapazität, sondern auch der zu verwendende Auflistungstyp angegeben werden. Sie können z. B. ein ConcurrentConcurrentQueue für ein FIFO-Verhalten (First-In-First-Out) oder ein ConcurrentStack<T> für ein LIFO-Verhalten (Last-In-First-Out) angeben. Sie können jede Auflistungsklasse verwenden, von der die IProducerConsumerCollection<T>-Schnittstelle implementiert wird. Der standardmäßige Auflistungstyp für BlockingCollection<T> ist ConcurrentQueue<T>. Das folgende Codebeispiel veranschaulicht, wie eine BlockingCollection<T> von Zeichenfolgen mit einer Kapazität von 1000 erstellt wird, die eine ConcurrentBag verwendet:

Dim bc = New BlockingCollection(Of String)(New ConcurrentBag(Of String()), 1000)
BlockingCollection<string> bc = new BlockingCollection<string>(new ConcurrentBag<string>(), 1000 );

Weitere Informationen finden Sie unter Gewusst wie: Hinzufügen von Begrenzungs- und Blockadefunktionen zu einer Auflistungsklasse.

IEnumerable-Unterstützung

BlockingCollection<T> stellt eine GetConsumingEnumerable-Methode bereit, die Consumern die Verwendung von foreach (For Each in Visual Basic) ermöglicht, um so lange Elemente zu entfernen, bis die Auflistung abgeschlossen ist, d. h., bis sie leer ist und keine Elemente mehr hinzugefügt werden. Weitere Informationen finden Sie unter Gewusst wie: Entfernen von Elementen in einer BlockingCollection mit ForEach.

Verwenden vieler BlockingCollections als eine einzelne Auflistung

Für Szenarien, in denen von einem Consumer Elemente aus mehreren Auflistungen gleichzeitig entnommen werden müssen, können Sie Arrays von BlockingCollection<T> erstellen und statische Methoden, wie TakeFromAny oder AddToAny verwenden, von denen für beliebige Auflistungen im Array Elemente entfernt bzw. hinzugefügt werden können. Wird eine Auflistung blockiert, wird von der Methode so lange weitergesucht, bis eine Auflistung gefunden wird, von der der Vorgang ausgeführt werden kann. Weitere Informationen finden Sie unter Gewusst wie: Verwenden von Arrays mit blockierenden Auflistungen in einer Pipeline.

Siehe auch

Referenz

System.Collections.Concurrent

Konzepte

Threadsichere Auflistungen

Weitere Ressourcen

Auflistungen und Datenstrukturen