Ler em inglês

Partilhar via


System.IO.Pipelines em .NET

System.IO.Pipelines é uma biblioteca projetada para facilitar a produção de E/S de alto desempenho no .NET. É uma biblioteca direcionada ao .NET Standard que funciona em todas as implementações do .NET.

A biblioteca está disponível no pacote Nuget System.IO.Pipelines .

Qual problema System.IO.Pipelines resolve

Os aplicativos que analisam dados de streaming são compostos por código clichê com muitos fluxos de código especializados e incomuns. O clichê e o código de caso especial são complexos e difíceis de manter.

System.IO.Pipelines foi arquitetado para:

  • Tenha alto desempenho analisando dados de streaming.
  • Reduza a complexidade do código.

O código a seguir é típico para um servidor TCP que recebe mensagens delimitadas por linha (delimitadas por '\n') de um cliente:

async Task ProcessLinesAsync(NetworkStream stream)
{
    var buffer = new byte[1024];
    await stream.ReadAsync(buffer, 0, buffer.Length);

    // Process a single line from the buffer
    ProcessLine(buffer);
}

O código anterior tem vários problemas:

  • A mensagem inteira (fim de linha) pode não ser recebida em uma única chamada para ReadAsync.
  • É ignorar o resultado do stream.ReadAsync. stream.ReadAsync Retorna a quantidade de dados que foram lidas.
  • Ele não lida com o caso em que várias linhas são lidas em uma única ReadAsync chamada.
  • Ele aloca uma byte matriz com cada leitura.

Para corrigir os problemas anteriores, as seguintes alterações são necessárias:

  • Armazene em buffer os dados de entrada até que uma nova linha seja encontrada.

  • Analise todas as linhas retornadas no buffer.

  • É possível que a linha seja maior que 1 KB (1024 bytes). O código precisa redimensionar o buffer de entrada até que o delimitador seja encontrado, a fim de ajustar a linha completa dentro do buffer.

    • Se o buffer for redimensionado, mais cópias de buffer serão feitas à medida que linhas mais longas aparecerem na entrada.
    • Para reduzir o espaço desperdiçado, compacte o buffer usado para ler linhas.
  • Considere o uso do pool de buffers para evitar a alocação de memória repetidamente.

  • O código a seguir aborda alguns desses problemas:

async Task ProcessLinesAsync(NetworkStream stream)
{
    byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
    var bytesBuffered = 0;
    var bytesConsumed = 0;

    while (true)
    {
        // Calculate the amount of bytes remaining in the buffer.
        var bytesRemaining = buffer.Length - bytesBuffered;

        if (bytesRemaining == 0)
        {
            // Double the buffer size and copy the previously buffered data into the new buffer.
            var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
            Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
            // Return the old buffer to the pool.
            ArrayPool<byte>.Shared.Return(buffer);
            buffer = newBuffer;
            bytesRemaining = buffer.Length - bytesBuffered;
        }

        var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
        if (bytesRead == 0)
        {
            // EOF
            break;
        }

        // Keep track of the amount of buffered bytes.
        bytesBuffered += bytesRead;
        var linePosition = -1;

        do
        {
            // Look for a EOL in the buffered data.
            linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
                                         bytesBuffered - bytesConsumed);

            if (linePosition >= 0)
            {
                // Calculate the length of the line based on the offset.
                var lineLength = linePosition - bytesConsumed;

                // Process the line.
                ProcessLine(buffer, bytesConsumed, lineLength);

                // Move the bytesConsumed to skip past the line consumed (including \n).
                bytesConsumed += lineLength + 1;
            }
        }
        while (linePosition >= 0);
    }
}

O código anterior é complexo e não resolve todos os problemas identificados. Rede de alto desempenho geralmente significa escrever código complexo para maximizar o desempenho. System.IO.Pipelines foi concebido para facilitar a escrita deste tipo de código.

Tubo

A Pipe classe pode ser usada para criar um PipeWriter/PipeReader par. Todos os dados gravados no PipeWriter estão disponíveis no PipeReader:

var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;

Utilização básica do tubo

async Task ProcessLinesAsync(Socket socket)
{
    var pipe = new Pipe();
    Task writing = FillPipeAsync(socket, pipe.Writer);
    Task reading = ReadPipeAsync(pipe.Reader);

    await Task.WhenAll(reading, writing);
}

async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
    const int minimumBufferSize = 512;

    while (true)
    {
        // Allocate at least 512 bytes from the PipeWriter.
        Memory<byte> memory = writer.GetMemory(minimumBufferSize);
        try
        {
            int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
            if (bytesRead == 0)
            {
                break;
            }
            // Tell the PipeWriter how much was read from the Socket.
            writer.Advance(bytesRead);
        }
        catch (Exception ex)
        {
            LogError(ex);
            break;
        }

        // Make the data available to the PipeReader.
        FlushResult result = await writer.FlushAsync();

        if (result.IsCompleted)
        {
            break;
        }
    }

     // By completing PipeWriter, tell the PipeReader that there's no more data coming.
    await writer.CompleteAsync();
}

async Task ReadPipeAsync(PipeReader reader)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync();
        ReadOnlySequence<byte> buffer = result.Buffer;

        while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
        {
            // Process the line.
            ProcessLine(line);
        }

        // Tell the PipeReader how much of the buffer has been consumed.
        reader.AdvanceTo(buffer.Start, buffer.End);

        // Stop reading if there's no more data coming.
        if (result.IsCompleted)
        {
            break;
        }
    }

    // Mark the PipeReader as complete.
    await reader.CompleteAsync();
}

bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
    // Look for a EOL in the buffer.
    SequencePosition? position = buffer.PositionOf((byte)'\n');

    if (position == null)
    {
        line = default;
        return false;
    }

    // Skip the line + the \n.
    line = buffer.Slice(0, position.Value);
    buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
    return true;
}

Existem dois loops:

  • FillPipeAsync lê a partir do Socket e escreve para o PipeWriter.
  • ReadPipeAsync lê a PipeReader partir das linhas de entrada e analisa.

Não há buffers explícitos alocados. Todo o PipeReader gerenciamento de buffer é delegado às implementações e PipeWriter . Delegar o gerenciamento de buffer torna mais fácil para o consumo de código se concentrar apenas na lógica de negócios.

No primeiro ciclo:

No segundo loop, o PipeReader consome os buffers gravados por PipeWriter. Os buffers vêm do soquete. O apelo a PipeReader.ReadAsync:

  • Retorna um ReadResult que contém duas informações importantes:

    • Os dados que foram lidos na forma de ReadOnlySequence<byte>.
    • Um booleano IsCompleted que indica se o fim dos dados (EOF) foi atingido.

Depois de encontrar o delimitador de fim de linha (EOL) e analisar a linha:

  • A lógica processa o buffer para ignorar o que já foi processado.
  • PipeReader.AdvanceTo é chamado para informar a PipeReader quantidade de dados que foram consumidos e examinados.

Os loops do leitor e do escritor terminam chamando Complete. Complete permite que o Pipe subjacente libere a memória que ele alocou.

Controlo da contrapressão e do fluxo

Idealmente, a leitura e a análise trabalham em conjunto:

  • O thread de leitura consome dados da rede e os coloca em buffers.
  • O thread de análise é responsável pela construção das estruturas de dados apropriadas.

Normalmente, a análise leva mais tempo do que apenas copiar blocos de dados da rede:

  • O segmento de leitura fica à frente do segmento de análise.
  • O thread de leitura tem que diminuir a velocidade ou alocar mais memória para armazenar os dados para o thread de análise.

Para um desempenho ideal, há um equilíbrio entre pausas frequentes e alocação de mais memória.

Para resolver o problema anterior, o Pipe tem duas configurações para controlar o fluxo de dados:

  • PauseWriterThreshold: Determina a quantidade de dados que deve ser armazenada em buffer antes das chamadas para FlushAsync pausar.
  • ResumeWriterThreshold: Determina a quantidade de dados que o leitor deve observar antes que as chamadas sejam PipeWriter.FlushAsync retomadas.

Diagrama com ResumeWriterThreshold e PauseWriterThreshold

PipeWriter.FlushAsync:

  • Retorna um incompleto ValueTask<FlushResult> quando a quantidade de dados nos Pipe cruzes PauseWriterThreshold.
  • Completa ValueTask<FlushResult> quando se torna inferior a ResumeWriterThreshold.

Dois valores são usados para evitar ciclos rápidos, que podem ocorrer se um valor for usado.

Exemplos

// The Pipe will start returning incomplete tasks from FlushAsync until
// the reader examines at least 5 bytes.
var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
var pipe = new Pipe(options);

PipeScheduler

Normalmente, ao usar async e await, o código assíncrono é retomado em um TaskScheduler ou no atual SynchronizationContext.

Ao fazer E/S, é importante ter um controle refinado sobre onde a E/S é executada. Esse controle permite tirar proveito dos caches da CPU de forma eficaz. O cache eficiente é fundamental para aplicativos de alto desempenho, como servidores Web. PipeScheduler Fornece controle sobre onde os retornos de chamada assíncronos são executados. Por predefinição:

  • A corrente SynchronizationContext é usada.
  • Se não SynchronizationContexthouver , ele usa o pool de threads para executar retornos de chamada.
public static void Main(string[] args)
{
    var writeScheduler = new SingleThreadPipeScheduler();
    var readScheduler = new SingleThreadPipeScheduler();

    // Tell the Pipe what schedulers to use and disable the SynchronizationContext.
    var options = new PipeOptions(readerScheduler: readScheduler,
                                  writerScheduler: writeScheduler,
                                  useSynchronizationContext: false);
    var pipe = new Pipe(options);
}

// This is a sample scheduler that async callbacks on a single dedicated thread.
public class SingleThreadPipeScheduler : PipeScheduler
{
    private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
     new BlockingCollection<(Action<object> Action, object State)>();
    private readonly Thread _thread;

    public SingleThreadPipeScheduler()
    {
        _thread = new Thread(DoWork);
        _thread.Start();
    }

    private void DoWork()
    {
        foreach (var item in _queue.GetConsumingEnumerable())
        {
            item.Action(item.State);
        }
    }

    public override void Schedule(Action<object?> action, object? state)
    {
        if (state is not null)
        {
            _queue.Add((action, state));
        }
        // else log the fact that _queue.Add was not called.
    }
}

PipeScheduler.ThreadPool é a PipeScheduler implementação que enfileira retornos de chamada para o pool de threads. PipeScheduler.ThreadPool é o padrão e, geralmente, a melhor escolha. PipeScheduler.Inline pode causar consequências não intencionais, como deadlocks.

Reposição do tubo

É frequentemente eficiente reutilizar o Pipe objeto. Para redefinir o tubo, ligue PipeReader Reset quando ambos os PipeReader e PipeWriter estiverem concluídos.

Leitor de tubos

PipeReader gerencia a memória em nome do chamador. Sempre ligue PipeReader.AdvanceTo depois de ligar PipeReader.ReadAsync. Isso permite saber PipeReader quando o chamador é feito com a memória para que ele possa ser rastreado. O ReadOnlySequence<byte> devolvido de PipeReader.ReadAsync só é válido até a chamada do PipeReader.AdvanceTo. É ilegal usar ReadOnlySequence<byte> depois de ligar PipeReader.AdvanceTo.

PipeReader.AdvanceTo utiliza dois SequencePosition argumentos:

  • O primeiro argumento determina a quantidade de memória consumida.
  • O segundo argumento determina quanto do buffer foi observado.

Marcar dados como consumidos significa que o pipe pode retornar a memória para o pool de buffers subjacente. Marcar os dados como observados controla o que a próxima chamada faz PipeReader.ReadAsync . Marcar tudo como observado significa que a próxima chamada para PipeReader.ReadAsync não retornará até que haja mais dados gravados no pipe. Qualquer outro valor fará com que a próxima chamada retorne PipeReader.ReadAsync imediatamente com os dados observados e não observados, mas não com os dados que já foram consumidos.

Ler cenários de streaming de dados

Existem alguns padrões típicos que surgem ao tentar ler dados de streaming:

  • Dado um fluxo de dados, analise uma única mensagem.
  • Dado um fluxo de dados, analise todas as mensagens disponíveis.

Os exemplos a seguir usam o TryParseLines método para analisar mensagens de um ReadOnlySequence<byte>arquivo . TryParseLines analisa uma única mensagem e atualiza o buffer de entrada para cortar a mensagem analisada do buffer. TryParseLines não faz parte do .NET, é um método escrito pelo usuário usado nas seções a seguir.

bool TryParseLines(ref ReadOnlySequence<byte> buffer, out Message message);

Ler uma única mensagem

O código a seguir lê uma única mensagem de um PipeReader e a retorna ao chamador.

async ValueTask<Message?> ReadSingleMessageAsync(PipeReader reader,
 CancellationToken cancellationToken = default)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync(cancellationToken);
        ReadOnlySequence<byte> buffer = result.Buffer;

        // In the event that no message is parsed successfully, mark consumed
        // as nothing and examined as the entire buffer.
        SequencePosition consumed = buffer.Start;
        SequencePosition examined = buffer.End;

        try
        {
            if (TryParseLines(ref buffer, out Message message))
            {
                // A single message was successfully parsed so mark the start of the
                // parsed buffer as consumed. TryParseLines trims the buffer to
                // point to the data after the message was parsed.
                consumed = buffer.Start;

                // Examined is marked the same as consumed here, so the next call
                // to ReadSingleMessageAsync will process the next message if there's
                // one.
                examined = consumed;

                return message;
            }

            // There's no more data to be processed.
            if (result.IsCompleted)
            {
                if (buffer.Length > 0)
                {
                    // The message is incomplete and there's no more data to process.
                    throw new InvalidDataException("Incomplete message.");
                }

                break;
            }
        }
        finally
        {
            reader.AdvanceTo(consumed, examined);
        }
    }

    return null;
}

O código anterior:

  • Analisa uma única mensagem.
  • Atualiza o consumido SequencePosition e examinado SequencePosition para apontar para o início do buffer de entrada cortado.

Os dois SequencePosition argumentos são atualizados porque TryParseLines remove a mensagem analisada do buffer de entrada. Geralmente, ao analisar uma única mensagem do buffer, a posição examinada deve ser uma das seguintes:

  • O final da mensagem.
  • O fim do buffer recebido se nenhuma mensagem foi encontrada.

O caso de mensagem única tem o maior potencial para erros. Passar os valores errados para examinados pode resultar em uma exceção de falta de memória ou um loop infinito. Para obter mais informações, consulte a seção Problemas comuns do PipeReader neste artigo.

Ler várias mensagens

O código a seguir lê todas as mensagens de um PipeReader e chama ProcessMessageAsync em cada um.

async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
{
    try
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync(cancellationToken);
            ReadOnlySequence<byte> buffer = result.Buffer;

            try
            {
                // Process all messages from the buffer, modifying the input buffer on each
                // iteration.
                while (TryParseLines(ref buffer, out Message message))
                {
                    await ProcessMessageAsync(message);
                }

                // There's no more data to be processed.
                if (result.IsCompleted)
                {
                    if (buffer.Length > 0)
                    {
                        // The message is incomplete and there's no more data to process.
                        throw new InvalidDataException("Incomplete message.");
                    }
                    break;
                }
            }
            finally
            {
                // Since all messages in the buffer are being processed, you can use the
                // remaining buffer's Start and End position to determine consumed and examined.
                reader.AdvanceTo(buffer.Start, buffer.End);
            }
        }
    }
    finally
    {
        await reader.CompleteAsync();
    }
}

Cancelamento

PipeReader.ReadAsync:

  • Suporta a passagem de um CancellationTokenarquivo .
  • Lança um OperationCanceledException se o CancellationToken é cancelado enquanto há uma leitura pendente.
  • Suporta uma maneira de cancelar a operação de leitura atual via PipeReader.CancelPendingRead, o que evita gerar uma exceção. A chamada PipeReader.CancelPendingRead faz com que a chamada atual ou seguinte retorne PipeReader.ReadAsync um ReadResult com IsCanceled definido como true. Isso pode ser útil para interromper o loop de leitura existente de forma não destrutiva e não excecional.
private PipeReader reader;

public MyConnection(PipeReader reader)
{
    this.reader = reader;
}

public void Abort()
{
    // Cancel the pending read so the process loop ends without an exception.
    reader.CancelPendingRead();
}

public async Task ProcessMessagesAsync()
{
    try
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync();
            ReadOnlySequence<byte> buffer = result.Buffer;

            try
            {
                if (result.IsCanceled)
                {
                    // The read was canceled. You can quit without reading the existing data.
                    break;
                }

                // Process all messages from the buffer, modifying the input buffer on each
                // iteration.
                while (TryParseLines(ref buffer, out Message message))
                {
                    await ProcessMessageAsync(message);
                }

                // There's no more data to be processed.
                if (result.IsCompleted)
                {
                    break;
                }
            }
            finally
            {
                // Since all messages in the buffer are being processed, you can use the
                // remaining buffer's Start and End position to determine consumed and examined.
                reader.AdvanceTo(buffer.Start, buffer.End);
            }
        }
    }
    finally
    {
        await reader.CompleteAsync();
    }
}

Problemas comuns do PipeReader

  • Passar os valores errados para consumed ou examined pode resultar na leitura de dados já lidos.

  • A aprovação na qualidade de buffer.End examinado pode resultar:

    • Dados paralisados
    • Possivelmente uma eventual exceção de falta de memória (OOM) se os dados não forem consumidos. Por exemplo, PipeReader.AdvanceTo(position, buffer.End) ao processar uma única mensagem de cada vez a partir do buffer.
  • Passar os valores errados para consumed ou examined pode resultar em um loop infinito. Por exemplo, se buffer.Start não tiver sido alterado, PipeReader.AdvanceTo(buffer.Start) a próxima chamada PipeReader.ReadAsync retornará imediatamente antes da chegada de novos dados.

  • Passar os valores errados para consumed ou examined pode resultar em buffering infinito (eventual OOM).

  • Usar o ReadOnlySequence<byte> after calling PipeReader.AdvanceTo pode resultar em corrupção de memória (use depois de livre).

  • A falha na chamada PipeReader.Complete/CompleteAsync pode resultar em um vazamento de memória.

  • Verificar ReadResult.IsCompleted e sair da lógica de leitura antes de processar o buffer resulta em perda de dados. A condição de saída do loop deve ser baseada em ReadResult.Buffer.IsEmpty e ReadResult.IsCompleted. Fazer isso incorretamente pode resultar em um loop infinito.

Código problemático

Perda de dados

O ReadResult pode retornar o segmento final de dados quando IsCompleted estiver definido como true. Não ler esses dados antes de sair do loop de leitura resultará em perda de dados.

Aviso

NÃO use o código a seguir. Usar este exemplo resultará em perda de dados, travamentos, problemas de segurança e NÃO deve ser copiado. O exemplo a seguir é fornecido para explicar os problemas comuns do PipeReader.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> dataLossBuffer = result.Buffer;

    if (result.IsCompleted)
        break;

    Process(ref dataLossBuffer, out Message message);

    reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End);
}

Aviso

NÃO use o código anterior. Usar este exemplo resultará em perda de dados, travamentos, problemas de segurança e NÃO deve ser copiado. O exemplo anterior é fornecido para explicar os problemas comuns do PipeReader.

Loop infinito

A lógica a seguir pode resultar em um loop infinito se o fortrue, Result.IsCompleted mas nunca houver uma mensagem completa no buffer.

Aviso

NÃO use o código a seguir. Usar este exemplo resultará em perda de dados, travamentos, problemas de segurança e NÃO deve ser copiado. O exemplo a seguir é fornecido para explicar os problemas comuns do PipeReader.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
    if (result.IsCompleted && infiniteLoopBuffer.IsEmpty)
        break;

    Process(ref infiniteLoopBuffer, out Message message);

    reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}

Aviso

NÃO use o código anterior. Usar este exemplo resultará em perda de dados, travamentos, problemas de segurança e NÃO deve ser copiado. O exemplo anterior é fornecido para explicar os problemas comuns do PipeReader.

Aqui está outro pedaço de código com o mesmo problema. É verificar se há um buffer não vazio antes de verificar ReadResult.IsCompleted. Como ele está em um else if, ele fará um loop para sempre se nunca houver uma mensagem completa no buffer.

Aviso

NÃO use o código a seguir. Usar este exemplo resultará em perda de dados, travamentos, problemas de segurança e NÃO deve ser copiado. O exemplo a seguir é fornecido para explicar os problemas comuns do PipeReader.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;

    if (!infiniteLoopBuffer.IsEmpty)
        Process(ref infiniteLoopBuffer, out Message message);

    else if (result.IsCompleted)
        break;

    reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}

Aviso

NÃO use o código anterior. Usar este exemplo resultará em perda de dados, travamentos, problemas de segurança e NÃO deve ser copiado. O exemplo anterior é fornecido para explicar os problemas comuns do PipeReader.

Aplicativo sem resposta

Ligar PipeReader.AdvanceTo incondicionalmente com na examined posição pode fazer com buffer.End que o aplicativo deixe de responder ao analisar uma única mensagem. A próxima chamada para PipeReader.AdvanceTo não retornará até:

  • Há mais dados gravados no tubo.
  • E os novos dados não foram examinados previamente.

Aviso

NÃO use o código a seguir. Usar este exemplo resultará em perda de dados, travamentos, problemas de segurança e NÃO deve ser copiado. O exemplo a seguir é fornecido para explicar os problemas comuns do PipeReader.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> hangBuffer = result.Buffer;

    Process(ref hangBuffer, out Message message);

    if (result.IsCompleted)
        break;

    reader.AdvanceTo(hangBuffer.Start, hangBuffer.End);

    if (message != null)
        return message;
}

Aviso

NÃO use o código anterior. Usar este exemplo resultará em perda de dados, travamentos, problemas de segurança e NÃO deve ser copiado. O exemplo anterior é fornecido para explicar os problemas comuns do PipeReader.

Falta de memória (OOM)

Com as seguintes condições, o código a seguir mantém o buffer até que ocorra OutOfMemoryException :

  • Não há tamanho máximo de mensagem.
  • Os dados retornados do PipeReader não fazem uma mensagem completa. Por exemplo, ele não faz uma mensagem completa porque o outro lado está escrevendo uma mensagem grande (por exemplo, uma mensagem de 4 GB).

Aviso

NÃO use o código a seguir. Usar este exemplo resultará em perda de dados, travamentos, problemas de segurança e NÃO deve ser copiado. O exemplo a seguir é fornecido para explicar os problemas comuns do PipeReader.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer;

    Process(ref thisCouldOutOfMemory, out Message message);

    if (result.IsCompleted)
        break;

    reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End);

    if (message != null)
        return message;
}

Aviso

NÃO use o código anterior. Usar este exemplo resultará em perda de dados, travamentos, problemas de segurança e NÃO deve ser copiado. O exemplo anterior é fornecido para explicar os problemas comuns do PipeReader.

Corrupção de memória

Ao escrever auxiliares que leem o buffer, qualquer carga retornada deve ser copiada antes de chamar Advanceo . O exemplo a seguir retornará a memória que o Pipe descartou e poderá reutilizá-la para a próxima operação (leitura/gravação).

Aviso

NÃO use o código a seguir. Usar este exemplo resultará em perda de dados, travamentos, problemas de segurança e NÃO deve ser copiado. O exemplo a seguir é fornecido para explicar os problemas comuns do PipeReader.

public class Message
{
    public ReadOnlySequence<byte> CorruptedPayload { get; set; }
}
    Environment.FailFast("This code is terrible, don't use it!");
    Message message = null;

    while (true)
    {
        ReadResult result = await reader.ReadAsync(cancellationToken);
        ReadOnlySequence<byte> buffer = result.Buffer;

        ReadHeader(ref buffer, out int length);

        if (length <= buffer.Length)
        {
            message = new Message
            {
                // Slice the payload from the existing buffer
                CorruptedPayload = buffer.Slice(0, length)
            };

            buffer = buffer.Slice(length);
        }

        if (result.IsCompleted)
            break;

        reader.AdvanceTo(buffer.Start, buffer.End);

        if (message != null)
        {
            // This code is broken since reader.AdvanceTo() was called with a position *after* the buffer
            // was captured.
            break;
        }
    }

    return message;
}

Aviso

NÃO use o código anterior. Usar este exemplo resultará em perda de dados, travamentos, problemas de segurança e NÃO deve ser copiado. O exemplo anterior é fornecido para explicar os problemas comuns do PipeReader.

PipeWriter

O PipeWriter gerencia buffers para escrever em nome do chamador. PipeWriter implementos IBufferWriter<byte>. IBufferWriter<byte> torna possível obter acesso a buffers para executar gravações sem cópias de buffer extras.

async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
    // Request at least 5 bytes from the PipeWriter.
    Memory<byte> memory = writer.GetMemory(5);

    // Write directly into the buffer.
    int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span);

    // Tell the writer how many bytes were written.
    writer.Advance(written);

    await writer.FlushAsync(cancellationToken);
}

O código anterior:

  • Solicita um buffer de pelo menos 5 bytes do usando GetMemoryo PipeWriter .
  • Grava bytes para a cadeia de caracteres "Hello" ASCII no .Memory<byte>
  • Chamadas Advance para indicar quantos bytes foram gravados no buffer.
  • Libera o PipeWriter, que envia os bytes para o dispositivo subjacente.

O método anterior de escrita usa os buffers fornecidos pelo PipeWriter. Poderia também ter utilizado PipeWriter.WriteAsync, que:

  • Copia o buffer existente para o PipeWriterarquivo .
  • Chama GetSpan, Advance conforme apropriado e chama FlushAsync.
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
    byte[] helloBytes = Encoding.ASCII.GetBytes("Hello");

    // Write helloBytes to the writer, there's no need to call Advance here
    // (Write does that).
    await writer.WriteAsync(helloBytes, cancellationToken);
}

Cancelamento

FlushAsync suporta a passagem de um CancellationTokenarquivo . Passar um CancellationToken resulta em um OperationCanceledException se o token for cancelado enquanto há uma liberação pendente. PipeWriter.FlushAsync suporta uma maneira de cancelar a operação de descarga atual via PipeWriter.CancelPendingFlush sem gerar uma exceção. A chamada PipeWriter.CancelPendingFlush faz com que a chamada atual ou seguinte retorne PipeWriter.FlushAsync PipeWriter.WriteAsync um FlushResult com IsCanceled definido como true. Isto pode ser útil para interromper a descarga de rendimento de uma forma não destrutiva e não excecional.

Problemas comuns do PipeWriter

  • GetSpan e GetMemory retornar um buffer com pelo menos a quantidade solicitada de memória. Não assuma tamanhos exatos de buffer.
  • Não há garantia de que chamadas sucessivas retornarão o mesmo buffer ou o mesmo buffer de tamanho.
  • Um novo buffer deve ser solicitado após a chamada Advance para continuar gravando mais dados. O buffer adquirido anteriormente não pode ser gravado.
  • Ligar GetMemory ou GetSpan fazer uma chamada FlushAsync incompleta não é seguro.
  • Ligar Complete ou CompleteAsync enquanto há dados não liberados pode resultar em corrupção de memória.

Dicas para usar o PipeReader e o PipeWriter

As seguintes dicas irão ajudá-lo a usar as System.IO.Pipelines aulas com sucesso:

  • Sempre preencha o PipeReader e o PipeWriter, incluindo uma exceção quando aplicável.
  • Sempre ligue PipeReader.AdvanceTo depois de ligar PipeReader.ReadAsync.
  • Periodicamente await PipeWriter.FlushAsync enquanto escreve, e sempre verificar FlushResult.IsCompleted. Aborte a escrita se IsCompleted for true, pois isso indica que o leitor está concluído e não se importa mais com o que está escrito.
  • Ligue PipeWriter.FlushAsync depois de escrever algo que você deseja que o PipeReader tenha acesso.
  • Não ligue FlushAsync se o leitor não puder iniciar até FlushAsync terminar, pois isso pode causar um impasse.
  • Certifique-se de que apenas um contexto "possui" um PipeReader ou PipeWriter acessa-os. Esses tipos não são thread-safe.
  • Nunca aceda a um ReadResult.Buffer depois de ligar AdvanceTo ou concluir o PipeReaderficheiro .

IDuplexPipe

Trata-se IDuplexPipe de um contrato para tipos que apoiam tanto a leitura como a escrita. Por exemplo, uma conexão de rede seria representada por um IDuplexPipearquivo .

Ao contrário Pipede , que contém a PipeReader e a PipeWriter, IDuplexPipe representa um único lado de uma conexão full duplex. Isso significa que o que está escrito no testamento PipeWriter não será lido a partir do PipeReader.

Fluxos

Ao ler ou gravar dados de fluxo, você normalmente lê dados usando um desserializador e grava dados usando um serializador. A maioria dessas APIs de fluxo de leitura e gravação tem um Stream parâmetro. Para facilitar a integração com essas APIs PipeReader existentes e PipeWriter expor um AsStream método. AsStream retorna uma Stream implementação em torno do PipeReader ou PipeWriter.

Exemplo de fluxo

PipeReader e PipeWriter as instâncias podem ser criadas usando os métodos estáticos Create dados a um Stream objeto e opções de criação correspondentes opcionais.

A StreamPipeReaderOptions permissão para controle sobre a criação da instância com os PipeReader seguintes parâmetros:

A StreamPipeWriterOptions permissão para controle sobre a criação da instância com os PipeWriter seguintes parâmetros:

Importante

Ao criar PipeReader e PipeWriter instâncias usando os Create métodos, você precisa considerar o tempo de vida do Stream objeto. Se você precisar acessar o fluxo depois que o leitor ou gravador terminar de usá-lo, você precisará definir o LeaveOpen sinalizador nas true opções de criação. Caso contrário, o fluxo será fechado.

O código a seguir demonstra a criação de PipeReader instâncias e PipeWriter usando os Create métodos de um fluxo.

using System.Buffers;
using System.IO.Pipelines;
using System.Text;

class Program
{
    static async Task Main()
    {
        using var stream = File.OpenRead("lorem-ipsum.txt");

        var reader = PipeReader.Create(stream);
        var writer = PipeWriter.Create(
            Console.OpenStandardOutput(), 
            new StreamPipeWriterOptions(leaveOpen: true));

        WriteUserCancellationPrompt();

        var processMessagesTask = ProcessMessagesAsync(reader, writer);
        var userCanceled = false;
        var cancelProcessingTask = Task.Run(() =>
        {
            while (char.ToUpperInvariant(Console.ReadKey().KeyChar) != 'C')
            {
                WriteUserCancellationPrompt();
            }

            userCanceled = true;

            // No exceptions thrown
            reader.CancelPendingRead();
            writer.CancelPendingFlush();
        });

        await Task.WhenAny(cancelProcessingTask, processMessagesTask);

        Console.WriteLine(
            $"\n\nProcessing {(userCanceled ? "cancelled" : "completed")}.\n");
    }

    static void WriteUserCancellationPrompt() =>
        Console.WriteLine("Press 'C' to cancel processing...\n");

    static async Task ProcessMessagesAsync(
        PipeReader reader,
        PipeWriter writer)
    {
        try
        {
            while (true)
            {
                ReadResult readResult = await reader.ReadAsync();
                ReadOnlySequence<byte> buffer = readResult.Buffer;

                try
                {
                    if (readResult.IsCanceled)
                    {
                        break;
                    }

                    if (TryParseLines(ref buffer, out string message))
                    {
                        FlushResult flushResult =
                            await WriteMessagesAsync(writer, message);

                        if (flushResult.IsCanceled || flushResult.IsCompleted)
                        {
                            break;
                        }
                    }

                    if (readResult.IsCompleted)
                    {
                        if (!buffer.IsEmpty)
                        {
                            throw new InvalidDataException("Incomplete message.");
                        }
                        break;
                    }
                }
                finally
                {
                    reader.AdvanceTo(buffer.Start, buffer.End);
                }
            }
        }
        catch (Exception ex)
        {
            Console.Error.WriteLine(ex);
        }
        finally
        {
            await reader.CompleteAsync();
            await writer.CompleteAsync();
        }
    }

    static bool TryParseLines(
        ref ReadOnlySequence<byte> buffer,
        out string message)
    {
        SequencePosition? position;
        StringBuilder outputMessage = new();

        while(true)
        {
            position = buffer.PositionOf((byte)'\n');

            if (!position.HasValue)
                break;

            outputMessage.Append(Encoding.ASCII.GetString(buffer.Slice(buffer.Start, position.Value)))
                        .AppendLine();

            buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
        };

        message = outputMessage.ToString();
        return message.Length != 0;
    }

    static ValueTask<FlushResult> WriteMessagesAsync(
        PipeWriter writer,
        string message) =>
        writer.WriteAsync(Encoding.ASCII.GetBytes(message));
}

O aplicativo usa um StreamReader para ler o arquivo lorem-ipsum.txt como um fluxo e deve terminar com uma linha em branco. O FileStream é passado para PipeReader.Create, que instancia um PipeReader objeto. Em seguida, o aplicativo de console passa seu fluxo de saída padrão para PipeWriter.Create o uso do Console.OpenStandardOutput(). O exemplo suporta cancelamento.