다음을 통해 공유


ForEach를 사용하여 BlockingCollection의 항목 제거

TakeTryTake 메서드를 사용하여 BlockingCollection<T>에서 항목을 가져오는 것 외에도, 추가가 완료되고 컬렉션이 빈 상태가 될 때까지 ForEach(Visual Basic의 For Each)를 BlockingCollection<T>.GetConsumingEnumerable과(와) 함께 사용하여 항목을 제거할 수 있습니다. 일반적인 foreach(For Each) 루프와 달리 이 열거자는 항목을 제거하여 소스 컬렉션을 수정하기 때문에 이 방식을 열거형 변경 또는 열거형 소비라고 합니다.

예시

다음 예제에서는 foreach(For Each) 루프를 사용하여 BlockingCollection<T>의 항목을 모두 제거하는 방법을 보여 줍니다.

using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

class Example
{
    // Limit the collection size to 2000 items at any given time.
    // Set itemsToProduce to > 500 to hit the limit.
    const int UpperLimit = 1000;

    // Adjust this number to see how it impacts the producing-consuming pattern.
    const int ItemsToProduce = 100;

    static readonly BlockingCollection<long> Collection =
        new BlockingCollection<long>(UpperLimit);

    // Variables for diagnostic output only.
    static readonly Stopwatch Stopwatch = new Stopwatch();
    static int TotalAdditions = 0;

    static async Task Main()
    {
        Stopwatch.Start();

        // Queue the consumer task.
        var consumerTask = Task.Run(() => RunConsumer());

        // Queue the producer tasks.
        var produceTaskOne = Task.Run(() => RunProducer("A", 0));
        var produceTaskTwo = Task.Run(() => RunProducer("B", ItemsToProduce));
        var producerTasks = new[] { produceTaskOne , produceTaskTwo };

        // Create a cleanup task that will call CompleteAdding after
        // all producers are done adding items.
        var cleanupTask = Task.Factory.ContinueWhenAll(producerTasks, _ => Collection.CompleteAdding());

        // Wait for all tasks to complete
        await Task.WhenAll(consumerTask, produceTaskOne, produceTaskTwo, cleanupTask);

        // Keep the console window open while the
        // consumer thread completes its output.
        Console.WriteLine("Press any key to exit");
        Console.ReadKey(true);
    }

    static void RunProducer(string id, int start)
    {
        var additions = 0;
        for (var i = start; i < start + ItemsToProduce; i++)
        {
            // The data that is added to the collection.
            var ticks = Stopwatch.ElapsedTicks;

            // Display additions and subtractions.
            Console.WriteLine($"{id} adding tick value {ticks}. item# {i}");

            if (!Collection.IsAddingCompleted)
            {
                Collection.Add(ticks);
            }

            // Counter for demonstration purposes only.
            additions++;

            // Comment this line to speed up the producer threads.
            Thread.SpinWait(100000);
        }

        Interlocked.Add(ref TotalAdditions, additions);
        Console.WriteLine($"{id} is done adding: {additions} items");
    }

    static void RunConsumer()
    {
        // GetConsumingEnumerable returns the enumerator for the underlying collection.
        var subtractions = 0;
        foreach (var item in Collection.GetConsumingEnumerable())
        {
            Console.WriteLine(
                $"Consuming tick value {item:D18} : item# {subtractions++} : current count = {Collection.Count}");
        }

        Console.WriteLine(
            $"Total added: {TotalAdditions} Total consumed: {subtractions} Current count: {Collection.Count}");

        Stopwatch.Stop();
    }
}
Option Strict On
Option Explicit On
Imports System.Diagnostics
Imports System.Threading
Imports System.Threading.Tasks
Imports System.Collections.Concurrent


Module EnumerateBC

    Class Program
        ' Limit the collection size to 2000 items
        ' at any given time. Set itemsToProduce to >500
        ' to hit the limit.
        Const upperLimit As Integer = 1000

        ' Adjust this number to see how it impacts
        ' the producing-consuming pattern.
        Const itemsToProduce As Integer = 100

        Shared collection As BlockingCollection(Of Long) = New BlockingCollection(Of Long)(upperLimit)

        ' Variables for diagnostic output only.
        Shared sw As New Stopwatch()
        Shared totalAdditions As Integer = 0

        ' Counter for synchronizing producers.
        Shared producersStillRunning As Integer = 2

        Shared Sub Main()

            ' Start the stopwatch.
            sw.Start()
            ' Queue the Producer threads. 

            Dim task1 = Task.Factory.StartNew(Sub() RunProducer("A", 0))
            Dim task2 = Task.Factory.StartNew(Sub() RunProducer("B", itemsToProduce))

            ' Store in an array for use with ContinueWhenAll
            Dim producers() As Task = {task1, task2}

            ' Create a cleanup task that will call CompleteAdding after
            ' all producers are done adding items.
            Dim cleanup As Task = Task.Factory.ContinueWhenAll(producers, Sub(p) collection.CompleteAdding())

            ' Queue the Consumer thread. Put this call
            ' before Parallel.Invoke to begin consuming as soon as
            ' the producers add items.
            Task.Factory.StartNew(Sub() RunConsumer())

            ' Keep the console window open while the
            ' consumer thread completes its output.
            Console.ReadKey()

        End Sub

        Shared Sub RunProducer(ByVal ID As String, ByVal start As Integer)
            Dim additions As Integer = 0

            For i As Integer = start To start + itemsToProduce - 1

                ' The data that is added to the collection.
                Dim ticks As Long = sw.ElapsedTicks

                'Display additions and subtractions.
                Console.WriteLine("{0} adding tick value {1}. item# {2}", ID, ticks, i)

                ' Don't try to add item after CompleteAdding
                ' has been called.
                If collection.IsAddingCompleted = False Then
                    collection.Add(ticks)
                End If

                ' Counter for demonstration purposes only.
                additions = additions + 1

                ' Uncomment this line to 
                ' slow down the producer threads without sleeping.
                Thread.SpinWait(100000)

            Next
            Interlocked.Add(totalAdditions, additions)
            Console.WriteLine("{0} is done adding: {1} items", ID, additions)

        End Sub

        Shared Sub RunConsumer()
            ' GetConsumingEnumerable returns the enumerator for the 
            ' underlying collection.
            Dim subtractions As Integer = 0

            For Each item In collection.GetConsumingEnumerable

                subtractions = subtractions + 1
                Console.WriteLine("Consuming tick value {0} : item# {1} : current count = {2}",
                                  item.ToString("D18"), subtractions, collection.Count)
            Next

            Console.WriteLine("Total added: {0} Total consumed: {1} Current count: {2} ",
                                    totalAdditions, subtractions, collection.Count())
            sw.Stop()

            Console.WriteLine("Press any key to exit.")
        End Sub

    End Class
End Module

이 예제에서는 소비 스레드의 BlockingCollection<T>.GetConsumingEnumerable 메서드에 foreach 루프를 사용하는데, 이 루프는 컬렉션을 열거하면서 해당 컬렉션의 각 항목을 제거합니다. System.Collections.Concurrent.BlockingCollection<T>는 항상 컬렉션에 있는 항목의 최대 수를 제한합니다. 이런 방식으로 컬렉션을 열거하는 경우 제공되는 항목이 없거나 컬렉션이 비었을 때 소비자 스레드를 차단합니다. 이 예제에서는 항목이 소비되는 것보다 더 빠르게 공급자 스레드에서 항목을 추가하기 때문에 차단이 발생하지 않습니다.

BlockingCollection<T>.GetConsumingEnumerable은(는) IEnumerable<T>를 반환하므로 순서를 보장할 수 없습니다. 그러나 내부적으로 System.Collections.Concurrent.ConcurrentQueue<T>은(는) 기본 컬렉션 형식으로 사용되며 FIFO(선입선출) 순서를 따라 개체를 큐에서 제거합니다. BlockingCollection<T>.GetConsumingEnumerable에 대한 동시 호출이 이루어지면 경쟁이 발생합니다. 한 열거형에서 사용된(큐에서 제거된) 한 항목을 다른 열거형에서 관찰할 수 없습니다.

컬렉션을 수정하지 않고 열거하려면 GetConsumingEnumerable 메서드 없이 foreach(For Each)만 사용하면 됩니다. 그러나 이와 같이 열거한 결과는 특정 시점의 컬렉션을 나타내는 스냅샷에 불과하다는 사실을 이해할 필요가 있습니다. 루프를 실행하는 동시에 다른 스레드를 통해 항목을 추가하거나 제거하고 있는 경우에는 루프에서 컬렉션의 실제 상태를 나타내지 않을 수 있습니다.

참고 항목