System.IO.Pipelines

System.IO.Pipelines , .NET'te yüksek performanslı G/Ç'yi kolaylaştırmak için tasarlanmış bir kitaplıktır. Paket geniş uyumluluk, .NET Framework ve modern .NET için .NET Standard'ı hedefler. Modern .NET sürümlerinde, System.IO.Pipelines paylaşılan çerçeveye dahildir ve ayrı bir NuGet paketi gerektirmez.

Kitaplık System.IO.Pipelines NuGet paketi olarak da kullanılabilir.

System.IO.Pipelines hangi sorunu çözer?

Akış verilerini ayrıştıran uygulamalar, birçok özel ve olağan dışı kod akışına sahip ortak kodlardan oluşur. Şablon ve özel durum kodu karmaşıktır ve bakımı zordur.

System.IO.Pipelines şu şekilde tasarlandı:

  • Yüksek performanslı ayrıştırma akış verilerine sahip olun.
  • Kod karmaşıklığını azaltın.

Bu kod, bir istemciden '\n' ile sınırlandırılmış satır-ayrılmış iletiler alan bir TCP sunucusu için tipiktir.

async Task ProcessLinesAsync(NetworkStream stream)
{
    var buffer = new byte[1024];
    await stream.ReadAsync(buffer, 0, buffer.Length);

    // Process a single line from the buffer
    ProcessLine(buffer);
}

Yukarıdaki kodda çeşitli sorunlar vardır:

  • İletinin tamamı (satır sonu) ReadAsync fonksiyonuna yapılan tek bir çağrıda alınamayabilir.
  • Bu stream.ReadAsync sonucunu yok sayıyor. stream.ReadAsync okunan veri miktarı döndürür.
  • Bir ReadAsync çağrısında birden fazla satırın okunması durumunu işlemez.
  • Her bir okuma ile bir byte dizi ayırır.

Önceki sorunları düzeltmek için şu değişiklikleri yapın:

  • Yeni bir satır bulunana kadar gelen verileri arabelleğe alın.

  • Arabellekte döndürülen tüm satırları ayrıştırın.

  • Satır 1 KB'tan (1024 bayt) büyük olabilir. Tam satırı arabellek içine sığdırmak için, kodun giriş arabelleğini sınırlandırıcı bulunana kadar yeniden boyutlandırması gerekiyor.

    • Arabellek yeniden boyutlandırılırsa, girişte daha uzun satırlar göründüğünde daha fazla arabellek kopyası oluşturulur.
    • Boşa harcanan alanı azaltmak için satırları okumak için kullanılan arabelleği sıkıştırın.
  • Belleği tekrar tekrar ayırmamak için arabellek havuzu kullanmayı göz önünde bulundurun.

  • Bu kod şu sorunlardan bazılarını giderir:

async Task ProcessLinesAsync(NetworkStream stream)
{
    byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
    var bytesBuffered = 0;
    var bytesConsumed = 0;

    while (true)
    {
        // Calculate the amount of bytes remaining in the buffer.
        var bytesRemaining = buffer.Length - bytesBuffered;

        if (bytesRemaining == 0)
        {
            // Double the buffer size and copy the previously buffered data into the new buffer.
            var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
            Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
            // Return the old buffer to the pool.
            ArrayPool<byte>.Shared.Return(buffer);
            buffer = newBuffer;
            bytesRemaining = buffer.Length - bytesBuffered;
        }

        var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
        if (bytesRead == 0)
        {
            // EOF
            break;
        }

        // Keep track of the amount of buffered bytes.
        bytesBuffered += bytesRead;
        var linePosition = -1;

        do
        {
            // Look for a EOL in the buffered data.
            linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
                                         bytesBuffered - bytesConsumed);

            if (linePosition >= 0)
            {
                // Calculate the length of the line based on the offset.
                var lineLength = linePosition - bytesConsumed;

                // Process the line.
                ProcessLine(buffer, bytesConsumed, lineLength);

                // Move the bytesConsumed to skip past the line consumed (including \n).
                bytesConsumed += lineLength + 1;
            }
        }
        while (linePosition >= 0);
    }
}

Önceki kod karmaşıktır ve tanımlanan tüm sorunları gidermez. Yüksek performanslı ağ genellikle performansı en üst düzeye çıkarmak için karmaşık kod yazma anlamına gelir. System.IO.Pipelines bu tür kod yazmayı kolaylaştırmak için tasarlanmıştır.

Boru

Pipe sınıfını kullanarak bir PipeWriter/PipeReader çifti oluşturun. PipeWriter içine yazılan tüm veriler PipeReader içinde mevcuttur.

var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;

Boru temel kullanımı

async Task ProcessLinesAsync(Socket socket)
{
    var pipe = new Pipe();
    Task writing = FillPipeAsync(socket, pipe.Writer);
    Task reading = ReadPipeAsync(pipe.Reader);

    await Task.WhenAll(reading, writing);
}

async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
    const int minimumBufferSize = 512;

    while (true)
    {
        // Allocate at least 512 bytes from the PipeWriter.
        Memory<byte> memory = writer.GetMemory(minimumBufferSize);
        try
        {
            int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
            if (bytesRead == 0)
            {
                break;
            }
            // Tell the PipeWriter how much was read from the Socket.
            writer.Advance(bytesRead);
        }
        catch (Exception ex)
        {
            LogError(ex);
            break;
        }

        // Make the data available to the PipeReader.
        FlushResult result = await writer.FlushAsync();

        if (result.IsCompleted)
        {
            break;
        }
    }

     // By completing PipeWriter, tell the PipeReader that there's no more data coming.
    await writer.CompleteAsync();
}

async Task ReadPipeAsync(PipeReader reader)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync();
        ReadOnlySequence<byte> buffer = result.Buffer;

        while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
        {
            // Process the line.
            ProcessLine(line);
        }

        // Tell the PipeReader how much of the buffer has been consumed.
        reader.AdvanceTo(buffer.Start, buffer.End);

        // Stop reading if there's no more data coming.
        if (result.IsCompleted)
        {
            break;
        }
    }

    // Mark the PipeReader as complete.
    await reader.CompleteAsync();
}

bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
    // Look for a EOL in the buffer.
    SequencePosition? position = buffer.PositionOf((byte)'\n');

    if (position == null)
    {
        line = default;
        return false;
    }

    // Skip the line + the \n.
    line = buffer.Slice(0, position.Value);
    buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
    return true;
}

İki döngü okuma ve yazmayı işler:

  • FillPipeAsync öğesinden Socket okur ve PipeWriter öğesine yazar.
  • ReadPipeAsync, PipeReader'den okur ve gelen satırları ayrıştırır.

Belirli arabellekler tahsis edilmez. Tüm arabellek yönetimi, PipeReader ve PipeWriter uygulamalarına devredilir. Arabellek yönetimini delege etmek, kodun yalnızca iş mantığına odaklanmasını kolaylaştırır.

İlk döngüde:

İkinci döngüde PipeReaderPipeWriter tarafından yazılan arabellekleri tüketir. Arabellekler soketten gelir. çağrısı:PipeReader.ReadAsync

  • İki önemli bilgi parçası içeren bir ReadResult döndürür:

    • ReadOnlySequence<T> biçiminde okunan veriler.
    • Veri sonuna (EOF) ulaşılıp ulaşılmadığını gösteren boole IsCompleted değeri.

Satır sonu (EOL) sınırlayıcısını bulduktan ve satırı ayrıştırdıktan sonra:

  • Mantık, önceden işlenmiş olanları atlamak için arabelleği işler.
  • PipeReader.AdvanceTo, ne kadar verinin tüketildiğini ve incelendiğini PipeReader'e bildirmek için çağrılır.

Okuyucu ve yazıcı döngüleri, PipeReader.Complete ve PipeWriter.Complete çağrılarıyla sona erer. Complete çağrısı, Pipe tarafından ayrılmış olan belleği serbest bırakır.

Geri basınç ve akış denetimi

İdeal olarak, okuma ve ayrıştırma birlikte çalışır:

  • Okuma iş parçacığı, ağdan verileri alır ve arabelleklere yerleştirir.
  • Ayrıştırma iş parçacığı uygun veri yapılarını oluşturmakla sorumludur.

Ayrıştırma genellikle ağdan veri bloklarını kopyalamaktan daha uzun sürer:

  • Okuma iş parçacığı, ayrıştırma iş parçacığının önüne geçer.
  • Okuma iş parçacığının ayrıştırma iş parçacığının verilerini depolamak için yavaşlamasına veya daha fazla bellek ayırmasına gerek vardır.

En iyi performans için sık duraklamalar ile daha fazla bellek ayırma arasında bir denge vardır.

Yukarıdaki sorunu çözmek için, Pipe veri akışını denetlemek için iki ayarı vardır:

ResumeWriterThreshold ve PauseWriterThreshold ile diyagram

PipeWriter.FlushAsync:

  • Verinin ValueTask<FlushResult> içindeki miktarı Pipe sınırını aştığında, tamamlanmamış bir PauseWriterThreshold döndürür.
  • ValueTask<FlushResult> ResumeWriterThreshold değerinden daha düşük olduğunda tamamlanır.

Hızlı döngülerin meydana gelmesini önlemek için iki değer kullanılır; bu, yalnızca bir değer kullanıldığında gerçekleşebilir.

Örnekler

// The Pipe will start returning incomplete tasks from FlushAsync until
// the reader examines at least 5 bytes.
var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
var pipe = new Pipe(options);

Boru Zamanlayıcı

Genellikle async ve await kullanırken, uyumsuz kod ya TaskScheduler ya da geçerli SynchronizationContext üzerinde devam eder.

G/Ç yapılırken G/Ç'nin nerede gerçekleştirildiği üzerinde ayrıntılı denetim sahibi olmak önemlidir. Bu denetim, CPU önbelleklerinden etkili bir şekilde yararlanmaya olanak tanır. Verimli önbelleğe alma, web sunucuları gibi yüksek performanslı uygulamalar için kritik öneme sahiptir. PipeScheduler zaman uyumsuz geri çağırmaların nerede çalıştırıldığı üzerinde denetim sağlar. Varsayılan olarak:

  • Geçerli SynchronizationContext kullanılır.
  • Eğer SynchronizationContext yoksa, geri çağrımları çalıştırmak için iş parçacığı havuzunu kullanır.
public static void Main(string[] args)
{
    var writeScheduler = new SingleThreadPipeScheduler();
    var readScheduler = new SingleThreadPipeScheduler();

    // Tell the Pipe what schedulers to use and disable the SynchronizationContext.
    var options = new PipeOptions(readerScheduler: readScheduler,
                                  writerScheduler: writeScheduler,
                                  useSynchronizationContext: false);
    var pipe = new Pipe(options);
}

// This is a sample scheduler that async callbacks on a single dedicated thread.
public class SingleThreadPipeScheduler : PipeScheduler
{
    private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
     new BlockingCollection<(Action<object> Action, object State)>();
    private readonly Thread _thread;

    public SingleThreadPipeScheduler()
    {
        _thread = new Thread(DoWork);
        _thread.Start();
    }

    private void DoWork()
    {
        foreach (var item in _queue.GetConsumingEnumerable())
        {
            item.Action(item.State);
        }
    }

    public override void Schedule(Action<object?> action, object? state)
    {
        if (state is not null)
        {
            _queue.Add((action, state));
        }
        // else log the fact that _queue.Add was not called.
    }
}

PipeScheduler.ThreadPool iş parçacığı havuzuna geri çağırmaları kuyruğa alan bir uygulamadır PipeScheduler. PipeScheduler.ThreadPool varsayılan ve genel olarak en iyi seçenektir. PipeScheduler.Inline kilitlenmeler gibi istenmeyen sonuçlara neden olabilir.

Boru sıfırlama

Nesneyi yeniden kullanma Pipe işlemi genellikle verimlidir. Kanalı sıfırlamak için, hem PipeReader hem de Reset tamamlandıktan sonra PipeReaderPipeWriter çağırın.

PipeReader

PipeReader çağıranın adına belleği yönetir. Bu, çağıranın belleği kullanmayı bitirdiğini PipeReader'a bildirir, böylece izlenebilir. ReadOnlySequence<T> PipeReader.ReadAsync'den döndürülen değer, yalnızca PipeReader.AdvanceTo çağrısına kadar geçerlidir. çağrısı ReadOnlySequence<T>yaptıktan sonra kullanmak PipeReader.AdvanceTo geçersizdir.

PipeReader.AdvanceTo iki SequencePosition parametre alır:

  • İlk bağımsız değişken, ne kadar bellek tüketildiğini belirler.
  • İkinci bağımsız değişken, arabelleğin ne kadarının gözlemlendiğini belirler.

Verileri tüketilen olarak işaretlemek, boru hattının belleği altındaki arabellek havuzuna geri döndürebileceği anlamına gelir. Verileri gözlemlenmiş olarak işaretlemek, bir sonraki PipeReader.ReadAsync çağrısının ne yapacağını kontrol eder. Her şeyi gözlemlenmiş olarak işaretlemek, boruya daha fazla veri yazılana kadar PipeReader.ReadAsync fonksiyonunun bir sonraki çağrısının geri dönmeyeceği anlamına gelir. Diğer herhangi bir değer, gözlemlenen PipeReader.ReadAsync gözlemlenmeyen verilerle hemen dönmek için bir sonraki çağrıyı yapar, ancak zaten tüketilmiş olan verileri döndürmez.

Akış verisi senaryolarını okuma

Akış verileri okunurken birkaç tipik desen ortaya çıkar:

  • Veri akışı verildiğinde, tek bir iletiyi ayrıştırın.
  • Bir veri akışı göz önünde bulundurulduğunda, kullanılabilir tüm iletileri ayrıştırın.

Bu örnekler, TryParseLines bir ReadOnlySequence<T>'den iletileri ayrıştırma yöntemini kullanır. TryParseLines tek bir iletiyi ayrıştırır ve giriş arabelleğini, ayrıştırılan iletiyi arabelleğinden çıkarmak için günceller. TryParseLines .NET'in bir parçası değildir; bu, aşağıdaki bölümlerde kullanılan kullanıcı tarafından yazılmış bir yöntemdir.

bool TryParseLines(ref ReadOnlySequence<byte> buffer, out Message message);

Tek bir iletiyi okuma

Bu kod, PipeReader'den tek bir mesaj okur ve bunu çağırana döndürür.

async ValueTask<Message?> ReadSingleMessageAsync(PipeReader reader,
 CancellationToken cancellationToken = default)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync(cancellationToken);
        ReadOnlySequence<byte> buffer = result.Buffer;

        // In the event that no message is parsed successfully, mark consumed
        // as nothing and examined as the entire buffer.
        SequencePosition consumed = buffer.Start;
        SequencePosition examined = buffer.End;

        try
        {
            if (TryParseLines(ref buffer, out Message message))
            {
                // A single message was successfully parsed so mark the start of the
                // parsed buffer as consumed. TryParseLines trims the buffer to
                // point to the data after the message was parsed.
                consumed = buffer.Start;

                // Examined is marked the same as consumed here, so the next call
                // to ReadSingleMessageAsync will process the next message if there's
                // one.
                examined = consumed;

                return message;
            }

            // There's no more data to be processed.
            if (result.IsCompleted)
            {
                if (buffer.Length > 0)
                {
                    // The message is incomplete and there's no more data to process.
                    throw new InvalidDataException("Incomplete message.");
                }

                break;
            }
        }
        finally
        {
            reader.AdvanceTo(consumed, examined);
        }
    }

    return null;
}

Yukarıdaki kod:

  • Tek bir iletiyi ayrıştırıyor.
  • Tüketilen SequencePosition ve incelenen SequencePosition öğesini kırpılan giriş arabelleğinin başlangıcına işaret eden şekilde güncelleştirir.

İki SequencePosition bağımsız değişken, TryParseLines ayrıştırılmış iletiyi giriş arabelleğinden kaldırdığından güncellenir. Genellikle, arabellekten tek bir iletiyi ayrıştırırken, incelenen konum aşağıdakilerden biri olmalıdır:

  • İletinin sonu.
  • Alınan arabelleğin sonu, eğer ileti bulunamazsa.

Tek bir ileti durumunda en fazla hata olasılığı vardır. Examined'a yanlış değerlerin geçilmesi bellek yetersizliği özel durumu veya sonsuz döngüyle sonuçlanabilir. Daha fazla bilgi için bu makaledeki PipeReader yaygın sorunları bölümüne bakın.

Önemli

ReadSingleMessageAsync, PipeReader.CompleteAsync çağrısı yapmaz. Çağıran, PipeReader öğesini tamamlamaktan sorumludur. PipeReader.CompleteAsync'in ReadSingleMessageAsync içinde çağrılması daha fazla veri okunamamasını sinyaller ve sonraki mesajların okunmasını önler.

Birden çok iletiyi okuma

Bu kod, PipeReader'den tüm iletileri okur ve her birine ProcessMessageAsync çağrısı yapar.

async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
{
    try
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync(cancellationToken);
            ReadOnlySequence<byte> buffer = result.Buffer;

            try
            {
                // Process all messages from the buffer, modifying the input buffer on each
                // iteration.
                while (TryParseLines(ref buffer, out Message message))
                {
                    await ProcessMessageAsync(message);
                }

                // There's no more data to be processed.
                if (result.IsCompleted)
                {
                    if (buffer.Length > 0)
                    {
                        // The message is incomplete and there's no more data to process.
                        throw new InvalidDataException("Incomplete message.");
                    }
                    break;
                }
            }
            finally
            {
                // Since all messages in the buffer are being processed, you can use the
                // remaining buffer's Start and End position to determine consumed and examined.
                reader.AdvanceTo(buffer.Start, buffer.End);
            }
        }
    }
    finally
    {
        await reader.CompleteAsync();
    }
}

ProcessMessagesAsync İleti okuma döngüsünün tamamına sahip olduğundan, tamamlandığında çağrısı PipeReader.CompleteAsync yapılır. Tek mesaj durumuyla farklı olarak, çağıranın okuyucuyu tamamlaması gerekmez. ProcessMessagesAsync, PipeReader'in yaşam döngüsü üzerinde tam sahiplik alır.

İptal

PipeReader.ReadAsync:

  • bir CancellationTokengeçirmeyi destekler.
  • OperationCanceledException ifadesini, bekleyen bir okuma varken CancellationToken iptal edilirse atar.
  • aracılığıyla PipeReader.CancelPendingRead geçerli okuma işlemini iptal etmenin bir yolunu destekler ve bu da istisna oluşturulmasını önler. Çağrı PipeReader.CancelPendingRead, geçerli veya sonraki PipeReader.ReadAsync çağrısının, ReadResult'nin IsCanceled olarak ayarlandığı bir true döndürmesine neden olur. Bu, var olan okuma döngüsünü yıkıcı olmayan ve istisnai olmayan bir şekilde durdurmak için kullanışlıdır.
private PipeReader reader;

public MyConnection(PipeReader reader)
{
    this.reader = reader;
}

public void Abort()
{
    // Cancel the pending read so the process loop ends without an exception.
    reader.CancelPendingRead();
}

public async Task ProcessMessagesAsync()
{
    try
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync();
            ReadOnlySequence<byte> buffer = result.Buffer;

            try
            {
                if (result.IsCanceled)
                {
                    // The read was canceled. You can quit without reading the existing data.
                    break;
                }

                // Process all messages from the buffer, modifying the input buffer on each
                // iteration.
                while (TryParseLines(ref buffer, out Message message))
                {
                    await ProcessMessageAsync(message);
                }

                // There's no more data to be processed.
                if (result.IsCompleted)
                {
                    break;
                }
            }
            finally
            {
                // Since all messages in the buffer are being processed, you can use the
                // remaining buffer's Start and End position to determine consumed and examined.
                reader.AdvanceTo(buffer.Start, buffer.End);
            }
        }
    }
    finally
    {
        await reader.CompleteAsync();
    }
}

PipeReader yaygın sorunları

  • Yanlış değerlerin consumed veya examined'e geçirilmesi, daha önce okunan verilerin tekrar okunmasıyla sonuçlanabilir.

  • İncelendiği gibi geçiş buffer.End aşağıdakilere neden olabilir:

    • Durdurulan veriler
    • Veriler tüketilmezse sonunda bellek yetersizliği (OOM) özel durumu oluşabilir. Örneğin, PipeReader.AdvanceTo(position, buffer.End) arabellekten bir kerede tek bir iletiyi işlerken.
  • Yanlış değerlerin consumed veya examined öğesine geçirilmesi sonsuz döngüye neden olabilir. Örneğin, buffer.Start değişmediyse, bir sonraki PipeReader.ReadAsync çağrısı, yeni veriler gelmeden hemen önce döner.

  • Yanlış değerlerin consumed veya examined'e geçirilmesi sonsuz arabelleğe alma (sonunda bellek yetersizliği) ile sonuçlanabilir.

  • ReadOnlySequence<T> çağrısından sonra PipeReader.AdvanceTo kullanmak bellek bozulmasına neden olabilir (serbest bırakıldıktan sonra kullanma).

  • Çağrı Complete/CompleteAsync yapılamaması bellek sızıntısına neden olabilir.

  • Okuma mantığının kontrol edilip ReadResult.IsCompleted ve arabelleği işlemeye başlamadan önce sonlandırılması, veri kaybına neden olur. Döngü çıkış koşulu, ReadResult.Buffer.IsEmpty ve ReadResult.IsCompleted temel alınarak belirlenmelidir. Bunu yanlış yapmak sonsuz döngüye neden olabilir.

Sorunlu kod

Veri kaybı

ReadResult olarak ayarlandığında IsCompletedverinin true son kesimini döndürebilir. Okuma döngüsünden çıkmadan önce bu verilerin okunmaması veri kaybına neden olur.

Uyarı

Aşağıdaki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. PipeReader ile ilgili yaygın sorunları açıklamak için aşağıdaki örnek sağlanmıştır.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> dataLossBuffer = result.Buffer;

    if (result.IsCompleted)
        break;

    Process(ref dataLossBuffer, out Message message);

    reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End);
}

Uyarı

Önceki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. PipeReader ile ilgili yaygın sorunları açıklamak için yukarıdaki örnek sağlanmıştır.

Sonsuz döngü

"Aşağıdaki mantık, eğer Result.IsCompletedtrue ise ve arabellekte hiçbir zaman tam bir ileti yoksa, sonsuz döngüye neden olabilir."

Uyarı

Aşağıdaki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. PipeReader ile ilgili yaygın sorunları açıklamak için aşağıdaki örnek sağlanmıştır.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
    if (result.IsCompleted && infiniteLoopBuffer.IsEmpty)
        break;

    Process(ref infiniteLoopBuffer, out Message message);

    reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}

Uyarı

Önceki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. PipeReader ile ilgili yaygın sorunları açıklamak için yukarıdaki örnek sağlanmıştır.

Aynı sorunla ilgili başka bir kod parçası aşağıdadır. Boş olmayan bir arabelleği kontrol ettikten sonra ReadResult.IsCompleted’yu kontrol eder. else if içinde olduğundan, arabellekte asla tam bir ileti yoksa sonsuz döngüde kalır.

Uyarı

Aşağıdaki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. PipeReader ile ilgili yaygın sorunları açıklamak için aşağıdaki örnek sağlanmıştır.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;

    if (!infiniteLoopBuffer.IsEmpty)
        Process(ref infiniteLoopBuffer, out Message message);

    else if (result.IsCompleted)
        break;

    reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}

Uyarı

Önceki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. PipeReader ile ilgili yaygın sorunları açıklamak için yukarıdaki örnek sağlanmıştır.

Yanıt vermeyen uygulama

examined konumunda PipeReader.AdvanceTobuffer.End ile koşulsuz çağırmak, uygulamanın tek bir iletiyi ayrıştırırken yanıt vermemeye başlamasına neden olabilir. Sonraki PipeReader.AdvanceTo çağrısı şu ana kadar geri dönmeyecek:

  • Kanala yazılan daha fazla veri var.
  • Yeni veriler daha önce incelenmedi.

Uyarı

Aşağıdaki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. PipeReader ile ilgili yaygın sorunları açıklamak için aşağıdaki örnek sağlanmıştır.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> hangBuffer = result.Buffer;

    Process(ref hangBuffer, out Message message);

    if (result.IsCompleted)
        break;

    reader.AdvanceTo(hangBuffer.Start, hangBuffer.End);

    if (message != null)
        return message;
}

Uyarı

Önceki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. PipeReader ile ilgili yaygın sorunları açıklamak için yukarıdaki örnek sağlanmıştır.

Yetersiz Bellek (OOM)

Aşağıdaki koşullarla, bu kod bir OutOfMemoryException durumu gerçekleşene kadar arabelleğe almaya devam eder.

  • İleti boyutu üst sınırı yoktur.
  • 'dan PipeReader döndürülen veriler tam bir ileti oluşturmaz. Örneğin, diğer taraf büyük bir ileti (örneğin, 4 GB ileti) yazıyor.

Uyarı

Aşağıdaki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. PipeReader ile ilgili yaygın sorunları açıklamak için aşağıdaki örnek sağlanmıştır.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer;

    Process(ref thisCouldOutOfMemory, out Message message);

    if (result.IsCompleted)
        break;

    reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End);

    if (message != null)
        return message;
}

Uyarı

Önceki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. PipeReader ile ilgili yaygın sorunları açıklamak için yukarıdaki örnek sağlanmıştır.

Bellek Bozulması

Arabelleği okuyan yardımcıları yazarken, Advance çağrılmadan önce döndürülen veri yükünü kopyalayın. Aşağıdaki örnek, Pipe tarafından atılan belleği geri kazandırır ve belki sonraki işlem (okuma/yazma) için yeniden kullanılabilir.

Uyarı

Aşağıdaki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. PipeReader ile ilgili yaygın sorunları açıklamak için aşağıdaki örnek sağlanmıştır.

public class Message
{
    public ReadOnlySequence<byte> CorruptedPayload { get; set; }
}
    Environment.FailFast("This code is terrible, don't use it!");
    Message message = null;

    while (true)
    {
        ReadResult result = await reader.ReadAsync(cancellationToken);
        ReadOnlySequence<byte> buffer = result.Buffer;

        ReadHeader(ref buffer, out int length);

        if (length <= buffer.Length)
        {
            message = new Message
            {
                // Slice the payload from the existing buffer
                CorruptedPayload = buffer.Slice(0, length)
            };

            buffer = buffer.Slice(length);
        }

        if (result.IsCompleted)
            break;

        reader.AdvanceTo(buffer.Start, buffer.End);

        if (message != null)
        {
            // This code is broken since reader.AdvanceTo() was called with a position *after* the buffer
            // was captured.
            break;
        }
    }

    return message;
}

Uyarı

Önceki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. PipeReader ile ilgili yaygın sorunları açıklamak için yukarıdaki örnek sağlanmıştır.

PipeWriter

, PipeWriter arayan adına yazılacak arabellekleri yönetir. PipeWriter uygular IBufferWriter<byte>. IBufferWriter<byte> ek arabellek kopyaları olmadan yazma işlemleri gerçekleştirmek için arabelleklere erişim sağlar.

async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
    // Request at least 5 bytes from the PipeWriter.
    Memory<byte> memory = writer.GetMemory(5);

    // Write directly into the buffer.
    int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span);

    // Tell the writer how many bytes were written.
    writer.Advance(written);

    await writer.FlushAsync(cancellationToken);
}

Önceki kod:

  • PipeWriter kullanarak GetMemory'den en az 5 baytlık bir arabellek istemektedir.
  • ASCII dizesi "Hello"'nin baytlarını döndürülen Memory<byte> öğesine yazar.
  • Arabelleğe kaç bayt yazıldığını belirtmek için Advance çağrıları yapılır.
  • PipeWriter'yi boşaltır ve baytları alt cihazına gönderir.

Daha önceki yazma yöntemi, PipeWriter tarafından sağlanan arabellekleri kullanır. Ayrıca şunu da kullanabilir PipeWriter.WriteAsync:

  • Mevcut arabelleği PipeWriter'a kopyalar.
  • GetSpan ve Advance öğelerini uygun şekilde çağırır, ve FlushAsync öğesini çağırır.
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
    byte[] helloBytes = Encoding.ASCII.GetBytes("Hello");

    // Write helloBytes to the writer, there's no need to call Advance here
    // (Write does that).
    await writer.WriteAsync(helloBytes, cancellationToken);
}

İptal

FlushAsync bir CancellationTokengeçirmeyi destekler. Bir CancellationToken geçirildiğinde, bekleyen bir temizleme işlemi sırasında belirteç iptal edilirse, bir OperationCanceledException sonucu oluşur. PipeWriter.FlushAsync, istisna oluşturulmadan PipeWriter.CancelPendingFlush aracılığıyla geçerli temizleme işlemini iptal etmenin bir yolunu destekler. PipeWriter.CancelPendingFlush fonksiyonu çağrımı, geçerli ya da sonraki PipeWriter.FlushAsync veya PipeWriter.WriteAsync çağrısının, FlushResult olarak ayarlanmış bir IsCanceled ve true döndürmesine neden olur. Bu, boşaltmayı yıkıcı olmayan ve istisnai olmayan bir şekilde durdurmak için yararlıdır.

PipeWriter yaygın sorunları

  • GetSpan ve GetMemory en az istenen bellek miktarına sahip bir arabellek döndürür. Kesin arabellek boyutlarını asla varsaymayın.
  • Ardışık çağrıların aynı arabelleği veya aynı boyuttaki arabelleği döndürmesi garanti değildir.
  • Daha fazla veri yazmaya devam etmek için Advance çağrıldıktan sonra yeni bir arabellek istenmelidir. Daha önce alınan arabellek üzerine yazılamaz.
  • GetMemory'ye hâlâ tamamlanmamış bir çağrı varken GetSpan veya FlushAsync çağrısı yapmak güvenli değildir.
  • Çağrı Complete veya CompleteAsync boşaltılmamış veriler olduğunda bellek bozulmasına neden olabilir.

PipeReader ve PipeWriter İpuçları

Başarıyla System.IO.Pipelines sınıflarını kullanmak için şu ipuçlarını kullanın:

  • Uygun olduğunda bir özel durum da dahil olmak üzere PipeReader ve PipeWriter'ı her zaman tamamlayın.
  • PipeReader.AdvanceTo çağrısını yaptıktan sonra her zaman PipeReader.ReadAsync çağrısını yapın.
  • Yazarken düzenli aralıklarla awaitPipeWriter.FlushAsync ve FlushResult.IsCompleted her zaman denetleyin. Eğer IsCompletedtrue ise, bu durum okuyucunun tamamlandığını ve artık ne yazıldığını umursamadığını gösterdiğinden yazmayı durdurun.
  • Erişim sahibi olmasını istediğiniz PipeWriter.FlushAsync bir şey yazdıktan sonra arayınPipeReader.
  • FlushAsync 'i aramayın; eğer okuyucu FlushAsync bitene kadar başlayamazsa, bu bir kilitlenmeye neden olabilir.
  • Bir bağlamın PipeReader veya PipeWriter etiketlerine "sahip olduğundan" veya eriştiğinden emin olun. Bu türler iş parçacığı açısından güvenli değildir.
  • ReadResult.Buffer'yı çağırdıktan veya PipeReader.AdvanceTo'yi tamamladıktan sonra asla PipeReader öğesine erişmeyin.

IDuplexPipe

IDuplexPipe hem okumayı hem de yazmayı destekleyen türler için bir sözleşmedir. Örneğin, bir ağ bağlantısı ile IDuplexPipetemsil edilir.

Pipe, PipeReader ve PipeWriter içeren sürümden farklı olarak, IDuplexPipe tam çift yönlü bağlantının tek bir tarafını temsil eder. PipeWriter öğesine yazdıkların PipeReader öğesinden okunmayacak.

Akışlar

Akış verilerini okurken veya yazarken genellikle seri-dışına çıkarıcı kullanarak verileri okur ve seri hale getirici kullanarak veri yazarsınız. Bu okuma ve yazma akışı API'lerinin çoğunun parametresi Stream vardır. Bu mevcut API'lerle tümleştirmeyi kolaylaştırmak için, PipeReader ve PipeWriter bir AsStream yöntemi kullanıma sunar. AsStreamveya Streamçevresinde PipeReader bir PipeWriter uygulama döndürür.

Akış örneği

Bir Stream nesne ve isteğe bağlı Stream oluşturma seçenekleri ile birlikte, statik Create yöntemleri kullanarak PipeReader ve PipeWriter örnekleri oluşturun.

StreamPipeReaderOptions aşağıdaki parametrelerle PipeReader örneğinin oluşturulması üzerinde denetim sağlar.

StreamPipeWriterOptions aşağıdaki parametrelerle PipeWriter örneğinin oluşturulması üzerinde denetim sağlar.

Önemli

Yöntemleri kullanarak PipeReader ve PipeWriter örnekleri oluştururken Create nesne ömrünü Stream göz önünde bulundurun. Okuyucu veya yazıcıyla işiniz bittikten sonra akışa erişmeniz gerekiyorsa, oluşturma seçeneklerinde LeaveOpen bayrağını true olarak ayarlayın. Aksi takdirde akış kapatılır.

Bu kod, bir akıştan Create yöntemlerini kullanarak PipeReader ve PipeWriter örnekleri oluşturmayı gösterir.

using System.Buffers;
using System.IO.Pipelines;
using System.Text;

class Program
{
    static async Task Main()
    {
        using var stream = File.OpenRead("lorem-ipsum.txt");

        var reader = PipeReader.Create(stream);
        var writer = PipeWriter.Create(
            Console.OpenStandardOutput(), 
            new StreamPipeWriterOptions(leaveOpen: true));

        WriteUserCancellationPrompt();

        var processMessagesTask = ProcessMessagesAsync(reader, writer);
        var userCanceled = false;
        var cancelProcessingTask = Task.Run(() =>
        {
            while (char.ToUpperInvariant(Console.ReadKey().KeyChar) != 'C')
            {
                WriteUserCancellationPrompt();
            }

            userCanceled = true;

            // No exceptions thrown
            reader.CancelPendingRead();
            writer.CancelPendingFlush();
        });

        await Task.WhenAny(cancelProcessingTask, processMessagesTask);

        Console.WriteLine(
            $"\n\nProcessing {(userCanceled ? "cancelled" : "completed")}.\n");
    }

    static void WriteUserCancellationPrompt() =>
        Console.WriteLine("Press 'C' to cancel processing...\n");

    static async Task ProcessMessagesAsync(
        PipeReader reader,
        PipeWriter writer)
    {
        try
        {
            while (true)
            {
                ReadResult readResult = await reader.ReadAsync();
                ReadOnlySequence<byte> buffer = readResult.Buffer;

                try
                {
                    if (readResult.IsCanceled)
                    {
                        break;
                    }

                    if (TryParseLines(ref buffer, out string message))
                    {
                        FlushResult flushResult =
                            await WriteMessagesAsync(writer, message);

                        if (flushResult.IsCanceled || flushResult.IsCompleted)
                        {
                            break;
                        }
                    }

                    if (readResult.IsCompleted)
                    {
                        if (!buffer.IsEmpty)
                        {
                            throw new InvalidDataException("Incomplete message.");
                        }
                        break;
                    }
                }
                finally
                {
                    reader.AdvanceTo(buffer.Start, buffer.End);
                }
            }
        }
        catch (Exception ex)
        {
            Console.Error.WriteLine(ex);
        }
        finally
        {
            await reader.CompleteAsync();
            await writer.CompleteAsync();
        }
    }

    static bool TryParseLines(
        ref ReadOnlySequence<byte> buffer,
        out string message)
    {
        SequencePosition? position;
        StringBuilder outputMessage = new();

        while(true)
        {
            position = buffer.PositionOf((byte)'\n');

            if (!position.HasValue)
                break;

            outputMessage.Append(Encoding.ASCII.GetString(buffer.Slice(buffer.Start, position.Value)))
                        .AppendLine();

            buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
        };

        message = outputMessage.ToString();
        return message.Length != 0;
    }

    static ValueTask<FlushResult> WriteMessagesAsync(
        PipeWriter writer,
        string message) =>
        writer.WriteAsync(Encoding.ASCII.GetBytes(message));
}

Uygulama, bir 'yi lorem-ipsum.txt dosyasını akış olarak okumak için kullanır ve boş bir satırla bitmelidir. FileStream öğesi, PipeReader.Create öğesine geçirilir ve bu da bir PipeReader nesnesinin örneğini oluşturur. Konsol uygulaması daha sonra standart çıkış akışını PipeWriter.Create kullanarak Console.OpenStandardOutput() öğesine aktarır. Örnek iptali destekler.