Bibliothèque System.Threading.Channels

L’espace de noms System.Threading.Channels fournit un ensemble de structures de données de synchronisation pour passer des données entre des producteurs et des consommateurs de façon asynchrone. La bibliothèque cible .NET, .NET Standard et .NET Framework et fonctionne sur toutes les implémentations .NET.

Cette bibliothèque est disponible dans le 📦 package NuGet System.Threading.Channels . Toutefois, si vous utilisez .NET Core 3.0 ou version ultérieure, le package est inclus dans le cadre du framework partagé.

Modèle de programmation conceptuel producteur/consommateur

Les canaux sont une implémentation du modèle de programmation conceptuel producteur/consommateur. Dans ce modèle de programmation, les producteurs produisent des données de manière asynchrone, et les consommateurs consomment ces données de manière asynchrone. En d’autres termes, ce modèle transfère des données d’une partie à une autre via une file d’attente « FIFO » (premier entré, premier sorti). Considérez les canaux comme n’importe quel autre type de collection générique commun, tel qu’un List<T>. La principale différence est que cette collection gère la synchronisation et fournit différents modèles de consommation via des options de création par fabrique. Ces options contrôlent le comportement des canaux, tels que :

  • Nombre d’éléments qu’ils sont autorisés à stocker et ce qui se passe si cette limite est atteinte.
  • Indique si le canal est accessible simultanément par plusieurs producteurs ou plusieurs consommateurs.

Utilisation de base

L’exemple suivant illustre l’utilisation de base d’un canal, où un producteur écrit des éléments et un consommateur les lit :

static async Task BasicUsageAsync()
{
    Channel<int> channel = Channel.CreateUnbounded<int>();

    Task producer = ProduceAsync(channel.Writer);
    Task consumer = ConsumeAsync(channel.Reader);

    await Task.WhenAll(producer, consumer);

    static async Task ProduceAsync(ChannelWriter<int> writer)
    {
        for (int i = 0; i < 5; i++)
        {
            await writer.WriteAsync(i);
        }

        writer.Complete();
    }

    static async Task ConsumeAsync(ChannelReader<int> reader)
    {
        await foreach (int item in reader.ReadAllAsync())
        {
            Console.WriteLine($"Received: {item}");
        }
    }
}

Stratégies de bornage

Selon la façon dont un Channel<T> est créé, son lecteur et son rédacteur se comportent différemment.

Pour créer un canal qui spécifie une capacité maximale, appelez Channel.CreateBounded. Pour créer un canal utilisé simultanément par un certain nombre de lecteurs et de rédacteurs, appelez Channel.CreateUnbounded. Chaque stratégie de délimitation expose différentes options définies par le créateur, BoundedChannelOptions ou UnboundedChannelOptions, respectivement.

Note

Quelle que soit la stratégie de limitation, un canal génère toujours une exception ChannelClosedException lorsqu'il est utilisé après avoir été fermé.

Canaux illimités

Pour créer un canal sans limite, appelez une des surcharges Channel.CreateUnbounded :

var channel = Channel.CreateUnbounded<T>();

Quand vous créez un canal illimité, par défaut, le canal peut être utilisé simultanément par n’importe quel nombre de lecteurs et de rédacteurs. Vous pouvez également spécifier un comportement autre que celui par défaut quand vous créez un canal illimité en fournissant une instance UnboundedChannelOptions. La capacité du canal est illimitée et toutes les écritures sont effectuées de manière synchrone. Pour obtenir plus d’exemples, consultez Modèles de création illimités.

Canaux limités

Pour créer un canal limité, appelez une des surcharges Channel.CreateBounded :

var channel = Channel.CreateBounded<T>(7);

Le code précédent crée un canal qui a une capacité maximale de 7 éléments. Quand vous créez un canal limité, le canal est limité à une capacité maximale. Quand la limite est atteinte, le comportement par défaut est que le canal bloque de façon asynchrone le producteur jusqu’à ce que l’espace soit disponible. Vous pouvez configurer ce comportement en spécifiant une option quand vous créez le canal. Vous pouvez créer des canaux limités avec n’importe quelle valeur de capacité supérieure à zéro. Pour obtenir d’autres exemples, consultez Modèles de création limités.

Comportement en mode complet

Quand vous utilisez un canal limité, vous pouvez spécifier le comportement auquel le canal adhère quand la limite configurée est atteinte. Le tableau suivant liste les comportements en mode complet pour chaque valeur BoundedChannelFullMode :

Valeur Comportement
BoundedChannelFullMode.Wait Il s'agit de la valeur par défaut. Les appels à WriteAsync attendent que de l’espace soit disponible pour terminer l’opération d’écriture. Les appels à TryWrite retournent false immédiatement.
BoundedChannelFullMode.DropNewest Supprime et ignore l’élément le plus récent dans le canal afin de libérer de l’espace pour l’élément en cours d’écriture.
BoundedChannelFullMode.DropOldest Supprime et ignore l’élément le plus ancien dans le canal afin de libérer de l’espace pour l’élément en cours d’écriture.
BoundedChannelFullMode.DropWrite Supprime l’élément en cours d’écriture.

Importante

Chaque fois qu’un Channel<TWrite,TRead>.Writer produit plus rapidement qu'un Channel<TWrite,TRead>.Reader peut consommer, le rédacteur du canal subit une contre-pression.

API de producteur

La fonctionnalité de producteur est exposée sur Channel<TWrite,TRead>.Writer. Les API de producteur et le comportement attendu sont détaillés dans le tableau suivant :

API (Interface de Programmation d'Applications) Comportement attendu
ChannelWriter<T>.Complete Marque le canal comme étant terminé, ce qui signifie qu’aucun autre élément n’y est écrit.
ChannelWriter<T>.TryComplete Tente de marquer le canal comme terminé, indiquant qu'aucune donnée supplémentaire n'y sera écrite.
ChannelWriter<T>.TryWrite Tente d'écrire l'élément spécifié sur le canal. Quand elle est utilisée avec un canal illimité, elle renvoie toujours true, sauf si le rédacteur du canal signale l’achèvement avec ChannelWriter<T>.Complete ou ChannelWriter<T>.TryComplete.
ChannelWriter<T>.WaitToWriteAsync Retourne un ValueTask<TResult> qui se termine lorsque de l’espace est disponible pour écrire un élément.
ChannelWriter<T>.WriteAsync Écrit de façon asynchrone un élément sur le canal.

API pour les consommateurs

La fonctionnalité consommateur est exposée sur Channel<TWrite,TRead>.Reader. Les API de consommateur et le comportement attendu sont détaillés dans le tableau suivant :

API (Interface de Programmation d'Applications) Comportement attendu
ChannelReader<T>.ReadAllAsync Crée un IAsyncEnumerable<T> qui permet la lecture de toutes les données du canal.
ChannelReader<T>.ReadAsync Lit de façon asynchrone un élément depuis le canal.
ChannelReader<T>.TryPeek Tente de jeter un coup d'œil à un élément du canal.
ChannelReader<T>.TryRead Tente de lire un élément depuis le canal.
ChannelReader<T>.WaitToReadAsync Retourne un ValueTask<TResult> qui se termine quand des données sont disponibles pour être lues.

Modes d’utilisation courants

Il existe plusieurs modèles d’utilisation pour les canaux :

L’API est conçue pour être simple, cohérente et aussi flexible que possible. Toutes les méthodes asynchrones renvoient un ValueTask (ou ValueTask<bool>) qui représente une opération asynchrone légère pouvant éviter l’allocation si l’opération se termine de manière synchrone et même potentiellement asynchrone. Par ailleurs, l’API est conçue pour être composable, en ce sens que le créateur d’un canal fait des promesses sur son utilisation prévue. Quand un canal est créé avec certains paramètres, l’implémentation interne peut fonctionner plus efficacement si elle connaît ces promesses.

Modèles de création

Imaginez que vous créez une solution de producteur/consommateur pour un GPS (Global Position System). Vous voulez suivre les coordonnées d’un appareil au fil du temps. Un exemple d’objet de coordonnées peut ressembler à ceci :

/// <summary>
/// A representation of a device's coordinates, 
/// which includes latitude and longitude.
/// </summary>
/// <param name="DeviceId">A unique device identifier.</param>
/// <param name="Latitude">The latitude of the device.</param>
/// <param name="Longitude">The longitude of the device.</param>
public readonly record struct Coordinates(
    Guid DeviceId,
    double Latitude,
    double Longitude);

Modèles de création illimités

Un modèle d’utilisation courant consiste à créer un canal non lié par défaut :

var channel = Channel.CreateUnbounded<Coordinates>();

Mais au lieu de cela, imaginez que vous souhaitez créer un canal sans limite avec plusieurs producteurs et consommateurs. Définissez SingleWriter = false et SingleReader = false dans les options de canal :

var channel = Channel.CreateUnbounded<Coordinates>(
    new UnboundedChannelOptions
    {
        SingleWriter = false,
        SingleReader = false,
        AllowSynchronousContinuations = true
    });

Dans ce cas, toutes les écritures sont synchrones, même WriteAsync. Ce comportement se produit parce qu’un canal non borné dispose toujours de place disponible pour une écriture immédiate. Toutefois, en définissant AllowSynchronousContinuations à true, les opérations d'écriture peuvent finir par effectuer des tâches associées à un lecteur en exécutant des continuations. Ce paramètre n’affecte pas la synchronité de l’opération.

Modèles de création limités

Avec les canaux délimités , la configurabilité du canal doit être connue du consommateur pour garantir une consommation appropriée. Autrement dit, le consommateur doit savoir quel comportement le canal affiche quand la limite configurée est atteinte. Les exemples suivants montrent quelques-uns des modèles de création délimités courants.

La façon la plus simple de créer un canal délimité consiste à spécifier une capacité. Le code suivant crée un canal délimité avec une capacité maximale de 1.

var channel = Channel.CreateBounded<Coordinates>(1);

D’autres options sont disponibles. Certaines options sont identiques à un canal non lié, tandis que d’autres sont spécifiques aux canaux délimités. Dans le code suivant, le canal est créé en tant que canal borné à 1 000 éléments, avec un seul écrivain, mais de nombreux lecteurs. Son comportement en mode complet est défini sur DropWrite, ce qui signifie qu’il supprime l’élément en cours d’écriture si le canal est plein.

var channel = Channel.CreateBounded<Coordinates>(
    new BoundedChannelOptions(1_000)
    {
        SingleWriter = true,
        SingleReader = false,
        AllowSynchronousContinuations = false,
        FullMode = BoundedChannelFullMode.DropWrite
    });

Pour observer les éléments supprimés lors de l’utilisation de canaux limités, inscrivez un rappel itemDropped :

var channel = Channel.CreateBounded(
    new BoundedChannelOptions(10)
    {
        AllowSynchronousContinuations = true,
        FullMode = BoundedChannelFullMode.DropOldest
    },
    static void (Coordinates dropped) =>
        Console.WriteLine($"Coordinates dropped: {dropped}"));

Chaque fois que le canal est plein et qu’un nouvel élément est ajouté, le rappel itemDropped est appelé. Dans cet exemple, l'appel de retour fourni écrit l’élément dans la console, mais vous pouvez effectuer n'importe quelle autre action de votre choix.

Modèles de producteur

Imaginez que le producteur dans ce scénario écrit de nouvelles coordonnées vers le canal. Le producteur peut le faire en appelant TryWrite :

static void ProduceWithWhileAndTryWrite(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 })
    {
        var tempCoordinates = coordinates with
        {
            Latitude = coordinates.Latitude + .5,
            Longitude = coordinates.Longitude + 1
        };

        if (writer.TryWrite(item: tempCoordinates))
        {
            coordinates = tempCoordinates;
        }
    }
}

Code du producteur précédent :

  • Accepte l’objet Channel<Coordinates>.Writer (ChannelWriter<Coordinates>) comme argument, ainsi que l’objet Coordinates initial.
  • Définit une boucle conditionnelle while qui tente de déplacer les coordonnées avec TryWrite.

Un autre producteur peut utiliser la méthode WriteAsync :

static async ValueTask ProduceWithWhileWriteAsync(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 })
    {
        await writer.WriteAsync(
            item: coordinates = coordinates with
            {
                Latitude = coordinates.Latitude + .5,
                Longitude = coordinates.Longitude + 1
            });
    }

    writer.Complete();
}

Là encore, l’objet Channel<Coordinates>.Writer est utilisé dans une boucle while. Mais cette fois, la méthode WriteAsync est appelée. La méthode se poursuit uniquement après l’écriture des coordonnées. Quand la boucle while se termine, un appel à Complete est effectué, ce qui indique qu’aucune donnée supplémentaire n’est écrite dans le canal.

Un autre modèle de producteur consiste à utiliser la méthode WaitToWriteAsync. Prenez le code suivant :

static async ValueTask ProduceWithWaitToWriteAsync(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 } &&
        await writer.WaitToWriteAsync())
    {
        var tempCoordinates = coordinates with
        {
            Latitude = coordinates.Latitude + .5,
            Longitude = coordinates.Longitude + 1
        };

        if (writer.TryWrite(item: tempCoordinates))
        {
            coordinates = tempCoordinates;
        }

        await Task.Delay(TimeSpan.FromMilliseconds(10));
    }

    writer.Complete();
}

Dans le cadre de la boucle while conditionnelle, le résultat de l’appel à WaitToWriteAsync est utilisé pour déterminer s’il faut continuer la boucle.

Modèles de consommateur

Il existe plusieurs comportements de consommateurs courants dans les canaux. Quand un canal ne finit jamais, ce qui signifie qu’il produit des données indéfiniment, le consommateur peut utiliser une boucle while (true) et lire les données dès qu’elles sont disponibles :

static async ValueTask ConsumeWithWhileAsync(
    ChannelReader<Coordinates> reader)
{
    while (true)
    {
        // May throw ChannelClosedException if
        // the parent channel's writer signals complete.
        Coordinates coordinates = await reader.ReadAsync();
        Console.WriteLine(coordinates);
    }
}

Note

Ce code lève une exception si le canal est fermé.

Un autre consommateur peut éviter ce problème en utilisant une boucle while imbriquée, comme indiqué dans le code suivant :

static async ValueTask ConsumeWithNestedWhileAsync(
    ChannelReader<Coordinates> reader)
{
    while (await reader.WaitToReadAsync())
    {
        while (reader.TryRead(out Coordinates coordinates))
        {
            Console.WriteLine(coordinates);
        }
    }
}

Dans le code précédent, le consommateur attend de lire les données. Une fois les données disponibles, le consommateur tente de les lire. Ces boucles continuent d’évaluer jusqu’à ce que le producteur du canal signale qu’il n’a plus de données à lire. Cela dit, quand un producteur est connu pour avoir un nombre fini d’éléments à produire et qu’il signale l’achèvement, le consommateur peut utiliser la sémantique await foreach pour itérer sur les éléments :

static async ValueTask ConsumeWithAwaitForeachAsync(
    ChannelReader<Coordinates> reader)
{
    await foreach (Coordinates coordinates in reader.ReadAllAsync())
    {
        Console.WriteLine(coordinates);
    }
}

Le code précédent utilise la méthode ReadAllAsync pour lire toutes les coordonnées du canal.

Plusieurs producteurs et consommateurs

Les canaux prennent en charge plusieurs producteurs et consommateurs simultanés. Pour l’activer, créez un canal avec SingleWriter = false et SingleReader = false dans les options de canal. Vous écrivez ensuite en fan-out sur plusieurs tâches de producteurs et en fan-in pour la lecture sur plusieurs tâches de consommateurs.

static async Task UseMultipleProducersAndConsumersAsync()
{
    Channel<Coordinates> channel = Channel.CreateUnbounded<Coordinates>(
        new UnboundedChannelOptions
        {
            SingleWriter = false,
            SingleReader = false
        });

    // Start three concurrent producer tasks.
    Task[] producerTasks = Enumerable.Range(0, 3)
        .Select(id => ProduceAsync(id, channel))
        .ToArray();

    // Start two concurrent consumer tasks.
    Task[] consumerTasks = Enumerable.Range(0, 2)
        .Select(_ => ConsumeAsync(channel))
        .ToArray();

    // Wait for all producers to finish, then mark the channel as complete.
    await Task.WhenAll(producerTasks);
    channel.Writer.Complete();

    // Wait for all consumers to finish.
    await Task.WhenAll(consumerTasks);

    static async Task ProduceAsync(int id, Channel<Coordinates> channel)
    {
        Coordinates coordinates = new(
            DeviceId: Guid.NewGuid(),
            Latitude: -90 + (id * 30),
            Longitude: -180 + (id * 60));

        while (coordinates is { Latitude: < 90, Longitude: < 180 })
        {
            coordinates = coordinates with
            {
                Latitude = coordinates.Latitude + 0.5,
                Longitude = coordinates.Longitude + 1
            };
                
            await channel.Writer.WriteAsync(coordinates);
        }
    }

    static async Task ConsumeAsync(Channel<Coordinates> channel)
    {
        await foreach (Coordinates coordinates in channel.Reader.ReadAllAsync())
        {
            Console.WriteLine(coordinates);
        }
    }
}

Code précédent :

  • Crée un canal non borné qui prend explicitement en charge plusieurs auteurs et lecteurs concurrents.
  • Démarre trois tâches de producteur simultanées, chacune écrivant une série de coordonnées avec un identificateur d’appareil unique.
  • Démarre deux tâches de consommateur simultanées, chacune utilisant ReadAllAsync pour lire à partir du même canal.
  • Attend que tous les producteurs se terminent, puis appelle Complete pour signaler qu’aucune donnée supplémentaire n’est écrite sur le canal.
  • Attend que tous les consommateurs terminent de vider les données restantes du canal.

Conseil / Astuce

Avec plusieurs producteurs, appelez channel.Writer.Complete() seulement une fois que tous les producteurs terminent l’écriture. Cela signale qu’aucune autre donnée n’est écrite, ce qui permet ReadAllAsync() de se terminer après avoir consommé tous les éléments restants.

Voir aussi