.NET 中的 System.IO.Pipelines

System.IO.Pipelines 是一個程式庫,其設計目的是讓您更輕鬆地在 .NET 中執行高效能 I/O。 此程式庫鎖定 .NET Standard,適用於所有 .NET 實作上。

程式庫可在 System.IO.Pipelines Nuget 套件中使用。

System.IO.Pipelines 解決的問題

剖析串流資料的應用程式是由具有許多特殊且不常見程式碼流程的重複使用程式碼所組成。 重複使用和特殊案例代碼很複雜且難以維護。

System.IO.Pipelines 已將結構建立為:

  • 具有高效能剖析串流資料。
  • 降低程式碼複雜度。

下列程式碼通常適用於從用戶端接收以行分隔的訊息 (由 '\n' 分隔):

async Task ProcessLinesAsync(NetworkStream stream)
{
    var buffer = new byte[1024];
    await stream.ReadAsync(buffer, 0, buffer.Length);

    // Process a single line from the buffer
    ProcessLine(buffer);
}

上述程式碼有數個問題:

  • 整個訊息 (行結尾處) 可能不會在對 ReadAsync 的單一呼叫中收到。
  • 它會忽略 stream.ReadAsync 的結果。 stream.ReadAsync 會傳回讀取的資料量。
  • 此程式碼不會處理在單一 ReadAsync 呼叫中讀取多行的情況。
  • 此項目會為每個讀取配置 byte 陣列。

若要修正上述問題,需要進行下列變更:

  • 緩衝傳入資料,直到找到新資料行為止。

  • 剖析緩衝區中傳回的所有資料行。

  • 資料行可能大於 1 KB (1024 個位元組)。 程式碼必須調整輸入緩衝區的大小,直到找到分隔符號,才能符合緩衝區內的完整資料行。

    • 如果已調整緩衝區大小,則會在輸入中出現較長的資料行時,建立更多緩衝區複本。
    • 若要減少浪費的空間,請壓縮用於讀取資料行的緩衝區。
  • 請考慮使用緩衝集區來避免重複配置記憶體。

  • 下列程式碼可解決其中一些問題:

async Task ProcessLinesAsync(NetworkStream stream)
{
    byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
    var bytesBuffered = 0;
    var bytesConsumed = 0;

    while (true)
    {
        // Calculate the amount of bytes remaining in the buffer.
        var bytesRemaining = buffer.Length - bytesBuffered;

        if (bytesRemaining == 0)
        {
            // Double the buffer size and copy the previously buffered data into the new buffer.
            var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
            Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
            // Return the old buffer to the pool.
            ArrayPool<byte>.Shared.Return(buffer);
            buffer = newBuffer;
            bytesRemaining = buffer.Length - bytesBuffered;
        }

        var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
        if (bytesRead == 0)
        {
            // EOF
            break;
        }

        // Keep track of the amount of buffered bytes.
        bytesBuffered += bytesRead;
        var linePosition = -1;

        do
        {
            // Look for a EOL in the buffered data.
            linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
                                         bytesBuffered - bytesConsumed);

            if (linePosition >= 0)
            {
                // Calculate the length of the line based on the offset.
                var lineLength = linePosition - bytesConsumed;

                // Process the line.
                ProcessLine(buffer, bytesConsumed, lineLength);

                // Move the bytesConsumed to skip past the line consumed (including \n).
                bytesConsumed += lineLength + 1;
            }
        }
        while (linePosition >= 0);
    }
}

先前的程式碼很複雜,而且無法解決所有已識別的問題。 高效能網路通常表示需要撰寫複雜的程式碼以最大化效能。 System.IO.Pipelines 設計用於讓撰寫此類型的程式碼更容易。

Pipe

Pipe 類別可用來建立 PipeWriter/PipeReader 配對。 寫入至 PipeWriter 的所有資料都可在 PipeReader 中取得:

var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;

管道基本用法

async Task ProcessLinesAsync(Socket socket)
{
    var pipe = new Pipe();
    Task writing = FillPipeAsync(socket, pipe.Writer);
    Task reading = ReadPipeAsync(pipe.Reader);

    await Task.WhenAll(reading, writing);
}

async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
    const int minimumBufferSize = 512;

    while (true)
    {
        // Allocate at least 512 bytes from the PipeWriter.
        Memory<byte> memory = writer.GetMemory(minimumBufferSize);
        try
        {
            int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
            if (bytesRead == 0)
            {
                break;
            }
            // Tell the PipeWriter how much was read from the Socket.
            writer.Advance(bytesRead);
        }
        catch (Exception ex)
        {
            LogError(ex);
            break;
        }

        // Make the data available to the PipeReader.
        FlushResult result = await writer.FlushAsync();

        if (result.IsCompleted)
        {
            break;
        }
    }

     // By completing PipeWriter, tell the PipeReader that there's no more data coming.
    await writer.CompleteAsync();
}

async Task ReadPipeAsync(PipeReader reader)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync();
        ReadOnlySequence<byte> buffer = result.Buffer;

        while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
        {
            // Process the line.
            ProcessLine(line);
        }

        // Tell the PipeReader how much of the buffer has been consumed.
        reader.AdvanceTo(buffer.Start, buffer.End);

        // Stop reading if there's no more data coming.
        if (result.IsCompleted)
        {
            break;
        }
    }

    // Mark the PipeReader as complete.
    await reader.CompleteAsync();
}

bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
    // Look for a EOL in the buffer.
    SequencePosition? position = buffer.PositionOf((byte)'\n');

    if (position == null)
    {
        line = default;
        return false;
    }

    // Skip the line + the \n.
    line = buffer.Slice(0, position.Value);
    buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
    return true;
}

有兩個迴圈:

  • FillPipeAsync 會從 Socket 進行讀取,並寫入至 PipeWriter
  • ReadPipeAsyncPipeReader 進行讀取,並剖析傳入資料行。

未配置明確的緩衝區。 所有緩衝區管理都會委派給 PipeReaderPipeWriter 實作。 委派緩衝區管理可讓您更輕鬆取用程式碼,僅專注於業務邏輯。

在第一個迴圈中:

在第二個迴圈中,PipeReader 會使用 PipeWriter 所寫入的緩衝區。 緩衝區來自通訊端。 呼叫 PipeReader.ReadAsync

  • 傳回的 ReadResult 包含兩個重要資訊片段:

    • ReadOnlySequence<byte> 形式讀取的資料。
    • 布林值 IsCompleted,指出是否已達到資料結尾(EOF)。

尋找資料行結尾 (EOL) 分隔符號並剖析該資料行後:

  • 邏輯會處理緩衝區,以略過已處理的內容。
  • 呼叫 PipeReader.AdvanceTo 來告知 PipeReader 已取用和檢查的資料量。

讀取器和寫入器會藉由呼叫 Complete 來結束迴圈。 Complete 可讓基礎管道釋放其配置的記憶體。

背壓和流程式控制項

理想情況下讀取和剖析會一起運作:

  • 讀取執行緒會取用來自網路的資料,並將資料置於緩衝區中。
  • 剖析執行緒負責建構適當的資料結構。

一般而言剖析所花費的時間,將超過從網路複製資料區塊:

  • 讀取執行緒會比剖析執行緒更為優先。
  • 讀取執行緒必須慢下來,或配置更多記憶體以儲存剖析執行緒的資料。

為了達到最佳效能,會在頻繁暫停和配置更多記憶體之間進行權衡。

若要解決上述問題,Pipe 有兩個可控制資料流程的設定:

Diagram with ResumeWriterThreshold and PauseWriterThreshold

PipeWriter.FlushAsync

  • Pipe 中的資料量超出 PauseWriterThreshold 時,傳回不完整的 ValueTask<FlushResult>
  • ValueTask<FlushResult> 低於 ResumeWriterThreshold 時,將其完成。

使用兩個值來防止快速迴圈,如果使用一個值,可能會發生這種情況。

範例

// The Pipe will start returning incomplete tasks from FlushAsync until
// the reader examines at least 5 bytes.
var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
var pipe = new Pipe(options);

PipeScheduler

通常使用 asyncawait 時,非同步程式碼會在 TaskScheduler 或目前的 SynchronizationContext 上恢復執行。

執行 I/O 時,請務必更精細地控制執行 I/O 的位置。 此控制項可讓您有效地利用 CPU 快取。 有效率進行快取對於 Web 服務器之類的高效能應用程式而言非常重要。 PipeScheduler 會提供非同步回呼執行位置的控制。 預設情況:

  • 使用目前的 SynchronizationContext
  • 如果沒有 SynchronizationContext,則項目會使用執行緒集區來執行回呼。
public static void Main(string[] args)
{
    var writeScheduler = new SingleThreadPipeScheduler();
    var readScheduler = new SingleThreadPipeScheduler();

    // Tell the Pipe what schedulers to use and disable the SynchronizationContext.
    var options = new PipeOptions(readerScheduler: readScheduler,
                                  writerScheduler: writeScheduler,
                                  useSynchronizationContext: false);
    var pipe = new Pipe(options);
}

// This is a sample scheduler that async callbacks on a single dedicated thread.
public class SingleThreadPipeScheduler : PipeScheduler
{
    private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
     new BlockingCollection<(Action<object> Action, object State)>();
    private readonly Thread _thread;

    public SingleThreadPipeScheduler()
    {
        _thread = new Thread(DoWork);
        _thread.Start();
    }

    private void DoWork()
    {
        foreach (var item in _queue.GetConsumingEnumerable())
        {
            item.Action(item.State);
        }
    }

    public override void Schedule(Action<object?> action, object? state)
    {
        if (state is not null)
        {
            _queue.Add((action, state));
        }
        // else log the fact that _queue.Add was not called.
    }
}

PipeScheduler.ThreadPoolPipeScheduler 實作,可將回呼排入執行緒集區佇列。 PipeScheduler.ThreadPool 是預設值,而通常也是最佳選擇。 PipeScheduler.Inline 可能會導致非預期的結果,例如死結。

管道重設

重複使用 Pipe 物件十分有效率。 若要重設管道,請在 PipeReaderPipeWriter 完成時,呼叫PipeReaderReset

PipeReader

PipeReader 代表呼叫者管理記憶體。 呼叫 PipeReader.ReadAsync一律呼叫 PipeReader.AdvanceTo。 這可讓 PipeReader 知道呼叫者將記憶體使用完畢的時間,以便進行追蹤。 僅在呼叫 PipeReader.AdvanceTo 時,PipeReader.ReadAsync 傳回的 ReadOnlySequence<byte> 才會有效。 呼叫 PipeReader.AdvanceTo 之後,使用 ReadOnlySequence<byte> 不再合法。

PipeReader.AdvanceTo 會採用兩個 SequencePosition 引數:

  • 第一個引數會決定耗用的記憶體量。
  • 第二個引數會決定觀察到的緩衝區數目。

將資料標示為已取用,表示管道可以將記憶體傳回基礎緩衝集區。 將資料標示為觀察到的項目,可控制下一次呼叫的 PipeReader.ReadAsync 動作。 將一切標示為觀察到的項目,表示下一次呼叫 PipeReader.ReadAsync 時不會進行傳回,直到更多資料寫入管道為止。 任何其他值都會對 PipeReader.ReadAsync 進行下一次呼叫,以立即傳回觀察到未觀察到的資料,但不會傳回已取用的資料。

讀取串流資料案例

嘗試讀取串流資料時,會出現幾個典型的模式:

  • 指定資料流時,剖析單一訊息。
  • 指定資料流時,剖析所有可用訊息。

下列範例會使用 TryParseLines 方法來剖析 ReadOnlySequence<byte> 的訊息。 TryParseLines 剖析單一訊息並更新輸入緩衝區,以修剪緩衝區中剖析的訊息。 TryParseLines 不屬於 .NET 的一部分,此項目為使用者撰寫方法,並將用於下列章節中。

bool TryParseLines(ref ReadOnlySequence<byte> buffer, out Message message);

讀取單一訊息

下列程式碼會從 PipeReader 讀取單一訊息,並將其傳回給呼叫者。

async ValueTask<Message?> ReadSingleMessageAsync(PipeReader reader,
 CancellationToken cancellationToken = default)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync(cancellationToken);
        ReadOnlySequence<byte> buffer = result.Buffer;

        // In the event that no message is parsed successfully, mark consumed
        // as nothing and examined as the entire buffer.
        SequencePosition consumed = buffer.Start;
        SequencePosition examined = buffer.End;

        try
        {
            if (TryParseLines(ref buffer, out Message message))
            {
                // A single message was successfully parsed so mark the start of the
                // parsed buffer as consumed. TryParseLines trims the buffer to
                // point to the data after the message was parsed.
                consumed = buffer.Start;

                // Examined is marked the same as consumed here, so the next call
                // to ReadSingleMessageAsync will process the next message if there's
                // one.
                examined = consumed;

                return message;
            }

            // There's no more data to be processed.
            if (result.IsCompleted)
            {
                if (buffer.Length > 0)
                {
                    // The message is incomplete and there's no more data to process.
                    throw new InvalidDataException("Incomplete message.");
                }

                break;
            }
        }
        finally
        {
            reader.AdvanceTo(consumed, examined);
        }
    }

    return null;
}

上述 程式碼:

  • 剖析單一訊息。
  • 更新已取用 SequencePosition,並檢查 SequencePosition,以指向修剪輸入緩衝區的起點。

兩個 SequencePosition 引數會進行更新,因為 TryParseLines 會從輸入緩衝區移除剖析訊息。 一般而言,從緩衝區剖析單一訊息時,檢查的位置應該是下列其中一項:

  • 訊息的結尾處。
  • 如果找不到訊息,則為已接收緩衝區的結尾處。

單一訊息案例最有可能發生錯誤。 傳遞錯誤值以檢查可能導致記憶體不足的例外狀況或無限迴圈。 如需詳細資訊,請參閱本文中的 PipeReader 常見問題章節。

讀取多個訊息

下列程式碼會從 PipeReader 讀所有訊息,並在每個項目上呼叫 ProcessMessageAsync

async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
{
    try
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync(cancellationToken);
            ReadOnlySequence<byte> buffer = result.Buffer;

            try
            {
                // Process all messages from the buffer, modifying the input buffer on each
                // iteration.
                while (TryParseLines(ref buffer, out Message message))
                {
                    await ProcessMessageAsync(message);
                }

                // There's no more data to be processed.
                if (result.IsCompleted)
                {
                    if (buffer.Length > 0)
                    {
                        // The message is incomplete and there's no more data to process.
                        throw new InvalidDataException("Incomplete message.");
                    }
                    break;
                }
            }
            finally
            {
                // Since all messages in the buffer are being processed, you can use the
                // remaining buffer's Start and End position to determine consumed and examined.
                reader.AdvanceTo(buffer.Start, buffer.End);
            }
        }
    }
    finally
    {
        await reader.CompleteAsync();
    }
}

取消

PipeReader.ReadAsync

  • 支援傳遞 CancellationToken
  • 如果 CancellationToken 在讀取擱置時取消,則擲回 OperationCanceledException
  • 支援透過 PipeReader.CancelPendingRead 取消目前讀取作業,可避免引發例外狀況。 呼叫 PipeReader.CancelPendingRead 會導致目前或下一個呼叫的 PipeReader.ReadAsync 傳回 ReadResult,並將 IsCanceled 設定為 true。 這對於以非破壞性和非例外方式停止現有讀取迴圈十分有用。
private PipeReader reader;

public MyConnection(PipeReader reader)
{
    this.reader = reader;
}

public void Abort()
{
    // Cancel the pending read so the process loop ends without an exception.
    reader.CancelPendingRead();
}

public async Task ProcessMessagesAsync()
{
    try
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync();
            ReadOnlySequence<byte> buffer = result.Buffer;

            try
            {
                if (result.IsCanceled)
                {
                    // The read was canceled. You can quit without reading the existing data.
                    break;
                }

                // Process all messages from the buffer, modifying the input buffer on each
                // iteration.
                while (TryParseLines(ref buffer, out Message message))
                {
                    await ProcessMessageAsync(message);
                }

                // There's no more data to be processed.
                if (result.IsCompleted)
                {
                    break;
                }
            }
            finally
            {
                // Since all messages in the buffer are being processed, you can use the
                // remaining buffer's Start and End position to determine consumed and examined.
                reader.AdvanceTo(buffer.Start, buffer.End);
            }
        }
    }
    finally
    {
        await reader.CompleteAsync();
    }
}

PipeReader 常見問題

  • 將錯誤的值傳遞至 consumedexamined 可能會導致讀取已讀取的資料。

  • buffer.End 作為已檢查項目傳遞可能會導致下列結果:

    • 停止的資料
    • 如果未取用資料,則會導致最終的記憶體不足 (OOM) 例外狀況。 例如從緩衝區一次處理單一訊息時,發生 PipeReader.AdvanceTo(position, buffer.End)
  • 將錯誤的值傳遞至 consumedexamined 可能會導致無限迴圈。 例如,PipeReader.AdvanceTo(buffer.Start) 如果 buffer.Start 尚未變更,會導致下一次呼叫 PipeReader.ReadAsync 在新的資料送達之前立即進行傳回。

  • 將錯誤的值傳遞至 consumedexamined 可能會導致無限緩衝 (最終造成 OOM 情況)。

  • 在呼叫 PipeReader.AdvanceTo 後使用 ReadOnlySequence<byte> 可能會導致記憶體損毀 (請在釋放後使用)。

  • 無法呼叫 PipeReader.Complete/CompleteAsync 可能會導致記憶體流失。

  • 在處理緩衝區之前檢查 ReadResult.IsCompleted 和結束讀取邏輯會導致資料遺失。 迴圈結束條件應以 ReadResult.Buffer.IsEmptyReadResult.IsCompleted 為基礎。 不正確執行可能會導致無限迴圈。

有問題的程式碼

資料遺失

IsCompleted 設定為 true 時,ReadResult 可以傳回資料的最後一個區段。 在結束讀取迴圈之前,不讀取該資料會導致資料遺失。

警告

請勿使用下列程式碼。 使用此範例會導致資料遺失、停止回應、安全性問題,且不應複製。 提供下列範例來說明 PipeReader 常見問題

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> dataLossBuffer = result.Buffer;

    if (result.IsCompleted)
        break;

    Process(ref dataLossBuffer, out Message message);

    reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End);
}

警告

請勿使用上述程式碼。 使用此範例會導致資料遺失、停止回應、安全性問題,且不應複製。 提供上述範例來說明 PipeReader 常見問題

無限迴圈

如果 Result.IsCompletedtrue,下列邏輯可能會導致無限迴圈,但緩衝區中永遠不會有完整的訊息。

警告

請勿使用下列程式碼。 使用此範例會導致資料遺失、停止回應、安全性問題,且不應複製。 提供下列範例來說明 PipeReader 常見問題

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
    if (result.IsCompleted && infiniteLoopBuffer.IsEmpty)
        break;

    Process(ref infiniteLoopBuffer, out Message message);

    reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}

警告

請勿使用上述程式碼。 使用此範例會導致資料遺失、停止回應、安全性問題,且不應複製。 提供上述範例來說明 PipeReader 常見問題

以下是另一個有相同問題的程式碼片段。 此片段會先檢查非空白的緩衝區,再檢查 ReadResult.IsCompleted。 因為此程式碼片段處於 else if 中,所以如果緩衝區中沒有完整的訊息,此片段就會永遠進行迴圈。

警告

請勿使用下列程式碼。 使用此範例會導致資料遺失、停止回應、安全性問題,且不應複製。 提供下列範例來說明 PipeReader 常見問題

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;

    if (!infiniteLoopBuffer.IsEmpty)
        Process(ref infiniteLoopBuffer, out Message message);

    else if (result.IsCompleted)
        break;

    reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}

警告

請勿使用上述程式碼。 使用此範例會導致資料遺失、停止回應、安全性問題,且不應複製。 提供上述範例來說明 PipeReader 常見問題

沒有回應的應用程式

無條件地在 examined 位置使用 buffer.End 呼叫 PipeReader.AdvanceTo,可能會導致應用程式在剖析單一訊息時變得沒有回應。 PipeReader.AdvanceTo 的下一個呼叫不會傳回,直到下列狀況發生為止:

  • 還有更多資料要寫入管道。
  • 而且先前未檢查到新的資料。

警告

請勿使用下列程式碼。 使用此範例會導致資料遺失、停止回應、安全性問題,且不應複製。 提供下列範例來說明 PipeReader 常見問題

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> hangBuffer = result.Buffer;

    Process(ref hangBuffer, out Message message);

    if (result.IsCompleted)
        break;

    reader.AdvanceTo(hangBuffer.Start, hangBuffer.End);

    if (message != null)
        return message;
}

警告

請勿使用上述程式碼。 使用此範例會導致資料遺失、停止回應、安全性問題,且不應複製。 提供上述範例來說明 PipeReader 常見問題

記憶體不足 (OOM)

在下列情況下,下列程式碼會持續緩衝處理,直到 OutOfMemoryException 情況發生為止:

  • 沒有訊息大小上限。
  • PipeReader 傳回的資料不會產生完整的訊息。 例如此資料不會建立完整的訊息,因為另一端為寫入大型訊息 (例如,4 GB 的訊息)。

警告

請勿使用下列程式碼。 使用此範例會導致資料遺失、停止回應、安全性問題,且不應複製。 提供下列範例來說明 PipeReader 常見問題

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer;

    Process(ref thisCouldOutOfMemory, out Message message);

    if (result.IsCompleted)
        break;

    reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End);

    if (message != null)
        return message;
}

警告

請勿使用上述程式碼。 使用此範例會導致資料遺失、停止回應、安全性問題,且不應複製。 提供上述範例來說明 PipeReader 常見問題

記憶體損毀

撰寫讀取緩衝區的協助程式時,應該先複製任何傳回的承載資料,再呼叫 Advance。 下列範例會傳回 Pipe 捨棄的記憶體,可能會在下次作業 (讀取/寫入) 重複使用。

警告

請勿使用下列程式碼。 使用此範例會導致資料遺失、停止回應、安全性問題,且不應複製。 提供下列範例來說明 PipeReader 常見問題

public class Message
{
    public ReadOnlySequence<byte> CorruptedPayload { get; set; }
}
    Environment.FailFast("This code is terrible, don't use it!");
    Message message = null;

    while (true)
    {
        ReadResult result = await reader.ReadAsync(cancellationToken);
        ReadOnlySequence<byte> buffer = result.Buffer;

        ReadHeader(ref buffer, out int length);

        if (length <= buffer.Length)
        {
            message = new Message
            {
                // Slice the payload from the existing buffer
                CorruptedPayload = buffer.Slice(0, length)
            };

            buffer = buffer.Slice(length);
        }

        if (result.IsCompleted)
            break;

        reader.AdvanceTo(buffer.Start, buffer.End);

        if (message != null)
        {
            // This code is broken since reader.AdvanceTo() was called with a position *after* the buffer
            // was captured.
            break;
        }
    }

    return message;
}

警告

請勿使用上述程式碼。 使用此範例會導致資料遺失、停止回應、安全性問題,且不應複製。 提供上述範例來說明 PipeReader 常見問題

PipeWriter

PipeWriter 會管理代表呼叫者寫入的緩衝區。 PipeWriter 會實作 IBufferWriter<byte>IBufferWriter<byte> 可讓您存取緩衝區來執行寫入,而不需額外的緩衝區複本。

async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
    // Request at least 5 bytes from the PipeWriter.
    Memory<byte> memory = writer.GetMemory(5);

    // Write directly into the buffer.
    int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span);

    // Tell the writer how many bytes were written.
    writer.Advance(written);

    await writer.FlushAsync(cancellationToken);
}

先前的程式碼:

  • 使用 GetMemoryPipeWriter 要求至少 5 個位元組的緩衝區。
  • 將 ASCII 字串 "Hello" 的位元組寫入至傳回的 Memory<byte>
  • 呼叫 Advance 以指出寫入緩衝區的位元組數目。
  • 排清 PipeWriter 以將位元組傳送至基礎裝置。

先前的寫入方法會使用 PipeWriter 所提供的緩衝區。 此寫入方法也可能已使用 PipeWriter.WriteAsync,其狀況如下:

  • 會將現有的緩衝區複製到 PipeWriter
  • 視需要呼叫 GetSpanAdvance,並呼叫 FlushAsync
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
    byte[] helloBytes = Encoding.ASCII.GetBytes("Hello");

    // Write helloBytes to the writer, there's no need to call Advance here
    // (Write does that).
    await writer.WriteAsync(helloBytes, cancellationToken);
}

取消

FlushAsync 支援傳遞 CancellationToken。 如果權杖在排清擱置中時取消,則傳遞 CancellationToken 會導致 OperationCanceledExceptionPipeWriter.FlushAsync 支援透過 PipeWriter.CancelPendingFlush 取消目前排清作業,而不引發例外狀況。 呼叫 PipeWriter.CancelPendingFlush 會導致目前或下一個呼叫的 PipeWriter.FlushAsyncPipeWriter.WriteAsync 傳回 FlushResult,並將 IsCanceled 設定為 true。 這對於以非破壞性和非例外方式停止暫止排清的狀況十分有用。

PipeWriter 常見問題

  • GetSpanGetMemory 會傳回具有最低要求記憶體數量的緩衝區。 請勿假設確切的緩衝區大小。
  • 無法保證後續呼叫會傳回相同的緩衝區或具相同大小的緩衝區。
  • 呼叫 Advance 繼續寫入更多資料之後,必須要求新的緩衝區。 先前取得的緩衝區無法寫入。
  • 呼叫 GetMemoryGetSpan,但呼叫 FlushAsync 不完整且不安全。
  • 有未排清資料時呼叫 CompleteCompleteAsync,可能會導致記憶體損毀。

使用 PipeReader 和 PipeWriter 的秘訣

下列秘訣將協助您成功使用 System.IO.Pipelines 類別:

  • 請一律完成 PipeReaderPipeWriter,包括適用的例外狀況。
  • 呼叫 PipeReader.ReadAsync 後請一律呼叫 PipeReader.AdvanceTo
  • 撰寫時定期 awaitPipeWriter.FlushAsync,且一律檢查 FlushResult.IsCompleted。 如果 IsCompletedtrue,則中止寫入,因為這表示讀取器已完成,不再關心寫入的內容。
  • 撰寫您想要 PipeReader 存取的內容之後,請呼叫 PipeWriter.FlushAsync
  • 如果讀取器無法啟用,請勿在 FlushAsync 完成前呼叫 FlushAsync,因為這可能導致死結。
  • 請確定只有一個內容「擁有」PipeReaderPipeWriter,或存取上述項目。 這些類型不是安全執行緒。
  • 在呼叫 AdvanceTo 或完成 PipeReader 之後,永遠不要存取 ReadResult.Buffer

IDuplexPipe

IDuplexPipe 是同時支援讀取和寫入類型的合約。 例如,網路連線會以 IDuplexPipe 表示。

不同於 Pipe,包含 PipeReaderPipeWriterIDuplexPipe 通常代表完整雙面連線的一端。 這表示寫入 PipeWriter 的內容不會從 PipeReader 中讀取。

資料流

讀取或寫串流資料時,您通常會使用取消序列化程式來讀取資料,並使用序列化程式寫入資料。 其中大部分的讀取和寫入串流的 API 都有 Stream 參數。 若要更輕鬆地與這些現有的 API 整合,PipeReaderPipeWriter 會公開 AsStream 方法。 AsStream傳回 PipeReaderPipeWriter 周圍的 Stream 實作。

資料流範例

PipeReaderPipeWriter 執行個體例可以使用指定的 Stream 物件和選擇性的對應建立選項來建立靜態 Create 方法。

StreamPipeReaderOptions 允許使用下列參數來控制建立 PipeReader 執行個體:

StreamPipeWriterOptions 允許使用下列參數來控制建立 PipeWriter 執行個體:

重要

使用 PipeReader 方法建立 PipeWriterCreate 實例時,您必須考慮 Stream 物件存留期。 如果您在讀取器或寫入器完成串流之後需要存取資料流,您必須在建立選項上將 LeaveOpen 旗標設定為 true。 否則,資料流將會關閉。

下列程式碼示範如何使用資料流中的 Create 方法建立 PipeReaderPipeWriter 執行個體。

using System.Buffers;
using System.IO.Pipelines;
using System.Text;

class Program
{
    static async Task Main()
    {
        using var stream = File.OpenRead("lorem-ipsum.txt");

        var reader = PipeReader.Create(stream);
        var writer = PipeWriter.Create(
            Console.OpenStandardOutput(), 
            new StreamPipeWriterOptions(leaveOpen: true));

        WriteUserCancellationPrompt();

        var processMessagesTask = ProcessMessagesAsync(reader, writer);
        var userCanceled = false;
        var cancelProcessingTask = Task.Run(() =>
        {
            while (char.ToUpperInvariant(Console.ReadKey().KeyChar) != 'C')
            {
                WriteUserCancellationPrompt();
            }

            userCanceled = true;

            // No exceptions thrown
            reader.CancelPendingRead();
            writer.CancelPendingFlush();
        });

        await Task.WhenAny(cancelProcessingTask, processMessagesTask);

        Console.WriteLine(
            $"\n\nProcessing {(userCanceled ? "cancelled" : "completed")}.\n");
    }

    static void WriteUserCancellationPrompt() =>
        Console.WriteLine("Press 'C' to cancel processing...\n");

    static async Task ProcessMessagesAsync(
        PipeReader reader,
        PipeWriter writer)
    {
        try
        {
            while (true)
            {
                ReadResult readResult = await reader.ReadAsync();
                ReadOnlySequence<byte> buffer = readResult.Buffer;

                try
                {
                    if (readResult.IsCanceled)
                    {
                        break;
                    }

                    if (TryParseLines(ref buffer, out string message))
                    {
                        FlushResult flushResult =
                            await WriteMessagesAsync(writer, message);

                        if (flushResult.IsCanceled || flushResult.IsCompleted)
                        {
                            break;
                        }
                    }

                    if (readResult.IsCompleted)
                    {
                        if (!buffer.IsEmpty)
                        {
                            throw new InvalidDataException("Incomplete message.");
                        }
                        break;
                    }
                }
                finally
                {
                    reader.AdvanceTo(buffer.Start, buffer.End);
                }
            }
        }
        catch (Exception ex)
        {
            Console.Error.WriteLine(ex);
        }
        finally
        {
            await reader.CompleteAsync();
            await writer.CompleteAsync();
        }
    }

    static bool TryParseLines(
        ref ReadOnlySequence<byte> buffer,
        out string message)
    {
        SequencePosition? position;
        StringBuilder outputMessage = new();

        while(true)
        {
            position = buffer.PositionOf((byte)'\n');

            if (!position.HasValue)
                break;

            outputMessage.Append(Encoding.ASCII.GetString(buffer.Slice(buffer.Start, position.Value)))
                        .AppendLine();

            buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
        };

        message = outputMessage.ToString();
        return message.Length != 0;
    }

    static ValueTask<FlushResult> WriteMessagesAsync(
        PipeWriter writer,
        string message) =>
        writer.WriteAsync(Encoding.ASCII.GetBytes(message));
}

應用程式會使用 StreamReader 讀取 lorem-ipsum.txt 檔案做為資料流,且結尾必須是空白行。 FileStream 會傳遞至 PipeReader.Create,以具現化 PipeReader 物件。 主控台應用程式接著會使用 Console.OpenStandardOutput() 將標準輸出資料流傳遞至 PipeWriter.Create。 此範例支援取消