如何:在 BlockingCollection 中逐个添加和取出项

此示例展示了如何以阻止性和非阻止性方式在 BlockingCollection<T> 中添加和移除项。 有关 BlockingCollection<T> 的详细信息,请参阅“BlockingCollection 概述”。

有关如何枚举 BlockingCollection<T> 直至其为空且不再添加更多元素的示例,请参阅“如何:使用 ForEach 移除 BlockingCollection 中的项”。

示例 1

第一个示例展示了如何添加和取出项,以便在集合暂时为空(取出时)或达到最大容量(添加时),或超过指定超时期限时,阻止相应操作。 注意,仅当创建 BlockingCollection 时在构造函数中指定了最大容量,才会启用在达到最大容量时进行阻止的功能。

using System;
using System.Collections.Concurrent;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// Example 1: a producer and a consumer exchange integers through a bounded
/// <see cref="BlockingCollection{T}"/> using the blocking Add and Take calls.
/// </summary>
class Program
{
    static void Main()
    {
        // Number of items the producer pushes; tune freely.
        int itemsToAdd = 500;

        if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
        {
            // Grow (never shrink) the console buffer so that every
            // Add/Take line remains available for scrolling.
            Console.SetBufferSize(
                Math.Max(Console.BufferWidth, 80),
                Math.Max(Console.BufferHeight, itemsToAdd * 2 + 3));
        }

        // Bounded to 50 items. Change or drop the capacity argument
        // to observe how it affects the interleaving of Adds and Takes.
        var numbers = new BlockingCollection<int>(50);

        // Fire-and-forget consumer and producer; neither supports cancellation.
        Task.Run(() => ConsumeAll(numbers));
        Task.Run(() => ProduceAll(numbers, itemsToAdd));

        // Hold the console open until the user presses Enter.
        Console.ReadLine();
    }

    /// <summary>
    /// Takes items one at a time until the collection reports completion,
    /// printing each item as it is removed.
    /// </summary>
    /// <param name="source">The collection to drain.</param>
    static void ConsumeAll(BlockingCollection<int> source)
    {
        while (true)
        {
            if (source.IsCompleted)
            {
                break;
            }

            int taken;
            try
            {
                taken = source.Take();
            }
            catch (InvalidOperationException)
            {
                // Take() reports this once adding has been completed
                // and nothing is left to remove.
                Console.WriteLine("Adding was completed!");
                break;
            }

            Console.WriteLine("Take:{0} ", taken);

            // Deliberately slow the consumer down so the collection
            // fills up quickly and the producer's Add calls block.
            Thread.SpinWait(100000);
        }

        Console.WriteLine("\r\nNo more items to take. Press the Enter key to exit.");
    }

    /// <summary>
    /// Adds the integers 0 .. <paramref name="count"/> - 1 and then marks the
    /// collection as complete for adding.
    /// </summary>
    /// <param name="sink">The collection to fill.</param>
    /// <param name="count">How many items to add.</param>
    static void ProduceAll(BlockingCollection<int> sink, int count)
    {
        int next = 0;
        while (next < count)
        {
            sink.Add(next);
            Console.WriteLine("Add:{0} Count={1}", next, sink.Count);
            next++;
        }

        // Signal consumers that no further items will arrive.
        sink.CompleteAdding();
    }
}
Option Strict On
Option Explicit On

Imports System.Collections.Concurrent
Imports System.Threading
Imports System.Threading.Tasks

Module SimpleBlocking

    ''' <summary>
    ''' Example 1: a producer and a consumer exchange integers through a bounded
    ''' BlockingCollection(Of Integer) using the blocking Add and Take calls.
    ''' </summary>
    Class Program
        Shared Sub Main()
            ' Increase or decrease this value as desired.
            Dim itemsToAdd As Integer = 500

            ' Preserve all the display output for Adds and Takes.
            ' Console.SetBufferSize is only supported on Windows, and it rejects
            ' a buffer smaller than the current window, so only ever grow the
            ' existing buffer instead of forcing a fixed 80-column size.
            If System.Runtime.InteropServices.RuntimeInformation.IsOSPlatform(System.Runtime.InteropServices.OSPlatform.Windows) Then
                Dim width As Integer = Math.Max(Console.BufferWidth, 80)
                Dim height As Integer = Math.Max(Console.BufferHeight, (itemsToAdd * 2) + 3)
                Console.SetBufferSize(width, height)
            End If

            ' A bounded collection. Increase, decrease, or remove the
            ' maximum capacity argument to see how it impacts behavior.
            Dim numbers = New BlockingCollection(Of Integer)(50)

            ' A simple blocking consumer with no cancellation.
            Task.Factory.StartNew(Sub()
                                      Dim i As Integer = -1
                                      While numbers.IsCompleted = False
                                          Try
                                              i = numbers.Take()
                                          Catch ioe As InvalidOperationException
                                              ' Take() reports this once adding has been
                                              ' completed and nothing is left to remove.
                                              Console.WriteLine("Adding was completed!")
                                              Exit While
                                          End Try
                                          Console.WriteLine("Take:{0} ", i)
                                          ' Simulate a slow consumer. This will cause the
                                          ' collection to fill up fast and thus Adds will block.
                                          Thread.SpinWait(100000)
                                      End While
                                      Console.WriteLine(vbCrLf & "No more items to take. Press the Enter key to exit.")
                                  End Sub)

            ' A simple blocking producer with no cancellation.
            Task.Factory.StartNew(Sub()
                                      ' The upper bound of For...To is inclusive, so stop at
                                      ' itemsToAdd - 1 to add exactly itemsToAdd items.
                                      For i As Integer = 0 To itemsToAdd - 1
                                          numbers.Add(i)
                                          Console.WriteLine("Add:{0} Count={1}", i, numbers.Count)
                                      Next

                                      ' Signal consumers that no further items will arrive.
                                      numbers.CompleteAdding()
                                  End Sub)

            ' Keep the console window open in debug mode.
            Console.ReadLine()
        End Sub
    End Class

End Module

示例 2

第二个示例演示如何添加和取出项以便不会阻止操作。 如果没有任何项、已达到有界集合的最大容量或已超过超时期限,TryAdd 或 TryTake 操作将返回 false。 这样一来,线程可以暂时执行其他一些有用的工作,并在稍后再次尝试检索新项,或尝试添加先前无法添加的相同项。 该程序还演示如何在访问 BlockingCollection<T> 时实现取消。

using System;
using System.Collections.Concurrent;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// Example 2: a producer and a consumer exchange integers through a bounded
/// <see cref="BlockingCollection{T}"/> using the non-blocking TryAdd/TryTake
/// calls, with cooperative cancellation triggered by pressing the 'c' key.
/// </summary>
class ProgramWithCancellation
{
    // Total number of items the producer attempts to add.
    static int inputs = 2000;

    static void Main()
    {
        // The token source for issuing the cancellation request.
        var cts = new CancellationTokenSource();

        // A blocking collection that can hold no more than 100 items at a time.
        var numberCollection = new BlockingCollection<int>(100);

        if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
        {
            int width = Math.Max(Console.BufferWidth, 80);
            int height = Math.Max(Console.BufferHeight, 8000);

            // Preserve all the display output for Adds and Takes
            Console.SetBufferSize(width, height);
        }

        // The simplest UI thread ever invented.
        Task.Run(() =>
        {
            if (Console.ReadKey(true).KeyChar == 'c')
            {
                try
                {
                    cts.Cancel();
                }
                catch (ObjectDisposedException)
                {
                    // The key arrived after Main already disposed the token
                    // source (producer and consumer have finished), so there
                    // is nothing left to cancel.
                }
            }
        });

        // Start one producer and one consumer.
        Task t1 = Task.Run(() => NonBlockingConsumer(numberCollection, cts.Token));
        Task t2 = Task.Run(() => NonBlockingProducer(numberCollection, cts.Token));

        // Wait for the tasks to complete execution
        Task.WaitAll(t1, t2);

        cts.Dispose();
        Console.WriteLine("Press the Enter key to exit.");
        Console.ReadLine();
    }

    /// <summary>
    /// Polls the collection with a zero timeout until it is completed or the
    /// token is canceled, printing either the taken item or " Take Blocked".
    /// </summary>
    /// <param name="bc">The collection to take items from.</param>
    /// <param name="ct">Token observed by each TryTake call.</param>
    static void NonBlockingConsumer(BlockingCollection<int> bc, CancellationToken ct)
    {
        // IsCompleted == (IsAddingCompleted && Count == 0)
        while (!bc.IsCompleted)
        {
            int nextItem = 0;
            try
            {
                if (!bc.TryTake(out nextItem, 0, ct))
                {
                    Console.WriteLine(" Take Blocked");
                }
                else
                {
                    Console.WriteLine(" Take:{0}", nextItem);
                }
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine("Taking canceled.");
                break;
            }

            // Slow down consumer just a little to cause
            // collection to fill up faster, and lead to "AddBlocked"
            Thread.SpinWait(500000);
        }

        Console.WriteLine("\r\nNo more items to take.");
    }

    /// <summary>
    /// Tries to add the integers 0 .. inputs - 1, retrying the same value when
    /// TryAdd times out, and marks the collection complete for adding when
    /// finished or canceled.
    /// </summary>
    /// <param name="bc">The collection to add items to.</param>
    /// <param name="ct">Token observed by each TryAdd call.</param>
    static void NonBlockingProducer(BlockingCollection<int> bc, CancellationToken ct)
    {
        int itemToAdd = 0;
        bool success = false;

        do
        {
            // Cancellation causes OCE. We know how to handle it.
            try
            {
                // A shorter timeout causes more failures.
                success = bc.TryAdd(itemToAdd, 2, ct);
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine("Add loop canceled.");
                // Let other threads know we're done in case
                // they aren't monitoring the cancellation token.
                bc.CompleteAdding();
                return;
            }

            if (success)
            {
                Console.WriteLine(" Add:{0}", itemToAdd);
                itemToAdd++;
            }
            else
            {
                Console.Write(" AddBlocked:{0} Count = {1}", itemToAdd, bc.Count);
                // Don't increment itemToAdd. Try the same value again on the
                // next iteration.

                // Do something else useful instead.
                UpdateProgress(itemToAdd);
            }
        } while (itemToAdd < inputs);

        // No lock required here because only one producer.
        bc.CompleteAdding();
    }

    /// <summary>
    /// Prints how far the producer has progressed as a percentage of inputs.
    /// </summary>
    /// <param name="i">The next item value the producer is trying to add.</param>
    static void UpdateProgress(int i)
    {
        double percent = ((double)i / inputs) * 100;
        Console.WriteLine("Percent complete: {0}", percent);
    }
}
Option Strict On
Option Explicit On
Imports System.Collections.Concurrent
Imports System.Threading
Imports System.Threading.Tasks

''' <summary>
''' Example 2: a producer and a consumer exchange integers through a bounded
''' BlockingCollection(Of Integer) using the non-blocking TryAdd/TryTake calls,
''' with cooperative cancellation triggered by pressing the "c" key.
''' </summary>
Class NonBlockingAccess
    ' Total number of items the producer attempts to add.
    Shared inputs As Integer = 2000

    Shared Sub Main()
        ' The token source for issuing the cancellation request.
        Dim cts As New CancellationTokenSource()

        ' A blocking collection that can hold no more than 100 items at a time.
        Dim numberCollection As BlockingCollection(Of Integer) = New BlockingCollection(Of Integer)(100)

        ' Set console buffer to hold our prodigious output.
        ' Console.SetBufferSize is only supported on Windows, and it rejects
        ' a buffer smaller than the current window, so only ever grow the
        ' existing buffer instead of forcing a fixed size.
        If System.Runtime.InteropServices.RuntimeInformation.IsOSPlatform(System.Runtime.InteropServices.OSPlatform.Windows) Then
            Dim width As Integer = Math.Max(Console.BufferWidth, 80)
            Dim height As Integer = Math.Max(Console.BufferHeight, 8000)
            Console.SetBufferSize(width, height)
        End If

        ' The simplest UI thread ever invented.
        Task.Run(Sub()
                     ' Pass True so the pressed key is not echoed into the output.
                     If Console.ReadKey(True).KeyChar = "c"c Then
                         Try
                             cts.Cancel()
                         Catch ex As ObjectDisposedException
                             ' The key arrived after Main already disposed the
                             ' token source (producer and consumer have finished),
                             ' so there is nothing left to cancel.
                         End Try
                     End If
                 End Sub)

        ' Start one producer and one consumer.
        Dim t1 As Task = Task.Run(Sub() NonBlockingConsumer(numberCollection, cts.Token))
        Dim t2 As Task = Task.Run(Sub() NonBlockingProducer(numberCollection, cts.Token))

        ' Wait for the tasks to complete execution
        Task.WaitAll(t1, t2)

        cts.Dispose()
        Console.WriteLine("Press the Enter key to exit.")
        Console.ReadLine()

    End Sub

    ''' <summary>
    ''' Polls the collection with a zero timeout until it is completed or the
    ''' token is canceled, printing either the taken item or a "Take Blocked" line.
    ''' </summary>
    ''' <param name="bc">The collection to take items from.</param>
    ''' <param name="ct">Token observed by each TryTake call.</param>
    Shared Sub NonBlockingConsumer(ByVal bc As BlockingCollection(Of Integer), ByVal ct As CancellationToken)

        ' IsCompleted is equivalent to (IsAddingCompleted And Count = 0)
        While bc.IsCompleted = False
            Dim nextItem As Integer = 0
            Try
                If bc.TryTake(nextItem, 0, ct) = False Then
                    Console.WriteLine("  Take Blocked.")
                Else
                    Console.WriteLine(" Take: {0}", nextItem)
                End If
            Catch ex As OperationCanceledException
                Console.WriteLine("Taking canceled.")
                Exit While
            End Try
            ' Slow down consumer just a little to cause
            ' collection to fill up faster, and lead to "AddBlocked"
            Thread.SpinWait(500000)
        End While

        Console.WriteLine(vbCrLf & "No more items to take.")
    End Sub

    ''' <summary>
    ''' Tries to add the integers 0 .. inputs - 1, retrying the same value when
    ''' TryAdd times out, and marks the collection complete for adding when
    ''' finished or canceled.
    ''' </summary>
    ''' <param name="bc">The collection to add items to.</param>
    ''' <param name="ct">Token observed by each TryAdd call.</param>
    Shared Sub NonBlockingProducer(ByVal bc As BlockingCollection(Of Integer), ByVal ct As CancellationToken)
        Dim itemToAdd As Integer = 0
        Dim success As Boolean = False

        Do
            ' Cancellation causes OCE. We know how to handle it.
            Try
                ' A shorter timeout causes more failures.
                success = bc.TryAdd(itemToAdd, 2, ct)
            Catch ex As OperationCanceledException
                Console.WriteLine("Add loop canceled.")

                ' Let other threads know we're done in case
                ' they aren't monitoring the cancellation token.
                bc.CompleteAdding()
                Exit Sub
            End Try

            If success = True Then
                Console.WriteLine(" Add:{0}", itemToAdd)
                itemToAdd = itemToAdd + 1
            Else
                Console.Write("  AddBlocked:{0} Count = {1}", itemToAdd, bc.Count)

                ' Don't increment itemToAdd. Try the same value again on the
                ' next iteration. Do something else useful instead.
                UpdateProgress(itemToAdd)
            End If
        Loop While itemToAdd < inputs

        ' No lock required here because only one producer.
        bc.CompleteAdding()

    End Sub

    ''' <summary>
    ''' Prints how far the producer has progressed as a percentage of inputs.
    ''' </summary>
    ''' <param name="i">The next item value the producer is trying to add.</param>
    Shared Sub UpdateProgress(ByVal i As Integer)
        Dim percent As Double = (CType(i, Double) / inputs) * 100
        Console.WriteLine("Percent complete: {0}", percent)
    End Sub
End Class

另请参阅