共用方式為


BlockingCollection 概觀

BlockingCollection<T> 是安全執行緒集合,可提供下列功能:

  • 實作生產者與消費者模式。

  • 並行加入和擷取多個執行緒的項目。

  • 可選擇最大容量。

  • 當集合為空白或已滿時,封鎖插入和移除作業。

  • 不封鎖或封鎖特定時段的插入和移除 "try" 作業。

  • 封裝任何實作 IProducerConsumerCollection<T> 的集合型別。

  • 啟用取消語彙基元的取消作業。

  • 含有 foreach (在 Visual Basic 中為 For Each) 的兩種列舉:

    1. 唯讀列舉。

    2. 可在列舉項目時移除項目的列舉。

界限和封鎖支援

BlockingCollection<T> 支援界限和封鎖。 界限表示您可以設定集合的最大容量。 在特定案例中界限是很重要的,因為它可讓您控制記憶體中集合的最大大小,並防止生產執行緒移到消費執行緒之前太遠。

多個執行緒或工作可以並行地將項目加入集合,如果集合達到指定的最大容量,則會封鎖生產執行緒,直到移除項目為止。 多個消費者可以並行地移除項目,如果集合變為空白,則會封鎖消費執行緒,直到生產者加入項目為止。 生產執行緒可以呼叫 CompleteAdding 以指出不會加入其他項目。 消費者可監視 IsCompleted 屬性,以了解何時集合為空白,以及不會加入其他項目。 下列範例示範簡單的 BlockingCollection,其界限容量為 100。 生產者工作會在部分特定條件為 true 時,將項目加入集合,然後呼叫 CompleteAdding。 消費者工作會擷取項目,直到 IsCompleted 屬性為 true 為止。

' A bounded collection. It can hold no more 
' than 100 items at once.
Dim dataItems = New BlockingCollection(Of Data)(100)

' A simple blocking consumer with no cancellation.
Task.Factory.StartNew(Sub()
                          While dataItems.IsCompleted = False
                              Dim dataItem As Data = Nothing
                              Try
                                  dataItem = dataItems.Take()
                              Catch e As InvalidOperationException
                                  ' IOE means that Take() was called on a completed collection.
                                  ' In this example, we can simply catch the exception since the 
                                  ' loop will break on the next iteration.
                              End Try
                              If (dataItem IsNot Nothing) Then
                                  Process(dataItem)
                              End If
                          End While
                          Console.WriteLine(vbCrLf & "No more items to take.")
                      End Sub)

' A simple blocking producer with no cancellation.
Task.Factory.StartNew(Sub()
                          While moreItemsToAdd = True
                              Dim item As Data = GetData()

                              ' Blocks if numbers.Count = dataItems.BoundedCapacity
                              dataItems.Add(item)
                          End While

                          ' Let consumer know we are done.
                          dataItems.CompleteAdding()
                      End Sub)
            // A bounded collection. It can hold no more 
            // than 100 items at once.
            BlockingCollection<Data> dataItems = new BlockingCollection<Data>(100);


            // A simple blocking consumer with no cancellation.
            Task.Factory.StartNew(() => 
            {
                while (!dataItems.IsCompleted)
                {

                    Data data = null;
                    // Blocks if number.Count == 0
                    // IOE means that Take() was called on a completed collection.
                    // Some other thread can call CompleteAdding after we pass the
                    // IsCompleted check but before we call Take. 
                    // In this example, we can simply catch the exception since the 
                    // loop will break on the next iteration.
                    try
                    {
                        data = dataItems.Take();
                    }
                    catch (InvalidOperationException) { }

                    if (data != null)
                    {
                        Process(data);
                    }
                }
                Console.WriteLine("\r\nNo more items to take.");
            });

            // A simple blocking producer with no cancellation.
            Task.Factory.StartNew(() =>
            {
                while (moreItemsToAdd)
                {
                    Data data = GetData();
                    // Blocks if numbers.Count == dataItems.BoundedCapacity
                    dataItems.Add(data);
                }
                // Let consumer know we are done.
                dataItems.CompleteAdding();
            });

如需完整的範例,請參閱 HOW TO:從 BlockingCollection 個別加入和擷取項目

計時的封鎖作業

在界限集合的計時封鎖 TryAddTryTake 作業中,方法會嘗試加入或擷取項目。 如果項目可用,則會放置在以傳址方式傳遞的變數中,並且此方法會傳回 true。 如果在指定的逾時期間之後沒有擷取項目,此方法會傳回 false。 然後,執行緒可任意執行其他有用的工作,然後嘗試重新存取集合。 如需計時封鎖存取的範例,請參閱 HOW TO:從 BlockingCollection 個別加入和擷取項目中的第二個範例。

取消加入和擷取作業

加入和擷取作業通常在迴圈中執行。 您可以取消迴圈,方法是將 CancellationToken 傳遞至 TryAddTryTake 方法,然後在每個反覆項目上檢查語彙基元的 IsCancellationRequested 屬性值。 如果該值為 true,則您可決定回應取消要求,清除任何資源並結束迴圈。 下列範例顯示接受取消語彙基元的 TryAdd 多載,以及使用它的程式碼:

Do While moreItems = True
    ' Cancellation causes OCE. We know how to handle it.
    Try
        success = bc.TryAdd(itemToAdd, 2, ct)
    Catch ex As OperationCanceledException
        bc.CompleteAdding()
        Exit Do
    End Try
Loop
do
{
    // Cancellation causes OCE. We know how to handle it.
    try
    {
        success = bc.TryAdd(itemToAdd, 2, ct);
    }
    catch (OperationCanceledException)
    {
        bc.CompleteAdding();
        break;
    }
    //...
} while (moreItems == true);

如需如何加入取消支援的範例,請參閱 HOW TO:從 BlockingCollection 個別加入和擷取項目中的第二個範例。

指定集合型別

當您建立 BlockingCollection<T> 時,您不僅可以指定界限容量,也可以指定要使用的集合型別。 例如,您可以針對先進先出 (FIFO) 行為指定 ConcurrentConcurrentQueue,或針對後進先出 (LIFO) 行為指定 ConcurrentStack<T>。 您可以使用任何實作 IProducerConsumerCollection<T> 介面的集合類別。 BlockingCollection<T> 的預設集合型別是 ConcurrentQueue<T>。 下列程式碼範例中,顯示如何建立字串的 BlockingCollection<T> (其容量為 1000,並且使用 ConcurrentBag):

Dim bc = New BlockingCollection(Of String)(New ConcurrentBag(Of String()), 1000)
BlockingCollection<string> bc = new BlockingCollection<string>(new ConcurrentBag<string>(), 1000 );

如需詳細資訊,請參閱 HOW TO:將界限和封鎖功能加入至集合類別

IEnumerable 支援

BlockingCollection<T> 提供 GetConsumingEnumerable 方法,可讓消費者使用 foreach (在 Visual Basic 中為 For Each) 來移除項目,直到集合已完成為止,這表示集合為空白,且不會加入其他項目。 如需詳細資訊,請參閱 HOW TO:使用 ForEach 來移除 BlockingCollection 中的項目

將多個 BlockingCollection 做為一個使用

如果消費者需要同時從多個集合擷取項目,則您可以建立 BlockingCollection<T> 的陣列,並使用靜態方法 (例如 TakeFromAnyAddToAny),加入或擷取陣列中的任何集合。 如果封鎖一個集合,則該方法會立即嘗試另一個集合,直到找到一個可以執行作業的集合為止。 如需詳細資訊,請參閱 HOW TO:在管線中使用封鎖集合的陣列

請參閱

參考

System.Collections.Concurrent

概念

安全執行緒集合

其他資源

集合和資料結構