Pelatihan
Modul
Menyambungkan perintah ke dalam alur - Training
Dalam modul ini, Anda akan mempelajari cara menghubungkan perintah ke dalam alur.
Browser ini sudah tidak didukung.
Mutakhirkan ke Microsoft Edge untuk memanfaatkan fitur, pembaruan keamanan, dan dukungan teknis terkini.
System.IO.Pipelines adalah pustaka yang dirancang untuk mempermudah melakukan I/O berkinerja tinggi di .NET. Ini adalah pustaka yang menargetkan .NET Standard yang berfungsi pada semua implementasi .NET.
Pustaka tersedia dalam paket Nuget System.IO.Pipelines.
Aplikasi yang mengurai data streaming terdiri dari kode boilerplate yang memiliki banyak alur kode khusus dan tidak biasa. Boilerplate dan kode kasus khusus rumit dan sulit untuk dipertahankan.
System.IO.Pipelines
dirancang untuk:
Kode berikut ini khas untuk server TCP yang menerima pesan yang dibatasi baris (dibatasi oleh '\n'
) dari klien:
async Task ProcessLinesAsync(NetworkStream stream)
{
var buffer = new byte[1024];
await stream.ReadAsync(buffer, 0, buffer.Length);
// Process a single line from the buffer
ProcessLine(buffer);
}
Kode sebelumnya memiliki beberapa masalah:
ReadAsync
.stream.ReadAsync
. stream.ReadAsync
mengembalikan berapa banyak data yang dibaca.ReadAsync
.byte
dengan setiap bacaan.Untuk memperbaiki masalah sebelumnya, diperlukan perubahan berikut:
Buffer data masuk hingga baris baru ditemukan.
Uraikan semua garis yang dikembalikan dalam buffer.
Ada kemungkinan bahwa garis lebih besar dari 1 KB (1024 byte). Kode perlu mengubah ukuran buffer input sampai pemisah ditemukan agar sesuai dengan baris lengkap di dalam buffer.
Mempertimbangkan untuk menggunakan pengumpulan buffer untuk menghindari alokasi memori berulang kali.
Kode berikut membahas beberapa masalah ini:
async Task ProcessLinesAsync(NetworkStream stream)
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
var bytesBuffered = 0;
var bytesConsumed = 0;
while (true)
{
// Calculate the amount of bytes remaining in the buffer.
var bytesRemaining = buffer.Length - bytesBuffered;
if (bytesRemaining == 0)
{
// Double the buffer size and copy the previously buffered data into the new buffer.
var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
// Return the old buffer to the pool.
ArrayPool<byte>.Shared.Return(buffer);
buffer = newBuffer;
bytesRemaining = buffer.Length - bytesBuffered;
}
var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
if (bytesRead == 0)
{
// EOF
break;
}
// Keep track of the amount of buffered bytes.
bytesBuffered += bytesRead;
var linePosition = -1;
do
{
// Look for a EOL in the buffered data.
linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
bytesBuffered - bytesConsumed);
if (linePosition >= 0)
{
// Calculate the length of the line based on the offset.
var lineLength = linePosition - bytesConsumed;
// Process the line.
ProcessLine(buffer, bytesConsumed, lineLength);
// Move the bytesConsumed to skip past the line consumed (including \n).
bytesConsumed += lineLength + 1;
}
}
while (linePosition >= 0);
}
}
Kode sebelumnya rumit dan tidak mengatasi semua masalah yang diidentifikasi. Jaringan berkinerja tinggi biasanya berarti menulis kode kompleks untuk memaksimalkan performa. System.IO.Pipelines
dirancang untuk membuat penulisan jenis kode ini lebih mudah.
Kelas Pipe dapat digunakan untuk membuat pasangan PipeWriter/PipeReader
. Semua data yang ditulis ke PipeWriter
tersedia di dalam PipeReader
:
var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;
async Task ProcessLinesAsync(Socket socket)
{
var pipe = new Pipe();
Task writing = FillPipeAsync(socket, pipe.Writer);
Task reading = ReadPipeAsync(pipe.Reader);
await Task.WhenAll(reading, writing);
}
async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
const int minimumBufferSize = 512;
while (true)
{
// Allocate at least 512 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(minimumBufferSize);
try
{
int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
if (bytesRead == 0)
{
break;
}
// Tell the PipeWriter how much was read from the Socket.
writer.Advance(bytesRead);
}
catch (Exception ex)
{
LogError(ex);
break;
}
// Make the data available to the PipeReader.
FlushResult result = await writer.FlushAsync();
if (result.IsCompleted)
{
break;
}
}
// By completing PipeWriter, tell the PipeReader that there's no more data coming.
await writer.CompleteAsync();
}
async Task ReadPipeAsync(PipeReader reader)
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
{
// Process the line.
ProcessLine(line);
}
// Tell the PipeReader how much of the buffer has been consumed.
reader.AdvanceTo(buffer.Start, buffer.End);
// Stop reading if there's no more data coming.
if (result.IsCompleted)
{
break;
}
}
// Mark the PipeReader as complete.
await reader.CompleteAsync();
}
bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
// Look for a EOL in the buffer.
SequencePosition? position = buffer.PositionOf((byte)'\n');
if (position == null)
{
line = default;
return false;
}
// Skip the line + the \n.
line = buffer.Slice(0, position.Value);
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
return true;
}
Terdapat dua kasus:
FillPipeAsync
membaca dari Socket
dan menulis ke PipeWriter
.ReadPipeAsync
membaca dari PipeReader
dan mengurai baris masuk.Tidak ada buffer eksplisit yang dialokasikan. Semua manajemen buffer didelegasikan ke implementasi PipeReader
dan PipeWriter
. Mendelegasikan manajemen buffer memudahkan penggunaan kode untuk hanya berfokus pada logika bisnis.
Di perulangan pertama:
PipeWriter
seberapa banyak data yang ditulis ke buffer.PipeReader
.Dalam perulangan kedua, PipeReader
mengonsumsi buffer yang ditulis oleh PipeWriter
. Buffer berasal dari soket. Panggilan ke PipeReader.ReadAsync
:
Mengembalikan ReadResult yang berisi dua informasi penting:
ReadOnlySequence<byte>
.IsCompleted
yang menunjukkan apakah akhir data (EOF) telah tercapai.Setelah menemukan pemisah akhir baris (EOL) dan mengurai baris:
PipeReader.AdvanceTo
dipanggil untuk memberi tahu PipeReader
seberapa banyak data yang telah digunakan dan diperiksa.Pembaca dan penulis mengulang berakhir dengan memanggil Complete
. Complete
memungkinkan Pipa yang mendasar merilis memori yang dialokasikannya.
Idealnya, membaca dan mengurai bekerja sama:
Biasanya, penguraian membutuhkan lebih banyak waktu daripada hanya menyalin blok data dari jaringan:
Untuk performa optimal, terdapat keseimbangan antara jeda yang sering dan mengalokasikan lebih banyak memori.
Untuk mengatasi masalah sebelumnya, Pipe
memiliki dua pengaturan untuk mengontrol aliran data:
PipeWriter.FlushAsync
dilanjutkan.ValueTask<FlushResult>
yang tidak lengkap saat jumlah data dalam silang Pipe
PauseWriterThreshold
.ValueTask<FlushResult>
ketika menjadi lebih rendah dari ResumeWriterThreshold
.Dua nilai digunakan untuk mencegah bersepeda cepat, yang dapat terjadi jika satu nilai digunakan.
// The Pipe will start returning incomplete tasks from FlushAsync until
// the reader examines at least 5 bytes.
var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
var pipe = new Pipe(options);
Biasanya saat menggunakan async
dan await
, kode asinkron dilanjutkan pada TaskScheduler atau saat ini SynchronizationContext.
Saat melakukan I/O, penting untuk memiliki kontrol halus atas tempat I/O dilakukan. Kontrol ini memungkinkan memanfaatkan cache CPU secara efektif. Penembolokan yang efisien sangat penting untuk aplikasi berkinerja tinggi seperti server web. PipeScheduler menyediakan kontrol atas tempat panggilan balik asinkron berjalan. Secara default:
SynchronizationContext
, hal tersebut menggunakan kumpulan utas untuk menjalankan panggilan balik.public static void Main(string[] args)
{
var writeScheduler = new SingleThreadPipeScheduler();
var readScheduler = new SingleThreadPipeScheduler();
// Tell the Pipe what schedulers to use and disable the SynchronizationContext.
var options = new PipeOptions(readerScheduler: readScheduler,
writerScheduler: writeScheduler,
useSynchronizationContext: false);
var pipe = new Pipe(options);
}
// This is a sample scheduler that async callbacks on a single dedicated thread.
public class SingleThreadPipeScheduler : PipeScheduler
{
private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
new BlockingCollection<(Action<object> Action, object State)>();
private readonly Thread _thread;
public SingleThreadPipeScheduler()
{
_thread = new Thread(DoWork);
_thread.Start();
}
private void DoWork()
{
foreach (var item in _queue.GetConsumingEnumerable())
{
item.Action(item.State);
}
}
public override void Schedule(Action<object?> action, object? state)
{
if (state is not null)
{
_queue.Add((action, state));
}
// else log the fact that _queue.Add was not called.
}
}
PipeScheduler.ThreadPool adalah implementasi PipeScheduler yang mengantrekan panggilan balik ke kumpulan utas. PipeScheduler.ThreadPool
adalah default dan umumnya pilihan terbaik. PipeScheduler.Inline dapat menyebabkan konsekuensi yang tidak diinginkan seperti kebuntuan.
Sering kali efisien untuk menggunakan kembali objek Pipe
. Untuk mengatur ulang pipa, panggil PipeReader Reset saat PipeReader
dan PipeWriter
selesai.
PipeReader mengelola memori atas nama pemanggil. Selalu panggil PipeReader.AdvanceTo setelah memanggil PipeReader.ReadAsync. Hal ini memberi tahu PipeReader
kapan penelepon dilakukan dengan memori sehingga dapat dilacak. ReadOnlySequence<byte>
dikembalikan dari PipeReader.ReadAsync
hanya valid sampai panggilan PipeReader.AdvanceTo
. Hal ini ilegal untuk digunakan ReadOnlySequence<byte>
setelah memanggil PipeReader.AdvanceTo
.
PipeReader.AdvanceTo
mengambil dua argumen SequencePosition:
Menandai data sebagai digunakan berarti bahwa pipa dapat mengembalikan memori ke kumpulan buffer yang mendasar. Menandai data sebagai diamati mengontrol apa yang akan dilakukan panggilan PipeReader.ReadAsync
berikutnya. Menandai semuanya seperti yang diamati berarti bahwa panggilan berikutnya ke PipeReader.ReadAsync
tidak akan kembali sampai ada lebih banyak data yang ditulis ke pipa. Nilai lain akan melakukan panggilan berikutnya untuk PipeReader.ReadAsync
segera kembali dengan data yang diamati dan tidak ditangguhkan, tetapi bukan data yang telah digunakan.
Terdapat beberapa pola khas yang muncul saat mencoba membaca data streaming:
Contoh berikut menggunakan metode TryParseLines
untuk mengurai pesan dari ReadOnlySequence<byte>
. TryParseLines
mengurai satu pesan dan memperbarui buffer input untuk memangkas pesan yang diurai dari buffer. TryParseLines
bukan bagian dari .NET, ini adalah metode tertulis pengguna yang digunakan di bagian berikut.
bool TryParseLines(ref ReadOnlySequence<byte> buffer, out Message message);
Kode berikut membaca satu pesan dari PipeReader
dan mengembalikannya ke pemanggil.
async ValueTask<Message?> ReadSingleMessageAsync(PipeReader reader,
CancellationToken cancellationToken = default)
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
// In the event that no message is parsed successfully, mark consumed
// as nothing and examined as the entire buffer.
SequencePosition consumed = buffer.Start;
SequencePosition examined = buffer.End;
try
{
if (TryParseLines(ref buffer, out Message message))
{
// A single message was successfully parsed so mark the start of the
// parsed buffer as consumed. TryParseLines trims the buffer to
// point to the data after the message was parsed.
consumed = buffer.Start;
// Examined is marked the same as consumed here, so the next call
// to ReadSingleMessageAsync will process the next message if there's
// one.
examined = consumed;
return message;
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(consumed, examined);
}
}
return null;
}
Kode sebelumnya:
SequencePosition
dan diperiksa SequencePosition
untuk menunjuk ke awal buffer input yang dipangkas.Dua argumen SequencePosition
diperbarui karena TryParseLines
menghapus pesan yang diurai dari buffer input. Umumnya, saat mengurai satu pesan dari buffer, posisi yang diperiksa harus menjadi salah satu hal berikut:
Kasus pesan tunggal memiliki potensi kesalahan paling besar. Meneruskan nilai yang salah untuk diperiksa dapat mengakibatkan pengecualian kehabisan memori atau perulangan tak terbatas. Untuk informasi selengkapnya, lihat bagian Masalah umum PipeReader di artikel ini.
Kode berikut membaca semua pesan dari PipeReader
dan panggilan ProcessMessageAsync
pada masing-masing pesan.
async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseLines(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
PipeReader.ReadAsync
:
CancellationToken
dibatalkan saat ada pembacaan tertunda.PipeReader.CancelPendingRead
menyebabkan panggilan PipeReader.ReadAsync
saat ini atau berikutnya untuk mengembalikan ReadResult dengan IsCanceled
yang diatur ke true
. Hal ini dapat berguna untuk menghentikan perulangan baca yang ada dengan cara yang tidak merusak dan tidak luar biasa.private PipeReader reader;
public MyConnection(PipeReader reader)
{
this.reader = reader;
}
public void Abort()
{
// Cancel the pending read so the process loop ends without an exception.
reader.CancelPendingRead();
}
public async Task ProcessMessagesAsync()
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
if (result.IsCanceled)
{
// The read was canceled. You can quit without reading the existing data.
break;
}
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseLines(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
Meneruskan nilai yang salah ke consumed
atau examined
dapat mengakibatkan membaca data yang sudah dibaca.
Lulus buffer.End
sebagaimana diperiksa dapat mengakibatkan:
PipeReader.AdvanceTo(position, buffer.End)
saat memproses satu pesan pada satu waktu dari buffer.Meneruskan nilai yang salah ke consumed
atau examined
dapat mengakibatkan perulangan tak terbatas. Misalnya, PipeReader.AdvanceTo(buffer.Start)
jika buffer.Start
belum berubah akan menyebabkan panggilan berikutnya ke PipeReader.ReadAsync
segera kembali sebelum data baru tiba.
Meneruskan nilai yang salah ke consumed
atau examined
dapat mengakibatkan buffering tak terbatas (akhirnya OOM).
Menggunakan ReadOnlySequence<byte>
setelah panggilan dapat mengakibatkan PipeReader.AdvanceTo
kerusakan memori (digunakan setelah bebas).
Gagal memanggil PipeReader.Complete/CompleteAsync
dapat mengakibatkan kebocoran memori.
Memeriksa ReadResult.IsCompleted dan keluar dari logika pembacaan sebelum memproses buffer mengalihkan kehilangan data. Kondisi keluar perulangan harus didasarkan pada ReadResult.Buffer.IsEmpty
dan ReadResult.IsCompleted
. Melakukan ini dengan tidak benar dapat mengakibatkan perulangan tak terbatas.
❌Kehilangan data
ReadResult
dapat mengembalikan segmen akhir data saat IsCompleted
diatur ke true
. Tidak membaca data tersebut sebelum keluar dari perulangan baca akan mengakibatkan kehilangan data.
Peringatan
JANGAN gunakan kode berikut. Menggunakan sampel ini akan mengakibatkan kehilangan data, macet, masalah keamanan, dan TIDAK boleh disalin. Sampel berikut disediakan untuk menjelaskan masalah Umum PipeReader.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> dataLossBuffer = result.Buffer;
if (result.IsCompleted)
break;
Process(ref dataLossBuffer, out Message message);
reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End);
}
Peringatan
JANGAN gunakan kode sebelumnya. Menggunakan sampel ini akan mengakibatkan kehilangan data, macet, masalah keamanan, dan TIDAK boleh disalin. Sampel sebelumnya disediakan untuk menjelaskan masalah Umum PipeReader.
❌Loop tak terbatas
Logika berikut dapat mengakibatkan perulangan tak terbatas jika Result.IsCompleted
ada true
tetapi tidak pernah ada pesan lengkap dalam buffer.
Peringatan
JANGAN gunakan kode berikut. Menggunakan sampel ini akan mengakibatkan kehilangan data, macet, masalah keamanan, dan TIDAK boleh disalin. Sampel berikut disediakan untuk menjelaskan masalah Umum PipeReader.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (result.IsCompleted && infiniteLoopBuffer.IsEmpty)
break;
Process(ref infiniteLoopBuffer, out Message message);
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
Peringatan
JANGAN gunakan kode sebelumnya. Menggunakan sampel ini akan mengakibatkan kehilangan data, macet, masalah keamanan, dan TIDAK boleh disalin. Sampel sebelumnya disediakan untuk menjelaskan masalah Umum PipeReader.
Berikut adalah bagian lain dari kode dengan masalah yang sama. Hal ini memeriksa buffer yang tidak kosong sebelum memeriksa ReadResult.IsCompleted
. Karena dalam else if
, itu akan berulang selamanya jika tidak pernah ada pesan lengkap dalam buffer.
Peringatan
JANGAN gunakan kode berikut. Menggunakan sampel ini akan mengakibatkan kehilangan data, macet, masalah keamanan, dan TIDAK boleh disalin. Sampel berikut disediakan untuk menjelaskan masalah Umum PipeReader.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (!infiniteLoopBuffer.IsEmpty)
Process(ref infiniteLoopBuffer, out Message message);
else if (result.IsCompleted)
break;
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
Peringatan
JANGAN gunakan kode sebelumnya. Menggunakan sampel ini akan mengakibatkan kehilangan data, macet, masalah keamanan, dan TIDAK boleh disalin. Sampel sebelumnya disediakan untuk menjelaskan masalah Umum PipeReader.
❌Aplikasi yang tidak responsif
Panggilan PipeReader.AdvanceTo
tanpa syarat dengan buffer.End
dalam posisi examined
dapat mengakibatkan aplikasi menjadi tidak responsif saat mengurai satu pesan. Panggilan berikutnya untuk PipeReader.AdvanceTo
tidak akan kembali hingga:
Peringatan
JANGAN gunakan kode berikut. Menggunakan sampel ini akan mengakibatkan kehilangan data, macet, masalah keamanan, dan TIDAK boleh disalin. Sampel berikut disediakan untuk menjelaskan masalah Umum PipeReader.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> hangBuffer = result.Buffer;
Process(ref hangBuffer, out Message message);
if (result.IsCompleted)
break;
reader.AdvanceTo(hangBuffer.Start, hangBuffer.End);
if (message != null)
return message;
}
Peringatan
JANGAN gunakan kode sebelumnya. Menggunakan sampel ini akan mengakibatkan kehilangan data, macet, masalah keamanan, dan TIDAK boleh disalin. Sampel sebelumnya disediakan untuk menjelaskan masalah Umum PipeReader.
❌Kehabisan Memori (OOM)
Dengan kondisi berikut, kode berikut terus buffering hingga OutOfMemoryException terjadi:
PipeReader
tidak membuat pesan lengkap. Misalnya, itu tidak membuat pesan lengkap karena sisi lain menulis pesan besar (Misalnya, pesan 4 GB).Peringatan
JANGAN gunakan kode berikut. Menggunakan sampel ini akan mengakibatkan kehilangan data, macet, masalah keamanan, dan TIDAK boleh disalin. Sampel berikut disediakan untuk menjelaskan masalah Umum PipeReader.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer;
Process(ref thisCouldOutOfMemory, out Message message);
if (result.IsCompleted)
break;
reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End);
if (message != null)
return message;
}
Peringatan
JANGAN gunakan kode sebelumnya. Menggunakan sampel ini akan mengakibatkan kehilangan data, macet, masalah keamanan, dan TIDAK boleh disalin. Sampel sebelumnya disediakan untuk menjelaskan masalah Umum PipeReader.
❌Korupsi Memori
Saat menulis pembantu yang membaca buffer, payload yang dikembalikan harus disalin sebelum memanggil Advance
. Contoh berikut akan mengembalikan memori Pipe
yang telah dibuang dan dapat menggunakannya kembali untuk operasi berikutnya (baca/tulis).
Peringatan
JANGAN gunakan kode berikut. Menggunakan sampel ini akan mengakibatkan kehilangan data, macet, masalah keamanan, dan TIDAK boleh disalin. Sampel berikut disediakan untuk menjelaskan masalah Umum PipeReader.
public class Message
{
public ReadOnlySequence<byte> CorruptedPayload { get; set; }
}
Environment.FailFast("This code is terrible, don't use it!");
Message message = null;
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
ReadHeader(ref buffer, out int length);
if (length <= buffer.Length)
{
message = new Message
{
// Slice the payload from the existing buffer
CorruptedPayload = buffer.Slice(0, length)
};
buffer = buffer.Slice(length);
}
if (result.IsCompleted)
break;
reader.AdvanceTo(buffer.Start, buffer.End);
if (message != null)
{
// This code is broken since reader.AdvanceTo() was called with a position *after* the buffer
// was captured.
break;
}
}
return message;
}
Peringatan
JANGAN gunakan kode sebelumnya. Menggunakan sampel ini akan mengakibatkan kehilangan data, macet, masalah keamanan, dan TIDAK boleh disalin. Sampel sebelumnya disediakan untuk menjelaskan masalah Umum PipeReader.
PipeWriter mengelola buffer untuk menulis atas nama pemanggil. PipeWriter
penerapan IBufferWriter<byte>
. IBufferWriter<byte>
memungkinkan untuk mendapatkan akses ke buffer untuk melakukan penulisan tanpa salinan buffer tambahan.
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
// Request at least 5 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(5);
// Write directly into the buffer.
int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span);
// Tell the writer how many bytes were written.
writer.Advance(written);
await writer.FlushAsync(cancellationToken);
}
Kode sebelumnya:
PipeWriter
GetMemory."Hello"
yang dikembalikan Memory<byte>
.PipeWriter
, yang mengirim byte ke perangkat yang mendasar.Metode penulisan sebelumnya menggunakan buffer yang disediakan oleh PipeWriter
. Hal ini juga bisa menggunakan PipeWriter.WriteAsync, yang:
PipeWriter
.GetSpan
, Advance
sebagaimana merujuk dan memanggil FlushAsync.async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
byte[] helloBytes = Encoding.ASCII.GetBytes("Hello");
// Write helloBytes to the writer, there's no need to call Advance here
// (Write does that).
await writer.WriteAsync(helloBytes, cancellationToken);
}
FlushAsync mendukung melewati CancellationToken. Meneruskan hasil CancellationToken
di OperationCanceledException
jika token dibatalkan saat ada flush tertunda. PipeWriter.FlushAsync
mendukung cara untuk membatalkan operasi flush saat ini melalui PipeWriter.CancelPendingFlush tanpa menaikkan pengecualian. Panggilan PipeWriter.CancelPendingFlush
menyebabkan panggilan saat ini atau berikutnya ke PipeWriter.FlushAsync
atau PipeWriter.WriteAsync
untuk mengembalikan FlushResult dengan IsCanceled
yang diatur ke true
. Hal ini dapat berguna untuk menghentikan flush hasil dengan cara yang tidak merusak dan tidak luar biasa.
GetMemory
atau GetSpan
saat ada panggilan yang tidak lengkap ke FlushAsync
tidak aman.Complete
atau CompleteAsync
saat ada data yang dibongkar dapat mengakibatkan kerusakan memori.Tips berikut akan membantu Anda berhasil menggunakan kelas System.IO.Pipelines:
await
PipeWriter.FlushAsync saat menulis, dan selalu periksa FlushResult.IsCompleted. Batalkan penulisan jika IsCompleted
adalah true
, seperti yang menunjukkan pembaca selesai dan tidak lagi peduli tentang apa yang ditulis.PipeReader
yang ingin Anda akses.FlushAsync
jika pembaca tidak dapat memulai sampai FlushAsync
selesai, karena dapat menyebabkan kebuntuan.PipeReader
atau PipeWriter
atau mengaksesnya. Jenis-jenis ini tidak aman untuk utas.AdvanceTo
atau menyelesaikan PipeReader
.IDuplexPipe adalah kontrak untuk jenis yang mendukung pembacaan dan penulisan. Misalnya, koneksi jaringan akan diwakili oleh IDuplexPipe
.
Tidak seperti Pipe
, yang berisi PipeReader
dan PipeWriter
, IDuplexPipe
mewakili satu sisi koneksi dupleks penuh. Hal itu berarti apa yang ditulis ke PipeWriter
tidak akan dibaca dari PipeReader
.
Saat membaca atau menulis data aliran, Anda biasanya membaca data menggunakan de-serializer dan menulis data menggunakan serializer. Sebagian besar API aliran baca dan tulis ini memiliki parameter Stream
. Untuk mempermudah integrasi dengan API yang ada ini, PipeReader
dan PipeWriter
mengekspos metode AsStream. AsStream mengembalikan implementasi Stream
di sekitar PipeReader
atau PipeWriter
.
Instans PipeReader
dan PipeWriter
dapat dibuat menggunakan metode statis Create
yang diberikan objek Stream dan opsi pembuatan yang sesuai opsional.
StreamPipeReaderOptions izinkan kontrol atas pembuatan instans PipeReader
dengan parameter berikut:
4096
.PipeReader
selesai, dan default ke false
.1024
.MemoryPool<byte>
saat mengalokasikan memori, dan default ke null
.StreamPipeWriterOptions izinkan kontrol atas pembuatan instans PipeWriter
dengan parameter berikut:
PipeWriter
selesai, dan default ke false
.4096
.MemoryPool<byte>
saat mengalokasikan memori, dan default ke null
.Penting
Saat membuat instans PipeReader
dan PipeWriter
menggunakan metode Create
, Anda perlu mempertimbangkan masa pakai objek Stream
. Jika Anda memerlukan akses ke aliran setelah pembaca atau penulis selesai dengannya, Anda harus mengatur bendera LeaveOpen
ke true
pada opsi pembuatan. Jika tidak, aliran akan ditutup.
Kode berikut menunjukkan pembuatan instans PipeReader
dan PipeWriter
menggunakan metode Create
dari aliran.
using System.Buffers;
using System.IO.Pipelines;
using System.Text;
class Program
{
static async Task Main()
{
using var stream = File.OpenRead("lorem-ipsum.txt");
var reader = PipeReader.Create(stream);
var writer = PipeWriter.Create(
Console.OpenStandardOutput(),
new StreamPipeWriterOptions(leaveOpen: true));
WriteUserCancellationPrompt();
var processMessagesTask = ProcessMessagesAsync(reader, writer);
var userCanceled = false;
var cancelProcessingTask = Task.Run(() =>
{
while (char.ToUpperInvariant(Console.ReadKey().KeyChar) != 'C')
{
WriteUserCancellationPrompt();
}
userCanceled = true;
// No exceptions thrown
reader.CancelPendingRead();
writer.CancelPendingFlush();
});
await Task.WhenAny(cancelProcessingTask, processMessagesTask);
Console.WriteLine(
$"\n\nProcessing {(userCanceled ? "cancelled" : "completed")}.\n");
}
static void WriteUserCancellationPrompt() =>
Console.WriteLine("Press 'C' to cancel processing...\n");
static async Task ProcessMessagesAsync(
PipeReader reader,
PipeWriter writer)
{
try
{
while (true)
{
ReadResult readResult = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = readResult.Buffer;
try
{
if (readResult.IsCanceled)
{
break;
}
if (TryParseLines(ref buffer, out string message))
{
FlushResult flushResult =
await WriteMessagesAsync(writer, message);
if (flushResult.IsCanceled || flushResult.IsCompleted)
{
break;
}
}
if (readResult.IsCompleted)
{
if (!buffer.IsEmpty)
{
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
catch (Exception ex)
{
Console.Error.WriteLine(ex);
}
finally
{
await reader.CompleteAsync();
await writer.CompleteAsync();
}
}
static bool TryParseLines(
ref ReadOnlySequence<byte> buffer,
out string message)
{
SequencePosition? position;
StringBuilder outputMessage = new();
while(true)
{
position = buffer.PositionOf((byte)'\n');
if (!position.HasValue)
break;
outputMessage.Append(Encoding.ASCII.GetString(buffer.Slice(buffer.Start, position.Value)))
.AppendLine();
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
};
message = outputMessage.ToString();
return message.Length != 0;
}
static ValueTask<FlushResult> WriteMessagesAsync(
PipeWriter writer,
string message) =>
writer.WriteAsync(Encoding.ASCII.GetBytes(message));
}
Aplikasi ini menggunakan StreamReader untuk membaca file lorem-ipsum.txt sebagai aliran, dan harus diakhir dengan baris kosong. FileStream diteruskan ke PipeReader.Create, yang membuat instans objek PipeReader
. Aplikasi konsol kemudian meneruskan aliran output standarnya untuk PipeWriter.Create yang menggunakan Console.OpenStandardOutput(). Contoh mendukung pembatalan.
Umpan balik .NET
.NET adalah proyek sumber terbuka. Pilih tautan untuk memberikan umpan balik:
Pelatihan
Modul
Menyambungkan perintah ke dalam alur - Training
Dalam modul ini, Anda akan mempelajari cara menghubungkan perintah ke dalam alur.