次の方法で共有


BlockingCollection の概要

BlockingCollection<T> は、次の機能を提供するスレッド セーフなコレクション クラスです。

  • Producer-Consumer パターンの実装。

  • 複数のスレッドからの項目の同時追加と取得。

  • オプションの最大容量。

  • コレクションが空または完全な場合にブロックする挿入操作と削除操作。

  • 挿入と削除は、ブロックしないか、指定した期間までブロックする "try" 操作です。

  • 実装するコレクション型をカプセル化します。 IProducerConsumerCollection<T>

  • キャンセル トークンを使用した取り消し。

  • foreachを持つ 2 種類の列挙 (Visual Basic のFor Each):

    1. 読み取り専用列挙。

    2. 列挙されたアイテムを削除する列挙。

境界とブロックのサポート

BlockingCollection<T> では、境界とブロックがサポートされます。 境界は、コレクションの最大容量を設定できることを意味します。 境界は、メモリ内のコレクションの最大サイズを制御できるため、特定のシナリオで重要です。これにより、生成されるスレッドが消費するスレッドの前に移動しすぎなくなります。

複数のスレッドまたはタスクが同時にコレクションに項目を追加でき、コレクションが指定された最大容量に達すると、アイテムが削除されるまで生成スレッドはブロックされます。 複数のコンシューマーが同時に項目を削除でき、コレクションが空になると、プロデューサーが項目を追加するまで、消費スレッドはブロックされます。 生成スレッドは、 CompleteAdding を呼び出して、追加される項目がなくなったことを示すことができます。 コンシューマーは、 IsCompleted プロパティを監視して、コレクションが空で項目が追加されないタイミングを確認します。 次の例は、有界容量が 100 の単純な BlockingCollection を示しています。 プロデューサー タスクは、外部条件が true である限りコレクションに項目を追加し、 CompleteAddingを呼び出します。 コンシューマー タスクは、 IsCompleted プロパティが true になるまで項目を受け取ります。

// A bounded collection. It can hold no more
// than 100 items at once.
BlockingCollection<Data> dataItems = new BlockingCollection<Data>(100);

// A simple blocking consumer with no cancellation.
Task.Run(() =>
{
    while (!dataItems.IsCompleted)
    {

        Data data = null;
        // Blocks if dataItems.Count == 0.
        // IOE means that Take() was called on a completed collection.
        // Some other thread can call CompleteAdding after we pass the
        // IsCompleted check but before we call Take.
        // In this example, we can simply catch the exception since the
        // loop will break on the next iteration.
        try
        {
            data = dataItems.Take();
        }
        catch (InvalidOperationException) { }

        if (data != null)
        {
            Process(data);
        }
    }
    Console.WriteLine("\r\nNo more items to take.");
});

// A simple blocking producer with no cancellation.
Task.Run(() =>
{
    while (moreItemsToAdd)
    {
        Data data = GetData();
        // Blocks if numbers.Count == dataItems.BoundedCapacity
        dataItems.Add(data);
    }
    // Let consumer know we are done.
    dataItems.CompleteAdding();
});

' A bounded collection. It can hold no more 
' than 100 items at once.
Dim dataItems = New BlockingCollection(Of Data)(100)

' A simple blocking consumer with no cancellation.
Task.Factory.StartNew(Sub()
                          While dataItems.IsCompleted = False
                              Dim dataItem As Data = Nothing
                              Try
                                  dataItem = dataItems.Take()
                              Catch e As InvalidOperationException
                                  ' IOE means that Take() was called on a completed collection.
                                  ' In this example, we can simply catch the exception since the 
                                  ' loop will break on the next iteration.
                              End Try
                              If (dataItem IsNot Nothing) Then
                                  Process(dataItem)
                              End If
                          End While
                          Console.WriteLine(vbCrLf & "No more items to take.")
                      End Sub)

' A simple blocking producer with no cancellation.
Task.Factory.StartNew(Sub()
                          While moreItemsToAdd = True
                              Dim item As Data = GetData()

                              ' Blocks if dataItems.Count = dataItems.BoundedCapacity.
                              dataItems.Add(item)
                          End While

                          ' Let consumer know we are done.
                          dataItems.CompleteAdding()
                      End Sub)

完全な例については、「 方法: BlockingCollection から個別に項目を追加および取得する」を参照してください。

時間の経過したブロック操作

制限付きコレクションに対する TryAdd および TryTake の時間指定されたブロック操作では、メソッドは項目の追加または取得を試みます。 項目が使用可能な場合は、参照渡しされた変数に配置され、メソッドは true を返します。 指定したタイムアウト期間の後に項目が取得されない場合、メソッドは false を返します。 スレッドは、コレクションへのアクセスを再試行する前に、他の便利な作業を自由に行うことができます。 時間指定されたブロック アクセスの例については、「 方法: BlockingCollection から個別に項目を追加および取得する」の 2 番目の例を参照してください。

追加操作と取得操作の取り消し

通常、追加操作と Take 操作はループで実行されます。 ループを取り消すには、 CancellationTokenTryAdd メソッドまたは TryTake メソッドに渡し、各イテレーションでトークンの IsCancellationRequested プロパティの値を確認します。 値が true の場合は、リソースをクリーンアップしてループを終了することで、取り消し要求に応答する必要があります。 次の例は、キャンセル トークンを受け取る TryAdd のオーバーロードと、それを使用するコードを示しています。

do
{
    // Cancellation causes OCE. We know how to handle it.
    try
    {
        success = bc.TryAdd(itemToAdd, 2, ct);
    }
    catch (OperationCanceledException)
    {
        bc.CompleteAdding();
        break;
    }
    //...
} while (moreItems == true);
Do While moreItems = True
    ' Cancellation causes OCE. We know how to handle it.
    Try
        success = bc.TryAdd(itemToAdd, 2, ct)
    Catch ex As OperationCanceledException
        bc.CompleteAdding()
        Exit Do
    End Try
Loop

取り消しサポートを追加する方法の例については、「 方法: BlockingCollection から個別に項目を追加および取得する」の 2 番目の例を参照してください。

コレクション型の指定

BlockingCollection<T>を作成するときに、バインドされた容量だけでなく、使用するコレクションの種類も指定できます。 たとえば、先入れ先出し (FIFO) 動作に ConcurrentQueue<T> を指定したり、最後の先入れ先出し (LIFO) 動作に ConcurrentStack<T> を指定したりできます。 IProducerConsumerCollection<T> インターフェイスを実装する任意のコレクション クラスを使用できます。 BlockingCollection<T>の既定のコレクションの種類はConcurrentQueue<T>。 次のコード例は、容量が 1000 でConcurrentBag<T>を使用する文字列のBlockingCollection<T>を作成する方法を示しています。

Dim bc = New BlockingCollection(Of String)(New ConcurrentBag(Of String()), 1000)  
BlockingCollection<string> bc = new BlockingCollection<string>(new ConcurrentBag<string>(), 1000 );  

詳細については、「 方法: 境界機能とブロック機能をコレクションに追加する」を参照してください

IEnumerable サポート

BlockingCollection<T>には、コレクションが完了するまでforeach (Visual Basic でFor Each) を使用して項目を削除できるようにするGetConsumingEnumerableメソッドが用意されています。これは、コレクションが空であり、追加されない項目を意味します。 詳細については、「 方法: ForEach を使用して BlockingCollection 内の項目を削除する」を参照してください。

多くの BlockingCollection を 1 つとして使用する

コンシューマーが複数のコレクションから項目を同時に取得する必要があるシナリオでは、 BlockingCollection<T> の配列を作成し、配列内の任意のコレクションに追加または取得する TakeFromAnyAddToAny などの静的メソッドを使用できます。 1 つのコレクションがブロックされている場合、メソッドは、操作を実行できるコレクションが見つかるまで、すぐに別のコレクションを試みます。 詳細については、「 方法: パイプラインでブロッキング コレクションの配列を使用する」を参照してください

こちらも参照ください