Share via


.NET'te System.IO.Pipelines

System.IO.Pipelines , .NET'te yüksek performanslı G/Ç yapmayı kolaylaştırmak için tasarlanmış bir kitaplıktır. Bu, tüm .NET uygulamaları üzerinde çalışan .NET Standard'i hedefleyen bir kitaplıktır.

Kitaplığı System.IO.Pipelines Nuget paketinde bulabilirsiniz.

System.IO.Pipelines hangi sorunu çözer?

Akış verilerini ayrıştıran uygulamalar, birçok özel ve olağan dışı kod akışına sahip ortak kodlardan oluşur. Ortak ve özel durum kodu karmaşıktır ve bakımı zordur.

System.IO.Pipelines şu şekilde tasarlandı:

  • Yüksek performanslı ayrıştırma akış verilerine sahip olun.
  • Kod karmaşıklığını azaltın.

Aşağıdaki kod, bir istemciden satırla sınırlandırılmış iletiler (sınırlandırılmış '\n') alan bir TCP sunucusu için tipiktir:

async Task ProcessLinesAsync(NetworkStream stream)
{
    var buffer = new byte[1024];
    await stream.ReadAsync(buffer, 0, buffer.Length);

    // Process a single line from the buffer
    ProcessLine(buffer);
}

Yukarıdaki kodda çeşitli sorunlar vardır:

  • İletinin tamamı (satır sonu) öğesine yapılan tek bir çağrıda ReadAsyncalınamayabilir.
  • sonucunu stream.ReadAsyncyoksayıyor. stream.ReadAsync okunan veri miktarı döndürür.
  • Tek ReadAsync bir çağrıda birden çok satırın okunması durumunu işlemez.
  • Her okuma ile bir byte dizi ayırır.

Önceki sorunları düzeltmek için aşağıdaki değişiklikler gereklidir:

  • Yeni bir satır bulunana kadar gelen verileri arabelleğe alın.

  • Arabellekte döndürülen tüm satırları ayrıştırın.

  • Çizgi 1 KB'tan (1024 bayt) büyük olabilir. Tam satırı arabelleğe sığdırmak için sınırlayıcı bulunana kadar kodun giriş arabelleğinin yeniden boyutlandırılması gerekir.

    • Arabellek yeniden boyutlandırılırsa, girişte daha uzun satırlar göründüğünde daha fazla arabellek kopyası oluşturulur.
    • Boşa harcanan alanı azaltmak için satırları okumak için kullanılan arabelleği sıkıştırın.
  • Belleği tekrar tekrar ayırmamak için arabellek havuzu kullanmayı göz önünde bulundurun.

  • Aşağıdaki kod bu sorunlardan bazılarını giderir:

async Task ProcessLinesAsync(NetworkStream stream)
{
    byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
    var bytesBuffered = 0;
    var bytesConsumed = 0;

    while (true)
    {
        // Calculate the amount of bytes remaining in the buffer.
        var bytesRemaining = buffer.Length - bytesBuffered;

        if (bytesRemaining == 0)
        {
            // Double the buffer size and copy the previously buffered data into the new buffer.
            var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
            Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
            // Return the old buffer to the pool.
            ArrayPool<byte>.Shared.Return(buffer);
            buffer = newBuffer;
            bytesRemaining = buffer.Length - bytesBuffered;
        }

        var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
        if (bytesRead == 0)
        {
            // EOF
            break;
        }

        // Keep track of the amount of buffered bytes.
        bytesBuffered += bytesRead;
        var linePosition = -1;

        do
        {
            // Look for a EOL in the buffered data.
            linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
                                         bytesBuffered - bytesConsumed);

            if (linePosition >= 0)
            {
                // Calculate the length of the line based on the offset.
                var lineLength = linePosition - bytesConsumed;

                // Process the line.
                ProcessLine(buffer, bytesConsumed, lineLength);

                // Move the bytesConsumed to skip past the line consumed (including \n).
                bytesConsumed += lineLength + 1;
            }
        }
        while (linePosition >= 0);
    }
}

Önceki kod karmaşıktır ve tanımlanan tüm sorunları gidermez. Yüksek performanslı ağ genellikle performansı en üst düzeye çıkarmak için karmaşık kod yazma anlamına gelir. System.IO.Pipelines bu tür kod yazmayı kolaylaştırmak için tasarlanmıştır.

Boru

sınıfı Pipe bir PipeWriter/PipeReader çift oluşturmak için kullanılabilir. içine PipeWriter yazılan tüm veriler içinde PipeReaderkullanılabilir:

var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;

Kanal temel kullanımı

async Task ProcessLinesAsync(Socket socket)
{
    var pipe = new Pipe();
    Task writing = FillPipeAsync(socket, pipe.Writer);
    Task reading = ReadPipeAsync(pipe.Reader);

    await Task.WhenAll(reading, writing);
}

async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
    const int minimumBufferSize = 512;

    while (true)
    {
        // Allocate at least 512 bytes from the PipeWriter.
        Memory<byte> memory = writer.GetMemory(minimumBufferSize);
        try
        {
            int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
            if (bytesRead == 0)
            {
                break;
            }
            // Tell the PipeWriter how much was read from the Socket.
            writer.Advance(bytesRead);
        }
        catch (Exception ex)
        {
            LogError(ex);
            break;
        }

        // Make the data available to the PipeReader.
        FlushResult result = await writer.FlushAsync();

        if (result.IsCompleted)
        {
            break;
        }
    }

     // By completing PipeWriter, tell the PipeReader that there's no more data coming.
    await writer.CompleteAsync();
}

async Task ReadPipeAsync(PipeReader reader)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync();
        ReadOnlySequence<byte> buffer = result.Buffer;

        while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
        {
            // Process the line.
            ProcessLine(line);
        }

        // Tell the PipeReader how much of the buffer has been consumed.
        reader.AdvanceTo(buffer.Start, buffer.End);

        // Stop reading if there's no more data coming.
        if (result.IsCompleted)
        {
            break;
        }
    }

    // Mark the PipeReader as complete.
    await reader.CompleteAsync();
}

bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
    // Look for a EOL in the buffer.
    SequencePosition? position = buffer.PositionOf((byte)'\n');

    if (position == null)
    {
        line = default;
        return false;
    }

    // Skip the line + the \n.
    line = buffer.Slice(0, position.Value);
    buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
    return true;
}

İki döngü vardır:

  • FillPipeAsync ve 'den Socket okur ve öğesine PipeWriteryazar.
  • ReadPipeAsyncPipeReader ve gelen satırları ayrıştırarak okur.

Ayrılmış açık arabellek yok. Tüm arabellek yönetimi ve PipeWriter uygulamalarına PipeReader devredilir. Arabellek yönetimi için temsilci seçmek, kod kullanılmasının yalnızca iş mantığına odaklanmasını kolaylaştırır.

İlk döngüde:

İkinci döngüde PipeReader , tarafından PipeWriteryazılan arabellekleri tüketir. Arabellekler yuvadan gelir. çağrısı:PipeReader.ReadAsync

  • İki önemli bilgi parçası içeren bir ReadResult döndürür:

    • biçiminde ReadOnlySequence<byte>okunan veriler.
    • Veri sonuna (EOF) ulaşılıp ulaşılmadığını gösteren boole IsCompleted değeri.

Satır sonu (EOL) sınırlayıcısını bulduktan ve satırı ayrıştırdıktan sonra:

  • Mantık, zaten işlenmiş olanları atlamak için arabelleği işler.
  • PipeReader.AdvanceTo , ne kadar verinin tüketildiğini ve incelendiğini söylemek PipeReader için çağrılır.

Okuyucu ve yazıcı döngüleri çağrılarak Completesona erer. Complete temel alınan Kanal'ın ayrılan belleği serbest bırakmasına izin verir.

Geri basınç ve akış denetimi

İdeal olarak, okuma ve ayrıştırma birlikte çalışır:

  • Okuma iş parçacığı, ağdan verileri tüketir ve arabelleklere yerleştirir.
  • Ayrıştırma iş parçacığı uygun veri yapılarını oluşturmakla sorumludur.

Ayrıştırma genellikle ağdan veri bloklarını kopyalamaktan daha uzun sürer:

  • Okuma iş parçacığı ayrıştırma iş parçacığının önüne geçer.
  • Okuma iş parçacığının ayrıştırma iş parçacığının verilerini depolamak için yavaşlamasına veya daha fazla bellek ayırmasına gerek vardır.

En iyi performans için sık duraklamalar ile daha fazla bellek ayırma arasında bir denge vardır.

Yukarıdaki sorunu çözmek için, Pipe veri akışını denetlemek için iki ayarı vardır:

  • PauseWriterThreshold: Duraklatılacak çağrılardan FlushAsync önce ne kadar verinin arabelleğe alınması gerektiğini belirler.
  • ResumeWriterThreshold: Devam etmek için yapılan çağrılardan önce okuyucunun ne kadar veri gözlemlediğini PipeWriter.FlushAsync belirler.

Diagram with ResumeWriterThreshold and PauseWriterThreshold

PipeWriter.FlushAsync:

  • içindeki veri miktarı kesiştiğinde PipePauseWriterThresholdtamamlanmamış ValueTask<FlushResult> bir değer döndürür.
  • değerinden ResumeWriterThresholddaha düşük olduğunda tamamlanırValueTask<FlushResult>.

Hızlı döngüleri önlemek için iki değer kullanılır ve bu değerlerden biri kullanılırsa ortaya çıkabilir.

Örnekler

// The Pipe will start returning incomplete tasks from FlushAsync until
// the reader examines at least 5 bytes.
var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
var pipe = new Pipe(options);

PipeScheduler

Genellikle ve awaitkullanırkenasync, zaman uyumsuz kod veya TaskScheduler geçerli SynchronizationContextüzerinde devam eder.

G/Ç yapılırken G/Ç'nin nerede gerçekleştirildiği üzerinde ayrıntılı denetim sahibi olmak önemlidir. Bu denetim, CPU önbelleklerinden etkili bir şekilde yararlanmaya olanak tanır. Verimli önbelleğe alma, web sunucuları gibi yüksek performanslı uygulamalar için kritik öneme sahiptir. PipeScheduler zaman uyumsuz geri çağırmaların nerede çalıştırıldığı üzerinde denetim sağlar. Varsayılan olarak:

  • Geçerli SynchronizationContext kullanılır.
  • SynchronizationContextyoksa, geri çağırmaları çalıştırmak için iş parçacığı havuzunu kullanır.
public static void Main(string[] args)
{
    var writeScheduler = new SingleThreadPipeScheduler();
    var readScheduler = new SingleThreadPipeScheduler();

    // Tell the Pipe what schedulers to use and disable the SynchronizationContext.
    var options = new PipeOptions(readerScheduler: readScheduler,
                                  writerScheduler: writeScheduler,
                                  useSynchronizationContext: false);
    var pipe = new Pipe(options);
}

// This is a sample scheduler that async callbacks on a single dedicated thread.
public class SingleThreadPipeScheduler : PipeScheduler
{
    private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
     new BlockingCollection<(Action<object> Action, object State)>();
    private readonly Thread _thread;

    public SingleThreadPipeScheduler()
    {
        _thread = new Thread(DoWork);
        _thread.Start();
    }

    private void DoWork()
    {
        foreach (var item in _queue.GetConsumingEnumerable())
        {
            item.Action(item.State);
        }
    }

    public override void Schedule(Action<object?> action, object? state)
    {
        if (state is not null)
        {
            _queue.Add((action, state));
        }
        // else log the fact that _queue.Add was not called.
    }
}

PipeScheduler.ThreadPool , iş parçacığı havuzuna geri çağırmaları kuyruğa alan uygulamadır PipeScheduler . PipeScheduler.ThreadPool varsayılan ve genel olarak en iyi seçenektir. PipeScheduler.Inline kilitlenmeler gibi istenmeyen sonuçlara neden olabilir.

Kanal sıfırlama

Nesneyi yeniden kullanmak sık sık verimlidir Pipe . Kanalı sıfırlamak için hem hem de PipeReader tamamlandıktan sonra çağırın.PipeReaderResetPipeWriter

PipeReader

PipeReader çağıranın adına belleği yönetir. çağrısı yaptıktan sonra her zaman arayın PipeReader.AdvanceToPipeReader.ReadAsync. Bu, çağıranın PipeReader bellekle ne zaman yapıldığını bilmelerini ve böylece izlenebilmesini sağlar. döndürülen ReadOnlySequence<byte>PipeReader.ReadAsync yalnızca çağrısına PipeReader.AdvanceTokadar geçerlidir. çağrısı PipeReader.AdvanceToyaptıktan sonra kullanmak ReadOnlySequence<byte> geçersizdir.

PipeReader.AdvanceTo iki SequencePosition bağımsız değişken alır:

  • İlk bağımsız değişken, ne kadar bellek tüketildiğini belirler.
  • İkinci bağımsız değişken, arabelleğin ne kadarının gözlemleneceğini belirler.

Verileri tüketilen olarak işaretlemek, kanalın belleği temel alınan arabellek havuzuna döndürebileceği anlamına gelir. Verileri gözlemlenen olarak işaretlemek, bir sonraki çağrının PipeReader.ReadAsync ne yaptığını denetler. Her şeyi gözlemlendi olarak işaretlemek, kanala daha fazla veri yazılana PipeReader.ReadAsync kadar sonraki çağrısının döndürülmeyeceği anlamına gelir. Diğer tüm değerler, gözlemlenen ve gözlemlenmeyen verilerle hemen döndürülmek üzere PipeReader.ReadAsync bir sonraki çağrıyı yapar, ancak zaten tüketilmiş olan verileri döndürmez.

Akış verisi senaryolarını okuma

Akış verilerini okumaya çalışırken ortaya çıkan birkaç tipik desen vardır:

  • Veri akışı verilip tek bir iletiyi ayrıştırın.
  • Bir veri akışı göz önünde bulundurulduğunda, kullanılabilir tüm iletileri ayrıştırın.

Aşağıdaki örneklerde, bir ReadOnlySequence<byte>'den iletileri ayrıştırma yöntemi kullanılırTryParseLines. TryParseLines tek bir iletiyi ayrıştırıp giriş arabelleğinden ayrıştırılan iletiyi kırpmak için güncelleştirir. TryParseLines .NET'in bir parçası değildir, aşağıdaki bölümlerde kullanılan kullanıcı tarafından yazılmış bir yöntemdir.

bool TryParseLines(ref ReadOnlySequence<byte> buffer, out Message message);

Tek bir iletiyi okuma

Aşağıdaki kod' dan PipeReader tek bir iletiyi okur ve çağırana döndürür.

async ValueTask<Message?> ReadSingleMessageAsync(PipeReader reader,
 CancellationToken cancellationToken = default)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync(cancellationToken);
        ReadOnlySequence<byte> buffer = result.Buffer;

        // In the event that no message is parsed successfully, mark consumed
        // as nothing and examined as the entire buffer.
        SequencePosition consumed = buffer.Start;
        SequencePosition examined = buffer.End;

        try
        {
            if (TryParseLines(ref buffer, out Message message))
            {
                // A single message was successfully parsed so mark the start of the
                // parsed buffer as consumed. TryParseLines trims the buffer to
                // point to the data after the message was parsed.
                consumed = buffer.Start;

                // Examined is marked the same as consumed here, so the next call
                // to ReadSingleMessageAsync will process the next message if there's
                // one.
                examined = consumed;

                return message;
            }

            // There's no more data to be processed.
            if (result.IsCompleted)
            {
                if (buffer.Length > 0)
                {
                    // The message is incomplete and there's no more data to process.
                    throw new InvalidDataException("Incomplete message.");
                }

                break;
            }
        }
        finally
        {
            reader.AdvanceTo(consumed, examined);
        }
    }

    return null;
}

Yukarıdaki kod:

  • Tek bir iletiyi ayrıştırıyor.
  • Tüketilen SequencePosition ve incelenen SequencePosition Güncelleştirmeler kırpılan giriş arabelleğinin başlangıcına işaret etmek için.

İki SequencePosition bağımsız değişken, ayrıştırılmış iletiyi giriş arabelleğinden kaldırdığından TryParseLines güncelleştirilir. Genellikle, arabellekten tek bir iletiyi ayrıştırırken, incelenen konum aşağıdakilerden biri olmalıdır:

  • İletinin sonu.
  • İleti bulunamazsa alınan arabelleğin sonu.

Tek bir ileti durumunda en fazla hata olasılığı vardır. İncelenmesi gereken yanlış değerlerin geçirilmesi bellek yetersiz özel durumu veya sonsuz döngüye neden olabilir. Daha fazla bilgi için bu makaledeki PipeReader yaygın sorunları bölümüne bakın.

Birden çok iletiyi okuma

Aşağıdaki kod, bir'den PipeReader gelen tüm iletileri okur ve her birine çağrı ProcessMessageAsync yapar.

async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
{
    try
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync(cancellationToken);
            ReadOnlySequence<byte> buffer = result.Buffer;

            try
            {
                // Process all messages from the buffer, modifying the input buffer on each
                // iteration.
                while (TryParseLines(ref buffer, out Message message))
                {
                    await ProcessMessageAsync(message);
                }

                // There's no more data to be processed.
                if (result.IsCompleted)
                {
                    if (buffer.Length > 0)
                    {
                        // The message is incomplete and there's no more data to process.
                        throw new InvalidDataException("Incomplete message.");
                    }
                    break;
                }
            }
            finally
            {
                // Since all messages in the buffer are being processed, you can use the
                // remaining buffer's Start and End position to determine consumed and examined.
                reader.AdvanceTo(buffer.Start, buffer.End);
            }
        }
    }
    finally
    {
        await reader.CompleteAsync();
    }
}

İptal

PipeReader.ReadAsync:

  • bir CancellationTokengeçirmeyi destekler.
  • bekleyen bir okuma olduğunda iptal edilirse CancellationToken bir OperationCanceledException atar.
  • aracılığıyla PipeReader.CancelPendingReadgeçerli okuma işlemini iptal etmenin bir yolunu destekler ve bu da özel durum oluşturulmasını önler. ÇağırmaPipeReader.CancelPendingRead, geçerli veya sonraki çağrısının PipeReader.ReadAsync ile olarak IsCanceled ayarlanmış bir ReadResult döndürmesine trueneden olur. Bu, mevcut okuma döngüsünü yıkıcı olmayan ve istisnai olmayan bir şekilde durdurmak için yararlı olabilir.
private PipeReader reader;

public MyConnection(PipeReader reader)
{
    this.reader = reader;
}

public void Abort()
{
    // Cancel the pending read so the process loop ends without an exception.
    reader.CancelPendingRead();
}

public async Task ProcessMessagesAsync()
{
    try
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync();
            ReadOnlySequence<byte> buffer = result.Buffer;

            try
            {
                if (result.IsCanceled)
                {
                    // The read was canceled. You can quit without reading the existing data.
                    break;
                }

                // Process all messages from the buffer, modifying the input buffer on each
                // iteration.
                while (TryParseLines(ref buffer, out Message message))
                {
                    await ProcessMessageAsync(message);
                }

                // There's no more data to be processed.
                if (result.IsCompleted)
                {
                    break;
                }
            }
            finally
            {
                // Since all messages in the buffer are being processed, you can use the
                // remaining buffer's Start and End position to determine consumed and examined.
                reader.AdvanceTo(buffer.Start, buffer.End);
            }
        }
    }
    finally
    {
        await reader.CompleteAsync();
    }
}

PipeReader yaygın sorunları

  • Yanlış değerlerin veya değerlerinin geçirilmesi consumedexamined , zaten okunan verilerin okunmasıyla sonuçlanabilir.

  • İncelendiği şekilde geçiş buffer.End aşağıdakilere neden olabilir:

    • Durdurulan veriler
    • Veriler tüketilmezse büyük olasılıkla son bir Yetersiz Bellek (OOM) özel durumu. Örneğin, PipeReader.AdvanceTo(position, buffer.End) arabellekten bir kerede tek bir iletiyi işlerken.
  • Yanlış değerlerin öğesine consumed geçirilmesi veya examined sonsuz döngüye neden olabilir. Örneğin, değişmediyse, PipeReader.AdvanceTo(buffer.Start)buffer.Start yeni veriler gelmeden hemen önce bir sonraki çağrının döndürülmesi gerekir PipeReader.ReadAsync .

  • Yanlış değerlerin 'e consumed geçirilmesi veya examined sonsuz arabelleğe alma (nihai OOM) ile sonuçlanabilir.

  • Çağrıdan PipeReader.AdvanceTo sonra komutunun ReadOnlySequence<byte> kullanılması bellek bozulmasına neden olabilir (ücretsiz kullanımdan sonra kullanın).

  • Çağrı PipeReader.Complete/CompleteAsync yapılamaması bellek sızıntısına neden olabilir.

  • Arabelleği işlemeden önce okuma mantığının denetlenip ReadResult.IsCompleted çıkılması veri kaybına neden olur. Döngü çıkış koşulu ve ReadResult.IsCompletedtemel ReadResult.Buffer.IsEmpty almalıdır. Bunu yanlış yapmak sonsuz döngüye neden olabilir.

Sorunlu kod

Veri kaybı

ReadResult olarak ayarlandığında trueverinin IsCompleted son kesimini döndürebilir. Okuma döngüsünden çıkmadan önce bu verilerin okunmaması veri kaybına neden olur.

Uyarı

Aşağıdaki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. PipeReader Yaygın sorunlarını açıklamak için aşağıdaki örnek sağlanmıştır.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> dataLossBuffer = result.Buffer;

    if (result.IsCompleted)
        break;

    Process(ref dataLossBuffer, out Message message);

    reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End);
}

Uyarı

Önceki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. Önceki örnek, PipeReader Yaygın sorunlarını açıklamak için sağlanmıştır.

Sonsuz döngü

Aşağıdaki mantık, ise ancak arabellekte hiçbir zaman tam bir ileti yoksa sonsuz bir döngüye Result.IsCompletedtrue neden olabilir.

Uyarı

Aşağıdaki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. PipeReader Yaygın sorunlarını açıklamak için aşağıdaki örnek sağlanmıştır.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
    if (result.IsCompleted && infiniteLoopBuffer.IsEmpty)
        break;

    Process(ref infiniteLoopBuffer, out Message message);

    reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}

Uyarı

Önceki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. Önceki örnek, PipeReader Yaygın sorunlarını açıklamak için sağlanmıştır.

Aynı sorunla ilgili başka bir kod parçası aşağıdadır. denetlemeden önce boş olmayan bir arabelleği denetler ReadResult.IsCompleted. içinde olduğundan, arabellekte else ifhiçbir zaman tam bir ileti yoksa sonsuza kadar döngüye alır.

Uyarı

Aşağıdaki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. PipeReader Yaygın sorunlarını açıklamak için aşağıdaki örnek sağlanmıştır.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;

    if (!infiniteLoopBuffer.IsEmpty)
        Process(ref infiniteLoopBuffer, out Message message);

    else if (result.IsCompleted)
        break;

    reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}

Uyarı

Önceki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. Önceki örnek, PipeReader Yaygın sorunlarını açıklamak için sağlanmıştır.

Yanıt vermeyen uygulama

konumunda ile buffer.Endexamined koşulsuz çağrılmasıPipeReader.AdvanceTo, uygulamanın tek bir iletiyi ayrıştırırken yanıt vermemeye başlamasına neden olabilir. Sonraki arama PipeReader.AdvanceTo şu zamana kadar döndürülmeyecek:

  • Kanala yazılan daha fazla veri var.
  • Yeni veriler daha önce incelenmedi.

Uyarı

Aşağıdaki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. PipeReader Yaygın sorunlarını açıklamak için aşağıdaki örnek sağlanmıştır.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> hangBuffer = result.Buffer;

    Process(ref hangBuffer, out Message message);

    if (result.IsCompleted)
        break;

    reader.AdvanceTo(hangBuffer.Start, hangBuffer.End);

    if (message != null)
        return message;
}

Uyarı

Önceki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. Önceki örnek, PipeReader Yaygın sorunlarını açıklamak için sağlanmıştır.

Yetersiz Bellek (OOM)

Aşağıdaki koşullarla, aşağıdaki kod bir OutOfMemoryException gerçekleşene kadar arabelleğe almayı tutar:

  • İleti boyutu üst sınırı yoktur.
  • 'dan PipeReader döndürülen veriler tam bir ileti oluşturmaz. Örneğin, diğer taraf büyük bir ileti yazdığından (örneğin, 4 GB'lık bir ileti) tam bir ileti oluşturmaz.

Uyarı

Aşağıdaki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. PipeReader Yaygın sorunlarını açıklamak için aşağıdaki örnek sağlanmıştır.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer;

    Process(ref thisCouldOutOfMemory, out Message message);

    if (result.IsCompleted)
        break;

    reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End);

    if (message != null)
        return message;
}

Uyarı

Önceki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. Önceki örnek, PipeReader Yaygın sorunlarını açıklamak için sağlanmıştır.

Bellek Bozulması

Arabelleği okuyan yardımcılar yazarken, döndürülen yük çağrılmadan Advanceönce kopyalanmalıdır. Aşağıdaki örnek, atılan belleği Pipe döndürür ve bir sonraki işlem (okuma/yazma) için yeniden kullanabilir.

Uyarı

Aşağıdaki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. PipeReader Yaygın sorunlarını açıklamak için aşağıdaki örnek sağlanmıştır.

public class Message
{
    public ReadOnlySequence<byte> CorruptedPayload { get; set; }
}
    Environment.FailFast("This code is terrible, don't use it!");
    Message message = null;

    while (true)
    {
        ReadResult result = await reader.ReadAsync(cancellationToken);
        ReadOnlySequence<byte> buffer = result.Buffer;

        ReadHeader(ref buffer, out int length);

        if (length <= buffer.Length)
        {
            message = new Message
            {
                // Slice the payload from the existing buffer
                CorruptedPayload = buffer.Slice(0, length)
            };

            buffer = buffer.Slice(length);
        }

        if (result.IsCompleted)
            break;

        reader.AdvanceTo(buffer.Start, buffer.End);

        if (message != null)
        {
            // This code is broken since reader.AdvanceTo() was called with a position *after* the buffer
            // was captured.
            break;
        }
    }

    return message;
}

Uyarı

Önceki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. Önceki örnek, PipeReader Yaygın sorunlarını açıklamak için sağlanmıştır.

PipeWriter

, PipeWriter arayan adına yazılacak arabellekleri yönetir. PipeWriter uygular IBufferWriter<byte>. IBufferWriter<byte> ek arabellek kopyaları olmadan yazma işlemleri gerçekleştirmek için arabelleklere erişim elde etmek mümkün hale getirir.

async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
    // Request at least 5 bytes from the PipeWriter.
    Memory<byte> memory = writer.GetMemory(5);

    // Write directly into the buffer.
    int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span);

    // Tell the writer how many bytes were written.
    writer.Advance(written);

    await writer.FlushAsync(cancellationToken);
}

Önceki kod:

  • kullanarak 'den en az 5 baytlık bir arabellek istemektedir PipeWriterGetMemory.
  • ASCII dizesi "Hello" için baytları döndürülen Memory<byte>öğesine yazar.
  • Arabelleğe kaç bayt yazıldığını belirten çağrılar Advance .
  • Baytları PipeWritertemel alınan cihaza gönderen öğesini temizler.

Önceki yazma yöntemi tarafından PipeWritersağlanan arabellekleri kullanır. Şunu da kullanmış PipeWriter.WriteAsyncolabilir:

  • Varolan arabelleği öğesine PipeWriterkopyalar.
  • uygun şekilde öğesini Advance çağırır GetSpanve öğesini çağırırFlushAsync.
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
    byte[] helloBytes = Encoding.ASCII.GetBytes("Hello");

    // Write helloBytes to the writer, there's no need to call Advance here
    // (Write does that).
    await writer.WriteAsync(helloBytes, cancellationToken);
}

İptal

FlushAsync bir CancellationTokengeçirmeyi destekler. Bekleyen bir CancellationToken temizleme işlemi varken belirteç iptal edilirse bir sonuç OperationCanceledException geçirilir. PipeWriter.FlushAsync , özel durum oluşturmadan aracılığıyla PipeWriter.CancelPendingFlush geçerli temizleme işlemini iptal etmenin bir yolunu destekler. Çağırma PipeWriter.CancelPendingFlush , veya öğesine yapılan geçerli veya sonraki çağrının PipeWriter.FlushAsyncPipeWriter.WriteAsync olarak ayarlanmış olarak bir FlushResultIsCanceled döndürmesine trueneden olur. Bu, boşaltmayı yıkıcı olmayan ve istisnai olmayan bir şekilde durdurmak için yararlı olabilir.

PipeWriter yaygın sorunları

  • GetSpan ve GetMemory en az istenen bellek miktarına sahip bir arabellek döndürür. Tam arabellek boyutlarını varsaymayın .
  • Ardışık çağrıların aynı arabelleği veya aynı boyutlu arabelleği döndüreceğinin garantisi yoktur.
  • Daha fazla veri yazmaya devam etmek için çağrıldıktan Advance sonra yeni bir arabellek istenmelidir. Daha önce alınan arabellek için yazılamaz.
  • Çağrısı GetMemory veya GetSpan tamamlanmamış bir arama FlushAsync olduğunda güvenli değildir.
  • CompleteAsync Veya Complete şişirilmemiş veriler olduğunda çağrılması bellek bozulmasına neden olabilir.

PipeReader ve PipeWriter kullanmak için İpuçları

Aşağıdaki ipuçları sınıfları başarıyla kullanmanıza System.IO.Pipelines yardımcı olur:

  • Uygun olduğunda bir özel durum da dahil olmak üzere PipeReader ve PipeWriter'ı her zaman tamamlayın.
  • çağrısı yaptıktan sonra her zaman arayın PipeReader.AdvanceToPipeReader.ReadAsync.
  • Yazarken düzenli aralıklarla awaitPipeWriter.FlushAsync ve her zaman denetleyin FlushResult.IsCompleted. okuyucunun tamamlandığını ve artık ne yazıldığını umursamadığını gösterdiğinden , ise IsCompletedtrueyazmayı durdurun.
  • Erişimi olmasını istediğiniz PipeReader bir şey yazdıktan sonra arama PipeWriter.FlushAsync yapın.
  • Okuyucu bitene kadar FlushAsync başlayamazsa aramayın FlushAsync çünkü bu kilitlenmeye neden olabilir.
  • Veya'ya yalnızca bir bağlam "sahip" PipeReaderPipeWriter olduğundan veya bu bağlamlara eriştiğine emin olun. Bu türler iş parçacığı açısından güvenli değildir.
  • öğesini çağırdıktan AdvanceTo veya tamamladıktan sonra hiçbir zaman öğesine ReadResult.Buffer erişemezPipeReader.

IDuplexPipe

IDuplexPipe, hem okumayı hem de yazmayı destekleyen türler için bir sözleşmedir. Örneğin, bir ağ bağlantısı ile IDuplexPipetemsil edilir.

ve içeren PipeReader sürümünden farklı PipeolarakPipeWriter, IDuplexPipe tam çift yönlü bağlantının tek bir tarafını temsil eder. Bu, öğesine PipeWriter yazılanların'dan PipeReaderokunmayacağı anlamına gelir.

Akışlar

Akış verilerini okurken veya yazarken genellikle seri hale getirici kullanarak verileri okur ve seri hale getirici kullanarak veri yazarsınız. Bu okuma ve yazma akışı API'lerinin çoğunun parametresi Stream vardır. Bu mevcut API'lerle PipeReader tümleştirmeyi kolaylaştırmak ve PipeWriter bir AsStream yöntemi kullanıma sunma. AsStreamveya PipeWriterçevresinde PipeReader bir Stream uygulama döndürür.

Akış örneği

PipeReaderve PipeWriter örnekleri, bir Stream nesne ve isteğe bağlı olarak ilgili oluşturma seçenekleri verilen statik Create yöntemler kullanılarak oluşturulabilir.

aşağıdaki StreamPipeReaderOptions parametrelerle örneğin oluşturulması PipeReader üzerinde denetime izin verir:

aşağıdaki StreamPipeWriterOptions parametrelerle örneğin oluşturulması PipeWriter üzerinde denetime izin verir:

Önemli

yöntemleri kullanarak Create ve PipeWriter örnekleri oluştururken PipeReader nesne ömrünü Stream göz önünde bulundurmanız gerekir. Okuyucu veya yazıcıyla işlem tamamlandıktan sonra akışa erişmeniz gerekiyorsa, oluşturma seçeneklerinde bayrağını true olarak ayarlamanız LeaveOpen gerekir. Aksi takdirde akış kapatılır.

Aşağıdaki kod, bir akıştan yöntemleri kullanarak ve PipeWriter örneklerinin PipeReader oluşturulmasını Create gösterir.

using System.Buffers;
using System.IO.Pipelines;
using System.Text;

class Program
{
    static async Task Main()
    {
        using var stream = File.OpenRead("lorem-ipsum.txt");

        var reader = PipeReader.Create(stream);
        var writer = PipeWriter.Create(
            Console.OpenStandardOutput(), 
            new StreamPipeWriterOptions(leaveOpen: true));

        WriteUserCancellationPrompt();

        var processMessagesTask = ProcessMessagesAsync(reader, writer);
        var userCanceled = false;
        var cancelProcessingTask = Task.Run(() =>
        {
            while (char.ToUpperInvariant(Console.ReadKey().KeyChar) != 'C')
            {
                WriteUserCancellationPrompt();
            }

            userCanceled = true;

            // No exceptions thrown
            reader.CancelPendingRead();
            writer.CancelPendingFlush();
        });

        await Task.WhenAny(cancelProcessingTask, processMessagesTask);

        Console.WriteLine(
            $"\n\nProcessing {(userCanceled ? "cancelled" : "completed")}.\n");
    }

    static void WriteUserCancellationPrompt() =>
        Console.WriteLine("Press 'C' to cancel processing...\n");

    static async Task ProcessMessagesAsync(
        PipeReader reader,
        PipeWriter writer)
    {
        try
        {
            while (true)
            {
                ReadResult readResult = await reader.ReadAsync();
                ReadOnlySequence<byte> buffer = readResult.Buffer;

                try
                {
                    if (readResult.IsCanceled)
                    {
                        break;
                    }

                    if (TryParseLines(ref buffer, out string message))
                    {
                        FlushResult flushResult =
                            await WriteMessagesAsync(writer, message);

                        if (flushResult.IsCanceled || flushResult.IsCompleted)
                        {
                            break;
                        }
                    }

                    if (readResult.IsCompleted)
                    {
                        if (!buffer.IsEmpty)
                        {
                            throw new InvalidDataException("Incomplete message.");
                        }
                        break;
                    }
                }
                finally
                {
                    reader.AdvanceTo(buffer.Start, buffer.End);
                }
            }
        }
        catch (Exception ex)
        {
            Console.Error.WriteLine(ex);
        }
        finally
        {
            await reader.CompleteAsync();
            await writer.CompleteAsync();
        }
    }

    static bool TryParseLines(
        ref ReadOnlySequence<byte> buffer,
        out string message)
    {
        SequencePosition? position;
        StringBuilder outputMessage = new();

        while(true)
        {
            position = buffer.PositionOf((byte)'\n');

            if (!position.HasValue)
                break;

            outputMessage.Append(Encoding.ASCII.GetString(buffer.Slice(buffer.Start, position.Value)))
                        .AppendLine();

            buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
        };

        message = outputMessage.ToString();
        return message.Length != 0;
    }

    static ValueTask<FlushResult> WriteMessagesAsync(
        PipeWriter writer,
        string message) =>
        writer.WriteAsync(Encoding.ASCII.GetBytes(message));
}

UygulamaStreamReader, lorem-ipsum.txt dosyasını akış olarak okumak için kullanır ve boş bir satırla bitmelidir. FileStream öğesine geçirilir PipeReader.Createve bu da bir PipeReader nesnenin örneğini oluşturur. Konsol uygulaması daha sonra standart çıkış akışını kullanarak Console.OpenStandardOutput()öğesine PipeWriter.Create geçirir. Örnek iptali destekler.