英語で読む

次の方法で共有


.NET の System.IO.Pipelines

System.IO.Pipelines は、.NET でハイ パフォーマンスの I/O をより簡単に行えるように設計されたライブラリです。 これは、すべての .NET 実装で動作する .NET Standard を対象とするライブラリです。

ライブラリは System.IO.Pipelines Nuget パッケージで入手できます。

System.IO.Pipelines によって解決される問題

ストリーミング データを解析するアプリは、多くの特殊な通常とは異なるコード フローを持つ定型コードで構成されます。 定型および特殊なケースのコードは複雑で、保守が困難です。

System.IO.Pipelines は、以下のようになるように設計されています。

  • ハイ パフォーマンスのストリーミング データ解析を実現する。
  • コードの複雑さを軽減する。

次のコードは、クライアントから ('\n' で区切られた) 行区切りメッセージを受信する TCP サーバーでは一般的なものです。

C#
async Task ProcessLinesAsync(NetworkStream stream)
{
    var buffer = new byte[1024];
    await stream.ReadAsync(buffer, 0, buffer.Length);

    // Process a single line from the buffer
    ProcessLine(buffer);
}

上記のコードには、以下のようないくつかの問題があります。

  • メッセージ全体 (行の終わり) が、ReadAsync への 1 回の呼び出しで受信されない場合がある。
  • stream.ReadAsync の結果が無視される。 stream.ReadAsync で、読み取られたデータの量が返される。
  • 複数の行が 1 回の ReadAsync 呼び出しで読み取られるケースが処理されない。
  • 読み取りごとに byte 配列が割り当てられる。

上記の問題を解決するには、次の変更が必要です。

  • 改行が検出されるまで、受信データをバッファーに格納します。

  • バッファーで返されたすべての行を解析します。

  • 行が 1 KB (1024 バイト) より大きい可能性があります。 コードでは、バッファー内の完全な行に収まるように、区切り記号が見つかるまで入力バッファーのサイズを変更する必要があります。

    • バッファーのサイズが変更された場合、入力により長い行が表示されると、より多くのバッファー コピーが作成されます。
    • 無駄な領域を減らすには、行の読み取りに使用されるバッファーを小さくします。
  • メモリが繰り返し割り当てられないようにするために、バッファー プーリングの使用を検討します。

  • 次のコードでは、これらの問題の一部に対処します。

C#
async Task ProcessLinesAsync(NetworkStream stream)
{
    byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
    var bytesBuffered = 0;
    var bytesConsumed = 0;

    while (true)
    {
        // Calculate the amount of bytes remaining in the buffer.
        var bytesRemaining = buffer.Length - bytesBuffered;

        if (bytesRemaining == 0)
        {
            // Double the buffer size and copy the previously buffered data into the new buffer.
            var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
            Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
            // Return the old buffer to the pool.
            ArrayPool<byte>.Shared.Return(buffer);
            buffer = newBuffer;
            bytesRemaining = buffer.Length - bytesBuffered;
        }

        var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
        if (bytesRead == 0)
        {
            // EOF
            break;
        }

        // Keep track of the amount of buffered bytes.
        bytesBuffered += bytesRead;
        var linePosition = -1;

        do
        {
            // Look for a EOL in the buffered data.
            linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
                                         bytesBuffered - bytesConsumed);

            if (linePosition >= 0)
            {
                // Calculate the length of the line based on the offset.
                var lineLength = linePosition - bytesConsumed;

                // Process the line.
                ProcessLine(buffer, bytesConsumed, lineLength);

                // Move the bytesConsumed to skip past the line consumed (including \n).
                bytesConsumed += lineLength + 1;
            }
        }
        while (linePosition >= 0);
    }
}

上記のコードは複雑で、特定されたすべての問題には対処していません。 ハイ パフォーマンス ネットワークは、通常、パフォーマンスを最大化するために複雑なコードを記述することを意味します。 System.IO.Pipelines は、この種のコードをより簡単に記述できるように設計されています。

パイプ

Pipe クラスを使用して、PipeWriter/PipeReader ペアを作成できます。 PipeWriter に書き込まれたすべてのデータは、PipeReader で利用できます。

C#
var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;

パイプの基本的な使用方法

C#
async Task ProcessLinesAsync(Socket socket)
{
    var pipe = new Pipe();
    Task writing = FillPipeAsync(socket, pipe.Writer);
    Task reading = ReadPipeAsync(pipe.Reader);

    await Task.WhenAll(reading, writing);
}

async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
    const int minimumBufferSize = 512;

    while (true)
    {
        // Allocate at least 512 bytes from the PipeWriter.
        Memory<byte> memory = writer.GetMemory(minimumBufferSize);
        try
        {
            int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
            if (bytesRead == 0)
            {
                break;
            }
            // Tell the PipeWriter how much was read from the Socket.
            writer.Advance(bytesRead);
        }
        catch (Exception ex)
        {
            LogError(ex);
            break;
        }

        // Make the data available to the PipeReader.
        FlushResult result = await writer.FlushAsync();

        if (result.IsCompleted)
        {
            break;
        }
    }

     // By completing PipeWriter, tell the PipeReader that there's no more data coming.
    await writer.CompleteAsync();
}

async Task ReadPipeAsync(PipeReader reader)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync();
        ReadOnlySequence<byte> buffer = result.Buffer;

        while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
        {
            // Process the line.
            ProcessLine(line);
        }

        // Tell the PipeReader how much of the buffer has been consumed.
        reader.AdvanceTo(buffer.Start, buffer.End);

        // Stop reading if there's no more data coming.
        if (result.IsCompleted)
        {
            break;
        }
    }

    // Mark the PipeReader as complete.
    await reader.CompleteAsync();
}

bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
    // Look for a EOL in the buffer.
    SequencePosition? position = buffer.PositionOf((byte)'\n');

    if (position == null)
    {
        line = default;
        return false;
    }

    // Skip the line + the \n.
    line = buffer.Slice(0, position.Value);
    buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
    return true;
}

次の 2 つのループがあります。

  • FillPipeAsync では Socket から読み取り、PipeWriter に書き込みます。
  • ReadPipeAsync では PipeReader から読み取り、受信行を解析します。

明示的なバッファーは割り当てられていません。 すべてのバッファー管理は、PipeReaderPipeWriter の実装に委任されます。 バッファー管理を委任することで、ビジネス ロジックのみに焦点を当てるコードがより簡単に消費されるようになります。

最初のループでは、次のようになります。

  • PipeWriter.GetMemory(Int32) は、基になるライターからメモリを取得するために呼び出されます。
  • PipeWriter.Advance(Int32) は、バッファーに書き込まれたデータの量を PipeWriter に示すために呼び出されます。
  • PipeWriter.FlushAsync は、データを PipeReader で使用できるようにするために呼び出されます。

2 番目のループでは、PipeWriter によって書き込まれたバッファーが PipeReader で消費されます。 バッファーはソケットから取得されます。 PipeReader.ReadAsync の呼び出しでは、次のようになります。

  • 次の 2 つの重要な情報を含む、ReadResult を返します。

    • ReadOnlySequence<byte> の形式で読み取られたデータ。
    • データの終わり (EOF) に到達したかどうかを示すブール値 IsCompleted

行の終わり (EOL) の区切り記号が検出され、行が解析された後は、次のようになります。

  • ロジックでバッファーが処理され、既に処理されているものがスキップされます。
  • PipeReader.AdvanceTo は、どのくらいの量のデータが使用され、検査されたかを PipeReader に示すために呼び出されます。

リーダーとライターのループは、Complete を呼び出すことによって終了します。 Complete は、基になるパイプで割り当てられたメモリを解放できるようにします。

バックプレッシャとフロー制御

読み取りと解析を連動させるのが理想的です。

  • 読み取りスレッドでは、ネットワークからのデータを消費し、それをバッファーに格納します。
  • 解析スレッドでは、適切なデータ構造の構築を担当します。

通常、解析には、ネットワークからデータのブロックをコピーするだけの場合より、時間がかかります。

  • 読み取りスレッドは解析スレッドより前になります。
  • 読み取りスレッドの速度を落とすか、解析スレッド用のデータを格納するためにより多くのメモリを割り当てる必要があります。

最適なパフォーマンスが得られるように、頻繁な一時停止間のバランスが保たれ、より多くのメモリが割り当てられます。

この問題を解決するために、Pipe には、データのフローを制御するための次の 2 つの設定があります。

  • PauseWriterThreshold:FlushAsync の呼び出しが一時停止する前に、バッファーに格納する必要があるデータ量を判別します。
  • ResumeWriterThreshold:PipeWriter.FlushAsync の呼び出しが再開される前に、リーダーが監視する必要があるデータの量を判別します。

ResumeWriterThreshold と PauseWriterThreshold を含む図

PipeWriter.FlushAsync=

  • Pipe のデータ量が PauseWriterThreshold を超えたときに、不完全な ValueTask<FlushResult> を返します。
  • ResumeWriterThreshold より低くなったときに、ValueTask<FlushResult> を完了します。

2 つの値は、1 つの値が使用された場合に発生する可能性がある、急速な循環を防ぐために使用されます。

使用例

C#
// The Pipe will start returning incomplete tasks from FlushAsync until
// the reader examines at least 5 bytes.
var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
var pipe = new Pipe(options);

PipeScheduler

通常、async および await を使用する場合、非同期コードは TaskScheduler または現在の SynchronizationContext で再開されます。

I/O を行う場合は、I/O が実行される場所をきめ細かく制御することが重要です。 この制御により、CPU キャッシュを効果的に利用できます。 効率的なキャッシュは、Web サーバーなどのハイ パフォーマンス アプリに不可欠です。 PipeScheduler では、非同期コールバックが実行される場所を制御できます。 既定では:

  • 現在の SynchronizationContext が使用されます。
  • SynchronizationContext がない場合は、スレッド プールを使用してコールバックを実行します。
C#
public static void Main(string[] args)
{
    var writeScheduler = new SingleThreadPipeScheduler();
    var readScheduler = new SingleThreadPipeScheduler();

    // Tell the Pipe what schedulers to use and disable the SynchronizationContext.
    var options = new PipeOptions(readerScheduler: readScheduler,
                                  writerScheduler: writeScheduler,
                                  useSynchronizationContext: false);
    var pipe = new Pipe(options);
}

// This is a sample scheduler that async callbacks on a single dedicated thread.
public class SingleThreadPipeScheduler : PipeScheduler
{
    private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
     new BlockingCollection<(Action<object> Action, object State)>();
    private readonly Thread _thread;

    public SingleThreadPipeScheduler()
    {
        _thread = new Thread(DoWork);
        _thread.Start();
    }

    private void DoWork()
    {
        foreach (var item in _queue.GetConsumingEnumerable())
        {
            item.Action(item.State);
        }
    }

    public override void Schedule(Action<object?> action, object? state)
    {
        if (state is not null)
        {
            _queue.Add((action, state));
        }
        // else log the fact that _queue.Add was not called.
    }
}

PipeScheduler.ThreadPool は、スレッド プールへのコールバックをキューに登録する PipeScheduler の実装です。 PipeScheduler.ThreadPool は既定値であり、一般的に最適な選択肢です。 PipeScheduler.Inline を使用すると、デッドロックなどの意図しない結果となる可能性があります。

パイプのリセット

多くの場合、Pipe オブジェクトを再利用するのが効率的です。 パイプをリセットするには、PipeReaderPipeWriter の両方が完了したときに PipeReader Reset を呼び出します。

PipeReader

PipeReader では、呼び出し元の代わりにメモリを管理します。 PipeReader.ReadAsync を呼び出した後に、常にPipeReader.AdvanceTo を呼び出します。 これにより、PipeReader では、呼び出し元がメモリを使用して実行されるタイミングを把握し、追跡できるようになります。 PipeReader.ReadAsync から返された ReadOnlySequence<byte> が有効なのは、PipeReader.AdvanceTo が呼び出されるまでのみとなります。 PipeReader.AdvanceTo を呼び出した後、ReadOnlySequence<byte> を使用することはできません。

PipeReader.AdvanceTo では、次の 2 つの SequencePosition 引数を受け取ります。

  • 最初の引数では、消費されたメモリの量を判別します。
  • 2 番目の引数では、監視されたバッファーの量を判別します。

データを消費済みとしてマークすることは、パイプでメモリを基になるバッファープ ールに返すことができることを意味します。 データを監視済みとしてマークすると、PipeReader.ReadAsync の次の呼び出しで行われる内容が制御されます。 すべてを監視済みとしてマークすることは、パイプにデータがさらに書き込まれるまで、PipeReader.ReadAsync の次の呼び出しでは何も返されないことを意味します。 それ以外の値を指定すると、PipeReader.ReadAsync の次の呼び出しで、監視されたデータ 監視されていないデータがすぐに返されますが、既に消費されているデータは除きます。

ストリーミング データの読み取りシナリオ

ストリーミング データを読み取ろうとすると発生する、一般的なパターンがいくつかあります。

  • 指定したデータのストリームで、1 つのメッセージを解析する。
  • 指定したデータのストリームで、使用可能なメッセージをすべて解析する。

次の例では、ReadOnlySequence<byte> からのメッセージを解析するために TryParseLines メソッドを使用しています。 TryParseLines では 1 つのメッセージを解析し、入力バッファーを更新して、解析されたメッセージをバッファーからトリミングします。 TryParseLines は .NET の一部ではありません。これは、次のセクションで使用されるユーザーが記述するメソッドです。

C#
bool TryParseLines(ref ReadOnlySequence<byte> buffer, out Message message);

1 つのメッセージを読み取る

次のコードでは、PipeReader からの 1 つのメッセージを読み取り、呼び出し元に返します。

C#
async ValueTask<Message?> ReadSingleMessageAsync(PipeReader reader,
 CancellationToken cancellationToken = default)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync(cancellationToken);
        ReadOnlySequence<byte> buffer = result.Buffer;

        // In the event that no message is parsed successfully, mark consumed
        // as nothing and examined as the entire buffer.
        SequencePosition consumed = buffer.Start;
        SequencePosition examined = buffer.End;

        try
        {
            if (TryParseLines(ref buffer, out Message message))
            {
                // A single message was successfully parsed so mark the start of the
                // parsed buffer as consumed. TryParseLines trims the buffer to
                // point to the data after the message was parsed.
                consumed = buffer.Start;

                // Examined is marked the same as consumed here, so the next call
                // to ReadSingleMessageAsync will process the next message if there's
                // one.
                examined = consumed;

                return message;
            }

            // There's no more data to be processed.
            if (result.IsCompleted)
            {
                if (buffer.Length > 0)
                {
                    // The message is incomplete and there's no more data to process.
                    throw new InvalidDataException("Incomplete message.");
                }

                break;
            }
        }
        finally
        {
            reader.AdvanceTo(consumed, examined);
        }
    }

    return null;
}

上記のコードでは次の操作が行われます。

  • 1 つのメッセージを解析します。
  • 消費された SequencePosition と検査された SequencePosition を更新し、トリミングされた入力バッファーの先頭を指すようにします。

2 つの SequencePosition 引数が更新されます。これは、TryParseLines で解析されたメッセージが入力バッファーから削除されるためです。 一般に、バッファーからの 1 つのメッセージを解析する場合、検査位置は次のいずれかである必要があります。

  • メッセージの終わり。
  • メッセージが検出されなかった場合は、受信されたバッファーの終わり。

1 つのメッセージ ケースでは、エラーが発生する可能性が最も高くなります。 examined に間違った値を渡すと、メモリ不足の例外または無限ループが発生する可能性があります。 詳細については、この記事の「PipeReader の一般的な問題」セクションを参照してください。

複数のメッセージの読み取り

次のコードでは、PipeReader からのすべてのメッセージを読み取り、それぞれに対して ProcessMessageAsync を呼び出します。

C#
async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
{
    try
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync(cancellationToken);
            ReadOnlySequence<byte> buffer = result.Buffer;

            try
            {
                // Process all messages from the buffer, modifying the input buffer on each
                // iteration.
                while (TryParseLines(ref buffer, out Message message))
                {
                    await ProcessMessageAsync(message);
                }

                // There's no more data to be processed.
                if (result.IsCompleted)
                {
                    if (buffer.Length > 0)
                    {
                        // The message is incomplete and there's no more data to process.
                        throw new InvalidDataException("Incomplete message.");
                    }
                    break;
                }
            }
            finally
            {
                // Since all messages in the buffer are being processed, you can use the
                // remaining buffer's Start and End position to determine consumed and examined.
                reader.AdvanceTo(buffer.Start, buffer.End);
            }
        }
    }
    finally
    {
        await reader.CompleteAsync();
    }
}

キャンセル

PipeReader.ReadAsync=

  • CancellationToken を渡すことをサポートします。
  • 読み取りが保留中のときに CancellationToken が取り消された場合は、OperationCanceledException をスローします。
  • PipeReader.CancelPendingRead を使用して現在の読み取り操作を取り消す方法をサポートします。これにより、例外の発生を回避できます。 PipeReader.CancelPendingRead を呼び出すと、PipeReader.ReadAsync の現在の呼び出しまたは次の呼び出しで、IsCanceledtrue に設定された ReadResult が返されます。 これは、既存の読み取りループを非破壊的で非例外的な方法で停止する際に役立つ場合があります。
C#
private PipeReader reader;

public MyConnection(PipeReader reader)
{
    this.reader = reader;
}

public void Abort()
{
    // Cancel the pending read so the process loop ends without an exception.
    reader.CancelPendingRead();
}

public async Task ProcessMessagesAsync()
{
    try
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync();
            ReadOnlySequence<byte> buffer = result.Buffer;

            try
            {
                if (result.IsCanceled)
                {
                    // The read was canceled. You can quit without reading the existing data.
                    break;
                }

                // Process all messages from the buffer, modifying the input buffer on each
                // iteration.
                while (TryParseLines(ref buffer, out Message message))
                {
                    await ProcessMessageAsync(message);
                }

                // There's no more data to be processed.
                if (result.IsCompleted)
                {
                    break;
                }
            }
            finally
            {
                // Since all messages in the buffer are being processed, you can use the
                // remaining buffer's Start and End position to determine consumed and examined.
                reader.AdvanceTo(buffer.Start, buffer.End);
            }
        }
    }
    finally
    {
        await reader.CompleteAsync();
    }
}

PipeReader の一般的な問題

  • consumed または examined に間違った値を渡すと、既に読み取られているデータが読み取られる場合があります。

  • buffer.End を検査済みとして渡すと、次のようになる場合があります。

    • データが停止する
    • データが消費されていない場合、最終的にメモリ不足 (OOM) 例外が発生する可能性があります。 たとえば、バッファーから一度に 1 つのメッセージを処理する場合は、PipeReader.AdvanceTo(position, buffer.End) です。
  • consumed または examined に間違った値を渡すと、無限ループが発生する可能性があります。 たとえば、buffer.Start が変更されていない場合、PipeReader.AdvanceTo(buffer.Start) では、新しいデータが到着する直前に PipeReader.ReadAsync への次の呼び出しで制御が返されます。

  • consumed または examined に間違った値を渡すと、無限バッファーリング (最終的には OOM) が発生する可能性があります。

  • PipeReader.AdvanceTo を呼び出した後に ReadOnlySequence<byte> を使用すると、メモリが破損する可能性があります (解放後使用)。

  • PipeReader.Complete/CompleteAsync を呼び出せないと、メモリ リークが発生する可能性があります。

  • バッファーを処理する前に ReadResult.IsCompleted を確認し、読み取りロジックを終了すると、データが失われます。 ループの終了条件は、ReadResult.Buffer.IsEmpty および ReadResult.IsCompleted に基づいている必要があります。 これを誤って実行すると、無限ループになる可能性があります。

問題のあるコード

データ損失

IsCompletedtrue に設定されている場合、ReadResult でデータの最終セグメントを返すことができます。 読み取りループを終了する前にデータを読み取らないと、データが失われます。

警告

次のコードは使用しないでください。 このサンプルを使用すると、データの損失、ハング、セキュリティの問題が発生するため、コピーしないでください。 以下のサンプルは、PipeReader の一般的な問題を説明するために提供されています。

C#
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> dataLossBuffer = result.Buffer;

    if (result.IsCompleted)
        break;

    Process(ref dataLossBuffer, out Message message);

    reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End);
}

警告

上記のコードは使用しない でください。 このサンプルを使用すると、データの損失、ハング、セキュリティの問題が発生するため、コピーしないでください。 上記のサンプルは、PipeReader の一般的な問題を説明するために提供されています。

無限ループ

次のロジックでは、Result.IsCompletedtrue でも、バッファー内に完全なメッセージがない場合は、無限ループが発生する可能性があります。

警告

次のコードは使用しないでください。 このサンプルを使用すると、データの損失、ハング、セキュリティの問題が発生するため、コピーしないでください。 以下のサンプルは、PipeReader の一般的な問題を説明するために提供されています。

C#
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
    if (result.IsCompleted && infiniteLoopBuffer.IsEmpty)
        break;

    Process(ref infiniteLoopBuffer, out Message message);

    reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}

警告

上記のコードは使用しない でください。 このサンプルを使用すると、データの損失、ハング、セキュリティの問題が発生するため、コピーしないでください。 上記のサンプルは、PipeReader の一般的な問題を説明するために提供されています。

同じ問題があるもう 1 つのコードを次に示します。 ReadResult.IsCompleted を確認する前に、空でないバッファーがあるかどうかが確認されます。 これは else if にあるため、バッファー内に完全なメッセージがない場合は、無限にループされます。

警告

次のコードは使用しないでください。 このサンプルを使用すると、データの損失、ハング、セキュリティの問題が発生するため、コピーしないでください。 以下のサンプルは、PipeReader の一般的な問題を説明するために提供されています。

C#
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;

    if (!infiniteLoopBuffer.IsEmpty)
        Process(ref infiniteLoopBuffer, out Message message);

    else if (result.IsCompleted)
        break;

    reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}

警告

上記のコードは使用しない でください。 このサンプルを使用すると、データの損失、ハング、セキュリティの問題が発生するため、コピーしないでください。 上記のサンプルは、PipeReader の一般的な問題を説明するために提供されています。

応答しないアプリケーション

examined の位置で buffer.End を使用して PipeReader.AdvanceTo を無条件に呼び出すと、1 つのメッセージを解析するときにアプリケーションが応答しなくなる可能性があります。 PipeReader.AdvanceTo の次の呼び出しでは、以下のようになるまで何も返されません。

  • パイプにさらにデータが書き込まれている。
  • また、新しいデータが以前に検査されていない。

警告

次のコードは使用しないでください。 このサンプルを使用すると、データの損失、ハング、セキュリティの問題が発生するため、コピーしないでください。 以下のサンプルは、PipeReader の一般的な問題を説明するために提供されています。

C#
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> hangBuffer = result.Buffer;

    Process(ref hangBuffer, out Message message);

    if (result.IsCompleted)
        break;

    reader.AdvanceTo(hangBuffer.Start, hangBuffer.End);

    if (message != null)
        return message;
}

警告

上記のコードは使用しない でください。 このサンプルを使用すると、データの損失、ハング、セキュリティの問題が発生するため、コピーしないでください。 上記のサンプルは、PipeReader の一般的な問題を説明するために提供されています。

メモリ不足 (OOM)

次のような状況の場合、以下のコードでは、OutOfMemoryException が発生するまでバッファーが保持されます。

  • 最大メッセージ サイズがない。
  • PipeReader から返されたデータで、完全なメッセージが作成されない。 たとえば、他の側で大きなメッセージ (4 GB のメッセージなど) を書き込んでいるため、完全なメッセージが作成されていない。

警告

次のコードは使用しないでください。 このサンプルを使用すると、データの損失、ハング、セキュリティの問題が発生するため、コピーしないでください。 以下のサンプルは、PipeReader の一般的な問題を説明するために提供されています。

C#
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer;

    Process(ref thisCouldOutOfMemory, out Message message);

    if (result.IsCompleted)
        break;

    reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End);

    if (message != null)
        return message;
}

警告

上記のコードは使用しない でください。 このサンプルを使用すると、データの損失、ハング、セキュリティの問題が発生するため、コピーしないでください。 上記のサンプルは、PipeReader の一般的な問題を説明するために提供されています。

メモリの破損

バッファーを読み取るヘルパーを記述するときは、返されたすべてのペイロードをコピーしてから Advance を呼び出す必要があります。 次の例では、Pipe で破棄されたメモリを返し、次の操作 (読み取り/書き込み) でそれを再利用する可能性があります。

警告

次のコードは使用しないでください。 このサンプルを使用すると、データの損失、ハング、セキュリティの問題が発生するため、コピーしないでください。 以下のサンプルは、PipeReader の一般的な問題を説明するために提供されています。

C#
public class Message
{
    public ReadOnlySequence<byte> CorruptedPayload { get; set; }
}
C#
    Environment.FailFast("This code is terrible, don't use it!");
    Message message = null;

    while (true)
    {
        ReadResult result = await reader.ReadAsync(cancellationToken);
        ReadOnlySequence<byte> buffer = result.Buffer;

        ReadHeader(ref buffer, out int length);

        if (length <= buffer.Length)
        {
            message = new Message
            {
                // Slice the payload from the existing buffer
                CorruptedPayload = buffer.Slice(0, length)
            };

            buffer = buffer.Slice(length);
        }

        if (result.IsCompleted)
            break;

        reader.AdvanceTo(buffer.Start, buffer.End);

        if (message != null)
        {
            // This code is broken since reader.AdvanceTo() was called with a position *after* the buffer
            // was captured.
            break;
        }
    }

    return message;
}

警告

上記のコードは使用しない でください。 このサンプルを使用すると、データの損失、ハング、セキュリティの問題が発生するため、コピーしないでください。 上記のサンプルは、PipeReader の一般的な問題を説明するために提供されています。

PipeWriter

PipeWriter では、呼び出し元の代わりに書き込むバッファーを管理します。 PipeWriter は、IBufferWriter<byte> を実装します。 IBufferWriter<byte> は、バッファーのコピーを追加せずに、書き込みを実行するためにバッファーにアクセスできるようにします。

C#
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
    // Request at least 5 bytes from the PipeWriter.
    Memory<byte> memory = writer.GetMemory(5);

    // Write directly into the buffer.
    int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span);

    // Tell the writer how many bytes were written.
    writer.Advance(written);

    await writer.FlushAsync(cancellationToken);
}

上記のコードでは、次のようになります。

  • GetMemory を使用して、PipeWriter から少なくとも 5 バイトのバッファーを要求します。
  • ASCII 文字列である "Hello" のバイトを、返された Memory<byte> に書き込みます。
  • Advance を呼び出して、バッファーに書き込まれたバイト数を示します。
  • 基になるデバイスにバイトを送信する、PipeWriter をフラッシュします。

上記の書き込みメソッドでは、PipeWriter によって提供されるバッファーが使用されています。 PipeWriter.WriteAsync を使用して、以下を行うこともできます。

  • 既存のバッファーを PipeWriter にコピーします。
  • GetSpan を呼び出し、必要に応じて Advance を呼び出してから、FlushAsync を呼び出します。
C#
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
    byte[] helloBytes = Encoding.ASCII.GetBytes("Hello");

    // Write helloBytes to the writer, there's no need to call Advance here
    // (Write does that).
    await writer.WriteAsync(helloBytes, cancellationToken);
}

キャンセル

FlushAsync では、CancellationToken を渡すことがサポートされます。 CancellationToken を渡すと、フラッシュの保留中にトークンが取り消された場合に、OperationCanceledException となります。 PipeWriter.FlushAsync では、例外を発生させることなく、PipeWriter.CancelPendingFlush を使用して、現在のフラッシュ操作を取り消す方法がサポートされます。 PipeWriter.CancelPendingFlush を呼び出すと、PipeWriter.FlushAsync または PipeWriter.WriteAsync の現在の呼び出しあるいは次の呼び出しで、IsCanceledtrue に設定された FlushResult が返されます。 これは、非破壊的で非例外的な方法で生成されるフラッシュを停止する際に役立つ場合があります。

PipeWriter の一般的な問題

  • GetSpan および GetMemory では、少なくとも要求された量のメモリを持つバッファーを返します。 正確なバッファー サイズを想定しないでください
  • 連続する呼び出しで同じバッファーまたは同じサイズのバッファーが返される保証はありません。
  • さらにデータの書き込みを続行するには、Advance を呼び出した後に新しいバッファーを要求する必要があります。 以前に取得したバッファーに書き込むことはできません。
  • FlushAsync の呼び出しが不完全な場合に GetMemory または GetSpan を呼び出すのは安全ではありません。
  • フラッシュされていないデータがある場合に Complete または CompleteAsync を呼び出すと、メモリが破損する可能性があります。

PipeReader と PipeWriter を使うためのヒント

次のヒントは、System.IO.Pipelines クラスをうまく使うために役立ちます。

  • PipeReaderPipeWriter は、該当する場合は例外を含めて、常に完了させてください。
  • PipeReader.ReadAsync を呼び出した後に、常にPipeReader.AdvanceTo を呼び出します。
  • 書き込み中は定期的に awaitPipeWriter.FlushAsync を行い、FlushResult.IsCompleted を常に確認します。 IsCompletedtrue の場合は、リーダーが完了し、書き込まれた内容がどうでもよいことを示すので、書き込みを中止します。
  • PipeReader にアクセスさせるものを書き込んだ後、PipeWriter.FlushAsync を呼び出します。
  • FlushAsync の完了までにリーダーを開始できない場合は、デッドロックの原因となるため、FlushAsync を呼び出さないでください。
  • PipeReaderPipeWriter には、1 つのコンテキストのみがそれらを "所有" またはアクセスするようにします。 これらの型はスレッドセーフではありません。
  • AdvanceTo を呼び出したり、PipeReader が完了した後に ReadResult.Buffer にアクセスしないでください。

IDuplexPipe

IDuplexPipe は、読み取りと書き込みの両方をサポートする種類のコントラクトです。 たとえば、ネットワーク接続は IDuplexPipe によって表されます。

PipeReaderPipeWriter を含む Pipe とは異なり、IDuplexPipe は全二重接続の片側を表します。 つまり、PipeWriter に書き込まれるものは、PipeReader からは読み取られません。

ストリーム

ストリーム データの読み取りまたは書き込みを行う場合、通常は、デシリアライザーを使用してデータを読み取り、シリアライザーを使用してデータを書き込みます。 これらの読み取りおよび書き込みストリーム API のほとんどに、Stream パラメーターがあります。 これらの既存の API との統合をより容易にするために、PipeReader および PipeWriterAsStream メソッドが公開されます。 AsStream では、PipeReader または PipeWriter に関する Stream 実装を返します。

ストリームの例

PipeReader および PipeWriter インスタンスは、静的な Create メソッドを使用し、Stream オブジェクトとそれに対応する任意の作成オプションを指定して作成できます。

StreamPipeReaderOptions では、次のパラメーターを使用して PipeReader インスタンスの作成を制御できます。

  • StreamPipeReaderOptions.BufferSize はプールからメモリを借りるときに使用される最小バッファー サイズ (バイト単位) であり、既定値は 4096 です。
  • StreamPipeReaderOptions.LeaveOpen フラグによって、PipeReader の完了後、基礎となるストリームを開いたままにするかどうかが決定されます。既定値は false です。
  • StreamPipeReaderOptions.MinimumReadSize は、新しいバッファーが割り当てられる前のバッファー内に残るバイト数のしきい値を表します。既定値は 1024 です。
  • StreamPipeReaderOptions.Pool はメモリを割り当てるときに使用される MemoryPool<byte> であり、既定値は null です。

StreamPipeWriterOptions では、次のパラメーターを使用して PipeWriter インスタンスの作成を制御できます。

  • StreamPipeWriterOptions.LeaveOpen フラグによって、PipeWriter の完了後、基礎となるストリームを開いたままにするかどうかが決定されます。既定値は false です。
  • StreamPipeWriterOptions.MinimumBufferSize は、Pool からメモリをレンタルしているときに使用する最小バッファー サイズを表します。既定値は 4096 です。
  • StreamPipeWriterOptions.Pool はメモリを割り当てるときに使用される MemoryPool<byte> であり、既定値は null です。

重要

Create メソッドを使用して PipeReader および PipeWriter インスタンスを作成するとき、Stream オブジェクトの有効期間を考慮する必要があります。 リーダーまたはライターでストリームの利用が終わった後にストリームにアクセスする必要がある場合、作成オプションで LeaveOpen フラグを true に設定する必要があります。 設定しない場合、ストリームは閉じられます。

次のコードでは、ストリームからの Create メソッドを利用し、PipeReader および PipeWriter インスタンスが作成されます。

C#
using System.Buffers;
using System.IO.Pipelines;
using System.Text;

class Program
{
    static async Task Main()
    {
        using var stream = File.OpenRead("lorem-ipsum.txt");

        var reader = PipeReader.Create(stream);
        var writer = PipeWriter.Create(
            Console.OpenStandardOutput(), 
            new StreamPipeWriterOptions(leaveOpen: true));

        WriteUserCancellationPrompt();

        var processMessagesTask = ProcessMessagesAsync(reader, writer);
        var userCanceled = false;
        var cancelProcessingTask = Task.Run(() =>
        {
            while (char.ToUpperInvariant(Console.ReadKey().KeyChar) != 'C')
            {
                WriteUserCancellationPrompt();
            }

            userCanceled = true;

            // No exceptions thrown
            reader.CancelPendingRead();
            writer.CancelPendingFlush();
        });

        await Task.WhenAny(cancelProcessingTask, processMessagesTask);

        Console.WriteLine(
            $"\n\nProcessing {(userCanceled ? "cancelled" : "completed")}.\n");
    }

    static void WriteUserCancellationPrompt() =>
        Console.WriteLine("Press 'C' to cancel processing...\n");

    static async Task ProcessMessagesAsync(
        PipeReader reader,
        PipeWriter writer)
    {
        try
        {
            while (true)
            {
                ReadResult readResult = await reader.ReadAsync();
                ReadOnlySequence<byte> buffer = readResult.Buffer;

                try
                {
                    if (readResult.IsCanceled)
                    {
                        break;
                    }

                    if (TryParseLines(ref buffer, out string message))
                    {
                        FlushResult flushResult =
                            await WriteMessagesAsync(writer, message);

                        if (flushResult.IsCanceled || flushResult.IsCompleted)
                        {
                            break;
                        }
                    }

                    if (readResult.IsCompleted)
                    {
                        if (!buffer.IsEmpty)
                        {
                            throw new InvalidDataException("Incomplete message.");
                        }
                        break;
                    }
                }
                finally
                {
                    reader.AdvanceTo(buffer.Start, buffer.End);
                }
            }
        }
        catch (Exception ex)
        {
            Console.Error.WriteLine(ex);
        }
        finally
        {
            await reader.CompleteAsync();
            await writer.CompleteAsync();
        }
    }

    static bool TryParseLines(
        ref ReadOnlySequence<byte> buffer,
        out string message)
    {
        SequencePosition? position;
        StringBuilder outputMessage = new();

        while(true)
        {
            position = buffer.PositionOf((byte)'\n');

            if (!position.HasValue)
                break;

            outputMessage.Append(Encoding.ASCII.GetString(buffer.Slice(buffer.Start, position.Value)))
                        .AppendLine();

            buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
        };

        message = outputMessage.ToString();
        return message.Length != 0;
    }

    static ValueTask<FlushResult> WriteMessagesAsync(
        PipeWriter writer,
        string message) =>
        writer.WriteAsync(Encoding.ASCII.GetBytes(message));
}

このアプリケーションによって StreamReader が使用され、ストリームとして lorem-ipsum.txt ファイルが読み取られます。これは空白行で終了する必要があります。 FileStream は、PipeReader オブジェクトをインスタンス化する PipeReader.Create に渡されます。 次に、コンソール アプリケーションから Console.OpenStandardOutput() を使用して PipeWriter.Create にその標準出力ストリームが渡されます。 この例ではキャンセルがサポートされます。