BlockingCollection – přehled

BlockingCollection<T> je třída kolekce bezpečná pro přístup z více vláken, která poskytuje následující funkce:

  • Implementace vzoru Producer-Consumer.

  • Souběžné přidávání a přebírání položek z více vláken

  • Volitelná maximální kapacita.

  • Operace vložení a odebrání, které blokují, když je kolekce prázdná nebo plná

  • Vložení a odebrání operací try, které neblokují nebo blokují až do zadaného časového období.

  • Zapouzdřuje všechny typy kolekcí, které implementují IProducerConsumerCollection<T>

  • Zrušení pomocí tokenů zrušení

  • Dva druhy výčtu s foreach (For Each v jazyce Visual Basic):

    1. Výčet jen pro čtení

    2. Výčet, který odebere položky při jejich výčtu.

Podpora ohraničování a blokování

BlockingCollection<T> podporuje ohraničování a blokování. Ohraničující znamená, že můžete nastavit maximální kapacitu kolekce. Ohraničování je důležité v určitých scénářích, protože umožňuje řídit maximální velikost kolekce v paměti a brání vytváření vláken v příliš daleko před spotřebovávacími vlákny.

Více vláken nebo úloh může přidávat položky do kolekce současně a pokud kolekce dosáhne zadané maximální kapacity, budou generovaná vlákna blokovaná, dokud se položka neodebere. Více příjemců může položky odebrat současně a pokud se kolekce stane prázdnou, zablokují se spotřebovává vlákna, dokud producent položku přidá. Vygenerování vlákna může volat CompleteAdding , že nebudou přidány žádné další položky. Uživatelé sledují IsCompleted vlastnost, aby věděli, kdy je kolekce prázdná a nebudou přidány žádné další položky. Následující příklad ukazuje jednoduchý BlockingCollection s ohraničenou kapacitou 100. Úkol producenta přidá položky do kolekce, pokud je splněna některá externí podmínka a pak volá CompleteAdding. Úkol příjemce přebírá položky, dokud IsCompleted není vlastnost pravdivá.

// A bounded collection. It can hold no more
// than 100 items at once.
BlockingCollection<Data> dataItems = new BlockingCollection<Data>(100);

// A simple blocking consumer with no cancellation.
Task.Run(() =>
{
    while (!dataItems.IsCompleted)
    {

        Data data = null;
        // Blocks if dataItems.Count == 0.
        // IOE means that Take() was called on a completed collection.
        // Some other thread can call CompleteAdding after we pass the
        // IsCompleted check but before we call Take.
        // In this example, we can simply catch the exception since the
        // loop will break on the next iteration.
        try
        {
            data = dataItems.Take();
        }
        catch (InvalidOperationException) { }

        if (data != null)
        {
            Process(data);
        }
    }
    Console.WriteLine("\r\nNo more items to take.");
});

// A simple blocking producer with no cancellation.
Task.Run(() =>
{
    while (moreItemsToAdd)
    {
        Data data = GetData();
        // Blocks if numbers.Count == dataItems.BoundedCapacity
        dataItems.Add(data);
    }
    // Let consumer know we are done.
    dataItems.CompleteAdding();
});

' A bounded collection. It can hold no more 
' than 100 items at once.
Dim dataItems = New BlockingCollection(Of Data)(100)

' A simple blocking consumer with no cancellation.
Task.Factory.StartNew(Sub()
                          While dataItems.IsCompleted = False
                              Dim dataItem As Data = Nothing
                              Try
                                  dataItem = dataItems.Take()
                              Catch e As InvalidOperationException
                                  ' IOE means that Take() was called on a completed collection.
                                  ' In this example, we can simply catch the exception since the 
                                  ' loop will break on the next iteration.
                              End Try
                              If (dataItem IsNot Nothing) Then
                                  Process(dataItem)
                              End If
                          End While
                          Console.WriteLine(vbCrLf & "No more items to take.")
                      End Sub)

' A simple blocking producer with no cancellation.
Task.Factory.StartNew(Sub()
                          While moreItemsToAdd = True
                              Dim item As Data = GetData()

                              ' Blocks if dataItems.Count = dataItems.BoundedCapacity.
                              dataItems.Add(item)
                          End While

                          ' Let consumer know we are done.
                          dataItems.CompleteAdding()
                      End Sub)

Úplný příklad naleznete v tématu Postupy: Přidání a převzetí položek jednotlivě z BlockingCollection.

Operace blokování s časovým limitem

V časově blokovaných a TryAddTryTake operacích v ohraničených kolekcích se metoda pokusí přidat nebo vzít položku. Pokud je položka k dispozici, je umístěna do proměnné, která byla předána odkazem, a metoda vrátí true. Pokud se po zadaném časovém limitu nenačte žádná položka, vrátí metoda hodnotu false. Vlákno je pak zdarma provést některé další užitečné práce předtím, než se pokusíte znovu získat přístup k kolekci. Příklad časového blokování přístupu najdete v druhém příkladu v tématu Postupy: Přidání a převzetí položek jednotlivě z BlockingCollection.

Zrušení operací přidání a převzetí

Operace přidání a převzetí se obvykle provádějí ve smyčce. Smyčku můžete zrušit předáním CancellationToken metody TryAdd nebo TryTake metody a následnou kontrolou hodnoty vlastnosti tokenu IsCancellationRequested v každé iteraci. Pokud je hodnota true, je na vás odpovědět na žádost o zrušení vyčištěním všech prostředků a ukončením smyčky. Následující příklad ukazuje přetížení TryAdd , které přebírá token zrušení a kód, který ho používá:

do
{
    // Cancellation causes OCE. We know how to handle it.
    try
    {
        success = bc.TryAdd(itemToAdd, 2, ct);
    }
    catch (OperationCanceledException)
    {
        bc.CompleteAdding();
        break;
    }
    //...
} while (moreItems == true);
Do While moreItems = True
    ' Cancellation causes OCE. We know how to handle it.
    Try
        success = bc.TryAdd(itemToAdd, 2, ct)
    Catch ex As OperationCanceledException
        bc.CompleteAdding()
        Exit Do
    End Try
Loop

Příklad přidání podpory zrušení naleznete v druhém příkladu v tématu Postupy: Přidání a převzetí položek jednotlivě z BlockingCollection.

Určení typu kolekce

Při vytváření BlockingCollection<T>můžete zadat nejen ohraničenou kapacitu, ale také typ kolekce, která se má použít. Můžete například zadat ConcurrentQueue<T> chování fiFO (first in-first out) nebo ConcurrentStack<T> pro poslední chování typu in-first out (LIFO). Můžete použít libovolnou třídu kolekce, která implementuje IProducerConsumerCollection<T> rozhraní. Výchozí typ kolekce je BlockingCollection<T>ConcurrentQueue<T>. Následující příklad kódu ukazuje, jak vytvořit BlockingCollection<T> řetězec, který má kapacitu 1 000 a používá ConcurrentBag<T>:

Dim bc = New BlockingCollection(Of String)(New ConcurrentBag(Of String()), 1000)  
BlockingCollection<string> bc = new BlockingCollection<string>(new ConcurrentBag<string>(), 1000 );  

Další informace naleznete v tématu Postupy: Přidání ohraničující a blokující funkce do kolekce.

Podpora IEnumerable

BlockingCollection<T> poskytuje metodu GetConsumingEnumerable , která uživatelům umožňuje používat foreach (For Each v jazyce Visual Basic) odebrat položky, dokud se kolekce nedokončí, což znamená, že je prázdná a nebudou přidány žádné další položky. Další informace naleznete v tématu Postupy: Použití příkazu ForEach k odebrání položek v BlockingCollection.

Použití mnoha blockingcollections jako jedné

Ve scénářích, ve kterých příjemce potřebuje převzít položky z více kolekcí současně, můžete vytvořit pole BlockingCollection<T> a použít statické metody, jako TakeFromAny je a AddToAny které se přidají nebo převezmou z libovolné kolekce v poli. Pokud jedna kolekce blokuje, metoda okamžitě pokusí jinou, dokud nenajde jednu, která může provést operaci. Další informace naleznete v tématu Postupy: Použití polí blokujících kolekcí v kanálu.

Viz také