Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
System.IO.Pipelines é uma biblioteca projetada para facilitar a produção de E/S de alto desempenho no .NET. É uma biblioteca direcionada ao .NET Standard que funciona em todas as implementações do .NET.
A biblioteca está disponível no pacote Nuget System.IO.Pipelines .
Qual problema System.IO.Pipelines resolve
Os aplicativos que analisam dados de streaming são compostos por código clichê com muitos fluxos de código especializados e incomuns. O clichê e o código de caso especial são complexos e difíceis de manter.
System.IO.Pipelines
foi arquitetado para:
- Tenha alto desempenho analisando dados de streaming.
- Reduza a complexidade do código.
O código a seguir é típico para um servidor TCP que recebe mensagens delimitadas por linha (delimitadas por '\n'
) de um cliente:
async Task ProcessLinesAsync(NetworkStream stream)
{
var buffer = new byte[1024];
await stream.ReadAsync(buffer, 0, buffer.Length);
// Process a single line from the buffer
ProcessLine(buffer);
}
O código anterior tem vários problemas:
- A mensagem inteira (fim de linha) pode não ser recebida em uma única chamada para
ReadAsync
. - É ignorar o resultado do
stream.ReadAsync
.stream.ReadAsync
Retorna a quantidade de dados que foram lidas. - Ele não lida com o caso em que várias linhas são lidas em uma única
ReadAsync
chamada. - Ele aloca uma
byte
matriz com cada leitura.
Para corrigir os problemas anteriores, as seguintes alterações são necessárias:
Armazene em buffer os dados de entrada até que uma nova linha seja encontrada.
Analise todas as linhas retornadas no buffer.
É possível que a linha seja maior que 1 KB (1024 bytes). O código precisa redimensionar o buffer de entrada até que o delimitador seja encontrado, a fim de ajustar a linha completa dentro do buffer.
- Se o buffer for redimensionado, mais cópias de buffer serão feitas à medida que linhas mais longas aparecerem na entrada.
- Para reduzir o espaço desperdiçado, compacte o buffer usado para ler linhas.
Considere o uso do pool de buffers para evitar a alocação de memória repetidamente.
O código a seguir aborda alguns desses problemas:
async Task ProcessLinesAsync(NetworkStream stream)
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
var bytesBuffered = 0;
var bytesConsumed = 0;
while (true)
{
// Calculate the amount of bytes remaining in the buffer.
var bytesRemaining = buffer.Length - bytesBuffered;
if (bytesRemaining == 0)
{
// Double the buffer size and copy the previously buffered data into the new buffer.
var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
// Return the old buffer to the pool.
ArrayPool<byte>.Shared.Return(buffer);
buffer = newBuffer;
bytesRemaining = buffer.Length - bytesBuffered;
}
var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
if (bytesRead == 0)
{
// EOF
break;
}
// Keep track of the amount of buffered bytes.
bytesBuffered += bytesRead;
var linePosition = -1;
do
{
// Look for a EOL in the buffered data.
linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
bytesBuffered - bytesConsumed);
if (linePosition >= 0)
{
// Calculate the length of the line based on the offset.
var lineLength = linePosition - bytesConsumed;
// Process the line.
ProcessLine(buffer, bytesConsumed, lineLength);
// Move the bytesConsumed to skip past the line consumed (including \n).
bytesConsumed += lineLength + 1;
}
}
while (linePosition >= 0);
}
}
O código anterior é complexo e não resolve todos os problemas identificados. Rede de alto desempenho geralmente significa escrever código complexo para maximizar o desempenho.
System.IO.Pipelines
foi concebido para facilitar a escrita deste tipo de código.
Tubo
A Pipe classe pode ser usada para criar um PipeWriter/PipeReader
par. Todos os dados gravados no PipeWriter
estão disponíveis no PipeReader
:
var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;
Utilização básica do tubo
async Task ProcessLinesAsync(Socket socket)
{
var pipe = new Pipe();
Task writing = FillPipeAsync(socket, pipe.Writer);
Task reading = ReadPipeAsync(pipe.Reader);
await Task.WhenAll(reading, writing);
}
async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
const int minimumBufferSize = 512;
while (true)
{
// Allocate at least 512 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(minimumBufferSize);
try
{
int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
if (bytesRead == 0)
{
break;
}
// Tell the PipeWriter how much was read from the Socket.
writer.Advance(bytesRead);
}
catch (Exception ex)
{
LogError(ex);
break;
}
// Make the data available to the PipeReader.
FlushResult result = await writer.FlushAsync();
if (result.IsCompleted)
{
break;
}
}
// By completing PipeWriter, tell the PipeReader that there's no more data coming.
await writer.CompleteAsync();
}
async Task ReadPipeAsync(PipeReader reader)
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
{
// Process the line.
ProcessLine(line);
}
// Tell the PipeReader how much of the buffer has been consumed.
reader.AdvanceTo(buffer.Start, buffer.End);
// Stop reading if there's no more data coming.
if (result.IsCompleted)
{
break;
}
}
// Mark the PipeReader as complete.
await reader.CompleteAsync();
}
bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
// Look for a EOL in the buffer.
SequencePosition? position = buffer.PositionOf((byte)'\n');
if (position == null)
{
line = default;
return false;
}
// Skip the line + the \n.
line = buffer.Slice(0, position.Value);
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
return true;
}
Existem dois loops:
-
FillPipeAsync
lê a partir doSocket
e escreve para oPipeWriter
. -
ReadPipeAsync
lê aPipeReader
partir das linhas de entrada e analisa.
Não há buffers explícitos alocados. Todo o PipeReader
gerenciamento de buffer é delegado às implementações e PipeWriter
. Delegar o gerenciamento de buffer torna mais fácil para o consumo de código se concentrar apenas na lógica de negócios.
No primeiro ciclo:
- PipeWriter.GetMemory(Int32) é chamado para obter memória do escritor subjacente.
-
PipeWriter.Advance(Int32) é chamado para informar a
PipeWriter
quantidade de dados que foram gravados no buffer. -
PipeWriter.FlushAsync é chamado a disponibilizar os dados ao
PipeReader
.
No segundo loop, o PipeReader
consome os buffers gravados por PipeWriter
. Os buffers vêm do soquete. O apelo a PipeReader.ReadAsync
:
Retorna um ReadResult que contém duas informações importantes:
- Os dados que foram lidos na forma de
ReadOnlySequence<byte>
. - Um booleano
IsCompleted
que indica se o fim dos dados (EOF) foi atingido.
- Os dados que foram lidos na forma de
Depois de encontrar o delimitador de fim de linha (EOL) e analisar a linha:
- A lógica processa o buffer para ignorar o que já foi processado.
-
PipeReader.AdvanceTo
é chamado para informar aPipeReader
quantidade de dados que foram consumidos e examinados.
Os loops do leitor e do escritor terminam chamando Complete
.
Complete
permite que o Pipe subjacente libere a memória que ele alocou.
Controlo da contrapressão e do fluxo
Idealmente, a leitura e a análise trabalham em conjunto:
- O thread de leitura consome dados da rede e os coloca em buffers.
- O thread de análise é responsável pela construção das estruturas de dados apropriadas.
Normalmente, a análise leva mais tempo do que apenas copiar blocos de dados da rede:
- O segmento de leitura fica à frente do segmento de análise.
- O thread de leitura tem que diminuir a velocidade ou alocar mais memória para armazenar os dados para o thread de análise.
Para um desempenho ideal, há um equilíbrio entre pausas frequentes e alocação de mais memória.
Para resolver o problema anterior, o Pipe
tem duas configurações para controlar o fluxo de dados:
- PauseWriterThreshold: Determina a quantidade de dados que deve ser armazenada em buffer antes das chamadas para FlushAsync pausar.
-
ResumeWriterThreshold: Determina a quantidade de dados que o leitor deve observar antes que as chamadas sejam
PipeWriter.FlushAsync
retomadas.
- Retorna um incompleto
ValueTask<FlushResult>
quando a quantidade de dados nosPipe
cruzesPauseWriterThreshold
. - Completa
ValueTask<FlushResult>
quando se torna inferior aResumeWriterThreshold
.
Dois valores são usados para evitar ciclos rápidos, que podem ocorrer se um valor for usado.
Exemplos
// The Pipe will start returning incomplete tasks from FlushAsync until
// the reader examines at least 5 bytes.
var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
var pipe = new Pipe(options);
PipeScheduler
Normalmente, ao usar async
e await
, o código assíncrono é retomado em um TaskScheduler ou no atual SynchronizationContext.
Ao fazer E/S, é importante ter um controle refinado sobre onde a E/S é executada. Esse controle permite tirar proveito dos caches da CPU de forma eficaz. O cache eficiente é fundamental para aplicativos de alto desempenho, como servidores Web. PipeScheduler Fornece controle sobre onde os retornos de chamada assíncronos são executados. Por predefinição:
- A corrente SynchronizationContext é usada.
- Se não
SynchronizationContext
houver , ele usa o pool de threads para executar retornos de chamada.
public static void Main(string[] args)
{
var writeScheduler = new SingleThreadPipeScheduler();
var readScheduler = new SingleThreadPipeScheduler();
// Tell the Pipe what schedulers to use and disable the SynchronizationContext.
var options = new PipeOptions(readerScheduler: readScheduler,
writerScheduler: writeScheduler,
useSynchronizationContext: false);
var pipe = new Pipe(options);
}
// This is a sample scheduler that async callbacks on a single dedicated thread.
public class SingleThreadPipeScheduler : PipeScheduler
{
private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
new BlockingCollection<(Action<object> Action, object State)>();
private readonly Thread _thread;
public SingleThreadPipeScheduler()
{
_thread = new Thread(DoWork);
_thread.Start();
}
private void DoWork()
{
foreach (var item in _queue.GetConsumingEnumerable())
{
item.Action(item.State);
}
}
public override void Schedule(Action<object?> action, object? state)
{
if (state is not null)
{
_queue.Add((action, state));
}
// else log the fact that _queue.Add was not called.
}
}
PipeScheduler.ThreadPool é a PipeScheduler implementação que enfileira retornos de chamada para o pool de threads.
PipeScheduler.ThreadPool
é o padrão e, geralmente, a melhor escolha.
PipeScheduler.Inline pode causar consequências não intencionais, como deadlocks.
Reposição do tubo
É frequentemente eficiente reutilizar o Pipe
objeto. Para redefinir o tubo, ligue PipeReaderReset quando ambos os PipeReader
e PipeWriter
estiverem concluídos.
Leitor de tubos
PipeReader gerencia a memória em nome do chamador.
Sempre ligue PipeReader.AdvanceTo depois de ligar PipeReader.ReadAsync. Isso permite saber PipeReader
quando o chamador é feito com a memória para que ele possa ser rastreado. O ReadOnlySequence<byte>
devolvido de PipeReader.ReadAsync
só é válido até a chamada do PipeReader.AdvanceTo
. É ilegal usar ReadOnlySequence<byte>
depois de ligar PipeReader.AdvanceTo
.
PipeReader.AdvanceTo
utiliza dois SequencePosition argumentos:
- O primeiro argumento determina a quantidade de memória consumida.
- O segundo argumento determina quanto do buffer foi observado.
Marcar dados como consumidos significa que o pipe pode retornar a memória para o pool de buffers subjacente. Marcar os dados como observados controla o que a próxima chamada faz PipeReader.ReadAsync
. Marcar tudo como observado significa que a próxima chamada para PipeReader.ReadAsync
não retornará até que haja mais dados gravados no pipe. Qualquer outro valor fará com que a próxima chamada retorne PipeReader.ReadAsync
imediatamente com os dados observados e não observados, mas não com os dados que já foram consumidos.
Ler cenários de streaming de dados
Existem alguns padrões típicos que surgem ao tentar ler dados de streaming:
- Dado um fluxo de dados, analise uma única mensagem.
- Dado um fluxo de dados, analise todas as mensagens disponíveis.
Os exemplos a seguir usam o TryParseLines
método para analisar mensagens de um ReadOnlySequence<byte>
arquivo .
TryParseLines
analisa uma única mensagem e atualiza o buffer de entrada para cortar a mensagem analisada do buffer.
TryParseLines
não faz parte do .NET, é um método escrito pelo usuário usado nas seções a seguir.
bool TryParseLines(ref ReadOnlySequence<byte> buffer, out Message message);
Ler uma única mensagem
O código a seguir lê uma única mensagem de um PipeReader
e a retorna ao chamador.
async ValueTask<Message?> ReadSingleMessageAsync(PipeReader reader,
CancellationToken cancellationToken = default)
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
// In the event that no message is parsed successfully, mark consumed
// as nothing and examined as the entire buffer.
SequencePosition consumed = buffer.Start;
SequencePosition examined = buffer.End;
try
{
if (TryParseLines(ref buffer, out Message message))
{
// A single message was successfully parsed so mark the start of the
// parsed buffer as consumed. TryParseLines trims the buffer to
// point to the data after the message was parsed.
consumed = buffer.Start;
// Examined is marked the same as consumed here, so the next call
// to ReadSingleMessageAsync will process the next message if there's
// one.
examined = consumed;
return message;
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(consumed, examined);
}
}
return null;
}
O código anterior:
- Analisa uma única mensagem.
- Atualiza o consumido
SequencePosition
e examinadoSequencePosition
para apontar para o início do buffer de entrada cortado.
Os dois SequencePosition
argumentos são atualizados porque TryParseLines
remove a mensagem analisada do buffer de entrada. Geralmente, ao analisar uma única mensagem do buffer, a posição examinada deve ser uma das seguintes:
- O final da mensagem.
- O fim do buffer recebido se nenhuma mensagem foi encontrada.
O caso de mensagem única tem o maior potencial para erros. Passar os valores errados para examinados pode resultar em uma exceção de falta de memória ou um loop infinito. Para obter mais informações, consulte a seção Problemas comuns do PipeReader neste artigo.
Ler várias mensagens
O código a seguir lê todas as mensagens de um PipeReader
e chama ProcessMessageAsync
em cada um.
async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseLines(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
Cancelamento
PipeReader.ReadAsync
:
- Suporta a passagem de um CancellationTokenarquivo .
- Lança um OperationCanceledException se o
CancellationToken
é cancelado enquanto há uma leitura pendente. - Suporta uma maneira de cancelar a operação de leitura atual via PipeReader.CancelPendingRead, o que evita gerar uma exceção. A chamada
PipeReader.CancelPendingRead
faz com que a chamada atual ou seguinte retornePipeReader.ReadAsync
um ReadResult comIsCanceled
definido comotrue
. Isso pode ser útil para interromper o loop de leitura existente de forma não destrutiva e não excecional.
private PipeReader reader;
public MyConnection(PipeReader reader)
{
this.reader = reader;
}
public void Abort()
{
// Cancel the pending read so the process loop ends without an exception.
reader.CancelPendingRead();
}
public async Task ProcessMessagesAsync()
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
if (result.IsCanceled)
{
// The read was canceled. You can quit without reading the existing data.
break;
}
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseLines(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
Problemas comuns do PipeReader
Passar os valores errados para
consumed
ouexamined
pode resultar na leitura de dados já lidos.A aprovação na qualidade de
buffer.End
examinado pode resultar:- Dados paralisados
- Possivelmente uma eventual exceção de falta de memória (OOM) se os dados não forem consumidos. Por exemplo,
PipeReader.AdvanceTo(position, buffer.End)
ao processar uma única mensagem de cada vez a partir do buffer.
Passar os valores errados para
consumed
ouexamined
pode resultar em um loop infinito. Por exemplo, sePipeReader.AdvanceTo(buffer.Start)
não tiver sido alterado,buffer.Start
a próxima chamadaPipeReader.ReadAsync
retornará imediatamente antes da chegada de novos dados.Passar os valores errados para
consumed
ouexamined
pode resultar em buffering infinito (eventual OOM).Usar o
ReadOnlySequence<byte>
after callingPipeReader.AdvanceTo
pode resultar em corrupção de memória (use depois de livre).A falha na chamada
PipeReader.Complete/CompleteAsync
pode resultar em um vazamento de memória.Verificar ReadResult.IsCompleted e sair da lógica de leitura antes de processar o buffer resulta em perda de dados. A condição de saída do loop deve ser baseada em
ReadResult.Buffer.IsEmpty
eReadResult.IsCompleted
. Fazer isso incorretamente pode resultar em um loop infinito.
Código problemático
❌ Perda de dados
O ReadResult
pode retornar o segmento final de dados quando IsCompleted
estiver definido como true
. Não ler esses dados antes de sair do loop de leitura resultará em perda de dados.
Aviso
NÃO use o código a seguir. Usar este exemplo resultará em perda de dados, travamentos, problemas de segurança e NÃO deve ser copiado. O exemplo a seguir é fornecido para explicar os problemas comuns do PipeReader.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> dataLossBuffer = result.Buffer;
if (result.IsCompleted)
break;
Process(ref dataLossBuffer, out Message message);
reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End);
}
Aviso
NÃO use o código anterior. Usar este exemplo resultará em perda de dados, travamentos, problemas de segurança e NÃO deve ser copiado. O exemplo anterior é fornecido para explicar os problemas comuns do PipeReader.
❌ Loop infinito
A lógica a seguir pode resultar em um loop infinito se o forResult.IsCompleted
, true
mas nunca houver uma mensagem completa no buffer.
Aviso
NÃO use o código a seguir. Usar este exemplo resultará em perda de dados, travamentos, problemas de segurança e NÃO deve ser copiado. O exemplo a seguir é fornecido para explicar os problemas comuns do PipeReader.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (result.IsCompleted && infiniteLoopBuffer.IsEmpty)
break;
Process(ref infiniteLoopBuffer, out Message message);
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
Aviso
NÃO use o código anterior. Usar este exemplo resultará em perda de dados, travamentos, problemas de segurança e NÃO deve ser copiado. O exemplo anterior é fornecido para explicar os problemas comuns do PipeReader.
Aqui está outro pedaço de código com o mesmo problema. É verificar se há um buffer não vazio antes de verificar ReadResult.IsCompleted
. Como ele está em um else if
, ele fará um loop para sempre se nunca houver uma mensagem completa no buffer.
Aviso
NÃO use o código a seguir. Usar este exemplo resultará em perda de dados, travamentos, problemas de segurança e NÃO deve ser copiado. O exemplo a seguir é fornecido para explicar os problemas comuns do PipeReader.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (!infiniteLoopBuffer.IsEmpty)
Process(ref infiniteLoopBuffer, out Message message);
else if (result.IsCompleted)
break;
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
Aviso
NÃO use o código anterior. Usar este exemplo resultará em perda de dados, travamentos, problemas de segurança e NÃO deve ser copiado. O exemplo anterior é fornecido para explicar os problemas comuns do PipeReader.
❌ Aplicativo sem resposta
Ligar PipeReader.AdvanceTo
incondicionalmente com na buffer.End
posição pode fazer com examined
que o aplicativo deixe de responder ao analisar uma única mensagem. A próxima chamada para PipeReader.AdvanceTo
não retornará até:
- Há mais dados gravados no tubo.
- E os novos dados não foram examinados previamente.
Aviso
NÃO use o código a seguir. Usar este exemplo resultará em perda de dados, travamentos, problemas de segurança e NÃO deve ser copiado. O exemplo a seguir é fornecido para explicar os problemas comuns do PipeReader.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> hangBuffer = result.Buffer;
Process(ref hangBuffer, out Message message);
if (result.IsCompleted)
break;
reader.AdvanceTo(hangBuffer.Start, hangBuffer.End);
if (message != null)
return message;
}
Aviso
NÃO use o código anterior. Usar este exemplo resultará em perda de dados, travamentos, problemas de segurança e NÃO deve ser copiado. O exemplo anterior é fornecido para explicar os problemas comuns do PipeReader.
❌ Falta de memória (OOM)
Com as seguintes condições, o código a seguir mantém o buffer até que ocorra OutOfMemoryException :
- Não há tamanho máximo de mensagem.
- Os dados retornados do
PipeReader
não fazem uma mensagem completa. Por exemplo, ele não faz uma mensagem completa porque o outro lado está escrevendo uma mensagem grande (por exemplo, uma mensagem de 4 GB).
Aviso
NÃO use o código a seguir. Usar este exemplo resultará em perda de dados, travamentos, problemas de segurança e NÃO deve ser copiado. O exemplo a seguir é fornecido para explicar os problemas comuns do PipeReader.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer;
Process(ref thisCouldOutOfMemory, out Message message);
if (result.IsCompleted)
break;
reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End);
if (message != null)
return message;
}
Aviso
NÃO use o código anterior. Usar este exemplo resultará em perda de dados, travamentos, problemas de segurança e NÃO deve ser copiado. O exemplo anterior é fornecido para explicar os problemas comuns do PipeReader.
❌ Corrupção de memória
Ao escrever auxiliares que leem o buffer, qualquer carga retornada deve ser copiada antes de chamar Advance
o . O exemplo a seguir retornará a memória que o Pipe
descartou e poderá reutilizá-la para a próxima operação (leitura/gravação).
Aviso
NÃO use o código a seguir. Usar este exemplo resultará em perda de dados, travamentos, problemas de segurança e NÃO deve ser copiado. O exemplo a seguir é fornecido para explicar os problemas comuns do PipeReader.
public class Message
{
public ReadOnlySequence<byte> CorruptedPayload { get; set; }
}
Environment.FailFast("This code is terrible, don't use it!");
Message message = null;
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
ReadHeader(ref buffer, out int length);
if (length <= buffer.Length)
{
message = new Message
{
// Slice the payload from the existing buffer
CorruptedPayload = buffer.Slice(0, length)
};
buffer = buffer.Slice(length);
}
if (result.IsCompleted)
break;
reader.AdvanceTo(buffer.Start, buffer.End);
if (message != null)
{
// This code is broken since reader.AdvanceTo() was called with a position *after* the buffer
// was captured.
break;
}
}
return message;
}
Aviso
NÃO use o código anterior. Usar este exemplo resultará em perda de dados, travamentos, problemas de segurança e NÃO deve ser copiado. O exemplo anterior é fornecido para explicar os problemas comuns do PipeReader.
PipeWriter
O PipeWriter gerencia buffers para escrever em nome do chamador.
PipeWriter
implementos IBufferWriter<byte>
.
IBufferWriter<byte>
torna possível obter acesso a buffers para executar gravações sem cópias de buffer extras.
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
// Request at least 5 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(5);
// Write directly into the buffer.
int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span);
// Tell the writer how many bytes were written.
writer.Advance(written);
await writer.FlushAsync(cancellationToken);
}
O código anterior:
- Solicita um buffer de pelo menos 5 bytes do usando
PipeWriter
o GetMemory . - Grava bytes para a cadeia de caracteres
"Hello"
ASCII no .Memory<byte>
- Chamadas Advance para indicar quantos bytes foram gravados no buffer.
- Libera o
PipeWriter
, que envia os bytes para o dispositivo subjacente.
O método anterior de escrita usa os buffers fornecidos pelo PipeWriter
. Poderia também ter utilizado PipeWriter.WriteAsync, que:
- Copia o buffer existente para o
PipeWriter
arquivo . - Chama
GetSpan
,Advance
conforme apropriado e chama FlushAsync.
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
byte[] helloBytes = Encoding.ASCII.GetBytes("Hello");
// Write helloBytes to the writer, there's no need to call Advance here
// (Write does that).
await writer.WriteAsync(helloBytes, cancellationToken);
}
Cancelamento
FlushAsync suporta a passagem de um CancellationTokenarquivo . Passar um CancellationToken
resulta em um OperationCanceledException
se o token for cancelado enquanto há uma liberação pendente.
PipeWriter.FlushAsync
suporta uma maneira de cancelar a operação de descarga atual via PipeWriter.CancelPendingFlush sem gerar uma exceção. A chamada PipeWriter.CancelPendingFlush
faz com que a chamada atual ou seguinte retorne PipeWriter.FlushAsync
PipeWriter.WriteAsync
um FlushResult com IsCanceled
definido como true
. Isto pode ser útil para interromper a descarga de rendimento de uma forma não destrutiva e não excecional.
Problemas comuns do PipeWriter
- GetSpan e GetMemory retornar um buffer com pelo menos a quantidade solicitada de memória. Não assuma tamanhos exatos de buffer.
- Não há garantia de que chamadas sucessivas retornarão o mesmo buffer ou o mesmo buffer de tamanho.
- Um novo buffer deve ser solicitado após a chamada Advance para continuar gravando mais dados. O buffer adquirido anteriormente não pode ser gravado.
- Ligar
GetMemory
ouGetSpan
fazer uma chamadaFlushAsync
incompleta não é seguro. - Ligar
Complete
ouCompleteAsync
enquanto há dados não liberados pode resultar em corrupção de memória.
Dicas para usar o PipeReader e o PipeWriter
As seguintes dicas irão ajudá-lo a usar as System.IO.Pipelines aulas com sucesso:
- Sempre preencha o PipeReader e o PipeWriter, incluindo uma exceção quando aplicável.
- Sempre ligue PipeReader.AdvanceTo depois de ligar PipeReader.ReadAsync.
- Periodicamente
await
PipeWriter.FlushAsync enquanto escreve, e sempre verificar FlushResult.IsCompleted. Aborte a escrita seIsCompleted
fortrue
, pois isso indica que o leitor está concluído e não se importa mais com o que está escrito. - Ligue PipeWriter.FlushAsync depois de escrever algo que você deseja que o
PipeReader
tenha acesso. - Não ligue
FlushAsync
se o leitor não puder iniciar atéFlushAsync
terminar, pois isso pode causar um impasse. - Certifique-se de que apenas um contexto "possui" um
PipeReader
ouPipeWriter
acessa-os. Esses tipos não são thread-safe. - Nunca aceda a um ReadResult.Buffer depois de ligar
AdvanceTo
ou concluir oPipeReader
ficheiro .
IDuplexPipe
Trata-se IDuplexPipe de um contrato para tipos que apoiam tanto a leitura como a escrita. Por exemplo, uma conexão de rede seria representada por um IDuplexPipe
arquivo .
Ao contrário Pipe
de , que contém a PipeReader
e a PipeWriter
, IDuplexPipe
representa um único lado de uma conexão full duplex. Isso significa que o que está escrito no testamento PipeWriter
não será lido a partir do PipeReader
.
Fluxos
Ao ler ou gravar dados de fluxo, você normalmente lê dados usando um desserializador e grava dados usando um serializador. A maioria dessas APIs de fluxo de leitura e gravação tem um Stream
parâmetro. Para facilitar a integração com essas APIs PipeReader
existentes e PipeWriter
expor um AsStream método.
AsStream retorna uma Stream
implementação em torno do PipeReader
ou PipeWriter
.
Exemplo de fluxo
PipeReader
e PipeWriter
as instâncias podem ser criadas usando os métodos estáticos Create
dados a um Stream objeto e opções de criação correspondentes opcionais.
A StreamPipeReaderOptions permissão para controle sobre a criação da instância com os PipeReader
seguintes parâmetros:
-
StreamPipeReaderOptions.BufferSize é o tamanho mínimo do buffer em bytes usado ao alugar memória do pool e o padrão é
4096
. -
StreamPipeReaderOptions.LeaveOpen flag determina se o fluxo subjacente é ou não deixado aberto após a conclusão e
PipeReader
assumefalse
como padrão . -
StreamPipeReaderOptions.MinimumReadSize representa o limite de bytes restantes no buffer antes que um novo buffer seja alocado e assume como
1024
padrão . -
StreamPipeReaderOptions.Pool é o
MemoryPool<byte>
usado ao alocar memória e assume comonull
padrão .
A StreamPipeWriterOptions permissão para controle sobre a criação da instância com os PipeWriter
seguintes parâmetros:
-
StreamPipeWriterOptions.LeaveOpen flag determina se o fluxo subjacente é ou não deixado aberto após a conclusão e
PipeWriter
assumefalse
como padrão . -
StreamPipeWriterOptions.MinimumBufferSizerepresenta o tamanho mínimo do buffer a ser usado ao alugar memória do Pool, e o padrão é .
4096
-
StreamPipeWriterOptions.Pool é o
MemoryPool<byte>
usado ao alocar memória e assume comonull
padrão .
Importante
Ao criar PipeReader
e PipeWriter
instâncias usando os Create
métodos, você precisa considerar o tempo de vida do Stream
objeto. Se você precisar acessar o fluxo depois que o leitor ou gravador terminar de usá-lo, você precisará definir o LeaveOpen
sinalizador nas true
opções de criação. Caso contrário, o fluxo será fechado.
O código a seguir demonstra a criação de PipeReader
instâncias e PipeWriter
usando os Create
métodos de um fluxo.
using System.Buffers;
using System.IO.Pipelines;
using System.Text;
class Program
{
static async Task Main()
{
using var stream = File.OpenRead("lorem-ipsum.txt");
var reader = PipeReader.Create(stream);
var writer = PipeWriter.Create(
Console.OpenStandardOutput(),
new StreamPipeWriterOptions(leaveOpen: true));
WriteUserCancellationPrompt();
var processMessagesTask = ProcessMessagesAsync(reader, writer);
var userCanceled = false;
var cancelProcessingTask = Task.Run(() =>
{
while (char.ToUpperInvariant(Console.ReadKey().KeyChar) != 'C')
{
WriteUserCancellationPrompt();
}
userCanceled = true;
// No exceptions thrown
reader.CancelPendingRead();
writer.CancelPendingFlush();
});
await Task.WhenAny(cancelProcessingTask, processMessagesTask);
Console.WriteLine(
$"\n\nProcessing {(userCanceled ? "cancelled" : "completed")}.\n");
}
static void WriteUserCancellationPrompt() =>
Console.WriteLine("Press 'C' to cancel processing...\n");
static async Task ProcessMessagesAsync(
PipeReader reader,
PipeWriter writer)
{
try
{
while (true)
{
ReadResult readResult = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = readResult.Buffer;
try
{
if (readResult.IsCanceled)
{
break;
}
if (TryParseLines(ref buffer, out string message))
{
FlushResult flushResult =
await WriteMessagesAsync(writer, message);
if (flushResult.IsCanceled || flushResult.IsCompleted)
{
break;
}
}
if (readResult.IsCompleted)
{
if (!buffer.IsEmpty)
{
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
catch (Exception ex)
{
Console.Error.WriteLine(ex);
}
finally
{
await reader.CompleteAsync();
await writer.CompleteAsync();
}
}
static bool TryParseLines(
ref ReadOnlySequence<byte> buffer,
out string message)
{
SequencePosition? position;
StringBuilder outputMessage = new();
while(true)
{
position = buffer.PositionOf((byte)'\n');
if (!position.HasValue)
break;
outputMessage.Append(Encoding.ASCII.GetString(buffer.Slice(buffer.Start, position.Value)))
.AppendLine();
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
};
message = outputMessage.ToString();
return message.Length != 0;
}
static ValueTask<FlushResult> WriteMessagesAsync(
PipeWriter writer,
string message) =>
writer.WriteAsync(Encoding.ASCII.GetBytes(message));
}
O aplicativo usa um StreamReader para ler o arquivo lorem-ipsum.txt como um fluxo e deve terminar com uma linha em branco. O FileStream é passado para PipeReader.Create, que instancia um PipeReader
objeto. Em seguida, o aplicativo de console passa seu fluxo de saída padrão para PipeWriter.Create o uso do Console.OpenStandardOutput(). O exemplo suporta cancelamento.