Share via


Bibliothèque System.Threading.Channels

L’espace de noms System.Threading.Channels fournit un ensemble de structures de données de synchronisation pour passer des données entre des producteurs et des consommateurs de façon asynchrone. La bibliothèque cible .NET Standard et fonctionne sur toutes les implémentations .NET.

Cette bibliothèque est disponible dans le package NuGet System.Threading.Channels. Toutefois, si vous utilisez .NET Core 3.0 ou ultérieur, le package fait partie du framework.

Modèle de programmation conceptuel producteur/consommateur

Les canaux sont une implémentation du modèle de programmation conceptuel producteur/consommateur. Dans ce modèle de programmation, les producteurs produisent des données de manière asynchrone, et les consommateurs consomment ces données de manière asynchrone. En d’autres termes, ce modèle passe les données d’une partie à l’autre. Imaginez les canaux comme un autre type de collection générique courant, par exemple, List<T>. La principale différence est que cette collection gère la synchronisation et fournit différents modèles de consommation avec des options de création de fabrique. Ces options contrôlent le comportement des canaux, par exemple, le nombre d’éléments qu’ils sont autorisés à stocker et ce qui se passe si cette limite est atteinte, ou permettent d’indiquer si le canal est accessible simultanément par plusieurs producteurs ou consommateurs.

Stratégies de délimitation

Selon la façon dont un Channel<T> est créé, son lecteur et son rédacteur se comportent différemment.

Pour créer un canal qui spécifie une capacité maximale, appelez Channel.CreateBounded. Pour créer un canal utilisé simultanément par un certain nombre de lecteurs et de rédacteurs, appelez Channel.CreateUnbounded. Chaque stratégie de délimitation expose différentes options définies par le créateur, BoundedChannelOptions ou UnboundedChannelOptions, respectivement.

Notes

Quelle que soit la stratégie de délimitation, un canal lève toujours une exception ChannelClosedException quand il est utilisé après sa fermeture.

Canaux illimités

Pour créer un canal illimité, appelez une des surcharges Channel.CreateUnbounded :

var channel = Channel.CreateUnbounded<T>();

Quand vous créez un canal illimité, par défaut, le canal peut être utilisé simultanément par n’importe quel nombre de lecteurs et de rédacteurs. Vous pouvez également spécifier un comportement autre que celui par défaut quand vous créez un canal illimité en fournissant une instance UnboundedChannelOptions. La capacité du canal est illimitée et toutes les écritures sont effectuées de manière synchrone. Pour obtenir plus d’exemples, consultez Modèles de création illimités.

Canaux limités

Pour créer un canal limité, appelez une des surcharges Channel.CreateBounded :

var channel = Channel.CreateBounded<T>(7);

Le code précédent crée un canal qui a une capacité maximale de 7 éléments. Quand vous créez un canal limité, le canal est limité à une capacité maximale. Quand la limite est atteinte, le comportement par défaut est que le canal bloque de façon asynchrone le producteur jusqu’à ce que l’espace soit disponible. Vous pouvez configurer ce comportement en spécifiant une option quand vous créez le canal. Vous pouvez créer des canaux limités avec n’importe quelle valeur de capacité supérieure à zéro. Pour obtenir d’autres exemples, consultez Modèles de création limités.

Comportement en mode complet

Quand vous utilisez un canal limité, vous pouvez spécifier le comportement auquel le canal adhère quand la limite configurée est atteinte. Le tableau suivant liste les comportements en mode complet pour chaque valeur BoundedChannelFullMode :

Valeur Comportement
BoundedChannelFullMode.Wait Il s'agit de la valeur par défaut. Les appels à WriteAsync attendent que de l’espace soit disponible pour terminer l’opération d’écriture. Les appels à TryWrite retournent false immédiatement.
BoundedChannelFullMode.DropNewest Supprime et ignore l’élément le plus récent dans le canal afin de libérer de l’espace pour l’élément en cours d’écriture.
BoundedChannelFullMode.DropOldest Supprime et ignore l’élément le plus ancien dans le canal afin de libérer de l’espace pour l’élément en cours d’écriture.
BoundedChannelFullMode.DropWrite Supprime l’élément en cours d’écriture.

Important

Chaque fois qu’un Channel<TWrite,TRead>.Writer produit plus rapidement que peut consommer un Channel<TWrite,TRead>.Reader, le rédacteur du canal subit une régulation.

API de producteur

La fonctionnalité de producteur est exposée sur Channel<TWrite,TRead>.Writer. Les API de producteur et le comportement attendu sont détaillés dans le tableau suivant :

API Comportement attendu
ChannelWriter<T>.Complete Marque le canal comme étant terminé, ce qui signifie qu’aucun autre élément n’y est écrit.
ChannelWriter<T>.TryComplete Tente de marquer le canal comme étant terminé, ce qui signifie qu’aucune autre donnée n’y est écrite.
ChannelWriter<T>.TryWrite Tente d'écrire l'élément spécifié sur le canal. Quand elle est utilisée avec un canal illimité, elle renvoie toujours true, sauf si le rédacteur du canal signale l’achèvement avec ChannelWriter<T>.Complete ou ChannelWriter<T>.TryComplete.
ChannelWriter<T>.WaitToWriteAsync Retourne un ValueTask<TResult> qui se termine quand de l’espace est disponible pour écrire un élément.
ChannelWriter<T>.WriteAsync Écrit de façon asynchrone un élément sur le canal.

API de contrôle serveur consommateur

La fonctionnalité de consommateur est exposée sur Channel<TWrite,TRead>.Reader. Les API de consommateur et le comportement attendu sont détaillés dans le tableau suivant :

API Comportement attendu
ChannelReader<T>.ReadAllAsync Crée un IAsyncEnumerable<T> qui permet la lecture de toutes les données du canal.
ChannelReader<T>.ReadAsync Lit de façon asynchrone un élément à partir du canal.
ChannelReader<T>.TryPeek Tente de voir un élément du canal.
ChannelReader<T>.TryRead Tente de lire un élément depuis le canal.
ChannelReader<T>.WaitToReadAsync Retourne un ValueTask<TResult> qui se termine quand des données sont disponibles en lecture.

Modes d’utilisation courants

Il existe plusieurs modèles d’utilisation des canaux. L’API est conçue pour être simple, cohérente et aussi flexible que possible. Toutes les méthodes asynchrones renvoient un ValueTask (ou ValueTask<bool>) qui représente une opération asynchrone légère pouvant éviter l’allocation si l’opération se termine de manière synchrone et même potentiellement asynchrone. Par ailleurs, l’API est conçue pour être composable, en ce sens que le créateur d’un canal fait des promesses sur son utilisation prévue. Quand un canal est créé avec certains paramètres, l’implémentation interne peut fonctionner plus efficacement si elle connaît ces promesses.

Modèles de création

Imaginez que vous créez une solution de producteur/consommateur pour un GPS (Global Position System). Vous voulez suivre les coordonnées d’un appareil au fil du temps. Un exemple d’objet de coordonnées peut ressembler à ceci :

/// <summary>
/// A representation of a device's coordinates, 
/// which includes latitude and longitude.
/// </summary>
/// <param name="DeviceId">A unique device identifier.</param>
/// <param name="Latitude">The latitude of the device.</param>
/// <param name="Longitude">The longitude of the device.</param>
public readonly record struct Coordinates(
    Guid DeviceId,
    double Latitude,
    double Longitude);

Modèles de création illimités

Un modèle d’utilisation courant consiste à créer un canal illimité par défaut :

var channel = Channel.CreateUnbounded<Coordinates>();

Mais imaginons plutôt que vous voulez créer un canal illimité avec plusieurs producteurs et consommateurs :

var channel = Channel.CreateUnbounded<Coordinates>(
    new UnboundedChannelOptions
    {
        SingleWriter = false,
        SingleReader = false,
        AllowSynchronousContinuations = true
    });

Dans ce cas, toutes les écritures sont synchrones, même WriteAsync. C’est parce qu’un canal illimité a toujours de la place disponible pour une écriture effective immédiatement. Toutefois, avec AllowSynchronousContinuations défini sur true, les écritures peuvent finir par effectuer un travail associé à un lecteur en exécutant leurs continuations. Cela n’affecte pas la synchronicité de l’opération.

Modèles de création limités

Avec des canaux limités, la configurabilité du canal doit être connue du consommateur pour garantir une consommation appropriée. Autrement dit, le consommateur doit savoir quel comportement le canal affiche quand la limite configurée est atteinte. Explorons certains modèles de création limités courants.

La façon la plus simple de créer un canal limité est de spécifier une capacité :

var channel = Channel.CreateBounded<Coordinates>(1);

Le code précédent crée un canal limité avec une capacité maximale de 1. D’autres options sont disponibles. Certaines sont les mêmes que pour un canal illimité, tandis que d’autres sont spécifiques aux canaux illimités :

var channel = Channel.CreateBounded<Coordinates>(
    new BoundedChannelOptions(1_000)
    {
        SingleWriter = true,
        SingleReader = false,
        AllowSynchronousContinuations = false,
        FullMode = BoundedChannelFullMode.DropWrite
    });

Dans le code précédent, le canal est créé comme un canal limité avec 1 000 éléments, un seul rédacteur, mais de nombreux lecteurs. Son comportement en mode complet est défini sur DropWrite, ce qui signifie qu’il supprime l’élément en cours d’écriture si le canal est plein.

Pour observer les éléments supprimés lors de l’utilisation de canaux limités, inscrivez un rappel itemDropped :

var channel = Channel.CreateBounded(
    new BoundedChannelOptions(10)
    {
        AllowSynchronousContinuations = true,
        FullMode = BoundedChannelFullMode.DropOldest
    },
    static void (Coordinates dropped) =>
        Console.WriteLine($"Coordinates dropped: {dropped}"));

Chaque fois que le canal est plein et qu’un nouvel élément est ajouté, le rappel itemDropped est appelé. Dans cet exemple, le rappel fourni écrit l’élément dans la console, mais vous pouvez effectuer l’action de votre choix.

Modèles de producteur

Imaginez que le producteur dans ce scénario écrit de nouvelles coordonnées dans le canal. Le producteur peut le faire en appelant TryWrite :

static void ProduceWithWhileAndTryWrite(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 })
    {
        var tempCoordinates = coordinates with
        {
            Latitude = coordinates.Latitude + .5,
            Longitude = coordinates.Longitude + 1
        };

        if (writer.TryWrite(item: tempCoordinates))
        {
            coordinates = tempCoordinates;
        }
    }
}

Code du producteur précédent :

  • Accepte l’objet Channel<Coordinates>.Writer (ChannelWriter<Coordinates>) comme argument, ainsi que l’objet Coordinates initial.
  • Définit une boucle conditionnelle while qui tente de déplacer les coordonnées avec TryWrite.

Un autre producteur peut utiliser la méthode WriteAsync :

static async ValueTask ProduceWithWhileWriteAsync(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 })
    {
        await writer.WriteAsync(
            item: coordinates = coordinates with
            {
                Latitude = coordinates.Latitude + .5,
                Longitude = coordinates.Longitude + 1
            });
    }

    writer.Complete();
}

Là encore, l’objet Channel<Coordinates>.Writer est utilisé dans une boucle while. Mais cette fois, la méthode WriteAsync est appelée. La méthode continue uniquement une fois que les coordonnées ont été écrites. Quand la boucle while se termine, un appel à Complete est effectué, ce qui indique qu’aucune donnée supplémentaire n’est écrite dans le canal.

Un autre modèle de producteur consiste à utiliser la méthode WaitToWriteAsync. Prenez le code suivant :

static async ValueTask ProduceWithWaitToWriteAsync(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 } &&
        await writer.WaitToWriteAsync())
    {
        var tempCoordinates = coordinates with
        {
            Latitude = coordinates.Latitude + .5,
            Longitude = coordinates.Longitude + 1
        };

        if (writer.TryWrite(item: tempCoordinates))
        {
            coordinates = tempCoordinates;
        }

        await Task.Delay(TimeSpan.FromMilliseconds(10));
    }

    writer.Complete();
}

Dans le cadre de la boucle while conditionnelle, le résultat de l’appel à WaitToWriteAsync est utilisé pour déterminer s’il faut continuer la boucle.

Modèles de consommateur

Il existe plusieurs modèles de consommateur de canal courants. Quand un canal ne finit jamais, ce qui signifie qu’il produit des données indéfiniment, le consommateur peut utiliser une boucle while (true) et lire les données dès qu’elles sont disponibles :

static async ValueTask ConsumeWithWhileAsync(
    ChannelReader<Coordinates> reader)
{
    while (true)
    {
        // May throw ChannelClosedException if
        // the parent channel's writer signals complete.
        Coordinates coordinates = await reader.ReadAsync();
        Console.WriteLine(coordinates);
    }
}

Notes

Ce code lève une exception si le canal est fermé.

Un autre consommateur peut éviter ce problème en utilisant une boucle while imbriquée, comme indiqué dans le code suivant :

static async ValueTask ConsumeWithNestedWhileAsync(
    ChannelReader<Coordinates> reader)
{
    while (await reader.WaitToReadAsync())
    {
        while (reader.TryRead(out Coordinates coordinates))
        {
            Console.WriteLine(coordinates);
        }
    }
}

Dans le code précédent, le consommateur attend de lire les données. Une fois les données disponibles, le consommateur tente de les lire. Ces boucles continuent d’évaluer jusqu’à ce que le producteur du canal signale qu’il n’a plus de données à lire. Cela dit, quand un producteur est connu pour avoir un nombre fini d’éléments à produire et qu’il signale l’achèvement, le consommateur peut utiliser la sémantique await foreach pour itérer sur les éléments :

static async ValueTask ConsumeWithAwaitForeachAsync(
    ChannelReader<Coordinates> reader)
{
    await foreach (Coordinates coordinates in reader.ReadAllAsync())
    {
        Console.WriteLine(coordinates);
    }
}

Le code précédent utilise la méthode ReadAllAsync pour lire toutes les coordonnées du canal.

Voir aussi