Compartir a través de


Biblioteca System.Threading.Channels

El espacio de nombres System.Threading.Channels proporciona un conjunto de estructuras de datos de sincronización para pasar datos entre productores y consumidores de forma asincrónica. La biblioteca tiene como destino .NET Standard y funciona en todas las implementaciones de .NET.

Esta biblioteca está disponible en el paquete NuGet System.Threading.Channels. Pero si usa .NET Core 3.0 o una versión posterior, el paquete se incluye como parte del marco.

Modelo de programación conceptual de productor/consumidor

Los canales son una implementación del modelo de programación conceptual de productor/consumidor. En este modelo de programación, los productores generan datos de forma asincrónica y los consumidores consumen esos datos de forma asincrónica. En otras palabras, este modelo pasa datos de una entidad a otra a través de una cola de "primero en entrar, primero en salir" (FIFO). Intente pensar en los canales como lo haría con cualquier otro tipo de colección genérica común, como un elemento List<T>. La principal diferencia es que esta colección administra la sincronización y proporciona varios modelos de consumo mediante las opciones de creación de fábrica. Estas opciones controlan el comportamiento de los canales, como el número de elementos que pueden almacenar y qué ocurre si se alcanza ese límite, o bien si varios productores o varios consumidores acceden al canal de manera simultánea.

Estrategias de límite

En función de cómo se crea una instancia de Channel<T>, su lector y su escritor se comportan de forma diferente.

Para crear un canal que especifique una capacidad máxima, llame a Channel.CreateBounded. Para crear un canal que se use en cualquier número de lectores y escritores de manera simultánea, llame a Channel.CreateUnbounded. Cada estrategia de límite expone varias opciones definidas por el creador, ya sea BoundedChannelOptions o UnboundedChannelOptions respectivamente.

Nota

Independientemente de la estrategia de límite, un canal siempre generará una clase ChannelClosedException cuando se use después de cerrarse.

Canales ilimitados

Para crear un canal ilimitado, llame a una de las sobrecargas de Channel.CreateUnbounded:

var channel = Channel.CreateUnbounded<T>();

Cuando se crea un canal ilimitado, de manera predeterminada, el canal lo puede utilizar cualquier número de lectores y escritores simultáneamente. Como alternativa, puede especificar un comportamiento no predeterminado al crear un canal ilimitado si proporciona una instancia de UnboundedChannelOptions. La capacidad del canal es ilimitada y todas las escrituras se realizan de forma sincrónica. Para obtener más ejemplos, vea Patrones de creación ilimitados.

Canales limitados

Para crear un canal limitado, llame a una de las sobrecargas de Channel.CreateBounded:

var channel = Channel.CreateBounded<T>(7);

El código anterior crea un canal que tiene una capacidad máxima de 7 elementos. Al crear un canal limitado, una capacidad máxima limita este canal. Cuando se alcanza el límite, el comportamiento predeterminado es que el canal bloquee de forma asincrónica el productor hasta que haya espacio disponible. Puede configurar este comportamiento especificando una opción al crear el canal. Los canales limitados se pueden crear con cualquier valor de capacidad mayor que cero. Para obtener otros ejemplos, vea Patrones de creación limitados.

Comportamiento del modo completo

Al usar un canal limitado, puede especificar el comportamiento al que se adhiere el canal cuando se alcanza el límite configurado. En la tabla siguiente se enumeran los comportamientos de modo completo para cada valor de BoundedChannelFullMode:

Value Comportamiento
BoundedChannelFullMode.Wait Este es el valor predeterminado. Las llamadas a WriteAsync esperan a que haya espacio disponible para completar la operación de escritura. Las llamadas a TryWrite devuelven false inmediatamente.
BoundedChannelFullMode.DropNewest Quita y omite el elemento más reciente en el canal con el fin de dejar espacio para el elemento que se va a escribir.
BoundedChannelFullMode.DropOldest Quita y omite el elemento más antiguo en el canal con el fin de dejar espacio para el elemento que se va a escribir.
BoundedChannelFullMode.DropWrite Quita el elemento que se va a escribir.

Importante

Cada vez que un elemento Channel<TWrite,TRead>.Writer produce más rápido que lo que un elemento Channel<TWrite,TRead>.Reader puede consumir, el escritor del canal experimenta presión retroactiva.

API de productor

La función del productor se expone en Channel<TWrite,TRead>.Writer. Las API de productor y el comportamiento esperado se detallan en la tabla siguiente:

API Comportamiento esperado
ChannelWriter<T>.Complete Marca el canal como completo, lo que significa que ya no se escriben más elementos en él.
ChannelWriter<T>.TryComplete Intenta marcar el canal como completo, lo que significa que ya no se escriben más datos en él.
ChannelWriter<T>.TryWrite Intenta escribir el elemento especificado en el canal. Cuando se usa con un canal ilimitado, siempre devuelve true, a menos que el escritor del canal señale la finalización con ChannelWriter<T>.Complete o ChannelWriter<T>.TryComplete.
ChannelWriter<T>.WaitToWriteAsync Devuelve una instancia de ValueTask<TResult> que se completa cuando hay espacio disponible para escribir un elemento.
ChannelWriter<T>.WriteAsync Escribe asincrónicamente un elemento en el canal.

API de consumidor

La función del consumidor se expone en Channel<TWrite,TRead>.Reader. Las API de consumidor y el comportamiento esperado se detallan en la tabla siguiente:

API Comportamiento esperado
ChannelReader<T>.ReadAllAsync Crea una instancia de IAsyncEnumerable<T> que permite leer todos los datos del canal.
ChannelReader<T>.ReadAsync Lee asincrónicamente un elemento desde el canal.
ChannelReader<T>.TryPeek Intenta consultar un elemento del canal.
ChannelReader<T>.TryRead Intenta leer un elemento del canal.
ChannelReader<T>.WaitToReadAsync Devuelve una instancia de ValueTask<TResult> que se completa cuando hay datos disponibles para leer.

Patrones de uso común

Hay varios patrones de uso para los canales. La API está diseñada para ser sencilla, coherente y lo más flexible posible. Todos los métodos asincrónicos devuelven un elemento ValueTask (o ValueTask<bool>) que representa una operación asincrónica ligera que puede evitar la asignación si la operación se completa de forma sincrónica e incluso asincrónica. Además, la API está diseñada para admitir composición, de modo que el creador de un canal hace promesas sobre su uso previsto. Cuando se crea un canal con determinados parámetros, la implementación interna puede funcionar de forma más eficaz sabiendo estas promesas.

Patrones de creación

Imagine que está creando una solución de productor/consumidor para un sistema de posicionamiento global (GPS). Quiere realizar un seguimiento de las coordenadas de un dispositivo a lo largo del tiempo. Un objeto de coordenadas de ejemplo podría tener este aspecto:

/// <summary>
/// A representation of a device's coordinates, 
/// which includes latitude and longitude.
/// </summary>
/// <param name="DeviceId">A unique device identifier.</param>
/// <param name="Latitude">The latitude of the device.</param>
/// <param name="Longitude">The longitude of the device.</param>
public readonly record struct Coordinates(
    Guid DeviceId,
    double Latitude,
    double Longitude);

Patrones de creación ilimitada

Un patrón de uso común es crear un canal ilimitado predeterminado:

var channel = Channel.CreateUnbounded<Coordinates>();

Pero en su lugar, supongamos que quiere crear un canal ilimitado con varios productores y consumidores:

var channel = Channel.CreateUnbounded<Coordinates>(
    new UnboundedChannelOptions
    {
        SingleWriter = false,
        SingleReader = false,
        AllowSynchronousContinuations = true
    });

En este caso, todas las escrituras son sincrónicas, incluso WriteAsync. Esto se debe a que un canal ilimitado siempre tiene espacio disponible de inmediato para una escritura de forma eficaz. Pero con AllowSynchronousContinuations establecido en true, las escrituras pueden acabar realizando el trabajo asociado a un lector ejecutando sus continuaciones. Esto no afecta a la sincronización de la operación.

Patrones de creación limitada

Con los canales limitados, el consumidor debe conocer la capacidad de configuración del canal para ayudar a garantizar el consumo adecuado. Es decir, el consumidor debe saber qué comportamiento muestra el canal cuando se alcanza el límite configurado. Vamos a explorar algunos de los patrones comunes de creación limitada.

La manera más sencilla de crear un canal limitado es especificar una capacidad:

var channel = Channel.CreateBounded<Coordinates>(1);

El código anterior crea un canal limitado con una capacidad máxima de 1. Hay otras opciones disponibles; algunas son las mismas que en un canal ilimitado, mientras que otras son específicas de los canales ilimitados:

var channel = Channel.CreateBounded<Coordinates>(
    new BoundedChannelOptions(1_000)
    {
        SingleWriter = true,
        SingleReader = false,
        AllowSynchronousContinuations = false,
        FullMode = BoundedChannelFullMode.DropWrite
    });

En el código anterior, el canal se crea como un canal limitado 1000 elementos, con un único escritor pero muchos lectores. Su comportamiento en el modo completo se define como DropWrite, lo que significa que quita el elemento que se escribe si el canal está completo.

Para observar los elementos que se quitan al usar canales limitados, registre una devolución de llamada itemDropped:

var channel = Channel.CreateBounded(
    new BoundedChannelOptions(10)
    {
        AllowSynchronousContinuations = true,
        FullMode = BoundedChannelFullMode.DropOldest
    },
    static void (Coordinates dropped) =>
        Console.WriteLine($"Coordinates dropped: {dropped}"));

Siempre que el canal esté completo y se agregue un elemento nuevo, se invoca la devolución de llamada itemDropped. En este ejemplo, la devolución de llamada proporcionada escribe el elemento en la consola, pero puede realizar cualquier otra acción que quiera.

Patrones de productor

Imagine que el productor de este escenario está escribiendo nuevas coordenadas en el canal. El productor puede hacerlo llamando a TryWrite:

static void ProduceWithWhileAndTryWrite(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 })
    {
        var tempCoordinates = coordinates with
        {
            Latitude = coordinates.Latitude + .5,
            Longitude = coordinates.Longitude + 1
        };

        if (writer.TryWrite(item: tempCoordinates))
        {
            coordinates = tempCoordinates;
        }
    }
}

El código anterior del productor:

  • Acepta Channel<Coordinates>.Writer (ChannelWriter<Coordinates>) como argumento, junto con el elemento Coordinates inicial.
  • Define un bucle while condicional que intenta mover las coordenadas mediante TryWrite.

Un productor alternativo podría usar el método WriteAsync:

static async ValueTask ProduceWithWhileWriteAsync(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 })
    {
        await writer.WriteAsync(
            item: coordinates = coordinates with
            {
                Latitude = coordinates.Latitude + .5,
                Longitude = coordinates.Longitude + 1
            });
    }

    writer.Complete();
}

De nuevo, Channel<Coordinates>.Writer se usa en un bucle while. Pero esta vez se llama al método WriteAsync. El método continuará solo después de que se hayan escrito las coordenadas. Cuando se cierra el bucle while, se realiza una llamada a Complete, lo que indica que no se escriben más datos en el canal.

Otro patrón de productor es usar el método WaitToWriteAsync; tenga en cuenta el código siguiente:

static async ValueTask ProduceWithWaitToWriteAsync(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 } &&
        await writer.WaitToWriteAsync())
    {
        var tempCoordinates = coordinates with
        {
            Latitude = coordinates.Latitude + .5,
            Longitude = coordinates.Longitude + 1
        };

        if (writer.TryWrite(item: tempCoordinates))
        {
            coordinates = tempCoordinates;
        }

        await Task.Delay(TimeSpan.FromMilliseconds(10));
    }

    writer.Complete();
}

Como parte del elemento while condicional, se usa el resultado de la llamada a WaitToWriteAsync para determinar si se va a continuar el bucle.

Patrones de consumidor

Hay varios patrones comunes de consumidor de canales. Cuando un canal no termina nunca, lo que significa que genera datos de manera indefinida, el consumidor puede usar un bucle while (true) y leer los datos a medida que estén disponibles:

static async ValueTask ConsumeWithWhileAsync(
    ChannelReader<Coordinates> reader)
{
    while (true)
    {
        // May throw ChannelClosedException if
        // the parent channel's writer signals complete.
        Coordinates coordinates = await reader.ReadAsync();
        Console.WriteLine(coordinates);
    }
}

Nota

Este código generará una excepción si el canal está cerrado.

Otro consumidor podría evitar este problema mediante un bucle while anidado, tal como se muestra en el código siguiente:

static async ValueTask ConsumeWithNestedWhileAsync(
    ChannelReader<Coordinates> reader)
{
    while (await reader.WaitToReadAsync())
    {
        while (reader.TryRead(out Coordinates coordinates))
        {
            Console.WriteLine(coordinates);
        }
    }
}

En el código anterior, el consumidor espera a leer datos. Una vez que los datos están disponibles, el consumidor intenta leerlos. Estos bucles se siguen evaluando hasta que el productor del canal indica que ya no tiene datos para leer. Dicho esto, cuando se sabe que un productor tiene un número finito de elementos para generar y señala la finalización, el consumidor puede usar la semántica await foreach para iterar por los elementos:

static async ValueTask ConsumeWithAwaitForeachAsync(
    ChannelReader<Coordinates> reader)
{
    await foreach (Coordinates coordinates in reader.ReadAllAsync())
    {
        Console.WriteLine(coordinates);
    }
}

El código anterior usa el método ReadAllAsync para leer todas las coordenadas del canal.

Vea también