Formação
Módulo
Conectar comandos a um pipeline - Training
Neste módulo, você aprenderá como conectar comandos em um pipeline.
Este browser já não é suportado.
Atualize para o Microsoft Edge para tirar partido das mais recentes funcionalidades, atualizações de segurança e de suporte técnico.
System.IO.Pipelines é uma biblioteca projetada para facilitar a produção de E/S de alto desempenho no .NET. É uma biblioteca direcionada ao .NET Standard que funciona em todas as implementações do .NET.
A biblioteca está disponível no pacote Nuget System.IO.Pipelines .
Os aplicativos que analisam dados de streaming são compostos por código clichê com muitos fluxos de código especializados e incomuns. O clichê e o código de caso especial são complexos e difíceis de manter.
System.IO.Pipelines
foi arquitetado para:
O código a seguir é típico para um servidor TCP que recebe mensagens delimitadas por linha (delimitadas por '\n'
) de um cliente:
async Task ProcessLinesAsync(NetworkStream stream)
{
var buffer = new byte[1024];
await stream.ReadAsync(buffer, 0, buffer.Length);
// Process a single line from the buffer
ProcessLine(buffer);
}
O código anterior tem vários problemas:
ReadAsync
.stream.ReadAsync
. stream.ReadAsync
Retorna a quantidade de dados que foram lidas.ReadAsync
chamada.byte
matriz com cada leitura.Para corrigir os problemas anteriores, as seguintes alterações são necessárias:
Armazene em buffer os dados de entrada até que uma nova linha seja encontrada.
Analise todas as linhas retornadas no buffer.
É possível que a linha seja maior que 1 KB (1024 bytes). O código precisa redimensionar o buffer de entrada até que o delimitador seja encontrado, a fim de ajustar a linha completa dentro do buffer.
Considere o uso do pool de buffers para evitar a alocação de memória repetidamente.
O código a seguir aborda alguns desses problemas:
async Task ProcessLinesAsync(NetworkStream stream)
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
var bytesBuffered = 0;
var bytesConsumed = 0;
while (true)
{
// Calculate the amount of bytes remaining in the buffer.
var bytesRemaining = buffer.Length - bytesBuffered;
if (bytesRemaining == 0)
{
// Double the buffer size and copy the previously buffered data into the new buffer.
var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
// Return the old buffer to the pool.
ArrayPool<byte>.Shared.Return(buffer);
buffer = newBuffer;
bytesRemaining = buffer.Length - bytesBuffered;
}
var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
if (bytesRead == 0)
{
// EOF
break;
}
// Keep track of the amount of buffered bytes.
bytesBuffered += bytesRead;
var linePosition = -1;
do
{
// Look for a EOL in the buffered data.
linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
bytesBuffered - bytesConsumed);
if (linePosition >= 0)
{
// Calculate the length of the line based on the offset.
var lineLength = linePosition - bytesConsumed;
// Process the line.
ProcessLine(buffer, bytesConsumed, lineLength);
// Move the bytesConsumed to skip past the line consumed (including \n).
bytesConsumed += lineLength + 1;
}
}
while (linePosition >= 0);
}
}
O código anterior é complexo e não resolve todos os problemas identificados. Rede de alto desempenho geralmente significa escrever código complexo para maximizar o desempenho. System.IO.Pipelines
foi concebido para facilitar a escrita deste tipo de código.
A Pipe classe pode ser usada para criar um PipeWriter/PipeReader
par. Todos os dados gravados no PipeWriter
estão disponíveis no PipeReader
:
var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;
async Task ProcessLinesAsync(Socket socket)
{
var pipe = new Pipe();
Task writing = FillPipeAsync(socket, pipe.Writer);
Task reading = ReadPipeAsync(pipe.Reader);
await Task.WhenAll(reading, writing);
}
async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
const int minimumBufferSize = 512;
while (true)
{
// Allocate at least 512 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(minimumBufferSize);
try
{
int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
if (bytesRead == 0)
{
break;
}
// Tell the PipeWriter how much was read from the Socket.
writer.Advance(bytesRead);
}
catch (Exception ex)
{
LogError(ex);
break;
}
// Make the data available to the PipeReader.
FlushResult result = await writer.FlushAsync();
if (result.IsCompleted)
{
break;
}
}
// By completing PipeWriter, tell the PipeReader that there's no more data coming.
await writer.CompleteAsync();
}
async Task ReadPipeAsync(PipeReader reader)
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
{
// Process the line.
ProcessLine(line);
}
// Tell the PipeReader how much of the buffer has been consumed.
reader.AdvanceTo(buffer.Start, buffer.End);
// Stop reading if there's no more data coming.
if (result.IsCompleted)
{
break;
}
}
// Mark the PipeReader as complete.
await reader.CompleteAsync();
}
bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
// Look for a EOL in the buffer.
SequencePosition? position = buffer.PositionOf((byte)'\n');
if (position == null)
{
line = default;
return false;
}
// Skip the line + the \n.
line = buffer.Slice(0, position.Value);
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
return true;
}
Existem dois loops:
FillPipeAsync
lê a partir do Socket
e escreve para o PipeWriter
.ReadPipeAsync
lê a PipeReader
partir das linhas de entrada e analisa.Não há buffers explícitos alocados. Todo o PipeReader
gerenciamento de buffer é delegado às implementações e PipeWriter
. Delegar o gerenciamento de buffer torna mais fácil para o consumo de código se concentrar apenas na lógica de negócios.
No primeiro ciclo:
PipeWriter
quantidade de dados que foram gravados no buffer.PipeReader
.No segundo loop, o PipeReader
consome os buffers gravados por PipeWriter
. Os buffers vêm do soquete. O apelo a PipeReader.ReadAsync
:
Retorna um ReadResult que contém duas informações importantes:
ReadOnlySequence<byte>
.IsCompleted
que indica se o fim dos dados (EOF) foi atingido.Depois de encontrar o delimitador de fim de linha (EOL) e analisar a linha:
PipeReader.AdvanceTo
é chamado para informar a PipeReader
quantidade de dados que foram consumidos e examinados.Os loops do leitor e do escritor terminam chamando Complete
. Complete
permite que o Pipe subjacente libere a memória que ele alocou.
Idealmente, a leitura e a análise trabalham em conjunto:
Normalmente, a análise leva mais tempo do que apenas copiar blocos de dados da rede:
Para um desempenho ideal, há um equilíbrio entre pausas frequentes e alocação de mais memória.
Para resolver o problema anterior, o Pipe
tem duas configurações para controlar o fluxo de dados:
PipeWriter.FlushAsync
retomadas.ValueTask<FlushResult>
quando a quantidade de dados nos Pipe
cruzes PauseWriterThreshold
.ValueTask<FlushResult>
quando se torna inferior a ResumeWriterThreshold
.Dois valores são usados para evitar ciclos rápidos, que podem ocorrer se um valor for usado.
// The Pipe will start returning incomplete tasks from FlushAsync until
// the reader examines at least 5 bytes.
var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
var pipe = new Pipe(options);
Normalmente, ao usar async
e await
, o código assíncrono é retomado em um TaskScheduler ou no atual SynchronizationContext.
Ao fazer E/S, é importante ter um controle refinado sobre onde a E/S é executada. Esse controle permite tirar proveito dos caches da CPU de forma eficaz. O cache eficiente é fundamental para aplicativos de alto desempenho, como servidores Web. PipeScheduler Fornece controle sobre onde os retornos de chamada assíncronos são executados. Por predefinição:
SynchronizationContext
houver , ele usa o pool de threads para executar retornos de chamada.public static void Main(string[] args)
{
var writeScheduler = new SingleThreadPipeScheduler();
var readScheduler = new SingleThreadPipeScheduler();
// Tell the Pipe what schedulers to use and disable the SynchronizationContext.
var options = new PipeOptions(readerScheduler: readScheduler,
writerScheduler: writeScheduler,
useSynchronizationContext: false);
var pipe = new Pipe(options);
}
// This is a sample scheduler that async callbacks on a single dedicated thread.
public class SingleThreadPipeScheduler : PipeScheduler
{
private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
new BlockingCollection<(Action<object> Action, object State)>();
private readonly Thread _thread;
public SingleThreadPipeScheduler()
{
_thread = new Thread(DoWork);
_thread.Start();
}
private void DoWork()
{
foreach (var item in _queue.GetConsumingEnumerable())
{
item.Action(item.State);
}
}
public override void Schedule(Action<object?> action, object? state)
{
if (state is not null)
{
_queue.Add((action, state));
}
// else log the fact that _queue.Add was not called.
}
}
PipeScheduler.ThreadPool é a PipeScheduler implementação que enfileira retornos de chamada para o pool de threads. PipeScheduler.ThreadPool
é o padrão e, geralmente, a melhor escolha. PipeScheduler.Inline pode causar consequências não intencionais, como deadlocks.
É frequentemente eficiente reutilizar o Pipe
objeto. Para redefinir o tubo, ligue PipeReader Reset quando ambos os PipeReader
e PipeWriter
estiverem concluídos.
PipeReader gerencia a memória em nome do chamador. Sempre ligue PipeReader.AdvanceTo depois de ligar PipeReader.ReadAsync. Isso permite saber PipeReader
quando o chamador é feito com a memória para que ele possa ser rastreado. O ReadOnlySequence<byte>
devolvido de PipeReader.ReadAsync
só é válido até a chamada do PipeReader.AdvanceTo
. É ilegal usar ReadOnlySequence<byte>
depois de ligar PipeReader.AdvanceTo
.
PipeReader.AdvanceTo
utiliza dois SequencePosition argumentos:
Marcar dados como consumidos significa que o pipe pode retornar a memória para o pool de buffers subjacente. Marcar os dados como observados controla o que a próxima chamada faz PipeReader.ReadAsync
. Marcar tudo como observado significa que a próxima chamada para PipeReader.ReadAsync
não retornará até que haja mais dados gravados no pipe. Qualquer outro valor fará com que a próxima chamada retorne PipeReader.ReadAsync
imediatamente com os dados observados e não observados, mas não com os dados que já foram consumidos.
Existem alguns padrões típicos que surgem ao tentar ler dados de streaming:
Os exemplos a seguir usam o TryParseLines
método para analisar mensagens de um ReadOnlySequence<byte>
arquivo . TryParseLines
analisa uma única mensagem e atualiza o buffer de entrada para cortar a mensagem analisada do buffer. TryParseLines
não faz parte do .NET, é um método escrito pelo usuário usado nas seções a seguir.
bool TryParseLines(ref ReadOnlySequence<byte> buffer, out Message message);
O código a seguir lê uma única mensagem de um PipeReader
e a retorna ao chamador.
async ValueTask<Message?> ReadSingleMessageAsync(PipeReader reader,
CancellationToken cancellationToken = default)
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
// In the event that no message is parsed successfully, mark consumed
// as nothing and examined as the entire buffer.
SequencePosition consumed = buffer.Start;
SequencePosition examined = buffer.End;
try
{
if (TryParseLines(ref buffer, out Message message))
{
// A single message was successfully parsed so mark the start of the
// parsed buffer as consumed. TryParseLines trims the buffer to
// point to the data after the message was parsed.
consumed = buffer.Start;
// Examined is marked the same as consumed here, so the next call
// to ReadSingleMessageAsync will process the next message if there's
// one.
examined = consumed;
return message;
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(consumed, examined);
}
}
return null;
}
O código anterior:
SequencePosition
e examinado SequencePosition
para apontar para o início do buffer de entrada cortado.Os dois SequencePosition
argumentos são atualizados porque TryParseLines
remove a mensagem analisada do buffer de entrada. Geralmente, ao analisar uma única mensagem do buffer, a posição examinada deve ser uma das seguintes:
O caso de mensagem única tem o maior potencial para erros. Passar os valores errados para examinados pode resultar em uma exceção de falta de memória ou um loop infinito. Para obter mais informações, consulte a seção Problemas comuns do PipeReader neste artigo.
O código a seguir lê todas as mensagens de um PipeReader
e chama ProcessMessageAsync
em cada um.
async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseLines(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
PipeReader.ReadAsync
:
CancellationToken
é cancelado enquanto há uma leitura pendente.PipeReader.CancelPendingRead
faz com que a chamada atual ou seguinte retorne PipeReader.ReadAsync
um ReadResult com IsCanceled
definido como true
. Isso pode ser útil para interromper o loop de leitura existente de forma não destrutiva e não excecional.private PipeReader reader;
public MyConnection(PipeReader reader)
{
this.reader = reader;
}
public void Abort()
{
// Cancel the pending read so the process loop ends without an exception.
reader.CancelPendingRead();
}
public async Task ProcessMessagesAsync()
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
if (result.IsCanceled)
{
// The read was canceled. You can quit without reading the existing data.
break;
}
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseLines(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
Passar os valores errados para consumed
ou examined
pode resultar na leitura de dados já lidos.
A aprovação na qualidade de buffer.End
examinado pode resultar:
PipeReader.AdvanceTo(position, buffer.End)
ao processar uma única mensagem de cada vez a partir do buffer.Passar os valores errados para consumed
ou examined
pode resultar em um loop infinito. Por exemplo, se buffer.Start
não tiver sido alterado, PipeReader.AdvanceTo(buffer.Start)
a próxima chamada PipeReader.ReadAsync
retornará imediatamente antes da chegada de novos dados.
Passar os valores errados para consumed
ou examined
pode resultar em buffering infinito (eventual OOM).
Usar o ReadOnlySequence<byte>
after calling PipeReader.AdvanceTo
pode resultar em corrupção de memória (use depois de livre).
A falha na chamada PipeReader.Complete/CompleteAsync
pode resultar em um vazamento de memória.
Verificar ReadResult.IsCompleted e sair da lógica de leitura antes de processar o buffer resulta em perda de dados. A condição de saída do loop deve ser baseada em ReadResult.Buffer.IsEmpty
e ReadResult.IsCompleted
. Fazer isso incorretamente pode resultar em um loop infinito.
❌Perda de dados
O ReadResult
pode retornar o segmento final de dados quando IsCompleted
estiver definido como true
. Não ler esses dados antes de sair do loop de leitura resultará em perda de dados.
Aviso
NÃO use o código a seguir. Usar este exemplo resultará em perda de dados, travamentos, problemas de segurança e NÃO deve ser copiado. O exemplo a seguir é fornecido para explicar os problemas comuns do PipeReader.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> dataLossBuffer = result.Buffer;
if (result.IsCompleted)
break;
Process(ref dataLossBuffer, out Message message);
reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End);
}
Aviso
NÃO use o código anterior. Usar este exemplo resultará em perda de dados, travamentos, problemas de segurança e NÃO deve ser copiado. O exemplo anterior é fornecido para explicar os problemas comuns do PipeReader.
❌Loop infinito
A lógica a seguir pode resultar em um loop infinito se o fortrue
, Result.IsCompleted
mas nunca houver uma mensagem completa no buffer.
Aviso
NÃO use o código a seguir. Usar este exemplo resultará em perda de dados, travamentos, problemas de segurança e NÃO deve ser copiado. O exemplo a seguir é fornecido para explicar os problemas comuns do PipeReader.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (result.IsCompleted && infiniteLoopBuffer.IsEmpty)
break;
Process(ref infiniteLoopBuffer, out Message message);
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
Aviso
NÃO use o código anterior. Usar este exemplo resultará em perda de dados, travamentos, problemas de segurança e NÃO deve ser copiado. O exemplo anterior é fornecido para explicar os problemas comuns do PipeReader.
Aqui está outro pedaço de código com o mesmo problema. É verificar se há um buffer não vazio antes de verificar ReadResult.IsCompleted
. Como ele está em um else if
, ele fará um loop para sempre se nunca houver uma mensagem completa no buffer.
Aviso
NÃO use o código a seguir. Usar este exemplo resultará em perda de dados, travamentos, problemas de segurança e NÃO deve ser copiado. O exemplo a seguir é fornecido para explicar os problemas comuns do PipeReader.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (!infiniteLoopBuffer.IsEmpty)
Process(ref infiniteLoopBuffer, out Message message);
else if (result.IsCompleted)
break;
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
Aviso
NÃO use o código anterior. Usar este exemplo resultará em perda de dados, travamentos, problemas de segurança e NÃO deve ser copiado. O exemplo anterior é fornecido para explicar os problemas comuns do PipeReader.
❌Aplicativo sem resposta
Ligar PipeReader.AdvanceTo
incondicionalmente com na examined
posição pode fazer com buffer.End
que o aplicativo deixe de responder ao analisar uma única mensagem. A próxima chamada para PipeReader.AdvanceTo
não retornará até:
Aviso
NÃO use o código a seguir. Usar este exemplo resultará em perda de dados, travamentos, problemas de segurança e NÃO deve ser copiado. O exemplo a seguir é fornecido para explicar os problemas comuns do PipeReader.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> hangBuffer = result.Buffer;
Process(ref hangBuffer, out Message message);
if (result.IsCompleted)
break;
reader.AdvanceTo(hangBuffer.Start, hangBuffer.End);
if (message != null)
return message;
}
Aviso
NÃO use o código anterior. Usar este exemplo resultará em perda de dados, travamentos, problemas de segurança e NÃO deve ser copiado. O exemplo anterior é fornecido para explicar os problemas comuns do PipeReader.
❌Falta de memória (OOM)
Com as seguintes condições, o código a seguir mantém o buffer até que ocorra OutOfMemoryException :
PipeReader
não fazem uma mensagem completa. Por exemplo, ele não faz uma mensagem completa porque o outro lado está escrevendo uma mensagem grande (por exemplo, uma mensagem de 4 GB).Aviso
NÃO use o código a seguir. Usar este exemplo resultará em perda de dados, travamentos, problemas de segurança e NÃO deve ser copiado. O exemplo a seguir é fornecido para explicar os problemas comuns do PipeReader.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer;
Process(ref thisCouldOutOfMemory, out Message message);
if (result.IsCompleted)
break;
reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End);
if (message != null)
return message;
}
Aviso
NÃO use o código anterior. Usar este exemplo resultará em perda de dados, travamentos, problemas de segurança e NÃO deve ser copiado. O exemplo anterior é fornecido para explicar os problemas comuns do PipeReader.
❌Corrupção de memória
Ao escrever auxiliares que leem o buffer, qualquer carga retornada deve ser copiada antes de chamar Advance
o . O exemplo a seguir retornará a memória que o Pipe
descartou e poderá reutilizá-la para a próxima operação (leitura/gravação).
Aviso
NÃO use o código a seguir. Usar este exemplo resultará em perda de dados, travamentos, problemas de segurança e NÃO deve ser copiado. O exemplo a seguir é fornecido para explicar os problemas comuns do PipeReader.
public class Message
{
public ReadOnlySequence<byte> CorruptedPayload { get; set; }
}
Environment.FailFast("This code is terrible, don't use it!");
Message message = null;
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
ReadHeader(ref buffer, out int length);
if (length <= buffer.Length)
{
message = new Message
{
// Slice the payload from the existing buffer
CorruptedPayload = buffer.Slice(0, length)
};
buffer = buffer.Slice(length);
}
if (result.IsCompleted)
break;
reader.AdvanceTo(buffer.Start, buffer.End);
if (message != null)
{
// This code is broken since reader.AdvanceTo() was called with a position *after* the buffer
// was captured.
break;
}
}
return message;
}
Aviso
NÃO use o código anterior. Usar este exemplo resultará em perda de dados, travamentos, problemas de segurança e NÃO deve ser copiado. O exemplo anterior é fornecido para explicar os problemas comuns do PipeReader.
O PipeWriter gerencia buffers para escrever em nome do chamador. PipeWriter
implementos IBufferWriter<byte>
. IBufferWriter<byte>
torna possível obter acesso a buffers para executar gravações sem cópias de buffer extras.
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
// Request at least 5 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(5);
// Write directly into the buffer.
int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span);
// Tell the writer how many bytes were written.
writer.Advance(written);
await writer.FlushAsync(cancellationToken);
}
O código anterior:
PipeWriter
."Hello"
ASCII no .Memory<byte>
PipeWriter
, que envia os bytes para o dispositivo subjacente.O método anterior de escrita usa os buffers fornecidos pelo PipeWriter
. Poderia também ter utilizado PipeWriter.WriteAsync, que:
PipeWriter
arquivo .GetSpan
, Advance
conforme apropriado e chama FlushAsync.async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
byte[] helloBytes = Encoding.ASCII.GetBytes("Hello");
// Write helloBytes to the writer, there's no need to call Advance here
// (Write does that).
await writer.WriteAsync(helloBytes, cancellationToken);
}
FlushAsync suporta a passagem de um CancellationTokenarquivo . Passar um CancellationToken
resulta em um OperationCanceledException
se o token for cancelado enquanto há uma liberação pendente. PipeWriter.FlushAsync
suporta uma maneira de cancelar a operação de descarga atual via PipeWriter.CancelPendingFlush sem gerar uma exceção. A chamada PipeWriter.CancelPendingFlush
faz com que a chamada atual ou seguinte retorne PipeWriter.FlushAsync
PipeWriter.WriteAsync
um FlushResult com IsCanceled
definido como true
. Isto pode ser útil para interromper a descarga de rendimento de uma forma não destrutiva e não excecional.
GetMemory
ou GetSpan
fazer uma chamada FlushAsync
incompleta não é seguro.Complete
ou CompleteAsync
enquanto há dados não liberados pode resultar em corrupção de memória.As seguintes dicas irão ajudá-lo a usar as System.IO.Pipelines aulas com sucesso:
await
PipeWriter.FlushAsync enquanto escreve, e sempre verificar FlushResult.IsCompleted. Aborte a escrita se IsCompleted
for true
, pois isso indica que o leitor está concluído e não se importa mais com o que está escrito.PipeReader
tenha acesso.FlushAsync
se o leitor não puder iniciar até FlushAsync
terminar, pois isso pode causar um impasse.PipeReader
ou PipeWriter
acessa-os. Esses tipos não são thread-safe.AdvanceTo
ou concluir o PipeReader
ficheiro .Trata-se IDuplexPipe de um contrato para tipos que apoiam tanto a leitura como a escrita. Por exemplo, uma conexão de rede seria representada por um IDuplexPipe
arquivo .
Ao contrário Pipe
de , que contém a PipeReader
e a PipeWriter
, IDuplexPipe
representa um único lado de uma conexão full duplex. Isso significa que o que está escrito no testamento PipeWriter
não será lido a partir do PipeReader
.
Ao ler ou gravar dados de fluxo, você normalmente lê dados usando um desserializador e grava dados usando um serializador. A maioria dessas APIs de fluxo de leitura e gravação tem um Stream
parâmetro. Para facilitar a integração com essas APIs PipeReader
existentes e PipeWriter
expor um AsStream método. AsStream retorna uma Stream
implementação em torno do PipeReader
ou PipeWriter
.
PipeReader
e PipeWriter
as instâncias podem ser criadas usando os métodos estáticos Create
dados a um Stream objeto e opções de criação correspondentes opcionais.
A StreamPipeReaderOptions permissão para controle sobre a criação da instância com os PipeReader
seguintes parâmetros:
4096
.PipeReader
assume false
como padrão .1024
padrão .MemoryPool<byte>
usado ao alocar memória e assume como null
padrão .A StreamPipeWriterOptions permissão para controle sobre a criação da instância com os PipeWriter
seguintes parâmetros:
PipeWriter
assume false
como padrão .4096
MemoryPool<byte>
usado ao alocar memória e assume como null
padrão .Importante
Ao criar PipeReader
e PipeWriter
instâncias usando os Create
métodos, você precisa considerar o tempo de vida do Stream
objeto. Se você precisar acessar o fluxo depois que o leitor ou gravador terminar de usá-lo, você precisará definir o LeaveOpen
sinalizador nas true
opções de criação. Caso contrário, o fluxo será fechado.
O código a seguir demonstra a criação de PipeReader
instâncias e PipeWriter
usando os Create
métodos de um fluxo.
using System.Buffers;
using System.IO.Pipelines;
using System.Text;
class Program
{
static async Task Main()
{
using var stream = File.OpenRead("lorem-ipsum.txt");
var reader = PipeReader.Create(stream);
var writer = PipeWriter.Create(
Console.OpenStandardOutput(),
new StreamPipeWriterOptions(leaveOpen: true));
WriteUserCancellationPrompt();
var processMessagesTask = ProcessMessagesAsync(reader, writer);
var userCanceled = false;
var cancelProcessingTask = Task.Run(() =>
{
while (char.ToUpperInvariant(Console.ReadKey().KeyChar) != 'C')
{
WriteUserCancellationPrompt();
}
userCanceled = true;
// No exceptions thrown
reader.CancelPendingRead();
writer.CancelPendingFlush();
});
await Task.WhenAny(cancelProcessingTask, processMessagesTask);
Console.WriteLine(
$"\n\nProcessing {(userCanceled ? "cancelled" : "completed")}.\n");
}
static void WriteUserCancellationPrompt() =>
Console.WriteLine("Press 'C' to cancel processing...\n");
static async Task ProcessMessagesAsync(
PipeReader reader,
PipeWriter writer)
{
try
{
while (true)
{
ReadResult readResult = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = readResult.Buffer;
try
{
if (readResult.IsCanceled)
{
break;
}
if (TryParseLines(ref buffer, out string message))
{
FlushResult flushResult =
await WriteMessagesAsync(writer, message);
if (flushResult.IsCanceled || flushResult.IsCompleted)
{
break;
}
}
if (readResult.IsCompleted)
{
if (!buffer.IsEmpty)
{
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
catch (Exception ex)
{
Console.Error.WriteLine(ex);
}
finally
{
await reader.CompleteAsync();
await writer.CompleteAsync();
}
}
static bool TryParseLines(
ref ReadOnlySequence<byte> buffer,
out string message)
{
SequencePosition? position;
StringBuilder outputMessage = new();
while(true)
{
position = buffer.PositionOf((byte)'\n');
if (!position.HasValue)
break;
outputMessage.Append(Encoding.ASCII.GetString(buffer.Slice(buffer.Start, position.Value)))
.AppendLine();
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
};
message = outputMessage.ToString();
return message.Length != 0;
}
static ValueTask<FlushResult> WriteMessagesAsync(
PipeWriter writer,
string message) =>
writer.WriteAsync(Encoding.ASCII.GetBytes(message));
}
O aplicativo usa um StreamReader para ler o arquivo lorem-ipsum.txt como um fluxo e deve terminar com uma linha em branco. O FileStream é passado para PipeReader.Create, que instancia um PipeReader
objeto. Em seguida, o aplicativo de console passa seu fluxo de saída padrão para PipeWriter.Create o uso do Console.OpenStandardOutput(). O exemplo suporta cancelamento.
Comentários do .NET
O .NET é um projeto código aberto. Selecione um link para fornecer comentários:
Formação
Módulo
Conectar comandos a um pipeline - Training
Neste módulo, você aprenderá como conectar comandos em um pipeline.