BlockingCollection<T> 類別
定義
重要
部分資訊涉及發行前產品,在發行之前可能會有大幅修改。 Microsoft 對此處提供的資訊,不做任何明確或隱含的瑕疵擔保。
為實作 IProducerConsumerCollection<T>的線程安全集合提供封鎖和系結功能。
generic <typename T>
public ref class BlockingCollection : IDisposable, System::Collections::Generic::IEnumerable<T>, System::Collections::Generic::IReadOnlyCollection<T>, System::Collections::ICollection
generic <typename T>
public ref class BlockingCollection : IDisposable, System::Collections::Generic::IEnumerable<T>, System::Collections::ICollection
public class BlockingCollection<T> : IDisposable, System.Collections.Generic.IEnumerable<T>, System.Collections.Generic.IReadOnlyCollection<T>, System.Collections.ICollection
[System.Runtime.Versioning.UnsupportedOSPlatform("browser")]
public class BlockingCollection<T> : IDisposable, System.Collections.Generic.IEnumerable<T>, System.Collections.Generic.IReadOnlyCollection<T>, System.Collections.ICollection
[System.Runtime.InteropServices.ComVisible(false)]
public class BlockingCollection<T> : IDisposable, System.Collections.Generic.IEnumerable<T>, System.Collections.ICollection
[System.Runtime.InteropServices.ComVisible(false)]
public class BlockingCollection<T> : IDisposable, System.Collections.Generic.IEnumerable<T>, System.Collections.Generic.IReadOnlyCollection<T>, System.Collections.ICollection
public class BlockingCollection<T> : IDisposable, System.Collections.Generic.IEnumerable<T>, System.Collections.ICollection
type BlockingCollection<'T> = class
interface seq<'T>
interface IEnumerable
interface IReadOnlyCollection<'T>
interface ICollection
interface IDisposable
[<System.Runtime.Versioning.UnsupportedOSPlatform("browser")>]
type BlockingCollection<'T> = class
interface seq<'T>
interface IEnumerable
interface IReadOnlyCollection<'T>
interface ICollection
interface IDisposable
[<System.Runtime.InteropServices.ComVisible(false)>]
type BlockingCollection<'T> = class
interface seq<'T>
interface ICollection
interface IEnumerable
interface IDisposable
[<System.Runtime.InteropServices.ComVisible(false)>]
type BlockingCollection<'T> = class
interface seq<'T>
interface IEnumerable
interface ICollection
interface IDisposable
interface IReadOnlyCollection<'T>
type BlockingCollection<'T> = class
interface seq<'T>
interface ICollection
interface IEnumerable
interface IDisposable
Public Class BlockingCollection(Of T)
Implements ICollection, IDisposable, IEnumerable(Of T), IReadOnlyCollection(Of T)
Public Class BlockingCollection(Of T)
Implements ICollection, IDisposable, IEnumerable(Of T)
類型參數
- T
集合中的項目類型。
- 繼承
-
BlockingCollection<T>
- 屬性
- 實作
範例
下列範例示範如何同時從封鎖集合新增和擷取專案:
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
class BlockingCollectionDemo
{
static async Task Main()
{
await AddTakeDemo.BC_AddTakeCompleteAdding();
TryTakeDemo.BC_TryTake();
FromToAnyDemo.BC_FromToAny();
await ConsumingEnumerableDemo.BC_GetConsumingEnumerable();
Console.WriteLine("Press any key to exit.");
Console.ReadKey();
}
}
class AddTakeDemo
{
// Demonstrates:
// BlockingCollection<T>.Add()
// BlockingCollection<T>.Take()
// BlockingCollection<T>.CompleteAdding()
public static async Task BC_AddTakeCompleteAdding()
{
using (BlockingCollection<int> bc = new BlockingCollection<int>())
{
// Spin up a Task to populate the BlockingCollection
Task t1 = Task.Run(() =>
{
bc.Add(1);
bc.Add(2);
bc.Add(3);
bc.CompleteAdding();
});
// Spin up a Task to consume the BlockingCollection
Task t2 = Task.Run(() =>
{
try
{
// Consume the BlockingCollection
while (true) Console.WriteLine(bc.Take());
}
catch (InvalidOperationException)
{
// An InvalidOperationException means that Take() was called on a completed collection
Console.WriteLine("That's All!");
}
});
await Task.WhenAll(t1, t2);
}
}
}
class TryTakeDemo
{
// Demonstrates:
// BlockingCollection<T>.Add()
// BlockingCollection<T>.CompleteAdding()
// BlockingCollection<T>.TryTake()
// BlockingCollection<T>.IsCompleted
public static void BC_TryTake()
{
// Construct and fill our BlockingCollection
using (BlockingCollection<int> bc = new BlockingCollection<int>())
{
int NUMITEMS = 10000;
for (int i = 0; i < NUMITEMS; i++) bc.Add(i);
bc.CompleteAdding();
int outerSum = 0;
// Delegate for consuming the BlockingCollection and adding up all items
Action action = () =>
{
int localItem;
int localSum = 0;
while (bc.TryTake(out localItem)) localSum += localItem;
Interlocked.Add(ref outerSum, localSum);
};
// Launch three parallel actions to consume the BlockingCollection
Parallel.Invoke(action, action, action);
Console.WriteLine("Sum[0..{0}) = {1}, should be {2}", NUMITEMS, outerSum, ((NUMITEMS * (NUMITEMS - 1)) / 2));
Console.WriteLine("bc.IsCompleted = {0} (should be true)", bc.IsCompleted);
}
}
}
class FromToAnyDemo
{
// Demonstrates:
// Bounded BlockingCollection<T>
// BlockingCollection<T>.TryAddToAny()
// BlockingCollection<T>.TryTakeFromAny()
public static void BC_FromToAny()
{
BlockingCollection<int>[] bcs = new BlockingCollection<int>[2];
bcs[0] = new BlockingCollection<int>(5); // collection bounded to 5 items
bcs[1] = new BlockingCollection<int>(5); // collection bounded to 5 items
// Should be able to add 10 items w/o blocking
int numFailures = 0;
for (int i = 0; i < 10; i++)
{
if (BlockingCollection<int>.TryAddToAny(bcs, i) == -1) numFailures++;
}
Console.WriteLine("TryAddToAny: {0} failures (should be 0)", numFailures);
// Should be able to retrieve 10 items
int numItems = 0;
int item;
while (BlockingCollection<int>.TryTakeFromAny(bcs, out item) != -1) numItems++;
Console.WriteLine("TryTakeFromAny: retrieved {0} items (should be 10)", numItems);
}
}
class ConsumingEnumerableDemo
{
// Demonstrates:
// BlockingCollection<T>.Add()
// BlockingCollection<T>.CompleteAdding()
// BlockingCollection<T>.GetConsumingEnumerable()
public static async Task BC_GetConsumingEnumerable()
{
using (BlockingCollection<int> bc = new BlockingCollection<int>())
{
// Kick off a producer task
var producerTask = Task.Run(async () =>
{
for (int i = 0; i < 10; i++)
{
bc.Add(i);
Console.WriteLine($"Producing: {i}");
await Task.Delay(100); // sleep 100 ms between adds
}
// Need to do this to keep foreach below from hanging
bc.CompleteAdding();
});
// Now consume the blocking collection with foreach.
// Use bc.GetConsumingEnumerable() instead of just bc because the
// former will block waiting for completion and the latter will
// simply take a snapshot of the current state of the underlying collection.
foreach (var item in bc.GetConsumingEnumerable())
{
Console.WriteLine($"Consuming: {item}");
}
await producerTask; // Allow task to complete cleanup
}
}
}
open System
open System.Collections.Concurrent
open System.Threading
open System.Threading.Tasks
module AddTakeDemo =
// Demonstrates:
// BlockingCollection<T>.Add()
// BlockingCollection<T>.Take()
// BlockingCollection<T>.CompleteAdding()
let blockingCollectionAddTakeCompleteAdding () =
task {
use bc = new BlockingCollection<int>()
// Spin up a Task to populate the BlockingCollection
let t1 =
task {
bc.Add 1
bc.Add 2
bc.Add 3
bc.CompleteAdding()
}
// Spin up a Task to consume the BlockingCollection
let t2 =
task {
try
// Consume consume the BlockingCollection
while true do
printfn $"{bc.Take()}"
with :? InvalidOperationException ->
// An InvalidOperationException means that Take() was called on a completed collection
printfn "That's All!"
}
let! _ = Task.WhenAll(t1, t2)
()
}
module TryTakeDemo =
// Demonstrates:
// BlockingCollection<T>.Add()
// BlockingCollection<T>.CompleteAdding()
// BlockingCollection<T>.TryTake()
// BlockingCollection<T>.IsCompleted
let blockingCollectionTryTake () =
// Construct and fill our BlockingCollection
use bc = new BlockingCollection<int>()
let NUMITEMS = 10000;
for i = 0 to NUMITEMS - 1 do
bc.Add i
bc.CompleteAdding()
let mutable outerSum = 0
// Delegate for consuming the BlockingCollection and adding up all items
let action =
Action(fun () ->
let mutable localItem = 0
let mutable localSum = 0
while bc.TryTake &localItem do
localSum <- localSum + localItem
Interlocked.Add(&outerSum, localSum)
|> ignore)
// Launch three parallel actions to consume the BlockingCollection
Parallel.Invoke(action, action, action)
printfn $"Sum[0..{NUMITEMS}) = {outerSum}, should be {((NUMITEMS * (NUMITEMS - 1)) / 2)}"
printfn $"bc.IsCompleted = {bc.IsCompleted} (should be true)"
module FromToAnyDemo =
// Demonstrates:
// Bounded BlockingCollection<T>
// BlockingCollection<T>.TryAddToAny()
// BlockingCollection<T>.TryTakeFromAny()
let blockingCollectionFromToAny () =
let bcs =
[|
new BlockingCollection<int>(5) // collection bounded to 5 items
new BlockingCollection<int>(5) // collection bounded to 5 items
|]
// Should be able to add 10 items w/o blocking
let mutable numFailures = 0;
for i = 0 to 9 do
if BlockingCollection<int>.TryAddToAny(bcs, i) = -1 then
numFailures <- numFailures + 1
printfn $"TryAddToAny: {numFailures} failures (should be 0)"
// Should be able to retrieve 10 items
let mutable numItems = 0
let mutable item = 0
while BlockingCollection<int>.TryTakeFromAny(bcs, &item) <> -1 do
numItems <- numItems + 1
printfn $"TryTakeFromAny: retrieved {numItems} items (should be 10)"
module ConsumingEnumerableDemo =
// Demonstrates:
// BlockingCollection<T>.Add()
// BlockingCollection<T>.CompleteAdding()
// BlockingCollection<T>.GetConsumingEnumerable()
let blockingCollectionGetConsumingEnumerable () =
task {
use bc = new BlockingCollection<int>()
// Kick off a producer task
let producerTask =
task {
for i = 0 to 9 do
bc.Add i
printfn $"Producing: {i}"
do! Task.Delay 100 // sleep 100 ms between adds
// Need to do this to keep foreach below from hanging
bc.CompleteAdding()
}
// Now consume the blocking collection with foreach.
// Use bc.GetConsumingEnumerable() instead of just bc because the
// former will block waiting for completion and the latter will
// simply take a snapshot of the current state of the underlying collection.
for item in bc.GetConsumingEnumerable() do
printfn $"Consuming: {item}"
do! producerTask // Allow task to complete cleanup
}
let main =
task {
do! AddTakeDemo.blockingCollectionAddTakeCompleteAdding ()
TryTakeDemo.blockingCollectionTryTake ()
FromToAnyDemo.blockingCollectionFromToAny ()
do! ConsumingEnumerableDemo.blockingCollectionGetConsumingEnumerable ()
printfn "Press any key to exit."
Console.ReadKey(true) |> ignore
}
main.Wait()
Imports System.Threading.Tasks
Imports System.Collections.Concurrent
Imports System.Threading
Class BlockingCollectionDemo
Shared Sub Main()
AddTakeDemo.BC_AddTakeCompleteAdding()
TryTakeDemo.BC_TryTake()
ToAnyDemo.BC_ToAny()
ConsumingEnumerableDemo.BC_GetConsumingEnumerable()
' Keep the console window open in debug mode
Console.WriteLine("Press any key to exit.")
Console.ReadKey()
End Sub
End Class
Class AddTakeDemo
' Demonstrates:
' BlockingCollection<T>.Add()
' BlockingCollection<T>.Take()
' BlockingCollection<T>.CompleteAdding()
Shared Sub BC_AddTakeCompleteAdding()
Using bc As New BlockingCollection(Of Integer)()
' Spin up a Task to populate the BlockingCollection
Using t1 As Task = Task.Factory.StartNew(
Sub()
bc.Add(1)
bc.Add(2)
bc.Add(3)
bc.CompleteAdding()
End Sub)
' Spin up a Task to consume the BlockingCollection
Using t2 As Task = Task.Factory.StartNew(
Sub()
Try
' Consume the BlockingCollection
While True
Console.WriteLine(bc.Take())
End While
Catch generatedExceptionName As InvalidOperationException
' An InvalidOperationException means that Take() was called on a completed collection
Console.WriteLine("That's All!")
End Try
End Sub)
Task.WaitAll(t1, t2)
End Using
End Using
End Using
End Sub
End Class
'Imports System.Collections.Concurrent
'Imports System.Threading
'Imports System.Threading.Tasks
Class TryTakeDemo
' Demonstrates:
' BlockingCollection<T>.Add()
' BlockingCollection<T>.CompleteAdding()
' BlockingCollection<T>.TryTake()
' BlockingCollection<T>.IsCompleted
Shared Sub BC_TryTake()
' Construct and fill our BlockingCollection
Using bc As New BlockingCollection(Of Integer)()
Dim NUMITEMS As Integer = 10000
For i As Integer = 0 To NUMITEMS - 1
bc.Add(i)
Next
bc.CompleteAdding()
Dim outerSum As Integer = 0
' Delegate for consuming the BlockingCollection and adding up all items
Dim action As Action =
Sub()
Dim localItem As Integer
Dim localSum As Integer = 0
While bc.TryTake(localItem)
localSum += localItem
End While
Interlocked.Add(outerSum, localSum)
End Sub
' Launch three parallel actions to consume the BlockingCollection
Parallel.Invoke(action, action, action)
Console.WriteLine("Sum[0..{0}) = {1}, should be {2}", NUMITEMS, outerSum, ((NUMITEMS * (NUMITEMS - 1)) / 2))
Console.WriteLine("bc.IsCompleted = {0} (should be true)", bc.IsCompleted)
End Using
End Sub
End Class
'Imports System.Threading.Tasks
'Imports System.Collections.Concurrent
' Demonstrates:
' Bounded BlockingCollection<T>
' BlockingCollection<T>.TryAddToAny()
' BlockingCollection<T>.TryTakeFromAny()
Class ToAnyDemo
Shared Sub BC_ToAny()
Dim bcs As BlockingCollection(Of Integer)() = New BlockingCollection(Of Integer)(1) {}
bcs(0) = New BlockingCollection(Of Integer)(5)
' collection bounded to 5 items
bcs(1) = New BlockingCollection(Of Integer)(5)
' collection bounded to 5 items
' Should be able to add 10 items w/o blocking
Dim numFailures As Integer = 0
For i As Integer = 0 To 9
If BlockingCollection(Of Integer).TryAddToAny(bcs, i) = -1 Then
numFailures += 1
End If
Next
Console.WriteLine("TryAddToAny: {0} failures (should be 0)", numFailures)
' Should be able to retrieve 10 items
Dim numItems As Integer = 0
Dim item As Integer
While BlockingCollection(Of Integer).TryTakeFromAny(bcs, item) <> -1
numItems += 1
End While
Console.WriteLine("TryTakeFromAny: retrieved {0} items (should be 10)", numItems)
End Sub
End Class
'Imports System.Threading.Tasks
'Imports System.Collections.Concurrent
' Demonstrates:
' BlockingCollection<T>.Add()
' BlockingCollection<T>.CompleteAdding()
' BlockingCollection<T>.GetConsumingEnumerable()
Class ConsumingEnumerableDemo
Shared Sub BC_GetConsumingEnumerable()
Using bc As New BlockingCollection(Of Integer)()
' Kick off a producer task
Task.Factory.StartNew(
Sub()
For i As Integer = 0 To 9
bc.Add(i)
' sleep 100 ms between adds
Thread.Sleep(100)
Next
' Need to do this to keep foreach below from not responding.
bc.CompleteAdding()
End Sub)
' Now consume the blocking collection with foreach.
' Use bc.GetConsumingEnumerable() instead of just bc because the
' former will block waiting for completion and the latter will
' simply take a snapshot of the current state of the underlying collection.
For Each item In bc.GetConsumingEnumerable()
Console.WriteLine(item)
Next
End Using
End Sub
End Class
備註
BlockingCollection<T> 是提供下列專案的安全線程集合類別:
生產者/取用者模式的實作;BlockingCollection<T> 是 IProducerConsumerCollection<T> 介面的包裝函式。
在 TryAdd 或 TryTake 方法中使用 CancellationToken 物件取消 Add 或 Take 作業。
重要
此類型會實作 IDisposable 介面。 當您完成使用類型時,應該直接或間接處置它。 若要直接處置類型,請在 try
/catch
區塊中呼叫其 Dispose 方法。 若要間接處置它,請使用語言建構,例如 using
(C#) 或 Using
(在 Visual Basic 中)。 如需詳細資訊,請參閱
IProducerConsumerCollection<T> 代表允許安全線程新增和移除數據的集合。 BlockingCollection<T> 會作為 IProducerConsumerCollection<T> 實例的包裝函式,並允許移除集合中的嘗試封鎖,直到數據可供移除為止。 同樣地,您可以建立 BlockingCollection<T>,以對 IProducerConsumerCollection<T>中允許的數據元素數目強制執行上限;再封鎖集合的新增嘗試,直到空間可供儲存新增的項目為止。 如此一來,BlockingCollection<T> 類似於傳統的封鎖佇列數據結構,不同之處在於基礎數據儲存機制會抽象化為 IProducerConsumerCollection<T>。
BlockingCollection<T> 支援周框和封鎖。 周框表示您可以設定集合的最大容量。 在某些案例中,界限很重要,因為它可讓您控制記憶體中集合的大小上限,並防止產生線程在取用線程之前移動太遠。多個線程或工作可以同時將專案新增至集合,如果集合達到其指定的最大容量,則產生線程會封鎖,直到移除項目為止。 多個取用者可以同時移除專案,而且如果集合變成空的,取用線程將會封鎖,直到產生者加入項目為止。 產生線程可以呼叫 CompleteAdding 方法,以指出不會再新增任何專案。 取用者會監視 IsCompleted 屬性,以瞭解集合何時是空的,而且不會再新增任何專案。
Add 和 Take 作業通常會在迴圈中執行。 您可以藉由將 CancellationToken 物件傳入 TryAdd 或 TryTake 方法,然後在每次反覆專案上檢查令牌 IsCancellationRequested 屬性值,以取消迴圈。 如果值為 true
,則清除任何資源並結束迴圈,即可回應取消要求。
當您建立 BlockingCollection<T> 物件時,不僅可以指定系結容量,還可以指定要使用的集合類型。 例如,您可以針對先入、先出 (FIFO) 行為或先出 (LIFO) 行為指定 ConcurrentQueue<T> 物件,或先出 (LIFO) 行為的 ConcurrentStack<T> 物件。 您可以使用任何實作 IProducerConsumerCollection<T> 介面的集合類別。 BlockingCollection<T> 的預設集合類型為 ConcurrentQueue<T>。
請勿直接修改基礎集合。 使用 BlockingCollection<T> 方法來新增或移除元素。 如果您直接變更基礎集合,BlockingCollection<T> 物件可能會損毀。
BlockingCollection<T> 的設計並未考慮到異步存取。 如果您的應用程式需要異步產生者/取用者案例,請考慮改用 Channel<T>。
建構函式
BlockingCollection<T>() |
在沒有上限的情況下,初始化 BlockingCollection<T> 類別的新實例。 |
BlockingCollection<T>(Int32) |
使用指定的上限,初始化 BlockingCollection<T> 類別的新實例。 |
BlockingCollection<T>(IProducerConsumerCollection<T>) |
在沒有上限的情況下,初始化 BlockingCollection<T> 類別的新實例,並使用提供的 IProducerConsumerCollection<T> 作為其基礎數據存放區。 |
BlockingCollection<T>(IProducerConsumerCollection<T>, Int32) |
使用指定的上限,並使用提供的 IProducerConsumerCollection<T> 做為其基礎數據存放區,初始化 BlockingCollection<T> 類別的新實例。 |
屬性
BoundedCapacity |
取得這個 BlockingCollection<T> 實例的限定容量。 |
Count |
取得包含在 BlockingCollection<T>中的項目數目。 |
IsAddingCompleted |
取得這個 BlockingCollection<T> 是否已標示為完成以進行新增。 |
IsCompleted |
取得這個 BlockingCollection<T> 是否已標示為完成以新增,而且是空的。 |
方法
明確介面實作
ICollection.CopyTo(Array, Int32) |
從目標陣列的指定索引開始,將 BlockingCollection<T> 實例中的所有專案複製到相容的一維陣列。 |
ICollection.IsSynchronized |
取得值,指出是否同步存取 ICollection (線程安全)。 |
ICollection.SyncRoot |
取得對象,這個物件可用來同步存取 ICollection。 不支援這個屬性。 |
IEnumerable.GetEnumerator() |
提供集合中專案的 IEnumerator。 |
IEnumerable<T>.GetEnumerator() |
提供集合中專案的 IEnumerator<T>。 |
擴充方法
適用於
執行緒安全性
Dispose 方法不是安全線程。 BlockingCollection<T> 的所有其他公用和受保護成員都是安全線程,而且可以從多個線程同時使用。