System.IO.Pipelines is a library designed to make high-performance I/O easier in .NET. It targets .NET Standard and works on all .NET implementations.
The library is available in the System.IO.Pipelines NuGet package.
Apps that parse streaming data are composed of boilerplate code having many specialized and unusual code flows. The boilerplate and special case code is complex and difficult to maintain.
System.IO.Pipelines was architected to parse streaming data with high performance while reducing code complexity.
The following code is typical for a TCP server that receives line-delimited messages (delimited by '\n') from a client:
async Task ProcessLinesAsync(NetworkStream stream)
{
var buffer = new byte[1024];
await stream.ReadAsync(buffer, 0, buffer.Length);
// Process a single line from the buffer
ProcessLine(buffer);
}
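The preceding code has several problems:
The entire message (end of line) might not be received in a single call to ReadAsync.
It ignores the result of stream.ReadAsync. stream.ReadAsync returns how much data was read.
It doesn't handle the case where multiple lines are read in a single ReadAsync call.
It allocates a byte array with each read.
To fix the preceding problems, the following changes are required: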
Buffer the incoming data until a new line is found.
Parse all the lines returned in the buffer.
It's possible that the line is bigger than 1 KB (1024 bytes). The code needs to resize the input buffer until the delimiter is found in order to fit the complete line inside the buffer.
Consider using buffer pooling to avoid allocating memory repeatedly.
The following code addresses some of these problems:
async Task ProcessLinesAsync(NetworkStream stream)
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
var bytesBuffered = 0;
var bytesConsumed = 0;
while (true)
{
// Calculate the amount of bytes remaining in the buffer.
var bytesRemaining = buffer.Length - bytesBuffered;
if (bytesRemaining == 0)
{
// Double the buffer size and copy the previously buffered data into the new buffer.
var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
// Return the old buffer to the pool.
ArrayPool<byte>.Shared.Return(buffer);
buffer = newBuffer;
bytesRemaining = buffer.Length - bytesBuffered;
}
var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
if (bytesRead == 0)
{
// EOF
break;
}
// Keep track of the amount of buffered bytes.
bytesBuffered += bytesRead;
var linePosition = -1;
do
{
// Look for an EOL in the buffered data.
linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
bytesBuffered - bytesConsumed);
if (linePosition >= 0)
{
// Calculate the length of the line based on the offset.
var lineLength = linePosition - bytesConsumed;
// Process the line.
ProcessLine(buffer, bytesConsumed, lineLength);
// Move the bytesConsumed to skip past the line consumed (including \n).
bytesConsumed += lineLength + 1;
}
}
while (linePosition >= 0);
}
}
The previous code is complex and doesn't address all the problems identified. High-performance networking usually means writing complex code to maximize performance. System.IO.Pipelines was designed to make writing this type of code easier.
The Pipe class can be used to create a PipeWriter/PipeReader pair. All data written into the PipeWriter is available in the PipeReader:
var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;
async Task ProcessLinesAsync(Socket socket)
{
var pipe = new Pipe();
Task writing = FillPipeAsync(socket, pipe.Writer);
Task reading = ReadPipeAsync(pipe.Reader);
await Task.WhenAll(reading, writing);
}
async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
const int minimumBufferSize = 512;
while (true)
{
// Allocate at least 512 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(minimumBufferSize);
try
{
int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
if (bytesRead == 0)
{
break;
}
// Tell the PipeWriter how much was read from the Socket.
writer.Advance(bytesRead);
}
catch (Exception ex)
{
LogError(ex);
break;
}
// Make the data available to the PipeReader.
FlushResult result = await writer.FlushAsync();
if (result.IsCompleted)
{
break;
}
}
// By completing PipeWriter, tell the PipeReader that there's no more data coming.
await writer.CompleteAsync();
}
async Task ReadPipeAsync(PipeReader reader)
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
{
// Process the line.
ProcessLine(line);
}
// Tell the PipeReader how much of the buffer has been consumed.
reader.AdvanceTo(buffer.Start, buffer.End);
// Stop reading if there's no more data coming.
if (result.IsCompleted)
{
break;
}
}
// Mark the PipeReader as complete.
await reader.CompleteAsync();
}
bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
// Look for an EOL in the buffer.
SequencePosition? position = buffer.PositionOf((byte)'\n');
if (position == null)
{
line = default;
return false;
}
// Skip the line + the \n.
line = buffer.Slice(0, position.Value);
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
return true;
}
There are two loops:
FillPipeAsync reads from the Socket and writes to the PipeWriter.
ReadPipeAsync reads from the PipeReader and parses incoming lines.
There are no explicit buffers allocated. All buffer management is delegated to the PipeReader and PipeWriter implementations. Delegating buffer management makes it easier for consuming code to focus solely on the business logic.
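In the first loop:
PipeWriter.GetMemory is called to get memory from the underlying writer.
PipeWriter.Advance is called to tell the PipeWriter how much data was written to the buffer.
PipeWriter.FlushAsync is called to make the data available to the PipeReader.
In the second loop, the PipeReader consumes the buffers written by PipeWriter. The buffers come from the socket. The call to PipeReader.ReadAsync: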
Returns a ReadResult that contains two important pieces of information:
The data that was read, in the form of ReadOnlySequence<byte>.
A Boolean IsCompleted that indicates if the end of data (EOF) has been reached.
After finding the end of line (EOL) delimiter and parsing the line:
The logic processes the buffer to skip what's already processed.
PipeReader.AdvanceTo is called to tell the PipeReader how much data has been consumed and examined.
The reader and writer loops end by calling CompleteAsync. Completing both sides lets the underlying Pipe release the memory it allocated.
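Ideally, reading and parsing work together: one side reads data from the network into buffers while the other parses the buffered data.
Typically, parsing takes more time than just copying blocks of data from the network, so the reading side gets ahead of the parsing side and must either slow down or allocate more memory to hold the data that hasn't been parsed yet.
For optimal performance, there's a balance between frequent pauses and allocating more memory.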
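To solve the preceding problem, the Pipe has two settings to control the flow of data:
PauseWriterThreshold: Determines how much data should be buffered before calls to FlushAsync pause.
ResumeWriterThreshold: Determines how much data the reader has to observe before calls to PipeWriter.FlushAsync resume.
PipeWriter.FlushAsync:
Returns an incomplete ValueTask<FlushResult> when the amount of data in the Pipe crosses PauseWriterThreshold.
Completes the ValueTask<FlushResult> when the amount of data becomes lower than ResumeWriterThreshold.
Two values are used to prevent rapid cycling, which can occur if one value is used.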
// The Pipe will start returning incomplete tasks from FlushAsync until
// the reader examines at least 5 bytes.
var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
var pipe = new Pipe(options);
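As a rough illustration of how these thresholds behave, the following sketch assumes a console app with top-level statements. The 16-byte write and the variable names are arbitrary choices for the demo, not part of the library. It shows a flush that stays pending until the reader consumes the buffered data:

```csharp
using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Threading.Tasks;

var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
var pipe = new Pipe(options);

// Write 16 bytes, which is past the 10-byte pause threshold.
pipe.Writer.Write(new byte[16]);
ValueTask<FlushResult> flush = pipe.Writer.FlushAsync();
bool pausedBeforeRead = !flush.IsCompleted;
Console.WriteLine($"Flush pending before read: {pausedBeforeRead}");

// Consume everything so the buffered amount drops below the resume threshold.
ReadResult read = await pipe.Reader.ReadAsync();
long bytesSeen = read.Buffer.Length;
pipe.Reader.AdvanceTo(read.Buffer.End);

// The pending flush can complete now that the reader has caught up.
FlushResult flushResult = await flush;
Console.WriteLine($"Reader saw {bytesSeen} bytes; flush canceled: {flushResult.IsCanceled}");
```

In a real app the writer would simply await FlushAsync in its loop; holding the ValueTask in a variable here only serves to make the pause visible.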
Typically when using async and await, asynchronous code resumes on either a TaskScheduler or the current SynchronizationContext.
When doing I/O, it's important to have fine-grained control over where the I/O is performed. This control allows taking advantage of CPU caches effectively. Efficient caching is critical for high-performance apps like web servers. PipeScheduler provides control over where asynchronous callbacks run. By default:
The current SynchronizationContext is used.
If there's no SynchronizationContext, it uses the thread pool to run callbacks.
public static void Main(string[] args)
{
var writeScheduler = new SingleThreadPipeScheduler();
var readScheduler = new SingleThreadPipeScheduler();
// Tell the Pipe what schedulers to use and disable the SynchronizationContext.
var options = new PipeOptions(readerScheduler: readScheduler,
writerScheduler: writeScheduler,
useSynchronizationContext: false);
var pipe = new Pipe(options);
}
// This is a sample scheduler that runs async callbacks on a single dedicated thread.
public class SingleThreadPipeScheduler : PipeScheduler
{
private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
new BlockingCollection<(Action<object> Action, object State)>();
private readonly Thread _thread;
public SingleThreadPipeScheduler()
{
_thread = new Thread(DoWork);
_thread.Start();
}
private void DoWork()
{
foreach (var item in _queue.GetConsumingEnumerable())
{
item.Action(item.State);
}
}
public override void Schedule(Action<object?> action, object? state)
{
if (state is not null)
{
_queue.Add((action, state));
}
// else log the fact that _queue.Add was not called.
}
}
PipeScheduler.ThreadPool is the PipeScheduler implementation that queues callbacks to the thread pool. PipeScheduler.ThreadPool is the default and generally the best choice. PipeScheduler.Inline can cause unintended consequences such as deadlocks.
It's frequently efficient to reuse the Pipe object. To reset the pipe, call Pipe.Reset when both the PipeReader and PipeWriter are complete.
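A minimal sketch of reusing one Pipe across two rounds follows, assuming a console app with top-level statements. The message text and the `seen` list are illustrative only. Note that both the writer and the reader are completed before the pipe is reset:

```csharp
using System;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

var pipe = new Pipe();
var seen = new List<string>();

for (int round = 1; round <= 2; round++)
{
    await pipe.Writer.WriteAsync(Encoding.ASCII.GetBytes($"round {round}"));
    await pipe.Writer.CompleteAsync();

    ReadResult result = await pipe.Reader.ReadAsync();
    // Decode (copy) the data before AdvanceTo invalidates the buffer.
    seen.Add(Encoding.ASCII.GetString(result.Buffer));
    pipe.Reader.AdvanceTo(result.Buffer.End);
    await pipe.Reader.CompleteAsync();

    // Both sides are complete, so the pipe can be reset and reused.
    pipe.Reset();
}

Console.WriteLine(string.Join(", ", seen));
```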
PipeReader manages memory on the caller's behalf. Always call PipeReader.AdvanceTo after calling PipeReader.ReadAsync. This lets the PipeReader know when the caller is done with the memory so that it can be tracked. The ReadOnlySequence<byte> returned from PipeReader.ReadAsync is only valid until the call to PipeReader.AdvanceTo. It's illegal to use the ReadOnlySequence<byte> after calling PipeReader.AdvanceTo.
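PipeReader.AdvanceTo takes two SequencePosition arguments:
The first argument determines how much memory was consumed.
The second argument determines how much of the buffer was observed (examined).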
Marking data as consumed means that the pipe can return the memory to the underlying buffer pool. Marking data as observed controls what the next call to PipeReader.ReadAsync does. Marking everything as observed means that the next call to PipeReader.ReadAsync won't return until there's more data written to the pipe. Any other value will make the next call to PipeReader.ReadAsync return immediately with the observed and unobserved data, but not data that has already been consumed.
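To make the consumed and examined positions concrete, here is a small sketch, assuming a console app with top-level statements and illustrative variable names. It marks nothing as consumed and everything as examined, so the next read stays pending until more data is written, and then returns the old and new data together:

```csharp
using System;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

var pipe = new Pipe();
await pipe.Writer.WriteAsync(Encoding.ASCII.GetBytes("ab"));

ReadResult first = await pipe.Reader.ReadAsync();
// Consume nothing, but report the whole buffer as examined.
pipe.Reader.AdvanceTo(first.Buffer.Start, first.Buffer.End);

// Everything was examined, so this read waits for more data.
ValueTask<ReadResult> pending = pipe.Reader.ReadAsync();
bool waitedForMoreData = !pending.IsCompleted;

await pipe.Writer.WriteAsync(Encoding.ASCII.GetBytes("c\n"));
ReadResult second = await pending;

// The unconsumed "ab" is returned again along with the new bytes.
string text = Encoding.ASCII.GetString(second.Buffer);
pipe.Reader.AdvanceTo(second.Buffer.End);

Console.WriteLine($"Waited: {waitedForMoreData}, bytes in second read: {text.Length}");
```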
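There are a couple of typical patterns that emerge when trying to read streaming data:
Given a stream of data, parse a single message.
Given a stream of data, parse all available messages.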
The following examples use the TryParseLines method for parsing messages from a ReadOnlySequence<byte>. TryParseLines parses a single message and updates the input buffer to trim the parsed message from the buffer. TryParseLines isn't part of .NET; it's a user-written method used in the following sections.
bool TryParseLines(ref ReadOnlySequence<byte> buffer, out Message message);
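Because TryParseLines is user-written, its body is up to the app. One possible shape is sketched below, assuming newline-delimited UTF-8 messages and a hypothetical `Message` record with a single `Text` property. Neither assumption comes from the library. The sketch uses SequenceReader<byte> to find the delimiter and trims the parsed bytes from the buffer:

```csharp
#nullable enable
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Text;

var buffer = new ReadOnlySequence<byte>(Encoding.UTF8.GetBytes("one\ntwo\npart"));
var parsed = new List<string>();

while (TryParseLines(ref buffer, out Message? message))
{
    parsed.Add(message!.Text);
}

// "part" has no trailing newline yet, so it stays in the buffer.
Console.WriteLine($"{string.Join(",", parsed)} | {buffer.Length} bytes left");

static bool TryParseLines(ref ReadOnlySequence<byte> buffer, out Message? message)
{
    var reader = new SequenceReader<byte>(buffer);
    if (reader.TryReadTo(out ReadOnlySequence<byte> line, (byte)'\n'))
    {
        // Decoding copies the bytes, so the message outlives the buffer.
        message = new Message(Encoding.UTF8.GetString(line));
        // Trim the parsed line and its delimiter from the input buffer.
        buffer = buffer.Slice(reader.Position);
        return true;
    }

    message = null;
    return false;
}

// Hypothetical message type used only for this illustration.
record Message(string Text);
```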
The following code reads a single message from a PipeReader and returns it to the caller.
async ValueTask<Message?> ReadSingleMessageAsync(PipeReader reader,
CancellationToken cancellationToken = default)
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
// In the event that no message is parsed successfully, mark consumed
// as nothing and examined as the entire buffer.
SequencePosition consumed = buffer.Start;
SequencePosition examined = buffer.End;
try
{
if (TryParseLines(ref buffer, out Message message))
{
// A single message was successfully parsed so mark the start of the
// parsed buffer as consumed. TryParseLines trims the buffer to
// point to the data after the message was parsed.
consumed = buffer.Start;
// Examined is marked the same as consumed here, so the next call
// to ReadSingleMessageAsync will process the next message if there's
// one.
examined = consumed;
return message;
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(consumed, examined);
}
}
return null;
}
The preceding code:
Parses a single message.
Updates the consumed SequencePosition and examined SequencePosition to point to the start of the trimmed input buffer.
The two SequencePosition arguments are updated because TryParseLines removes the parsed message from the input buffer. Generally, when parsing a single message from the buffer, the examined position should be one of the following:
The end of the message.
The end of the received buffer if no message was found.
The single message case has the most potential for errors. Passing the wrong values to examined can result in an out of memory exception or an infinite loop. For more information, see the PipeReader common problems section in this article.
The following code reads all messages from a PipeReader and calls ProcessMessageAsync on each.
async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseLines(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
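PipeReader.ReadAsync:
Supports passing a CancellationToken.
Throws an OperationCanceledException if the CancellationToken is canceled while there's a read pending.
Supports a way to cancel the current read operation via PipeReader.CancelPendingRead, which avoids raising an exception. Calling PipeReader.CancelPendingRead causes the current or next call to PipeReader.ReadAsync to return a ReadResult with IsCanceled set to true. This can be useful for halting the existing read loop in a non-destructive and non-exceptional way.
private PipeReader reader;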
public MyConnection(PipeReader reader)
{
this.reader = reader;
}
public void Abort()
{
// Cancel the pending read so the process loop ends without an exception.
reader.CancelPendingRead();
}
public async Task ProcessMessagesAsync()
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
if (result.IsCanceled)
{
// The read was canceled. You can quit without reading the existing data.
break;
}
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseLines(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
Passing the wrong values to consumed or examined may result in reading already read data.
Passing buffer.End as examined may result in stalled data and, if data isn't consumed, possibly an eventual out-of-memory (OOM) exception. For example, PipeReader.AdvanceTo(position, buffer.End) when processing a single message at a time from the buffer.
Passing the wrong values to consumed or examined may result in an infinite loop. For example, PipeReader.AdvanceTo(buffer.Start) if buffer.Start hasn't changed will cause the next call to PipeReader.ReadAsync to return immediately before new data arrives.
Passing the wrong values to consumed or examined may result in infinite buffering (eventual OOM).
Using the ReadOnlySequence<byte> after calling PipeReader.AdvanceTo may result in memory corruption (use after free).
Failing to call PipeReader.Complete/CompleteAsync may result in a memory leak.
Checking ReadResult.IsCompleted and exiting the reading logic before processing the buffer results in data loss. The loop exit condition should be based on ReadResult.Buffer.IsEmpty and ReadResult.IsCompleted. Doing this incorrectly could result in an infinite loop.
❌ Data loss
The ReadResult can return the final segment of data when IsCompleted is set to true. Not reading that data before exiting the read loop will result in data loss.
Warning
Do NOT use the following code. Using this sample will result in data loss, hangs, security issues and should NOT be copied. The following sample is provided to explain PipeReader Common problems.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> dataLossBuffer = result.Buffer;
if (result.IsCompleted)
break;
Process(ref dataLossBuffer, out Message message);
reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End);
}
Warning
Do NOT use the preceding code. Using this sample will result in data loss, hangs, security issues and should NOT be copied. The preceding sample is provided to explain PipeReader Common problems.
❌ Infinite loop
The following logic may result in an infinite loop if ReadResult.IsCompleted is true but there's never a complete message in the buffer.
Warning
Do NOT use the following code. Using this sample will result in data loss, hangs, security issues and should NOT be copied. The following sample is provided to explain PipeReader Common problems.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (result.IsCompleted && infiniteLoopBuffer.IsEmpty)
break;
Process(ref infiniteLoopBuffer, out Message message);
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
Warning
Do NOT use the preceding code. Using this sample will result in data loss, hangs, security issues and should NOT be copied. The preceding sample is provided to explain PipeReader Common problems.
Here's another piece of code with the same problem. It's checking for a non-empty buffer before checking ReadResult.IsCompleted. Because it's in an else if, it will loop forever if there's never a complete message in the buffer.
Warning
Do NOT use the following code. Using this sample will result in data loss, hangs, security issues and should NOT be copied. The following sample is provided to explain PipeReader Common problems.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (!infiniteLoopBuffer.IsEmpty)
Process(ref infiniteLoopBuffer, out Message message);
else if (result.IsCompleted)
break;
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
Warning
Do NOT use the preceding code. Using this sample will result in data loss, hangs, security issues and should NOT be copied. The preceding sample is provided to explain PipeReader Common problems.
❌ Unresponsive application
Unconditionally calling PipeReader.AdvanceTo with buffer.End in the examined position may result in the application becoming unresponsive when parsing a single message. The next call to PipeReader.ReadAsync won't return until more data that wasn't previously examined is written to the pipe.
Warning
Do NOT use the following code. Using this sample will result in data loss, hangs, security issues and should NOT be copied. The following sample is provided to explain PipeReader Common problems.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> hangBuffer = result.Buffer;
Process(ref hangBuffer, out Message message);
if (result.IsCompleted)
break;
reader.AdvanceTo(hangBuffer.Start, hangBuffer.End);
if (message != null)
return message;
}
Warning
Do NOT use the preceding code. Using this sample will result in data loss, hangs, security issues and should NOT be copied. The preceding sample is provided to explain PipeReader Common problems.
❌ Out of Memory (OOM)
With the following conditions, the following code keeps buffering until an OutOfMemoryException occurs:
There's no maximum message size.
The data returned from the PipeReader doesn't make a complete message. For example, it doesn't make a complete message because the other side is writing a large message (for example, a 4-GB message).
Warning
Do NOT use the following code. Using this sample will result in data loss, hangs, security issues and should NOT be copied. The following sample is provided to explain PipeReader Common problems.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer;
Process(ref thisCouldOutOfMemory, out Message message);
if (result.IsCompleted)
break;
reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End);
if (message != null)
return message;
}
Warning
Do NOT use the preceding code. Using this sample will result in data loss, hangs, security issues and should NOT be copied. The preceding sample is provided to explain PipeReader Common problems.
❌ Memory Corruption
When writing helpers that read the buffer, any returned payload should be copied before calling PipeReader.AdvanceTo. The following example returns memory that the Pipe has discarded and may reuse for the next operation (read/write).
Warning
Do NOT use the following code. Using this sample will result in data loss, hangs, security issues and should NOT be copied. The following sample is provided to explain PipeReader Common problems.
public class Message
{
public ReadOnlySequence<byte> CorruptedPayload { get; set; }
}
Environment.FailFast("This code is terrible, don't use it!");
Message message = null;
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
ReadHeader(ref buffer, out int length);
if (length <= buffer.Length)
{
message = new Message
{
// Slice the payload from the existing buffer
CorruptedPayload = buffer.Slice(0, length)
};
buffer = buffer.Slice(length);
}
if (result.IsCompleted)
break;
reader.AdvanceTo(buffer.Start, buffer.End);
if (message != null)
{
// This code is broken since reader.AdvanceTo() was called with a position *after* the buffer
// was captured.
break;
}
}
return message;
}
Warning
Do NOT use the preceding code. Using this sample will result in data loss, hangs, security issues and should NOT be copied. The preceding sample is provided to explain PipeReader Common problems.
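For contrast, the following is a minimal sketch of a helper that copies the payload before advancing, assuming a fixed payload length with no header parsing. `ReadPayloadAsync` and its parameters are hypothetical names for this illustration. Because ToArray copies the bytes, the returned array stays valid after AdvanceTo:

```csharp
#nullable enable
using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Threading.Tasks;

var pipe = new Pipe();
await pipe.Writer.WriteAsync(new byte[] { 1, 2, 3, 4, 5 });

byte[]? payload = await ReadPayloadAsync(pipe.Reader, 3);
Console.WriteLine(payload is null ? "incomplete" : string.Join(",", payload));

static async ValueTask<byte[]?> ReadPayloadAsync(PipeReader reader, int length)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync();
        ReadOnlySequence<byte> buffer = result.Buffer;

        if (buffer.Length >= length)
        {
            // Copy the payload out of the pipe's memory BEFORE advancing.
            byte[] copy = buffer.Slice(0, length).ToArray();
            reader.AdvanceTo(buffer.GetPosition(length));
            return copy;
        }

        // Not enough data yet: consume nothing, examine everything.
        reader.AdvanceTo(buffer.Start, buffer.End);

        if (result.IsCompleted)
        {
            // The writer finished before a full payload arrived.
            return null;
        }
    }
}
```

The copy costs an allocation per message. An alternative is to process the payload in place and call AdvanceTo only after processing is finished.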
The PipeWriter manages buffers for writing on the caller's behalf. PipeWriter implements IBufferWriter<byte>. IBufferWriter<byte> makes it possible to get access to buffers to perform writes without extra buffer copies.
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
// Request at least 5 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(5);
// Write directly into the buffer.
int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span);
// Tell the writer how many bytes were written.
writer.Advance(written);
await writer.FlushAsync(cancellationToken);
}
The previous code:
Requests a buffer of at least 5 bytes from the PipeWriter using GetMemory.
Writes bytes for the ASCII string "Hello" to the returned Memory<byte>.
Calls Advance to indicate how many bytes were written to the buffer.
Flushes the PipeWriter, which sends the bytes to the underlying device.
The previous method of writing uses the buffers provided by the PipeWriter. It could also have used PipeWriter.WriteAsync, which:
Copies the existing buffer to the PipeWriter.
Calls GetSpan and Advance as appropriate, and then calls FlushAsync.
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
byte[] helloBytes = Encoding.ASCII.GetBytes("Hello");
// Write helloBytes to the writer, there's no need to call Advance here
// (Write does that).
await writer.WriteAsync(helloBytes, cancellationToken);
}
FlushAsync supports passing a CancellationToken. Passing a CancellationToken results in an OperationCanceledException if the token is canceled while there's a flush pending. PipeWriter.FlushAsync supports a way to cancel the current flush operation via PipeWriter.CancelPendingFlush without raising an exception. Calling PipeWriter.CancelPendingFlush causes the current or next call to PipeWriter.FlushAsync or PipeWriter.WriteAsync to return a FlushResult with IsCanceled set to true. This can be useful for halting the yielding flush in a non-destructive and non-exceptional way.
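The following sketch illustrates CancelPendingFlush, assuming a console app with top-level statements. The small thresholds and the 16-byte write are chosen only to force a pending flush. The canceled flush returns normally with IsCanceled set instead of throwing:

```csharp
using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Threading.Tasks;

// Small thresholds so that a single write is enough to pause the writer.
var pipe = new Pipe(new PipeOptions(pauseWriterThreshold: 8, resumeWriterThreshold: 4));

pipe.Writer.Write(new byte[16]);
ValueTask<FlushResult> flush = pipe.Writer.FlushAsync();

// No reader is draining the pipe, so cancel the pending flush instead.
pipe.Writer.CancelPendingFlush();
FlushResult result = await flush;
bool wasCanceled = result.IsCanceled;

Console.WriteLine($"Flush canceled without an exception: {wasCanceled}");

await pipe.Writer.CompleteAsync();
await pipe.Reader.CompleteAsync();
```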
Calling GetMemory or GetSpan while there's an incomplete call to FlushAsync isn't safe.
Calling Complete or CompleteAsync while there's unflushed data can result in memory corruption.
The following tips will help you use the System.IO.Pipelines classes successfully:
Periodically await PipeWriter.FlushAsync while writing, and always check FlushResult.IsCompleted. Abort writing if IsCompleted is true, as that indicates the reader is completed and no longer cares about what is written.
Call PipeWriter.FlushAsync after writing something that you want the PipeReader to have access to.
Don't call FlushAsync if the reader can't start until FlushAsync finishes, as that may cause a deadlock.
Ensure that only one context "owns" a PipeReader or PipeWriter or accesses them. These types are not thread-safe.
Never access a ReadResult.Buffer after calling AdvanceTo or completing the PipeReader.
The IDuplexPipe is a contract for types that support both reading and writing. For example, a network connection would be represented by an IDuplexPipe.
Unlike Pipe, which contains a PipeReader and a PipeWriter, IDuplexPipe represents a single side of a full duplex connection. That means what is written to the PipeWriter will not be read from the PipeReader.
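One way to picture this is an in-memory connection built from two Pipe instances, one per direction. The sketch below assumes a hypothetical `DuplexPipe` helper class, which isn't part of .NET. Data written by the client arrives at the server's reader and is not echoed back to the client's own reader:

```csharp
using System;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

var clientToServer = new Pipe();
var serverToClient = new Pipe();

// Each side reads from one pipe and writes to the other.
IDuplexPipe client = new DuplexPipe(serverToClient.Reader, clientToServer.Writer);
IDuplexPipe server = new DuplexPipe(clientToServer.Reader, serverToClient.Writer);

await client.Output.WriteAsync(Encoding.ASCII.GetBytes("ping"));

ReadResult atServer = await server.Input.ReadAsync();
string received = Encoding.ASCII.GetString(atServer.Buffer);
server.Input.AdvanceTo(atServer.Buffer.End);

// The client's own input has no data: its writes go to the other side only.
bool echoed = client.Input.TryRead(out _);

Console.WriteLine($"Server received '{received}'; echoed to client: {echoed}");

// Hypothetical helper that pairs a reader and a writer into an IDuplexPipe.
sealed class DuplexPipe : IDuplexPipe
{
    public DuplexPipe(PipeReader input, PipeWriter output)
    {
        Input = input;
        Output = output;
    }

    public PipeReader Input { get; }
    public PipeWriter Output { get; }
}
```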
When reading or writing stream data, you typically read data using a deserializer and write data using a serializer. Most of these read and write stream APIs have a Stream parameter. To make it easier to integrate with these existing APIs, PipeReader and PipeWriter expose an AsStream method. AsStream returns a Stream implementation around the PipeReader or PipeWriter.
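As a small sketch of this integration, the following example wraps a PipeReader in a Stream and hands it to a StreamReader, a stand-in here for any Stream-based API. It assumes a console app with top-level statements, and the sample text is arbitrary:

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

var pipe = new Pipe();
await pipe.Writer.WriteAsync(Encoding.UTF8.GetBytes("first\nsecond\n"));
await pipe.Writer.CompleteAsync();

var lines = new List<string>();

// AsStream adapts the PipeReader so that Stream-based APIs can consume it.
using (var textReader = new StreamReader(pipe.Reader.AsStream()))
{
    string? line;
    while ((line = await textReader.ReadLineAsync()) != null)
    {
        lines.Add(line);
    }
}

Console.WriteLine(string.Join(" | ", lines));
```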
PipeReader and PipeWriter instances can be created using the static Create methods given a Stream object and optional corresponding creation options.
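The StreamPipeReaderOptions allow for control over the creation of the PipeReader instance with the following parameters:
bufferSize is the minimum buffer size in bytes used when renting memory from the pool, and defaults to 4096.
leaveOpen determines whether the underlying stream is left open after the PipeReader completes, and defaults to false.
minimumReadSize represents the threshold of remaining bytes in the buffer before a new buffer is allocated, and defaults to 1024.
pool is the MemoryPool<byte> used when allocating memory, and defaults to null.
The StreamPipeWriterOptions allow for control over the creation of the PipeWriter instance with the following parameters:
leaveOpen determines whether the underlying stream is left open after the PipeWriter completes, and defaults to false.
minimumBufferSize represents the minimum buffer size to use when renting memory from the pool, and defaults to 4096.
pool is the MemoryPool<byte> used when allocating memory, and defaults to null.
Important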
When creating PipeReader and PipeWriter instances using the Create methods, you need to consider the Stream object lifetime. If you need access to the stream after the reader or writer is done with it, you'll need to set the LeaveOpen flag to true on the creation options. Otherwise, the stream will be closed.
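The effect of the leaveOpen option can be sketched with an in-memory stream, assuming a console app with top-level statements. The option values shown are the documented defaults written out explicitly, apart from leaveOpen:

```csharp
using System;
using System.IO;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

var stream = new MemoryStream(Encoding.ASCII.GetBytes("hello"));

var readerOptions = new StreamPipeReaderOptions(
    bufferSize: 4096,
    minimumReadSize: 1024,
    leaveOpen: true);
PipeReader reader = PipeReader.Create(stream, readerOptions);

ReadResult result = await reader.ReadAsync();
string text = Encoding.ASCII.GetString(result.Buffer);
reader.AdvanceTo(result.Buffer.End);

// Completing the reader would close the stream unless leaveOpen is true.
await reader.CompleteAsync();
bool stillOpen = stream.CanRead;

Console.WriteLine($"Read '{text}'; stream still open: {stillOpen}");
```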
The following code demonstrates the creation of PipeReader and PipeWriter instances using the Create methods from a stream.
using System.Buffers;
using System.IO.Pipelines;
using System.Text;
class Program
{
static async Task Main()
{
using var stream = File.OpenRead("lorem-ipsum.txt");
var reader = PipeReader.Create(stream);
var writer = PipeWriter.Create(
Console.OpenStandardOutput(),
new StreamPipeWriterOptions(leaveOpen: true));
WriteUserCancellationPrompt();
var processMessagesTask = ProcessMessagesAsync(reader, writer);
var userCanceled = false;
var cancelProcessingTask = Task.Run(() =>
{
while (char.ToUpperInvariant(Console.ReadKey().KeyChar) != 'C')
{
WriteUserCancellationPrompt();
}
userCanceled = true;
// No exceptions thrown
reader.CancelPendingRead();
writer.CancelPendingFlush();
});
await Task.WhenAny(cancelProcessingTask, processMessagesTask);
Console.WriteLine(
$"\n\nProcessing {(userCanceled ? "cancelled" : "completed")}.\n");
}
static void WriteUserCancellationPrompt() =>
Console.WriteLine("Press 'C' to cancel processing...\n");
static async Task ProcessMessagesAsync(
PipeReader reader,
PipeWriter writer)
{
try
{
while (true)
{
ReadResult readResult = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = readResult.Buffer;
try
{
if (readResult.IsCanceled)
{
break;
}
if (TryParseLines(ref buffer, out string message))
{
FlushResult flushResult =
await WriteMessagesAsync(writer, message);
if (flushResult.IsCanceled || flushResult.IsCompleted)
{
break;
}
}
if (readResult.IsCompleted)
{
if (!buffer.IsEmpty)
{
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
catch (Exception ex)
{
Console.Error.WriteLine(ex);
}
finally
{
await reader.CompleteAsync();
await writer.CompleteAsync();
}
}
static bool TryParseLines(
ref ReadOnlySequence<byte> buffer,
out string message)
{
SequencePosition? position;
StringBuilder outputMessage = new();
while(true)
{
position = buffer.PositionOf((byte)'\n');
if (!position.HasValue)
break;
outputMessage.Append(Encoding.ASCII.GetString(buffer.Slice(buffer.Start, position.Value)))
.AppendLine();
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
};
message = outputMessage.ToString();
return message.Length != 0;
}
static ValueTask<FlushResult> WriteMessagesAsync(
PipeWriter writer,
string message) =>
writer.WriteAsync(Encoding.ASCII.GetBytes(message));
}
The application uses File.OpenRead to open the lorem-ipsum.txt file as a FileStream. The file must end with a blank line; otherwise, the final line is treated as an incomplete message. The FileStream is passed to PipeReader.Create, which instantiates a PipeReader object. The console application then passes its standard output stream to PipeWriter.Create using Console.OpenStandardOutput(). The example supports cancellation.