トレーニング
.NET の System.IO.Pipelines
System.IO.Pipelines は、.NET でハイ パフォーマンスの I/O をより簡単に行えるように設計されたライブラリです。 これは、すべての .NET 実装で動作する .NET Standard を対象とするライブラリです。
ライブラリは System.IO.Pipelines Nuget パッケージで入手できます。
ストリーミング データを解析するアプリは、多くの特殊な通常とは異なるコード フローを持つ定型コードで構成されます。 定型および特殊なケースのコードは複雑で、保守が困難です。
System.IO.Pipelines
は、以下のようになるように設計されています。
- ハイ パフォーマンスのストリーミング データ解析を実現する。
- コードの複雑さを軽減する。
次のコードは、クライアントから ('\n'
で区切られた) 行区切りメッセージを受信する TCP サーバーでは一般的なものです。
async Task ProcessLinesAsync(NetworkStream stream)
{
var buffer = new byte[1024];
await stream.ReadAsync(buffer, 0, buffer.Length);
// Process a single line from the buffer
ProcessLine(buffer);
}
上記のコードには、以下のようないくつかの問題があります。
- メッセージ全体 (行の終わり) が、
ReadAsync
への 1 回の呼び出しで受信されない場合がある。 stream.ReadAsync
の結果が無視される。stream.ReadAsync
で、読み取られたデータの量が返される。- 複数の行が 1 回の
ReadAsync
呼び出しで読み取られるケースが処理されない。 - 読み取りごとに
byte
配列が割り当てられる。
上記の問題を解決するには、次の変更が必要です。
改行が検出されるまで、受信データをバッファーに格納します。
バッファーで返されたすべての行を解析します。
行が 1 KB (1024 バイト) より大きい可能性があります。 コードでは、バッファー内の完全な行に収まるように、区切り記号が見つかるまで入力バッファーのサイズを変更する必要があります。
- バッファーのサイズが変更された場合、入力により長い行が表示されると、より多くのバッファー コピーが作成されます。
- 無駄な領域を減らすには、行の読み取りに使用されるバッファーを小さくします。
メモリが繰り返し割り当てられないようにするために、バッファー プーリングの使用を検討します。
次のコードでは、これらの問題の一部に対処します。
async Task ProcessLinesAsync(NetworkStream stream)
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
var bytesBuffered = 0;
var bytesConsumed = 0;
while (true)
{
// Calculate the amount of bytes remaining in the buffer.
var bytesRemaining = buffer.Length - bytesBuffered;
if (bytesRemaining == 0)
{
// Double the buffer size and copy the previously buffered data into the new buffer.
var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
// Return the old buffer to the pool.
ArrayPool<byte>.Shared.Return(buffer);
buffer = newBuffer;
bytesRemaining = buffer.Length - bytesBuffered;
}
var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
if (bytesRead == 0)
{
// EOF
break;
}
// Keep track of the amount of buffered bytes.
bytesBuffered += bytesRead;
var linePosition = -1;
do
{
// Look for a EOL in the buffered data.
linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
bytesBuffered - bytesConsumed);
if (linePosition >= 0)
{
// Calculate the length of the line based on the offset.
var lineLength = linePosition - bytesConsumed;
// Process the line.
ProcessLine(buffer, bytesConsumed, lineLength);
// Move the bytesConsumed to skip past the line consumed (including \n).
bytesConsumed += lineLength + 1;
}
}
while (linePosition >= 0);
}
}
上記のコードは複雑で、特定されたすべての問題には対処していません。 ハイ パフォーマンス ネットワークは、通常、パフォーマンスを最大化するために複雑なコードを記述することを意味します。 System.IO.Pipelines
は、この種のコードをより簡単に記述できるように設計されています。
Pipe クラスを使用して、PipeWriter/PipeReader
ペアを作成できます。 PipeWriter
に書き込まれたすべてのデータは、PipeReader
で利用できます。
var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;
async Task ProcessLinesAsync(Socket socket)
{
var pipe = new Pipe();
Task writing = FillPipeAsync(socket, pipe.Writer);
Task reading = ReadPipeAsync(pipe.Reader);
await Task.WhenAll(reading, writing);
}
async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
const int minimumBufferSize = 512;
while (true)
{
// Allocate at least 512 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(minimumBufferSize);
try
{
int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
if (bytesRead == 0)
{
break;
}
// Tell the PipeWriter how much was read from the Socket.
writer.Advance(bytesRead);
}
catch (Exception ex)
{
LogError(ex);
break;
}
// Make the data available to the PipeReader.
FlushResult result = await writer.FlushAsync();
if (result.IsCompleted)
{
break;
}
}
// By completing PipeWriter, tell the PipeReader that there's no more data coming.
await writer.CompleteAsync();
}
async Task ReadPipeAsync(PipeReader reader)
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
{
// Process the line.
ProcessLine(line);
}
// Tell the PipeReader how much of the buffer has been consumed.
reader.AdvanceTo(buffer.Start, buffer.End);
// Stop reading if there's no more data coming.
if (result.IsCompleted)
{
break;
}
}
// Mark the PipeReader as complete.
await reader.CompleteAsync();
}
bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
// Look for a EOL in the buffer.
SequencePosition? position = buffer.PositionOf((byte)'\n');
if (position == null)
{
line = default;
return false;
}
// Skip the line + the \n.
line = buffer.Slice(0, position.Value);
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
return true;
}
次の 2 つのループがあります。
FillPipeAsync
ではSocket
から読み取り、PipeWriter
に書き込みます。ReadPipeAsync
ではPipeReader
から読み取り、受信行を解析します。
明示的なバッファーは割り当てられていません。 すべてのバッファー管理は、PipeReader
と PipeWriter
の実装に委任されます。 バッファー管理を委任することで、ビジネス ロジックのみに焦点を当てるコードがより簡単に消費されるようになります。
最初のループでは、次のようになります。
- PipeWriter.GetMemory(Int32) は、基になるライターからメモリを取得するために呼び出されます。
- PipeWriter.Advance(Int32) は、バッファーに書き込まれたデータの量を
PipeWriter
に示すために呼び出されます。 - PipeWriter.FlushAsync は、データを
PipeReader
で使用できるようにするために呼び出されます。
2 番目のループでは、PipeWriter
によって書き込まれたバッファーが PipeReader
で消費されます。 バッファーはソケットから取得されます。 PipeReader.ReadAsync
の呼び出しでは、次のようになります。
次の 2 つの重要な情報を含む、ReadResult を返します。
ReadOnlySequence<byte>
の形式で読み取られたデータ。- データの終わり (EOF) に到達したかどうかを示すブール値
IsCompleted
。
行の終わり (EOL) の区切り記号が検出され、行が解析された後は、次のようになります。
- ロジックでバッファーが処理され、既に処理されているものがスキップされます。
PipeReader.AdvanceTo
は、どのくらいの量のデータが使用され、検査されたかをPipeReader
に示すために呼び出されます。
リーダーとライターのループは、Complete
を呼び出すことによって終了します。 Complete
は、基になるパイプで割り当てられたメモリを解放できるようにします。
読み取りと解析を連動させるのが理想的です。
- 読み取りスレッドでは、ネットワークからのデータを消費し、それをバッファーに格納します。
- 解析スレッドでは、適切なデータ構造の構築を担当します。
通常、解析には、ネットワークからデータのブロックをコピーするだけの場合より、時間がかかります。
- 読み取りスレッドは解析スレッドより前になります。
- 読み取りスレッドの速度を落とすか、解析スレッド用のデータを格納するためにより多くのメモリを割り当てる必要があります。
最適なパフォーマンスが得られるように、頻繁な一時停止間のバランスが保たれ、より多くのメモリが割り当てられます。
この問題を解決するために、Pipe
には、データのフローを制御するための次の 2 つの設定があります。
- PauseWriterThreshold:FlushAsync の呼び出しが一時停止する前に、バッファーに格納する必要があるデータ量を判別します。
- ResumeWriterThreshold:
PipeWriter.FlushAsync
の呼び出しが再開される前に、リーダーが監視する必要があるデータの量を判別します。
Pipe
のデータ量がPauseWriterThreshold
を超えたときに、不完全なValueTask<FlushResult>
を返します。ResumeWriterThreshold
より低くなったときに、ValueTask<FlushResult>
を完了します。
2 つの値は、1 つの値が使用された場合に発生する可能性がある、急速な循環を防ぐために使用されます。
// The Pipe will start returning incomplete tasks from FlushAsync until
// the reader examines at least 5 bytes.
var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
var pipe = new Pipe(options);
通常、async
および await
を使用する場合、非同期コードは TaskScheduler または現在の SynchronizationContext で再開されます。
I/O を行う場合は、I/O が実行される場所をきめ細かく制御することが重要です。 この制御により、CPU キャッシュを効果的に利用できます。 効率的なキャッシュは、Web サーバーなどのハイ パフォーマンス アプリに不可欠です。 PipeScheduler では、非同期コールバックが実行される場所を制御できます。 既定では:
- 現在の SynchronizationContext が使用されます。
SynchronizationContext
がない場合は、スレッド プールを使用してコールバックを実行します。
public static void Main(string[] args)
{
var writeScheduler = new SingleThreadPipeScheduler();
var readScheduler = new SingleThreadPipeScheduler();
// Tell the Pipe what schedulers to use and disable the SynchronizationContext.
var options = new PipeOptions(readerScheduler: readScheduler,
writerScheduler: writeScheduler,
useSynchronizationContext: false);
var pipe = new Pipe(options);
}
// This is a sample scheduler that async callbacks on a single dedicated thread.
public class SingleThreadPipeScheduler : PipeScheduler
{
private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
new BlockingCollection<(Action<object> Action, object State)>();
private readonly Thread _thread;
public SingleThreadPipeScheduler()
{
_thread = new Thread(DoWork);
_thread.Start();
}
private void DoWork()
{
foreach (var item in _queue.GetConsumingEnumerable())
{
item.Action(item.State);
}
}
public override void Schedule(Action<object?> action, object? state)
{
if (state is not null)
{
_queue.Add((action, state));
}
// else log the fact that _queue.Add was not called.
}
}
PipeScheduler.ThreadPool は、スレッド プールへのコールバックをキューに登録する PipeScheduler の実装です。 PipeScheduler.ThreadPool
は既定値であり、一般的に最適な選択肢です。 PipeScheduler.Inline を使用すると、デッドロックなどの意図しない結果となる可能性があります。
多くの場合、Pipe
オブジェクトを再利用するのが効率的です。 パイプをリセットするには、PipeReader
と PipeWriter
の両方が完了したときに PipeReader Reset を呼び出します。
PipeReader では、呼び出し元の代わりにメモリを管理します。 PipeReader.ReadAsync を呼び出した後に、常にPipeReader.AdvanceTo を呼び出します。 これにより、PipeReader
では、呼び出し元がメモリを使用して実行されるタイミングを把握し、追跡できるようになります。 PipeReader.ReadAsync
から返された ReadOnlySequence<byte>
が有効なのは、PipeReader.AdvanceTo
が呼び出されるまでのみとなります。 PipeReader.AdvanceTo
を呼び出した後、ReadOnlySequence<byte>
を使用することはできません。
PipeReader.AdvanceTo
では、次の 2 つの SequencePosition 引数を受け取ります。
- 最初の引数では、消費されたメモリの量を判別します。
- 2 番目の引数では、監視されたバッファーの量を判別します。
データを消費済みとしてマークすることは、パイプでメモリを基になるバッファープ ールに返すことができることを意味します。 データを監視済みとしてマークすると、PipeReader.ReadAsync
の次の呼び出しで行われる内容が制御されます。 すべてを監視済みとしてマークすることは、パイプにデータがさらに書き込まれるまで、PipeReader.ReadAsync
の次の呼び出しでは何も返されないことを意味します。 それ以外の値を指定すると、PipeReader.ReadAsync
の次の呼び出しで、監視されたデータと 監視されていないデータがすぐに返されますが、既に消費されているデータは除きます。
ストリーミング データを読み取ろうとすると発生する、一般的なパターンがいくつかあります。
- 指定したデータのストリームで、1 つのメッセージを解析する。
- 指定したデータのストリームで、使用可能なメッセージをすべて解析する。
次の例では、ReadOnlySequence<byte>
からのメッセージを解析するために TryParseLines
メソッドを使用しています。 TryParseLines
では 1 つのメッセージを解析し、入力バッファーを更新して、解析されたメッセージをバッファーからトリミングします。 TryParseLines
は .NET の一部ではありません。これは、次のセクションで使用されるユーザーが記述するメソッドです。
bool TryParseLines(ref ReadOnlySequence<byte> buffer, out Message message);
次のコードでは、PipeReader
からの 1 つのメッセージを読み取り、呼び出し元に返します。
async ValueTask<Message?> ReadSingleMessageAsync(PipeReader reader,
CancellationToken cancellationToken = default)
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
// In the event that no message is parsed successfully, mark consumed
// as nothing and examined as the entire buffer.
SequencePosition consumed = buffer.Start;
SequencePosition examined = buffer.End;
try
{
if (TryParseLines(ref buffer, out Message message))
{
// A single message was successfully parsed so mark the start of the
// parsed buffer as consumed. TryParseLines trims the buffer to
// point to the data after the message was parsed.
consumed = buffer.Start;
// Examined is marked the same as consumed here, so the next call
// to ReadSingleMessageAsync will process the next message if there's
// one.
examined = consumed;
return message;
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(consumed, examined);
}
}
return null;
}
上記のコードでは次の操作が行われます。
- 1 つのメッセージを解析します。
- 消費された
SequencePosition
と検査されたSequencePosition
を更新し、トリミングされた入力バッファーの先頭を指すようにします。
2 つの SequencePosition
引数が更新されます。これは、TryParseLines
で解析されたメッセージが入力バッファーから削除されるためです。 一般に、バッファーからの 1 つのメッセージを解析する場合、検査位置は次のいずれかである必要があります。
- メッセージの終わり。
- メッセージが検出されなかった場合は、受信されたバッファーの終わり。
1 つのメッセージ ケースでは、エラーが発生する可能性が最も高くなります。 examined に間違った値を渡すと、メモリ不足の例外または無限ループが発生する可能性があります。 詳細については、この記事の「PipeReader の一般的な問題」セクションを参照してください。
次のコードでは、PipeReader
からのすべてのメッセージを読み取り、それぞれに対して ProcessMessageAsync
を呼び出します。
async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseLines(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
PipeReader.ReadAsync
=
- CancellationToken を渡すことをサポートします。
- 読み取りが保留中のときに
CancellationToken
が取り消された場合は、OperationCanceledException をスローします。 - PipeReader.CancelPendingRead を使用して現在の読み取り操作を取り消す方法をサポートします。これにより、例外の発生を回避できます。
PipeReader.CancelPendingRead
を呼び出すと、PipeReader.ReadAsync
の現在の呼び出しまたは次の呼び出しで、IsCanceled
がtrue
に設定された ReadResult が返されます。 これは、既存の読み取りループを非破壊的で非例外的な方法で停止する際に役立つ場合があります。
private PipeReader reader;
public MyConnection(PipeReader reader)
{
this.reader = reader;
}
public void Abort()
{
// Cancel the pending read so the process loop ends without an exception.
reader.CancelPendingRead();
}
public async Task ProcessMessagesAsync()
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
if (result.IsCanceled)
{
// The read was canceled. You can quit without reading the existing data.
break;
}
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseLines(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
consumed
またはexamined
に間違った値を渡すと、既に読み取られているデータが読み取られる場合があります。buffer.End
を検査済みとして渡すと、次のようになる場合があります。- データが停止する
- データが消費されていない場合、最終的にメモリ不足 (OOM) 例外が発生する可能性があります。 たとえば、バッファーから一度に 1 つのメッセージを処理する場合は、
PipeReader.AdvanceTo(position, buffer.End)
です。
consumed
またはexamined
に間違った値を渡すと、無限ループが発生する可能性があります。 たとえば、buffer.Start
が変更されていない場合、PipeReader.AdvanceTo(buffer.Start)
では、新しいデータが到着する直前にPipeReader.ReadAsync
への次の呼び出しで制御が返されます。consumed
またはexamined
に間違った値を渡すと、無限バッファーリング (最終的には OOM) が発生する可能性があります。PipeReader.AdvanceTo
を呼び出した後にReadOnlySequence<byte>
を使用すると、メモリが破損する可能性があります (解放後使用)。PipeReader.Complete/CompleteAsync
を呼び出せないと、メモリ リークが発生する可能性があります。バッファーを処理する前に ReadResult.IsCompleted を確認し、読み取りロジックを終了すると、データが失われます。 ループの終了条件は、
ReadResult.Buffer.IsEmpty
およびReadResult.IsCompleted
に基づいている必要があります。 これを誤って実行すると、無限ループになる可能性があります。
❌データ損失
IsCompleted
が true
に設定されている場合、ReadResult
でデータの最終セグメントを返すことができます。 読み取りループを終了する前にデータを読み取らないと、データが失われます。
警告
次のコードは使用しないでください。 このサンプルを使用すると、データの損失、ハング、セキュリティの問題が発生するため、コピーしないでください。 以下のサンプルは、PipeReader の一般的な問題を説明するために提供されています。
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> dataLossBuffer = result.Buffer;
if (result.IsCompleted)
break;
Process(ref dataLossBuffer, out Message message);
reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End);
}
警告
上記のコードは使用しない でください。 このサンプルを使用すると、データの損失、ハング、セキュリティの問題が発生するため、コピーしないでください。 上記のサンプルは、PipeReader の一般的な問題を説明するために提供されています。
❌無限ループ
次のロジックでは、Result.IsCompleted
が true
でも、バッファー内に完全なメッセージがない場合は、無限ループが発生する可能性があります。
警告
次のコードは使用しないでください。 このサンプルを使用すると、データの損失、ハング、セキュリティの問題が発生するため、コピーしないでください。 以下のサンプルは、PipeReader の一般的な問題を説明するために提供されています。
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (result.IsCompleted && infiniteLoopBuffer.IsEmpty)
break;
Process(ref infiniteLoopBuffer, out Message message);
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
警告
上記のコードは使用しない でください。 このサンプルを使用すると、データの損失、ハング、セキュリティの問題が発生するため、コピーしないでください。 上記のサンプルは、PipeReader の一般的な問題を説明するために提供されています。
同じ問題があるもう 1 つのコードを次に示します。 ReadResult.IsCompleted
を確認する前に、空でないバッファーがあるかどうかが確認されます。 これは else if
にあるため、バッファー内に完全なメッセージがない場合は、無限にループされます。
警告
次のコードは使用しないでください。 このサンプルを使用すると、データの損失、ハング、セキュリティの問題が発生するため、コピーしないでください。 以下のサンプルは、PipeReader の一般的な問題を説明するために提供されています。
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (!infiniteLoopBuffer.IsEmpty)
Process(ref infiniteLoopBuffer, out Message message);
else if (result.IsCompleted)
break;
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
警告
上記のコードは使用しない でください。 このサンプルを使用すると、データの損失、ハング、セキュリティの問題が発生するため、コピーしないでください。 上記のサンプルは、PipeReader の一般的な問題を説明するために提供されています。
❌応答しないアプリケーション
examined
の位置で buffer.End
を使用して PipeReader.AdvanceTo
を無条件に呼び出すと、1 つのメッセージを解析するときにアプリケーションが応答しなくなる可能性があります。 PipeReader.AdvanceTo
の次の呼び出しでは、以下のようになるまで何も返されません。
- パイプにさらにデータが書き込まれている。
- また、新しいデータが以前に検査されていない。
警告
次のコードは使用しないでください。 このサンプルを使用すると、データの損失、ハング、セキュリティの問題が発生するため、コピーしないでください。 以下のサンプルは、PipeReader の一般的な問題を説明するために提供されています。
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> hangBuffer = result.Buffer;
Process(ref hangBuffer, out Message message);
if (result.IsCompleted)
break;
reader.AdvanceTo(hangBuffer.Start, hangBuffer.End);
if (message != null)
return message;
}
警告
上記のコードは使用しない でください。 このサンプルを使用すると、データの損失、ハング、セキュリティの問題が発生するため、コピーしないでください。 上記のサンプルは、PipeReader の一般的な問題を説明するために提供されています。
❌メモリ不足 (OOM)
次のような状況の場合、以下のコードでは、OutOfMemoryException が発生するまでバッファーが保持されます。
- 最大メッセージ サイズがない。
PipeReader
から返されたデータで、完全なメッセージが作成されない。 たとえば、他の側で大きなメッセージ (4 GB のメッセージなど) を書き込んでいるため、完全なメッセージが作成されていない。
警告
次のコードは使用しないでください。 このサンプルを使用すると、データの損失、ハング、セキュリティの問題が発生するため、コピーしないでください。 以下のサンプルは、PipeReader の一般的な問題を説明するために提供されています。
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer;
Process(ref thisCouldOutOfMemory, out Message message);
if (result.IsCompleted)
break;
reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End);
if (message != null)
return message;
}
警告
上記のコードは使用しない でください。 このサンプルを使用すると、データの損失、ハング、セキュリティの問題が発生するため、コピーしないでください。 上記のサンプルは、PipeReader の一般的な問題を説明するために提供されています。
❌メモリの破損
バッファーを読み取るヘルパーを記述するときは、返されたすべてのペイロードをコピーしてから Advance
を呼び出す必要があります。 次の例では、Pipe
で破棄されたメモリを返し、次の操作 (読み取り/書き込み) でそれを再利用する可能性があります。
警告
次のコードは使用しないでください。 このサンプルを使用すると、データの損失、ハング、セキュリティの問題が発生するため、コピーしないでください。 以下のサンプルは、PipeReader の一般的な問題を説明するために提供されています。
public class Message
{
public ReadOnlySequence<byte> CorruptedPayload { get; set; }
}
Environment.FailFast("This code is terrible, don't use it!");
Message message = null;
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
ReadHeader(ref buffer, out int length);
if (length <= buffer.Length)
{
message = new Message
{
// Slice the payload from the existing buffer
CorruptedPayload = buffer.Slice(0, length)
};
buffer = buffer.Slice(length);
}
if (result.IsCompleted)
break;
reader.AdvanceTo(buffer.Start, buffer.End);
if (message != null)
{
// This code is broken since reader.AdvanceTo() was called with a position *after* the buffer
// was captured.
break;
}
}
return message;
}
警告
上記のコードは使用しない でください。 このサンプルを使用すると、データの損失、ハング、セキュリティの問題が発生するため、コピーしないでください。 上記のサンプルは、PipeReader の一般的な問題を説明するために提供されています。
PipeWriter では、呼び出し元の代わりに書き込むバッファーを管理します。 PipeWriter
は、IBufferWriter<byte>
を実装します。 IBufferWriter<byte>
は、バッファーのコピーを追加せずに、書き込みを実行するためにバッファーにアクセスできるようにします。
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
// Request at least 5 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(5);
// Write directly into the buffer.
int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span);
// Tell the writer how many bytes were written.
writer.Advance(written);
await writer.FlushAsync(cancellationToken);
}
上記のコードでは、次のようになります。
- GetMemory を使用して、
PipeWriter
から少なくとも 5 バイトのバッファーを要求します。 - ASCII 文字列である
"Hello"
のバイトを、返されたMemory<byte>
に書き込みます。 - Advance を呼び出して、バッファーに書き込まれたバイト数を示します。
- 基になるデバイスにバイトを送信する、
PipeWriter
をフラッシュします。
上記の書き込みメソッドでは、PipeWriter
によって提供されるバッファーが使用されています。 PipeWriter.WriteAsync を使用して、以下を行うこともできます。
- 既存のバッファーを
PipeWriter
にコピーします。 GetSpan
を呼び出し、必要に応じてAdvance
を呼び出してから、FlushAsync を呼び出します。
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
byte[] helloBytes = Encoding.ASCII.GetBytes("Hello");
// Write helloBytes to the writer, there's no need to call Advance here
// (Write does that).
await writer.WriteAsync(helloBytes, cancellationToken);
}
FlushAsync では、CancellationToken を渡すことがサポートされます。 CancellationToken
を渡すと、フラッシュの保留中にトークンが取り消された場合に、OperationCanceledException
となります。 PipeWriter.FlushAsync
では、例外を発生させることなく、PipeWriter.CancelPendingFlush を使用して、現在のフラッシュ操作を取り消す方法がサポートされます。 PipeWriter.CancelPendingFlush
を呼び出すと、PipeWriter.FlushAsync
または PipeWriter.WriteAsync
の現在の呼び出しあるいは次の呼び出しで、IsCanceled
が true
に設定された FlushResult が返されます。 これは、非破壊的で非例外的な方法で生成されるフラッシュを停止する際に役立つ場合があります。
- GetSpan および GetMemory では、少なくとも要求された量のメモリを持つバッファーを返します。 正確なバッファー サイズを想定しないでください。
- 連続する呼び出しで同じバッファーまたは同じサイズのバッファーが返される保証はありません。
- さらにデータの書き込みを続行するには、Advance を呼び出した後に新しいバッファーを要求する必要があります。 以前に取得したバッファーに書き込むことはできません。
FlushAsync
の呼び出しが不完全な場合にGetMemory
またはGetSpan
を呼び出すのは安全ではありません。- フラッシュされていないデータがある場合に
Complete
またはCompleteAsync
を呼び出すと、メモリが破損する可能性があります。
次のヒントは、System.IO.Pipelines クラスをうまく使うために役立ちます。
- PipeReader と PipeWriter は、該当する場合は例外を含めて、常に完了させてください。
- PipeReader.ReadAsync を呼び出した後に、常にPipeReader.AdvanceTo を呼び出します。
- 書き込み中は定期的に
await
の PipeWriter.FlushAsync を行い、FlushResult.IsCompleted を常に確認します。IsCompleted
がtrue
の場合は、リーダーが完了し、書き込まれた内容がどうでもよいことを示すので、書き込みを中止します。 PipeReader
にアクセスさせるものを書き込んだ後、PipeWriter.FlushAsync を呼び出します。FlushAsync
の完了までにリーダーを開始できない場合は、デッドロックの原因となるため、FlushAsync
を呼び出さないでください。PipeReader
やPipeWriter
には、1 つのコンテキストのみがそれらを "所有" またはアクセスするようにします。 これらの型はスレッドセーフではありません。AdvanceTo
を呼び出したり、PipeReader
が完了した後に ReadResult.Buffer にアクセスしないでください。
IDuplexPipe は、読み取りと書き込みの両方をサポートする種類のコントラクトです。 たとえば、ネットワーク接続は IDuplexPipe
によって表されます。
PipeReader
と PipeWriter
を含む Pipe
とは異なり、IDuplexPipe
は全二重接続の片側を表します。 つまり、PipeWriter
に書き込まれるものは、PipeReader
からは読み取られません。
ストリーム データの読み取りまたは書き込みを行う場合、通常は、デシリアライザーを使用してデータを読み取り、シリアライザーを使用してデータを書き込みます。 これらの読み取りおよび書き込みストリーム API のほとんどに、Stream
パラメーターがあります。 これらの既存の API との統合をより容易にするために、PipeReader
および PipeWriter
で AsStream メソッドが公開されます。 AsStream では、PipeReader
または PipeWriter
に関する Stream
実装を返します。
PipeReader
および PipeWriter
インスタンスは、静的な Create
メソッドを使用し、Stream オブジェクトとそれに対応する任意の作成オプションを指定して作成できます。
StreamPipeReaderOptions では、次のパラメーターを使用して PipeReader
インスタンスの作成を制御できます。
- StreamPipeReaderOptions.BufferSize はプールからメモリを借りるときに使用される最小バッファー サイズ (バイト単位) であり、既定値は
4096
です。 - StreamPipeReaderOptions.LeaveOpen フラグによって、
PipeReader
の完了後、基礎となるストリームを開いたままにするかどうかが決定されます。既定値はfalse
です。 - StreamPipeReaderOptions.MinimumReadSize は、新しいバッファーが割り当てられる前のバッファー内に残るバイト数のしきい値を表します。既定値は
1024
です。 - StreamPipeReaderOptions.Pool はメモリを割り当てるときに使用される
MemoryPool<byte>
であり、既定値はnull
です。
StreamPipeWriterOptions では、次のパラメーターを使用して PipeWriter
インスタンスの作成を制御できます。
- StreamPipeWriterOptions.LeaveOpen フラグによって、
PipeWriter
の完了後、基礎となるストリームを開いたままにするかどうかが決定されます。既定値はfalse
です。 - StreamPipeWriterOptions.MinimumBufferSize は、Pool からメモリをレンタルしているときに使用する最小バッファー サイズを表します。既定値は
4096
です。 - StreamPipeWriterOptions.Pool はメモリを割り当てるときに使用される
MemoryPool<byte>
であり、既定値はnull
です。
重要
Create
メソッドを使用して PipeReader
および PipeWriter
インスタンスを作成するとき、Stream
オブジェクトの有効期間を考慮する必要があります。 リーダーまたはライターでストリームの利用が終わった後にストリームにアクセスする必要がある場合、作成オプションで LeaveOpen
フラグを true
に設定する必要があります。 設定しない場合、ストリームは閉じられます。
次のコードでは、ストリームからの Create
メソッドを利用し、PipeReader
および PipeWriter
インスタンスが作成されます。
using System.Buffers;
using System.IO.Pipelines;
using System.Text;
class Program
{
static async Task Main()
{
using var stream = File.OpenRead("lorem-ipsum.txt");
var reader = PipeReader.Create(stream);
var writer = PipeWriter.Create(
Console.OpenStandardOutput(),
new StreamPipeWriterOptions(leaveOpen: true));
WriteUserCancellationPrompt();
var processMessagesTask = ProcessMessagesAsync(reader, writer);
var userCanceled = false;
var cancelProcessingTask = Task.Run(() =>
{
while (char.ToUpperInvariant(Console.ReadKey().KeyChar) != 'C')
{
WriteUserCancellationPrompt();
}
userCanceled = true;
// No exceptions thrown
reader.CancelPendingRead();
writer.CancelPendingFlush();
});
await Task.WhenAny(cancelProcessingTask, processMessagesTask);
Console.WriteLine(
$"\n\nProcessing {(userCanceled ? "cancelled" : "completed")}.\n");
}
static void WriteUserCancellationPrompt() =>
Console.WriteLine("Press 'C' to cancel processing...\n");
static async Task ProcessMessagesAsync(
PipeReader reader,
PipeWriter writer)
{
try
{
while (true)
{
ReadResult readResult = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = readResult.Buffer;
try
{
if (readResult.IsCanceled)
{
break;
}
if (TryParseLines(ref buffer, out string message))
{
FlushResult flushResult =
await WriteMessagesAsync(writer, message);
if (flushResult.IsCanceled || flushResult.IsCompleted)
{
break;
}
}
if (readResult.IsCompleted)
{
if (!buffer.IsEmpty)
{
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
catch (Exception ex)
{
Console.Error.WriteLine(ex);
}
finally
{
await reader.CompleteAsync();
await writer.CompleteAsync();
}
}
static bool TryParseLines(
ref ReadOnlySequence<byte> buffer,
out string message)
{
SequencePosition? position;
StringBuilder outputMessage = new();
while(true)
{
position = buffer.PositionOf((byte)'\n');
if (!position.HasValue)
break;
outputMessage.Append(Encoding.ASCII.GetString(buffer.Slice(buffer.Start, position.Value)))
.AppendLine();
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
};
message = outputMessage.ToString();
return message.Length != 0;
}
static ValueTask<FlushResult> WriteMessagesAsync(
PipeWriter writer,
string message) =>
writer.WriteAsync(Encoding.ASCII.GetBytes(message));
}
このアプリケーションによって StreamReader が使用され、ストリームとして lorem-ipsum.txt ファイルが読み取られます。これは空白行で終了する必要があります。 FileStream は、PipeReader
オブジェクトをインスタンス化する PipeReader.Create に渡されます。 次に、コンソール アプリケーションから Console.OpenStandardOutput() を使用して PipeWriter.Create にその標準出力ストリームが渡されます。 この例ではキャンセルがサポートされます。
.NET に関するフィードバック
.NET はオープンソース プロジェクトです。 フィードバックを提供するにはリンクを選択します。