BlockingCollection 概述

BlockingCollection<T> 是一个线程安全的集合类,提供以下功能:

  • 实现制造者-使用者模式。

  • 并发地从多个线程中添加和移除项。

  • 可选最大容量。

  • 集合为空或已满时通过插入和移除操作进行阻塞。

  • 插入和移除“尝试”操作不发生阻塞,或在指定时间段内发生阻塞。

  • 封装实现 IProducerConsumerCollection<T> 的任何集合类型

  • 使用取消标记执行取消操作。

  • 支持使用 foreach(在 Visual Basic 中,使用 For Each)的两种枚举:

    1. 只读枚举。

    2. 在枚举项时将项移除的枚举。

限制和阻塞支持

BlockingCollection<T> 支持限制和阻塞。 限制意味着可以设置集合的最大容量。 限制在某些情况中很重要,因为它使你能够控制内存中的集合的最大大小,并可阻止制造线程移动到离使用线程前方太远的位置。

多个线程或任务可以同时将项添加到集合中,如果集合达到其指定的最大容量,则生成线程将阻塞,直到删除某个项。 多个使用者可以同时删除项,如果集合变为空,则消耗线程将阻塞,直到生成者添加项。 生成线程可以调用 CompleteAdding 以指示不会添加更多项。 消费者监视 IsCompleted 属性,以了解集合何时为空且不会再添加更多项。 下面的示例展示了容量上限为 100 的简单 BlockingCollection。 只要某些外部条件为 true,生成者任务即可将项添加到集合中,然后调用 CompleteAdding。 使用者任务获取项,直到 IsCompleted 属性为 true。

// A bounded collection. It can hold no more
// than 100 items at once.
BlockingCollection<Data> dataItems = new BlockingCollection<Data>(100);

// A simple blocking consumer with no cancellation.
Task.Run(() =>
{
    while (!dataItems.IsCompleted)
    {

        Data data = null;
        // Blocks if dataItems.Count == 0.
        // IOE means that Take() was called on a completed collection.
        // Some other thread can call CompleteAdding after we pass the
        // IsCompleted check but before we call Take.
        // In this example, we can simply catch the exception since the
        // loop will break on the next iteration.
        try
        {
            data = dataItems.Take();
        }
        catch (InvalidOperationException) { }

        if (data != null)
        {
            Process(data);
        }
    }
    Console.WriteLine("\r\nNo more items to take.");
});

// A simple blocking producer with no cancellation.
Task.Run(() =>
{
    while (moreItemsToAdd)
    {
        Data data = GetData();
        // Blocks if numbers.Count == dataItems.BoundedCapacity
        dataItems.Add(data);
    }
    // Let consumer know we are done.
    dataItems.CompleteAdding();
});

' A bounded collection. It can hold no more 
' than 100 items at once.
Dim dataItems = New BlockingCollection(Of Data)(100)

' A simple blocking consumer with no cancellation.
Task.Factory.StartNew(Sub()
                          While dataItems.IsCompleted = False
                              Dim dataItem As Data = Nothing
                              Try
                                  dataItem = dataItems.Take()
                              Catch e As InvalidOperationException
                                  ' IOE means that Take() was called on a completed collection.
                                  ' In this example, we can simply catch the exception since the 
                                  ' loop will break on the next iteration.
                              End Try
                              If (dataItem IsNot Nothing) Then
                                  Process(dataItem)
                              End If
                          End While
                          Console.WriteLine(vbCrLf & "No more items to take.")
                      End Sub)

' A simple blocking producer with no cancellation.
Task.Factory.StartNew(Sub()
                          While moreItemsToAdd = True
                              Dim item As Data = GetData()

                              ' Blocks if dataItems.Count = dataItems.BoundedCapacity.
                              dataItems.Add(item)
                          End While

                          ' Let consumer know we are done.
                          dataItems.CompleteAdding()
                      End Sub)

完整示例,请参阅 如何单独添加和删除 BlockingCollection 中的项

计时阻塞操作

在有限集合上的定时阻塞 TryAddTryTake 操作中,该方法会尝试添加或获取项目。 如果某个项可用,则将其置于通过引用传入的变量中,并且该方法返回 true。 如果在指定的超时时间段后未检索任何项,则方法返回 false。 然后,线程可以自由执行其他一些有用的工作,然后再尝试访问集合。 有关定时访问阻止机制的示例,请查看如何:在BlockingCollection中单独添加和取出项目中的第二个示例。

取消添加和取出操作

添加和取操作通常在循环中执行。 可以通过以下方法来取消循环:向 CancellationTokenTryAdd 方法传入 TryTake,然后在每次迭代时检查该标记的 IsCancellationRequested 属性的值。 如果值为 true,则由你通过清理任何资源并退出循环来响应取消请求。 下面的示例演示获取取消标记和使用该标记的代码的 TryAdd 重载:

do
{
    // Cancellation causes OCE. We know how to handle it.
    try
    {
        success = bc.TryAdd(itemToAdd, 2, ct);
    }
    catch (OperationCanceledException)
    {
        bc.CompleteAdding();
        break;
    }
    //...
} while (moreItems == true);
Do While moreItems = True
    ' Cancellation causes OCE. We know how to handle it.
    Try
        success = bc.TryAdd(itemToAdd, 2, ct)
    Catch ex As OperationCanceledException
        bc.CompleteAdding()
        Exit Do
    End Try
Loop

有关如何添加取消支持的示例,请参阅 How to: Add and Take Items Individually from a BlockingCollection 中的第二个示例。

指定集合类型

创建BlockingCollection<T>时,不仅可以指定容量限制,还可以指定要使用的集合类型。 例如,可为先进先出 (FIFO) 行为指定 ConcurrentQueue<T>,也可为后进先出 (LIFO) 行为指定 ConcurrentStack<T>。 可以使用任何集合类,这些集合类实现了 IProducerConsumerCollection<T> 接口。 的默认集合类型 BlockingCollection<T>ConcurrentQueue<T>. 下面的代码示例演示如何创建BlockingCollection<T>容量为 1000 的字符串,并使用 :ConcurrentBag<T>

Dim bc = New BlockingCollection(Of String)(New ConcurrentBag(Of String()), 1000)  
BlockingCollection<string> bc = new BlockingCollection<string>(new ConcurrentBag<string>(), 1000 );  

有关详细信息,请参阅 如何:向集合添加边界和阻止功能

IEnumerable 支持

BlockingCollection<T> 提供了一种方法 GetConsumingEnumerable ,使使用者能够使用 foreachFor Each 在 Visual Basic 中)删除项,直到集合完成,这意味着该集合为空且不会添加更多项。 有关详细信息,请参阅 如何:使用 ForEach 删除 BlockingCollection 中的项

将多个 BlockingCollection 作为整体使用

对于使用者需要同时从多个集合中获取项的方案,可以创建数组并使用静态方法,例如BlockingCollection<T>TakeFromAny,这些方法将添加到数组AddToAny中的任何集合或从任何集合中获取。 如果一个集合被阻止,该方法会立即尝试另一个集合,直到找到可以执行该作的集合。 有关详细信息,请参阅 如何:在管道中使用阻塞集合的数组

另请参阅