通过


System.IO.Pipelines

System.IO.Pipelines 是一个库,旨在简化 .NET 中的高性能 I/O。 该包面向 .NET Standard,实现广泛的兼容性、.NET Framework 和新式 .NET。 在现代 .NET 版本中, System.IO.Pipelines 包含在共享框架中,不需要单独的 NuGet 包。

该库也可用作 System.IO.Pipelines NuGet 包。

System.IO.Pipelines 可以解决什么问题?

分析流数据的应用由样板代码组成,后者由许多专门且不寻常的代码流组成。 样板代码和特殊情况代码很复杂且难以进行维护。

System.IO.Pipelines 已构建为:

  • 具有高性能的流数据分析功能。
  • 减少代码复杂性。

以下代码是从客户端接收以'\n'作为分隔符划分行的消息的 TCP 服务器的典型例子:

async Task ProcessLinesAsync(NetworkStream stream)
{
    var buffer = new byte[1024];
    await stream.ReadAsync(buffer, 0, buffer.Length);

    // Process a single line from the buffer
    ProcessLine(buffer);
}

前面的代码有几个问题:

  • 单次调用 ReadAsync 可能无法接收整条消息(行尾)。
  • 忽略了 stream.ReadAsync 的结果。 stream.ReadAsync 返回读取的数据量。
  • 它不能处理在单个 ReadAsync 调用中读取多行的情况。
  • 它为每次读取分配一个 byte 数组。

若要修复上述问题,请进行以下更改:

  • 缓冲传入的数据,直到找到新行。

  • 分析缓冲区中返回的所有行。

  • 该行可能大于 1 KB(1024 字节)。 代码需要调整输入缓冲区的大小,直到找到分隔符以适应缓冲区内的完整行。

    • 如果调整缓冲区的大小,当输入中出现较长的行时,将生成更多缓冲区副本。
    • 压缩用于读取行的缓冲区,以减少空余。
  • 请考虑使用缓冲池来避免重复分配内存。

  • 此代码解决了以下一些问题:

async Task ProcessLinesAsync(NetworkStream stream)
{
    byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
    var bytesBuffered = 0;
    var bytesConsumed = 0;

    while (true)
    {
        // Calculate the amount of bytes remaining in the buffer.
        var bytesRemaining = buffer.Length - bytesBuffered;

        if (bytesRemaining == 0)
        {
            // Double the buffer size and copy the previously buffered data into the new buffer.
            var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
            Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
            // Return the old buffer to the pool.
            ArrayPool<byte>.Shared.Return(buffer);
            buffer = newBuffer;
            bytesRemaining = buffer.Length - bytesBuffered;
        }

        var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
        if (bytesRead == 0)
        {
            // EOF
            break;
        }

        // Keep track of the amount of buffered bytes.
        bytesBuffered += bytesRead;
        var linePosition = -1;

        do
        {
            // Look for a EOL in the buffered data.
            linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
                                         bytesBuffered - bytesConsumed);

            if (linePosition >= 0)
            {
                // Calculate the length of the line based on the offset.
                var lineLength = linePosition - bytesConsumed;

                // Process the line.
                ProcessLine(buffer, bytesConsumed, lineLength);

                // Move the bytesConsumed to skip past the line consumed (including \n).
                bytesConsumed += lineLength + 1;
            }
        }
        while (linePosition >= 0);
    }
}

前面的代码很复杂,不能解决所识别的所有问题。 高性能网络通常意味着编写复杂的代码以使性能最大化。 System.IO.Pipelines 的设计目的是使编写此类代码更容易。

管道

使用 Pipe 类创建一个 PipeWriter/PipeReader 对。 写入 PipeWriter 的所有数据都在 PipeReader 中可用。

var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;

管道基本用法

async Task ProcessLinesAsync(Socket socket)
{
    var pipe = new Pipe();
    Task writing = FillPipeAsync(socket, pipe.Writer);
    Task reading = ReadPipeAsync(pipe.Reader);

    await Task.WhenAll(reading, writing);
}

async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
    const int minimumBufferSize = 512;

    while (true)
    {
        // Allocate at least 512 bytes from the PipeWriter.
        Memory<byte> memory = writer.GetMemory(minimumBufferSize);
        try
        {
            int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
            if (bytesRead == 0)
            {
                break;
            }
            // Tell the PipeWriter how much was read from the Socket.
            writer.Advance(bytesRead);
        }
        catch (Exception ex)
        {
            LogError(ex);
            break;
        }

        // Make the data available to the PipeReader.
        FlushResult result = await writer.FlushAsync();

        if (result.IsCompleted)
        {
            break;
        }
    }

     // By completing PipeWriter, tell the PipeReader that there's no more data coming.
    await writer.CompleteAsync();
}

async Task ReadPipeAsync(PipeReader reader)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync();
        ReadOnlySequence<byte> buffer = result.Buffer;

        while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
        {
            // Process the line.
            ProcessLine(line);
        }

        // Tell the PipeReader how much of the buffer has been consumed.
        reader.AdvanceTo(buffer.Start, buffer.End);

        // Stop reading if there's no more data coming.
        if (result.IsCompleted)
        {
            break;
        }
    }

    // Mark the PipeReader as complete.
    await reader.CompleteAsync();
}

bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
    // Look for a EOL in the buffer.
    SequencePosition? position = buffer.PositionOf((byte)'\n');

    if (position == null)
    {
        line = default;
        return false;
    }

    // Skip the line + the \n.
    line = buffer.Slice(0, position.Value);
    buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
    return true;
}

两个循环处理读取和写入:

  • FillPipeAsyncSocket 读取并写入 PipeWriter
  • ReadPipeAsyncPipeReader 读取并分析传入的行。

未分配显式缓冲区。 所有缓冲区管理都委托给 PipeReaderPipeWriter 实现。 委派缓冲区管理使使用代码更容易集中关注业务逻辑。

在第一个循环中:

在第二个循环中,PipeReader 使用由 PipeWriter 写入的缓冲区。 缓冲区来自套接字。 对 PipeReader.ReadAsync 的调用:

  • 返回包含两条重要信息的 ReadResult

    • ReadOnlySequence<T> 形式读取的数据。
    • 布尔值 IsCompleted,指示是否已到达数据结尾 (EOF)。

找到行尾(EOL)分隔符并解析该行后:

  • 该逻辑处理缓冲区以跳过已处理的内容。
  • 调用 PipeReader.AdvanceTo 以告知 PipeReader 已消耗和检查了多少数据。

读取器和写入器循环的结束是通过调用函数 PipeReader.CompletePipeWriter.Complete 实现的。 调用 Complete 释放由底层 Pipe 分配的内存。

反压和流量控制

理想情况下,读取和分析可协同工作:

  • 读取线程使用来自网络的数据并将其放入缓冲区。
  • 分析线程负责构造适当的数据结构。

通常,分析所花费的时间比仅从网络复制数据块所用时间更长:

  • 读取线程领先于分析线程。
  • 读取线程必须减缓或分配更多内存来存储用于分析线程的数据。

为了获得最佳性能,需要在频繁暂停和分配更多内存之间取得平衡。

为解决上述问题,Pipe 提供了两个设置来控制数据流:

包含 ResumeWriterThreshold 和 PauseWriterThreshold 的图示

PipeWriter.FlushAsync:

  • ValueTask<FlushResult> 中的数据量超过 Pipe 时,返回不完整的 PauseWriterThreshold
  • ValueTask<FlushResult> 低于 ResumeWriterThreshold 时完成。

使用两个值可防止快速循环,如果只使用一个值,则可能发生这种循环。

示例

// The Pipe will start returning incomplete tasks from FlushAsync until
// the reader examines at least 5 bytes.
var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
var pipe = new Pipe(options);

管道调度器

通常在使用 asyncawait 时,异步代码会在 TaskScheduler 或当前 SynchronizationContext 上恢复。

在执行 I/O 时,对执行 I/O 的位置进行细粒度控制非常重要。 此控件允许高效利用 CPU 缓存。 高效的缓存对于 Web 服务器等高性能应用至关重要。 PipeScheduler 提供对异步回调运行位置的控制。 默认情况下:

  • 使用当前的 SynchronizationContext
  • 如果没有 SynchronizationContext,它将使用线程池运行回调。
public static void Main(string[] args)
{
    var writeScheduler = new SingleThreadPipeScheduler();
    var readScheduler = new SingleThreadPipeScheduler();

    // Tell the Pipe what schedulers to use and disable the SynchronizationContext.
    var options = new PipeOptions(readerScheduler: readScheduler,
                                  writerScheduler: writeScheduler,
                                  useSynchronizationContext: false);
    var pipe = new Pipe(options);
}

// This is a sample scheduler that async callbacks on a single dedicated thread.
public class SingleThreadPipeScheduler : PipeScheduler
{
    private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
     new BlockingCollection<(Action<object> Action, object State)>();
    private readonly Thread _thread;

    public SingleThreadPipeScheduler()
    {
        _thread = new Thread(DoWork);
        _thread.Start();
    }

    private void DoWork()
    {
        foreach (var item in _queue.GetConsumingEnumerable())
        {
            item.Action(item.State);
        }
    }

    public override void Schedule(Action<object?> action, object? state)
    {
        if (state is not null)
        {
            _queue.Add((action, state));
        }
        // else log the fact that _queue.Add was not called.
    }
}

PipeScheduler.ThreadPoolPipeScheduler 实现,用于对线程池的回调进行排队。 PipeScheduler.ThreadPool 是默认选项,通常也是最佳选项。 PipeScheduler.Inline 可能会导致意外后果,如死锁。

管道重置

通常重用这个Pipe对象是高效的。 若要重置管道,请在 PipeReaderReset 完成时调用 PipeReaderPipeWriter

PipeReader

PipeReader 代表调用方管理内存。 在调用 之后始终调用 PipeReader.AdvanceTo。 这使 PipeReader 知道调用方何时用完内存,以便可以对其进行跟踪。 从ReadOnlySequence<T>返回的值在调用PipeReader.AdvanceTo时仍然有效。 调用 ReadOnlySequence<T> 后,不能使用 PipeReader.AdvanceTo

PipeReader.AdvanceTo 采用两个 SequencePosition 参数:

  • 第一个参数确定消耗的内存量。
  • 第二个参数确定观察到的缓冲区数。

将数据标记为“已使用”意味着管道可以将内存返回到底层缓冲池。 将数据标记为“已观察”可控制对 PipeReader.ReadAsync 的下一个调用的作用。 将所有内容都标记为“已观察”意味着下次对 PipeReader.ReadAsync 的调用将不会返回,直到有更多数据写入管道。 其他任何值都会使下一次调用PipeReader.ReadAsync立即返回观察到和未观察到的数据,但不包括已使用的数据。

读取流数据场景

读取流数据时会出现几个典型的模式:

  • 给定数据流时,分析单条消息。
  • 给定数据流时,分析所有可用消息。

这些示例使用 TryParseLines 方法解析来自 ReadOnlySequence<T> 的消息。 TryParseLines 分析单条消息并更新输入缓冲区,以从缓冲区中剪裁已分析的消息。 TryParseLines 不是 .NET 的一部分;这是以下各节中使用的用户编写方法。

bool TryParseLines(ref ReadOnlySequence<byte> buffer, out Message message);

读取单条消息

此代码从某个 PipeReader 消息中读取单个消息,并将其返回到调用方。

async ValueTask<Message?> ReadSingleMessageAsync(PipeReader reader,
 CancellationToken cancellationToken = default)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync(cancellationToken);
        ReadOnlySequence<byte> buffer = result.Buffer;

        // In the event that no message is parsed successfully, mark consumed
        // as nothing and examined as the entire buffer.
        SequencePosition consumed = buffer.Start;
        SequencePosition examined = buffer.End;

        try
        {
            if (TryParseLines(ref buffer, out Message message))
            {
                // A single message was successfully parsed so mark the start of the
                // parsed buffer as consumed. TryParseLines trims the buffer to
                // point to the data after the message was parsed.
                consumed = buffer.Start;

                // Examined is marked the same as consumed here, so the next call
                // to ReadSingleMessageAsync will process the next message if there's
                // one.
                examined = consumed;

                return message;
            }

            // There's no more data to be processed.
            if (result.IsCompleted)
            {
                if (buffer.Length > 0)
                {
                    // The message is incomplete and there's no more data to process.
                    throw new InvalidDataException("Incomplete message.");
                }

                break;
            }
        }
        finally
        {
            reader.AdvanceTo(consumed, examined);
        }
    }

    return null;
}

前面的代码:

  • 分析单条消息。
  • 更新已使用的 SequencePosition 并检查 SequencePosition 以指向已剪裁的输入缓冲区的开始。

因为 SequencePosition 从输入缓冲区中删除了已分析的消息,所以更新了两个 TryParseLines 参数。 通常,分析来自缓冲区的单条消息时,检查的位置应为以下位置之一:

  • 消息的结尾。
  • 如果未找到消息,则指向接收缓冲区的结尾。

单条消息案例最有可能出现错误。 传递给 检查 的错误值可能会导致内存溢出异常或进入无限循环。 有关详细信息,请参阅本文中的 PipeReader 常见问题部分。

重要

ReadSingleMessageAsync 不调用 PipeReader.CompleteAsync。 调用方负责完成该操作 PipeReader。 在 ReadSingleMessageAsync 内调用 PipeReader.CompleteAsync 表示无法读取更多数据,从而阻止读取后续消息。

读取多条消息

此代码从 PipeReader 读取所有消息,并对每个消息调用 ProcessMessageAsync

async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
{
    try
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync(cancellationToken);
            ReadOnlySequence<byte> buffer = result.Buffer;

            try
            {
                // Process all messages from the buffer, modifying the input buffer on each
                // iteration.
                while (TryParseLines(ref buffer, out Message message))
                {
                    await ProcessMessageAsync(message);
                }

                // There's no more data to be processed.
                if (result.IsCompleted)
                {
                    if (buffer.Length > 0)
                    {
                        // The message is incomplete and there's no more data to process.
                        throw new InvalidDataException("Incomplete message.");
                    }
                    break;
                }
            }
            finally
            {
                // Since all messages in the buffer are being processed, you can use the
                // remaining buffer's Start and End position to determine consumed and examined.
                reader.AdvanceTo(buffer.Start, buffer.End);
            }
        }
    }
    finally
    {
        await reader.CompleteAsync();
    }
}

由于 ProcessMessagesAsync 拥有完整的消息读取循环,因此它在完成时调用 PipeReader.CompleteAsync 。 与单消息情况不同,调用方无需完成读取器。 ProcessMessagesAsync 完全拥有 PipeReader 的生命周期。

取消

PipeReader.ReadAsync:

private PipeReader reader;

public MyConnection(PipeReader reader)
{
    this.reader = reader;
}

public void Abort()
{
    // Cancel the pending read so the process loop ends without an exception.
    reader.CancelPendingRead();
}

public async Task ProcessMessagesAsync()
{
    try
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync();
            ReadOnlySequence<byte> buffer = result.Buffer;

            try
            {
                if (result.IsCanceled)
                {
                    // The read was canceled. You can quit without reading the existing data.
                    break;
                }

                // Process all messages from the buffer, modifying the input buffer on each
                // iteration.
                while (TryParseLines(ref buffer, out Message message))
                {
                    await ProcessMessageAsync(message);
                }

                // There's no more data to be processed.
                if (result.IsCompleted)
                {
                    break;
                }
            }
            finally
            {
                // Since all messages in the buffer are being processed, you can use the
                // remaining buffer's Start and End position to determine consumed and examined.
                reader.AdvanceTo(buffer.Start, buffer.End);
            }
        }
    }
    finally
    {
        await reader.CompleteAsync();
    }
}

PipeReader 常见问题

  • 将错误的值传递给 consumedexamined 可能导致读取已处理过的数据。

  • buffer.End按检查传递可能会导致:

    • 数据停滞
    • 如果不消耗数据,将会引发内存不足(OOM)异常。 例如,当一次处理来自缓冲区的单条消息时,可能会出现 PipeReader.AdvanceTo(position, buffer.End)
  • 将错误值传递给 consumedexamined 可能导致无限循环。 例如,PipeReader.AdvanceTo(buffer.Start)如果buffer.Start没有更改,则会导致下一次调用PipeReader.ReadAsync在新数据到达之前立即返回。

  • 将错误的值传递给consumedexamined可能导致无限缓冲(最终OOM)。

  • 调用ReadOnlySequence<T>后使用PipeReader.AdvanceTo可能会导致内存损坏(释放后使用)。

  • 调用失败 Complete/CompleteAsync 可能会导致内存泄漏。

  • 在处理缓冲区之前检查 ReadResult.IsCompleted 并退出读取逻辑会导致数据丢失。 循环退出条件应基于 ReadResult.Buffer.IsEmptyReadResult.IsCompleted。 如果错误执行此操作,可能会导致无限循环。

有问题的代码

数据丢失

ReadResult 被设置为 IsCompleted 时,true 可能会返回最后一段数据。 在退出读取循环之前不读取该数据会导致数据丢失。

警告

不要使用以下代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。 下面的示例提供了说明 PipeReader 常见问题

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> dataLossBuffer = result.Buffer;

    if (result.IsCompleted)
        break;

    Process(ref dataLossBuffer, out Message message);

    reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End);
}

警告

不要使用上述代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。 提供了前面的示例来解释 PipeReader 常见问题

无限循环

如果 Result.IsCompleted 存在 true 但缓冲区中从未出现完整消息,则以下逻辑可能会导致无限循环。

警告

不要使用以下代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。 下面的示例提供了说明 PipeReader 常见问题

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
    if (result.IsCompleted && infiniteLoopBuffer.IsEmpty)
        break;

    Process(ref infiniteLoopBuffer, out Message message);

    reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}

警告

不要使用上述代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。 提供了前面的示例来解释 PipeReader 常见问题

下面是另一段具有相同问题的代码。 该代码在检查 ReadResult.IsCompleted 之前检查非空缓冲区。 因为它在else if,因此,如果缓冲区中从未存在完整的消息,它将一直循环。

警告

不要使用以下代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。 下面的示例提供了说明 PipeReader 常见问题

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;

    if (!infiniteLoopBuffer.IsEmpty)
        Process(ref infiniteLoopBuffer, out Message message);

    else if (result.IsCompleted)
        break;

    reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}

警告

不要使用上述代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。 提供了前面的示例来解释 PipeReader 常见问题

应用程序无响应

无条件地使用buffer.Endexamined位置调用PipeReader.AdvanceTo,可能会导致程序在解析单个消息时变得无响应。 对 PipeReader.AdvanceTo 的下次调用将在以下情况下返回:

  • 更多数据已写入管道中。
  • 新的数据之前未曾检查过。

警告

不要使用以下代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。 下面的示例提供了说明 PipeReader 常见问题

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> hangBuffer = result.Buffer;

    Process(ref hangBuffer, out Message message);

    if (result.IsCompleted)
        break;

    reader.AdvanceTo(hangBuffer.Start, hangBuffer.End);

    if (message != null)
        return message;
}

警告

不要使用上述代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。 提供了前面的示例来解释 PipeReader 常见问题

内存不足 (OOM)

根据以下条件,此代码会持续缓冲,直到 OutOfMemoryException 发生:

  • 没有最大消息大小。
  • PipeReader 返回的数据不会生成完整的消息。 例如,另一方正在编写大型消息(例如 4 GB 消息)。

警告

不要使用以下代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。 下面的示例提供了说明 PipeReader 常见问题

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer;

    Process(ref thisCouldOutOfMemory, out Message message);

    if (result.IsCompleted)
        break;

    reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End);

    if (message != null)
        return message;
}

警告

不要使用上述代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。 提供了前面的示例来解释 PipeReader 常见问题

内存损坏

编写读取缓冲区的帮助程序时,请在调用 Advance之前复制返回的任何有效负载。 以下示例返回已放弃的 Pipe 内存,并可能将其重新用于下一个操作(可读/写)。

警告

不要使用以下代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。 下面的示例提供了说明 PipeReader 常见问题

public class Message
{
    public ReadOnlySequence<byte> CorruptedPayload { get; set; }
}
    Environment.FailFast("This code is terrible, don't use it!");
    Message message = null;

    while (true)
    {
        ReadResult result = await reader.ReadAsync(cancellationToken);
        ReadOnlySequence<byte> buffer = result.Buffer;

        ReadHeader(ref buffer, out int length);

        if (length <= buffer.Length)
        {
            message = new Message
            {
                // Slice the payload from the existing buffer
                CorruptedPayload = buffer.Slice(0, length)
            };

            buffer = buffer.Slice(length);
        }

        if (result.IsCompleted)
            break;

        reader.AdvanceTo(buffer.Start, buffer.End);

        if (message != null)
        {
            // This code is broken since reader.AdvanceTo() was called with a position *after* the buffer
            // was captured.
            break;
        }
    }

    return message;
}

警告

不要使用上述代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。 提供了前面的示例来解释 PipeReader 常见问题

PipeWriter

PipeWriter 负责管理缓冲区,以代表调用方执行写入操作。 PipeWriter 可实现 IBufferWriter<byte>IBufferWriter<byte> 提供对缓冲区的访问,以执行写入,而无需额外的缓冲区副本。

async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
    // Request at least 5 bytes from the PipeWriter.
    Memory<byte> memory = writer.GetMemory(5);

    // Write directly into the buffer.
    int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span);

    // Tell the writer how many bytes were written.
    writer.Advance(written);

    await writer.FlushAsync(cancellationToken);
}

之前的代码:

  • 使用 PipeWriterGetMemory 请求至少 5 个字节的缓冲区。
  • 将 ASCII 字符串 "Hello" 的字节写入到被返回的 Memory<byte> 中。
  • 调用 Advance 以指示写入缓冲区的字节数。
  • 冲刷 PipeWriter,从而将字节发送到基础设备。

以前的写入方法使用 PipeWriter 提供的缓冲区。 它还可以使用 PipeWriter.WriteAsync,例如:

async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
    byte[] helloBytes = Encoding.ASCII.GetBytes("Hello");

    // Write helloBytes to the writer, there's no need to call Advance here
    // (Write does that).
    await writer.WriteAsync(helloBytes, cancellationToken);
}

取消

FlushAsync 支持传递 CancellationToken。 如果令牌在刷新挂起时被取消,则传递 CancellationToken 将导致 OperationCanceledExceptionPipeWriter.FlushAsync 支持通过 PipeWriter.CancelPendingFlush 取消当前刷新操作,不引发异常。 调用 PipeWriter.CancelPendingFlush 将导致对 PipeWriter.FlushAsyncPipeWriter.WriteAsync 的当前或下次调用返回 FlushResult,并将 IsCanceled 设置为 true。 这对于以非破坏性和非异常方式停止生成刷新非常有用。

PipeWriter 常见问题

  • GetSpanGetMemory 返回至少具有请求内存量的缓冲区。 请勿假设确切的缓冲区大小 。
  • 不保证连续调用返回相同的缓冲区或相同大小的缓冲区。
  • 在调用 Advance 之后,必须请求一个新的缓冲区来继续写入更多数据。 无法将数据写入到已获取的缓冲区中。
  • 如果未完成对 GetMemory 的调用,则调用 GetSpanFlushAsync 将不安全。
  • 调用 CompleteCompleteAsync 时,如果存在未刷新数据,可能会导致内存损坏。

PipeReader 和 PipeWriter 的提示

使用以下提示以成功运用 System.IO.Pipelines 类:

IDuplexPipe

IDuplexPipe 是一种同时支持读取和写入的类型契约。 例如,网络连接通过 IDuplexPipe 表示。

与包含 PipePipeReaderPipeWriter 不同,IDuplexPipe 表示全双工连接的一个部分。 向 PipeWriter 写入的内容不会从 PipeReader 读取。

在读取或写入流数据时,通常使用反序列化程序读取数据,并使用序列化程序写入数据。 大多数读取和写入流 API 都有一个 Stream 参数。 为了更轻松地与这些现有 API 集成,PipeReaderPipeWriter 公开了一个 AsStream 方法。 AsStream 返回围绕 StreamPipeReaderPipeWriter 实现。

流示例

使用给定PipeReader对象和可选相应创建选项的静态PipeWriter方法创建CreateStream实例。

StreamPipeReaderOptions 允许使用以下参数控制 PipeReader 实例的创建:

StreamPipeWriterOptions 允许使用以下参数控制 PipeWriter 实例的创建:

重要

使用PipeReader方法创建PipeWriterCreate实例时,请考虑对象Stream生存期。 如果在读取器或编写器完成后需要访问流,请将 LeaveOpen 标志设置为 true 在创建选项上。 否则,流将被关闭。

此代码演示如何使用PipeReader流中的方法创建PipeWriterCreate实例。

using System.Buffers;
using System.IO.Pipelines;
using System.Text;

class Program
{
    static async Task Main()
    {
        using var stream = File.OpenRead("lorem-ipsum.txt");

        var reader = PipeReader.Create(stream);
        var writer = PipeWriter.Create(
            Console.OpenStandardOutput(), 
            new StreamPipeWriterOptions(leaveOpen: true));

        WriteUserCancellationPrompt();

        var processMessagesTask = ProcessMessagesAsync(reader, writer);
        var userCanceled = false;
        var cancelProcessingTask = Task.Run(() =>
        {
            while (char.ToUpperInvariant(Console.ReadKey().KeyChar) != 'C')
            {
                WriteUserCancellationPrompt();
            }

            userCanceled = true;

            // No exceptions thrown
            reader.CancelPendingRead();
            writer.CancelPendingFlush();
        });

        await Task.WhenAny(cancelProcessingTask, processMessagesTask);

        Console.WriteLine(
            $"\n\nProcessing {(userCanceled ? "cancelled" : "completed")}.\n");
    }

    static void WriteUserCancellationPrompt() =>
        Console.WriteLine("Press 'C' to cancel processing...\n");

    static async Task ProcessMessagesAsync(
        PipeReader reader,
        PipeWriter writer)
    {
        try
        {
            while (true)
            {
                ReadResult readResult = await reader.ReadAsync();
                ReadOnlySequence<byte> buffer = readResult.Buffer;

                try
                {
                    if (readResult.IsCanceled)
                    {
                        break;
                    }

                    if (TryParseLines(ref buffer, out string message))
                    {
                        FlushResult flushResult =
                            await WriteMessagesAsync(writer, message);

                        if (flushResult.IsCanceled || flushResult.IsCompleted)
                        {
                            break;
                        }
                    }

                    if (readResult.IsCompleted)
                    {
                        if (!buffer.IsEmpty)
                        {
                            throw new InvalidDataException("Incomplete message.");
                        }
                        break;
                    }
                }
                finally
                {
                    reader.AdvanceTo(buffer.Start, buffer.End);
                }
            }
        }
        catch (Exception ex)
        {
            Console.Error.WriteLine(ex);
        }
        finally
        {
            await reader.CompleteAsync();
            await writer.CompleteAsync();
        }
    }

    static bool TryParseLines(
        ref ReadOnlySequence<byte> buffer,
        out string message)
    {
        SequencePosition? position;
        StringBuilder outputMessage = new();

        while(true)
        {
            position = buffer.PositionOf((byte)'\n');

            if (!position.HasValue)
                break;

            outputMessage.Append(Encoding.ASCII.GetString(buffer.Slice(buffer.Start, position.Value)))
                        .AppendLine();

            buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
        };

        message = outputMessage.ToString();
        return message.Length != 0;
    }

    static ValueTask<FlushResult> WriteMessagesAsync(
        PipeWriter writer,
        string message) =>
        writer.WriteAsync(Encoding.ASCII.GetBytes(message));
}

应用程序使用 StreamReader 以流形式读取 lorem-ipsum.txt 文件,并且必须以空白行结尾。 FileStream 传递给 PipeReader.Create,后者实例化 PipeReader 对象。 然后,控制台应用程序使用 PipeWriter.Create 将其标准输出流传递到 Console.OpenStandardOutput()。 示例支持取消