How to: Use ForEach to Remove Items in a BlockingCollection
In addition to taking items from a BlockingCollection<T> by using the Take and TryTake methods, you can also use a foreach loop (For Each in Visual Basic) over the GetConsumingEnumerable method to remove items until adding is completed and the collection is empty. This is called a mutating enumeration or consuming enumeration because, unlike a typical foreach (For Each) loop, this enumerator modifies the source collection by removing items.
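To make the comparison with Take concrete, here is a minimal sketch that drains the same kind of collection in both ways. The class and method names (TakeLoopSketch, Fill, DrainWithTake, DrainWithForEach) are hypothetical and exist only for this illustration; the sketch assumes that all items are added and CompleteAdding is called before draining starts.

```csharp
using System;
using System.Collections.Concurrent;

public static class TakeLoopSketch
{
    // Hypothetical helper: creates a collection holding `count` items
    // and marks it as complete for adding.
    public static BlockingCollection<int> Fill(int count)
    {
        var collection = new BlockingCollection<int>();
        for (int i = 0; i < count; i++)
        {
            collection.Add(i);
        }
        collection.CompleteAdding();
        return collection;
    }

    // Removes items with explicit Take calls and returns how many were taken.
    public static int DrainWithTake(BlockingCollection<int> collection)
    {
        int taken = 0;
        while (!collection.IsCompleted)
        {
            try
            {
                collection.Take();   // Blocks until an item is available.
                taken++;
            }
            catch (InvalidOperationException)
            {
                // The collection became empty and complete for adding
                // while Take was waiting.
                break;
            }
        }
        return taken;
    }

    // Removes items with a consuming enumeration and returns how many were taken.
    public static int DrainWithForEach(BlockingCollection<int> collection)
    {
        int taken = 0;
        foreach (int item in collection.GetConsumingEnumerable())
        {
            taken++;
        }
        return taken;
    }

    public static void Main()
    {
        Console.WriteLine("Take loop removed {0} items", DrainWithTake(Fill(5)));
        Console.WriteLine("foreach removed {0} items", DrainWithForEach(Fill(5)));
    }
}
```

Both methods remove every item. The foreach version needs no exception handling because the enumeration simply ends once the collection is empty and marked as complete for adding.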
Example
The following example shows how to remove all the items in a BlockingCollection<T> by using a foreach (For Each) loop.
Option Strict On
Option Explicit On
Imports System
Imports System.Diagnostics
Imports System.Threading
Imports System.Threading.Tasks
Imports System.Collections.Concurrent

Module EnumerateBC
    Class Program
        ' Limit the collection size to 1000 items at any given time.
        ' Two producers each add itemsToProduce items, so set
        ' itemsToProduce to >500 to be able to hit the limit.
        Const upperLimit As Integer = 1000

        ' Adjust this number to see how it impacts
        ' the producing-consuming pattern.
        Const itemsToProduce As Integer = 100

        Shared collection As New BlockingCollection(Of Long)(upperLimit)

        ' Variables for diagnostic output only.
        Shared sw As New Stopwatch()
        Shared totalAdditions As Integer = 0

        Shared Sub Main()
            ' Start the stopwatch.
            sw.Start()

            ' Queue the producer tasks.
            Dim task1 = Task.Factory.StartNew(Sub() RunProducer("A", 0))
            Dim task2 = Task.Factory.StartNew(Sub() RunProducer("B", itemsToProduce))

            ' Store in an array for use with ContinueWhenAll.
            Dim producers() As Task = {task1, task2}

            ' Create a cleanup task that will call CompleteAdding after
            ' all producers are done adding items.
            Dim cleanup As Task = Task.Factory.ContinueWhenAll(producers, Sub(p) collection.CompleteAdding())

            ' Queue the consumer task without waiting for the producers,
            ' so that it begins consuming as soon as items are added.
            Task.Factory.StartNew(Sub() RunConsumer())

            ' Keep the console window open while the
            ' consumer thread completes its output.
            Console.ReadKey()
        End Sub

        Shared Sub RunProducer(ByVal ID As String, ByVal start As Integer)
            Dim additions As Integer = 0
            For i As Integer = start To start + itemsToProduce - 1
                ' The data that is added to the collection.
                Dim ticks As Long = sw.ElapsedTicks

                ' Display the item that is about to be added.
                Console.WriteLine("{0} adding tick value {1}. item# {2}", ID, ticks, i)

                ' Don't try to add an item after CompleteAdding
                ' has been called.
                If Not collection.IsAddingCompleted Then
                    collection.Add(ticks)
                    ' Counter for demonstration purposes only.
                    additions += 1
                End If

                ' Slow down the producer threads without sleeping.
                ' Comment out this line to produce at full speed.
                Thread.SpinWait(100000)
            Next

            Interlocked.Add(totalAdditions, additions)
            Console.WriteLine("{0} is done adding: {1} items", ID, additions)
        End Sub

        Shared Sub RunConsumer()
            ' GetConsumingEnumerable returns an enumerable that removes
            ' each item from the collection as it is returned.
            Dim subtractions As Integer = 0
            For Each item In collection.GetConsumingEnumerable()
                subtractions += 1
                Console.WriteLine("Consuming tick value {0} : item# {1} : current count = {2}",
                                  item.ToString("D18"), subtractions, collection.Count)
            Next

            Console.WriteLine("Total added: {0} Total consumed: {1} Current count: {2} ",
                              totalAdditions, subtractions, collection.Count)
            sw.Stop()
            Console.WriteLine("Press any key to exit.")
        End Sub
    End Class
End Module
namespace EnumerateBlockingCollection
{
    using System;
    using System.Collections.Concurrent;
    using System.Diagnostics;
    using System.Threading;
    using System.Threading.Tasks;

    class Program
    {
        // Limit the collection size to 1000 items at any given time.
        // Two producers each add itemsToProduce items, so set
        // itemsToProduce to >500 to be able to hit the limit.
        const int upperLimit = 1000;

        // Adjust this number to see how it impacts
        // the producing-consuming pattern.
        const int itemsToProduce = 100;

        static BlockingCollection<long> collection = new BlockingCollection<long>(upperLimit);

        // Variables for diagnostic output only.
        static Stopwatch sw = new Stopwatch();
        static int totalAdditions = 0;

        static void Main(string[] args)
        {
            // Start the stopwatch.
            sw.Start();

            // Queue the producer tasks. Store in an array
            // for use with ContinueWhenAll.
            Task[] producers = new Task[2];
            producers[0] = Task.Factory.StartNew(() => RunProducer("A", 0));
            producers[1] = Task.Factory.StartNew(() => RunProducer("B", itemsToProduce));

            // Create a cleanup task that will call CompleteAdding after
            // all producers are done adding items.
            Task cleanup = Task.Factory.ContinueWhenAll(producers, (p) => collection.CompleteAdding());

            // Queue the consumer task without waiting for the producers,
            // so that it begins consuming as soon as items are added.
            Task.Factory.StartNew(() => RunConsumer());

            // Keep the console window open while the
            // consumer thread completes its output.
            Console.ReadKey();
        }

        static void RunProducer(string ID, int start)
        {
            int additions = 0;
            for (int i = start; i < start + itemsToProduce; i++)
            {
                // The data that is added to the collection.
                long ticks = sw.ElapsedTicks;

                // Display the item that is about to be added.
                Console.WriteLine("{0} adding tick value {1}. item# {2}", ID, ticks, i);

                // Don't try to add an item after CompleteAdding
                // has been called.
                if (!collection.IsAddingCompleted)
                {
                    collection.Add(ticks);
                    // Counter for demonstration purposes only.
                    additions++;
                }

                // Slow down the producer threads without sleeping.
                // Comment out this line to produce at full speed.
                Thread.SpinWait(100000);
            }

            Interlocked.Add(ref totalAdditions, additions);
            Console.WriteLine("{0} is done adding: {1} items", ID, additions);
        }

        static void RunConsumer()
        {
            // GetConsumingEnumerable returns an enumerable that removes
            // each item from the collection as it is returned.
            int subtractions = 0;
            foreach (var item in collection.GetConsumingEnumerable())
            {
                subtractions++;
                Console.WriteLine("Consuming tick value {0} : item# {1} : current count = {2}",
                    item.ToString("D18"), subtractions, collection.Count);
            }

            Console.WriteLine("Total added: {0} Total consumed: {1} Current count: {2} ",
                totalAdditions, subtractions, collection.Count);
            sw.Stop();
            Console.WriteLine("Press any key to exit");
        }
    }
}
This example uses a foreach loop with the BlockingCollection<T>.GetConsumingEnumerable method in the consumer thread, which removes each item from the collection as it is enumerated. The collection is created with a bounded capacity (upperLimit), which limits the maximum number of items it can hold at any time; a producer that calls Add on a full collection blocks until space becomes available. Enumerating the collection in this way blocks the consumer thread whenever the collection is empty, and the loop ends only after CompleteAdding has been called and the last item has been removed. In this example, blocking is not a concern because the producer threads add items faster than they can be consumed.
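The blocking behavior at both ends can be observed without extra threads by using the non-blocking TryAdd and the timed TryTake overloads. The following is a minimal sketch under that assumption; the class name BoundedCapacitySketch, its Run method, the capacity of 2, and the 50 ms timeout are illustrative choices, not part of the example above.

```csharp
using System;
using System.Collections.Concurrent;

public static class BoundedCapacitySketch
{
    // Returns three observations, in order:
    // [0] TryAdd on a full collection, [1] TryAdd after one Take,
    // [2] TryTake with a timeout on an empty collection.
    public static bool[] Run()
    {
        // Bounded capacity of 2: a third Add would block.
        var bounded = new BlockingCollection<int>(2);
        bounded.Add(1);
        bounded.Add(2);

        // TryAdd returns immediately instead of blocking when the collection is full.
        bool addedWhileFull = bounded.TryAdd(3);

        // Removing one item frees a slot for the producer.
        bounded.Take();
        bool addedAfterTake = bounded.TryAdd(3);

        // On an empty collection, a consumer would block; the timed
        // TryTake gives up after the timeout instead.
        var empty = new BlockingCollection<int>();
        int item;
        bool tookFromEmpty = empty.TryTake(out item, 50);

        return new[] { addedWhileFull, addedAfterTake, tookFromEmpty };
    }

    public static void Main()
    {
        bool[] results = Run();
        Console.WriteLine("TryAdd while full: {0}", results[0]);
        Console.WriteLine("TryAdd after Take: {0}", results[1]);
        Console.WriteLine("TryTake on empty:  {0}", results[2]);
    }
}
```

In the cases where TryAdd and TryTake return false here, the blocking Add and Take calls (and the consuming foreach loop) would instead wait until another thread changes the state of the collection.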
There is no guarantee that the items are enumerated in the same order in which they were added by the producer threads.
To enumerate the collection without modifying it, use foreach (For Each) directly on the collection, without the GetConsumingEnumerable method. However, it is important to understand that this kind of enumeration represents a snapshot of the collection at a particular point in time. If other threads add or remove items concurrently while the loop is running, the loop might not reflect the actual state of the collection.
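The difference between the two kinds of enumeration can be seen in a short single-threaded sketch. The class name SnapshotSketch and its Run method are hypothetical; the sketch adds three items up front and calls CompleteAdding so that the consuming loop terminates.

```csharp
using System;
using System.Collections.Concurrent;

public static class SnapshotSketch
{
    // Returns { items seen by snapshot loop, count after snapshot loop,
    //           items seen by consuming loop, count after consuming loop }.
    public static int[] Run()
    {
        var collection = new BlockingCollection<int>();
        for (int i = 0; i < 3; i++)
        {
            collection.Add(i);
        }
        collection.CompleteAdding();

        // Plain foreach: enumerates a snapshot and removes nothing.
        int seen = 0;
        foreach (int item in collection)
        {
            seen++;
        }
        int countAfterSnapshot = collection.Count;

        // Consuming enumeration: removes each item as it is returned.
        int consumed = 0;
        foreach (int item in collection.GetConsumingEnumerable())
        {
            consumed++;
        }
        int countAfterConsume = collection.Count;

        return new[] { seen, countAfterSnapshot, consumed, countAfterConsume };
    }

    public static void Main()
    {
        int[] r = Run();
        Console.WriteLine("Snapshot loop saw {0} items; count is still {1}", r[0], r[1]);
        Console.WriteLine("Consuming loop saw {0} items; count is now {1}", r[2], r[3]);
    }
}
```

Because no other thread touches the collection here, the snapshot matches its real contents; with concurrent producers or consumers, the snapshot loop could already be out of date while it runs.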