Panoramica di BlockingCollection

BlockingCollection<T> è una classe di raccolta thread-safe che offre le funzionalità seguenti:

  • Implementazione del modello Producer-Consumer.

  • Aggiunta e rimozione simultanea di elementi da più thread.

  • Capacità massima facoltativa.

  • Operazioni di inserimento e rimozione che si bloccano quando la raccolta è vuota o piena.

  • Operazioni "prova" di inserimento e rimozione che non si bloccano o che si bloccano per un periodo di tempo specificato.

  • Incapsulamento di qualsiasi tipo di raccolta che implementi IProducerConsumerCollection<T>

  • Annullamento con token di annullamento.

  • Due tipi di enumerazione con foreach (For Each in Visual Basic):

    1. Enumerazione di sola lettura.

    2. Enumerazione che rimuove gli elementi quando vengono enumerati.

Supporto di delimitazione e blocco

BlockingCollection<T> supporta delimitazione e blocco. La delimitazione consente di impostare la capacità massima della raccolta. La delimitazione è importante in determinati scenari poiché consente di controllare la dimensione massima della raccolta in memoria, impedendo ai thread Producer di spostarsi troppo oltre i thread Consumer.

Più thread o attività possono aggiungere contemporaneamente elementi alla raccolta, e se la raccolta raggiunge la capacità massima specificata, i thread Producer si bloccano fino a quando non viene rimosso un elemento. Più Consumer possono rimuovere contemporaneamente elementi e quando la raccolta diventa vuota, i thread Consumer si bloccano fino a quando un Producer aggiunge un elemento. Un thread Producer può chiamare CompleteAdding per indicare che non verranno aggiunti altri elementi. I Consumer monitorano la proprietà IsCompleted per sapere quando la raccolta è vuota e non verranno aggiunti altri elementi. Nell'esempio seguente viene illustrato un semplice BlockingCollection con una capacità delimitata di 100. Un'attività Producer aggiunge elementi alla raccolta, purché una condizione esterna sia true e quindi chiama CompleteAdding. L'attività di tipo Consumer accetta elementi finché la proprietà IsCompleted è true.

// A bounded collection. It can hold no more
// than 100 items at once.
BlockingCollection<Data> dataItems = new BlockingCollection<Data>(100);

// A simple blocking consumer with no cancellation.
Task.Run(() =>
{
    while (!dataItems.IsCompleted)
    {

        Data data = null;
        // Blocks if dataItems.Count == 0.
        // IOE means that Take() was called on a completed collection.
        // Some other thread can call CompleteAdding after we pass the
        // IsCompleted check but before we call Take.
        // In this example, we can simply catch the exception since the
        // loop will break on the next iteration.
        try
        {
            data = dataItems.Take();
        }
        catch (InvalidOperationException) { }

        if (data != null)
        {
            Process(data);
        }
    }
    Console.WriteLine("\r\nNo more items to take.");
});

// A simple blocking producer with no cancellation.
Task.Run(() =>
{
    while (moreItemsToAdd)
    {
        Data data = GetData();
        // Blocks if numbers.Count == dataItems.BoundedCapacity
        dataItems.Add(data);
    }
    // Let consumer know we are done.
    dataItems.CompleteAdding();
});

' A bounded collection. It can hold no more 
' than 100 items at once.
Dim dataItems = New BlockingCollection(Of Data)(100)

' A simple blocking consumer with no cancellation.
Task.Factory.StartNew(Sub()
                          While dataItems.IsCompleted = False
                              Dim dataItem As Data = Nothing
                              Try
                                  dataItem = dataItems.Take()
                              Catch e As InvalidOperationException
                                  ' IOE means that Take() was called on a completed collection.
                                  ' In this example, we can simply catch the exception since the 
                                  ' loop will break on the next iteration.
                              End Try
                              If (dataItem IsNot Nothing) Then
                                  Process(dataItem)
                              End If
                          End While
                          Console.WriteLine(vbCrLf & "No more items to take.")
                      End Sub)

' A simple blocking producer with no cancellation.
Task.Factory.StartNew(Sub()
                          While moreItemsToAdd = True
                              Dim item As Data = GetData()

                              ' Blocks if dataItems.Count = dataItems.BoundedCapacity.
                              dataItems.Add(item)
                          End While

                          ' Let consumer know we are done.
                          dataItems.CompleteAdding()
                      End Sub)

Per un esempio completo, vedere Procedura: Aggiungere e rimuovere singoli elementi di un oggetto BlockingCollection.

Operazioni di blocco temporizzate

Nelle operazioni di blocco temporizzate TryAdd e TryTake sulle raccolte delimitate il metodo tenta di aggiungere o rimuovere un elemento. Se è disponibile un elemento, questo viene posizionato nella variabile che è stata passata per riferimento e il metodo restituisce true. Se dopo un periodo di timeout specificato non viene recuperato alcun elemento, il metodo restituisce false. Il thread è quindi libero di eseguire altre operazioni utili prima di tentare nuovamente di accedere alla raccolta. Per un esempio di accesso con blocco temporizzato, vedere il secondo esempio in Procedura: Aggiungere e rimuovere singoli elementi di un oggetto BlockingCollection.

Annullamento di operazioni di aggiunta e rimozione

Le operazioni di aggiunta e rimozione solitamente vengono eseguite in un ciclo. È possibile annullare un ciclo passando un CancellationToken al metodo TryAdd o TryTake e quindi controllando il valore della proprietà IsCancellationRequested del token in ogni iterazione. Se il valore è true, l'utente deve decidere se rispondere alla richiesta di annullamento cancellando tutte le risorse e uscendo dal ciclo. Nell'esempio seguente viene illustrato un overload di TryAdd che accetta un token di annullamento e il codice che viene usato:

do
{
    // Cancellation causes OCE. We know how to handle it.
    try
    {
        success = bc.TryAdd(itemToAdd, 2, ct);
    }
    catch (OperationCanceledException)
    {
        bc.CompleteAdding();
        break;
    }
    //...
} while (moreItems == true);
Do While moreItems = True
    ' Cancellation causes OCE. We know how to handle it.
    Try
        success = bc.TryAdd(itemToAdd, 2, ct)
    Catch ex As OperationCanceledException
        bc.CompleteAdding()
        Exit Do
    End Try
Loop

Per un esempio di aggiunta del supporto per l'annullamento, vedere il secondo esempio in Procedura: Aggiungere e rimuovere singoli elementi di un oggetto BlockingCollection.

Specifica del tipo di raccolta

Quando si crea un oggetto BlockingCollection<T>, è possibile specificare non solo la capacità delimitata ma anche il tipo di raccolta da usare. Ad esempio, è possibile specificare ConcurrentQueue<T> per il comportamento FIFO (First In, First Out) o ConcurrentStack<T> per il comportamento LIFO (Last In, First Out). È possibile usare qualsiasi classe di raccolta che implementa l'interfaccia IProducerConsumerCollection<T>. Il tipo di raccolta predefinito per BlockingCollection<T> è ConcurrentQueue<T>. L'esempio di codice seguente illustra come creare un oggetto BlockingCollection<T> di stringhe con capacità 1000, che usa un oggetto ConcurrentBag<T>:

Dim bc = New BlockingCollection(Of String)(New ConcurrentBag(Of String()), 1000)  
BlockingCollection<string> bc = new BlockingCollection<string>(new ConcurrentBag<string>(), 1000 );  

Per altre informazioni, vedere Procedura: Aggiungere funzionalità di delimitazione e blocco a una raccolta.

Supporto di IEnumerable

BlockingCollection<T> offre un metodo GetConsumingEnumerable che consente agli utenti di usare foreach (For Each in Visual Basic) per rimuovere gli elementi fino a quando non viene completata la raccolta, ovvero finché non è vuota e non vengono aggiunti altri elementi. Per altre informazioni, vedere Procedura: usare ForEach per rimuovere elementi in un oggetto BlockingCollection.

Uso di più oggetti BlockingCollections come uno solo

Per gli scenari in cui un Consumer deve rimuovere elementi da più raccolte contemporaneamente, è possibile creare matrici di BlockingCollection<T> e usare i metodi statici, ad esempio TakeFromAny e AddToAny che eseguiranno operazioni di aggiunta o rimozione in qualsiasi raccolta nella matrice. Se una raccolta è bloccata, il metodo ne cerca immediatamente un'altra finché non ne trova una che può eseguire l'operazione. Per altre informazioni, vedere Procedura: usare matrici di BlockingCollection in una pipeline.

Vedi anche