共用方式為


HOW TO:使用 ForEach 來移除 BlockingCollection 中的項目

除了使用 Take 或 TryTake 方法從 BlockingCollection<T> 取得項目之外,您也可以使用 foreach (在 Visual Basic 中則為 For Each) 移除項目,直到加入完成且集合清空為止。 這種列舉稱為「變動列舉」(Mutating Enumeration) 或「耗用列舉」(Consuming Enumeration),因為不像一般 foreach (For Each) 迴圈,這個列舉程式會藉由移除項目以修改來源集合。

範例

下列範例顯示如何使用 foreach (For Each) 迴圈來移除 BlockingCollection<T> 中的所有項目。

Option Strict On
Option Explicit On
Imports System.Diagnostics
Imports System.Threading
Imports System.Threading.Tasks
Imports System.Collections.Concurrent


Module EnumerateBC

    ''' <summary>
    ''' Producer/consumer demo: two producer tasks add stopwatch tick values to a
    ''' bounded BlockingCollection while one consumer task removes them with a
    ''' For Each loop over GetConsumingEnumerable.
    ''' </summary>
    Class Program
        ' Limit the collection size to 1000 items at any given time.
        ' The two producers add 2 * itemsToProduce items in total, so set
        ' itemsToProduce to >500 to hit the limit.
        Const upperLimit As Integer = 1000

        ' Number of items EACH producer adds. Adjust this number to see
        ' how it impacts the producing-consuming pattern.
        Const itemsToProduce As Integer = 100

        Shared collection As BlockingCollection(Of Long) = New BlockingCollection(Of Long)(upperLimit)

        ' Variables for diagnostic output only.
        Shared sw As New Stopwatch()
        Shared totalAdditions As Integer = 0

        ''' <summary>
        ''' Starts the two producers and the consumer, then waits for a key press
        ''' so the console window stays open while the tasks write their output.
        ''' </summary>
        Shared Sub Main()

            ' Start the stopwatch.
            sw.Start()

            ' Queue the Producer threads. Producer "B" starts numbering its
            ' items where producer "A" stops.
            Dim task1 = Task.Factory.StartNew(Sub() RunProducer("A", 0))
            Dim task2 = Task.Factory.StartNew(Sub() RunProducer("B", itemsToProduce))

            ' Store in an array for use with ContinueWhenAll
            Dim producers() As Task = {task1, task2}

            ' Create a cleanup task that will call CompleteAdding after
            ' all producers are done adding items. Without it the consumer's
            ' For Each loop would never terminate.
            Task.Factory.ContinueWhenAll(producers, Sub(p) collection.CompleteAdding())

            ' Queue the Consumer thread right away so that it begins
            ' consuming as soon as the producers add items.
            Task.Factory.StartNew(Sub() RunConsumer())

            ' Keep the console window open while the
            ' consumer thread completes its output.
            Console.ReadKey()

        End Sub

        ''' <summary>
        ''' Adds itemsToProduce tick values to the shared collection.
        ''' </summary>
        ''' <param name="ID">Label used to identify this producer in the output.</param>
        ''' <param name="start">Number of the first item; used for display only.</param>
        Shared Sub RunProducer(ByVal ID As String, ByVal start As Integer)
            Dim additions As Integer = 0

            For i As Integer = start To start + itemsToProduce - 1

                ' The data that is added to the collection.
                Dim ticks As Long = sw.ElapsedTicks

                ' Display additions and subtractions.
                Console.WriteLine("{0} adding tick value {1}. item# {2}", ID, ticks, i)

                ' Don't try to add item after CompleteAdding
                ' has been called.
                If Not collection.IsAddingCompleted Then
                    collection.Add(ticks)

                    ' Counter for demonstration purposes only. Count only the
                    ' items that were really added so the total stays accurate.
                    additions += 1
                End If

                ' Slow down the producer threads without sleeping.
                ' Comment this line out to produce at full speed.
                Thread.SpinWait(100000)

            Next

            ' Both producers update the shared total, so the update is atomic.
            Interlocked.Add(totalAdditions, additions)
            Console.WriteLine("{0} is done adding: {1} items", ID, additions)

        End Sub

        ''' <summary>
        ''' Removes every item from the shared collection and prints a summary
        ''' once the loop ends.
        ''' </summary>
        Shared Sub RunConsumer()
            ' GetConsumingEnumerable returns an enumerable that removes each
            ' item from the collection as it is enumerated. The loop ends once
            ' CompleteAdding has been called and the collection is empty.
            Dim subtractions As Integer = 0

            For Each item In collection.GetConsumingEnumerable()

                subtractions += 1
                Console.WriteLine("Consuming tick value {0} : item# {1} : current count = {2}",
                                  item.ToString("D18"), subtractions, collection.Count)
            Next

            Console.WriteLine("Total added: {0} Total consumed: {1} Current count: {2} ",
                                    totalAdditions, subtractions, collection.Count)
            sw.Stop()

            Console.WriteLine("Press any key to exit.")
        End Sub

    End Class
End Module
namespace EnumerateBlockingCollection
{
    using System;
    using System.Collections.Concurrent;
    using System.Diagnostics;
    using System.Threading;
    using System.Threading.Tasks;

    /// <summary>
    /// Producer/consumer demo: two producer tasks add stopwatch tick values to a
    /// bounded <see cref="BlockingCollection{T}"/> while one consumer task removes
    /// them with a <c>foreach</c> loop over <c>GetConsumingEnumerable</c>.
    /// </summary>
    class Program
    {

        // Limit the collection size to 1000 items at any given time.
        // The two producers add 2 * itemsToProduce items in total, so set
        // itemsToProduce to >500 to hit the limit.
        const int upperLimit = 1000;

        // Number of items EACH producer adds. Adjust this number to see
        // how it impacts the producing-consuming pattern.
        const int itemsToProduce = 100;

        static BlockingCollection<long> collection = new BlockingCollection<long>(upperLimit);

        // Variables for diagnostic output only.
        static Stopwatch sw = new Stopwatch();
        static int totalAdditions = 0;

        /// <summary>
        /// Starts the two producers and the consumer, then waits for a key press
        /// so the console window stays open while the tasks write their output.
        /// </summary>
        /// <param name="args">Command-line arguments; not used.</param>
        static void Main(string[] args)
        {

            // Start the stopwatch.
            sw.Start();

            // Queue the Producer threads. Store in an array
            // for use with ContinueWhenAll. Producer "B" starts numbering
            // its items where producer "A" stops.
            Task[] producers = new Task[2];
            producers[0] = Task.Factory.StartNew(() => RunProducer("A", 0));
            producers[1] = Task.Factory.StartNew(() => RunProducer("B", itemsToProduce));

            // Create a cleanup task that will call CompleteAdding after
            // all producers are done adding items. Without it the consumer's
            // foreach loop would never terminate.
            Task.Factory.ContinueWhenAll(producers, (p) => collection.CompleteAdding());

            // Queue the Consumer thread right away so that it begins
            // consuming as soon as the producers add items.
            Task.Factory.StartNew(() => RunConsumer());

            // Keep the console window open while the
            // consumer thread completes its output.
            Console.ReadKey();

        }

        /// <summary>
        /// Adds <c>itemsToProduce</c> tick values to the shared collection.
        /// </summary>
        /// <param name="ID">Label used to identify this producer in the output.</param>
        /// <param name="start">Number of the first item; used for display only.</param>
        static void RunProducer(string ID, int start)
        {

            int additions = 0;
            for (int i = start; i < start + itemsToProduce; i++)
            {
                // The data that is added to the collection.
                long ticks = sw.ElapsedTicks;

                // Display additions and subtractions.
                Console.WriteLine("{0} adding tick value {1}. item# {2}", ID, ticks, i);

                // Don't try to add item after CompleteAdding
                // has been called.
                if (!collection.IsAddingCompleted)
                {
                    collection.Add(ticks);

                    // Counter for demonstration purposes only. Count only the
                    // items that were really added so the total stays accurate.
                    additions++;
                }

                // Slow down the producer threads without sleeping.
                // Comment this line out to produce at full speed.
                Thread.SpinWait(100000);
            }

            // Both producers update the shared total, so the update is atomic.
            Interlocked.Add(ref totalAdditions, additions);
            Console.WriteLine("{0} is done adding: {1} items", ID, additions);
        }


        /// <summary>
        /// Removes every item from the shared collection and prints a summary
        /// once the loop ends.
        /// </summary>
        static void RunConsumer()
        {
            // GetConsumingEnumerable returns an enumerable that removes each
            // item from the collection as it is enumerated. The loop ends once
            // CompleteAdding has been called and the collection is empty.
            int subtractions = 0;
            foreach (var item in collection.GetConsumingEnumerable())
            {
                // Increment first so that item# is 1-based, matching the
                // Visual Basic version of this sample.
                subtractions++;
                Console.WriteLine("Consuming tick value {0} : item# {1} : current count = {2}",
                        item.ToString("D18"), subtractions, collection.Count);
            }

            // Count is a property on BlockingCollection<T>, not a method.
            Console.WriteLine("Total added: {0} Total consumed: {1} Current count: {2} ",
                                totalAdditions, subtractions, collection.Count);
            sw.Stop();

            Console.WriteLine("Press any key to exit");
        }
    }
}

這個範例會在耗用端執行緒中使用 foreach 迴圈並搭配 BlockingCollection<T>.GetConsumingEnumerable 方法,以在列舉項目的同時將項目從集合中移除。 System.Collections.Concurrent.BlockingCollection<T> 會隨時限制集合中的項目數上限。 以此方式列舉集合時,如果沒有項目可用或集合是空集合,則消費者執行緒會被封鎖 (Block),直到有項目可用或加入完成為止。 在這個範例中,封鎖並非考量重點,因為生產者執行緒加入項目的速度比使用的速度還要快。

項目的列舉順序不保證會和生產者執行緒加入項目的順序相同。

若要列舉集合而不加以修改,請使用 foreach (For Each) 就好,不要搭配 GetConsumingEnumerable 方法。 但是,請務必了解這種列舉代表的是集合在某個時間點的快照。 如果當您執行迴圈時同時有其他執行緒在加入或移除項目,則這個迴圈可能無法代表集合的實際狀態。

請參閱

參考

System.Collections.Concurrent

概念

以 .NET Framework 進行平行程式設計