BlockingCollection 概觀

BlockingCollection<T> 是提供下列功能的安全執行緒集合類別︰

  • 生產者-消費者模式的實作。

  • 同時從多個執行緒新增和擷取項目。

  • 最佳最大容量。

  • 集合是空的或已滿時封鎖的插入和移除作業。

  • 不會封鎖或最多封鎖一段指定時間的插入和移除 "try" 作業。

  • 封裝任何可實作 IProducerConsumerCollection<T> 的集合類別

  • 具有取消語彙基元的取消作業。

  • foreach 的列舉有兩種 (在 Visual Basic 中為 For Each):

    1. 唯讀列舉。

    2. 移除所列舉項目的列舉。

界限和封鎖支援

BlockingCollection<T> 支援界限和封鎖。 界限表示您可以設定集合的最大容量。 界限在某些情況下十分重要,原因是它可讓您控制記憶體中集合的大小上限,並且防止產生執行緒移到使用執行緒的太前面。

多個執行緒或工作可以同時將項目新增至集合,如果集合達到其指定的最大容量,則會在移除項目之前封鎖產生執行緒。 多位消費者可以同時移除項目,如果集合變成空的,則會在生產者新增項目之前封鎖使用執行緒。 產生執行緒可以呼叫 CompleteAdding,表示無法再新增項目。 消費者會監視 IsCompleted 屬性,以得知集合何時變成空的,以及何時無法再新增項目。 下列範例示範界限容量為 100 的簡單 BlockingCollection。 只要符合某個外部條件,生產者工作就會將項目新增至集合,然後呼叫 CompleteAdding。 在 IsCompleted 屬性為 true 之前,消費者工作會擷取項目。

// A bounded collection. It can hold no more
// than 100 items at once.
BlockingCollection<Data> dataItems = new BlockingCollection<Data>(100);

// A simple blocking consumer with no cancellation.
Task.Run(() =>
{
    while (!dataItems.IsCompleted)
    {

        Data data = null;
        // Blocks if dataItems.Count == 0.
        // IOE means that Take() was called on a completed collection.
        // Some other thread can call CompleteAdding after we pass the
        // IsCompleted check but before we call Take.
        // In this example, we can simply catch the exception since the
        // loop will break on the next iteration.
        try
        {
            data = dataItems.Take();
        }
        catch (InvalidOperationException) { }

        if (data != null)
        {
            Process(data);
        }
    }
    Console.WriteLine("\r\nNo more items to take.");
});

// A simple blocking producer with no cancellation.
Task.Run(() =>
{
    while (moreItemsToAdd)
    {
        Data data = GetData();
        // Blocks if numbers.Count == dataItems.BoundedCapacity
        dataItems.Add(data);
    }
    // Let consumer know we are done.
    dataItems.CompleteAdding();
});

' A bounded collection. It can hold no more 
' than 100 items at once.
Dim dataItems = New BlockingCollection(Of Data)(100)

' A simple blocking consumer with no cancellation.
Task.Factory.StartNew(Sub()
                          While dataItems.IsCompleted = False
                              Dim dataItem As Data = Nothing
                              Try
                                  dataItem = dataItems.Take()
                              Catch e As InvalidOperationException
                                  ' IOE means that Take() was called on a completed collection.
                                  ' In this example, we can simply catch the exception since the 
                                  ' loop will break on the next iteration.
                              End Try
                              If (dataItem IsNot Nothing) Then
                                  Process(dataItem)
                              End If
                          End While
                          Console.WriteLine(vbCrLf & "No more items to take.")
                      End Sub)

' A simple blocking producer with no cancellation.
Task.Factory.StartNew(Sub()
                          While moreItemsToAdd = True
                              Dim item As Data = GetData()

                              ' Blocks if dataItems.Count = dataItems.BoundedCapacity.
                              dataItems.Add(item)
                          End While

                          ' Let consumer know we are done.
                          dataItems.CompleteAdding()
                      End Sub)

如需完整範例,請參閱如何:從 BlockingCollection 個別新增和擷取項目

計時封鎖作業

在界限集合的計時封鎖 TryAddTryTake 作業上,方法會嘗試新增或擷取項目。 如果項目可用,則會將它置入依傳址所傳遞的變數,而且方法會傳回 true。 如果在指定的逾時期間之後未擷取任何項目,則方法會傳回 false。 執行緒接著可以先執行一些其他有用工作,再重新嘗試存取集合。 如需計時封鎖存取的範例,請參閱如何:從 BlockingCollection 個別新增和擷取項目中的第二個範例。

取消新增和擷取作業

新增和擷取作業一般會透過迴圈形式執行。 您可以將 CancellationToken 傳入 TryAddTryTake 方法來取消迴圈,然後檢查每個反覆項目上語彙基元之 IsCancellationRequested 屬性的值。 如果值為 true,則由您決定是否要清除任何資源並結束迴圈,藉此回應該取消要求。 下列範例示範採用取消語彙基元之 TryAdd 的多載,以及使用它的程式碼︰

do
{
    // Cancellation causes OCE. We know how to handle it.
    try
    {
        success = bc.TryAdd(itemToAdd, 2, ct);
    }
    catch (OperationCanceledException)
    {
        bc.CompleteAdding();
        break;
    }
    //...
} while (moreItems == true);
Do While moreItems = True
    ' Cancellation causes OCE. We know how to handle it.
    Try
        success = bc.TryAdd(itemToAdd, 2, ct)
    Catch ex As OperationCanceledException
        bc.CompleteAdding()
        Exit Do
    End Try
Loop

如需如何新增取消支援的範例,請參閱如何:從 BlockingCollection 個別新增和擷取項目中的第二個範例。

指定集合類型

在您建立 BlockingCollection<T> 時,不僅可以指定界限容量,還可以指定要使用的集合類型。 例如,您可以針對先進先出 (FIFO) 行為指定 ConcurrentQueue<T>,或針對後進先出 (LIFO) 行為指定 ConcurrentStack<T>。 您可以使用任何可實作 IProducerConsumerCollection<T> 介面的集合類別。 BlockingCollection<T> 的預設集合類型為 ConcurrentQueue<T>。 下列程式碼範例示範如何建立容量為 1000 並使用 ConcurrentBag<T> 之字串的 BlockingCollection<T>

Dim bc = New BlockingCollection(Of String)(New ConcurrentBag(Of String()), 1000)  
BlockingCollection<string> bc = new BlockingCollection<string>(new ConcurrentBag<string>(), 1000 );  

如需詳細資訊,請參閱操作說明:將界限和封鎖功能加入至集合

IEnumerable 支援

BlockingCollection<T> 提供 GetConsumingEnumerable 方法讓消費者可以使用 foreach (Visual Basic 中的 For Each) 移除項目,直到收集完成為止;這表示集合會是空的,而且不會再新增任何項目。 如需詳細資訊,請參閱如何:使用 ForEach 來移除 BlockingCollection 中的項目

將多個 BlockingCollection 當成一個使用

如果消費者需要同時從多個集合擷取項目,您可以建立 BlockingCollection<T> 陣列,並使用將新增至或擷取自陣列中任何集合的靜態方法 (例如 TakeFromAnyAddToAny)。 如果封鎖其中一個集合,則方法會立即嘗試另一個集合,直到找到可執行作業的集合為止。 如需詳細資訊,請參閱如何:在管線中使用封鎖回收的陣列

另請參閱