Delen via


Overzicht van BlockingCollection

BlockingCollection<T> is een thread-safe-verzamelingsklasse die de volgende functies biedt:

  • Een implementatie van het Producer-Consumer patroon.

  • Gelijktijdig toevoegen en ophalen van items uit meerdere threads.

  • Optionele maximale capaciteit.

  • Invoeg- en verwijderingsbewerkingen die blokkeren wanneer de verzameling leeg of vol is.

  • Invoegings- en verwijderingspogingen die niet blokkeren of die blokkeren tot een gespecificeerde tijdsduur.

  • Elk verzamelingstype inkapselen dat IProducerConsumerCollection<T> implementeert

  • Annulering met annuleringstokens.

  • Twee soorten opsomming met foreach (For Each in Visual Basic):

    1. Opsomming met het kenmerk Alleen-lezen.

    2. Enumeratie die items verwijdert terwijl ze worden geënumereerd.

Ondersteuning voor begrenzing en blokkering

BlockingCollection<T> ondersteunt begrenzing en blokkeren. Begrenzing betekent dat u de maximale capaciteit van de verzameling kunt instellen. Begrenzing is belangrijk in bepaalde scenario's, omdat u hiermee de maximale grootte van de verzameling in het geheugen kunt beheren en voorkomt dat de productiethreads zich te ver voor de verbruikende threads bewegen.

Meerdere threads of taken kunnen items gelijktijdig aan de verzameling toevoegen en als de verzameling de opgegeven maximale capaciteit bereikt, worden de productiethreads geblokkeerd totdat een item wordt verwijderd. Meerdere consumenten kunnen items gelijktijdig verwijderen en als de verzameling leeg raakt, worden de consumerende threads geblokkeerd totdat een producent een item heeft toegevoegd. Een producerende thread kan CompleteAdding oproepen om aan te geven dat er geen items meer zullen worden toegevoegd. Consumenten controleren de IsCompleted eigenschap om te weten wanneer de verzameling leeg is en er geen items meer worden toegevoegd. In het volgende voorbeeld ziet u een eenvoudige BlockingCollection met een begrensde capaciteit van 100. Een producenttaak voegt items toe aan de verzameling zolang aan een externe voorwaarde is voldaan en roept vervolgens aan CompleteAdding. De consumententaak neemt items totdat de IsCompleted eigenschap waar is.

// A bounded collection. It can hold no more
// than 100 items at once.
BlockingCollection<Data> dataItems = new BlockingCollection<Data>(100);

// A simple blocking consumer with no cancellation.
Task.Run(() =>
{
    while (!dataItems.IsCompleted)
    {

        Data data = null;
        // Blocks if dataItems.Count == 0.
        // IOE means that Take() was called on a completed collection.
        // Some other thread can call CompleteAdding after we pass the
        // IsCompleted check but before we call Take.
        // In this example, we can simply catch the exception since the
        // loop will break on the next iteration.
        try
        {
            data = dataItems.Take();
        }
        catch (InvalidOperationException) { }

        if (data != null)
        {
            Process(data);
        }
    }
    Console.WriteLine("\r\nNo more items to take.");
});

// A simple blocking producer with no cancellation.
Task.Run(() =>
{
    while (moreItemsToAdd)
    {
        Data data = GetData();
        // Blocks if numbers.Count == dataItems.BoundedCapacity
        dataItems.Add(data);
    }
    // Let consumer know we are done.
    dataItems.CompleteAdding();
});

' A bounded collection. It can hold no more 
' than 100 items at once.
Dim dataItems = New BlockingCollection(Of Data)(100)

' A simple blocking consumer with no cancellation.
Task.Factory.StartNew(Sub()
                          While dataItems.IsCompleted = False
                              Dim dataItem As Data = Nothing
                              Try
                                  dataItem = dataItems.Take()
                              Catch e As InvalidOperationException
                                  ' IOE means that Take() was called on a completed collection.
                                  ' In this example, we can simply catch the exception since the 
                                  ' loop will break on the next iteration.
                              End Try
                              If (dataItem IsNot Nothing) Then
                                  Process(dataItem)
                              End If
                          End While
                          Console.WriteLine(vbCrLf & "No more items to take.")
                      End Sub)

' A simple blocking producer with no cancellation.
Task.Factory.StartNew(Sub()
                          While moreItemsToAdd = True
                              Dim item As Data = GetData()

                              ' Blocks if dataItems.Count = dataItems.BoundedCapacity.
                              dataItems.Add(item)
                          End While

                          ' Let consumer know we are done.
                          dataItems.CompleteAdding()
                      End Sub)

Zie voor een volledig voorbeeld Hoe: Items afzonderlijk toevoegen en overnemen vanuit een BlockingCollection.

Getimede blokkeringsbewerkingen

Bij getimede blokkeringen TryAdd en TryTake operaties op begrensde verzamelingen probeert de methode een item toe te voegen of te nemen. Als een item beschikbaar is, wordt het in de variabele geplaatst die is doorgegeven via verwijzing en retourneert de methode true. Als er geen item wordt opgehaald na een opgegeven time-outperiode, retourneert de methode false. De thread is vervolgens vrij om nog wat nuttig werk te doen voordat u opnieuw probeert toegang te krijgen tot de verzameling. Zie het tweede voorbeeld in How to: Add and Take Items Individually from a BlockingCollection (Items afzonderlijk toevoegen vanuit een BlockingCollection) voor een voorbeeld van getimede blokkeringstoegang.

Bewerkingen voor toevoegen en wegnemen annuleren

Bewerkingen voor toevoegen en overnemen worden doorgaans uitgevoerd in een lus. U kunt een lus annuleren door een CancellationToken aan de TryAdd of TryTake methode door te geven en vervolgens de waarde van de eigenschap van het token IsCancellationRequested te controleren op elke iteratie. Als de waarde waar is, is het aan u om te reageren op de annuleringsaanvraag door resources op te schonen en de lus af te sluiten. In het volgende voorbeeld ziet u een overladen methode voor TryAdd die een annuleringstoken gebruikt en de code die deze methode aanroept.

do
{
    // Cancellation causes OCE. We know how to handle it.
    try
    {
        success = bc.TryAdd(itemToAdd, 2, ct);
    }
    catch (OperationCanceledException)
    {
        bc.CompleteAdding();
        break;
    }
    //...
} while (moreItems == true);
Do While moreItems = True
    ' Cancellation causes OCE. We know how to handle it.
    Try
        success = bc.TryAdd(itemToAdd, 2, ct)
    Catch ex As OperationCanceledException
        bc.CompleteAdding()
        Exit Do
    End Try
Loop

Zie het tweede voorbeeld in How to: Add and Take Items Individually from a BlockingCollection (Items afzonderlijk toevoegen en overnemen vanuit een BlockingCollection) voor een voorbeeld van het toevoegen van annuleringsondersteuning.

Het verzamelingstype opgeven

Wanneer u een BlockingCollection<T>maakt, kunt u niet alleen de gebonden capaciteit opgeven, maar ook het type verzameling dat u wilt gebruiken. U kunt bijvoorbeeld een ConcurrentQueue<T> voor first in first out (FIFO)-gedrag opgeven, of een ConcurrentStack<T> voor last in first out (LIFO)-gedrag. U kunt elke verzamelingsklasse gebruiken die de IProducerConsumerCollection<T> interface implementeert. Het standaardverzamelingstype BlockingCollection<T> is ConcurrentQueue<T>. In het volgende codevoorbeeld ziet u hoe u een BlockingCollection<T> tekenreeks met een capaciteit van 1000 maakt en een ConcurrentBag<T>:

Dim bc = New BlockingCollection(Of String)(New ConcurrentBag(Of String()), 1000)  
BlockingCollection<string> bc = new BlockingCollection<string>(new ConcurrentBag<string>(), 1000 );  

Zie Voor meer informatie : Begrenzings- en blokkeringsfunctionaliteit toevoegen aan een verzameling.

Ondersteuning voor IEnumerable

BlockingCollection<T> biedt een GetConsumingEnumerable methode waarmee consumenten items kunnen verwijderen foreach (For Each in Visual Basic) totdat de verzameling is voltooid. Dit betekent dat deze leeg is en dat er geen items meer worden toegevoegd. Zie voor meer informatie Hoe te: ForEach gebruiken om items in een BlockingCollection te verwijderen.

Veel BlockingCollections als één gebruiken

Voor scenario's waarin een consument items uit meerdere verzamelingen tegelijk moet overnemen, kunt u matrices maken van BlockingCollection<T> en de statische methoden gebruiken, zoals TakeFromAny en AddToAny die worden toegevoegd aan of overgenomen uit een van de verzamelingen in de matrix. Als een verzameling wordt geblokkeerd, probeert de methode onmiddellijk een andere totdat er een wordt gevonden die de bewerking kan uitvoeren. Voor meer informatie, zie Hoe: Gebruik van arrays van blokkerende verzamelingen in een pijplijn.

Zie ook