Verwenden von foreach zum Entfernen von Elementen aus einer BlockingCollection-Klasse

Zusätzlich zum Entnehmen von Elementen aus einer BlockingCollection<T>-Klasse mithilfe der Methoden Take und TryTake können Sie auch einen foreach-Verweis (For Each in Visual Basic) mit der BlockingCollection<T>.GetConsumingEnumerable-Methode verwenden, um Elemente zu entfernen, bis der Hinzufügevorgang abgeschlossen und die Auflistung leer ist. Dies wird als mutierende Enumeration oder verbrauchende Enumeration bezeichnet, da dieser Enumerator, im Gegensatz zu einer typischen foreach- (For Each-)Schleife, die Quellsammlung durch Entfernen von Elementen verändert.

Beispiel

Das folgende Beispiel zeigt das Entfernen aller Elemente in einer BlockingCollection<T> mithilfe einer foreach-Schleife (For Each).

using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

class Example
{
    // Limit the collection size to 2000 items at any given time.
    // Set itemsToProduce to > 500 to hit the limit.
    const int UpperLimit = 1000;

    // Adjust this number to see how it impacts the producing-consuming pattern.
    const int ItemsToProduce = 100;

    static readonly BlockingCollection<long> Collection =
        new BlockingCollection<long>(UpperLimit);

    // Variables for diagnostic output only.
    static readonly Stopwatch Stopwatch = new Stopwatch();
    static int TotalAdditions = 0;

    static async Task Main()
    {
        Stopwatch.Start();

        // Queue the consumer task.
        var consumerTask = Task.Run(() => RunConsumer());

        // Queue the producer tasks.
        var produceTaskOne = Task.Run(() => RunProducer("A", 0));
        var produceTaskTwo = Task.Run(() => RunProducer("B", ItemsToProduce));
        var producerTasks = new[] { produceTaskOne , produceTaskTwo };

        // Create a cleanup task that will call CompleteAdding after
        // all producers are done adding items.
        var cleanupTask = Task.Factory.ContinueWhenAll(producerTasks, _ => Collection.CompleteAdding());

        // Wait for all tasks to complete
        await Task.WhenAll(consumerTask, produceTaskOne, produceTaskTwo, cleanupTask);

        // Keep the console window open while the
        // consumer thread completes its output.
        Console.WriteLine("Press any key to exit");
        Console.ReadKey(true);
    }

    static void RunProducer(string id, int start)
    {
        var additions = 0;
        for (var i = start; i < start + ItemsToProduce; i++)
        {
            // The data that is added to the collection.
            var ticks = Stopwatch.ElapsedTicks;

            // Display additions and subtractions.
            Console.WriteLine($"{id} adding tick value {ticks}. item# {i}");

            if (!Collection.IsAddingCompleted)
            {
                Collection.Add(ticks);
            }

            // Counter for demonstration purposes only.
            additions++;

            // Comment this line to speed up the producer threads.
            Thread.SpinWait(100000);
        }

        Interlocked.Add(ref TotalAdditions, additions);
        Console.WriteLine($"{id} is done adding: {additions} items");
    }

    static void RunConsumer()
    {
        // GetConsumingEnumerable returns the enumerator for the underlying collection.
        var subtractions = 0;
        foreach (var item in Collection.GetConsumingEnumerable())
        {
            Console.WriteLine(
                $"Consuming tick value {item:D18} : item# {subtractions++} : current count = {Collection.Count}");
        }

        Console.WriteLine(
            $"Total added: {TotalAdditions} Total consumed: {subtractions} Current count: {Collection.Count}");

        Stopwatch.Stop();
    }
}
Option Strict On
Option Explicit On
Imports System.Diagnostics
Imports System.Threading
Imports System.Threading.Tasks
Imports System.Collections.Concurrent


Module EnumerateBC

    Class Program
        ' Limit the collection size to 2000 items
        ' at any given time. Set itemsToProduce to >500
        ' to hit the limit.
        Const upperLimit As Integer = 1000

        ' Adjust this number to see how it impacts
        ' the producing-consuming pattern.
        Const itemsToProduce As Integer = 100

        Shared collection As BlockingCollection(Of Long) = New BlockingCollection(Of Long)(upperLimit)

        ' Variables for diagnostic output only.
        Shared sw As New Stopwatch()
        Shared totalAdditions As Integer = 0

        ' Counter for synchronizing producers.
        Shared producersStillRunning As Integer = 2

        Shared Sub Main()

            ' Start the stopwatch.
            sw.Start()
            ' Queue the Producer threads. 

            Dim task1 = Task.Factory.StartNew(Sub() RunProducer("A", 0))
            Dim task2 = Task.Factory.StartNew(Sub() RunProducer("B", itemsToProduce))

            ' Store in an array for use with ContinueWhenAll
            Dim producers() As Task = {task1, task2}

            ' Create a cleanup task that will call CompleteAdding after
            ' all producers are done adding items.
            Dim cleanup As Task = Task.Factory.ContinueWhenAll(producers, Sub(p) collection.CompleteAdding())

            ' Queue the Consumer thread. Put this call
            ' before Parallel.Invoke to begin consuming as soon as
            ' the producers add items.
            Task.Factory.StartNew(Sub() RunConsumer())

            ' Keep the console window open while the
            ' consumer thread completes its output.
            Console.ReadKey()

        End Sub

        Shared Sub RunProducer(ByVal ID As String, ByVal start As Integer)
            Dim additions As Integer = 0

            For i As Integer = start To start + itemsToProduce - 1

                ' The data that is added to the collection.
                Dim ticks As Long = sw.ElapsedTicks

                'Display additions and subtractions.
                Console.WriteLine("{0} adding tick value {1}. item# {2}", ID, ticks, i)

                ' Don't try to add item after CompleteAdding
                ' has been called.
                If collection.IsAddingCompleted = False Then
                    collection.Add(ticks)
                End If

                ' Counter for demonstration purposes only.
                additions = additions + 1

                ' Uncomment this line to 
                ' slow down the producer threads without sleeping.
                Thread.SpinWait(100000)

            Next
            Interlocked.Add(totalAdditions, additions)
            Console.WriteLine("{0} is done adding: {1} items", ID, additions)

        End Sub

        Shared Sub RunConsumer()
            ' GetConsumingEnumerable returns the enumerator for the 
            ' underlying collection.
            Dim subtractions As Integer = 0

            For Each item In collection.GetConsumingEnumerable

                subtractions = subtractions + 1
                Console.WriteLine("Consuming tick value {0} : item# {1} : current count = {2}",
                                  item.ToString("D18"), subtractions, collection.Count)
            Next

            Console.WriteLine("Total added: {0} Total consumed: {1} Current count: {2} ",
                                    totalAdditions, subtractions, collection.Count())
            sw.Stop()

            Console.WriteLine("Press any key to exit.")
        End Sub

    End Class
End Module

Dieses Beispiel verwendet eine foreach-Schleife mit der BlockingCollection<T>.GetConsumingEnumerable-Methode im verbrauchenden Thread, wodurch jedes Element beim Aufzählen aus der Auflistung entfernt wird. System.Collections.Concurrent.BlockingCollection<T> begrenzt die maximale Anzahl von Elementen, die sich zu einem bestimmten Zeitpunkt in der Sammlung befinden können. Durch Aufzählen der Auflistung auf diese Weise wird der Consumerthread blockiert, wenn keine Elemente verfügbar sind oder die Auflistung leer ist. In diesem Beispiel ist eine Blockierung kein Problem, da der Producerthread Elemente schneller hinzufügt als sie verbraucht werden können.

Die BlockingCollection<T>.GetConsumingEnumerable-Methode gibt IEnumerable<T> zurück, weshalb die Reihenfolge nicht garantiert werden kann. Intern wird jedoch eine System.Collections.Concurrent.ConcurrentQueue<T>-Klasse als zugrunde liegender Sammlungstyp verwendet, die Objekte nach der FIFO-Reihenfolge (First-in-First-Out) aus der Warteschlange entfernt. Wenn gleichzeitige Aufrufe der BlockingCollection<T>.GetConsumingEnumerable-Methode durchgeführt werden, konkurrieren diese. Ein Element, das in einer Enumeration aus der Warteschlange entfernt wurde, kann nicht in der anderen Enumeration beobachtet werden.

Um die Auflistung aufzuzählen, ohne sie zu verändern, verwenden Sie einfach foreach (For Each) ohne die GetConsumingEnumerable-Methode. Es ist jedoch wichtig zu wissen, dass diese Art der Enumeration eine Momentaufnahme der Auflistung zu einem genauen Zeitpunkt darstellt. Wenn weitere Threads gleichzeitig Elemente hinzufügen oder entfernen, während die Schleife ausgeführt wird, stellt die Schleife möglicherweise nicht den tatsächlichen Zustand der Auflistung dar.

Siehe auch