Partager via


Dataflow (bibliothèque parallèle de tâches)

La bibliothèque parallèle de tâches (TPL) fournit des composants de flux de données destinés à augmenter la robustesse des applications prenant en charge l’accès concurrentiel. Ces composants de dataflow sont collectivement appelés bibliothèque de flux de données TPL. Ce modèle de flux de données favorise la programmation basée sur les acteurs en fournissant un passage de message interne pour les tâches de flux de données à grain grossier et de chaînage. Les composants de flux de données s’appuient sur les types et l’infrastructure de planification du TPL et s’intègrent à la prise en charge du langage C#, Visual Basic et F# pour la programmation asynchrone. Ces composants de flux de données sont utiles lorsque vous avez plusieurs opérations qui doivent communiquer entre elles de manière asynchrone ou lorsque vous souhaitez traiter les données dès qu’elles sont disponibles. Par exemple, considérez une application qui traite les données d’image à partir d’une caméra web. À l’aide du modèle de flux de données, l’application peut traiter les images au fur et à mesure qu’elles deviennent disponibles. Si l’application améliore les images, par exemple en effectuant une correction légère ou une réduction des yeux rouges, vous pouvez créer un pipeline de composants de flux de données. Chaque étape du pipeline peut utiliser des fonctionnalités de parallélisme plus grossières, telles que la fonctionnalité fournie par le TPL, pour transformer l’image.

Ce document fournit une vue d’ensemble de la bibliothèque de flux de données TPL. Il décrit le modèle de programmation, les types de blocs de flux de données prédéfinis et comment configurer des blocs de flux de données pour répondre aux exigences spécifiques de vos applications.

Remarque

La bibliothèque de flux de données TPL (l’espace System.Threading.Tasks.Dataflow de noms) n’est pas distribuée avec .NET. Pour installer l’espace System.Threading.Tasks.Dataflow de noms dans Visual Studio, ouvrez votre projet, choisissez Gérer les packages NuGet dans le menu Projet et recherchez en ligne le System.Threading.Tasks.Dataflow package. Sinon, pour l’installer à l’aide de l’interface CLI .NET Core, exécutez dotnet add package System.Threading.Tasks.Dataflow.

Modèle de programmation

La bibliothèque TPL Dataflow fournit une base pour le passage de messages et la parallélisation des applications intensives en calcul et en E/S, avec un débit élevé et une faible latence. Il vous donne également un contrôle explicite sur la façon dont les données sont mises en mémoire tampon et se déplacent autour du système. Pour mieux comprendre le modèle de programmation de flux de données, envisagez une application qui charge de façon asynchrone des images à partir du disque et crée un composite de ces images. Les modèles de programmation traditionnels nécessitent généralement que vous utilisiez des rappels et des objets de synchronisation, tels que des verrous, pour coordonner les tâches et accéder aux données partagées. En utilisant le modèle de programmation de flux de données, vous pouvez créer des objets de flux de données qui traitent des images à mesure qu’elles sont lues à partir du disque. Sous le modèle de flux de données, vous déclarez comment les données sont gérées lorsqu’elles sont disponibles, ainsi que toutes les dépendances entre les données. Étant donné que le runtime gère les dépendances entre les données, vous pouvez souvent éviter la nécessité de synchroniser l’accès aux données partagées. En outre, étant donné que les planifications du runtime fonctionnent en fonction de l’arrivée asynchrone des données, le flux de données peut améliorer la réactivité et le débit en gérant efficacement les threads sous-jacents. Pour obtenir un exemple qui utilise le modèle de programmation de flux de données pour implémenter le traitement d’images dans une application Windows Forms, consultez Procédure pas à pas : Utilisation du flux de données dans une application Windows Forms.

Sources et cibles

La bibliothèque de flux de données TPL se compose de blocs de flux de données, qui sont des structures de données qui tamponnt et traitent des données. Le TPL définit trois types de blocs de flux de données : les blocs sources, les blocs cibles et les blocs de propagation. Un bloc source agit comme une source de données et peut être lu. Un bloc cible agit en tant que récepteur de données et peut être écrit dans. Un bloc de propagation agit à la fois comme un bloc source et un bloc cible, et peut être lu et écrit dans. Le TPL définit l’interface System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> pour représenter des sources, System.Threading.Tasks.Dataflow.ITargetBlock<TInput> représenter des cibles et System.Threading.Tasks.Dataflow.IPropagatorBlock<TInput,TOutput> représenter des propagateurs. IPropagatorBlock<TInput,TOutput> hérite à la fois de ISourceBlock<TOutput> et de ITargetBlock<TInput>.

La bibliothèque TPL Dataflow fournit plusieurs types de blocs de flux de données prédéfinis qui implémentent les interfaces ISourceBlock<TOutput>, ITargetBlock<TInput>, et IPropagatorBlock<TInput,TOutput>. Ces types de blocs de flux de données sont décrits dans ce document dans la section Types de blocs de flux de données prédéfinis.

Blocs connecteurs

Vous pouvez connecter des blocs de flux de données pour former des pipelines, qui sont des séquences linéaires de blocs de flux de données ou des réseaux, qui sont des graphiques de blocs de flux de données. Un pipeline est une forme de réseau. Dans un pipeline ou un réseau, les sources propagent de façon asynchrone les données aux cibles à mesure que ces données sont disponibles. La ISourceBlock<TOutput>.LinkTo méthode lie un bloc de flux de données source à un bloc cible. Une source peut être liée à zéro ou plusieurs cibles ; les cibles peuvent être liées à partir de zéro ou plusieurs sources. Vous pouvez ajouter ou supprimer simultanément des blocs de flux de données à partir d’un pipeline ou d’un réseau. Les types de blocs de flux de données prédéfinis gèrent tous les aspects de sécurité des threads de liaison et de dissociation.

Pour obtenir un exemple qui connecte des blocs de flux de données pour former un pipeline de base, consultez Procédure pas à pas : Création d’un pipeline de flux de données. Pour obtenir un exemple qui connecte des blocs de flux de données pour former un réseau plus complexe, consultez Procédure pas à pas : Utilisation du flux de données dans une application Windows Forms. Pour obtenir un exemple qui dissocie une cible d’une source après que la source offre le message cible, consultez Comment : dissocier des blocs de flux de données.

Filtrage

Lorsque vous appelez la ISourceBlock<TOutput>.LinkTo méthode pour lier une source à une cible, vous pouvez fournir un délégué qui détermine si le bloc cible accepte ou rejette un message en fonction de la valeur de ce message. Ce mécanisme de filtrage est un moyen utile de garantir qu’un bloc de flux de données ne reçoit que certaines valeurs. Pour la plupart des types de blocs de flux de données prédéfinis, si un bloc source est connecté à plusieurs blocs cibles, lorsqu’un bloc cible rejette un message, la source propose ce message à la cible suivante. L’ordre dans lequel une source propose des messages à des cibles est défini par la source et peut varier en fonction du type de la source. La plupart des types de blocs sources arrêtent d’offrir un message après qu’une cible accepte ce message. Une exception à cette règle est la BroadcastBlock<T> classe, qui offre chaque message à toutes les cibles, même si certaines cibles rejettent le message. Pour obtenir un exemple qui utilise le filtrage pour traiter uniquement certains messages, consultez Procédure pas à pas : utilisation du flux de données dans une application Windows Forms.

Importante

Étant donné que chaque type de bloc de flux de données source prédéfini garantit que les messages sont propagés dans l’ordre dans lequel ils sont reçus, chaque message doit être lu à partir du bloc source avant que le bloc source puisse traiter le message suivant. Par conséquent, lorsque vous utilisez le filtrage pour connecter plusieurs cibles à une source, assurez-vous qu’au moins un bloc cible reçoit chaque message. Sinon, votre application risque d’interblocage.

Passage de messages

Le modèle de programmation de flux de données est lié au concept de passage de messages, où les composants indépendants d’un programme communiquent entre eux en envoyant des messages. Une façon de propager des messages entre les composants d’application consiste à appeler les Post méthodes (synchrones) et SendAsync (asynchrones) pour envoyer des messages aux blocs de flux de données cibles, ainsi que les ReceiveReceiveAsyncTryReceive méthodes permettant de recevoir des messages à partir de blocs sources. Vous pouvez combiner ces méthodes avec des pipelines de flux de données ou des réseaux en envoyant des données d’entrée au nœud principal (bloc cible) et en recevant des données de sortie à partir du nœud terminal du pipeline ou des nœuds terminal du réseau (un ou plusieurs blocs sources). Vous pouvez également utiliser la Choose méthode pour lire à partir du premier des sources fournies qui disposent de données disponibles et effectuer une action sur ces données.

Les blocs sources offrent des données aux blocs cibles en appelant la ITargetBlock<TInput>.OfferMessage méthode. Le bloc cible répond à un message proposé de trois façons : il peut accepter le message, refuser le message ou reporter le message. Lorsque la cible accepte le message, la OfferMessage méthode retourne Accepted. Lorsque la cible refuse le message, la OfferMessage méthode retourne Declined. Lorsque la cible exige qu’elle ne reçoive plus de messages de la source, OfferMessage retourne DecliningPermanently. Les types de blocs source prédéfinis n’offrent pas de messages aux cibles liées après la réception d’une telle valeur de retour, et ils se dissocient automatiquement de ces cibles.

Lorsqu’un bloc cible reporte le message pour une utilisation ultérieure, la OfferMessage méthode retourne Postponed. Un bloc cible qui reporte un message peut appeler ultérieurement la ISourceBlock<TOutput>.ReserveMessage méthode pour essayer de réserver le message proposé. À ce stade, le message est toujours disponible et peut être utilisé par le bloc cible, ou le message a été pris par une autre cible. Lorsque le bloc cible nécessite ultérieurement le message, il appelle la méthode ISourceBlock<TOutput>.ConsumeMessage, ou n’en a plus besoin, la méthode ReleaseReservation respectivement. La réservation de messages est généralement utilisée par les types de blocs de flux de données qui fonctionnent en mode non gourmand. Le mode non gourmand est expliqué plus loin dans ce document. Au lieu de réserver un message reporté, un bloc cible peut également utiliser la ISourceBlock<TOutput>.ConsumeMessage méthode pour tenter de consommer directement le message reporté.

Achèvement du bloc de flux de données

Les blocs de flux de données prennent également en charge le concept d’achèvement. Un bloc de flux de données qui se trouve dans l’état terminé n’effectue aucun travail supplémentaire. Chaque bloc de flux de données a un objet associé System.Threading.Tasks.Task , appelé tâche d’achèvement, qui représente l’état d’achèvement du bloc. Étant donné que vous pouvez attendre qu’un Task objet se termine en utilisant des tâches d’achèvement, vous pouvez aussi attendre qu’un ou plusieurs nœuds terminaux d’un réseau de flux de données se terminent. L’interface IDataflowBlock définit la Complete méthode, qui informe le bloc de flux de données d’une demande pour qu’elle se termine, et la Completion propriété, qui retourne la tâche d’achèvement pour le bloc de flux de données. Les deux ISourceBlock<TOutput> et ITargetBlock<TInput> héritent de l’interface IDataflowBlock .

Il existe deux façons de déterminer si un bloc de flux de données s’est terminé sans erreur, a rencontré une ou plusieurs erreurs ou a été annulé. La première façon consiste à appeler la méthode Task.Wait sur la tâche d’achèvement dans un bloc try-catch (Try-Catch en Visual Basic). L’exemple suivant crée un ActionBlock<TInput> objet qui lève ArgumentOutOfRangeException si sa valeur d’entrée est inférieure à zéro. L’exception AggregateException est levée quand cet exemple appelle Wait sur la tâche d’achèvement. L’élément ArgumentOutOfRangeException est accessible via la propriété InnerExceptions de l’objet AggregateException.

// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
   Console.WriteLine($"n = {n}");
   if (n < 0)
   {
      throw new ArgumentOutOfRangeException();
   }
});

// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();

// Wait for completion in a try/catch block.
try
{
   throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
   // If an unhandled exception occurs during dataflow processing, all
   // exceptions are propagated through an AggregateException object.
   ae.Handle(e =>
   {
      Console.WriteLine($"Encountered {e.GetType().Name}: {e.Message}");
      return true;
   });
}

/* Output:
n = 0
n = -1
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
 of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
                                                      Console.WriteLine("n = {0}", n)
                                                      If n < 0 Then
                                                          Throw New ArgumentOutOfRangeException()
                                                      End If
                                                  End Sub)

' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()

' Wait for completion in a try/catch block.
Try
    throwIfNegative.Completion.Wait()
Catch ae As AggregateException
    ' If an unhandled exception occurs during dataflow processing, all
    ' exceptions are propagated through an AggregateException object.
    ae.Handle(Function(e)
                  Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
                  Return True
              End Function)
End Try

'          Output:
'         n = 0
'         n = -1
'         Encountered ArgumentOutOfRangeException: Specified argument was out of the range
'          of valid values.
'         

Cet exemple montre le cas dans lequel une exception n’est pas gérée dans le délégué d’un bloc de flux de données d’exécution. Nous vous recommandons de gérer les exceptions dans le corps des blocs. Toutefois, si vous ne parvenez pas à le faire, le bloc se comporte comme s’il a été annulé et ne traite pas les messages entrants.

Lorsqu’un bloc de flux de données est annulé explicitement, l’objet AggregateException contient OperationCanceledException dans la InnerExceptions propriété. Pour plus d’informations sur l’annulation du flux de données, consultez la section Activation de l’annulation .

La deuxième façon de déterminer l’état d’achèvement d’un bloc de flux de données consiste à utiliser une continuation de la tâche d’achèvement, ou à utiliser les fonctionnalités de langage asynchrone de C# et Visual Basic pour attendre de façon asynchrone la tâche d’achèvement. Le délégué que vous fournissez à la méthode Task.ContinueWith prend un objet Task qui représente la tâche antécédente. Dans le cas de la propriété Completion, le délégué de la continuation prend la tâche d'achèvement. L’exemple suivant ressemble à celui précédent, sauf qu’il utilise également la ContinueWith méthode pour créer une tâche de continuation qui imprime l’état de l’opération de flux de données globale.

// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
   Console.WriteLine($"n = {n}");
   if (n < 0)
   {
      throw new ArgumentOutOfRangeException();
   }
});

// Create a continuation task that prints the overall
// task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(task =>
{
   Console.WriteLine($"The status of the completion task is '{task.Status}'.");
});

// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();

// Wait for completion in a try/catch block.
try
{
   throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
   // If an unhandled exception occurs during dataflow processing, all
   // exceptions are propagated through an AggregateException object.
   ae.Handle(e =>
   {
      Console.WriteLine($"Encountered {e.GetType().Name}: {e.Message}");
      return true;
   });
}

/* Output:
n = 0
n = -1
The status of the completion task is 'Faulted'.
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
 of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
                                                      Console.WriteLine("n = {0}", n)
                                                      If n < 0 Then
                                                          Throw New ArgumentOutOfRangeException()
                                                      End If
                                                  End Sub)

' Create a continuation task that prints the overall 
' task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(Sub(task) Console.WriteLine("The status of the completion task is '{0}'.", task.Status))

' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()

' Wait for completion in a try/catch block.
Try
    throwIfNegative.Completion.Wait()
Catch ae As AggregateException
    ' If an unhandled exception occurs during dataflow processing, all
    ' exceptions are propagated through an AggregateException object.
    ae.Handle(Function(e)
                  Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
                  Return True
              End Function)
End Try

'          Output:
'         n = 0
'         n = -1
'         The status of the completion task is 'Faulted'.
'         Encountered ArgumentOutOfRangeException: Specified argument was out of the range
'          of valid values.
'         

Vous pouvez également utiliser des propriétés telles que IsCanceled dans le corps de la tâche de continuation pour déterminer des informations supplémentaires sur l’état d’achèvement d’un bloc de flux de données. Pour plus d’informations sur les tâches de continuation et leur relation avec la gestion des annulations et des erreurs, consultez Chaînage des tâches à l’aide des tâches de continuation, de l’annulation des tâches et de la gestion des exceptions.

Types de blocs de flux de données prédéfinis

La bibliothèque de flux de données TPL fournit plusieurs types de blocs de flux de données prédéfinis. Ces types sont divisés en trois catégories : blocs de mise en mémoire tampon, blocs d’exécution et blocs de regroupement. Les sections suivantes décrivent les types de blocs qui composent ces catégories.

Blocs de mise en mémoire tampon

Les blocs de mise en mémoire tampon contiennent les données à utiliser par les consommateurs de données. La bibliothèque de flux de données TPL fournit trois types de blocs de mise en mémoire tampon : System.Threading.Tasks.Dataflow.BufferBlock<T>, System.Threading.Tasks.Dataflow.BroadcastBlock<T>et System.Threading.Tasks.Dataflow.WriteOnceBlock<T>.

BufferBlock<T>

La BufferBlock<T> classe représente une structure de messagerie asynchrone à usage général. Cette classe stocke une file d’attente de type premier arrivé, premier servi (FIFO) de messages qui peuvent être écrits par plusieurs sources ou lus par plusieurs destinations. Lorsqu’une cible reçoit un message d’un BufferBlock<T> objet, ce message est supprimé de la file d’attente de messages. Par conséquent, bien qu’un BufferBlock<T> objet puisse avoir plusieurs cibles, une seule cible recevra chaque message. La BufferBlock<T> classe est utile lorsque vous souhaitez transmettre plusieurs messages à un autre composant et que ce composant doit recevoir chaque message.

L’exemple de base suivant publie plusieurs Int32 valeurs dans un BufferBlock<T> objet, puis lit ces valeurs à partir de cet objet.

// Create a BufferBlock<int> object.
var bufferBlock = new BufferBlock<int>();

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
   bufferBlock.Post(i);
}

// Receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(bufferBlock.Receive());
}

/* Output:
   0
   1
   2
 */
' Create a BufferBlock<int> object.
Dim bufferBlock = New BufferBlock(Of Integer)()

' Post several messages to the block.
For i As Integer = 0 To 2
    bufferBlock.Post(i)
Next i

' Receive the messages back from the block.
For i As Integer = 0 To 2
    Console.WriteLine(bufferBlock.Receive())
Next i

'          Output:
'            0
'            1
'            2
'          

Pour obtenir un exemple complet qui montre comment écrire et lire des messages à partir d’un BufferBlock<T> objet, consultez How to : Write Messages to and Read Messages from a Dataflow Block.

BroadcastBlock<T>

La BroadcastBlock<T> classe est utile lorsque vous devez transmettre plusieurs messages à un autre composant, mais ce composant a besoin uniquement de la valeur la plus récente. Cette classe est également utile lorsque vous souhaitez diffuser un message vers plusieurs composants.

L’exemple de base suivant publie une Double valeur dans un BroadcastBlock<T> objet, puis lit cette valeur à partir de cet objet plusieurs fois. Étant donné que les valeurs ne sont pas supprimées des BroadcastBlock<T> objets après leur lecture, la même valeur est disponible chaque fois.

// Create a BroadcastBlock<double> object.
var broadcastBlock = new BroadcastBlock<double>(null);

// Post a message to the block.
broadcastBlock.Post(Math.PI);

// Receive the messages back from the block several times.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(broadcastBlock.Receive());
}

/* Output:
   3.14159265358979
   3.14159265358979
   3.14159265358979
 */
' Create a BroadcastBlock<double> object.
Dim broadcastBlock = New BroadcastBlock(Of Double)(Nothing)

' Post a message to the block.
broadcastBlock.Post(Math.PI)

' Receive the messages back from the block several times.
For i As Integer = 0 To 2
    Console.WriteLine(broadcastBlock.Receive())
Next i

'          Output:
'            3.14159265358979
'            3.14159265358979
'            3.14159265358979
'          

Pour obtenir un exemple complet qui montre comment diffuser BroadcastBlock<T> un message sur plusieurs blocs cibles, consultez Comment : spécifier un planificateur de tâches dans un bloc de flux de données.

WriteOnceBlock<T>

La WriteOnceBlock<T> classe ressemble à la BroadcastBlock<T> classe, sauf qu’un WriteOnceBlock<T> objet ne peut être écrit qu’une seule fois. Vous pouvez considérer WriteOnceBlock<T> comme étant similaire au mot clé C# readonly (ReadOnly en Visual Basic), sauf qu'un objet WriteOnceBlock<T> devient immuable après avoir reçu une valeur plutôt qu'à la construction. Comme la BroadcastBlock<T> classe, lorsqu’une cible reçoit un message d’un WriteOnceBlock<T> objet, ce message n’est pas supprimé de cet objet. Par conséquent, plusieurs cibles reçoivent une copie du message. La WriteOnceBlock<T> classe est utile lorsque vous souhaitez propager uniquement le premier de plusieurs messages.

L’exemple de base suivant publie plusieurs String valeurs dans un WriteOnceBlock<T> objet, puis lit la valeur de cet objet. Étant donné qu’un WriteOnceBlock<T> objet peut être écrit à une seule fois, une fois qu’un WriteOnceBlock<T> objet reçoit un message, il ignore les messages suivants.

// Create a WriteOnceBlock<string> object.
var writeOnceBlock = new WriteOnceBlock<string>(null);

// Post several messages to the block in parallel. The first
// message to be received is written to the block.
// Subsequent messages are discarded.
Parallel.Invoke(
   () => writeOnceBlock.Post("Message 1"),
   () => writeOnceBlock.Post("Message 2"),
   () => writeOnceBlock.Post("Message 3"));

// Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive());

/* Sample output:
   Message 2
 */
' Create a WriteOnceBlock<string> object.
Dim writeOnceBlock = New WriteOnceBlock(Of String)(Nothing)

' Post several messages to the block in parallel. The first 
' message to be received is written to the block. 
' Subsequent messages are discarded.
Parallel.Invoke(Function() writeOnceBlock.Post("Message 1"), Function() writeOnceBlock.Post("Message 2"), Function() writeOnceBlock.Post("Message 3"))

' Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive())

'          Sample output:
'            Message 2
'          

Pour obtenir un exemple complet qui montre comment utiliser WriteOnceBlock<T> pour recevoir la valeur de la première opération qui se termine, consultez Guide pratique pour dissocier des blocs de flux de données.

Blocs d'exécution

Les blocs d'exécution appellent un délégué fourni par l'utilisateur pour chaque élément de données reçues. La bibliothèque de flux de données TPL fournit trois types de blocs d’exécution : ActionBlock<TInput>, System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput>et System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput>.

ActionBlock<T>

La ActionBlock<TInput> classe est un bloc cible qui appelle un délégué lorsqu’il reçoit des données. Considérez un objet comme un ActionBlock<TInput> délégué qui s’exécute de façon asynchrone lorsque les données sont disponibles. Le délégué que vous fournissez à un objet ActionBlock<TInput> peut être de type Action<T> ou de type System.Func<TInput, Task>. Lorsque vous utilisez un ActionBlock<TInput> objet avec Action<T>, le traitement de chaque élément d’entrée est considéré comme terminé lorsque le délégué retourne. Lorsque vous utilisez un ActionBlock<TInput> objet avec System.Func<TInput, Task>, le traitement de chaque élément d’entrée est considéré comme terminé uniquement lorsque l’objet retourné Task est terminé. En utilisant ces deux mécanismes, vous pouvez utiliser ActionBlock<TInput> pour le traitement synchrone et asynchrone de chaque élément d’entrée.

L’exemple de base suivant publie plusieurs Int32 valeurs dans un ActionBlock<TInput> objet. L’objet ActionBlock<TInput> imprime ces valeurs dans la console. Cet exemple montre comment définir le bloc à l’état terminé et attendre que toutes les tâches de flux de données se terminent.

// Create an ActionBlock<int> object that prints values
// to the console.
var actionBlock = new ActionBlock<int>(n => Console.WriteLine(n));

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
   actionBlock.Post(i * 10);
}

// Set the block to the completed state and wait for all
// tasks to finish.
actionBlock.Complete();
actionBlock.Completion.Wait();

/* Output:
   0
   10
   20
 */
' Create an ActionBlock<int> object that prints values
' to the console.
Dim actionBlock = New ActionBlock(Of Integer)(Function(n) WriteLine(n))

' Post several messages to the block.
For i As Integer = 0 To 2
    actionBlock.Post(i * 10)
Next i

' Set the block to the completed state and wait for all 
' tasks to finish.
actionBlock.Complete()
actionBlock.Completion.Wait()

'          Output:
'            0
'            10
'            20
'          

Pour obtenir des exemples complets qui montrent comment utiliser des délégués avec la classe ActionBlock<TInput>, consultez Comment : effectuer une action lorsqu'un bloc de flux de données reçoit des données.

TransformBlock<TInput, TOutput>

La TransformBlock<TInput,TOutput> classe ressemble à la ActionBlock<TInput> classe, sauf qu’elle agit à la fois comme source et comme cible. Le délégué que vous passez à un objet TransformBlock<TInput,TOutput> renvoie une valeur de type TOutput. Le délégué que vous fournissez à un objet TransformBlock<TInput,TOutput> peut être de type System.Func<TInput, TOutput> ou de type System.Func<TInput, Task<TOutput>>. Lorsque vous utilisez un TransformBlock<TInput,TOutput> objet avec System.Func<TInput, TOutput>, le traitement de chaque élément d’entrée est considéré comme terminé lorsque le délégué retourne. Lorsque vous utilisez un TransformBlock<TInput,TOutput> objet utilisé avec System.Func<TInput, Task<TOutput>>, le traitement de chaque élément d’entrée est considéré comme terminé uniquement lorsque l’objet retourné Task<TResult> est terminé. Comme avec ActionBlock<TInput>, en utilisant ces deux mécanismes, vous pouvez utiliser TransformBlock<TInput,TOutput> pour le traitement synchrone et asynchrone de chaque élément d’entrée.

L’exemple de base suivant crée un TransformBlock<TInput,TOutput> objet qui calcule la racine carrée de son entrée. L’objet TransformBlock<TInput,TOutput> prend Int32 des valeurs comme entrée et produit des Double valeurs en tant que sortie.

// Create a TransformBlock<int, double> object that
// computes the square root of its input.
var transformBlock = new TransformBlock<int, double>(n => Math.Sqrt(n));

// Post several messages to the block.
transformBlock.Post(10);
transformBlock.Post(20);
transformBlock.Post(30);

// Read the output messages from the block.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(transformBlock.Receive());
}

/* Output:
   3.16227766016838
   4.47213595499958
   5.47722557505166
 */
' Create a TransformBlock<int, double> object that 
' computes the square root of its input.
Dim transformBlock = New TransformBlock(Of Integer, Double)(Function(n) Math.Sqrt(n))

' Post several messages to the block.
transformBlock.Post(10)
transformBlock.Post(20)
transformBlock.Post(30)

' Read the output messages from the block.
For i As Integer = 0 To 2
    Console.WriteLine(transformBlock.Receive())
Next i

'          Output:
'            3.16227766016838
'            4.47213595499958
'            5.47722557505166
'          

Pour obtenir des exemples complets qui s’utilisent TransformBlock<TInput,TOutput> dans un réseau de blocs de flux de données qui effectuent le traitement d’images dans une application Windows Forms, consultez Procédure pas à pas : Utilisation du flux de données dans une application Windows Forms.

TransformManyBlock<TInput, TOutput>

La TransformManyBlock<TInput,TOutput> classe ressemble à la TransformBlock<TInput,TOutput> classe, sauf qu’elle TransformManyBlock<TInput,TOutput> produit zéro ou plusieurs valeurs de sortie pour chaque valeur d’entrée, au lieu d’une seule valeur de sortie pour chaque valeur d’entrée. Le délégué que vous fournissez à un objet TransformManyBlock<TInput,TOutput> peut être de type System.Func<TInput, IEnumerable<TOutput>> ou de type System.Func<TInput, Task<IEnumerable<TOutput>>>. Lorsque vous utilisez un TransformManyBlock<TInput,TOutput> objet avec System.Func<TInput, IEnumerable<TOutput>>, le traitement de chaque élément d’entrée est considéré comme terminé lorsque le délégué retourne. Lorsque vous utilisez un TransformManyBlock<TInput,TOutput> objet avec System.Func<TInput, Task<IEnumerable<TOutput>>>, le traitement de chaque élément d’entrée est considéré comme terminé uniquement lorsque l’objet retourné System.Threading.Tasks.Task<IEnumerable<TOutput>> est terminé.

L’exemple de base suivant crée un TransformManyBlock<TInput,TOutput> objet qui fractionne les chaînes en leurs séquences de caractères individuelles. L’objet TransformManyBlock<TInput,TOutput> prend String des valeurs comme entrée et produit des Char valeurs en tant que sortie.

// Create a TransformManyBlock<string, char> object that splits
// a string into its individual characters.
var transformManyBlock = new TransformManyBlock<string, char>(
   s => s.ToCharArray());

// Post two messages to the first block.
transformManyBlock.Post("Hello");
transformManyBlock.Post("World");

// Receive all output values from the block.
for (int i = 0; i < ("Hello" + "World").Length; i++)
{
   Console.WriteLine(transformManyBlock.Receive());
}

/* Output:
   H
   e
   l
   l
   o
   W
   o
   r
   l
   d
 */
' Create a TransformManyBlock<string, char> object that splits
' a string into its individual characters.
Dim transformManyBlock = New TransformManyBlock(Of String, Char)(Function(s) s.ToCharArray())

' Post two messages to the first block.
transformManyBlock.Post("Hello")
transformManyBlock.Post("World")

' Receive all output values from the block.
For i As Integer = 0 To ("Hello" & "World").Length - 1
    Console.WriteLine(transformManyBlock.Receive())
Next i

'          Output:
'            H
'            e
'            l
'            l
'            o
'            W
'            o
'            r
'            l
'            d
'          

Pour obtenir des exemples complets qui utilisent TransformManyBlock<TInput,TOutput> pour produire plusieurs sorties indépendantes pour chaque entrée dans un pipeline de flux de données, consultez procédure pas à pas : création d’un pipeline de flux de données.

Degré de parallélisme

Chaque ActionBlock<TInput>, TransformBlock<TInput,TOutput>et TransformManyBlock<TInput,TOutput> objet met en mémoire tampon les messages d’entrée jusqu’à ce que le bloc soit prêt à les traiter. Par défaut, ces classes traitent les messages dans l’ordre dans lequel ils sont reçus, un message à la fois. Vous pouvez également spécifier le degré de parallélisme pour permettre à ActionBlock<TInput>, TransformBlock<TInput,TOutput> et TransformManyBlock<TInput,TOutput> de traiter simultanément plusieurs messages. Pour plus d’informations sur l’exécution simultanée, consultez la section Spécification du degré de parallélisme plus loin dans ce document. Pour obtenir un exemple qui définit le degré de parallélisme pour permettre à un bloc de flux de données d’exécution de traiter plusieurs messages à la fois, consultez Comment : spécifier le degré de parallélisme dans un bloc de flux de données.

Résumé des types délégués

Le tableau suivant récapitule les types délégués que vous pouvez fournir aux objets ActionBlock<TInput>, TransformBlock<TInput,TOutput> et TransformManyBlock<TInput,TOutput>. Cette table spécifie également si le type de délégué fonctionne de manière synchrone ou asynchrone.

Catégorie Type délégué synchrone Type délégué asynchrone
ActionBlock<TInput> System.Action System.Func<TInput, Task>
TransformBlock<TInput,TOutput> System.Func<TInput, TOutput> System.Func<TInput, Task<TOutput>>
TransformManyBlock<TInput,TOutput> System.Func<TInput, IEnumerable<TOutput>> System.Func<TInput, Task<IEnumerable<TOutput>>>

Vous pouvez également utiliser des expressions lambda lorsque vous travaillez avec des types de blocs d’exécution. Pour obtenir un exemple montrant comment utiliser une expression lambda avec un bloc d’exécution, consultez How to : Perform Action When a Dataflow Block Receives Data.

Regroupement de blocs

Les blocs de regroupement combinent des données provenant d'une ou plusieurs sources et sous diverses contraintes. La bibliothèque de flux de données TPL fournit trois types de blocs de jointure : BatchBlock<T>, JoinBlock<T1,T2>et BatchedJoinBlock<T1,T2>.

BatchBlock<T>

La BatchBlock<T> classe combine des jeux de données d’entrée, appelés lots, dans des tableaux de données de sortie. Vous spécifiez la taille de chaque lot lorsque vous créez un BatchBlock<T> objet. Lorsque l’objet BatchBlock<T> reçoit le nombre spécifié d’éléments d’entrée, il propage de façon asynchrone un tableau qui contient ces éléments. Si un BatchBlock<T> objet est défini sur l’état terminé mais ne contient pas suffisamment d’éléments pour former un lot, il propage un tableau final qui contient les éléments d’entrée restants.

La BatchBlock<T> classe fonctionne en mode gourmand ou non gourmand . En mode gourmand, qui est la valeur par défaut, un BatchBlock<T> objet accepte chaque message qu’il est proposé et propage un tableau après avoir reçu le nombre d’éléments spécifié. En mode non gourmand, un BatchBlock<T> objet reporte tous les messages entrants jusqu’à ce que suffisamment de sources aient proposé des messages au bloc pour former un lot. Le mode gourmand fonctionne généralement mieux que le mode non gourmand, car il nécessite moins de surcharge de traitement. Toutefois, vous pouvez utiliser le mode non gourmand lorsque vous devez coordonner la consommation à partir de plusieurs sources de manière atomique. Spécifiez le mode non gourmand en définissant Greedy sur False dans le paramètre dataflowBlockOptions du constructeur BatchBlock<T>.

L’exemple de base suivant publie plusieurs Int32 valeurs dans un BatchBlock<T> objet qui contient dix éléments dans un lot. Pour garantir que toutes les valeurs se propagent hors du BatchBlock<T>, cet exemple appelle la Complete méthode. La méthode Complete définit l'objet BatchBlock<T> à l'état terminé, et par conséquent, l'objet BatchBlock<T> propage tous les éléments restants comme un lot final.

// Create a BatchBlock<int> object that holds ten
// elements per batch.
var batchBlock = new BatchBlock<int>(10);

// Post several values to the block.
for (int i = 0; i < 13; i++)
{
   batchBlock.Post(i);
}
// Set the block to the completed state. This causes
// the block to propagate out any remaining
// values as a final batch.
batchBlock.Complete();

// Print the sum of both batches.

Console.WriteLine($"The sum of the elements in batch 1 is {batchBlock.Receive().Sum()}.");

Console.WriteLine($"The sum of the elements in batch 2 is {batchBlock.Receive().Sum()}.");

/* Output:
   The sum of the elements in batch 1 is 45.
   The sum of the elements in batch 2 is 33.
 */
' Create a BatchBlock<int> object that holds ten
' elements per batch.
Dim batchBlock = New BatchBlock(Of Integer)(10)

' Post several values to the block.
For i As Integer = 0 To 12
    batchBlock.Post(i)
Next i
' Set the block to the completed state. This causes
' the block to propagate out any remaining
' values as a final batch.
batchBlock.Complete()

' Print the sum of both batches.

Console.WriteLine("The sum of the elements in batch 1 is {0}.", batchBlock.Receive().Sum())

Console.WriteLine("The sum of the elements in batch 2 is {0}.", batchBlock.Receive().Sum())

'          Output:
'            The sum of the elements in batch 1 is 45.
'            The sum of the elements in batch 2 is 33.
'          

Pour obtenir un exemple complet qui permet BatchBlock<T> d’améliorer l’efficacité des opérations d’insertion de base de données, consultez Procédure pas à pas : Utilisation de BatchBlock et BatchedJoinBlock pour améliorer l’efficacité.

JoinBlock<T1, T2, ...>

Les classes JoinBlock<T1,T2> et JoinBlock<T1,T2,T3> rassemblent des éléments d'entrée et propagent des objets System.Tuple<T1,T2> ou System.Tuple<T1,T2,T3> qui contiennent ces éléments. Les JoinBlock<T1,T2> classes et JoinBlock<T1,T2,T3> ne héritent pas de ITargetBlock<TInput>. Au lieu de cela, ils fournissent des propriétés, Target1, Target2et Target3, qui implémentent ITargetBlock<TInput>.

Comme BatchBlock<T>, JoinBlock<T1,T2> et JoinBlock<T1,T2,T3> fonctionnent en mode gourmand ou non gourmand. En mode gourmand, qui est la valeur par défaut, un objet JoinBlock<T1,T2> ou JoinBlock<T1,T2,T3> accepte chaque message qui lui est proposé et propage un tuple après que chacune de ses cibles ait reçu au moins un message. En mode non gourmand, un objet JoinBlock<T1,T2> ou JoinBlock<T1,T2,T3> reporte tous les messages entrants jusqu’à ce que toutes les cibles aient reçu les données requises pour créer un tuple. À ce stade, le bloc s'engage dans un protocole de validation en deux phases pour récupérer atomiquement tous les éléments requis à partir des sources. Ce report permet à une autre entité de consommer les données en attendant, afin de permettre au système global de progresser.

L’exemple de base suivant illustre un cas dans lequel un JoinBlock<T1,T2,T3> objet nécessite plusieurs données pour calculer une valeur. Cet exemple crée un JoinBlock<T1,T2,T3> objet qui nécessite deux Int32 valeurs et une Char valeur pour effectuer une opération arithmétique.

// Create a JoinBlock<int, int, char> object that requires
// two numbers and an operator.
var joinBlock = new JoinBlock<int, int, char>();

// Post two values to each target of the join.

joinBlock.Target1.Post(3);
joinBlock.Target1.Post(6);

joinBlock.Target2.Post(5);
joinBlock.Target2.Post(4);

joinBlock.Target3.Post('+');
joinBlock.Target3.Post('-');

// Receive each group of values and apply the operator part
// to the number parts.

for (int i = 0; i < 2; i++)
{
   var data = joinBlock.Receive();
   switch (data.Item3)
   {
      case '+':
         Console.WriteLine($"{data.Item1} + {data.Item2} = {data.Item1 + data.Item2}");
         break;
      case '-':
         Console.WriteLine($"{data.Item1} - {data.Item2} = {data.Item1 - data.Item2}");
         break;
      default:
         Console.WriteLine($"Unknown operator '{data.Item3}'.");
         break;
   }
}

/* Output:
   3 + 5 = 8
   6 - 4 = 2
 */
' Create a JoinBlock<int, int, char> object that requires
' two numbers and an operator.
Dim joinBlock = New JoinBlock(Of Integer, Integer, Char)()

' Post two values to each target of the join.

joinBlock.Target1.Post(3)
joinBlock.Target1.Post(6)

joinBlock.Target2.Post(5)
joinBlock.Target2.Post(4)

joinBlock.Target3.Post("+"c)
joinBlock.Target3.Post("-"c)

' Receive each group of values and apply the operator part
' to the number parts.

For i As Integer = 0 To 1
    Dim data = joinBlock.Receive()
    Select Case data.Item3
        Case "+"c
            Console.WriteLine("{0} + {1} = {2}", data.Item1, data.Item2, data.Item1 + data.Item2)
        Case "-"c
            Console.WriteLine("{0} - {1} = {2}", data.Item1, data.Item2, data.Item1 - data.Item2)
        Case Else
            Console.WriteLine("Unknown operator '{0}'.", data.Item3)
    End Select
Next i

'          Output:
'            3 + 5 = 8
'            6 - 4 = 2
'          

Pour obtenir un exemple complet qui utilise des JoinBlock<T1,T2> objets en mode non gourmand pour partager de manière coopérative une ressource, consultez Guide pratique pour utiliser JoinBlock pour lire des données à partir de plusieurs sources.

BatchedJoinBlock<T1, T2, ...>

Les classes BatchedJoinBlock<T1,T2> et BatchedJoinBlock<T1,T2,T3> recueillent des lots d’éléments d’entrée et propagent des objets System.Tuple(IList(T1), IList(T2)) ou System.Tuple(IList(T1), IList(T2), IList(T3)) qui contiennent ces éléments. Considérons comme BatchedJoinBlock<T1,T2> une combinaison de BatchBlock<T> et JoinBlock<T1,T2>. Spécifiez la taille de chaque lot lorsque vous créez un BatchedJoinBlock<T1,T2> objet. BatchedJoinBlock<T1,T2> fournit également les propriétés Target1 et Target2, qui implémentent ITargetBlock<TInput>. Lorsque le nombre spécifié d’éléments d’entrée est reçu de toutes les cibles, l’objet BatchedJoinBlock<T1,T2> propage de manière asynchrone un System.Tuple(IList(T1), IList(T2)) objet qui contient ces éléments.

L'exemple de base suivant crée un objet BatchedJoinBlock<T1,T2> qui contient des résultats, des valeurs Int32 et des erreurs qui sont des objets Exception. Cet exemple effectue plusieurs opérations et écrit des résultats dans la Target1 propriété, ainsi que des erreurs dans la Target2 propriété, de l’objet BatchedJoinBlock<T1,T2> . Étant donné que le nombre d’opérations réussies et ayant échoué est inconnu à l’avance, les IList<T> objets permettent à chaque cible de recevoir zéro ou plusieurs valeurs.

// For demonstration, create a Func<int, int> that
// returns its argument, or throws ArgumentOutOfRangeException
// if the argument is less than zero.
Func<int, int> DoWork = n =>
{
   if (n < 0)
      throw new ArgumentOutOfRangeException();
   return n;
};

// Create a BatchedJoinBlock<int, Exception> object that holds
// seven elements per batch.
var batchedJoinBlock = new BatchedJoinBlock<int, Exception>(7);

// Post several items to the block.
foreach (int i in new int[] { 5, 6, -7, -22, 13, 55, 0 })
{
   try
   {
      // Post the result of the worker to the
      // first target of the block.
      batchedJoinBlock.Target1.Post(DoWork(i));
   }
   catch (ArgumentOutOfRangeException e)
   {
      // If an error occurred, post the Exception to the
      // second target of the block.
      batchedJoinBlock.Target2.Post(e);
   }
}

// Read the results from the block.
var results = batchedJoinBlock.Receive();

// Print the results to the console.

// Print the results.
foreach (int n in results.Item1)
{
   Console.WriteLine(n);
}
// Print failures.
foreach (Exception e in results.Item2)
{
   Console.WriteLine(e.Message);
}

/* Output:
   5
   6
   13
   55
   0
   Specified argument was out of the range of valid values.
   Specified argument was out of the range of valid values.
 */
' For demonstration, create a Func<int, int> that 
' returns its argument, or throws ArgumentOutOfRangeException
' if the argument is less than zero.
Dim DoWork As Func(Of Integer, Integer) = Function(n)
                                              If n < 0 Then
                                                  Throw New ArgumentOutOfRangeException()
                                              End If
                                              Return n
                                          End Function

' Create a BatchedJoinBlock<int, Exception> object that holds 
' seven elements per batch.
Dim batchedJoinBlock = New BatchedJoinBlock(Of Integer, Exception)(7)

' Post several items to the block.
For Each i As Integer In New Integer() {5, 6, -7, -22, 13, 55, 0}
    Try
        ' Post the result of the worker to the 
        ' first target of the block.
        batchedJoinBlock.Target1.Post(DoWork(i))
    Catch e As ArgumentOutOfRangeException
        ' If an error occurred, post the Exception to the 
        ' second target of the block.
        batchedJoinBlock.Target2.Post(e)
    End Try
Next i

' Read the results from the block.
Dim results = batchedJoinBlock.Receive()

' Print the results to the console.

' Print the results.
For Each n As Integer In results.Item1
    Console.WriteLine(n)
Next n
' Print failures.
For Each e As Exception In results.Item2
    Console.WriteLine(e.Message)
Next e

'          Output:
'            5
'            6
'            13
'            55
'            0
'            Specified argument was out of the range of valid values.
'            Specified argument was out of the range of valid values.
'          

Pour obtenir un exemple complet qui permet BatchedJoinBlock<T1,T2> de capturer à la fois les résultats et toutes les exceptions qui se produisent pendant que le programme lit à partir d’une base de données, consultez Procédure pas à pas : Utilisation de BatchBlock et BatchedJoinBlock pour améliorer l’efficacité.

Configuration du comportement des blocs de flux de données

Vous pouvez activer des options supplémentaires en fournissant un System.Threading.Tasks.Dataflow.DataflowBlockOptions objet au constructeur des types de blocs de flux de données. Ces options contrôlent le comportement du planificateur qui gère la tâche sous-jacente et le degré de parallélisme. Il DataflowBlockOptions a également des types dérivés qui spécifient un comportement spécifique à certains types de blocs de flux de données. Le tableau suivant récapitule le type d’options associé à chaque type de bloc de flux de données.

Type de bloc de flux de données Type DataflowBlockOptions
BufferBlock<T> DataflowBlockOptions
BroadcastBlock<T> DataflowBlockOptions
WriteOnceBlock<T> DataflowBlockOptions
ActionBlock<TInput> ExecutionDataflowBlockOptions
TransformBlock<TInput,TOutput> ExecutionDataflowBlockOptions
TransformManyBlock<TInput,TOutput> ExecutionDataflowBlockOptions
BatchBlock<T> GroupingDataflowBlockOptions
JoinBlock<T1,T2> GroupingDataflowBlockOptions
BatchedJoinBlock<T1,T2> GroupingDataflowBlockOptions

Les sections suivantes fournissent des informations supplémentaires sur les types importants d'options de blocs de flux de données disponibles à travers les classes System.Threading.Tasks.Dataflow.DataflowBlockOptions, System.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptions et System.Threading.Tasks.Dataflow.GroupingDataflowBlockOptions.

Spécification du planificateur de tâches

Chaque bloc de flux de données prédéfini utilise le mécanisme de planification des tâches TPL pour effectuer des activités telles que la propagation de données vers une cible, la réception de données à partir d’une source et l’exécution de délégués définis par l’utilisateur lorsque les données sont disponibles. TaskScheduler est une classe abstraite qui représente un planificateur de tâches qui met en file d’attente les tâches sur des threads. Le planificateur de tâches par défaut, Default, utilise la classe ThreadPool pour mettre en file d’attente et exécuter des opérations. Vous pouvez remplacer le planificateur de tâches par défaut en définissant la TaskScheduler propriété lorsque vous construisez un objet de bloc de flux de données.

Lorsque le même planificateur de tâches gère plusieurs blocs de flux de données, il peut appliquer des stratégies entre eux. Par exemple, si plusieurs blocs de flux de données sont configurés pour cibler le planificateur exclusif du même ConcurrentExclusiveSchedulerPair objet, tout le travail qui s’exécute sur ces blocs est sérialisé. De même, si ces blocs sont configurés pour cibler le planificateur simultané du même ConcurrentExclusiveSchedulerPair objet et que ce planificateur est configuré pour avoir un niveau d’accès concurrentiel maximal, tout le travail de ces blocs est limité à ce nombre d’opérations simultanées. Vous trouverez un exemple utilisant la classe ConcurrentExclusiveSchedulerPair pour permettre que des opérations de lecture s’effectuent en parallèle, tout en imposant que chaque opération d’écriture soit réalisée de manière exclusive sur la page Guide pratique : spécifier un Planificateur de tâches dans un bloc de flux de données. Pour plus d’informations sur les planificateurs de tâches dans le TPL, consultez la rubrique de la classe TaskScheduler.

Spécification du degré de parallélisme

Par défaut, les trois types de blocs d’exécution que la bibliothèque de flux de données TPL fournit, ActionBlock<TInput>TransformBlock<TInput,TOutput>et TransformManyBlock<TInput,TOutput>traitent un message à la fois. Ces types de blocs de flux de données traitent également les messages dans l’ordre dans lequel ils sont reçus. Pour permettre à ces blocs de flux de données de traiter les messages simultanément, définissez la ExecutionDataflowBlockOptions.MaxDegreeOfParallelism propriété lorsque vous construisez l’objet de bloc de flux de données.

La valeur par défaut est MaxDegreeOfParallelism 1, ce qui garantit que le bloc de flux de données traite un message à la fois. La définition de cette propriété sur une valeur supérieure à 1 permet au bloc de flux de données de traiter plusieurs messages simultanément. Définir cette propriété sur DataflowBlockOptions.Unbounded permet au planificateur de tâches sous-jacent de gérer le degré maximal de concurrence.

Importante

Lorsque vous spécifiez un degré maximal de parallélisme supérieur à 1, plusieurs messages sont traités simultanément, et par conséquent, les messages peuvent ne pas être traités dans l’ordre dans lequel ils sont reçus. L’ordre dans lequel les messages sont générés à partir du bloc est toutefois le même dans lequel ils sont reçus.

Étant donné que la MaxDegreeOfParallelism propriété représente le degré maximal de parallélisme, le bloc de flux de données peut s’exécuter avec un degré moindre de parallélisme que vous spécifiez. Le bloc de flux de données peut utiliser un moindre degré de parallélisme pour répondre à ses exigences fonctionnelles ou parce qu’il n’existe pas de ressources système disponibles. Un bloc de flux de données ne choisit jamais plus de parallélisme que vous spécifiez.

La valeur de la MaxDegreeOfParallelism propriété est exclusive à chaque objet de bloc de flux de données. Par exemple, si quatre objets de bloc de flux de données spécifient chacun 1 pour le degré maximal de parallélisme, les quatre objets de bloc de flux de données peuvent potentiellement s’exécuter en parallèle.

Pour obtenir un exemple qui définit le degré maximal de parallélisme pour permettre aux opérations longues de se produire en parallèle, consultez Comment : spécifier le degré de parallélisme dans un bloc de flux de données.

Spécification du nombre de messages par tâche

Les types de blocs de flux de données prédéfinis utilisent des tâches pour traiter plusieurs éléments d’entrée. Cela permet de réduire le nombre d’objets de tâche requis pour traiter les données, ce qui permet aux applications de s’exécuter plus efficacement. Toutefois, lorsque les tâches d’un ensemble de blocs de flux de données traitent des données, les tâches d’autres blocs de flux de données peuvent avoir besoin d’attendre le temps de traitement en file d’attente des messages. Pour améliorer l’équité entre les tâches de flux de données, définissez la MaxMessagesPerTask propriété. Quand MaxMessagesPerTask est défini sur DataflowBlockOptions.Unbounded, qui est la valeur par défaut, la tâche utilisée par un bloc de flux de données traite autant de messages que possible. Lorsqu’il MaxMessagesPerTask est défini sur une valeur autre que Unbounded, le bloc de flux de données traite au maximum ce nombre de messages par Task objet. Bien que la définition de la propriété puisse augmenter l’équité MaxMessagesPerTask entre les tâches, le système peut créer plus de tâches que nécessaire, ce qui peut réduire les performances.

Activation de l’annulation

Le TPL fournit un mécanisme qui permet aux tâches de coordonner l’annulation de manière coopérative. Pour permettre aux blocs de flux de données de participer à ce mécanisme d’annulation, définissez la CancellationToken propriété. Lorsque cet CancellationToken objet est défini sur l’état annulé, tous les blocs de flux de données qui surveillent l’exécution de ce jeton terminent l’exécution de leur élément actuel, mais ne commencent pas à traiter les éléments suivants. Ces blocs de flux de données effacent également les messages mis en mémoire tampon, libèrent les connexions à n’importe quel bloc source et cible, et passent à l’état annulé. En passant à l’état annulé, la propriété Completion a sa propriété Status définie sur Canceled, à moins qu'une exception ne se soit produite pendant le traitement. Dans ce cas, Status est défini sur Faulted.

Pour obtenir un exemple montrant comment utiliser l’annulation dans une application Windows Forms, consultez Comment : annuler un bloc de flux de données. Pour plus d’informations sur l’annulation dans le TPL, consultez Annulation de tâche.

Spécification d’un comportement gourmand par rapport à un comportement non gourmand

Plusieurs types de blocs de flux de données groupés peuvent fonctionner en mode gourmand ou non gourmand . Par défaut, les types de blocs de flux de données prédéfinis fonctionnent en mode gourmand.

Pour les types de blocs de jointure tels que JoinBlock<T1,T2>le mode gourmand signifie que le bloc accepte immédiatement les données même si les données correspondantes avec lesquelles la jointure n’est pas encore disponible. Le mode non gourmand signifie que le bloc reporte tous les messages entrants jusqu’à ce qu’un message soit disponible sur chacune de ses cibles pour terminer la jointure. Si l’un des messages reportés n’est plus disponible, le bloc de jointure libère tous les messages reportés et redémarre le processus. Pour la classe BatchBlock<T>, le comportement gourmand et non gourmand est similaire, sauf qu’en mode non gourmand, un objet BatchBlock<T> reporte tous les messages entrants jusqu’à ce qu’il y ait suffisamment de messages provenant de sources distinctes pour terminer un lot.

Pour spécifier le mode non gourmand pour un bloc de flux de données, définissez Greedy sur False. Pour obtenir un exemple montrant comment utiliser le mode non gourmand pour permettre à plusieurs blocs de jointure de partager une source de données plus efficacement, consultez Comment : utiliser JoinBlock pour lire des données à partir de plusieurs sources.

Blocs de flux de données personnalisés

Bien que la bibliothèque de flux de données TPL fournit de nombreux types de blocs prédéfinis, vous pouvez créer des types de blocs supplémentaires qui effectuent un comportement personnalisé. Implémentez directement les interfaces ISourceBlock<TOutput> ou utilisez la méthode ITargetBlock<TInput> pour créer un bloc complexe qui encapsule le comportement des types de blocs existants avec Encapsulate. Pour obtenir des exemples montrant comment implémenter des fonctionnalités de bloc de flux de données personnalisées, consultez Procédure pas à pas : Création d’un type de bloc de flux de données personnalisé.

Titre Descriptif
Guide pratique pour écrire et lire des messages à partir d’un bloc de flux de données Montre comment écrire et lire des messages à partir d’un BufferBlock<T> objet.
Guide pratique pour implémenter un modèle de flux de données Producer-Consumer Décrit comment utiliser le modèle de flux de données pour implémenter un modèle producteur-consommateur, où le producteur envoie des messages à un bloc de flux de données et le consommateur lit les messages de ce bloc.
Procédure : effectuer une action lorsqu’un bloc de flux de données reçoit des données Décrit comment fournir des délégués aux types de blocs de flux de données d’exécution, , ActionBlock<TInput>TransformBlock<TInput,TOutput>et TransformManyBlock<TInput,TOutput>.
Procédure pas à pas : création d’un pipeline de flux de données Décrit comment créer un pipeline de flux de données qui télécharge du texte à partir du web et effectue des opérations sur ce texte.
Guide pratique pour dissocier des blocs de flux de données Montre comment utiliser la LinkTo méthode pour dissocier un bloc cible de sa source après que la source offre un message à la cible.
Procédure pas à pas : utilisation du flux de données dans une application Windows Forms Montre comment créer un réseau de blocs de flux de données qui effectuent le traitement d’images dans une application Windows Forms.
Procédure : annuler un bloc de flux de données Montre comment utiliser l’annulation dans une application Windows Forms.
Guide pratique pour utiliser JoinBlock pour lire des données à partir de plusieurs sources Explique comment utiliser la JoinBlock<T1,T2> classe pour effectuer une opération lorsque les données sont disponibles à partir de plusieurs sources et comment utiliser le mode non gourmand pour permettre à plusieurs blocs de jointure de partager une source de données plus efficacement.
Guide pratique pour spécifier le degré de parallélisme dans un bloc de flux de données Décrit comment définir la MaxDegreeOfParallelism propriété pour permettre à un bloc de flux de données d’exécution de traiter plusieurs messages à la fois.
Guide pratique pour spécifier un planificateur de tâches dans un bloc de flux de données Montre comment associer un planificateur de tâches spécifique lorsque vous utilisez le flux de données dans votre application.
Procédure pas à pas : utilisation de BatchBlock et BatchedJoinBlock pour améliorer l’efficacité Décrit comment utiliser la BatchBlock<T> classe pour améliorer l’efficacité des opérations d’insertion de base de données et comment utiliser la BatchedJoinBlock<T1,T2> classe pour capturer les résultats et toutes les exceptions qui se produisent pendant que le programme lit à partir d’une base de données.
Procédure pas à pas : création d’un type de bloc de flux de données personnalisé Montre deux façons de créer un type de bloc de flux de données qui implémente un comportement personnalisé.
bibliothèque parallèle de tâches (TPL) Présente le TPL, une bibliothèque qui simplifie la programmation parallèle et simultanée dans les applications .NET Framework.