培训
.NET 中的 System.IO.Pipelines
System.IO.Pipelines 是一个库,旨在使在 .NET 中执行高性能 I/O 更加容易。 该库的目标为适用于所有 .NET 实现的 .NET Standard。
该库可作为 System.IO.Pipelines Nuget 包提供。
分析流数据的应用由样板代码组成,后者由许多专门且不寻常的代码流组成。 样板代码和特殊情况代码很复杂且难以进行维护。
System.IO.Pipelines
已构建为:
- 具有高性能的流数据分析功能。
- 减少代码复杂性。
下面的代码是典型的 TCP 服务器,它从客户机接收行分隔的消息(由 '\n'
分隔):
async Task ProcessLinesAsync(NetworkStream stream)
{
var buffer = new byte[1024];
await stream.ReadAsync(buffer, 0, buffer.Length);
// Process a single line from the buffer
ProcessLine(buffer);
}
前面的代码有几个问题:
- 单次调用
ReadAsync
可能无法接收整条消息(行尾)。 - 忽略了
stream.ReadAsync
的结果。stream.ReadAsync
返回读取的数据量。 - 它不能处理在单个
ReadAsync
调用中读取多行的情况。 - 它为每次读取分配一个
byte
数组。
要解决上述问题,需要进行以下更改:
缓冲传入的数据,直到找到新行。
分析缓冲区中返回的所有行。
该行可能大于 1KB(1024 字节)。 此代码需要调整输入缓冲区的大小,直到找到分隔符后,才能在缓冲区内容纳完整行。
- 如果调整缓冲区的大小,当输入中出现较长的行时,将生成更多缓冲区副本。
- 压缩用于读取行的缓冲区,以减少空余。
请考虑使用缓冲池来避免重复分配内存。
下面的代码解决了其中一些问题:
async Task ProcessLinesAsync(NetworkStream stream)
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
var bytesBuffered = 0;
var bytesConsumed = 0;
while (true)
{
// Calculate the amount of bytes remaining in the buffer.
var bytesRemaining = buffer.Length - bytesBuffered;
if (bytesRemaining == 0)
{
// Double the buffer size and copy the previously buffered data into the new buffer.
var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
// Return the old buffer to the pool.
ArrayPool<byte>.Shared.Return(buffer);
buffer = newBuffer;
bytesRemaining = buffer.Length - bytesBuffered;
}
var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
if (bytesRead == 0)
{
// EOF
break;
}
// Keep track of the amount of buffered bytes.
bytesBuffered += bytesRead;
var linePosition = -1;
do
{
// Look for a EOL in the buffered data.
linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
bytesBuffered - bytesConsumed);
if (linePosition >= 0)
{
// Calculate the length of the line based on the offset.
var lineLength = linePosition - bytesConsumed;
// Process the line.
ProcessLine(buffer, bytesConsumed, lineLength);
// Move the bytesConsumed to skip past the line consumed (including \n).
bytesConsumed += lineLength + 1;
}
}
while (linePosition >= 0);
}
}
前面的代码很复杂,不能解决所识别的所有问题。 高性能网络通常意味着编写复杂的代码以使性能最大化。 System.IO.Pipelines
的设计目的是使编写此类代码更容易。
Pipe 类可用于创建 PipeWriter/PipeReader
对。 写入 PipeWriter
的所有数据都可用于 PipeReader
:
var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;
async Task ProcessLinesAsync(Socket socket)
{
var pipe = new Pipe();
Task writing = FillPipeAsync(socket, pipe.Writer);
Task reading = ReadPipeAsync(pipe.Reader);
await Task.WhenAll(reading, writing);
}
async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
const int minimumBufferSize = 512;
while (true)
{
// Allocate at least 512 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(minimumBufferSize);
try
{
int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
if (bytesRead == 0)
{
break;
}
// Tell the PipeWriter how much was read from the Socket.
writer.Advance(bytesRead);
}
catch (Exception ex)
{
LogError(ex);
break;
}
// Make the data available to the PipeReader.
FlushResult result = await writer.FlushAsync();
if (result.IsCompleted)
{
break;
}
}
// By completing PipeWriter, tell the PipeReader that there's no more data coming.
await writer.CompleteAsync();
}
async Task ReadPipeAsync(PipeReader reader)
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
{
// Process the line.
ProcessLine(line);
}
// Tell the PipeReader how much of the buffer has been consumed.
reader.AdvanceTo(buffer.Start, buffer.End);
// Stop reading if there's no more data coming.
if (result.IsCompleted)
{
break;
}
}
// Mark the PipeReader as complete.
await reader.CompleteAsync();
}
bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
// Look for a EOL in the buffer.
SequencePosition? position = buffer.PositionOf((byte)'\n');
if (position == null)
{
line = default;
return false;
}
// Skip the line + the \n.
line = buffer.Slice(0, position.Value);
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
return true;
}
有两个循环:
FillPipeAsync
从Socket
读取并写入PipeWriter
。ReadPipeAsync
从PipeReader
读取并分析传入的行。
没有分配显式缓冲区。 所有缓冲区管理都委托给 PipeReader
和 PipeWriter
实现。 委派缓冲区管理使使用代码更容易集中关注业务逻辑。
在第一个循环中:
- 调用 PipeWriter.GetMemory(Int32) 从基础编写器获取内存。
- 调用 PipeWriter.Advance(Int32) 以告知
PipeWriter
有多少数据已写入缓冲区。 - 调用 PipeWriter.FlushAsync 以使数据可用于
PipeReader
。
在第二个循环中,PipeReader
使用由 PipeWriter
写入的缓冲区。 缓冲区来自套接字。 对 PipeReader.ReadAsync
的调用:
返回包含两条重要信息的 ReadResult:
- 以
ReadOnlySequence<byte>
形式读取的数据。 - 布尔值
IsCompleted
,指示是否已到达数据结尾 (EOF)。
- 以
找到行尾 (EOL) 分隔符并分析该行后:
- 该逻辑处理缓冲区以跳过已处理的内容。
- 调用
PipeReader.AdvanceTo
以告知PipeReader
已消耗和检查了多少数据。
读取器和编写器循环通过调用 Complete
结束。 Complete
使基础管道释放其分配的内存。
理想情况下,读取和分析可协同工作:
- 读取线程使用来自网络的数据并将其放入缓冲区。
- 分析线程负责构造适当的数据结构。
通常,分析所花费的时间比仅从网络复制数据块所用时间更长:
- 读取线程领先于分析线程。
- 读取线程必须减缓或分配更多内存来存储用于分析线程的数据。
为了获得最佳性能,需要在频繁暂停和分配更多内存之间取得平衡。
为解决上述问题,Pipe
提供了两个设置来控制数据流:
- PauseWriterThreshold:确定在调用 FlushAsync 暂停之前应缓冲多少数据。
- ResumeWriterThreshold:确定在恢复对
PipeWriter.FlushAsync
的调用之前,读取器必须观察多少数据。
- 当
Pipe
中的数据量超过PauseWriterThreshold
时,返回不完整的ValueTask<FlushResult>
。 - 低于
ResumeWriterThreshold
时,返回完整的ValueTask<FlushResult>
。
使用两个值可防止快速循环,如果只使用一个值,则可能发生这种循环。
// The Pipe will start returning incomplete tasks from FlushAsync until
// the reader examines at least 5 bytes.
var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
var pipe = new Pipe(options);
通常在使用 async
和 await
时,异步代码会在 TaskScheduler 或当前 SynchronizationContext 上恢复。
在执行 I/O 时,对执行 I/O 的位置进行细粒度控制非常重要。 此控件允许高效利用 CPU 缓存。 高效的缓存对于 Web 服务器等高性能应用至关重要。 PipeScheduler 提供对异步回调运行位置的控制。 默认情况下:
- 使用当前的 SynchronizationContext。
- 如果没有
SynchronizationContext
,它将使用线程池运行回调。
public static void Main(string[] args)
{
var writeScheduler = new SingleThreadPipeScheduler();
var readScheduler = new SingleThreadPipeScheduler();
// Tell the Pipe what schedulers to use and disable the SynchronizationContext.
var options = new PipeOptions(readerScheduler: readScheduler,
writerScheduler: writeScheduler,
useSynchronizationContext: false);
var pipe = new Pipe(options);
}
// This is a sample scheduler that async callbacks on a single dedicated thread.
public class SingleThreadPipeScheduler : PipeScheduler
{
private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
new BlockingCollection<(Action<object> Action, object State)>();
private readonly Thread _thread;
public SingleThreadPipeScheduler()
{
_thread = new Thread(DoWork);
_thread.Start();
}
private void DoWork()
{
foreach (var item in _queue.GetConsumingEnumerable())
{
item.Action(item.State);
}
}
public override void Schedule(Action<object?> action, object? state)
{
if (state is not null)
{
_queue.Add((action, state));
}
// else log the fact that _queue.Add was not called.
}
}
PipeScheduler.ThreadPool 是 PipeScheduler 实现,用于对线程池的回调进行排队。 PipeScheduler.ThreadPool
是默认选项,通常也是最佳选项。 PipeScheduler.Inline 可能会导致意外后果,如死锁。
通常重用 Pipe
对象即可重置。 若要重置管道,请在 PipeReader
和 PipeWriter
完成时调用 PipeReader Reset。
PipeReader 代表调用方管理内存。 在调用 PipeReader.ReadAsync 之后始终调用 PipeReader.AdvanceTo。 这使 PipeReader
知道调用方何时用完内存,以便可以对其进行跟踪。 从 PipeReader.ReadAsync
返回的 ReadOnlySequence<byte>
仅在调用 PipeReader.AdvanceTo
之前有效。 调用 PipeReader.AdvanceTo
后,不能使用 ReadOnlySequence<byte>
。
PipeReader.AdvanceTo
采用两个 SequencePosition 参数:
- 第一个参数确定消耗的内存量。
- 第二个参数确定观察到的缓冲区数。
将数据标记为“已使用”意味着管道可以将内存返回到底层缓冲池。 将数据标记为“已观察”可控制对 PipeReader.ReadAsync
的下一个调用的操作。 将所有内容都标记为“已观察”意味着下次对 PipeReader.ReadAsync
的调用将不会返回,直到有更多数据写入管道。 任何其他值都将使对 PipeReader.ReadAsync
的下一次调用立即返回并包含已观察到的和未观察到的数据,但不是已被使用的数据。
尝试读取流数据时会出现以下几种典型模式:
- 给定数据流时,分析单条消息。
- 给定数据流时,分析所有可用消息。
以下示例使用 TryParseLines
方法分析来自 ReadOnlySequence<byte>
的消息。 TryParseLines
分析单条消息并更新输入缓冲区,以从缓冲区中剪裁已分析的消息。 TryParseLines
不是 .NET 的一部分,它是在以下部分中使用的用户编写的方法。
bool TryParseLines(ref ReadOnlySequence<byte> buffer, out Message message);
下面的代码从 PipeReader
读取一条消息并将其返回给调用方。
async ValueTask<Message?> ReadSingleMessageAsync(PipeReader reader,
CancellationToken cancellationToken = default)
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
// In the event that no message is parsed successfully, mark consumed
// as nothing and examined as the entire buffer.
SequencePosition consumed = buffer.Start;
SequencePosition examined = buffer.End;
try
{
if (TryParseLines(ref buffer, out Message message))
{
// A single message was successfully parsed so mark the start of the
// parsed buffer as consumed. TryParseLines trims the buffer to
// point to the data after the message was parsed.
consumed = buffer.Start;
// Examined is marked the same as consumed here, so the next call
// to ReadSingleMessageAsync will process the next message if there's
// one.
examined = consumed;
return message;
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(consumed, examined);
}
}
return null;
}
前面的代码:
- 分析单条消息。
- 更新已使用的
SequencePosition
并检查SequencePosition
以指向已剪裁的输入缓冲区的开始。
因为 TryParseLines
从输入缓冲区中删除了已分析的消息,所以更新了两个 SequencePosition
参数。 通常,分析来自缓冲区的单条消息时,检查的位置应为以下位置之一:
- 消息的结尾。
- 如果未找到消息,则返回接收缓冲区的结尾。
单条消息案例最有可能出现错误。 将错误的值传递给“已检查”可能会导致内存不足异常或无限循环 。 有关详细信息,请参阅本文中的 PipeReader 常见问题部分。
以下代码从 PipeReader
读取所有消息,并在每条消息上调用 ProcessMessageAsync
。
async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseLines(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
PipeReader.ReadAsync
?
- 支持传递 CancellationToken。
- 如果在读取挂起期间取消了
CancellationToken
,则会引发 OperationCanceledException。 - 支持通过 PipeReader.CancelPendingRead 取消当前读取操作的方法,这样可以避免引发异常。 调用
PipeReader.CancelPendingRead
将导致对PipeReader.ReadAsync
的当前或下次调用返回 ReadResult,并将IsCanceled
设置为true
。 这对于以非破坏性和非异常的方式停止现有的读取循环非常有用。
private PipeReader reader;
public MyConnection(PipeReader reader)
{
this.reader = reader;
}
public void Abort()
{
// Cancel the pending read so the process loop ends without an exception.
reader.CancelPendingRead();
}
public async Task ProcessMessagesAsync()
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
if (result.IsCanceled)
{
// The read was canceled. You can quit without reading the existing data.
break;
}
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseLines(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
将错误的值传递给
consumed
或examined
可能会导致读取已读取的数据。传递
buffer.End
作为检查对象可能会导致以下问题:- 数据停止
- 如果数据未使用,可能最终会出现内存不足 (OOM) 异常。 例如,当一次处理来自缓冲区的单条消息时,可能会出现
PipeReader.AdvanceTo(position, buffer.End)
。
将错误的值传递给
consumed
或examined
可能会导致无限循环。 例如,如果buffer.Start
没有更改,则PipeReader.AdvanceTo(buffer.Start)
将导致在下一个对PipeReader.ReadAsync
的调用在新数据到来之前立即返回。将错误的值传递给
consumed
或examined
可能会导致无限缓冲(最终导致 OOM)。在调用
PipeReader.AdvanceTo
之后使用ReadOnlySequence<byte>
可能会导致内存损坏(在释放之后使用)。未能调用
PipeReader.Complete/CompleteAsync
可能会导致内存泄漏。在处理缓冲区之前检查 ReadResult.IsCompleted 并退出读取逻辑会导致数据丢失。 循环退出条件应基于
ReadResult.Buffer.IsEmpty
和ReadResult.IsCompleted
。 如果错误执行此操作,可能会导致无限循环。
❌数据丢失
当 IsCompleted
被设置为 true
时,ReadResult
可能会返回最后一段数据。 在退出读循环之前不读取该数据将导致数据丢失。
警告
不要使用以下代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。 以下示例用于解释 PipeReader 常见问题。
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> dataLossBuffer = result.Buffer;
if (result.IsCompleted)
break;
Process(ref dataLossBuffer, out Message message);
reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End);
}
警告
不要使用上述代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。 前面的示例用于解释 PipeReader 常见问题。
❌无限循环
如果 Result.IsCompleted
是 true
,则以下逻辑可能会导致无限循环,但缓冲区中永远不会有完整的消息。
警告
不要使用以下代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。 以下示例用于解释 PipeReader 常见问题。
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (result.IsCompleted && infiniteLoopBuffer.IsEmpty)
break;
Process(ref infiniteLoopBuffer, out Message message);
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
警告
不要使用上述代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。 前面的示例用于解释 PipeReader 常见问题。
下面是另一段具有相同问题的代码。 该代码在检查 ReadResult.IsCompleted
之前检查非空缓冲区。 由于该代码位于 else if
中,如果缓冲区中没有完整的消息,它将永远循环。
警告
不要使用以下代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。 以下示例用于解释 PipeReader 常见问题。
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (!infiniteLoopBuffer.IsEmpty)
Process(ref infiniteLoopBuffer, out Message message);
else if (result.IsCompleted)
break;
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
警告
不要使用上述代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。 前面的示例用于解释 PipeReader 常见问题。
❌应用程序无响应
在分析单条消息时,如果无条件调用 PipeReader.AdvanceTo
而 buffer.End
位于 examined
位置,则可能导致应用程序变为无响应。 对 PipeReader.AdvanceTo
的下次调用将在以下情况下返回:
- 有更多数据写入管道。
- 以及之前未检查过新数据。
警告
不要使用以下代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。 以下示例用于解释 PipeReader 常见问题。
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> hangBuffer = result.Buffer;
Process(ref hangBuffer, out Message message);
if (result.IsCompleted)
break;
reader.AdvanceTo(hangBuffer.Start, hangBuffer.End);
if (message != null)
return message;
}
警告
不要使用上述代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。 前面的示例用于解释 PipeReader 常见问题。
❌内存不足 (OOM)
在满足以下条件的情况下,以下代码将保持缓冲,直到发生 OutOfMemoryException:
- 没有最大消息大小。
- 从
PipeReader
返回的数据不会生成完整的消息。 例如,它不会生成完整的消息,因为另一端正在编写一条大消息(例如,一条为 4GB 的消息)。
警告
不要使用以下代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。 以下示例用于解释 PipeReader 常见问题。
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer;
Process(ref thisCouldOutOfMemory, out Message message);
if (result.IsCompleted)
break;
reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End);
if (message != null)
return message;
}
警告
不要使用上述代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。 前面的示例用于解释 PipeReader 常见问题。
❌内存损坏
当写入读取缓冲区的帮助程序时,应在调用 Advance
之前复制任何返回的有效负载。 下面的示例将返回 Pipe
已丢弃的内存,并可能将其重新用于下一个操作(读/写)。
警告
不要使用以下代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。 以下示例用于解释 PipeReader 常见问题。
public class Message
{
public ReadOnlySequence<byte> CorruptedPayload { get; set; }
}
Environment.FailFast("This code is terrible, don't use it!");
Message message = null;
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
ReadHeader(ref buffer, out int length);
if (length <= buffer.Length)
{
message = new Message
{
// Slice the payload from the existing buffer
CorruptedPayload = buffer.Slice(0, length)
};
buffer = buffer.Slice(length);
}
if (result.IsCompleted)
break;
reader.AdvanceTo(buffer.Start, buffer.End);
if (message != null)
{
// This code is broken since reader.AdvanceTo() was called with a position *after* the buffer
// was captured.
break;
}
}
return message;
}
警告
不要使用上述代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。 前面的示例用于解释 PipeReader 常见问题。
PipeWriter 管理用于代表调用方写入的缓冲区。 PipeWriter
可实现 IBufferWriter<byte>
。 IBufferWriter<byte>
使得无需额外的缓冲区副本就可以访问缓冲区来执行写入操作。
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
// Request at least 5 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(5);
// Write directly into the buffer.
int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span);
// Tell the writer how many bytes were written.
writer.Advance(written);
await writer.FlushAsync(cancellationToken);
}
之前的代码:
- 使用 GetMemory 从
PipeWriter
请求至少 5 个字节的缓冲区。 - 将 ASCII 字符串
"Hello"
的字节写入返回的Memory<byte>
。 - 调用 Advance 以指示写入缓冲区的字节数。
- 刷新
PipeWriter
,以便将字节发送到基础设备。
以前的写入方法使用 PipeWriter
提供的缓冲区。 它可能还使用了 PipeWriter.WriteAsync,该项执行以下操作:
- 将现有缓冲区复制到
PipeWriter
。 - 根据需要调用
GetSpan
Advance
,然后调用 FlushAsync。
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
byte[] helloBytes = Encoding.ASCII.GetBytes("Hello");
// Write helloBytes to the writer, there's no need to call Advance here
// (Write does that).
await writer.WriteAsync(helloBytes, cancellationToken);
}
FlushAsync 支持传递 CancellationToken。 如果令牌在刷新挂起时被取消,则传递 CancellationToken
将导致 OperationCanceledException
。 PipeWriter.FlushAsync
支持通过 PipeWriter.CancelPendingFlush 取消当前刷新操作而不引发异常的方法。 调用 PipeWriter.CancelPendingFlush
将导致对 PipeWriter.FlushAsync
或 PipeWriter.WriteAsync
的当前或下次调用返回 FlushResult,并将 IsCanceled
设置为 true
。 这对于以非破坏性和非异常的方式停止暂停刷新非常有用。
- GetSpan 和 GetMemory 返回至少具有请求内存量的缓冲区。 请勿假设确切的缓冲区大小 。
- 无法保证连续的调用将返回相同的缓冲区或相同大小的缓冲区。
- 在调用 Advance 之后,必须请求一个新的缓冲区来继续写入更多数据。 不能写入先前获得的缓冲区。
- 如果未完成对
FlushAsync
的调用,则调用GetMemory
或GetSpan
将不安全。 - 如果未刷新数据,则调用
Complete
或CompleteAsync
可能导致内存损坏。
以下提示将帮助你成功使用 System.IO.Pipelines 类:
- 始终完成 PipeReader 和 PipeWriter(包括适用时的例外情况)。
- 在调用 PipeReader.ReadAsync 之后始终调用 PipeReader.AdvanceTo。
- 写入时定期
await
PipeWriter.FlushAsync,并始终检查 FlushResult.IsCompleted。 如果IsCompleted
为true
,则中止写入,因为这表示读取器已完成,不再关心所写入的内容。 - 在写入希望
PipeReader
有权访问的内容后调用 PipeWriter.FlushAsync。 - 如果读取器在
FlushAsync
完成之前无法启动,请勿调用FlushAsync
,因为这可能会导致死锁。 - 确保只有一个上下文“拥有”
PipeReader
或PipeWriter
或访问它们。 这些类型不是线程安全的。 - 调用
AdvanceTo
或完成PipeReader
后,切勿访问 ReadResult.Buffer。
IDuplexPipe 是支持读写的类型的协定。 例如,网络连接将由 IDuplexPipe
表示。
与包含 PipeReader
和 PipeWriter
的 Pipe
不同,IDuplexPipe
表示全双工连接的一侧。 这意味着写入 PipeWriter
的内容不会从 PipeReader
中读取。
在读取或写入流数据时,通常使用反序列化程序读取数据,并使用序列化程序写入数据。 大多数读取和写入流 API 都有一个 Stream
参数。 为了更轻松地与这些现有 API 集成,PipeReader
和 PipeWriter
公开了一个 AsStream 方法。 AsStream 返回围绕 PipeReader
或 PipeWriter
的 Stream
实现。
可使用给定了 Stream 对象和可选的相应创建选项的静态 Create
方法创建 PipeReader
和 PipeWriter
实例。
StreamPipeReaderOptions 允许使用以下参数控制 PipeReader
实例的创建:
- StreamPipeReaderOptions.BufferSize 是从池中租用内存时使用的最小缓冲区大小(以字节为单位),默认值为
4096
。 - StreamPipeReaderOptions.LeaveOpen 标志确定在
PipeReader
完成之后基础流是否保持打开状态,默认值为false
。 - StreamPipeReaderOptions.MinimumReadSize 表示分配新缓冲区之前缓冲区中剩余字节的阈值,默认值为
1024
。 - StreamPipeReaderOptions.Pool 是分配内存时使用的
MemoryPool<byte>
,默认值为null
。
StreamPipeWriterOptions 允许使用以下参数控制 PipeWriter
实例的创建:
- StreamPipeWriterOptions.LeaveOpen 标志确定在
PipeWriter
完成之后基础流是否保持打开状态,默认值为false
。 - StreamPipeWriterOptions.MinimumBufferSize 表示从 Pool 租用内存时要使用的最小缓冲区大小,默认值为
4096
。 - StreamPipeWriterOptions.Pool 是分配内存时使用的
MemoryPool<byte>
,默认值为null
。
重要
使用 Create
方法创建 PipeReader
和 PipeWriter
实例时,需要考虑 Stream
对象的生存期。 如果在读取器或编写器使用该方法完成操作后,你需要访问流,则需要在创建选项上将 LeaveOpen
标志设置为 true
。 否则,流将关闭。
以下代码演示了使用 Create
方法从流中创建 PipeReader
和 PipeWriter
实例。
using System.Buffers;
using System.IO.Pipelines;
using System.Text;
class Program
{
static async Task Main()
{
using var stream = File.OpenRead("lorem-ipsum.txt");
var reader = PipeReader.Create(stream);
var writer = PipeWriter.Create(
Console.OpenStandardOutput(),
new StreamPipeWriterOptions(leaveOpen: true));
WriteUserCancellationPrompt();
var processMessagesTask = ProcessMessagesAsync(reader, writer);
var userCanceled = false;
var cancelProcessingTask = Task.Run(() =>
{
while (char.ToUpperInvariant(Console.ReadKey().KeyChar) != 'C')
{
WriteUserCancellationPrompt();
}
userCanceled = true;
// No exceptions thrown
reader.CancelPendingRead();
writer.CancelPendingFlush();
});
await Task.WhenAny(cancelProcessingTask, processMessagesTask);
Console.WriteLine(
$"\n\nProcessing {(userCanceled ? "cancelled" : "completed")}.\n");
}
static void WriteUserCancellationPrompt() =>
Console.WriteLine("Press 'C' to cancel processing...\n");
static async Task ProcessMessagesAsync(
PipeReader reader,
PipeWriter writer)
{
try
{
while (true)
{
ReadResult readResult = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = readResult.Buffer;
try
{
if (readResult.IsCanceled)
{
break;
}
if (TryParseLines(ref buffer, out string message))
{
FlushResult flushResult =
await WriteMessagesAsync(writer, message);
if (flushResult.IsCanceled || flushResult.IsCompleted)
{
break;
}
}
if (readResult.IsCompleted)
{
if (!buffer.IsEmpty)
{
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
catch (Exception ex)
{
Console.Error.WriteLine(ex);
}
finally
{
await reader.CompleteAsync();
await writer.CompleteAsync();
}
}
static bool TryParseLines(
ref ReadOnlySequence<byte> buffer,
out string message)
{
SequencePosition? position;
StringBuilder outputMessage = new();
while(true)
{
position = buffer.PositionOf((byte)'\n');
if (!position.HasValue)
break;
outputMessage.Append(Encoding.ASCII.GetString(buffer.Slice(buffer.Start, position.Value)))
.AppendLine();
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
};
message = outputMessage.ToString();
return message.Length != 0;
}
static ValueTask<FlushResult> WriteMessagesAsync(
PipeWriter writer,
string message) =>
writer.WriteAsync(Encoding.ASCII.GetBytes(message));
}
应用程序使用 StreamReader 以流形式读取 lorem-ipsum.txt 文件,并且必须以空白行结尾。 FileStream 传递给 PipeReader.Create,后者实例化 PipeReader
对象。 然后,控制台应用程序使用 Console.OpenStandardOutput() 将其标准输出流传递到 PipeWriter.Create。 示例支持取消。
其他资源
文档
-
使用 System.IO.Pipelines 的高性能 IO
System.IO.Pipelines 诞生于 .NET Core 团队正在执行的工作,以便更轻松地在 .NET 中实现高性能 IO。在本集中,帕维尔·克里梅茨(@帕克里姆)和大卫·福勒(@davidfowl)将出现在节目中,让我们大致了解 Pipelines 编程模型的工作原理,并提供有关如何使用 API 的一些演示。[00:26] - System.IO.Pipelines 的理由是什么?[02:10] - 管道和流之间的性能比较[04:17] - 使用 Stream 时的担忧[09:42] - 移动到管道[13:45] - 客户端服务器演示[22:16] - 管道如何使用 C# 8 IAsyncEnumerable?[26:04] - 减少分配[28:46] - 管道入门有用链接NuGet 上的 System.IO.PipelinesSystem.IO.Pipelines:.NET 中的高性能 IOTCP 回显示例使用 ValueTask<T> 和 ValueTask 的无分配可等待异步操作 Techempower Web 框架基准.NET 设计评审:System.IO
-
详细了解:在 .NET 中使用缓冲区