.NET 中的 System.IO.Pipelines
System.IO.Pipelines 是一個程式庫,其設計目的是讓您更輕鬆地在 .NET 中執行高效能 I/O。 此程式庫鎖定 .NET Standard,適用於所有 .NET 實作上。
程式庫可在 System.IO.Pipelines Nuget 套件中使用。
System.IO.Pipelines 解決的問題
剖析串流資料的應用程式是由具有許多特殊且不常見程式碼流程的重複使用程式碼所組成。 重複使用和特殊案例代碼很複雜且難以維護。
System.IO.Pipelines
已將結構建立為:
- 具有高效能剖析串流資料。
- 降低程式碼複雜度。
下列程式碼通常適用於從用戶端接收以行分隔的訊息 (由 '\n'
分隔):
async Task ProcessLinesAsync(NetworkStream stream)
{
var buffer = new byte[1024];
await stream.ReadAsync(buffer, 0, buffer.Length);
// Process a single line from the buffer
ProcessLine(buffer);
}
上述程式碼有數個問題:
- 整個訊息 (行結尾處) 可能不會在對
ReadAsync
的單一呼叫中收到。 - 它會忽略
stream.ReadAsync
的結果。stream.ReadAsync
會傳回讀取的資料量。 - 此程式碼不會處理在單一
ReadAsync
呼叫中讀取多行的情況。 - 此項目會為每個讀取配置
byte
陣列。
若要修正上述問題,需要進行下列變更:
緩衝傳入資料,直到找到新資料行為止。
剖析緩衝區中傳回的所有資料行。
資料行可能大於 1 KB (1024 個位元組)。 程式碼必須調整輸入緩衝區的大小,直到找到分隔符號,才能符合緩衝區內的完整資料行。
- 如果已調整緩衝區大小,則會在輸入中出現較長的資料行時,建立更多緩衝區複本。
- 若要減少浪費的空間,請壓縮用於讀取資料行的緩衝區。
請考慮使用緩衝集區來避免重複配置記憶體。
下列程式碼可解決其中一些問題:
async Task ProcessLinesAsync(NetworkStream stream)
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
var bytesBuffered = 0;
var bytesConsumed = 0;
while (true)
{
// Calculate the amount of bytes remaining in the buffer.
var bytesRemaining = buffer.Length - bytesBuffered;
if (bytesRemaining == 0)
{
// Double the buffer size and copy the previously buffered data into the new buffer.
var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
// Return the old buffer to the pool.
ArrayPool<byte>.Shared.Return(buffer);
buffer = newBuffer;
bytesRemaining = buffer.Length - bytesBuffered;
}
var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
if (bytesRead == 0)
{
// EOF
break;
}
// Keep track of the amount of buffered bytes.
bytesBuffered += bytesRead;
var linePosition = -1;
do
{
// Look for a EOL in the buffered data.
linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
bytesBuffered - bytesConsumed);
if (linePosition >= 0)
{
// Calculate the length of the line based on the offset.
var lineLength = linePosition - bytesConsumed;
// Process the line.
ProcessLine(buffer, bytesConsumed, lineLength);
// Move the bytesConsumed to skip past the line consumed (including \n).
bytesConsumed += lineLength + 1;
}
}
while (linePosition >= 0);
}
}
先前的程式碼很複雜,而且無法解決所有已識別的問題。 高效能網路通常表示需要撰寫複雜的程式碼以最大化效能。 System.IO.Pipelines
設計用於讓撰寫此類型的程式碼更容易。
Pipe
Pipe 類別可用來建立 PipeWriter/PipeReader
配對。 寫入至 PipeWriter
的所有資料都可在 PipeReader
中取得:
var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;
管道基本用法
async Task ProcessLinesAsync(Socket socket)
{
var pipe = new Pipe();
Task writing = FillPipeAsync(socket, pipe.Writer);
Task reading = ReadPipeAsync(pipe.Reader);
await Task.WhenAll(reading, writing);
}
async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
const int minimumBufferSize = 512;
while (true)
{
// Allocate at least 512 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(minimumBufferSize);
try
{
int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
if (bytesRead == 0)
{
break;
}
// Tell the PipeWriter how much was read from the Socket.
writer.Advance(bytesRead);
}
catch (Exception ex)
{
LogError(ex);
break;
}
// Make the data available to the PipeReader.
FlushResult result = await writer.FlushAsync();
if (result.IsCompleted)
{
break;
}
}
// By completing PipeWriter, tell the PipeReader that there's no more data coming.
await writer.CompleteAsync();
}
async Task ReadPipeAsync(PipeReader reader)
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
{
// Process the line.
ProcessLine(line);
}
// Tell the PipeReader how much of the buffer has been consumed.
reader.AdvanceTo(buffer.Start, buffer.End);
// Stop reading if there's no more data coming.
if (result.IsCompleted)
{
break;
}
}
// Mark the PipeReader as complete.
await reader.CompleteAsync();
}
bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
// Look for a EOL in the buffer.
SequencePosition? position = buffer.PositionOf((byte)'\n');
if (position == null)
{
line = default;
return false;
}
// Skip the line + the \n.
line = buffer.Slice(0, position.Value);
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
return true;
}
有兩個迴圈:
FillPipeAsync
會從Socket
進行讀取,並寫入至PipeWriter
。ReadPipeAsync
從PipeReader
進行讀取,並剖析傳入資料行。
未配置明確的緩衝區。 所有緩衝區管理都會委派給 PipeReader
和 PipeWriter
實作。 委派緩衝區管理可讓您更輕鬆取用程式碼,僅專注於業務邏輯。
在第一個迴圈中:
- 呼叫 PipeWriter.GetMemory(Int32) 以從基礎寫入器中取得記憶體。
- 呼叫 PipeWriter.Advance(Int32) 以告知
PipeWriter
寫入緩衝區的資料量。 - 呼叫 PipeWriter.FlushAsync,讓資料可供
PipeReader
使用。
在第二個迴圈中,PipeReader
會使用 PipeWriter
所寫入的緩衝區。 緩衝區來自通訊端。 呼叫 PipeReader.ReadAsync
:
傳回的 ReadResult 包含兩個重要資訊片段:
- 以
ReadOnlySequence<byte>
形式讀取的資料。 - 布林值
IsCompleted
,指出是否已達到資料結尾(EOF)。
- 以
尋找資料行結尾 (EOL) 分隔符號並剖析該資料行後:
- 邏輯會處理緩衝區,以略過已處理的內容。
- 呼叫
PipeReader.AdvanceTo
來告知PipeReader
已取用和檢查的資料量。
讀取器和寫入器會藉由呼叫 Complete
來結束迴圈。 Complete
可讓基礎管道釋放其配置的記憶體。
背壓和流程式控制項
理想情況下讀取和剖析會一起運作:
- 讀取執行緒會取用來自網路的資料,並將資料置於緩衝區中。
- 剖析執行緒負責建構適當的資料結構。
一般而言剖析所花費的時間,將超過從網路複製資料區塊:
- 讀取執行緒會比剖析執行緒更為優先。
- 讀取執行緒必須慢下來,或配置更多記憶體以儲存剖析執行緒的資料。
為了達到最佳效能,會在頻繁暫停和配置更多記憶體之間進行權衡。
若要解決上述問題,Pipe
有兩個可控制資料流程的設定:
- PauseWriterThreshold:決定在呼叫 FlushAsync 暫停之前,應該緩衝處理的資料量。
- ResumeWriterThreshold:決定在呼叫
PipeWriter.FlushAsync
暫停之前,應該緩衝處理的資料量。
- 當
Pipe
中的資料量超出PauseWriterThreshold
時,傳回不完整的ValueTask<FlushResult>
。 - 當
ValueTask<FlushResult>
低於ResumeWriterThreshold
時,將其完成。
使用兩個值來防止快速迴圈,如果使用一個值,可能會發生這種情況。
範例
// The Pipe will start returning incomplete tasks from FlushAsync until
// the reader examines at least 5 bytes.
var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
var pipe = new Pipe(options);
PipeScheduler
通常使用 async
和 await
時,非同步程式碼會在 TaskScheduler 或目前的 SynchronizationContext 上恢復執行。
執行 I/O 時,請務必更精細地控制執行 I/O 的位置。 此控制項可讓您有效地利用 CPU 快取。 有效率進行快取對於 Web 服務器之類的高效能應用程式而言非常重要。 PipeScheduler 會提供非同步回呼執行位置的控制。 預設情況:
- 使用目前的 SynchronizationContext。
- 如果沒有
SynchronizationContext
,則項目會使用執行緒集區來執行回呼。
public static void Main(string[] args)
{
var writeScheduler = new SingleThreadPipeScheduler();
var readScheduler = new SingleThreadPipeScheduler();
// Tell the Pipe what schedulers to use and disable the SynchronizationContext.
var options = new PipeOptions(readerScheduler: readScheduler,
writerScheduler: writeScheduler,
useSynchronizationContext: false);
var pipe = new Pipe(options);
}
// This is a sample scheduler that async callbacks on a single dedicated thread.
public class SingleThreadPipeScheduler : PipeScheduler
{
private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
new BlockingCollection<(Action<object> Action, object State)>();
private readonly Thread _thread;
public SingleThreadPipeScheduler()
{
_thread = new Thread(DoWork);
_thread.Start();
}
private void DoWork()
{
foreach (var item in _queue.GetConsumingEnumerable())
{
item.Action(item.State);
}
}
public override void Schedule(Action<object?> action, object? state)
{
if (state is not null)
{
_queue.Add((action, state));
}
// else log the fact that _queue.Add was not called.
}
}
PipeScheduler.ThreadPool 為 PipeScheduler 實作,可將回呼排入執行緒集區佇列。 PipeScheduler.ThreadPool
是預設值,而通常也是最佳選擇。 PipeScheduler.Inline 可能會導致非預期的結果,例如死結。
管道重設
重複使用 Pipe
物件十分有效率。 若要重設管道,請在 PipeReader
和 PipeWriter
完成時,呼叫PipeReaderReset。
PipeReader
PipeReader 代表呼叫者管理記憶體。 呼叫 PipeReader.ReadAsync 後一律呼叫 PipeReader.AdvanceTo。 這可讓 PipeReader
知道呼叫者將記憶體使用完畢的時間,以便進行追蹤。 僅在呼叫 PipeReader.AdvanceTo
時,PipeReader.ReadAsync
傳回的 ReadOnlySequence<byte>
才會有效。 呼叫 PipeReader.AdvanceTo
之後,使用 ReadOnlySequence<byte>
不再合法。
PipeReader.AdvanceTo
會採用兩個 SequencePosition 引數:
- 第一個引數會決定耗用的記憶體量。
- 第二個引數會決定觀察到的緩衝區數目。
將資料標示為已取用,表示管道可以將記憶體傳回基礎緩衝集區。 將資料標示為觀察到的項目,可控制下一次呼叫的 PipeReader.ReadAsync
動作。 將一切標示為觀察到的項目,表示下一次呼叫 PipeReader.ReadAsync
時不會進行傳回,直到更多資料寫入管道為止。 任何其他值都會對 PipeReader.ReadAsync
進行下一次呼叫,以立即傳回觀察到和未觀察到的資料,但不會傳回已取用的資料。
讀取串流資料案例
嘗試讀取串流資料時,會出現幾個典型的模式:
- 指定資料流時,剖析單一訊息。
- 指定資料流時,剖析所有可用訊息。
下列範例會使用 TryParseLines
方法來剖析 ReadOnlySequence<byte>
的訊息。 TryParseLines
剖析單一訊息並更新輸入緩衝區,以修剪緩衝區中剖析的訊息。 TryParseLines
不屬於 .NET 的一部分,此項目為使用者撰寫方法,並將用於下列章節中。
bool TryParseLines(ref ReadOnlySequence<byte> buffer, out Message message);
讀取單一訊息
下列程式碼會從 PipeReader
讀取單一訊息,並將其傳回給呼叫者。
async ValueTask<Message?> ReadSingleMessageAsync(PipeReader reader,
CancellationToken cancellationToken = default)
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
// In the event that no message is parsed successfully, mark consumed
// as nothing and examined as the entire buffer.
SequencePosition consumed = buffer.Start;
SequencePosition examined = buffer.End;
try
{
if (TryParseLines(ref buffer, out Message message))
{
// A single message was successfully parsed so mark the start of the
// parsed buffer as consumed. TryParseLines trims the buffer to
// point to the data after the message was parsed.
consumed = buffer.Start;
// Examined is marked the same as consumed here, so the next call
// to ReadSingleMessageAsync will process the next message if there's
// one.
examined = consumed;
return message;
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(consumed, examined);
}
}
return null;
}
上述 程式碼:
- 剖析單一訊息。
- 更新已取用
SequencePosition
,並檢查SequencePosition
,以指向修剪輸入緩衝區的起點。
兩個 SequencePosition
引數會進行更新,因為 TryParseLines
會從輸入緩衝區移除剖析訊息。 一般而言,從緩衝區剖析單一訊息時,檢查的位置應該是下列其中一項:
- 訊息的結尾處。
- 如果找不到訊息,則為已接收緩衝區的結尾處。
單一訊息案例最有可能發生錯誤。 傳遞錯誤值以檢查可能導致記憶體不足的例外狀況或無限迴圈。 如需詳細資訊,請參閱本文中的 PipeReader 常見問題章節。
讀取多個訊息
下列程式碼會從 PipeReader
讀所有訊息,並在每個項目上呼叫 ProcessMessageAsync
。
async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseLines(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
取消
PipeReader.ReadAsync
:
- 支援傳遞 CancellationToken。
- 如果
CancellationToken
在讀取擱置時取消,則擲回 OperationCanceledException。 - 支援透過 PipeReader.CancelPendingRead 取消目前讀取作業,可避免引發例外狀況。 呼叫
PipeReader.CancelPendingRead
會導致目前或下一個呼叫的PipeReader.ReadAsync
傳回 ReadResult,並將IsCanceled
設定為true
。 這對於以非破壞性和非例外方式停止現有讀取迴圈十分有用。
private PipeReader reader;
public MyConnection(PipeReader reader)
{
this.reader = reader;
}
public void Abort()
{
// Cancel the pending read so the process loop ends without an exception.
reader.CancelPendingRead();
}
public async Task ProcessMessagesAsync()
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
if (result.IsCanceled)
{
// The read was canceled. You can quit without reading the existing data.
break;
}
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseLines(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
PipeReader 常見問題
將錯誤的值傳遞至
consumed
或examined
可能會導致讀取已讀取的資料。將
buffer.End
作為已檢查項目傳遞可能會導致下列結果:- 停止的資料
- 如果未取用資料,則會導致最終的記憶體不足 (OOM) 例外狀況。 例如從緩衝區一次處理單一訊息時,發生
PipeReader.AdvanceTo(position, buffer.End)
。
將錯誤的值傳遞至
consumed
或examined
可能會導致無限迴圈。 例如,PipeReader.AdvanceTo(buffer.Start)
如果buffer.Start
尚未變更,會導致下一次呼叫PipeReader.ReadAsync
在新的資料送達之前立即進行傳回。將錯誤的值傳遞至
consumed
或examined
可能會導致無限緩衝 (最終造成 OOM 情況)。在呼叫
PipeReader.AdvanceTo
後使用ReadOnlySequence<byte>
可能會導致記憶體損毀 (請在釋放後使用)。無法呼叫
PipeReader.Complete/CompleteAsync
可能會導致記憶體流失。在處理緩衝區之前檢查 ReadResult.IsCompleted 和結束讀取邏輯會導致資料遺失。 迴圈結束條件應以
ReadResult.Buffer.IsEmpty
和ReadResult.IsCompleted
為基礎。 不正確執行可能會導致無限迴圈。
有問題的程式碼
❌資料遺失
當 IsCompleted
設定為 true
時,ReadResult
可以傳回資料的最後一個區段。 在結束讀取迴圈之前,不讀取該資料會導致資料遺失。
警告
請勿使用下列程式碼。 使用此範例會導致資料遺失、停止回應、安全性問題,且不應複製。 提供下列範例來說明 PipeReader 常見問題。
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> dataLossBuffer = result.Buffer;
if (result.IsCompleted)
break;
Process(ref dataLossBuffer, out Message message);
reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End);
}
警告
請勿使用上述程式碼。 使用此範例會導致資料遺失、停止回應、安全性問題,且不應複製。 提供上述範例來說明 PipeReader 常見問題。
❌無限迴圈
如果 Result.IsCompleted
是 true
,下列邏輯可能會導致無限迴圈,但緩衝區中永遠不會有完整的訊息。
警告
請勿使用下列程式碼。 使用此範例會導致資料遺失、停止回應、安全性問題,且不應複製。 提供下列範例來說明 PipeReader 常見問題。
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (result.IsCompleted && infiniteLoopBuffer.IsEmpty)
break;
Process(ref infiniteLoopBuffer, out Message message);
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
警告
請勿使用上述程式碼。 使用此範例會導致資料遺失、停止回應、安全性問題,且不應複製。 提供上述範例來說明 PipeReader 常見問題。
以下是另一個有相同問題的程式碼片段。 此片段會先檢查非空白的緩衝區,再檢查 ReadResult.IsCompleted
。 因為此程式碼片段處於 else if
中,所以如果緩衝區中沒有完整的訊息,此片段就會永遠進行迴圈。
警告
請勿使用下列程式碼。 使用此範例會導致資料遺失、停止回應、安全性問題,且不應複製。 提供下列範例來說明 PipeReader 常見問題。
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (!infiniteLoopBuffer.IsEmpty)
Process(ref infiniteLoopBuffer, out Message message);
else if (result.IsCompleted)
break;
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
警告
請勿使用上述程式碼。 使用此範例會導致資料遺失、停止回應、安全性問題,且不應複製。 提供上述範例來說明 PipeReader 常見問題。
❌沒有回應的應用程式
無條件地在 examined
位置使用 buffer.End
呼叫 PipeReader.AdvanceTo
,可能會導致應用程式在剖析單一訊息時變得沒有回應。 PipeReader.AdvanceTo
的下一個呼叫不會傳回,直到下列狀況發生為止:
- 還有更多資料要寫入管道。
- 而且先前未檢查到新的資料。
警告
請勿使用下列程式碼。 使用此範例會導致資料遺失、停止回應、安全性問題,且不應複製。 提供下列範例來說明 PipeReader 常見問題。
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> hangBuffer = result.Buffer;
Process(ref hangBuffer, out Message message);
if (result.IsCompleted)
break;
reader.AdvanceTo(hangBuffer.Start, hangBuffer.End);
if (message != null)
return message;
}
警告
請勿使用上述程式碼。 使用此範例會導致資料遺失、停止回應、安全性問題,且不應複製。 提供上述範例來說明 PipeReader 常見問題。
❌記憶體不足 (OOM)
在下列情況下,下列程式碼會持續緩衝處理,直到 OutOfMemoryException 情況發生為止:
- 沒有訊息大小上限。
- 從
PipeReader
傳回的資料不會產生完整的訊息。 例如此資料不會建立完整的訊息,因為另一端為寫入大型訊息 (例如,4 GB 的訊息)。
警告
請勿使用下列程式碼。 使用此範例會導致資料遺失、停止回應、安全性問題,且不應複製。 提供下列範例來說明 PipeReader 常見問題。
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer;
Process(ref thisCouldOutOfMemory, out Message message);
if (result.IsCompleted)
break;
reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End);
if (message != null)
return message;
}
警告
請勿使用上述程式碼。 使用此範例會導致資料遺失、停止回應、安全性問題,且不應複製。 提供上述範例來說明 PipeReader 常見問題。
❌記憶體損毀
撰寫讀取緩衝區的協助程式時,應該先複製任何傳回的承載資料,再呼叫 Advance
。 下列範例會傳回 Pipe
捨棄的記憶體,可能會在下次作業 (讀取/寫入) 重複使用。
警告
請勿使用下列程式碼。 使用此範例會導致資料遺失、停止回應、安全性問題,且不應複製。 提供下列範例來說明 PipeReader 常見問題。
public class Message
{
public ReadOnlySequence<byte> CorruptedPayload { get; set; }
}
Environment.FailFast("This code is terrible, don't use it!");
Message message = null;
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
ReadHeader(ref buffer, out int length);
if (length <= buffer.Length)
{
message = new Message
{
// Slice the payload from the existing buffer
CorruptedPayload = buffer.Slice(0, length)
};
buffer = buffer.Slice(length);
}
if (result.IsCompleted)
break;
reader.AdvanceTo(buffer.Start, buffer.End);
if (message != null)
{
// This code is broken since reader.AdvanceTo() was called with a position *after* the buffer
// was captured.
break;
}
}
return message;
}
警告
請勿使用上述程式碼。 使用此範例會導致資料遺失、停止回應、安全性問題,且不應複製。 提供上述範例來說明 PipeReader 常見問題。
PipeWriter
PipeWriter 會管理代表呼叫者寫入的緩衝區。 PipeWriter
會實作 IBufferWriter<byte>
。 IBufferWriter<byte>
可讓您存取緩衝區來執行寫入,而不需額外的緩衝區複本。
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
// Request at least 5 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(5);
// Write directly into the buffer.
int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span);
// Tell the writer how many bytes were written.
writer.Advance(written);
await writer.FlushAsync(cancellationToken);
}
先前的程式碼:
- 使用 GetMemory 從
PipeWriter
要求至少 5 個位元組的緩衝區。 - 將 ASCII 字串
"Hello"
的位元組寫入至傳回的Memory<byte>
。 - 呼叫 Advance 以指出寫入緩衝區的位元組數目。
- 排清
PipeWriter
以將位元組傳送至基礎裝置。
先前的寫入方法會使用 PipeWriter
所提供的緩衝區。 此寫入方法也可能已使用 PipeWriter.WriteAsync,其狀況如下:
- 會將現有的緩衝區複製到
PipeWriter
。 - 視需要呼叫
GetSpan
、Advance
,並呼叫 FlushAsync。
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
byte[] helloBytes = Encoding.ASCII.GetBytes("Hello");
// Write helloBytes to the writer, there's no need to call Advance here
// (Write does that).
await writer.WriteAsync(helloBytes, cancellationToken);
}
取消
FlushAsync 支援傳遞 CancellationToken。 如果權杖在排清擱置中時取消,則傳遞 CancellationToken
會導致 OperationCanceledException
。 PipeWriter.FlushAsync
支援透過 PipeWriter.CancelPendingFlush 取消目前排清作業,而不引發例外狀況。 呼叫 PipeWriter.CancelPendingFlush
會導致目前或下一個呼叫的 PipeWriter.FlushAsync
或 PipeWriter.WriteAsync
傳回 FlushResult,並將 IsCanceled
設定為 true
。 這對於以非破壞性和非例外方式停止暫止排清的狀況十分有用。
PipeWriter 常見問題
- GetSpan 和 GetMemory 會傳回具有最低要求記憶體數量的緩衝區。 請勿假設確切的緩衝區大小。
- 無法保證後續呼叫會傳回相同的緩衝區或大小相同的緩衝區。
- 呼叫 Advance 繼續寫入更多資料之後,必須要求新的緩衝區。 先前取得的緩衝區無法寫入。
- 呼叫
GetMemory
或GetSpan
,但呼叫FlushAsync
不完整且不安全。 - 有未排清資料時呼叫
Complete
或CompleteAsync
,可能會導致記憶體損毀。
使用 PipeReader 和 PipeWriter 的秘訣
下列秘訣將協助您成功使用 System.IO.Pipelines 類別:
- 請一律完成 PipeReader 和 PipeWriter,包括適用的例外狀況。
- 呼叫 PipeReader.ReadAsync 後請一律呼叫 PipeReader.AdvanceTo。
- 撰寫時定期
await
PipeWriter.FlushAsync,且一律檢查 FlushResult.IsCompleted。 如果IsCompleted
為true
,則中止寫入,因為這表示讀取器已完成,不再關心寫入的內容。 - 撰寫您想要
PipeReader
存取的內容之後,請呼叫 PipeWriter.FlushAsync。 - 如果讀取器無法啟用,請勿在
FlushAsync
完成前呼叫FlushAsync
,因為這可能導致死結。 - 請確定只有一個內容「擁有」
PipeReader
或PipeWriter
,或存取上述項目。 這些類型不是安全執行緒。 - 在呼叫
AdvanceTo
或完成PipeReader
之後,永遠不要存取 ReadResult.Buffer。
IDuplexPipe
IDuplexPipe 是同時支援讀取和寫入類型的合約。 例如,網路連線會以 IDuplexPipe
表示。
不同於 Pipe
,包含 PipeReader
和 PipeWriter
,IDuplexPipe
通常代表完整雙面連線的一端。 這表示寫入 PipeWriter
的內容不會從 PipeReader
中讀取。
資料流
讀取或寫串流資料時,您通常會使用取消序列化程式來讀取資料,並使用序列化程式寫入資料。 其中大部分的讀取和寫入串流的 API 都有 Stream
參數。 若要更輕鬆地與這些現有的 API 整合,PipeReader
和 PipeWriter
會公開 AsStream 方法。 AsStream傳回 PipeReader
或 PipeWriter
周圍的 Stream
實作。
資料流範例
PipeReader
和 PipeWriter
執行個體例可以使用指定的 Stream 物件和選擇性的對應建立選項來建立靜態 Create
方法。
StreamPipeReaderOptions 允許使用下列參數來控制建立 PipeReader
執行個體:
- StreamPipeReaderOptions.BufferSize 是從集區租用記憶體時所使用的最小緩衝區大小,預設為
4096
。 - StreamPipeReaderOptions.LeaveOpen 旗標會決定基礎資料流在完成之後
PipeReader
是否保持開啟狀態,並且預設為false
。 - StreamPipeReaderOptions.MinimumReadSize 代表配置新緩衝區之前,緩衝區中剩餘位元組的臨界值,並且預設為
1024
。 - StreamPipeReaderOptions.Pool 是配置記憶體時所使用的
MemoryPool<byte>
,預設為null
。
StreamPipeWriterOptions 允許使用下列參數來控制建立 PipeWriter
執行個體:
- StreamPipeWriterOptions.LeaveOpen 旗標會決定基礎資料流在完成之後
PipeWriter
是否保持開啟狀態,並且預設為false
。 - StreamPipeWriterOptions.MinimumBufferSize 代表從 Pool 租借記憶體時要使用的最小緩衝區大小,並且預設為
4096
。 - StreamPipeWriterOptions.Pool 是配置記憶體時所使用的
MemoryPool<byte>
,預設為null
。
重要
使用 PipeReader
方法建立 PipeWriter
和 Create
實例時,您必須考慮 Stream
物件存留期。 如果您在讀取器或寫入器完成串流之後需要存取資料流,您必須在建立選項上將 LeaveOpen
旗標設定為 true
。 否則,資料流將會關閉。
下列程式碼示範如何使用資料流中的 Create
方法建立 PipeReader
和 PipeWriter
執行個體。
using System.Buffers;
using System.IO.Pipelines;
using System.Text;
class Program
{
static async Task Main()
{
using var stream = File.OpenRead("lorem-ipsum.txt");
var reader = PipeReader.Create(stream);
var writer = PipeWriter.Create(
Console.OpenStandardOutput(),
new StreamPipeWriterOptions(leaveOpen: true));
WriteUserCancellationPrompt();
var processMessagesTask = ProcessMessagesAsync(reader, writer);
var userCanceled = false;
var cancelProcessingTask = Task.Run(() =>
{
while (char.ToUpperInvariant(Console.ReadKey().KeyChar) != 'C')
{
WriteUserCancellationPrompt();
}
userCanceled = true;
// No exceptions thrown
reader.CancelPendingRead();
writer.CancelPendingFlush();
});
await Task.WhenAny(cancelProcessingTask, processMessagesTask);
Console.WriteLine(
$"\n\nProcessing {(userCanceled ? "cancelled" : "completed")}.\n");
}
static void WriteUserCancellationPrompt() =>
Console.WriteLine("Press 'C' to cancel processing...\n");
static async Task ProcessMessagesAsync(
PipeReader reader,
PipeWriter writer)
{
try
{
while (true)
{
ReadResult readResult = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = readResult.Buffer;
try
{
if (readResult.IsCanceled)
{
break;
}
if (TryParseLines(ref buffer, out string message))
{
FlushResult flushResult =
await WriteMessagesAsync(writer, message);
if (flushResult.IsCanceled || flushResult.IsCompleted)
{
break;
}
}
if (readResult.IsCompleted)
{
if (!buffer.IsEmpty)
{
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
catch (Exception ex)
{
Console.Error.WriteLine(ex);
}
finally
{
await reader.CompleteAsync();
await writer.CompleteAsync();
}
}
static bool TryParseLines(
ref ReadOnlySequence<byte> buffer,
out string message)
{
SequencePosition? position;
StringBuilder outputMessage = new();
while(true)
{
position = buffer.PositionOf((byte)'\n');
if (!position.HasValue)
break;
outputMessage.Append(Encoding.ASCII.GetString(buffer.Slice(buffer.Start, position.Value)))
.AppendLine();
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
};
message = outputMessage.ToString();
return message.Length != 0;
}
static ValueTask<FlushResult> WriteMessagesAsync(
PipeWriter writer,
string message) =>
writer.WriteAsync(Encoding.ASCII.GetBytes(message));
}
應用程式會使用 StreamReader 讀取 lorem-ipsum.txt 檔案做為資料流,且結尾必須是空白行。 FileStream 會傳遞至 PipeReader.Create,以具現化 PipeReader
物件。 主控台應用程式接著會使用 Console.OpenStandardOutput() 將標準輸出資料流傳遞至 PipeWriter.Create。 此範例支援取消。