System.IO.Pipelines di .NET

System.IO.Pipelines adalah pustaka yang dirancang untuk mempermudah melakukan I/O berkinerja tinggi di .NET. Ini adalah pustaka yang menargetkan .NET Standard yang berfungsi pada semua implementasi .NET.

Pustaka tersedia dalam paket Nuget System.IO.Pipelines.

Masalah yang diselesaikan System.IO.Pipelines

Aplikasi yang mengurai data streaming terdiri dari kode boilerplate yang memiliki banyak alur kode khusus dan tidak biasa. Boilerplate dan kode kasus khusus rumit dan sulit untuk dipertahankan.

System.IO.Pipelines dirancang untuk:

  • Memiliki data streaming penguraian performa tinggi.
  • Mengurangi kompleksitas kode.

Kode berikut ini khas untuk server TCP yang menerima pesan yang dibatasi baris (dibatasi oleh '\n') dari klien:

async Task ProcessLinesAsync(NetworkStream stream)
{
    var buffer = new byte[1024];
    await stream.ReadAsync(buffer, 0, buffer.Length);

    // Process a single line from the buffer
    ProcessLine(buffer);
}

Kode sebelumnya memiliki beberapa masalah:

  • Seluruh pesan (akhir baris) mungkin tidak diterima dalam satu panggilan ke ReadAsync.
  • Kode ini mengabaikan hasil stream.ReadAsync. stream.ReadAsync mengembalikan berapa banyak data yang dibaca.
  • Kode ini tidak menangani kasus di mana beberapa baris dibaca dalam satu panggilan ReadAsync.
  • Kode ini mengalokasikan array byte dengan setiap bacaan.

Untuk memperbaiki masalah sebelumnya, diperlukan perubahan berikut:

  • Buffer data masuk hingga baris baru ditemukan.

  • Uraikan semua garis yang dikembalikan dalam buffer.

  • Ada kemungkinan bahwa garis lebih besar dari 1 KB (1024 byte). Kode perlu mengubah ukuran buffer input sampai pemisah ditemukan agar sesuai dengan baris lengkap di dalam buffer.

    • Jika buffer diubah ukurannya, lebih banyak salinan buffer dibuat karena garis yang lebih panjang muncul di input.
    • Untuk mengurangi ruang yang terbuang, padatkan buffer yang digunakan untuk membaca baris.
  • Mempertimbangkan untuk menggunakan pengumpulan buffer untuk menghindari alokasi memori berulang kali.

  • Kode berikut membahas beberapa masalah ini:

async Task ProcessLinesAsync(NetworkStream stream)
{
    byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
    var bytesBuffered = 0;
    var bytesConsumed = 0;

    while (true)
    {
        // Calculate the amount of bytes remaining in the buffer.
        var bytesRemaining = buffer.Length - bytesBuffered;

        if (bytesRemaining == 0)
        {
            // Double the buffer size and copy the previously buffered data into the new buffer.
            var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
            Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
            // Return the old buffer to the pool.
            ArrayPool<byte>.Shared.Return(buffer);
            buffer = newBuffer;
            bytesRemaining = buffer.Length - bytesBuffered;
        }

        var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
        if (bytesRead == 0)
        {
            // EOF
            break;
        }

        // Keep track of the amount of buffered bytes.
        bytesBuffered += bytesRead;
        var linePosition = -1;

        do
        {
            // Look for a EOL in the buffered data.
            linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
                                         bytesBuffered - bytesConsumed);

            if (linePosition >= 0)
            {
                // Calculate the length of the line based on the offset.
                var lineLength = linePosition - bytesConsumed;

                // Process the line.
                ProcessLine(buffer, bytesConsumed, lineLength);

                // Move the bytesConsumed to skip past the line consumed (including \n).
                bytesConsumed += lineLength + 1;
            }
        }
        while (linePosition >= 0);
    }
}

Kode sebelumnya rumit dan tidak mengatasi semua masalah yang diidentifikasi. Jaringan berkinerja tinggi biasanya berarti menulis kode kompleks untuk memaksimalkan performa. System.IO.Pipelines dirancang untuk membuat penulisan jenis kode ini lebih mudah.

Pipa

Kelas Pipe dapat digunakan untuk membuat pasangan PipeWriter/PipeReader. Semua data yang ditulis ke PipeWriter tersedia di dalam PipeReader:

var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;

Penggunaan dasar pipa

async Task ProcessLinesAsync(Socket socket)
{
    var pipe = new Pipe();
    Task writing = FillPipeAsync(socket, pipe.Writer);
    Task reading = ReadPipeAsync(pipe.Reader);

    await Task.WhenAll(reading, writing);
}

async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
    const int minimumBufferSize = 512;

    while (true)
    {
        // Allocate at least 512 bytes from the PipeWriter.
        Memory<byte> memory = writer.GetMemory(minimumBufferSize);
        try
        {
            int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
            if (bytesRead == 0)
            {
                break;
            }
            // Tell the PipeWriter how much was read from the Socket.
            writer.Advance(bytesRead);
        }
        catch (Exception ex)
        {
            LogError(ex);
            break;
        }

        // Make the data available to the PipeReader.
        FlushResult result = await writer.FlushAsync();

        if (result.IsCompleted)
        {
            break;
        }
    }

     // By completing PipeWriter, tell the PipeReader that there's no more data coming.
    await writer.CompleteAsync();
}

async Task ReadPipeAsync(PipeReader reader)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync();
        ReadOnlySequence<byte> buffer = result.Buffer;

        while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
        {
            // Process the line.
            ProcessLine(line);
        }

        // Tell the PipeReader how much of the buffer has been consumed.
        reader.AdvanceTo(buffer.Start, buffer.End);

        // Stop reading if there's no more data coming.
        if (result.IsCompleted)
        {
            break;
        }
    }

    // Mark the PipeReader as complete.
    await reader.CompleteAsync();
}

bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
    // Look for a EOL in the buffer.
    SequencePosition? position = buffer.PositionOf((byte)'\n');

    if (position == null)
    {
        line = default;
        return false;
    }

    // Skip the line + the \n.
    line = buffer.Slice(0, position.Value);
    buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
    return true;
}

Terdapat dua kasus:

  • FillPipeAsync membaca dari Socket dan menulis ke PipeWriter.
  • ReadPipeAsync membaca dari PipeReader dan mengurai baris masuk.

Tidak ada buffer eksplisit yang dialokasikan. Semua manajemen buffer didelegasikan ke implementasi PipeReader dan PipeWriter. Mendelegasikan manajemen buffer memudahkan penggunaan kode untuk hanya berfokus pada logika bisnis.

Di perulangan pertama:

Dalam perulangan kedua, PipeReader mengonsumsi buffer yang ditulis oleh PipeWriter. Buffer berasal dari soket. Panggilan ke PipeReader.ReadAsync:

  • Mengembalikan ReadResult yang berisi dua informasi penting:

    • Data yang dibaca dalam bentuk ReadOnlySequence<byte>.
    • Boolean IsCompleted yang menunjukkan apakah akhir data (EOF) telah tercapai.

Setelah menemukan pemisah akhir baris (EOL) dan mengurai baris:

  • Logika memproses buffer untuk melewati apa yang sudah diproses.
  • PipeReader.AdvanceTo dipanggil untuk memberi tahu PipeReader seberapa banyak data yang telah digunakan dan diperiksa.

Pembaca dan penulis mengulang berakhir dengan memanggil Complete. Complete memungkinkan Pipa yang mendasar merilis memori yang dialokasikannya.

Backpressure dan kontrol alur

Idealnya, membaca dan mengurai bekerja sama:

  • Utas baca mengonsumsi data dari jaringan dan memasukkannya ke dalam buffer.
  • Utas penguraian bertanggung jawab untuk membangun struktur data yang sesuai.

Biasanya, penguraian membutuhkan lebih banyak waktu daripada hanya menyalin blok data dari jaringan:

  • Utas bacaan di depan utas penguraian.
  • Utas baca harus memperlambat atau mengalokasikan lebih banyak memori untuk menyimpan data untuk utas penguraian.

Untuk performa optimal, terdapat keseimbangan antara jeda yang sering dan mengalokasikan lebih banyak memori.

Untuk mengatasi masalah sebelumnya, Pipe memiliki dua pengaturan untuk mengontrol aliran data:

  • PauseWriterThreshold: Menentukan seberapa banyak data yang harus di-buffer sebelum panggilan untuk FlushAsync dijeda.
  • ResumeWriterThreshold: Menentukan seberapa banyak data yang harus diamati pembaca sebelum panggilan untuk PipeWriter.FlushAsync dilanjutkan.

Diagram with ResumeWriterThreshold and PauseWriterThreshold

PipeWriter.FlushAsync:

  • Mengembalikan ValueTask<FlushResult> yang tidak lengkap saat jumlah data dalam silang PipePauseWriterThreshold.
  • Selesaikan ValueTask<FlushResult> ketika menjadi lebih rendah dari ResumeWriterThreshold.

Dua nilai digunakan untuk mencegah bersepeda cepat, yang dapat terjadi jika satu nilai digunakan.

Contoh

// The Pipe will start returning incomplete tasks from FlushAsync until
// the reader examines at least 5 bytes.
var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
var pipe = new Pipe(options);

PipeScheduler

Biasanya saat menggunakan async dan await, kode asinkron dilanjutkan pada TaskScheduler atau saat ini SynchronizationContext.

Saat melakukan I/O, penting untuk memiliki kontrol halus atas tempat I/O dilakukan. Kontrol ini memungkinkan memanfaatkan cache CPU secara efektif. Penembolokan yang efisien sangat penting untuk aplikasi berkinerja tinggi seperti server web. PipeScheduler menyediakan kontrol atas tempat panggilan balik asinkron berjalan. Secara default:

  • Saat ini SynchronizationContext digunakan.
  • Jika tidak ada SynchronizationContext, hal tersebut menggunakan kumpulan utas untuk menjalankan panggilan balik.
public static void Main(string[] args)
{
    var writeScheduler = new SingleThreadPipeScheduler();
    var readScheduler = new SingleThreadPipeScheduler();

    // Tell the Pipe what schedulers to use and disable the SynchronizationContext.
    var options = new PipeOptions(readerScheduler: readScheduler,
                                  writerScheduler: writeScheduler,
                                  useSynchronizationContext: false);
    var pipe = new Pipe(options);
}

// This is a sample scheduler that async callbacks on a single dedicated thread.
public class SingleThreadPipeScheduler : PipeScheduler
{
    private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
     new BlockingCollection<(Action<object> Action, object State)>();
    private readonly Thread _thread;

    public SingleThreadPipeScheduler()
    {
        _thread = new Thread(DoWork);
        _thread.Start();
    }

    private void DoWork()
    {
        foreach (var item in _queue.GetConsumingEnumerable())
        {
            item.Action(item.State);
        }
    }

    public override void Schedule(Action<object?> action, object? state)
    {
        if (state is not null)
        {
            _queue.Add((action, state));
        }
        // else log the fact that _queue.Add was not called.
    }
}

PipeScheduler.ThreadPool adalah implementasi PipeScheduler yang mengantrekan panggilan balik ke kumpulan utas. PipeScheduler.ThreadPool adalah default dan umumnya pilihan terbaik. PipeScheduler.Inline dapat menyebabkan konsekuensi yang tidak diinginkan seperti kebuntuan.

Reset pipa

Sering kali efisien untuk menggunakan kembali objek Pipe. Untuk mengatur ulang pipa, panggil PipeReaderReset saat PipeReader dan PipeWriter selesai.

PipeReader

PipeReader mengelola memori atas nama pemanggil. Selalu panggil PipeReader.AdvanceTo setelah memanggil PipeReader.ReadAsync. Hal ini memberi tahu PipeReader kapan penelepon dilakukan dengan memori sehingga dapat dilacak. ReadOnlySequence<byte> dikembalikan dari PipeReader.ReadAsync hanya valid sampai panggilan PipeReader.AdvanceTo. Hal ini ilegal untuk digunakan ReadOnlySequence<byte> setelah memanggil PipeReader.AdvanceTo.

PipeReader.AdvanceTo mengambil dua argumen SequencePosition:

  • Argumen pertama menentukan berapa banyak memori yang digunakan.
  • Argumen kedua menentukan berapa banyak buffer yang diamati.

Menandai data sebagai digunakan berarti bahwa pipa dapat mengembalikan memori ke kumpulan buffer yang mendasar. Menandai data sebagai diamati mengontrol apa yang akan dilakukan panggilan PipeReader.ReadAsync berikutnya. Menandai semuanya seperti yang diamati berarti bahwa panggilan berikutnya ke PipeReader.ReadAsync tidak akan kembali sampai ada lebih banyak data yang ditulis ke pipa. Nilai lain akan melakukan panggilan berikutnya untuk PipeReader.ReadAsync segera kembali dengan data yang diamati dan tidak ditangguhkan, tetapi bukan data yang telah digunakan.

Membaca skenario data streaming

Terdapat beberapa pola khas yang muncul saat mencoba membaca data streaming:

  • Mengingat aliran data, uraikan satu pesan.
  • Mengingat aliran data, uraikan semua pesan yang tersedia.

Contoh berikut menggunakan metode TryParseLines untuk mengurai pesan dari ReadOnlySequence<byte>. TryParseLines mengurai satu pesan dan memperbarui buffer input untuk memangkas pesan yang diurai dari buffer. TryParseLines bukan bagian dari .NET, ini adalah metode tertulis pengguna yang digunakan di bagian berikut.

bool TryParseLines(ref ReadOnlySequence<byte> buffer, out Message message);

Membaca satu pesan

Kode berikut membaca satu pesan dari PipeReader dan mengembalikannya ke pemanggil.

async ValueTask<Message?> ReadSingleMessageAsync(PipeReader reader,
 CancellationToken cancellationToken = default)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync(cancellationToken);
        ReadOnlySequence<byte> buffer = result.Buffer;

        // In the event that no message is parsed successfully, mark consumed
        // as nothing and examined as the entire buffer.
        SequencePosition consumed = buffer.Start;
        SequencePosition examined = buffer.End;

        try
        {
            if (TryParseLines(ref buffer, out Message message))
            {
                // A single message was successfully parsed so mark the start of the
                // parsed buffer as consumed. TryParseLines trims the buffer to
                // point to the data after the message was parsed.
                consumed = buffer.Start;

                // Examined is marked the same as consumed here, so the next call
                // to ReadSingleMessageAsync will process the next message if there's
                // one.
                examined = consumed;

                return message;
            }

            // There's no more data to be processed.
            if (result.IsCompleted)
            {
                if (buffer.Length > 0)
                {
                    // The message is incomplete and there's no more data to process.
                    throw new InvalidDataException("Incomplete message.");
                }

                break;
            }
        }
        finally
        {
            reader.AdvanceTo(consumed, examined);
        }
    }

    return null;
}

Kode sebelumnya:

  • Mengurai satu pesan.
  • Memperbarui yang dikonsumsi SequencePosition dan diperiksa SequencePosition untuk menunjuk ke awal buffer input yang dipangkas.

Dua argumen SequencePosition diperbarui karena TryParseLines menghapus pesan yang diurai dari buffer input. Umumnya, saat mengurai satu pesan dari buffer, posisi yang diperiksa harus menjadi salah satu hal berikut:

  • Akhir pesan.
  • Akhir buffer yang diterima jika tidak ada pesan yang ditemukan.

Kasus pesan tunggal memiliki potensi kesalahan paling besar. Meneruskan nilai yang salah untuk diperiksa dapat mengakibatkan pengecualian kehabisan memori atau perulangan tak terbatas. Untuk informasi selengkapnya, lihat bagian Masalah umum PipeReader di artikel ini.

Membaca beberapa pesan

Kode berikut membaca semua pesan dari PipeReader dan panggilan ProcessMessageAsync pada masing-masing pesan.

async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
{
    try
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync(cancellationToken);
            ReadOnlySequence<byte> buffer = result.Buffer;

            try
            {
                // Process all messages from the buffer, modifying the input buffer on each
                // iteration.
                while (TryParseLines(ref buffer, out Message message))
                {
                    await ProcessMessageAsync(message);
                }

                // There's no more data to be processed.
                if (result.IsCompleted)
                {
                    if (buffer.Length > 0)
                    {
                        // The message is incomplete and there's no more data to process.
                        throw new InvalidDataException("Incomplete message.");
                    }
                    break;
                }
            }
            finally
            {
                // Since all messages in the buffer are being processed, you can use the
                // remaining buffer's Start and End position to determine consumed and examined.
                reader.AdvanceTo(buffer.Start, buffer.End);
            }
        }
    }
    finally
    {
        await reader.CompleteAsync();
    }
}

Pembatalan

PipeReader.ReadAsync:

  • Mendukung melewati CancellationToken.
  • Melempar OperationCanceledException jika CancellationToken dibatalkan saat ada pembacaan tertunda.
  • Mendukung cara untuk membatalkan operasi baca saat ini melalui PipeReader.CancelPendingRead, yang menghindari peningkatan pengecualian. Panggilan PipeReader.CancelPendingRead menyebabkan panggilan PipeReader.ReadAsync saat ini atau berikutnya untuk mengembalikan ReadResult dengan IsCanceled yang diatur ke true. Hal ini dapat berguna untuk menghentikan perulangan baca yang ada dengan cara yang tidak merusak dan tidak luar biasa.
private PipeReader reader;

public MyConnection(PipeReader reader)
{
    this.reader = reader;
}

public void Abort()
{
    // Cancel the pending read so the process loop ends without an exception.
    reader.CancelPendingRead();
}

public async Task ProcessMessagesAsync()
{
    try
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync();
            ReadOnlySequence<byte> buffer = result.Buffer;

            try
            {
                if (result.IsCanceled)
                {
                    // The read was canceled. You can quit without reading the existing data.
                    break;
                }

                // Process all messages from the buffer, modifying the input buffer on each
                // iteration.
                while (TryParseLines(ref buffer, out Message message))
                {
                    await ProcessMessageAsync(message);
                }

                // There's no more data to be processed.
                if (result.IsCompleted)
                {
                    break;
                }
            }
            finally
            {
                // Since all messages in the buffer are being processed, you can use the
                // remaining buffer's Start and End position to determine consumed and examined.
                reader.AdvanceTo(buffer.Start, buffer.End);
            }
        }
    }
    finally
    {
        await reader.CompleteAsync();
    }
}

Masalah umum PipeReader

  • Meneruskan nilai yang salah ke consumed atau examined dapat mengakibatkan membaca data yang sudah dibaca.

  • Lulus buffer.End sebagaimana diperiksa dapat mengakibatkan:

    • Data terhenti
    • Mungkin pengecualian Kehabisan Memori (OOM) akhir jika data tidak digunakan. Misalnya, PipeReader.AdvanceTo(position, buffer.End) saat memproses satu pesan pada satu waktu dari buffer.
  • Meneruskan nilai yang salah ke consumed atau examined dapat mengakibatkan perulangan tak terbatas. Misalnya, PipeReader.AdvanceTo(buffer.Start) jika buffer.Start belum berubah akan menyebabkan panggilan berikutnya ke PipeReader.ReadAsync segera kembali sebelum data baru tiba.

  • Meneruskan nilai yang salah ke consumed atau examined dapat mengakibatkan buffering tak terbatas (akhirnya OOM).

  • Menggunakan ReadOnlySequence<byte> setelah panggilan dapat mengakibatkan PipeReader.AdvanceTo kerusakan memori (digunakan setelah bebas).

  • Gagal memanggil PipeReader.Complete/CompleteAsync dapat mengakibatkan kebocoran memori.

  • Memeriksa ReadResult.IsCompleted dan keluar dari logika pembacaan sebelum memproses buffer mengalihkan kehilangan data. Kondisi keluar perulangan harus didasarkan pada ReadResult.Buffer.IsEmpty dan ReadResult.IsCompleted. Melakukan ini dengan tidak benar dapat mengakibatkan perulangan tak terbatas.

Kode bermasalah

Kehilangan data

ReadResult dapat mengembalikan segmen akhir data saat IsCompleted diatur ke true. Tidak membaca data tersebut sebelum keluar dari perulangan baca akan mengakibatkan kehilangan data.

Peringatan

JANGAN gunakan kode berikut. Menggunakan sampel ini akan mengakibatkan kehilangan data, macet, masalah keamanan, dan TIDAK boleh disalin. Sampel berikut disediakan untuk menjelaskan masalah Umum PipeReader.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> dataLossBuffer = result.Buffer;

    if (result.IsCompleted)
        break;

    Process(ref dataLossBuffer, out Message message);

    reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End);
}

Peringatan

JANGAN gunakan kode sebelumnya. Menggunakan sampel ini akan mengakibatkan kehilangan data, macet, masalah keamanan, dan TIDAK boleh disalin. Sampel sebelumnya disediakan untuk menjelaskan masalah Umum PipeReader.

Loop tak terbatas

Logika berikut dapat mengakibatkan perulangan tak terbatas jika Result.IsCompleted ada true tetapi tidak pernah ada pesan lengkap dalam buffer.

Peringatan

JANGAN gunakan kode berikut. Menggunakan sampel ini akan mengakibatkan kehilangan data, macet, masalah keamanan, dan TIDAK boleh disalin. Sampel berikut disediakan untuk menjelaskan masalah Umum PipeReader.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
    if (result.IsCompleted && infiniteLoopBuffer.IsEmpty)
        break;

    Process(ref infiniteLoopBuffer, out Message message);

    reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}

Peringatan

JANGAN gunakan kode sebelumnya. Menggunakan sampel ini akan mengakibatkan kehilangan data, macet, masalah keamanan, dan TIDAK boleh disalin. Sampel sebelumnya disediakan untuk menjelaskan masalah Umum PipeReader.

Berikut adalah bagian lain dari kode dengan masalah yang sama. Hal ini memeriksa buffer yang tidak kosong sebelum memeriksa ReadResult.IsCompleted. Karena dalam else if, itu akan berulang selamanya jika tidak pernah ada pesan lengkap dalam buffer.

Peringatan

JANGAN gunakan kode berikut. Menggunakan sampel ini akan mengakibatkan kehilangan data, macet, masalah keamanan, dan TIDAK boleh disalin. Sampel berikut disediakan untuk menjelaskan masalah Umum PipeReader.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;

    if (!infiniteLoopBuffer.IsEmpty)
        Process(ref infiniteLoopBuffer, out Message message);

    else if (result.IsCompleted)
        break;

    reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}

Peringatan

JANGAN gunakan kode sebelumnya. Menggunakan sampel ini akan mengakibatkan kehilangan data, macet, masalah keamanan, dan TIDAK boleh disalin. Sampel sebelumnya disediakan untuk menjelaskan masalah Umum PipeReader.

Aplikasi yang tidak responsif

Panggilan PipeReader.AdvanceTo tanpa syarat dengan buffer.End dalam posisi examined dapat mengakibatkan aplikasi menjadi tidak responsif saat mengurai satu pesan. Panggilan berikutnya untuk PipeReader.AdvanceTo tidak akan kembali hingga:

  • Terdapat lebih banyak data yang ditulis ke pipa.
  • Dan data baru sebelumnya tidak diperiksa.

Peringatan

JANGAN gunakan kode berikut. Menggunakan sampel ini akan mengakibatkan kehilangan data, macet, masalah keamanan, dan TIDAK boleh disalin. Sampel berikut disediakan untuk menjelaskan masalah Umum PipeReader.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> hangBuffer = result.Buffer;

    Process(ref hangBuffer, out Message message);

    if (result.IsCompleted)
        break;

    reader.AdvanceTo(hangBuffer.Start, hangBuffer.End);

    if (message != null)
        return message;
}

Peringatan

JANGAN gunakan kode sebelumnya. Menggunakan sampel ini akan mengakibatkan kehilangan data, macet, masalah keamanan, dan TIDAK boleh disalin. Sampel sebelumnya disediakan untuk menjelaskan masalah Umum PipeReader.

Kehabisan Memori (OOM)

Dengan kondisi berikut, kode berikut terus buffering hingga OutOfMemoryException terjadi:

  • Tidak ada ukuran pesan maksimum.
  • Data yang dikembalikan dari PipeReader tidak membuat pesan lengkap. Misalnya, itu tidak membuat pesan lengkap karena sisi lain menulis pesan besar (Misalnya, pesan 4 GB).

Peringatan

JANGAN gunakan kode berikut. Menggunakan sampel ini akan mengakibatkan kehilangan data, macet, masalah keamanan, dan TIDAK boleh disalin. Sampel berikut disediakan untuk menjelaskan masalah Umum PipeReader.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer;

    Process(ref thisCouldOutOfMemory, out Message message);

    if (result.IsCompleted)
        break;

    reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End);

    if (message != null)
        return message;
}

Peringatan

JANGAN gunakan kode sebelumnya. Menggunakan sampel ini akan mengakibatkan kehilangan data, macet, masalah keamanan, dan TIDAK boleh disalin. Sampel sebelumnya disediakan untuk menjelaskan masalah Umum PipeReader.

Korupsi Memori

Saat menulis pembantu yang membaca buffer, payload yang dikembalikan harus disalin sebelum memanggil Advance. Contoh berikut akan mengembalikan memori Pipe yang telah dibuang dan dapat menggunakannya kembali untuk operasi berikutnya (baca/tulis).

Peringatan

JANGAN gunakan kode berikut. Menggunakan sampel ini akan mengakibatkan kehilangan data, macet, masalah keamanan, dan TIDAK boleh disalin. Sampel berikut disediakan untuk menjelaskan masalah Umum PipeReader.

public class Message
{
    public ReadOnlySequence<byte> CorruptedPayload { get; set; }
}
    Environment.FailFast("This code is terrible, don't use it!");
    Message message = null;

    while (true)
    {
        ReadResult result = await reader.ReadAsync(cancellationToken);
        ReadOnlySequence<byte> buffer = result.Buffer;

        ReadHeader(ref buffer, out int length);

        if (length <= buffer.Length)
        {
            message = new Message
            {
                // Slice the payload from the existing buffer
                CorruptedPayload = buffer.Slice(0, length)
            };

            buffer = buffer.Slice(length);
        }

        if (result.IsCompleted)
            break;

        reader.AdvanceTo(buffer.Start, buffer.End);

        if (message != null)
        {
            // This code is broken since reader.AdvanceTo() was called with a position *after* the buffer
            // was captured.
            break;
        }
    }

    return message;
}

Peringatan

JANGAN gunakan kode sebelumnya. Menggunakan sampel ini akan mengakibatkan kehilangan data, macet, masalah keamanan, dan TIDAK boleh disalin. Sampel sebelumnya disediakan untuk menjelaskan masalah Umum PipeReader.

PipeWriter

PipeWriter mengelola buffer untuk menulis atas nama pemanggil. PipeWriter penerapan IBufferWriter<byte>. IBufferWriter<byte> memungkinkan untuk mendapatkan akses ke buffer untuk melakukan penulisan tanpa salinan buffer tambahan.

async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
    // Request at least 5 bytes from the PipeWriter.
    Memory<byte> memory = writer.GetMemory(5);

    // Write directly into the buffer.
    int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span);

    // Tell the writer how many bytes were written.
    writer.Advance(written);

    await writer.FlushAsync(cancellationToken);
}

Kode sebelumnya:

  • Meminta buffer setidaknya 5 byte dari penggunaan PipeWriterGetMemory.
  • Menulis byte untuk string ASCII "Hello" yang dikembalikan Memory<byte>.
  • Panggilan Advance untuk menunjukkan berapa banyak byte yang ditulis ke buffer.
  • Menghapus PipeWriter, yang mengirim byte ke perangkat yang mendasar.

Metode penulisan sebelumnya menggunakan buffer yang disediakan oleh PipeWriter. Hal ini juga bisa menggunakan PipeWriter.WriteAsync, yang:

  • Menyalin buffer yang ada ke PipeWriter.
  • Panggilan GetSpan, Advance sebagaimana merujuk dan memanggil FlushAsync.
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
    byte[] helloBytes = Encoding.ASCII.GetBytes("Hello");

    // Write helloBytes to the writer, there's no need to call Advance here
    // (Write does that).
    await writer.WriteAsync(helloBytes, cancellationToken);
}

Pembatalan

FlushAsync mendukung melewati CancellationToken. Meneruskan hasil CancellationToken di OperationCanceledException jika token dibatalkan saat ada flush tertunda. PipeWriter.FlushAsync mendukung cara untuk membatalkan operasi flush saat ini melalui PipeWriter.CancelPendingFlush tanpa menaikkan pengecualian. Panggilan PipeWriter.CancelPendingFlush menyebabkan panggilan saat ini atau berikutnya ke PipeWriter.FlushAsync atau PipeWriter.WriteAsync untuk mengembalikan FlushResult dengan IsCanceled yang diatur ke true. Hal ini dapat berguna untuk menghentikan flush hasil dengan cara yang tidak merusak dan tidak luar biasa.

Masalah umum PipeWriter

  • GetSpan dan GetMemory mengembalikan buffer dengan setidaknya jumlah memori yang diminta. Jangan asumsikan ukuran buffer yang tepat.
  • Tidak ada jaminan bahwa panggilan berturut-turut akan mengembalikan buffer yang sama atau buffer berukuran sama.
  • Buffer baru harus diminta setelah memanggil Advance untuk terus menulis lebih banyak data. Buffer yang diperoleh sebelumnya tidak dapat ditulis.
  • Panggilan GetMemory atau GetSpan saat ada panggilan yang tidak lengkap ke FlushAsync tidak aman.
  • Memanggil Complete atau CompleteAsync saat ada data yang dibongkar dapat mengakibatkan kerusakan memori.

Tips untuk menggunakan PipeReader dan PipeWriter

Tips berikut akan membantu Anda berhasil menggunakan kelas System.IO.Pipelines:

  • Selalu selesaikan PipeReader dan PipeWriter, termasuk pengecualian jika berlaku.
  • Selalu panggil PipeReader.AdvanceTo setelah memanggil PipeReader.ReadAsync.
  • Secara berkala awaitPipeWriter.FlushAsync saat menulis, dan selalu periksa FlushResult.IsCompleted. Batalkan penulisan jika IsCompleted adalah true, seperti yang menunjukkan pembaca selesai dan tidak lagi peduli tentang apa yang ditulis.
  • Lakukan panggilan PipeWriter.FlushAsync setelah menulis sesuatu PipeReader yang ingin Anda akses.
  • Jangan panggil FlushAsync jika pembaca tidak dapat memulai sampai FlushAsync selesai, karena dapat menyebabkan kebuntuan.
  • Pastikan bahwa hanya satu konteks yang "memiliki" PipeReader atau PipeWriter atau mengaksesnya. Jenis-jenis ini tidak aman untuk utas.
  • Jangan pernah mengakses ReadResult.Buffer setelah memanggil AdvanceTo atau menyelesaikan PipeReader.

IDuplexPipe

IDuplexPipe adalah kontrak untuk jenis yang mendukung pembacaan dan penulisan. Misalnya, koneksi jaringan akan diwakili oleh IDuplexPipe.

Tidak seperti Pipe, yang berisi PipeReader dan PipeWriter, IDuplexPipe mewakili satu sisi koneksi dupleks penuh. Hal itu berarti apa yang ditulis ke PipeWriter tidak akan dibaca dari PipeReader.

Aliran

Saat membaca atau menulis data aliran, Anda biasanya membaca data menggunakan de-serializer dan menulis data menggunakan serializer. Sebagian besar API aliran baca dan tulis ini memiliki parameter Stream. Untuk mempermudah integrasi dengan API yang ada ini, PipeReader dan PipeWriter mengekspos metode AsStream. AsStream mengembalikan implementasi Stream di sekitar PipeReader atau PipeWriter.

Contoh aliran

Instans PipeReader dan PipeWriter dapat dibuat menggunakan metode statis Create yang diberikan objek Stream dan opsi pembuatan yang sesuai opsional.

StreamPipeReaderOptions izinkan kontrol atas pembuatan instans PipeReader dengan parameter berikut:

StreamPipeWriterOptions izinkan kontrol atas pembuatan instans PipeWriter dengan parameter berikut:

Penting

Saat membuat instans PipeReader dan PipeWriter menggunakan metode Create, Anda perlu mempertimbangkan masa pakai objek Stream. Jika Anda memerlukan akses ke aliran setelah pembaca atau penulis selesai dengannya, Anda harus mengatur bendera LeaveOpen ke true pada opsi pembuatan. Jika tidak, aliran akan ditutup.

Kode berikut menunjukkan pembuatan instans PipeReader dan PipeWriter menggunakan metode Create dari aliran.

using System.Buffers;
using System.IO.Pipelines;
using System.Text;

class Program
{
    static async Task Main()
    {
        using var stream = File.OpenRead("lorem-ipsum.txt");

        var reader = PipeReader.Create(stream);
        var writer = PipeWriter.Create(
            Console.OpenStandardOutput(), 
            new StreamPipeWriterOptions(leaveOpen: true));

        WriteUserCancellationPrompt();

        var processMessagesTask = ProcessMessagesAsync(reader, writer);
        var userCanceled = false;
        var cancelProcessingTask = Task.Run(() =>
        {
            while (char.ToUpperInvariant(Console.ReadKey().KeyChar) != 'C')
            {
                WriteUserCancellationPrompt();
            }

            userCanceled = true;

            // No exceptions thrown
            reader.CancelPendingRead();
            writer.CancelPendingFlush();
        });

        await Task.WhenAny(cancelProcessingTask, processMessagesTask);

        Console.WriteLine(
            $"\n\nProcessing {(userCanceled ? "cancelled" : "completed")}.\n");
    }

    static void WriteUserCancellationPrompt() =>
        Console.WriteLine("Press 'C' to cancel processing...\n");

    static async Task ProcessMessagesAsync(
        PipeReader reader,
        PipeWriter writer)
    {
        try
        {
            while (true)
            {
                ReadResult readResult = await reader.ReadAsync();
                ReadOnlySequence<byte> buffer = readResult.Buffer;

                try
                {
                    if (readResult.IsCanceled)
                    {
                        break;
                    }

                    if (TryParseLines(ref buffer, out string message))
                    {
                        FlushResult flushResult =
                            await WriteMessagesAsync(writer, message);

                        if (flushResult.IsCanceled || flushResult.IsCompleted)
                        {
                            break;
                        }
                    }

                    if (readResult.IsCompleted)
                    {
                        if (!buffer.IsEmpty)
                        {
                            throw new InvalidDataException("Incomplete message.");
                        }
                        break;
                    }
                }
                finally
                {
                    reader.AdvanceTo(buffer.Start, buffer.End);
                }
            }
        }
        catch (Exception ex)
        {
            Console.Error.WriteLine(ex);
        }
        finally
        {
            await reader.CompleteAsync();
            await writer.CompleteAsync();
        }
    }

    static bool TryParseLines(
        ref ReadOnlySequence<byte> buffer,
        out string message)
    {
        SequencePosition? position;
        StringBuilder outputMessage = new();

        while(true)
        {
            position = buffer.PositionOf((byte)'\n');

            if (!position.HasValue)
                break;

            outputMessage.Append(Encoding.ASCII.GetString(buffer.Slice(buffer.Start, position.Value)))
                        .AppendLine();

            buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
        };

        message = outputMessage.ToString();
        return message.Length != 0;
    }

    static ValueTask<FlushResult> WriteMessagesAsync(
        PipeWriter writer,
        string message) =>
        writer.WriteAsync(Encoding.ASCII.GetBytes(message));
}

Aplikasi ini menggunakan StreamReader untuk membaca file lorem-ipsum.txt sebagai aliran, dan harus diakhir dengan baris kosong. FileStream diteruskan ke PipeReader.Create, yang membuat instans objek PipeReader. Aplikasi konsol kemudian meneruskan aliran output standarnya untuk PipeWriter.Create yang menggunakan Console.OpenStandardOutput(). Contoh mendukung pembatalan.