Biblioteca System.Threading.Channels
O System.Threading.Channels namespace fornece um conjunto de estruturas de dados de sincronização para passar dados entre produtores e consumidores de forma assíncrona. A biblioteca tem como alvo o .NET Standard e funciona em todas as implementações do .NET.
Esta biblioteca está disponível no pacote NuGet System.Threading.Channels . No entanto, se você estiver usando o .NET Core 3.0 ou posterior, o pacote será incluído como parte da estrutura.
Os canais são uma implementação do modelo de programação conceptual produtor/consumidor. Neste modelo de programação, os produtores produzem dados de forma assíncrona e os consumidores consomem esses dados de forma assíncrona. Em outras palavras, esse modelo passa dados de uma parte para outra por meio de uma fila FIFO ("first-in, first-out"). Tente pensar em canais como faria com qualquer outro tipo de coleção genérica comum, como um List<T>
arquivo . A principal diferença é que essa coleção gerencia a sincronização e fornece vários modelos de consumo por meio de opções de criação de fábrica. Essas opções controlam o comportamento dos canais, como quantos elementos eles podem armazenar e o que acontece se esse limite for atingido, ou se o canal for acessado por vários produtores ou vários consumidores simultaneamente.
Dependendo de como um Channel<T>
é criado, seu leitor e escritor se comportam de forma diferente.
Para criar um canal que especifique uma capacidade máxima, chame Channel.CreateBounded. Para criar um canal que seja usado por qualquer número de leitores e escritores simultaneamente, ligue Channel.CreateUnboundedpara . Cada estratégia de delimitação expõe várias opções definidas pelo criador, ou BoundedChannelOptions UnboundedChannelOptions respectivamente.
Nota
Independentemente da estratégia de delimitação, um canal sempre lançará um ChannelClosedException quando for usado depois de fechado.
Para criar um canal ilimitado, chame uma das Channel.CreateUnbounded sobrecargas:
var channel = Channel.CreateUnbounded<T>();
Quando você cria um canal ilimitado, por padrão, o canal pode ser usado por qualquer número de leitores e gravadores simultaneamente. Como alternativa, você pode especificar um comportamento não padrão ao criar um canal não limitado fornecendo uma UnboundedChannelOptions
instância. A capacidade do canal é ilimitada e todas as gravações são realizadas de forma síncrona. Para obter mais exemplos, consulte Padrões de criação ilimitados.
Para criar um canal limitado, chame uma das Channel.CreateBounded sobrecargas:
var channel = Channel.CreateBounded<T>(7);
O código anterior cria um canal que tem uma capacidade máxima de 7
itens. Quando você cria um canal limitado, o canal é vinculado a uma capacidade máxima. Quando o limite é atingido, o comportamento padrão é que o canal bloqueia assincronamente o produtor até que o espaço fique disponível. Você pode configurar esse comportamento especificando uma opção ao criar o canal. Os canais delimitados podem ser criados com qualquer valor de capacidade superior a zero. Para outros exemplos, consulte Padrões de criação limitados.
Ao usar um canal limitado, você pode especificar o comportamento ao qual o canal adere quando o limite configurado é atingido. A tabela a seguir lista os comportamentos de modo completo para cada BoundedChannelFullMode valor:
Value | Comportamento |
---|---|
BoundedChannelFullMode.Wait | Este é o valor predefinido. Chamadas para WriteAsync aguardar espaço disponível para concluir a operação de gravação. Chamadas para TryWrite retornar false imediatamente. |
BoundedChannelFullMode.DropNewest | Remove e ignora o item mais recente no canal para abrir espaço para o item que está sendo escrito. |
BoundedChannelFullMode.DropOldest | Remove e ignora o item mais antigo no canal para abrir espaço para o item que está sendo escrito. |
BoundedChannelFullMode.DropWrite | Descarta o item que está sendo escrito. |
Importante
Sempre que um Channel<TWrite,TRead>.Writer produz mais rápido do que um Channel<TWrite,TRead>.Reader pode consumir, o escritor do canal experimenta pressão de volta.
A funcionalidade do produtor é exposta no Channel<TWrite,TRead>.Writer. As APIs do produtor e o comportamento esperado são detalhados na tabela a seguir:
API | Comportamento esperado |
---|---|
ChannelWriter<T>.Complete | Marca o canal como completo, o que significa que não há mais itens gravados nele. |
ChannelWriter<T>.TryComplete | Tenta marcar o canal como concluído, o que significa que não há mais dados gravados nele. |
ChannelWriter<T>.TryWrite | Tenta gravar o item especificado no canal. Quando usado com um canal ilimitado, isso sempre retorna true , a menos que o gravador do canal sinalize a conclusão com ChannelWriter<T>.Complete, ou ChannelWriter<T>.TryComplete. |
ChannelWriter<T>.WaitToWriteAsync | Retorna um ValueTask<TResult> que é concluído quando há espaço disponível para escrever um item. |
ChannelWriter<T>.WriteAsync | Grava um item de forma assíncrona no canal. |
A funcionalidade do consumidor é exposta no Channel<TWrite,TRead>.Reader. As APIs do consumidor e o comportamento esperado são detalhados na tabela a seguir:
API | Comportamento esperado |
---|---|
ChannelReader<T>.ReadAllAsync | Cria um IAsyncEnumerable<T> que permite ler todos os dados do canal. |
ChannelReader<T>.ReadAsync | Lê de forma assíncrona um item do canal. |
ChannelReader<T>.TryPeek | Tenta espreitar um item do canal. |
ChannelReader<T>.TryRead | Tenta ler um item do canal. |
ChannelReader<T>.WaitToReadAsync | Retorna um ValueTask<TResult> que é concluído quando os dados estão disponíveis para leitura. |
Existem vários padrões de uso para canais. A API foi projetada para ser simples, consistente e o mais flexível possível. Todos os métodos assíncronos retornam um ValueTask
(ou ValueTask<bool>
) que representa uma operação assíncrona leve que pode evitar a alocação se a operação for concluída de forma síncrona e, potencialmente, até assíncrona. Além disso, a API é projetada para ser composta, na medida em que o criador de um canal faz promessas sobre seu uso pretendido. Quando um canal é criado com determinados parâmetros, a implementação interna pode operar de forma mais eficiente conhecendo essas promessas.
Imagine que está a criar uma solução de produtor/consumidor para um sistema de posição global (GPS). Você deseja rastrear as coordenadas de um dispositivo ao longo do tempo. Um objeto de coordenadas de exemplo pode ter esta aparência:
/// <summary>
/// A representation of a device's coordinates,
/// which includes latitude and longitude.
/// </summary>
/// <param name="DeviceId">A unique device identifier.</param>
/// <param name="Latitude">The latitude of the device.</param>
/// <param name="Longitude">The longitude of the device.</param>
public readonly record struct Coordinates(
Guid DeviceId,
double Latitude,
double Longitude);
Um padrão de uso comum é criar um canal não limitado padrão:
var channel = Channel.CreateUnbounded<Coordinates>();
Mas, em vez disso, vamos imaginar que você queira criar um canal ilimitado com vários produtores e consumidores:
var channel = Channel.CreateUnbounded<Coordinates>(
new UnboundedChannelOptions
{
SingleWriter = false,
SingleReader = false,
AllowSynchronousContinuations = true
});
Nesse caso, todas as gravações são síncronas, mesmo o WriteAsync
. Isso ocorre porque um canal ilimitado sempre tem espaço disponível para uma gravação efetivamente imediata. No entanto, com AllowSynchronousContinuations
definido como true
, as gravações podem acabar fazendo um trabalho associado a um leitor, executando suas continuações. Isso não afeta a sincronicidade da operação.
Com canais limitados, a configurabilidade do canal deve ser conhecida pelo consumidor para ajudar a garantir o consumo adequado. Ou seja, o consumidor deve saber qual o comportamento que o canal apresenta quando o limite configurado é atingido. Vamos explorar alguns dos padrões comuns de criação limitada.
A maneira mais simples de criar um canal delimitado é especificar uma capacidade:
var channel = Channel.CreateBounded<Coordinates>(1);
O código anterior cria um canal delimitado com uma capacidade máxima de 1
. Outras opções estão disponíveis, algumas opções são as mesmas que um canal ilimitado, enquanto outras são específicas para canais ilimitados:
var channel = Channel.CreateBounded<Coordinates>(
new BoundedChannelOptions(1_000)
{
SingleWriter = true,
SingleReader = false,
AllowSynchronousContinuations = false,
FullMode = BoundedChannelFullMode.DropWrite
});
No código anterior, o canal é criado como um canal limitado a 1.000 itens, com um único gravador, mas muitos leitores. Seu comportamento de modo completo é definido como DropWrite
, o que significa que ele descarta o item que está sendo escrito se o canal estiver cheio.
Para observar itens que são descartados ao usar canais limitados, registre um retorno de itemDropped
chamada:
var channel = Channel.CreateBounded(
new BoundedChannelOptions(10)
{
AllowSynchronousContinuations = true,
FullMode = BoundedChannelFullMode.DropOldest
},
static void (Coordinates dropped) =>
Console.WriteLine($"Coordinates dropped: {dropped}"));
Sempre que o canal está cheio e um novo item é adicionado, o retorno de itemDropped
chamada é invocado. Neste exemplo, o retorno de chamada fornecido grava o item no console, mas você é livre para executar qualquer outra ação que desejar.
Imagine que o produtor neste cenário está escrevendo novas coordenadas para o canal. O produtor pode fazê-lo ligando para TryWrite:
static void ProduceWithWhileAndTryWrite(
ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
while (coordinates is { Latitude: < 90, Longitude: < 180 })
{
var tempCoordinates = coordinates with
{
Latitude = coordinates.Latitude + .5,
Longitude = coordinates.Longitude + 1
};
if (writer.TryWrite(item: tempCoordinates))
{
coordinates = tempCoordinates;
}
}
}
O código do produtor precedente:
- Aceita o
Channel<Coordinates>.Writer
(ChannelWriter<Coordinates>
) como um argumento, juntamente com o inicialCoordinates
. - Define um loop condicional
while
que tenta mover as coordenadas usandoTryWrite
.
Um produtor alternativo pode utilizar o WriteAsync
método:
static async ValueTask ProduceWithWhileWriteAsync(
ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
while (coordinates is { Latitude: < 90, Longitude: < 180 })
{
await writer.WriteAsync(
item: coordinates = coordinates with
{
Latitude = coordinates.Latitude + .5,
Longitude = coordinates.Longitude + 1
});
}
writer.Complete();
}
Novamente, o Channel<Coordinates>.Writer
é usado dentro de um while
loop. Mas, desta vez, o WriteAsync método é chamado. O método continuará somente depois que as coordenadas tiverem sido gravadas. Quando o while
loop sai, uma chamada para é feita, o que sinaliza Complete que não há mais dados gravados no canal.
Outro padrão de produtor é usar o WaitToWriteAsync método, considere o seguinte código:
static async ValueTask ProduceWithWaitToWriteAsync(
ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
while (coordinates is { Latitude: < 90, Longitude: < 180 } &&
await writer.WaitToWriteAsync())
{
var tempCoordinates = coordinates with
{
Latitude = coordinates.Latitude + .5,
Longitude = coordinates.Longitude + 1
};
if (writer.TryWrite(item: tempCoordinates))
{
coordinates = tempCoordinates;
}
await Task.Delay(TimeSpan.FromMilliseconds(10));
}
writer.Complete();
}
Como parte do condicional while
, o resultado da WaitToWriteAsync
chamada é usado para determinar se o loop deve continuar.
Existem vários padrões comuns de consumo de canais. Quando um canal não termina, o que significa que produz dados indefinidamente, o consumidor pode usar um while (true)
loop e ler os dados à medida que ficam disponíveis:
static async ValueTask ConsumeWithWhileAsync(
ChannelReader<Coordinates> reader)
{
while (true)
{
// May throw ChannelClosedException if
// the parent channel's writer signals complete.
Coordinates coordinates = await reader.ReadAsync();
Console.WriteLine(coordinates);
}
}
Nota
Este código lançará uma exceção se o canal estiver fechado.
Um consumidor alternativo poderia evitar essa preocupação usando um loop while aninhado, conforme mostrado no código a seguir:
static async ValueTask ConsumeWithNestedWhileAsync(
ChannelReader<Coordinates> reader)
{
while (await reader.WaitToReadAsync())
{
while (reader.TryRead(out Coordinates coordinates))
{
Console.WriteLine(coordinates);
}
}
}
No código anterior, o consumidor aguarda para ler os dados. Uma vez que os dados estão disponíveis, o consumidor tenta lê-los. Esses loops continuam a avaliar até que o produtor do canal sinalize que não tem mais dados para serem lidos. Dito isto, quando um produtor é conhecido por ter um número finito de itens que produz e sinaliza conclusão, o consumidor pode usar await foreach
semântica para iterar sobre os itens:
static async ValueTask ConsumeWithAwaitForeachAsync(
ChannelReader<Coordinates> reader)
{
await foreach (Coordinates coordinates in reader.ReadAllAsync())
{
Console.WriteLine(coordinates);
}
}
O código anterior usa o ReadAllAsync método para ler todas as coordenadas do canal.
Comentários do .NET
O .NET é um projeto código aberto. Selecione um link para fornecer comentários: