BlockingCollection<T> 클래스
정의
중요
일부 정보는 릴리스되기 전에 상당 부분 수정될 수 있는 시험판 제품과 관련이 있습니다. Microsoft는 여기에 제공된 정보에 대해 어떠한 명시적이거나 묵시적인 보증도 하지 않습니다.
IProducerConsumerCollection<T>구현하는 스레드로부터 안전한 컬렉션에 대한 차단 및 경계 기능을 제공합니다.
generic <typename T>
public ref class BlockingCollection : IDisposable, System::Collections::Generic::IEnumerable<T>, System::Collections::Generic::IReadOnlyCollection<T>, System::Collections::ICollection
generic <typename T>
public ref class BlockingCollection : IDisposable, System::Collections::Generic::IEnumerable<T>, System::Collections::ICollection
public class BlockingCollection<T> : IDisposable, System.Collections.Generic.IEnumerable<T>, System.Collections.Generic.IReadOnlyCollection<T>, System.Collections.ICollection
[System.Runtime.Versioning.UnsupportedOSPlatform("browser")]
public class BlockingCollection<T> : IDisposable, System.Collections.Generic.IEnumerable<T>, System.Collections.Generic.IReadOnlyCollection<T>, System.Collections.ICollection
[System.Runtime.InteropServices.ComVisible(false)]
public class BlockingCollection<T> : IDisposable, System.Collections.Generic.IEnumerable<T>, System.Collections.ICollection
[System.Runtime.InteropServices.ComVisible(false)]
public class BlockingCollection<T> : IDisposable, System.Collections.Generic.IEnumerable<T>, System.Collections.Generic.IReadOnlyCollection<T>, System.Collections.ICollection
public class BlockingCollection<T> : IDisposable, System.Collections.Generic.IEnumerable<T>, System.Collections.ICollection
type BlockingCollection<'T> = class
interface seq<'T>
interface IEnumerable
interface IReadOnlyCollection<'T>
interface ICollection
interface IDisposable
[<System.Runtime.Versioning.UnsupportedOSPlatform("browser")>]
type BlockingCollection<'T> = class
interface seq<'T>
interface IEnumerable
interface IReadOnlyCollection<'T>
interface ICollection
interface IDisposable
[<System.Runtime.InteropServices.ComVisible(false)>]
type BlockingCollection<'T> = class
interface seq<'T>
interface ICollection
interface IEnumerable
interface IDisposable
[<System.Runtime.InteropServices.ComVisible(false)>]
type BlockingCollection<'T> = class
interface seq<'T>
interface IEnumerable
interface ICollection
interface IDisposable
interface IReadOnlyCollection<'T>
type BlockingCollection<'T> = class
interface seq<'T>
interface ICollection
interface IEnumerable
interface IDisposable
Public Class BlockingCollection(Of T)
Implements ICollection, IDisposable, IEnumerable(Of T), IReadOnlyCollection(Of T)
Public Class BlockingCollection(Of T)
Implements ICollection, IDisposable, IEnumerable(Of T)
형식 매개 변수
- T
컬렉션에 있는 요소의 형식입니다.
- 상속
-
BlockingCollection<T>
- 특성
- 구현
예제
다음 예제에서는 차단 컬렉션에서 동시에 항목을 추가하고 가져오는 방법을 보여줍니다.
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
class BlockingCollectionDemo
{
static async Task Main()
{
await AddTakeDemo.BC_AddTakeCompleteAdding();
TryTakeDemo.BC_TryTake();
FromToAnyDemo.BC_FromToAny();
await ConsumingEnumerableDemo.BC_GetConsumingEnumerable();
Console.WriteLine("Press any key to exit.");
Console.ReadKey();
}
}
class AddTakeDemo
{
// Demonstrates:
// BlockingCollection<T>.Add()
// BlockingCollection<T>.Take()
// BlockingCollection<T>.CompleteAdding()
public static async Task BC_AddTakeCompleteAdding()
{
using (BlockingCollection<int> bc = new BlockingCollection<int>())
{
// Spin up a Task to populate the BlockingCollection
Task t1 = Task.Run(() =>
{
bc.Add(1);
bc.Add(2);
bc.Add(3);
bc.CompleteAdding();
});
// Spin up a Task to consume the BlockingCollection
Task t2 = Task.Run(() =>
{
try
{
// Consume the BlockingCollection
while (true) Console.WriteLine(bc.Take());
}
catch (InvalidOperationException)
{
// An InvalidOperationException means that Take() was called on a completed collection
Console.WriteLine("That's All!");
}
});
await Task.WhenAll(t1, t2);
}
}
}
class TryTakeDemo
{
// Demonstrates:
// BlockingCollection<T>.Add()
// BlockingCollection<T>.CompleteAdding()
// BlockingCollection<T>.TryTake()
// BlockingCollection<T>.IsCompleted
public static void BC_TryTake()
{
// Construct and fill our BlockingCollection
using (BlockingCollection<int> bc = new BlockingCollection<int>())
{
int NUMITEMS = 10000;
for (int i = 0; i < NUMITEMS; i++) bc.Add(i);
bc.CompleteAdding();
int outerSum = 0;
// Delegate for consuming the BlockingCollection and adding up all items
Action action = () =>
{
int localItem;
int localSum = 0;
while (bc.TryTake(out localItem)) localSum += localItem;
Interlocked.Add(ref outerSum, localSum);
};
// Launch three parallel actions to consume the BlockingCollection
Parallel.Invoke(action, action, action);
Console.WriteLine("Sum[0..{0}) = {1}, should be {2}", NUMITEMS, outerSum, ((NUMITEMS * (NUMITEMS - 1)) / 2));
Console.WriteLine("bc.IsCompleted = {0} (should be true)", bc.IsCompleted);
}
}
}
class FromToAnyDemo
{
// Demonstrates:
// Bounded BlockingCollection<T>
// BlockingCollection<T>.TryAddToAny()
// BlockingCollection<T>.TryTakeFromAny()
public static void BC_FromToAny()
{
BlockingCollection<int>[] bcs = new BlockingCollection<int>[2];
bcs[0] = new BlockingCollection<int>(5); // collection bounded to 5 items
bcs[1] = new BlockingCollection<int>(5); // collection bounded to 5 items
// Should be able to add 10 items w/o blocking
int numFailures = 0;
for (int i = 0; i < 10; i++)
{
if (BlockingCollection<int>.TryAddToAny(bcs, i) == -1) numFailures++;
}
Console.WriteLine("TryAddToAny: {0} failures (should be 0)", numFailures);
// Should be able to retrieve 10 items
int numItems = 0;
int item;
while (BlockingCollection<int>.TryTakeFromAny(bcs, out item) != -1) numItems++;
Console.WriteLine("TryTakeFromAny: retrieved {0} items (should be 10)", numItems);
}
}
class ConsumingEnumerableDemo
{
// Demonstrates:
// BlockingCollection<T>.Add()
// BlockingCollection<T>.CompleteAdding()
// BlockingCollection<T>.GetConsumingEnumerable()
public static async Task BC_GetConsumingEnumerable()
{
using (BlockingCollection<int> bc = new BlockingCollection<int>())
{
// Kick off a producer task
var producerTask = Task.Run(async () =>
{
for (int i = 0; i < 10; i++)
{
bc.Add(i);
Console.WriteLine($"Producing: {i}");
await Task.Delay(100); // sleep 100 ms between adds
}
// Need to do this to keep foreach below from hanging
bc.CompleteAdding();
});
// Now consume the blocking collection with foreach.
// Use bc.GetConsumingEnumerable() instead of just bc because the
// former will block waiting for completion and the latter will
// simply take a snapshot of the current state of the underlying collection.
foreach (var item in bc.GetConsumingEnumerable())
{
Console.WriteLine($"Consuming: {item}");
}
await producerTask; // Allow task to complete cleanup
}
}
}
open System
open System.Collections.Concurrent
open System.Threading
open System.Threading.Tasks
module AddTakeDemo =
// Demonstrates:
// BlockingCollection<T>.Add()
// BlockingCollection<T>.Take()
// BlockingCollection<T>.CompleteAdding()
let blockingCollectionAddTakeCompleteAdding () =
task {
use bc = new BlockingCollection<int>()
// Spin up a Task to populate the BlockingCollection
let t1 =
task {
bc.Add 1
bc.Add 2
bc.Add 3
bc.CompleteAdding()
}
// Spin up a Task to consume the BlockingCollection
let t2 =
task {
try
// Consume consume the BlockingCollection
while true do
printfn $"{bc.Take()}"
with :? InvalidOperationException ->
// An InvalidOperationException means that Take() was called on a completed collection
printfn "That's All!"
}
let! _ = Task.WhenAll(t1, t2)
()
}
module TryTakeDemo =
// Demonstrates:
// BlockingCollection<T>.Add()
// BlockingCollection<T>.CompleteAdding()
// BlockingCollection<T>.TryTake()
// BlockingCollection<T>.IsCompleted
let blockingCollectionTryTake () =
// Construct and fill our BlockingCollection
use bc = new BlockingCollection<int>()
let NUMITEMS = 10000;
for i = 0 to NUMITEMS - 1 do
bc.Add i
bc.CompleteAdding()
let mutable outerSum = 0
// Delegate for consuming the BlockingCollection and adding up all items
let action =
Action(fun () ->
let mutable localItem = 0
let mutable localSum = 0
while bc.TryTake &localItem do
localSum <- localSum + localItem
Interlocked.Add(&outerSum, localSum)
|> ignore)
// Launch three parallel actions to consume the BlockingCollection
Parallel.Invoke(action, action, action)
printfn $"Sum[0..{NUMITEMS}) = {outerSum}, should be {((NUMITEMS * (NUMITEMS - 1)) / 2)}"
printfn $"bc.IsCompleted = {bc.IsCompleted} (should be true)"
module FromToAnyDemo =
// Demonstrates:
// Bounded BlockingCollection<T>
// BlockingCollection<T>.TryAddToAny()
// BlockingCollection<T>.TryTakeFromAny()
let blockingCollectionFromToAny () =
let bcs =
[|
new BlockingCollection<int>(5) // collection bounded to 5 items
new BlockingCollection<int>(5) // collection bounded to 5 items
|]
// Should be able to add 10 items w/o blocking
let mutable numFailures = 0;
for i = 0 to 9 do
if BlockingCollection<int>.TryAddToAny(bcs, i) = -1 then
numFailures <- numFailures + 1
printfn $"TryAddToAny: {numFailures} failures (should be 0)"
// Should be able to retrieve 10 items
let mutable numItems = 0
let mutable item = 0
while BlockingCollection<int>.TryTakeFromAny(bcs, &item) <> -1 do
numItems <- numItems + 1
printfn $"TryTakeFromAny: retrieved {numItems} items (should be 10)"
module ConsumingEnumerableDemo =
// Demonstrates:
// BlockingCollection<T>.Add()
// BlockingCollection<T>.CompleteAdding()
// BlockingCollection<T>.GetConsumingEnumerable()
let blockingCollectionGetConsumingEnumerable () =
task {
use bc = new BlockingCollection<int>()
// Kick off a producer task
let producerTask =
task {
for i = 0 to 9 do
bc.Add i
printfn $"Producing: {i}"
do! Task.Delay 100 // sleep 100 ms between adds
// Need to do this to keep foreach below from hanging
bc.CompleteAdding()
}
// Now consume the blocking collection with foreach.
// Use bc.GetConsumingEnumerable() instead of just bc because the
// former will block waiting for completion and the latter will
// simply take a snapshot of the current state of the underlying collection.
for item in bc.GetConsumingEnumerable() do
printfn $"Consuming: {item}"
do! producerTask // Allow task to complete cleanup
}
let main =
task {
do! AddTakeDemo.blockingCollectionAddTakeCompleteAdding ()
TryTakeDemo.blockingCollectionTryTake ()
FromToAnyDemo.blockingCollectionFromToAny ()
do! ConsumingEnumerableDemo.blockingCollectionGetConsumingEnumerable ()
printfn "Press any key to exit."
Console.ReadKey(true) |> ignore
}
main.Wait()
Imports System.Threading.Tasks
Imports System.Collections.Concurrent
Imports System.Threading
Class BlockingCollectionDemo
Shared Sub Main()
AddTakeDemo.BC_AddTakeCompleteAdding()
TryTakeDemo.BC_TryTake()
ToAnyDemo.BC_ToAny()
ConsumingEnumerableDemo.BC_GetConsumingEnumerable()
' Keep the console window open in debug mode
Console.WriteLine("Press any key to exit.")
Console.ReadKey()
End Sub
End Class
Class AddTakeDemo
' Demonstrates:
' BlockingCollection<T>.Add()
' BlockingCollection<T>.Take()
' BlockingCollection<T>.CompleteAdding()
Shared Sub BC_AddTakeCompleteAdding()
Using bc As New BlockingCollection(Of Integer)()
' Spin up a Task to populate the BlockingCollection
Using t1 As Task = Task.Factory.StartNew(
Sub()
bc.Add(1)
bc.Add(2)
bc.Add(3)
bc.CompleteAdding()
End Sub)
' Spin up a Task to consume the BlockingCollection
Using t2 As Task = Task.Factory.StartNew(
Sub()
Try
' Consume the BlockingCollection
While True
Console.WriteLine(bc.Take())
End While
Catch generatedExceptionName As InvalidOperationException
' An InvalidOperationException means that Take() was called on a completed collection
Console.WriteLine("That's All!")
End Try
End Sub)
Task.WaitAll(t1, t2)
End Using
End Using
End Using
End Sub
End Class
'Imports System.Collections.Concurrent
'Imports System.Threading
'Imports System.Threading.Tasks
Class TryTakeDemo
' Demonstrates:
' BlockingCollection<T>.Add()
' BlockingCollection<T>.CompleteAdding()
' BlockingCollection<T>.TryTake()
' BlockingCollection<T>.IsCompleted
Shared Sub BC_TryTake()
' Construct and fill our BlockingCollection
Using bc As New BlockingCollection(Of Integer)()
Dim NUMITEMS As Integer = 10000
For i As Integer = 0 To NUMITEMS - 1
bc.Add(i)
Next
bc.CompleteAdding()
Dim outerSum As Integer = 0
' Delegate for consuming the BlockingCollection and adding up all items
Dim action As Action =
Sub()
Dim localItem As Integer
Dim localSum As Integer = 0
While bc.TryTake(localItem)
localSum += localItem
End While
Interlocked.Add(outerSum, localSum)
End Sub
' Launch three parallel actions to consume the BlockingCollection
Parallel.Invoke(action, action, action)
Console.WriteLine("Sum[0..{0}) = {1}, should be {2}", NUMITEMS, outerSum, ((NUMITEMS * (NUMITEMS - 1)) / 2))
Console.WriteLine("bc.IsCompleted = {0} (should be true)", bc.IsCompleted)
End Using
End Sub
End Class
'Imports System.Threading.Tasks
'Imports System.Collections.Concurrent
' Demonstrates:
' Bounded BlockingCollection<T>
' BlockingCollection<T>.TryAddToAny()
' BlockingCollection<T>.TryTakeFromAny()
Class ToAnyDemo
Shared Sub BC_ToAny()
Dim bcs As BlockingCollection(Of Integer)() = New BlockingCollection(Of Integer)(1) {}
bcs(0) = New BlockingCollection(Of Integer)(5)
' collection bounded to 5 items
bcs(1) = New BlockingCollection(Of Integer)(5)
' collection bounded to 5 items
' Should be able to add 10 items w/o blocking
Dim numFailures As Integer = 0
For i As Integer = 0 To 9
If BlockingCollection(Of Integer).TryAddToAny(bcs, i) = -1 Then
numFailures += 1
End If
Next
Console.WriteLine("TryAddToAny: {0} failures (should be 0)", numFailures)
' Should be able to retrieve 10 items
Dim numItems As Integer = 0
Dim item As Integer
While BlockingCollection(Of Integer).TryTakeFromAny(bcs, item) <> -1
numItems += 1
End While
Console.WriteLine("TryTakeFromAny: retrieved {0} items (should be 10)", numItems)
End Sub
End Class
'Imports System.Threading.Tasks
'Imports System.Collections.Concurrent
' Demonstrates:
' BlockingCollection<T>.Add()
' BlockingCollection<T>.CompleteAdding()
' BlockingCollection<T>.GetConsumingEnumerable()
Class ConsumingEnumerableDemo
Shared Sub BC_GetConsumingEnumerable()
Using bc As New BlockingCollection(Of Integer)()
' Kick off a producer task
Task.Factory.StartNew(
Sub()
For i As Integer = 0 To 9
bc.Add(i)
' sleep 100 ms between adds
Thread.Sleep(100)
Next
' Need to do this to keep foreach below from not responding.
bc.CompleteAdding()
End Sub)
' Now consume the blocking collection with foreach.
' Use bc.GetConsumingEnumerable() instead of just bc because the
' former will block waiting for completion and the latter will
' simply take a snapshot of the current state of the underlying collection.
For Each item In bc.GetConsumingEnumerable()
Console.WriteLine(item)
Next
End Using
End Sub
End Class
설명
BlockingCollection<T> 다음을 제공하는 스레드로부터 안전한 컬렉션 클래스입니다.
생산자/소비자 패턴의 구현입니다. BlockingCollection<T>IProducerConsumerCollection<T> 인터페이스의 래퍼입니다.
TryAdd 또는 TryTake 메서드에서 CancellationToken 개체를 사용하여 Add 또는 Take 작업을 취소합니다.
중요하다
이 형식은 IDisposable 인터페이스를 구현합니다. 형식 사용을 마쳤으면 직접 또는 간접적으로 삭제해야 합니다. 형식을 직접 삭제하려면 try
/catch
블록에서 해당 Dispose 메서드를 호출합니다. 간접적으로 삭제하려면 using
(C#) 또는 Using
(Visual Basic)와 같은 언어 구문을 사용합니다. 자세한 내용은 IDisposable 인터페이스 항목의 "IDisposable을 구현하는 개체 사용" 섹션을 참조하세요. 또한 Dispose() 메서드는 스레드로부터 안전하지 않습니다. 다른 모든 공용 및 보호된 BlockingCollection<T> 멤버는 스레드로부터 안전하며 여러 스레드에서 동시에 사용할 수 있습니다.
IProducerConsumerCollection<T> 스레드로부터 안전한 데이터 추가 및 제거를 허용하는 컬렉션을 나타냅니다. BlockingCollection<T> IProducerConsumerCollection<T> 인스턴스의 래퍼로 사용되며, 데이터를 제거할 수 있을 때까지 컬렉션에서 제거 시도를 차단할 수 있습니다. 마찬가지로 IProducerConsumerCollection<T>허용된 데이터 요소 수에 상한을 적용하는 BlockingCollection<T> 만들 수 있습니다. 컬렉션에 대한 추가 시도는 추가된 항목을 저장할 공간을 사용할 수 있을 때까지 차단될 수 있습니다. 이러한 방식으로 BlockingCollection<T> 기본 데이터 스토리지 메커니즘이 IProducerConsumerCollection<T>추상화된다는 점을 제외하고 기존의 차단 큐 데이터 구조와 유사합니다.
BlockingCollection<T> 경계 및 차단을 지원합니다. 경계는 컬렉션의 최대 용량을 설정할 수 있음을 의미합니다. 경계는 메모리에서 컬렉션의 최대 크기를 제어할 수 있고 생성 스레드가 소비 스레드보다 너무 멀리 이동하지 못하도록 하기 때문에 특정 시나리오에서 중요합니다. 여러 스레드 또는 태스크가 동시에 컬렉션에 항목을 추가할 수 있으며 컬렉션이 지정된 최대 용량에 도달하면 항목이 제거될 때까지 생성 스레드가 차단됩니다. 여러 소비자가 동시에 항목을 제거할 수 있으며 컬렉션이 비어 있으면 생산자가 항목을 추가할 때까지 소비 스레드가 차단됩니다. 생성 스레드는 더 이상 항목이 추가되지 않음을 나타내기 위해 CompleteAdding 메서드를 호출할 수 있습니다. 소비자는 컬렉션이 비어 있고 더 이상 항목이 추가되지 않는 시기를 알 수 있도록 IsCompleted 속성을 모니터링합니다.
Add 및 Take 작업은 일반적으로 루프에서 수행됩니다.
CancellationToken 개체를 TryAdd 또는 TryTake 메서드에 전달한 다음 각 반복에서 토큰의 IsCancellationRequested 속성 값을 확인하여 루프를 취소할 수 있습니다. 값이 true
경우 리소스를 정리하고 루프를 종료하여 취소 요청에 응답해야 합니다.
BlockingCollection<T> 개체를 만들 때는 제한된 용량뿐만 아니라 사용할 컬렉션 유형도 지정할 수 있습니다. 예를 들어 FIFO(선입선행) 동작에 대한 ConcurrentQueue<T> 개체 또는 LIFO(last in, first out) 동작에 대한 ConcurrentStack<T> 개체를 지정할 수 있습니다. IProducerConsumerCollection<T> 인터페이스를 구현하는 컬렉션 클래스를 사용할 수 있습니다. BlockingCollection<T> 기본 컬렉션 형식은 ConcurrentQueue<T>.
기본 컬렉션을 직접 수정하지 마세요. BlockingCollection<T> 메서드를 사용하여 요소를 추가하거나 제거합니다. 기본 컬렉션을 직접 변경하면 BlockingCollection<T> 개체가 손상될 수 있습니다.
BlockingCollection<T> 비동기 액세스를 염두에 두고 설계되지 않았습니다. 애플리케이션에 비동기 생산자/소비자 시나리오가 필요한 경우 대신 Channel<T> 사용하는 것이 좋습니다.
생성자
BlockingCollection<T>() |
상한 없이 BlockingCollection<T> 클래스의 새 인스턴스를 초기화합니다. |
BlockingCollection<T>(Int32) |
지정된 상한을 사용하여 BlockingCollection<T> 클래스의 새 인스턴스를 초기화합니다. |
BlockingCollection<T>(IProducerConsumerCollection<T>) |
상한 없이 제공된 IProducerConsumerCollection<T> 기본 데이터 저장소로 사용하여 BlockingCollection<T> 클래스의 새 인스턴스를 초기화합니다. |
BlockingCollection<T>(IProducerConsumerCollection<T>, Int32) |
지정된 상한을 사용하여 제공된 IProducerConsumerCollection<T> 기본 데이터 저장소로 사용하여 BlockingCollection<T> 클래스의 새 인스턴스를 초기화합니다. |
속성
BoundedCapacity |
이 BlockingCollection<T> 인스턴스의 제한된 용량을 가져옵니다. |
Count |
BlockingCollection<T>포함된 항목 수를 가져옵니다. |
IsAddingCompleted |
이 BlockingCollection<T> 추가에 대한 완료로 표시되었는지 여부를 가져옵니다. |
IsCompleted |
이 BlockingCollection<T> 추가에 대한 완료로 표시되고 비어 있는지 여부를 가져옵니다. |
메서드
명시적 인터페이스 구현
ICollection.CopyTo(Array, Int32) |
BlockingCollection<T> 인스턴스의 모든 항목을 대상 배열의 지정된 인덱스에서 시작하여 호환되는 1차원 배열에 복사합니다. |
ICollection.IsSynchronized |
ICollection 대한 액세스가 동기화되는지 여부를 나타내는 값을 가져옵니다(스레드로부터 안전). |
ICollection.SyncRoot |
ICollection대한 액세스를 동기화하는 데 사용할 수 있는 개체를 가져옵니다. 이 속성은 지원되지 않습니다. |
IEnumerable.GetEnumerator() |
컬렉션의 항목에 대한 IEnumerator 제공합니다. |
IEnumerable<T>.GetEnumerator() |
컬렉션의 항목에 대한 IEnumerator<T> 제공합니다. |
확장 메서드
적용 대상
스레드 보안
Dispose 메서드는 스레드로부터 안전하지 않습니다. 다른 모든 공용 및 보호된 BlockingCollection<T> 멤버는 스레드로부터 안전하며 여러 스레드에서 동시에 사용할 수 있습니다.
추가 정보
- Thread-Safe 컬렉션
- BlockingCollection 개요
- 방법: 컬렉션 클래스 경계 및 차단 기능 추가
.NET