System.IO.Pipelines

System.IO.Pipelines — это библиотека, предназначенная для упрощения высокопроизводительного ввода-вывода в .NET. Пакет разработан для .NET Standard для обеспечения широкой совместимости, а также для .NET Framework и современных версий .NET. В современных версиях .NET System.IO.Pipelines включен в общую платформу и не требует отдельного пакета NuGet.

Библиотека также доступна в виде пакета NuGet System.IO.Pipelines .

Какая проблема решает System.IO.Pipelines?

Приложения, которые анализируют потоковые данные, состоят из стандартного кода, имеющего множество специализированных и необычных потоков кода. Стандартный и специальный код сложен, и его трудно поддерживать.

System.IO.Pipelines было разработано для:

  • Обеспечьте высокопроизводительную обработку потоковых данных.
  • Упрощение кода.

Этот код обычно используется для TCP-сервера, который получает сообщения с разделителями строк (разделенные разделителями '\n') от клиента:

async Task ProcessLinesAsync(NetworkStream stream)
{
    var buffer = new byte[1024];
    await stream.ReadAsync(buffer, 0, buffer.Length);

    // Process a single line from the buffer
    ProcessLine(buffer);
}

В предыдущем коде несколько проблем:

  • Все сообщение (включая конец строки) может не быть получено в одном вызове ReadAsync.
  • Игнорируется результат stream.ReadAsync. stream.ReadAsync возвращает объем считанных данных.
  • Он не обрабатывает случай, когда несколько строк считываются в одном вызове ReadAsync.
  • Он выделяет массив byte при каждом чтении.

Чтобы устранить предыдущие проблемы, внесите следующие изменения:

  • Буферизуйте входящие данные, пока не будет найдена новая строка.

  • Проанализируйте все строки, возвращенные в буфер.

  • Строка может быть больше 1 КБ (1024 байта). Код должен изменить размер входного буфера, пока разделитель не будет найден, чтобы поместить полную строку внутри буфера.

    • Если размер буфера изменить, создаются дополнительные буферные копии, так как во входных данных отображаются более длинные строки.
    • Чтобы уменьшить объем неиспользуемого пространства, необходимо сжать буфер, используемый для чтения строк.
  • Рекомендуется использовать буферные пулы, чтобы избежать повторного выделения памяти.

  • Этот код устраняет некоторые из следующих проблем:

async Task ProcessLinesAsync(NetworkStream stream)
{
    byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
    var bytesBuffered = 0;
    var bytesConsumed = 0;

    while (true)
    {
        // Calculate the amount of bytes remaining in the buffer.
        var bytesRemaining = buffer.Length - bytesBuffered;

        if (bytesRemaining == 0)
        {
            // Double the buffer size and copy the previously buffered data into the new buffer.
            var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
            Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
            // Return the old buffer to the pool.
            ArrayPool<byte>.Shared.Return(buffer);
            buffer = newBuffer;
            bytesRemaining = buffer.Length - bytesBuffered;
        }

        var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
        if (bytesRead == 0)
        {
            // EOF
            break;
        }

        // Keep track of the amount of buffered bytes.
        bytesBuffered += bytesRead;
        var linePosition = -1;

        do
        {
            // Look for a EOL in the buffered data.
            linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
                                         bytesBuffered - bytesConsumed);

            if (linePosition >= 0)
            {
                // Calculate the length of the line based on the offset.
                var lineLength = linePosition - bytesConsumed;

                // Process the line.
                ProcessLine(buffer, bytesConsumed, lineLength);

                // Move the bytesConsumed to skip past the line consumed (including \n).
                bytesConsumed += lineLength + 1;
            }
        }
        while (linePosition >= 0);
    }
}

Предыдущий код является сложным и не предназначен для устранения всех обнаруженных проблем. Высокопроизводительная сеть обычно означает написание сложного кода для повышения производительности. System.IO.Pipelines был разработан для упрощения написания этого типа кода.

Труба

Используйте класс Pipe, чтобы создать пару PipeWriter/PipeReader. Все данные, записанные в PipeWriter, доступны в PipeReader:

var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;

Основное использование пайпа

async Task ProcessLinesAsync(Socket socket)
{
    var pipe = new Pipe();
    Task writing = FillPipeAsync(socket, pipe.Writer);
    Task reading = ReadPipeAsync(pipe.Reader);

    await Task.WhenAll(reading, writing);
}

async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
    const int minimumBufferSize = 512;

    while (true)
    {
        // Allocate at least 512 bytes from the PipeWriter.
        Memory<byte> memory = writer.GetMemory(minimumBufferSize);
        try
        {
            int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
            if (bytesRead == 0)
            {
                break;
            }
            // Tell the PipeWriter how much was read from the Socket.
            writer.Advance(bytesRead);
        }
        catch (Exception ex)
        {
            LogError(ex);
            break;
        }

        // Make the data available to the PipeReader.
        FlushResult result = await writer.FlushAsync();

        if (result.IsCompleted)
        {
            break;
        }
    }

     // By completing PipeWriter, tell the PipeReader that there's no more data coming.
    await writer.CompleteAsync();
}

async Task ReadPipeAsync(PipeReader reader)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync();
        ReadOnlySequence<byte> buffer = result.Buffer;

        while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
        {
            // Process the line.
            ProcessLine(line);
        }

        // Tell the PipeReader how much of the buffer has been consumed.
        reader.AdvanceTo(buffer.Start, buffer.End);

        // Stop reading if there's no more data coming.
        if (result.IsCompleted)
        {
            break;
        }
    }

    // Mark the PipeReader as complete.
    await reader.CompleteAsync();
}

bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
    // Look for a EOL in the buffer.
    SequencePosition? position = buffer.PositionOf((byte)'\n');

    if (position == null)
    {
        line = default;
        return false;
    }

    // Skip the line + the \n.
    line = buffer.Slice(0, position.Value);
    buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
    return true;
}

Два цикла управляют чтением и записью:

  • FillPipeAsync считывает из Socket и выполняет запись в PipeWriter.
  • ReadPipeAsync считывает из PipeReader и анализирует входящие строки.

Явные буферы не выделяются. Управление всеми буферами делегируется реализациям PipeReader и PipeWriter. Делегирование управления буфером упрощает использование кода, чтобы сосредоточиться только на бизнес-логике.

В первом цикле:

  • PipeWriter.GetMemory(Int32) вызывается для получения памяти от базового модуля записи.
  • PipeWriter.Advance(Int32) вызывается, чтобы сообщить PipeWriter, сколько данных было записано в буфер.
  • PipeWriter.FlushAsync вызывается, чтобы сделать данные доступными для PipeReader.

Во втором цикле PipeReader использует буферы, записанные PipeWriter. Буферы поступают из сокета. Вызов PipeReader.ReadAsync:

  • Возвращает ReadResult, который содержит два важных элемента информации:

    • Данные, прочитанные в виде ReadOnlySequence<T>.
    • Логическое значение IsCompleted, указывающее, достигнут ли конец данных (EOF).

После нахождения разделителя конца строки (EOL) и синтаксического анализа строки:

  • Логика обрабатывает буфер, чтобы пропустить уже обработанные данные.
  • PipeReader.AdvanceTo вызывается, чтобы сообщить PipeReader, сколько данных было обработано и проверено.

Циклы чтения и записи завершаются вызовом PipeReader.Complete и PipeWriter.Complete. Вызов Complete освобождает память, выделенную базовым Pipe объектом.

Управление обратным давлением и потоком

В идеале чтение и анализ работают вместе:

  • Поток чтения потребляет данные из сети и помещает его в буферы.
  • Поток анализа отвечает за построение соответствующих структур данных.

Обычно синтаксический анализ занимает больше времени, чем копирование блоков данных из сети:

  • Поток чтения идет впереди потока синтаксического анализа.
  • Поток чтения должен либо замедлить свою работу, либо выделить больше памяти для хранения данных потока синтаксического анализа.

Для оптимальной производительности необходим баланс между частыми паузами и выделением большего объема памяти.

Чтобы устранить описанную выше проблему, Pipe имеет два параметра для управления потоком данных:

  • PauseWriterThreshold: определяет, сколько данных следует буферизовать перед приостановкой вызовов FlushAsync .
  • ResumeWriterThreshold: определяет, какой объем данных читателю нужно проанализировать перед возобновлением вызовов PipeWriter.FlushAsync.

Схема с ResumeWriterThreshold и PauseWriterThreshold

PipeWriter.FlushAsync:

  • Возвращает неполный ValueTask<FlushResult>, если объем данных в Pipe пересекает PauseWriterThreshold.
  • Завершает ValueTask<FlushResult>, когда он становится меньше ResumeWriterThreshold.

Для предотвращения быстрого цикла, который может возникнуть при одном значении, используются два значения.

Примеры

// The Pipe will start returning incomplete tasks from FlushAsync until
// the reader examines at least 5 bytes.
var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
var pipe = new Pipe(options);

Планировщик труб

Обычно при использовании async и await, асинхронный код возобновляется либо на TaskScheduler, либо на текущем SynchronizationContext.

При выполнении операций ввода-вывода важно иметь точный контроль над местом выполнения операций ввода-вывода. Этот контроль позволяет эффективно использовать кэш ЦП. Эффективное кэширование является критически важным для высокопроизводительных приложений, таких как веб-серверы. PipeScheduler предоставляет контроль над тем, где выполняются асинхронные обратные вызовы. По умолчанию:

  • используется текущий SynchronizationContext.
  • Если SynchronizationContext отсутствует, он использует пул потоков для выполнения обратных вызовов.
public static void Main(string[] args)
{
    var writeScheduler = new SingleThreadPipeScheduler();
    var readScheduler = new SingleThreadPipeScheduler();

    // Tell the Pipe what schedulers to use and disable the SynchronizationContext.
    var options = new PipeOptions(readerScheduler: readScheduler,
                                  writerScheduler: writeScheduler,
                                  useSynchronizationContext: false);
    var pipe = new Pipe(options);
}

// This is a sample scheduler that async callbacks on a single dedicated thread.
public class SingleThreadPipeScheduler : PipeScheduler
{
    private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
     new BlockingCollection<(Action<object> Action, object State)>();
    private readonly Thread _thread;

    public SingleThreadPipeScheduler()
    {
        _thread = new Thread(DoWork);
        _thread.Start();
    }

    private void DoWork()
    {
        foreach (var item in _queue.GetConsumingEnumerable())
        {
            item.Action(item.State);
        }
    }

    public override void Schedule(Action<object?> action, object? state)
    {
        if (state is not null)
        {
            _queue.Add((action, state));
        }
        // else log the fact that _queue.Add was not called.
    }
}

PipeScheduler.ThreadPool — это реализация PipeScheduler, которая позволяет поместить обратные вызовы в очередь пула потоков. PipeScheduler.ThreadPool является параметром по умолчанию и, как правило, лучшим выбором. PipeScheduler.Inline может привести к непредвиденным последствиям, например к взаимоблокировкам.

Сброс потока

Повторное использование объекта Pipe часто бывает эффективным. Чтобы сбросить канал, вызовите PipeReaderReset при завершении PipeReader и PipeWriter.

PipeReader

PipeReader управляет памятью от имени вызывающего объекта. Всегда вызывайте PipeReader.AdvanceTo после вызова PipeReader.ReadAsync. Это позволяет PipeReader узнать, когда вызывающий объект завершил использование памяти, чтобы она могла быть отслежена. Возвращаемое значение ReadOnlySequence<T> из PipeReader.ReadAsync действительно только до вызова PipeReader.AdvanceTo. Использование ReadOnlySequence<T> после вызова PipeReader.AdvanceTo недопустимо.

PipeReader.AdvanceTo принимает два аргумента SequencePosition:

  • Первый аргумент определяет объем используемой памяти.
  • Второй аргумент определяет, какая часть буфера была видима.

Если пометить данные как потребленные, канал сможет вернуть память в базовый буферный пул. Отметка данных как наблюдаемых контролирует выполнение следующего вызова PipeReader.ReadAsync. Пометка всех элементов как отмеченных означает, что следующий вызов PipeReader.ReadAsync не вернется, пока в канал не будет записано больше данных. Любое другое значение вызывает следующий вызов PipeReader.ReadAsync, который немедленно возвращается с наблюдаемыми и ненаблюдаемыми данными, но не данными, которые уже были использованы.

Сценарии чтения потоковых данных

При чтении потоковых данных возникает несколько типичных шаблонов:

  • При получении потока данных анализировать одно сообщение.
  • При получении потока данных анализировать все доступные сообщения.

В этих примерах используется TryParseLines метод для синтаксического анализа сообщений из ReadOnlySequence<T>. TryParseLines анализирует одно сообщение и обновляет входной буфер, чтобы обрезать проанализированное сообщение из буфера. TryParseLines не является частью .NET; это метод, написанный пользователем, используемый в следующих разделах.

bool TryParseLines(ref ReadOnlySequence<byte> buffer, out Message message);

Чтение одного сообщения

Этот код считывает одно сообщение из PipeReader объекта и возвращает его вызывающему объекту.

async ValueTask<Message?> ReadSingleMessageAsync(PipeReader reader,
 CancellationToken cancellationToken = default)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync(cancellationToken);
        ReadOnlySequence<byte> buffer = result.Buffer;

        // In the event that no message is parsed successfully, mark consumed
        // as nothing and examined as the entire buffer.
        SequencePosition consumed = buffer.Start;
        SequencePosition examined = buffer.End;

        try
        {
            if (TryParseLines(ref buffer, out Message message))
            {
                // A single message was successfully parsed so mark the start of the
                // parsed buffer as consumed. TryParseLines trims the buffer to
                // point to the data after the message was parsed.
                consumed = buffer.Start;

                // Examined is marked the same as consumed here, so the next call
                // to ReadSingleMessageAsync will process the next message if there's
                // one.
                examined = consumed;

                return message;
            }

            // There's no more data to be processed.
            if (result.IsCompleted)
            {
                if (buffer.Length > 0)
                {
                    // The message is incomplete and there's no more data to process.
                    throw new InvalidDataException("Incomplete message.");
                }

                break;
            }
        }
        finally
        {
            reader.AdvanceTo(consumed, examined);
        }
    }

    return null;
}

Предыдущий код:

  • анализирует одно сообщение.
  • Обновляет потребленный SequencePosition и изученный SequencePosition, чтобы указать на начало обрезанного входного буфера.

Два аргумента SequencePosition обновляются, так как TryParseLines удаляет проанализированное сообщение из входного буфера. Как правило, при синтаксическом анализе одного сообщения из буфера изученное расположение должно быть одним из следующих:

  • Конец сообщения.
  • Конец полученного буфера, если сообщение не найдено.

В случае с одним сообщением риск ошибок наиболее велик. Передача неправильных значений в examined может привести к исключению из-за нехватки памяти или бесконечному циклу. Дополнительные сведения см. в разделе Распространенные проблемы PipeReader в этой статье.

Внимание

ReadSingleMessageAsync не вызывает PipeReader.CompleteAsync. Вызывающий объект отвечает за завершение PipeReaderработы. Вызов PipeReader.CompleteAsync внутри ReadSingleMessageAsync сигнализирует о том, что больше данных не может быть прочитаны, что предотвращает чтение последующих сообщений.

Чтение нескольких сообщений

Этот код считывает все сообщения из PipeReader и вызывает ProcessMessageAsync для каждого.

async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
{
    try
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync(cancellationToken);
            ReadOnlySequence<byte> buffer = result.Buffer;

            try
            {
                // Process all messages from the buffer, modifying the input buffer on each
                // iteration.
                while (TryParseLines(ref buffer, out Message message))
                {
                    await ProcessMessageAsync(message);
                }

                // There's no more data to be processed.
                if (result.IsCompleted)
                {
                    if (buffer.Length > 0)
                    {
                        // The message is incomplete and there's no more data to process.
                        throw new InvalidDataException("Incomplete message.");
                    }
                    break;
                }
            }
            finally
            {
                // Since all messages in the buffer are being processed, you can use the
                // remaining buffer's Start and End position to determine consumed and examined.
                reader.AdvanceTo(buffer.Start, buffer.End);
            }
        }
    }
    finally
    {
        await reader.CompleteAsync();
    }
}

Поскольку ProcessMessagesAsync владеет полным циклом чтения сообщений, он вызывает PipeReader.CompleteAsync после его завершения. В отличие от случая с одним сообщением, вызывающему объекту не нужно завершать чтение. ProcessMessagesAsync полностью управляет жизненным циклом PipeReader.

Отмена

PipeReader.ReadAsync:

  • Поддерживает передачу CancellationToken.
  • Создает исключение OperationCanceledException, если CancellationToken отменяется при ожидании чтения.
  • Поддерживает способ отмены текущей операции чтения с помощью PipeReader.CancelPendingRead, что позволяет избежать исключения. Вызов PipeReader.CancelPendingRead приводит к тому, что текущий или следующий вызов PipeReader.ReadAsync возвращает ReadResult, где IsCanceled установлено в true. Это полезно для остановки существующего цикла чтения без деструкции и без генерации исключений.
private PipeReader reader;

public MyConnection(PipeReader reader)
{
    this.reader = reader;
}

public void Abort()
{
    // Cancel the pending read so the process loop ends without an exception.
    reader.CancelPendingRead();
}

public async Task ProcessMessagesAsync()
{
    try
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync();
            ReadOnlySequence<byte> buffer = result.Buffer;

            try
            {
                if (result.IsCanceled)
                {
                    // The read was canceled. You can quit without reading the existing data.
                    break;
                }

                // Process all messages from the buffer, modifying the input buffer on each
                // iteration.
                while (TryParseLines(ref buffer, out Message message))
                {
                    await ProcessMessageAsync(message);
                }

                // There's no more data to be processed.
                if (result.IsCompleted)
                {
                    break;
                }
            }
            finally
            {
                // Since all messages in the buffer are being processed, you can use the
                // remaining buffer's Start and End position to determine consumed and examined.
                reader.AdvanceTo(buffer.Start, buffer.End);
            }
        }
    }
    finally
    {
        await reader.CompleteAsync();
    }
}

Распространенные проблемы PipeReader

  • Передача неправильных значений consumed или examined может привести к чтению уже прочитанных данных.

  • Передача buffer.End в ходе проверки может привести к следующим результатам:

    • Остановленные данные
    • Возможное исключение из-за недостатка памяти (OOM), если данные не обрабатываются. Например, PipeReader.AdvanceTo(position, buffer.End) при обработке одного сообщения за раз из буфера.
  • Передача неправильных значений consumed или examined может привести к бесконечному циклу. Например, если buffer.Start не изменилось, это вызывает то, что следующий вызов PipeReader.ReadAsync возвращается непосредственно перед поступлением новых данных.

  • Передача неправильных значений consumed в или examined может привести к бесконечному буферизации (итоговая OOM).

  • Использование ReadOnlySequence<T> после вызова PipeReader.AdvanceTo может привести к повреждению памяти (использовать после бесплатного использования).

  • Сбой вызова Complete/CompleteAsync может привести к утечке памяти.

  • Проверка ReadResult.IsCompleted и выход из логики чтения до обработки буфера приводит к потере данных. Условие выхода цикла должно основываться на ReadResult.Buffer.IsEmpty и ReadResult.IsCompleted. Неправильное выполнение этого действия может привести к бесконечному циклу.

Проблемный код

Потеря данных

ReadResult может возвращать окончательный сегмент данных, если IsCompleted имеет значение true. Не считывание данных перед выходом из цикла чтения приводит к потере данных.

Предупреждение

НЕ используйте следующий код. Использование этого примера приведет к утрате данных, зависанию и возможным проблемам безопасности, и поэтому его категорически НЕ следует копировать. Ниже приведен пример для объяснения распространенных проблем PipeReader.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> dataLossBuffer = result.Buffer;

    if (result.IsCompleted)
        break;

    Process(ref dataLossBuffer, out Message message);

    reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End);
}

Предупреждение

НЕ используйте приведенный выше код. Использование этого примера приведет к утрате данных, зависанию и возможным проблемам безопасности, и поэтому его категорически НЕ следует копировать. Приведенный выше пример представлен для объяснения распространенных проблем PipeReader.

Бесконечный цикл

Следующая логика может привести к бесконечному циклу, если Result.IsCompleted — это true, но в буфере никогда не оказывается полного сообщения.

Предупреждение

НЕ используйте следующий код. Использование этого примера приведет к утрате данных, зависанию и возможным проблемам безопасности, и поэтому его категорически НЕ следует копировать. Ниже приведен пример для объяснения распространенных проблем PipeReader.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
    if (result.IsCompleted && infiniteLoopBuffer.IsEmpty)
        break;

    Process(ref infiniteLoopBuffer, out Message message);

    reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}

Предупреждение

НЕ используйте приведенный выше код. Использование этого примера приведет к утрате данных, зависанию и возможным проблемам безопасности, и поэтому его категорически НЕ следует копировать. Приведенный выше пример представлен для объяснения распространенных проблем PipeReader.

Вот еще один фрагмент кода с такой же проблемой. Проверяется наличие непустого буфера перед проверкой ReadResult.IsCompleted. Так как он находится в else ifбуфере, он циклизируется навсегда, если в буфере никогда не было полного сообщения.

Предупреждение

НЕ используйте следующий код. Использование этого примера приведет к утрате данных, зависанию и возможным проблемам безопасности, и поэтому его категорически НЕ следует копировать. Ниже приведен пример для объяснения распространенных проблем PipeReader.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;

    if (!infiniteLoopBuffer.IsEmpty)
        Process(ref infiniteLoopBuffer, out Message message);

    else if (result.IsCompleted)
        break;

    reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}

Предупреждение

НЕ используйте приведенный выше код. Использование этого примера приведет к утрате данных, зависанию и возможным проблемам безопасности, и поэтому его категорически НЕ следует копировать. Приведенный выше пример представлен для объяснения распространенных проблем PipeReader.

Неответственное приложение

Безусловный вызов PipeReader.AdvanceTo с buffer.Endexamined позицией может привести к тому, что приложение не отвечает при анализе одного сообщения. Следующий вызов PipeReader.AdvanceTo не вернет значение до тех пор, пока не будут выполнены следующие условия:

  • В канал записано больше данных.
  • Новые данные не были проверены ранее.

Предупреждение

НЕ используйте следующий код. Использование этого примера приведет к утрате данных, зависанию и возможным проблемам безопасности, и поэтому его категорически НЕ следует копировать. Ниже приведен пример для объяснения распространенных проблем PipeReader.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> hangBuffer = result.Buffer;

    Process(ref hangBuffer, out Message message);

    if (result.IsCompleted)
        break;

    reader.AdvanceTo(hangBuffer.Start, hangBuffer.End);

    if (message != null)
        return message;
}

Предупреждение

НЕ используйте приведенный выше код. Использование этого примера приведет к утрате данных, зависанию и возможным проблемам безопасности, и поэтому его категорически НЕ следует копировать. Приведенный выше пример представлен для объяснения распространенных проблем PipeReader.

Нехватка памяти

При следующих условиях этот код сохраняет буферизацию до тех пор, пока не произойдет событие OutOfMemoryException.

  • Максимальный размер сообщения не указан.
  • Данные, возвращенные из PipeReader, не составляют полное сообщение. Например, другая сторона записывает большое сообщение (например, сообщение размером 4 ГБ).

Предупреждение

НЕ используйте следующий код. Использование этого примера приведет к утрате данных, зависанию и возможным проблемам безопасности, и поэтому его категорически НЕ следует копировать. Ниже приведен пример для объяснения распространенных проблем PipeReader.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer;

    Process(ref thisCouldOutOfMemory, out Message message);

    if (result.IsCompleted)
        break;

    reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End);

    if (message != null)
        return message;
}

Предупреждение

НЕ используйте приведенный выше код. Использование этого примера приведет к утрате данных, зависанию и возможным проблемам безопасности, и поэтому его категорически НЕ следует копировать. Приведенный выше пример представлен для объяснения распространенных проблем PipeReader.

Повреждение памяти

При написании вспомогательных функций, которые считывают буфер, скопируйте любую возвращаемую полезную нагрузку перед вызовом Advance. В следующем примере возвращается память, которую Pipe освободил, и она может быть повторно использована для следующей операции (чтение/запись).

Предупреждение

НЕ используйте следующий код. Использование этого примера приведет к утрате данных, зависанию и возможным проблемам безопасности, и поэтому его категорически НЕ следует копировать. Ниже приведен пример для объяснения распространенных проблем PipeReader.

public class Message
{
    public ReadOnlySequence<byte> CorruptedPayload { get; set; }
}
    Environment.FailFast("This code is terrible, don't use it!");
    Message message = null;

    while (true)
    {
        ReadResult result = await reader.ReadAsync(cancellationToken);
        ReadOnlySequence<byte> buffer = result.Buffer;

        ReadHeader(ref buffer, out int length);

        if (length <= buffer.Length)
        {
            message = new Message
            {
                // Slice the payload from the existing buffer
                CorruptedPayload = buffer.Slice(0, length)
            };

            buffer = buffer.Slice(length);
        }

        if (result.IsCompleted)
            break;

        reader.AdvanceTo(buffer.Start, buffer.End);

        if (message != null)
        {
            // This code is broken since reader.AdvanceTo() was called with a position *after* the buffer
            // was captured.
            break;
        }
    }

    return message;
}

Предупреждение

НЕ используйте приведенный выше код. Использование этого примера приведет к утрате данных, зависанию и возможным проблемам безопасности, и поэтому его категорически НЕ следует копировать. Приведенный выше пример представлен для объяснения распространенных проблем PipeReader.

PipeWriter

Управление буферами для записи от имени вызывающего объекта осуществляет PipeWriter. PipeWriter реализует IBufferWriter<byte>. IBufferWriter<byte> предоставляет доступ к буферам для выполнения операций записи без дополнительных копий буфера.

async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
    // Request at least 5 bytes from the PipeWriter.
    Memory<byte> memory = writer.GetMemory(5);

    // Write directly into the buffer.
    int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span);

    // Tell the writer how many bytes were written.
    writer.Advance(written);

    await writer.FlushAsync(cancellationToken);
}

Предыдущий код:

  • Запрашивает буфер длиной не менее 5 байт у PipeWriter с помощью GetMemory.
  • Записывает байты из строки ASCII "Hello" в указанный Memory<byte>.
  • Вызывает Advance, чтобы указать, сколько байтов было записано в буфер.
  • Очищает PipeWriter, который отправляет байты на базовое устройство.

Предыдущий метод записи использует буферы, предоставленные PipeWriter. Он также может использовать PipeWriter.WriteAsync, что:

  • Копирует существующий буфер в PipeWriter.
  • Вызовы GetSpan, Advance по мере необходимости и вызовы FlushAsync.
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
    byte[] helloBytes = Encoding.ASCII.GetBytes("Hello");

    // Write helloBytes to the writer, there's no need to call Advance here
    // (Write does that).
    await writer.WriteAsync(helloBytes, cancellationToken);
}

Отмена

FlushAsync поддерживает передачу CancellationToken. Передача CancellationToken приводит к исключению OperationCanceledException, если маркер отменяется в ожидании освобождения. PipeWriter.FlushAsync поддерживает способ отмены текущей операции сброса с помощью PipeWriter.CancelPendingFlush без вызова исключения. Вызов PipeWriter.CancelPendingFlush приводит к тому, что текущий или следующий вызов PipeWriter.FlushAsync или PipeWriter.WriteAsync возвращает FlushResult с IsCanceled со значением true. Это полезно для остановки удаляющегося смыка в неразрушительном и неисключаемом способе.

Распространенные проблемы PipeWriter

  • GetSpan и GetMemory возвращают буфер по крайней мере с запрошенным объемом памяти. Не рассчитывайте на точный размер буфера.
  • Последовательные вызовы не гарантируют возврат того же самого буфера или буфера одинакового размера.
  • Чтобы продолжить запись дополнительных данных, необходимо запросить новый буфер после вызова Advance. Запись в ранее полученный буфер невозможна.
  • Вызов GetMemory или GetSpan при наличии незавершенного вызова FlushAsync не является надежным.
  • Вызов Complete или CompleteAsync при наличии незафысленных данных может привести к повреждению памяти.

Советы по PipeReader и PipeWriter

Используйте следующие советы, чтобы успешно использовать System.IO.Pipelines классы:

  • Всегда завершайте PipeReader и PipeWriter, обрабатывая исключение, если это применимо.
  • Всегда вызывайте PipeReader.AdvanceTo после вызова PipeReader.ReadAsync.
  • Периодически при написании awaitPipeWriter.FlushAsync и всегда проверяйте FlushResult.IsCompleted. Прервать запись, если IsCompleted является true, означает, что читатель завершил и больше не интересуется тем, что пишется.
  • После того, как вы напишете что-то, к чему вы хотите, чтобы PipeReader имел доступ, вызовите PipeWriter.FlushAsync.
  • Не вызывайте FlushAsync, если читатель не может начать работу до завершения FlushAsync, так как это может привести к зависанию.
  • Убедитесь, что только один контекст владеет PipeReader или PipeWriter, или обращается к ним. Эти типы не потокобезопасны.
  • Никогда не обращайтесь к ReadResult.Buffer после вызова PipeReader.AdvanceTo или завершения PipeReader.

IDuplexPipe

IDuplexPipe — это контракт для типов, которые поддерживают как чтение, так и запись. Например, сетевое подключение представлено IDuplexPipe.

В отличие от Pipe, который содержит PipeReader и PipeWriter, IDuplexPipe представляет собой одну сторону полного дуплексного подключения. То, что вы записываете в PipeWriter, не будет считываться из PipeReader.

Потоки

При чтении или записи потоковых данных данные обычно считываются с помощью десериализатора и записываются с помощью сериализатора. Большая часть API потока чтения и записи имеет параметр Stream. Чтобы упростить интеграцию с существующими API, PipeReader и PipeWriter предоставляют метод AsStream. AsStream возвращает реализацию Stream на основе PipeReader или PipeWriter.

Примеры потоков

Создайте PipeReader и PipeWriter экземпляры, используя статические Create методы, учитывая Stream объект и необязательные параметры создания.

StreamPipeReaderOptions позволяют контролировать создание экземпляра PipeReader со следующими параметрами:

  • StreamPipeReaderOptions.BufferSize — минимальный размер буфера в байтах, используемый при аренде памяти из пула, и значение по умолчанию 4096.
  • Флаг StreamPipeReaderOptions.LeaveOpen определяет, остается ли базовый поток открытым после завершения PipeReader, и его значение по умолчанию — false.
  • StreamPipeReaderOptions.MinimumReadSize представляет пороговое значение оставшихся байтов в буфере до выделения нового буфера, и его значение по умолчанию — 1024.
  • StreamPipeReaderOptions.Pool — это MemoryPool<byte>, используемый при выделении памяти, а значение по умолчанию — null.

StreamPipeWriterOptions позволяют контролировать создание экземпляра PipeWriter со следующими параметрами:

  • Флаг StreamPipeWriterOptions.LeaveOpen определяет, остается ли базовый поток открытым после завершения PipeWriter, и его значение по умолчанию — false.
  • StreamPipeWriterOptions.MinimumBufferSize представляет минимальный размер буфера, используемый при аренде памяти из Pool, и его значение по умолчанию — 4096.
  • StreamPipeWriterOptions.Pool — это MemoryPool<byte>, используемый при выделении памяти, а значение по умолчанию — null.

Внимание

При создании экземпляров PipeReader и PipeWriter с использованием методов Create, учитывайте время существования объекта Stream. Если вам нужен доступ к потоку после завершения чтения или записи, установите в параметрах создания флаг LeaveOpen на true. В противном случае поток закрыт.

Этот код демонстрирует создание экземпляров PipeReader и PipeWriter методами Create из потока.

using System.Buffers;
using System.IO.Pipelines;
using System.Text;

class Program
{
    static async Task Main()
    {
        using var stream = File.OpenRead("lorem-ipsum.txt");

        var reader = PipeReader.Create(stream);
        var writer = PipeWriter.Create(
            Console.OpenStandardOutput(), 
            new StreamPipeWriterOptions(leaveOpen: true));

        WriteUserCancellationPrompt();

        var processMessagesTask = ProcessMessagesAsync(reader, writer);
        var userCanceled = false;
        var cancelProcessingTask = Task.Run(() =>
        {
            while (char.ToUpperInvariant(Console.ReadKey().KeyChar) != 'C')
            {
                WriteUserCancellationPrompt();
            }

            userCanceled = true;

            // No exceptions thrown
            reader.CancelPendingRead();
            writer.CancelPendingFlush();
        });

        await Task.WhenAny(cancelProcessingTask, processMessagesTask);

        Console.WriteLine(
            $"\n\nProcessing {(userCanceled ? "cancelled" : "completed")}.\n");
    }

    static void WriteUserCancellationPrompt() =>
        Console.WriteLine("Press 'C' to cancel processing...\n");

    static async Task ProcessMessagesAsync(
        PipeReader reader,
        PipeWriter writer)
    {
        try
        {
            while (true)
            {
                ReadResult readResult = await reader.ReadAsync();
                ReadOnlySequence<byte> buffer = readResult.Buffer;

                try
                {
                    if (readResult.IsCanceled)
                    {
                        break;
                    }

                    if (TryParseLines(ref buffer, out string message))
                    {
                        FlushResult flushResult =
                            await WriteMessagesAsync(writer, message);

                        if (flushResult.IsCanceled || flushResult.IsCompleted)
                        {
                            break;
                        }
                    }

                    if (readResult.IsCompleted)
                    {
                        if (!buffer.IsEmpty)
                        {
                            throw new InvalidDataException("Incomplete message.");
                        }
                        break;
                    }
                }
                finally
                {
                    reader.AdvanceTo(buffer.Start, buffer.End);
                }
            }
        }
        catch (Exception ex)
        {
            Console.Error.WriteLine(ex);
        }
        finally
        {
            await reader.CompleteAsync();
            await writer.CompleteAsync();
        }
    }

    static bool TryParseLines(
        ref ReadOnlySequence<byte> buffer,
        out string message)
    {
        SequencePosition? position;
        StringBuilder outputMessage = new();

        while(true)
        {
            position = buffer.PositionOf((byte)'\n');

            if (!position.HasValue)
                break;

            outputMessage.Append(Encoding.ASCII.GetString(buffer.Slice(buffer.Start, position.Value)))
                        .AppendLine();

            buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
        };

        message = outputMessage.ToString();
        return message.Length != 0;
    }

    static ValueTask<FlushResult> WriteMessagesAsync(
        PipeWriter writer,
        string message) =>
        writer.WriteAsync(Encoding.ASCII.GetBytes(message));
}

Приложение использует StreamReader для чтения файла lorem-ipsum.txt в виде потока, и оно должно заканчиваться пустой строкой. FileStream передается в PipeReader.Create, который создает экземпляр объекта PipeReader. Затем консольное приложение передает стандартный выходной поток в PipeWriter.Create с помощью Console.OpenStandardOutput(). Пример поддерживает отмену.