System.IO.Pipelines in .NET

System.IO.Pipelines is a library that is designed to make it easier to do high-performance I/O in .NET. It's a library targeting .NET Standard that works on all .NET implementations.

The library is available in the System.IO.Pipelines NuGet package.

What problem does System.IO.Pipelines solve

Apps that parse streaming data are composed of boilerplate code having many specialized and unusual code flows. The boilerplate and special case code is complex and difficult to maintain.

System.IO.Pipelines was architected to:

  • Parse streaming data with high performance.
  • Reduce code complexity.

The following code is typical for a TCP server that receives line-delimited messages (delimited by '\n') from a client:

async Task ProcessLinesAsync(NetworkStream stream)
{
    var buffer = new byte[1024];
    await stream.ReadAsync(buffer, 0, buffer.Length);

    // Process a single line from the buffer
    ProcessLine(buffer);
}

The preceding code has several problems:

  • The entire message (end of line) might not be received in a single call to ReadAsync.
  • It's ignoring the result of stream.ReadAsync. stream.ReadAsync returns how much data was read.
  • It doesn't handle the case where multiple lines are read in a single ReadAsync call.
  • It allocates a byte array with each read.

To fix the preceding problems, the following changes are required:

  • Buffer the incoming data until a new line is found.

  • Parse all the lines returned in the buffer.

  • It's possible that the line is bigger than 1 KB (1024 bytes). The code needs to resize the input buffer until the delimiter is found in order to fit the complete line inside the buffer.

    • If the buffer is resized, more buffer copies are made as longer lines appear in the input.
    • To reduce wasted space, compact the buffer used for reading lines.
  • Consider using buffer pooling to avoid allocating memory repeatedly.

The following code addresses some of these problems:

async Task ProcessLinesAsync(NetworkStream stream)
{
    byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
    var bytesBuffered = 0;
    var bytesConsumed = 0;

    while (true)
    {
        // Calculate the amount of bytes remaining in the buffer.
        var bytesRemaining = buffer.Length - bytesBuffered;

        if (bytesRemaining == 0)
        {
            // Double the buffer size and copy the previously buffered data into the new buffer.
            var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
            Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
            // Return the old buffer to the pool.
            ArrayPool<byte>.Shared.Return(buffer);
            buffer = newBuffer;
            bytesRemaining = buffer.Length - bytesBuffered;
        }

        var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
        if (bytesRead == 0)
        {
            // EOF
            break;
        }

        // Keep track of the amount of buffered bytes.
        bytesBuffered += bytesRead;
        var linePosition = -1;

        do
        {
            // Look for an EOL in the buffered data.
            linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
                                         bytesBuffered - bytesConsumed);

            if (linePosition >= 0)
            {
                // Calculate the length of the line based on the offset.
                var lineLength = linePosition - bytesConsumed;

                // Process the line.
                ProcessLine(buffer, bytesConsumed, lineLength);

                // Move the bytesConsumed to skip past the line consumed (including \n).
                bytesConsumed += lineLength + 1;
            }
        }
        while (linePosition >= 0);
    }
}

The previous code is complex and doesn't address all the problems identified. High-performance networking usually means writing complex code to maximize performance. System.IO.Pipelines was designed to make writing this type of code easier.

Pipe

The Pipe class can be used to create a PipeWriter/PipeReader pair. All data written into the PipeWriter is available in the PipeReader:

var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;
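
As a minimal sketch of that relationship, the following hypothetical helper (PipeRoundTrip and RoundTripAsync are illustrative names, not part of the library) writes a small payload into a Pipe and reads it back. It assumes the payload is small: a payload larger than the pipe's pause threshold would make WriteAsync wait for a reader that hasn't started yet.

```csharp
using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

public static class PipeRoundTrip
{
    // Hypothetical helper: writes `text` into a new Pipe and reads it back.
    // Intended for small payloads only (see the note above the code).
    public static async Task<string> RoundTripAsync(string text)
    {
        var pipe = new Pipe();

        // WriteAsync copies the bytes into the pipe and flushes them.
        await pipe.Writer.WriteAsync(Encoding.UTF8.GetBytes(text));

        // Completing the writer tells the reader that no more data is coming.
        await pipe.Writer.CompleteAsync();

        while (true)
        {
            ReadResult result = await pipe.Reader.ReadAsync();
            ReadOnlySequence<byte> buffer = result.Buffer;

            if (result.IsCompleted)
            {
                // Copy the data out before AdvanceTo invalidates the buffer.
                string received = Encoding.UTF8.GetString(buffer.ToArray());
                pipe.Reader.AdvanceTo(buffer.End);
                await pipe.Reader.CompleteAsync();
                return received;
            }

            // Not complete yet: consume nothing, mark everything as examined.
            pipe.Reader.AdvanceTo(buffer.Start, buffer.End);
        }
    }
}
```

Calling RoundTripAsync("hello") from an async method should return the same string that was written.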

Pipe basic usage

async Task ProcessLinesAsync(Socket socket)
{
    var pipe = new Pipe();
    Task writing = FillPipeAsync(socket, pipe.Writer);
    Task reading = ReadPipeAsync(pipe.Reader);

    await Task.WhenAll(reading, writing);
}

async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
    const int minimumBufferSize = 512;

    while (true)
    {
        // Allocate at least 512 bytes from the PipeWriter.
        Memory<byte> memory = writer.GetMemory(minimumBufferSize);
        try
        {
            int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
            if (bytesRead == 0)
            {
                break;
            }
            // Tell the PipeWriter how much was read from the Socket.
            writer.Advance(bytesRead);
        }
        catch (Exception ex)
        {
            LogError(ex);
            break;
        }

        // Make the data available to the PipeReader.
        FlushResult result = await writer.FlushAsync();

        if (result.IsCompleted)
        {
            break;
        }
    }

    // By completing PipeWriter, tell the PipeReader that there's no more data coming.
    await writer.CompleteAsync();
}

async Task ReadPipeAsync(PipeReader reader)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync();
        ReadOnlySequence<byte> buffer = result.Buffer;

        while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
        {
            // Process the line.
            ProcessLine(line);
        }

        // Tell the PipeReader how much of the buffer has been consumed.
        reader.AdvanceTo(buffer.Start, buffer.End);

        // Stop reading if there's no more data coming.
        if (result.IsCompleted)
        {
            break;
        }
    }

    // Mark the PipeReader as complete.
    await reader.CompleteAsync();
}

bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
    // Look for an EOL in the buffer.
    SequencePosition? position = buffer.PositionOf((byte)'\n');

    if (position == null)
    {
        line = default;
        return false;
    }

    // Skip the line + the \n.
    line = buffer.Slice(0, position.Value);
    buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
    return true;
}

There are two loops:

  • FillPipeAsync reads from the Socket and writes to the PipeWriter.
  • ReadPipeAsync reads from the PipeReader and parses incoming lines.

There are no explicit buffers allocated. All buffer management is delegated to the PipeReader and PipeWriter implementations. Delegating buffer management makes it easier for consuming code to focus solely on the business logic.

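In the first loop:

  • PipeWriter.GetMemory is called to get memory from the underlying writer.
  • PipeWriter.Advance is called to tell the PipeWriter how much data was written to the buffer.
  • PipeWriter.FlushAsync is called to make the data available to the PipeReader.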

In the second loop, the PipeReader consumes the buffers written by PipeWriter. The buffers come from the socket. The call to PipeReader.ReadAsync:

  • Returns a ReadResult that contains two important pieces of information:

    • The data that was read in the form of ReadOnlySequence<byte>.
    • A boolean IsCompleted that indicates if the end of data (EOF) has been reached.

After finding the end of line (EOL) delimiter and parsing the line:

  • The logic processes the buffer to skip what's already processed.
  • PipeReader.AdvanceTo is called to tell the PipeReader how much data has been consumed and examined.

The reader and writer loops end by calling CompleteAsync. Completing both the PipeReader and the PipeWriter lets the underlying Pipe release the memory it allocated.

Backpressure and flow control

Ideally, reading and parsing work together:

  • The reading thread consumes data from the network and puts it in buffers.
  • The parsing thread is responsible for constructing the appropriate data structures.

Typically, parsing takes more time than just copying blocks of data from the network:

  • The reading thread gets ahead of the parsing thread.
  • The reading thread has to either slow down or allocate more memory to store the data for the parsing thread.

For optimal performance, there's a balance between frequent pauses and allocating more memory.

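To solve the preceding problem, the Pipe has two settings to control the flow of data:

  • PauseWriterThreshold: Determines how much data should be buffered before calls to FlushAsync pause.
  • ResumeWriterThreshold: Determines how far the amount of buffered data has to drop before calls to FlushAsync resume.

Figure: Diagram with ResumeWriterThreshold and PauseWriterThreshold.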

PipeWriter.FlushAsync:

  • Returns an incomplete ValueTask<FlushResult> when the amount of data in the Pipe crosses PauseWriterThreshold.
  • Completes ValueTask<FlushResult> when it becomes lower than ResumeWriterThreshold.

Two values are used to prevent rapid cycling, which can occur if one value is used.

Examples

// The Pipe will start returning incomplete tasks from FlushAsync until
// the reader examines at least 5 bytes.
var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
var pipe = new Pipe(options);
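
The pause and resume behavior can be observed directly. The following sketch uses the same thresholds; BackpressureDemo and RunAsync are hypothetical names, and the 16-byte payload and 5-second timeout are arbitrary choices for illustration.

```csharp
using System;
using System.IO.Pipelines;
using System.Threading.Tasks;

public static class BackpressureDemo
{
    // Hypothetical demo: returns whether the flush paused after the write
    // and whether it resumed once the reader examined the data.
    public static async Task<(bool Paused, bool Resumed)> RunAsync()
    {
        var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
        var pipe = new Pipe(options);

        // Buffer 16 bytes, which is past pauseWriterThreshold.
        // AsTask lets the demo inspect the pending flush before awaiting it.
        Task<FlushResult> flush = pipe.Writer.WriteAsync(new byte[16]).AsTask();
        bool paused = !flush.IsCompleted;

        // Examining all buffered bytes drops the amount of unexamined data
        // below resumeWriterThreshold, which releases the writer.
        ReadResult result = await pipe.Reader.ReadAsync();
        pipe.Reader.AdvanceTo(result.Buffer.End);

        Task finished = await Task.WhenAny(flush, Task.Delay(TimeSpan.FromSeconds(5)));
        bool resumed = finished == flush;

        await pipe.Writer.CompleteAsync();
        await pipe.Reader.CompleteAsync();
        return (paused, resumed);
    }
}
```

In a real writer loop there's no need to inspect the task. Awaiting FlushAsync is enough, because the await doesn't continue until the reader has caught up.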

PipeScheduler

Typically when using async and await, asynchronous code resumes on either a TaskScheduler or the current SynchronizationContext.

When doing I/O, it's important to have fine-grained control over where the I/O is performed. This control allows taking advantage of CPU caches effectively. Efficient caching is critical for high-performance apps like web servers. PipeScheduler provides control over where asynchronous callbacks run. By default:

  • The current SynchronizationContext is used.
  • If there's no SynchronizationContext, it uses the thread pool to run callbacks.

public static void Main(string[] args)
{
    var writeScheduler = new SingleThreadPipeScheduler();
    var readScheduler = new SingleThreadPipeScheduler();

    // Tell the Pipe what schedulers to use and disable the SynchronizationContext.
    var options = new PipeOptions(readerScheduler: readScheduler,
                                  writerScheduler: writeScheduler,
                                  useSynchronizationContext: false);
    var pipe = new Pipe(options);
}

// This is a sample scheduler that runs async callbacks on a single dedicated thread.
public class SingleThreadPipeScheduler : PipeScheduler
{
    private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
     new BlockingCollection<(Action<object> Action, object State)>();
    private readonly Thread _thread;

    public SingleThreadPipeScheduler()
    {
        _thread = new Thread(DoWork);
        _thread.Start();
    }

    private void DoWork()
    {
        foreach (var item in _queue.GetConsumingEnumerable())
        {
            item.Action(item.State);
        }
    }

    public override void Schedule(Action<object?> action, object? state)
    {
        if (state is not null)
        {
            _queue.Add((action, state));
        }
        // else log the fact that _queue.Add was not called.
    }
}

PipeScheduler.ThreadPool is the PipeScheduler implementation that queues callbacks to the thread pool. PipeScheduler.ThreadPool is the default and generally the best choice. PipeScheduler.Inline can cause unintended consequences such as deadlocks.

Pipe reset

It's frequently efficient to reuse the Pipe object. To reset the pipe, call Pipe.Reset when both the PipeReader and PipeWriter are complete.
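
A minimal sketch of reuse, assuming a single pipe that is used twice in sequence. PipeResetDemo, ReuseAsync, and UseOnceAsync are hypothetical names. Each use completes both sides before Pipe.Reset is called.

```csharp
using System.IO.Pipelines;
using System.Threading.Tasks;

public static class PipeResetDemo
{
    // Hypothetical demo: uses one Pipe twice and returns the number of
    // bytes read during the second use.
    public static async Task<int> ReuseAsync()
    {
        var pipe = new Pipe();

        await UseOnceAsync(pipe, 3);

        // Both the reader and the writer are complete, so the pipe can be reset.
        pipe.Reset();

        return await UseOnceAsync(pipe, 5);
    }

    // Writes `count` bytes, completes the writer, reads everything,
    // and completes the reader.
    private static async Task<int> UseOnceAsync(Pipe pipe, int count)
    {
        await pipe.Writer.WriteAsync(new byte[count]);
        await pipe.Writer.CompleteAsync();

        ReadResult result = await pipe.Reader.ReadAsync();
        int length = (int)result.Buffer.Length;
        pipe.Reader.AdvanceTo(result.Buffer.End);
        await pipe.Reader.CompleteAsync();

        return length;
    }
}
```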

PipeReader

PipeReader manages memory on the caller's behalf. Always call PipeReader.AdvanceTo after calling PipeReader.ReadAsync. This lets the PipeReader know when the caller is done with the memory so that it can be tracked. The ReadOnlySequence<byte> returned from PipeReader.ReadAsync is only valid until the call to PipeReader.AdvanceTo. It's illegal to use ReadOnlySequence<byte> after calling PipeReader.AdvanceTo.

PipeReader.AdvanceTo takes two SequencePosition arguments:

  • The first argument determines how much memory was consumed.
  • The second argument determines how much of the buffer was observed.

Marking data as consumed means that the pipe can return the memory to the underlying buffer pool. Marking data as observed controls what the next call to PipeReader.ReadAsync does. Marking everything as observed means that the next call to PipeReader.ReadAsync won't return until there's more data written to the pipe. Any other value will make the next call to PipeReader.ReadAsync return immediately with the observed and unobserved data, but not data that has already been consumed.
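
The difference between consumed and examined can be illustrated with a small experiment. In this sketch (AdvanceToDemo and RunAsync are hypothetical names), nothing is consumed but everything is marked as examined, so the next read waits for new data and then returns the old and new bytes together.

```csharp
using System.Buffers;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

public static class AdvanceToDemo
{
    // Hypothetical demo: returns whether the second read had to wait for new
    // data, and the text that the second read finally returned.
    public static async Task<(bool WaitedForData, string SecondRead)> RunAsync()
    {
        var pipe = new Pipe();
        await pipe.Writer.WriteAsync(Encoding.ASCII.GetBytes("ab"));

        ReadResult first = await pipe.Reader.ReadAsync();

        // Consume nothing, but mark the whole buffer as examined.
        pipe.Reader.AdvanceTo(first.Buffer.Start, first.Buffer.End);

        // Everything was examined, so this read stays pending until more data arrives.
        Task<ReadResult> pending = pipe.Reader.ReadAsync().AsTask();
        bool waited = !pending.IsCompleted;

        await pipe.Writer.WriteAsync(Encoding.ASCII.GetBytes("c"));

        // The unconsumed "ab" is returned again, followed by the new "c".
        ReadResult second = await pending;
        string text = Encoding.ASCII.GetString(second.Buffer.ToArray());
        pipe.Reader.AdvanceTo(second.Buffer.End);

        await pipe.Writer.CompleteAsync();
        await pipe.Reader.CompleteAsync();
        return (waited, text);
    }
}
```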

Read streaming data scenarios

There are a couple of typical patterns that emerge when trying to read streaming data:

  • Given a stream of data, parse a single message.
  • Given a stream of data, parse all available messages.

The following examples use the TryParseLines method for parsing messages from a ReadOnlySequence<byte>. TryParseLines parses a single message and updates the input buffer to trim the parsed message from the buffer. TryParseLines isn't part of .NET; it's a user-written method used in the following sections.

bool TryParseLines(ref ReadOnlySequence<byte> buffer, out Message message);
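
Because TryParseLines is user written, its body depends on the message format. One possible sketch, assuming that each message is a single UTF-8 line terminated by '\n' and that Message is a simple class holding the decoded text (both are assumptions made for illustration; the article doesn't define Message):

```csharp
using System;
using System.Buffers;
using System.Text;

// Hypothetical message type, assumed for this sketch only.
public sealed class Message
{
    public Message(string text) => Text = text;
    public string Text { get; }
}

public static class LineParser
{
    // Parses one '\n'-terminated line from the front of the buffer.
    // On success, the buffer is trimmed to start after the delimiter.
    public static bool TryParseLines(ref ReadOnlySequence<byte> buffer, out Message message)
    {
        var reader = new SequenceReader<byte>(buffer);

        if (!reader.TryReadTo(out ReadOnlySequence<byte> line, (byte)'\n', advancePastDelimiter: true))
        {
            // No complete line yet. The buffer is left unchanged.
            message = null!;
            return false;
        }

        // Copy the payload so the message stays valid after AdvanceTo is called.
        message = new Message(Encoding.UTF8.GetString(line.ToArray()));

        // Trim the parsed line and its delimiter from the input buffer.
        buffer = buffer.Slice(reader.Position);
        return true;
    }
}
```

Copying the line into a string costs an allocation per message, but it avoids handing out memory that the pipe may reuse.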

Read a single message

The following code reads a single message from a PipeReader and returns it to the caller.

async ValueTask<Message?> ReadSingleMessageAsync(PipeReader reader,
 CancellationToken cancellationToken = default)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync(cancellationToken);
        ReadOnlySequence<byte> buffer = result.Buffer;

        // In the event that no message is parsed successfully, mark consumed
        // as nothing and examined as the entire buffer.
        SequencePosition consumed = buffer.Start;
        SequencePosition examined = buffer.End;

        try
        {
            if (TryParseLines(ref buffer, out Message message))
            {
                // A single message was successfully parsed so mark the start of the
                // parsed buffer as consumed. TryParseLines trims the buffer to
                // point to the data after the message was parsed.
                consumed = buffer.Start;

                // Examined is marked the same as consumed here, so the next call
                // to ReadSingleMessageAsync will process the next message if there's
                // one.
                examined = consumed;

                return message;
            }

            // There's no more data to be processed.
            if (result.IsCompleted)
            {
                if (buffer.Length > 0)
                {
                    // The message is incomplete and there's no more data to process.
                    throw new InvalidDataException("Incomplete message.");
                }

                break;
            }
        }
        finally
        {
            reader.AdvanceTo(consumed, examined);
        }
    }

    return null;
}

The preceding code:

  • Parses a single message.
  • Updates the consumed SequencePosition and examined SequencePosition to point to the start of the trimmed input buffer.

The two SequencePosition arguments are updated because TryParseLines removes the parsed message from the input buffer. Generally, when parsing a single message from the buffer, the examined position should be one of the following:

  • The end of the message.
  • The end of the received buffer if no message was found.

The single message case has the most potential for errors. Passing the wrong values to examined can result in an out of memory exception or an infinite loop. For more information, see the PipeReader common problems section in this article.

Reading multiple messages

The following code reads all messages from a PipeReader and calls ProcessMessageAsync on each.

async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
{
    try
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync(cancellationToken);
            ReadOnlySequence<byte> buffer = result.Buffer;

            try
            {
                // Process all messages from the buffer, modifying the input buffer on each
                // iteration.
                while (TryParseLines(ref buffer, out Message message))
                {
                    await ProcessMessageAsync(message);
                }

                // There's no more data to be processed.
                if (result.IsCompleted)
                {
                    if (buffer.Length > 0)
                    {
                        // The message is incomplete and there's no more data to process.
                        throw new InvalidDataException("Incomplete message.");
                    }
                    break;
                }
            }
            finally
            {
                // Since all messages in the buffer are being processed, you can use the
                // remaining buffer's Start and End position to determine consumed and examined.
                reader.AdvanceTo(buffer.Start, buffer.End);
            }
        }
    }
    finally
    {
        await reader.CompleteAsync();
    }
}

Cancellation

PipeReader.ReadAsync:

  • Supports passing a CancellationToken.
  • Throws an OperationCanceledException if the CancellationToken is canceled while there's a read pending.
  • Supports a way to cancel the current read operation via PipeReader.CancelPendingRead, which avoids raising an exception. Calling PipeReader.CancelPendingRead causes the current or next call to PipeReader.ReadAsync to return a ReadResult with IsCanceled set to true. This can be useful for halting the existing read loop in a non-destructive and non-exceptional way.

private PipeReader reader;

public MyConnection(PipeReader reader)
{
    this.reader = reader;
}

public void Abort()
{
    // Cancel the pending read so the process loop ends without an exception.
    reader.CancelPendingRead();
}

public async Task ProcessMessagesAsync()
{
    try
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync();
            ReadOnlySequence<byte> buffer = result.Buffer;

            try
            {
                if (result.IsCanceled)
                {
                    // The read was canceled. You can quit without reading the existing data.
                    break;
                }

                // Process all messages from the buffer, modifying the input buffer on each
                // iteration.
                while (TryParseLines(ref buffer, out Message message))
                {
                    await ProcessMessageAsync(message);
                }

                // There's no more data to be processed.
                if (result.IsCompleted)
                {
                    break;
                }
            }
            finally
            {
                // Since all messages in the buffer are being processed, you can use the
                // remaining buffer's Start and End position to determine consumed and examined.
                reader.AdvanceTo(buffer.Start, buffer.End);
            }
        }
    }
    finally
    {
        await reader.CompleteAsync();
    }
}

PipeReader common problems

  • Passing the wrong values to consumed or examined may result in reading already read data.

  • Passing buffer.End as examined may result in:

    • Stalled data
    • Possibly an eventual Out of Memory (OOM) exception if data isn't consumed. For example, PipeReader.AdvanceTo(position, buffer.End) when processing a single message at a time from the buffer.
  • Passing the wrong values to consumed or examined may result in an infinite loop. For example, PipeReader.AdvanceTo(buffer.Start) if buffer.Start hasn't changed will cause the next call to PipeReader.ReadAsync to return immediately before new data arrives.

  • Passing the wrong values to consumed or examined may result in infinite buffering (eventual OOM).

  • Using the ReadOnlySequence<byte> after calling PipeReader.AdvanceTo may result in memory corruption (use after free).

  • Failing to call PipeReader.Complete/CompleteAsync may result in a memory leak.

  • Checking ReadResult.IsCompleted and exiting the reading logic before processing the buffer results in data loss. The loop exit condition should be based on ReadResult.Buffer.IsEmpty and ReadResult.IsCompleted. Doing this incorrectly could result in an infinite loop.

Problematic code

Data loss

The ReadResult can return the final segment of data when IsCompleted is set to true. Not reading that data before exiting the read loop will result in data loss.

Warning

Do NOT use the following code. Using this sample will result in data loss, hangs, security issues and should NOT be copied. The following sample is provided to explain PipeReader Common problems.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> dataLossBuffer = result.Buffer;

    if (result.IsCompleted)
        break;

    Process(ref dataLossBuffer, out Message message);

    reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End);
}

Warning

Do NOT use the preceding code. Using this sample will result in data loss, hangs, security issues and should NOT be copied. The preceding sample is provided to explain PipeReader Common problems.

Infinite loop

The following logic may result in an infinite loop if ReadResult.IsCompleted is true but there's never a complete message in the buffer.

Warning

Do NOT use the following code. Using this sample will result in data loss, hangs, security issues and should NOT be copied. The following sample is provided to explain PipeReader Common problems.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
    if (result.IsCompleted && infiniteLoopBuffer.IsEmpty)
        break;

    Process(ref infiniteLoopBuffer, out Message message);

    reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}

Warning

Do NOT use the preceding code. Using this sample will result in data loss, hangs, security issues and should NOT be copied. The preceding sample is provided to explain PipeReader Common problems.

Here's another piece of code with the same problem. It's checking for a non-empty buffer before checking ReadResult.IsCompleted. Because it's in an else if, it will loop forever if there's never a complete message in the buffer.

Warning

Do NOT use the following code. Using this sample will result in data loss, hangs, security issues and should NOT be copied. The following sample is provided to explain PipeReader Common problems.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;

    if (!infiniteLoopBuffer.IsEmpty)
        Process(ref infiniteLoopBuffer, out Message message);

    else if (result.IsCompleted)
        break;

    reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}

Warning

Do NOT use the preceding code. Using this sample will result in data loss, hangs, security issues and should NOT be copied. The preceding sample is provided to explain PipeReader Common problems.

Unresponsive application

Unconditionally calling PipeReader.AdvanceTo with buffer.End in the examined position may result in the application becoming unresponsive when parsing a single message. The next call to PipeReader.ReadAsync won't return until:

  • There's more data written to the pipe.
  • And the new data wasn't previously examined.

Warning

Do NOT use the following code. Using this sample will result in data loss, hangs, security issues and should NOT be copied. The following sample is provided to explain PipeReader Common problems.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> hangBuffer = result.Buffer;

    Process(ref hangBuffer, out Message message);

    if (result.IsCompleted)
        break;

    reader.AdvanceTo(hangBuffer.Start, hangBuffer.End);

    if (message != null)
        return message;
}

Warning

Do NOT use the preceding code. Using this sample will result in data loss, hangs, security issues and should NOT be copied. The preceding sample is provided to explain PipeReader Common problems.

Out of Memory (OOM)

With the following conditions, the following code keeps buffering until an OutOfMemoryException occurs:

  • There's no maximum message size.
  • The data returned from the PipeReader doesn't make a complete message, for example because the other side is writing a large message (such as a 4-GB message).

Warning

Do NOT use the following code. Using this sample will result in data loss, hangs, security issues and should NOT be copied. The following sample is provided to explain PipeReader Common problems.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer;

    Process(ref thisCouldOutOfMemory, out Message message);

    if (result.IsCompleted)
        break;

    reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End);

    if (message != null)
        return message;
}

Warning

Do NOT use the preceding code. Using this sample will result in data loss, hangs, security issues and should NOT be copied. The preceding sample is provided to explain PipeReader Common problems.

Memory Corruption

When writing helpers that read the buffer, any returned payload should be copied before calling AdvanceTo. The following example returns memory that the Pipe has discarded and may reuse for the next operation (read/write).

Warning

Do NOT use the following code. Using this sample will result in data loss, hangs, security issues and should NOT be copied. The following sample is provided to explain PipeReader Common problems.

public class Message
{
    public ReadOnlySequence<byte> CorruptedPayload { get; set; }
}

    Environment.FailFast("This code is terrible, don't use it!");
    Message message = null;

    while (true)
    {
        ReadResult result = await reader.ReadAsync(cancellationToken);
        ReadOnlySequence<byte> buffer = result.Buffer;

        ReadHeader(ref buffer, out int length);

        if (length <= buffer.Length)
        {
            message = new Message
            {
                // Slice the payload from the existing buffer
                CorruptedPayload = buffer.Slice(0, length)
            };

            buffer = buffer.Slice(length);
        }

        if (result.IsCompleted)
            break;

        reader.AdvanceTo(buffer.Start, buffer.End);

        if (message != null)
        {
            // This code is broken since reader.AdvanceTo() was called with a position *after* the buffer
            // was captured.
            break;
        }
    }

    return message;
}

Warning

Do NOT use the preceding code. Using this sample will result in data loss, hangs, security issues and should NOT be copied. The preceding sample is provided to explain PipeReader Common problems.

PipeWriter

The PipeWriter manages buffers for writing on the caller's behalf. PipeWriter implements IBufferWriter<byte>. IBufferWriter<byte> makes it possible to get access to buffers to perform writes without extra buffer copies.

async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
    // Request at least 5 bytes from the PipeWriter.
    Memory<byte> memory = writer.GetMemory(5);

    // Write directly into the buffer.
    int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span);

    // Tell the writer how many bytes were written.
    writer.Advance(written);

    await writer.FlushAsync(cancellationToken);
}

The previous code:

  • Requests a buffer of at least 5 bytes from the PipeWriter using GetMemory.
  • Writes bytes for the ASCII string "Hello" to the returned Memory<byte>.
  • Calls Advance to indicate how many bytes were written to the buffer.
  • Flushes the PipeWriter, which sends the bytes to the underlying device.

The previous method of writing uses the buffers provided by the PipeWriter. It could also have used PipeWriter.WriteAsync, which:

  • Copies the existing buffer to the PipeWriter.
  • Calls GetSpan, Advance as appropriate and calls FlushAsync.

async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
    byte[] helloBytes = Encoding.ASCII.GetBytes("Hello");

    // Write helloBytes to the writer, there's no need to call Advance here
    // (Write does that).
    await writer.WriteAsync(helloBytes, cancellationToken);
}

Cancellation

FlushAsync supports passing a CancellationToken. Passing a CancellationToken results in an OperationCanceledException if the token is canceled while there's a flush pending. PipeWriter.FlushAsync supports a way to cancel the current flush operation via PipeWriter.CancelPendingFlush without raising an exception. Calling PipeWriter.CancelPendingFlush causes the current or next call to PipeWriter.FlushAsync or PipeWriter.WriteAsync to return a FlushResult with IsCanceled set to true. This can be useful for halting the yielding flush in a non-destructive and non-exceptional way.
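
A minimal sketch of the non-exceptional path, using the "next call" case described above. FlushCancellationDemo and RunAsync are hypothetical names.

```csharp
using System.IO.Pipelines;
using System.Threading.Tasks;

public static class FlushCancellationDemo
{
    // Hypothetical demo: returns the IsCanceled flag of a flush that was
    // canceled through CancelPendingFlush rather than a CancellationToken.
    public static async Task<bool> RunAsync()
    {
        var pipe = new Pipe();

        // No flush is pending yet, so the cancellation applies to the next
        // call to FlushAsync or WriteAsync.
        pipe.Writer.CancelPendingFlush();

        // No exception is thrown. The cancellation is reported on the result.
        FlushResult result = await pipe.Writer.WriteAsync(new byte[] { 1, 2, 3 });

        await pipe.Writer.CompleteAsync();
        await pipe.Reader.CompleteAsync();
        return result.IsCanceled;
    }
}
```

A writer loop would typically check FlushResult.IsCanceled next to FlushResult.IsCompleted and decide whether to stop or continue.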

PipeWriter common problems

  • GetSpan and GetMemory return a buffer with at least the requested amount of memory. Don't assume exact buffer sizes.
  • There's no guarantee that successive calls will return the same buffer or the same-sized buffer.
  • A new buffer must be requested after calling Advance to continue writing more data. The previously acquired buffer can't be written to.
  • Calling GetMemory or GetSpan while there's an incomplete call to FlushAsync isn't safe.
  • Calling Complete or CompleteAsync while there's unflushed data can result in memory corruption.
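
The following sketch shows one way to respect these rules when writing several pieces of data. ChunkedWriter and WriteChunksAsync are hypothetical names. The helper requests a new buffer for every chunk, never reuses a buffer after Advance, and awaits each flush before asking for more memory.

```csharp
using System;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

public static class ChunkedWriter
{
    // Hypothetical helper: encodes each chunk as UTF-8 into the writer and
    // returns the total number of bytes written.
    public static async Task<int> WriteChunksAsync(PipeWriter writer, IEnumerable<string> chunks)
    {
        int total = 0;

        foreach (string chunk in chunks)
        {
            // Request a fresh buffer for every chunk. The returned buffer may be
            // larger than requested, so only the bytes actually written are counted.
            Memory<byte> memory = writer.GetMemory(Encoding.UTF8.GetMaxByteCount(chunk.Length));
            int written = Encoding.UTF8.GetBytes(chunk.AsSpan(), memory.Span);

            // After Advance, the buffer acquired above must not be written to again.
            writer.Advance(written);
            total += written;

            // Await the flush before requesting more memory.
            FlushResult result = await writer.FlushAsync();
            if (result.IsCompleted)
            {
                // The reader is done, so stop writing.
                break;
            }
        }

        return total;
    }
}
```

Flushing after every chunk keeps the example simple. Code that writes many small chunks might flush less often.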

Tips for using PipeReader and PipeWriter

The following tips will help you use the System.IO.Pipelines classes successfully:

  • Always complete the PipeReader and PipeWriter, passing the exception that caused completion where applicable.
  • Always call PipeReader.AdvanceTo after calling PipeReader.ReadAsync.
  • Periodically await PipeWriter.FlushAsync while writing, and always check FlushResult.IsCompleted. Abort writing if IsCompleted is true, as that indicates the reader is completed and no longer cares about what is written.
  • Do call PipeWriter.FlushAsync after writing something that you want the PipeReader to have access to.
  • Do not call FlushAsync if the reader can't start until FlushAsync finishes, as that may cause a deadlock.
  • Ensure that only one context "owns" a PipeReader or PipeWriter or accesses them. These types are not thread-safe.
  • Never access a ReadResult.Buffer after calling AdvanceTo or completing the PipeReader.

IDuplexPipe

The IDuplexPipe is a contract for types that support both reading and writing. For example, a network connection would be represented by an IDuplexPipe.

Unlike Pipe, which contains a PipeReader and a PipeWriter, IDuplexPipe represents a single side of a full duplex connection. That means what is written to the PipeWriter will not be read from the PipeReader.
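
One way to picture this is an in-memory connection built from two Pipe instances, one per direction. The DuplexPipe class and its CreatePair method in the following sketch are hypothetical and aren't part of System.IO.Pipelines. Only the IDuplexPipe interface with its Input and Output properties comes from the library.

```csharp
using System.IO.Pipelines;

// Hypothetical in-memory implementation of IDuplexPipe, for illustration only.
public sealed class DuplexPipe : IDuplexPipe
{
    private DuplexPipe(PipeReader input, PipeWriter output)
    {
        Input = input;
        Output = output;
    }

    // Data arriving from the other side of the connection.
    public PipeReader Input { get; }

    // Data sent to the other side of the connection.
    public PipeWriter Output { get; }

    // Creates two connected endpoints. What one side writes to Output
    // is read from the other side's Input.
    public static (IDuplexPipe Client, IDuplexPipe Server) CreatePair()
    {
        var clientToServer = new Pipe();
        var serverToClient = new Pipe();

        var client = new DuplexPipe(serverToClient.Reader, clientToServer.Writer);
        var server = new DuplexPipe(clientToServer.Reader, serverToClient.Writer);
        return (client, server);
    }
}
```

With this pair, bytes written to the client's Output show up on the server's Input and never on the client's own Input.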

Streams

When reading or writing stream data, you typically read data using a de-serializer and write data using a serializer. Most of these read and write stream APIs have a Stream parameter. To make it easier to integrate with these existing APIs, PipeReader and PipeWriter expose an AsStream method. AsStream returns a Stream implementation around the PipeReader or PipeWriter.
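
As a small sketch of that integration, the following hypothetical helper (AsStreamDemo and ReadAllTextAsync are illustrative names) hands a PipeReader to StreamReader, a Stream-based API, through AsStream. It assumes that the writer side of the pipe is eventually completed, because ReadToEndAsync reads until the end of the stream.

```csharp
using System.IO;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

public static class AsStreamDemo
{
    // Hypothetical helper: reads everything from the PipeReader as UTF-8 text
    // by using the Stream-based StreamReader API.
    public static async Task<string> ReadAllTextAsync(PipeReader reader)
    {
        // Disposing the StreamReader disposes the stream returned by AsStream,
        // which completes the PipeReader because leaveOpen isn't set.
        using var streamReader = new StreamReader(reader.AsStream(), Encoding.UTF8);
        return await streamReader.ReadToEndAsync();
    }
}
```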

Stream example

PipeReader and PipeWriter instances can be created using the static Create methods given a Stream object and optional corresponding creation options.

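The StreamPipeReaderOptions allow for control over the creation of the PipeReader instance with the following parameters:

  • StreamPipeReaderOptions.BufferSize is the minimum buffer size in bytes used when renting memory from the pool, and defaults to 4096.
  • StreamPipeReaderOptions.LeaveOpen determines whether the underlying stream is left open after the PipeReader completes, and defaults to false.
  • StreamPipeReaderOptions.MinimumReadSize is the threshold of remaining bytes in the buffer before a new buffer is allocated, and defaults to 1024.
  • StreamPipeReaderOptions.Pool is the MemoryPool<byte> used when allocating memory, and defaults to null.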

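The StreamPipeWriterOptions allow for control over the creation of the PipeWriter instance with the following parameters:

  • StreamPipeWriterOptions.LeaveOpen determines whether the underlying stream is left open after the PipeWriter completes, and defaults to false.
  • StreamPipeWriterOptions.MinimumBufferSize is the minimum buffer size in bytes used when renting memory from the pool, and defaults to 4096.
  • StreamPipeWriterOptions.Pool is the MemoryPool<byte> used when allocating memory, and defaults to null.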

Important

When creating PipeReader and PipeWriter instances using the Create methods, you need to consider the Stream object lifetime. If you need access to the stream after the reader or writer is done with it, you'll need to set the LeaveOpen flag to true on the creation options. Otherwise, the stream will be closed.
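The effect of the LeaveOpen flag can be sketched with a MemoryStream standing in for a real stream. The payload and variable names are assumptions made for this illustration.

```csharp
using System;
using System.IO;
using System.IO.Pipelines;
using System.Text;

var stream = new MemoryStream(Encoding.UTF8.GetBytes("line one\n"));

// leaveOpen: true keeps the stream usable after the reader completes.
PipeReader reader = PipeReader.Create(
    stream,
    new StreamPipeReaderOptions(leaveOpen: true));

ReadResult result = await reader.ReadAsync();
long bytesSeen = result.Buffer.Length;
reader.AdvanceTo(result.Buffer.End);

await reader.CompleteAsync();

// With leaveOpen: false (the default), the stream would be closed at this point.
bool stillOpen = stream.CanRead;
Console.WriteLine($"Read {bytesSeen} bytes; stream still open: {stillOpen}");

stream.Dispose();
```

With leaveOpen set to true, the caller remains responsible for disposing the stream.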

The following code demonstrates the creation of PipeReader and PipeWriter instances using the Create methods from a stream.

using System.Buffers;
using System.IO.Pipelines;
using System.Text;

class Program
{
    static async Task Main()
    {
        using var stream = File.OpenRead("lorem-ipsum.txt");

        var reader = PipeReader.Create(stream);
        var writer = PipeWriter.Create(
            Console.OpenStandardOutput(), 
            new StreamPipeWriterOptions(leaveOpen: true));

        WriteUserCancellationPrompt();

        var processMessagesTask = ProcessMessagesAsync(reader, writer);
        var userCanceled = false;
        var cancelProcessingTask = Task.Run(() =>
        {
            while (char.ToUpperInvariant(Console.ReadKey().KeyChar) != 'C')
            {
                WriteUserCancellationPrompt();
            }

            userCanceled = true;

            // Cancel without throwing: the pending ReadAsync and FlushAsync
            // calls return with IsCanceled set to true.
            reader.CancelPendingRead();
            writer.CancelPendingFlush();
        });

        await Task.WhenAny(cancelProcessingTask, processMessagesTask);

        Console.WriteLine(
            $"\n\nProcessing {(userCanceled ? "cancelled" : "completed")}.\n");
    }

    static void WriteUserCancellationPrompt() =>
        Console.WriteLine("Press 'C' to cancel processing...\n");

    static async Task ProcessMessagesAsync(
        PipeReader reader,
        PipeWriter writer)
    {
        try
        {
            while (true)
            {
                ReadResult readResult = await reader.ReadAsync();
                ReadOnlySequence<byte> buffer = readResult.Buffer;

                try
                {
                    if (readResult.IsCanceled)
                    {
                        break;
                    }

                    if (TryParseLines(ref buffer, out string message))
                    {
                        FlushResult flushResult =
                            await WriteMessagesAsync(writer, message);

                        if (flushResult.IsCanceled || flushResult.IsCompleted)
                        {
                            break;
                        }
                    }

                    if (readResult.IsCompleted)
                    {
                        if (!buffer.IsEmpty)
                        {
                            throw new InvalidDataException("Incomplete message.");
                        }
                        break;
                    }
                }
                finally
                {
                    reader.AdvanceTo(buffer.Start, buffer.End);
                }
            }
        }
        catch (Exception ex)
        {
            Console.Error.WriteLine(ex);
        }
        finally
        {
            await reader.CompleteAsync();
            await writer.CompleteAsync();
        }
    }

    static bool TryParseLines(
        ref ReadOnlySequence<byte> buffer,
        out string message)
    {
        SequencePosition? position;
        StringBuilder outputMessage = new();

        while (true)
        {
            position = buffer.PositionOf((byte)'\n');

            if (!position.HasValue)
                break;

            outputMessage.Append(Encoding.ASCII.GetString(buffer.Slice(buffer.Start, position.Value)))
                        .AppendLine();

            buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
        }

        message = outputMessage.ToString();
        return message.Length != 0;
    }

    static ValueTask<FlushResult> WriteMessagesAsync(
        PipeWriter writer,
        string message) =>
        writer.WriteAsync(Encoding.ASCII.GetBytes(message));
}

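The application calls File.OpenRead to open the lorem-ipsum.txt file as a FileStream. The file must end with a newline ('\n'); otherwise, ProcessMessagesAsync reports an InvalidDataException for the incomplete final message. The FileStream is passed to PipeReader.Create, which instantiates a PipeReader object. The console application then passes its standard output stream to PipeWriter.Create using Console.OpenStandardOutput(). The example supports cancellation: pressing 'C' calls CancelPendingRead and CancelPendingFlush, which end the processing loop without throwing.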