BlockingCollection – přehled

BlockingCollection<T> je vláknově bezpečná kolekce třídy, která poskytuje následující funkce:

  • Implementace modelu Producer-Consumer.

  • Souběžné přidávání a přebírání položek z více vláken

  • Volitelná maximální kapacita.

  • Operace vložení a odebrání, které blokují, když je kolekce prázdná nebo plná

  • Operační vložení a odebrání "try", které neblokují nebo blokují po dobu zadaného časového období.

  • Zapouzdřuje všechny typy kolekcí, které implementují IProducerConsumerCollection<T>

  • Zrušení pomocí tokenů zrušení

  • Dva druhy výčtu s foreach (For Each v jazyce Visual Basic):

    1. Výčet jen pro čtení

    2. Výčet, který odebere položky při jejich výčtu.

Podpora ohraničování a blokování

BlockingCollection<T> podporuje ohraničování a blokování. Omezení znamená, že můžete nastavit maximální kapacitu kolekce. Ohraničování je důležité v určitých scénářích, protože umožňuje řídit maximální velikost kolekce v paměti a brání produkujícím vláknům v přílišném předstihu před spotřebovávacími vlákny.

Více vláken nebo úloh může přidávat položky do kolekce současně a pokud kolekce dosáhne zadané maximální kapacity, budou generovaná vlákna blokovaná, dokud se položka neodebere. Více spotřebitelů může odebrat položky současně, a pokud se kolekce stane prázdnou, spotřebitelská vlákna se zablokují, dokud producent přidá položku. Vygenerování vlákna může volat CompleteAdding , že nebudou přidány žádné další položky. Uživatelé sledují IsCompleted vlastnost, aby věděli, kdy je kolekce prázdná a nebudou přidány žádné další položky. Následující příklad ukazuje jednoduchý BlockingCollection s ohraničenou kapacitou 100. Úkol producenta přidá položky do kolekce, pokud je splněna některá externí podmínka a pak volá CompleteAdding. Úkol příjemce přebírá položky, dokud IsCompleted vlastnost není pravdivá.

// A bounded collection. It can hold no more
// than 100 items at once.
BlockingCollection<Data> dataItems = new BlockingCollection<Data>(100);

// A simple blocking consumer with no cancellation.
Task.Run(() =>
{
    while (!dataItems.IsCompleted)
    {

        Data data = null;
        // Blocks if dataItems.Count == 0.
        // IOE means that Take() was called on a completed collection.
        // Some other thread can call CompleteAdding after we pass the
        // IsCompleted check but before we call Take.
        // In this example, we can simply catch the exception since the
        // loop will break on the next iteration.
        try
        {
            data = dataItems.Take();
        }
        catch (InvalidOperationException) { }

        if (data != null)
        {
            Process(data);
        }
    }
    Console.WriteLine("\r\nNo more items to take.");
});

// A simple blocking producer with no cancellation.
Task.Run(() =>
{
    while (moreItemsToAdd)
    {
        Data data = GetData();
        // Blocks if numbers.Count == dataItems.BoundedCapacity
        dataItems.Add(data);
    }
    // Let consumer know we are done.
    dataItems.CompleteAdding();
});

' A bounded collection. It can hold no more 
' than 100 items at once.
Dim dataItems = New BlockingCollection(Of Data)(100)

' A simple blocking consumer with no cancellation.
Task.Factory.StartNew(Sub()
                          While dataItems.IsCompleted = False
                              Dim dataItem As Data = Nothing
                              Try
                                  dataItem = dataItems.Take()
                              Catch e As InvalidOperationException
                                  ' IOE means that Take() was called on a completed collection.
                                  ' In this example, we can simply catch the exception since the 
                                  ' loop will break on the next iteration.
                              End Try
                              If (dataItem IsNot Nothing) Then
                                  Process(dataItem)
                              End If
                          End While
                          Console.WriteLine(vbCrLf & "No more items to take.")
                      End Sub)

' A simple blocking producer with no cancellation.
Task.Factory.StartNew(Sub()
                          While moreItemsToAdd = True
                              Dim item As Data = GetData()

                              ' Blocks if dataItems.Count = dataItems.BoundedCapacity.
                              dataItems.Add(item)
                          End While

                          ' Let consumer know we are done.
                          dataItems.CompleteAdding()
                      End Sub)

Úplný příklad naleznete v tématu Postupy: Přidání a převzetí položek jednotlivě z BlockingCollection.

Operace blokování s časovým limitem

Při časově omezených blokovacích operacích TryAdd a TryTake na omezených kolekcích se metoda pokusí přidat nebo odebrat položku. Pokud je položka k dispozici, je umístěna do proměnné, která byla předána odkazem, a metoda vrátí true. Pokud se po zadaném časovém limitu nenačte žádná položka, vrátí metoda hodnotu false. Vlákno je pak volné vykonat další užitečnou práci, než se znovu pokusí získat přístup k kolekci. Příklad časového blokování přístupu najdete v druhém příkladu v tématu Postupy: Přidání a převzetí položek jednotlivě z BlockingCollection.

Zrušení operací přidání a odebrání

Operace přidání a odebrání se obvykle provádějí ve smyčce. Smyčku můžete zrušit předáním CancellationToken do metod TryAdd nebo TryTake a následnou kontrolou hodnoty vlastnosti tokenu IsCancellationRequested v každé iteraci. Pokud je hodnota pravdivá, je na vás, abyste odpověděli na žádost o zrušení očištěním všech zdrojů a ukončením smyčky. Následující příklad ukazuje přetížení TryAdd, které přijímá token zrušení, a kód, který ho používá:

do
{
    // Cancellation causes OCE. We know how to handle it.
    try
    {
        success = bc.TryAdd(itemToAdd, 2, ct);
    }
    catch (OperationCanceledException)
    {
        bc.CompleteAdding();
        break;
    }
    //...
} while (moreItems == true);
Do While moreItems = True
    ' Cancellation causes OCE. We know how to handle it.
    Try
        success = bc.TryAdd(itemToAdd, 2, ct)
    Catch ex As OperationCanceledException
        bc.CompleteAdding()
        Exit Do
    End Try
Loop

Příklad přidání podpory zrušení naleznete v druhém příkladu v tématu Postupy: Přidání a převzetí položek jednotlivě z BlockingCollection.

Určení typu kolekce

Při vytváření BlockingCollection<T>můžete zadat nejen ohraničenou kapacitu, ale také typ kolekce, která se má použít. Můžete například zadat ConcurrentQueue<T> pro chování first in-first out (FIFO) nebo ConcurrentStack<T> pro last in-first out (LIFO) chování. Můžete použít libovolnou třídu kolekce, která implementuje IProducerConsumerCollection<T> rozhraní. Výchozí typ kolekce je: BlockingCollection<T>ConcurrentQueue<T>. Následující příklad kódu ukazuje, jak vytvořit BlockingCollection<T> řetězec, který má kapacitu 1 000 a používá ConcurrentBag<T>:

Dim bc = New BlockingCollection(Of String)(New ConcurrentBag(Of String()), 1000)
BlockingCollection<string> bc = new BlockingCollection<string>(new ConcurrentBag<string>(), 1000 );

Další informace naleznete v tématu Postupy: Přidání ohraničující a blokující funkce do kolekce.

Podpora IEnumerable

BlockingCollection<T> poskytuje metodu GetConsumingEnumerable , která uživatelům umožňuje používat foreach (For Each v jazyce Visual Basic) odebrat položky, dokud se kolekce nedokončí, což znamená, že je prázdná a nebudou přidány žádné další položky. Další informace naleznete v tématu Postupy: Použití příkazu ForEach k odebrání položek v BlockingCollection.

Použití mnoha BlockingCollections jako jedné

Ve scénářích, kdy spotřebitel potřebuje převzít položky z více kolekcí současně, můžete vytvořit pole BlockingCollection<T> a použít statické metody, jako například TakeFromAny a AddToAny, které přidávají do nebo odebírají z libovolné kolekce v poli. Pokud jedna kolekce blokuje, metoda okamžitě pokusí jinou, dokud nenajde jednu, která může provést operaci. Další informace naleznete v tématu Postupy: Použití polí blokujících kolekcí v kanálu.

Viz také