Lezen in het Engels

Delen via


System.IO.Pipelines in .NET

System.IO.Pipelines is een bibliotheek die is ontworpen om het eenvoudiger te maken om I/O met hoge prestaties in .NET uit te voeren. Het is een bibliotheek die is gericht op .NET Standard die werkt op alle .NET-implementaties.

De bibliotheek is beschikbaar in het Nuget-pakket System.IO.Pipelines .

Welk probleem lost System.IO.Pipelines op

Apps die streaminggegevens parseren, bestaan uit standaardcode met veel gespecialiseerde en ongebruikelijke codestromen. De standaard- en speciale casecode is complex en moeilijk te onderhouden.

System.IO.Pipelines is ontworpen voor:

  • Streaminggegevens met hoge prestaties parseren.
  • Verminder de complexiteit van code.

De volgende code is gebruikelijk voor een TCP-server die berichten met regelscheidingstekens ontvangt (gescheiden door) '\n'van een client:

C#
async Task ProcessLinesAsync(NetworkStream stream)
{
    var buffer = new byte[1024];
    await stream.ReadAsync(buffer, 0, buffer.Length);

    // Process a single line from the buffer
    ProcessLine(buffer);
}

De voorgaande code heeft verschillende problemen:

  • Het hele bericht (einde van regel) wordt mogelijk niet ontvangen in één aanroep naar ReadAsync.
  • Het negeert het resultaat van stream.ReadAsync. stream.ReadAsync retourneert hoeveel gegevens zijn gelezen.
  • Het behandelt niet het geval waarbij meerdere regels in één ReadAsync aanroep worden gelezen.
  • Er wordt een byte matrix met elke leesbewerking toegewezen.

Om de voorgaande problemen op te lossen, zijn de volgende wijzigingen vereist:

  • Buffer de binnenkomende gegevens totdat er een nieuwe regel wordt gevonden.

  • Parseert alle regels die in de buffer worden geretourneerd.

  • Het is mogelijk dat de lijn groter is dan 1 kB (1024 bytes). De code moet het formaat van de invoerbuffer wijzigen totdat het scheidingsteken wordt gevonden om de volledige lijn in de buffer te kunnen aanpassen.

    • Als het formaat van de buffer wordt gewijzigd, worden er meer bufferkopieën gemaakt naarmate er langere regels in de invoer worden weergegeven.
    • Om verspilde ruimte te verminderen, compergeer de buffer die wordt gebruikt voor leeslijnen.
  • Overweeg het gebruik van bufferpooling om het toewijzen van geheugen herhaaldelijk te voorkomen.

  • Met de volgende code worden enkele van deze problemen opgelost:

C#
async Task ProcessLinesAsync(NetworkStream stream)
{
    byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
    var bytesBuffered = 0;
    var bytesConsumed = 0;

    while (true)
    {
        // Calculate the amount of bytes remaining in the buffer.
        var bytesRemaining = buffer.Length - bytesBuffered;

        if (bytesRemaining == 0)
        {
            // Double the buffer size and copy the previously buffered data into the new buffer.
            var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
            Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
            // Return the old buffer to the pool.
            ArrayPool<byte>.Shared.Return(buffer);
            buffer = newBuffer;
            bytesRemaining = buffer.Length - bytesBuffered;
        }

        var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
        if (bytesRead == 0)
        {
            // EOF
            break;
        }

        // Keep track of the amount of buffered bytes.
        bytesBuffered += bytesRead;
        var linePosition = -1;

        do
        {
            // Look for a EOL in the buffered data.
            linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
                                         bytesBuffered - bytesConsumed);

            if (linePosition >= 0)
            {
                // Calculate the length of the line based on the offset.
                var lineLength = linePosition - bytesConsumed;

                // Process the line.
                ProcessLine(buffer, bytesConsumed, lineLength);

                // Move the bytesConsumed to skip past the line consumed (including \n).
                bytesConsumed += lineLength + 1;
            }
        }
        while (linePosition >= 0);
    }
}

De vorige code is complex en heeft geen betrekking op alle geïdentificeerde problemen. Netwerken met hoge prestaties betekenen meestal het schrijven van complexe code om de prestaties te maximaliseren. System.IO.Pipelines is ontworpen om het schrijven van dit type code eenvoudiger te maken.

Pijp

De Pipe klasse kan worden gebruikt om een PipeWriter/PipeReader paar te maken. Alle gegevens die in de PipeWriter gegevens zijn geschreven, zijn beschikbaar in het PipeReadervolgende:

C#
var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;

Basisgebruik van pijpen

C#
async Task ProcessLinesAsync(Socket socket)
{
    var pipe = new Pipe();
    Task writing = FillPipeAsync(socket, pipe.Writer);
    Task reading = ReadPipeAsync(pipe.Reader);

    await Task.WhenAll(reading, writing);
}

async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
    const int minimumBufferSize = 512;

    while (true)
    {
        // Allocate at least 512 bytes from the PipeWriter.
        Memory<byte> memory = writer.GetMemory(minimumBufferSize);
        try
        {
            int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
            if (bytesRead == 0)
            {
                break;
            }
            // Tell the PipeWriter how much was read from the Socket.
            writer.Advance(bytesRead);
        }
        catch (Exception ex)
        {
            LogError(ex);
            break;
        }

        // Make the data available to the PipeReader.
        FlushResult result = await writer.FlushAsync();

        if (result.IsCompleted)
        {
            break;
        }
    }

     // By completing PipeWriter, tell the PipeReader that there's no more data coming.
    await writer.CompleteAsync();
}

async Task ReadPipeAsync(PipeReader reader)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync();
        ReadOnlySequence<byte> buffer = result.Buffer;

        while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
        {
            // Process the line.
            ProcessLine(line);
        }

        // Tell the PipeReader how much of the buffer has been consumed.
        reader.AdvanceTo(buffer.Start, buffer.End);

        // Stop reading if there's no more data coming.
        if (result.IsCompleted)
        {
            break;
        }
    }

    // Mark the PipeReader as complete.
    await reader.CompleteAsync();
}

bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
    // Look for a EOL in the buffer.
    SequencePosition? position = buffer.PositionOf((byte)'\n');

    if (position == null)
    {
        line = default;
        return false;
    }

    // Skip the line + the \n.
    line = buffer.Slice(0, position.Value);
    buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
    return true;
}

Er zijn twee lussen:

  • FillPipeAsync leest van de Socket en schrijft naar de PipeWriter.
  • ReadPipeAsync leest van de PipeReader binnenkomende regels en parseert deze.

Er zijn geen expliciete buffers toegewezen. Alle bufferbeheer wordt gedelegeerd aan de PipeReader en PipeWriter implementaties. Het delegeren van bufferbeheer maakt het eenvoudiger om code te gebruiken om zich alleen te richten op de bedrijfslogica.

In de eerste lus:

In de tweede lus verbruikt de PipeReader buffers die zijn geschreven door PipeWriter. De buffers zijn afkomstig van de socket. De oproep naar PipeReader.ReadAsync:

  • Retourneert een ReadResult met twee belangrijke gegevens:

    • De gegevens die zijn gelezen in de vorm van ReadOnlySequence<byte>.
    • Een Booleaanse waarde IsCompleted die aangeeft of het einde van de gegevens (EOF) is bereikt.

Nadat u het einde van het regelscheidingsteken (EOL) hebt gevonden en de regel hebt geparserd:

  • De logica verwerkt de buffer om over te slaan wat al is verwerkt.
  • PipeReader.AdvanceTo wordt aangeroepen om te vertellen PipeReader hoeveel gegevens er zijn verbruikt en onderzocht.

De lezer- en schrijflussen eindigen door aan te roepen Complete. Complete laat de onderliggende pipe het geheugen vrijgeven dat is toegewezen.

Backpressure en stroombesturing

In het ideale voorbeeld werken lezen en parseren samen:

  • De leesthread verbruikt gegevens uit het netwerk en plaatst deze in buffers.
  • De parseringsthread is verantwoordelijk voor het samenstellen van de juiste gegevensstructuren.

Het parseren kost doorgaans meer tijd dan het kopiëren van blokken gegevens uit het netwerk:

  • De leesthread loopt voor op de parseringsthread.
  • De leesthread moet vertragen of meer geheugen toewijzen om de gegevens voor de parseringsthread op te slaan.

Voor optimale prestaties is er een balans tussen frequente pauzes en het toewijzen van meer geheugen.

Om het voorgaande probleem op te lossen, heeft de Pipe twee instellingen voor het beheren van de gegevensstroom:

Diagram met ResumeWriterThreshold en PauseWriterThreshold

PipeWriter.FlushAsync:

  • Retourneert een onvolledige ValueTask<FlushResult> hoeveelheid gegevens in de Pipe kruisingen PauseWriterThreshold.
  • ValueTask<FlushResult> Wordt voltooid wanneer deze lager wordt dan ResumeWriterThreshold.

Er worden twee waarden gebruikt om snelle cyclussen te voorkomen, die kunnen optreden als één waarde wordt gebruikt.

Voorbeelden

C#
// The Pipe will start returning incomplete tasks from FlushAsync until
// the reader examines at least 5 bytes.
var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
var pipe = new Pipe(options);

PipeScheduler

Wanneer u doorgaans asynchrone code gebruikt async en awaitasynchroon wordt hervat op een TaskScheduler of de huidige SynchronizationContextcode.

Bij het uitvoeren van I/O is het belangrijk om nauwkeurige controle te hebben over waar de I/O wordt uitgevoerd. Met dit besturingselement kunt u effectief profiteren van CPU-caches. Efficiënte caching is essentieel voor krachtige apps zoals webservers. PipeScheduler biedt controle over waar asynchrone callbacks worden uitgevoerd. Standaard:

  • De huidige SynchronizationContext wordt gebruikt.
  • Als er geen SynchronizationContextis, wordt de thread-pool gebruikt om callbacks uit te voeren.
C#
public static void Main(string[] args)
{
    var writeScheduler = new SingleThreadPipeScheduler();
    var readScheduler = new SingleThreadPipeScheduler();

    // Tell the Pipe what schedulers to use and disable the SynchronizationContext.
    var options = new PipeOptions(readerScheduler: readScheduler,
                                  writerScheduler: writeScheduler,
                                  useSynchronizationContext: false);
    var pipe = new Pipe(options);
}

// This is a sample scheduler that async callbacks on a single dedicated thread.
public class SingleThreadPipeScheduler : PipeScheduler
{
    private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
     new BlockingCollection<(Action<object> Action, object State)>();
    private readonly Thread _thread;

    public SingleThreadPipeScheduler()
    {
        _thread = new Thread(DoWork);
        _thread.Start();
    }

    private void DoWork()
    {
        foreach (var item in _queue.GetConsumingEnumerable())
        {
            item.Action(item.State);
        }
    }

    public override void Schedule(Action<object?> action, object? state)
    {
        if (state is not null)
        {
            _queue.Add((action, state));
        }
        // else log the fact that _queue.Add was not called.
    }
}

PipeScheduler.ThreadPool is de PipeScheduler implementatie waarmee callbacks naar de thread-pool worden geplaatst. PipeScheduler.ThreadPool is de standaardinstelling en over het algemeen de beste keuze. PipeScheduler.Inline kan onbedoelde gevolgen veroorzaken, zoals impasses.

Pijp opnieuw instellen

Het is vaak efficiënt om het Pipe object opnieuw te gebruiken. Als u de pipe opnieuw wilt instellen, roept PipeReader Reset u aan wanneer zowel de PipeReader als PipeWriter de pijp is voltooid.

PipeReader

PipeReader beheert het geheugen namens de beller. Bel altijd na het bellenPipeReader.ReadAsync.PipeReader.AdvanceTo Hiermee wordt PipeReader aangegeven wanneer de beller klaar is met het geheugen, zodat deze kan worden bijgehouden. De ReadOnlySequence<byte> geretourneerde PipeReader.ReadAsync waarde is alleen geldig tot de aanroep .PipeReader.AdvanceTo Het is illegaal om te gebruiken ReadOnlySequence<byte> na het bellen PipeReader.AdvanceTo.

PipeReader.AdvanceTo heeft twee SequencePosition argumenten:

  • Het eerste argument bepaalt hoeveel geheugen is verbruikt.
  • Het tweede argument bepaalt hoeveel van de buffer is waargenomen.

Het markeren van gegevens als verbruikt betekent dat de pijp het geheugen kan retourneren naar de onderliggende buffergroep. Als u gegevens markeert zoals waargenomen, bepaalt u wat de volgende aanroep doet PipeReader.ReadAsync . Als u alles markeert zoals waargenomen, betekent dit dat de volgende aanroep PipeReader.ReadAsync niet wordt geretourneerd totdat er meer gegevens naar de pijp worden geschreven. Elke andere waarde zal de volgende aanroep maken om onmiddellijk te PipeReader.ReadAsync retourneren met de waargenomen en niet-geobserveerde gegevens, maar niet met gegevens die al zijn gebruikt.

Scenario's voor streaminggegevens lezen

Er zijn een aantal typische patronen die zich voordoen bij het lezen van streaminggegevens:

  • Met een gegevensstroom parseert u één bericht.
  • Met een gegevensstroom parseert u alle beschikbare berichten.

In de volgende voorbeelden wordt de methode gebruikt voor het TryParseLines parseren van berichten uit een ReadOnlySequence<byte>. TryParseLines parseert één bericht en werkt de invoerbuffer bij om het geparseerde bericht uit de buffer te knippen. TryParseLines maakt geen deel uit van .NET, het is een door de gebruiker geschreven methode die in de volgende secties wordt gebruikt.

C#
bool TryParseLines(ref ReadOnlySequence<byte> buffer, out Message message);

Eén bericht lezen

De volgende code leest één bericht van een PipeReader en retourneert het bericht naar de aanroeper.

C#
async ValueTask<Message?> ReadSingleMessageAsync(PipeReader reader,
 CancellationToken cancellationToken = default)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync(cancellationToken);
        ReadOnlySequence<byte> buffer = result.Buffer;

        // In the event that no message is parsed successfully, mark consumed
        // as nothing and examined as the entire buffer.
        SequencePosition consumed = buffer.Start;
        SequencePosition examined = buffer.End;

        try
        {
            if (TryParseLines(ref buffer, out Message message))
            {
                // A single message was successfully parsed so mark the start of the
                // parsed buffer as consumed. TryParseLines trims the buffer to
                // point to the data after the message was parsed.
                consumed = buffer.Start;

                // Examined is marked the same as consumed here, so the next call
                // to ReadSingleMessageAsync will process the next message if there's
                // one.
                examined = consumed;

                return message;
            }

            // There's no more data to be processed.
            if (result.IsCompleted)
            {
                if (buffer.Length > 0)
                {
                    // The message is incomplete and there's no more data to process.
                    throw new InvalidDataException("Incomplete message.");
                }

                break;
            }
        }
        finally
        {
            reader.AdvanceTo(consumed, examined);
        }
    }

    return null;
}

Met de voorgaande code wordt:

  • Parseert één bericht.
  • Werkt de verbruikte en onderzochte SequencePosition SequencePosition gegevens bij zodat deze verwijzen naar het begin van de ingekorte invoerbuffer.

De twee SequencePosition argumenten worden bijgewerkt omdat TryParseLines het geparseerde bericht uit de invoerbuffer wordt verwijderd. Over het algemeen moet bij het parseren van één bericht uit de buffer de onderzochte positie een van de volgende zijn:

  • Het einde van het bericht.
  • Het einde van de ontvangen buffer als er geen bericht is gevonden.

De case met één bericht heeft het meeste potentieel voor fouten. Het doorgeven van de verkeerde waarden die moeten worden onderzocht , kan resulteren in een uitzondering op het geheugen of een oneindige lus. Zie de sectie Algemene problemen van PipeReader in dit artikel voor meer informatie.

Meerdere berichten lezen

Met de volgende code worden alle berichten van een PipeReader en aanroepen ProcessMessageAsync op elke code gelezen.

C#
async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
{
    try
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync(cancellationToken);
            ReadOnlySequence<byte> buffer = result.Buffer;

            try
            {
                // Process all messages from the buffer, modifying the input buffer on each
                // iteration.
                while (TryParseLines(ref buffer, out Message message))
                {
                    await ProcessMessageAsync(message);
                }

                // There's no more data to be processed.
                if (result.IsCompleted)
                {
                    if (buffer.Length > 0)
                    {
                        // The message is incomplete and there's no more data to process.
                        throw new InvalidDataException("Incomplete message.");
                    }
                    break;
                }
            }
            finally
            {
                // Since all messages in the buffer are being processed, you can use the
                // remaining buffer's Start and End position to determine consumed and examined.
                reader.AdvanceTo(buffer.Start, buffer.End);
            }
        }
    }
    finally
    {
        await reader.CompleteAsync();
    }
}

Opzegging

PipeReader.ReadAsync:

  • Ondersteunt het doorgeven van een CancellationToken.
  • Genereert een OperationCanceledException als de CancellationToken status wordt geannuleerd terwijl er een leesbewerking in behandeling is.
  • Ondersteunt een manier om de huidige leesbewerking te annuleren via PipeReader.CancelPendingRead, waardoor er geen uitzondering wordt gegenereerd. Het aanroepen PipeReader.CancelPendingRead zorgt ervoor dat de huidige of volgende aanroep PipeReader.ReadAsync een ReadResult met IsCanceled set retourneert.true Dit kan handig zijn voor het stoppen van de bestaande leeslus op een niet-destructieve en niet-uitzonderlijke manier.
C#
private PipeReader reader;

public MyConnection(PipeReader reader)
{
    this.reader = reader;
}

public void Abort()
{
    // Cancel the pending read so the process loop ends without an exception.
    reader.CancelPendingRead();
}

public async Task ProcessMessagesAsync()
{
    try
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync();
            ReadOnlySequence<byte> buffer = result.Buffer;

            try
            {
                if (result.IsCanceled)
                {
                    // The read was canceled. You can quit without reading the existing data.
                    break;
                }

                // Process all messages from the buffer, modifying the input buffer on each
                // iteration.
                while (TryParseLines(ref buffer, out Message message))
                {
                    await ProcessMessageAsync(message);
                }

                // There's no more data to be processed.
                if (result.IsCompleted)
                {
                    break;
                }
            }
            finally
            {
                // Since all messages in the buffer are being processed, you can use the
                // remaining buffer's Start and End position to determine consumed and examined.
                reader.AdvanceTo(buffer.Start, buffer.End);
            }
        }
    }
    finally
    {
        await reader.CompleteAsync();
    }
}

Veelvoorkomende problemen met PipeReader

  • Het doorgeven van de verkeerde waarden aan consumed of examined kan leiden tot het lezen van gegevens die al zijn gelezen.

  • Het doorgeven buffer.End als onderzocht kan het volgende tot gevolg hebben:

    • Vastgelopen gegevens
    • Mogelijk een OOM-uitzondering (Out of Memory) als er geen gegevens worden gebruikt. Bijvoorbeeld bij PipeReader.AdvanceTo(position, buffer.End) het verwerken van één bericht tegelijk vanuit de buffer.
  • Het doorgeven van de verkeerde waarden aan consumed of examined kan resulteren in een oneindige lus. Als buffer.Start dit bijvoorbeeld niet is gewijzigd, PipeReader.AdvanceTo(buffer.Start) wordt de volgende aanroep PipeReader.ReadAsync onmiddellijk geretourneerd voordat nieuwe gegevens binnenkomen.

  • Het doorgeven van de verkeerde waarden aan consumed of examined kan resulteren in oneindige buffering (uiteindelijke OOM).

  • Als u de ReadOnlySequence<byte> aanroep na aanroep PipeReader.AdvanceTo gebruikt, kan dit leiden tot beschadiging van het geheugen (gebruik na gratis gebruik).

  • Het niet aanroepen PipeReader.Complete/CompleteAsync kan leiden tot een geheugenlek.

  • Het controleren ReadResult.IsCompleted en afsluiten van de leeslogica voordat de buffer wordt verwerkt, resulteert in gegevensverlies. De voorwaarde voor het afsluiten van de lus moet zijn gebaseerd op ReadResult.Buffer.IsEmpty en ReadResult.IsCompleted. Als u dit onjuist doet, kan dit resulteren in een oneindige lus.

Problematische code

Gegevensverlies

Het ReadResult laatste segment van de gegevens kan worden geretourneerd wanneer IsCompleted deze is ingesteld op true. Als u deze gegevens niet leest voordat u de leeslus afsluit, gaan gegevens verloren.

Waarschuwing

Gebruik niet de volgende code. Het gebruik van dit voorbeeld leidt tot gegevensverlies, loopt vast, beveiligingsproblemen en mag NIET worden gekopieerd. Het volgende voorbeeld wordt gegeven om veelvoorkomende problemen met PipeReader uit te leggen.

C#
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> dataLossBuffer = result.Buffer;

    if (result.IsCompleted)
        break;

    Process(ref dataLossBuffer, out Message message);

    reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End);
}

Waarschuwing

Gebruik de voorgaande code NIET . Het gebruik van dit voorbeeld leidt tot gegevensverlies, loopt vast, beveiligingsproblemen en mag NIET worden gekopieerd. Het voorgaande voorbeeld wordt gegeven om veelvoorkomende problemen met PipeReader uit te leggen.

Oneindige lus

De volgende logica kan resulteren in een oneindige lus als de Result.IsCompleted is true , maar er nooit een volledig bericht in de buffer staat.

Waarschuwing

Gebruik niet de volgende code. Het gebruik van dit voorbeeld leidt tot gegevensverlies, loopt vast, beveiligingsproblemen en mag NIET worden gekopieerd. Het volgende voorbeeld wordt gegeven om veelvoorkomende problemen met PipeReader uit te leggen.

C#
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
    if (result.IsCompleted && infiniteLoopBuffer.IsEmpty)
        break;

    Process(ref infiniteLoopBuffer, out Message message);

    reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}

Waarschuwing

Gebruik de voorgaande code NIET . Het gebruik van dit voorbeeld leidt tot gegevensverlies, loopt vast, beveiligingsproblemen en mag NIET worden gekopieerd. Het voorgaande voorbeeld wordt gegeven om veelvoorkomende problemen met PipeReader uit te leggen.

Hier volgt nog een stukje code met hetzelfde probleem. Er wordt gecontroleerd op een niet-lege buffer voordat deze wordt gecontroleerd ReadResult.IsCompleted. Omdat het zich in een else ifbevindt, wordt het voor altijd herhaald als er nooit een volledig bericht in de buffer staat.

Waarschuwing

Gebruik niet de volgende code. Het gebruik van dit voorbeeld leidt tot gegevensverlies, loopt vast, beveiligingsproblemen en mag NIET worden gekopieerd. Het volgende voorbeeld wordt gegeven om veelvoorkomende problemen met PipeReader uit te leggen.

C#
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;

    if (!infiniteLoopBuffer.IsEmpty)
        Process(ref infiniteLoopBuffer, out Message message);

    else if (result.IsCompleted)
        break;

    reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}

Waarschuwing

Gebruik de voorgaande code NIET . Het gebruik van dit voorbeeld leidt tot gegevensverlies, loopt vast, beveiligingsproblemen en mag NIET worden gekopieerd. Het voorgaande voorbeeld wordt gegeven om veelvoorkomende problemen met PipeReader uit te leggen.

Niet-reagerende toepassing

Onvoorwaardelijke aanroepen PipeReader.AdvanceTo met buffer.End in de examined positie kan ertoe leiden dat de toepassing niet meer reageert bij het parseren van één bericht. De volgende aanroep om pas terug te PipeReader.AdvanceTo keren:

  • Er zijn meer gegevens naar de pijp geschreven.
  • En de nieuwe gegevens zijn nog niet eerder onderzocht.

Waarschuwing

Gebruik niet de volgende code. Het gebruik van dit voorbeeld leidt tot gegevensverlies, loopt vast, beveiligingsproblemen en mag NIET worden gekopieerd. Het volgende voorbeeld wordt gegeven om veelvoorkomende problemen met PipeReader uit te leggen.

C#
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> hangBuffer = result.Buffer;

    Process(ref hangBuffer, out Message message);

    if (result.IsCompleted)
        break;

    reader.AdvanceTo(hangBuffer.Start, hangBuffer.End);

    if (message != null)
        return message;
}

Waarschuwing

Gebruik de voorgaande code NIET . Het gebruik van dit voorbeeld leidt tot gegevensverlies, loopt vast, beveiligingsproblemen en mag NIET worden gekopieerd. Het voorgaande voorbeeld wordt gegeven om veelvoorkomende problemen met PipeReader uit te leggen.

Onvoldoende geheugen (OOM)

Met de volgende voorwaarden blijft de volgende code bufferen totdat er een OutOfMemoryException probleem optreedt:

  • Er is geen maximale berichtgrootte.
  • De gegevens die door de PipeReader gegevens worden geretourneerd, maken geen volledig bericht. Het maakt bijvoorbeeld geen volledig bericht omdat de andere kant een groot bericht schrijft (bijvoorbeeld een bericht van 4 GB).

Waarschuwing

Gebruik niet de volgende code. Het gebruik van dit voorbeeld leidt tot gegevensverlies, loopt vast, beveiligingsproblemen en mag NIET worden gekopieerd. Het volgende voorbeeld wordt gegeven om veelvoorkomende problemen met PipeReader uit te leggen.

C#
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer;

    Process(ref thisCouldOutOfMemory, out Message message);

    if (result.IsCompleted)
        break;

    reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End);

    if (message != null)
        return message;
}

Waarschuwing

Gebruik de voorgaande code NIET . Het gebruik van dit voorbeeld leidt tot gegevensverlies, loopt vast, beveiligingsproblemen en mag NIET worden gekopieerd. Het voorgaande voorbeeld wordt gegeven om veelvoorkomende problemen met PipeReader uit te leggen.

Geheugenbeschadiging

Bij het schrijven van helpers die de buffer lezen, moet elke geretourneerde nettolading worden gekopieerd voordat u aanroept Advance. In het volgende voorbeeld wordt geheugen geretourneerd dat is Pipe verwijderd en opnieuw kan worden gebruikt voor de volgende bewerking (lezen/schrijven).

Waarschuwing

Gebruik niet de volgende code. Het gebruik van dit voorbeeld leidt tot gegevensverlies, loopt vast, beveiligingsproblemen en mag NIET worden gekopieerd. Het volgende voorbeeld wordt gegeven om veelvoorkomende problemen met PipeReader uit te leggen.

C#
public class Message
{
    public ReadOnlySequence<byte> CorruptedPayload { get; set; }
}
C#
    Environment.FailFast("This code is terrible, don't use it!");
    Message message = null;

    while (true)
    {
        ReadResult result = await reader.ReadAsync(cancellationToken);
        ReadOnlySequence<byte> buffer = result.Buffer;

        ReadHeader(ref buffer, out int length);

        if (length <= buffer.Length)
        {
            message = new Message
            {
                // Slice the payload from the existing buffer
                CorruptedPayload = buffer.Slice(0, length)
            };

            buffer = buffer.Slice(length);
        }

        if (result.IsCompleted)
            break;

        reader.AdvanceTo(buffer.Start, buffer.End);

        if (message != null)
        {
            // This code is broken since reader.AdvanceTo() was called with a position *after* the buffer
            // was captured.
            break;
        }
    }

    return message;
}

Waarschuwing

Gebruik de voorgaande code NIET . Het gebruik van dit voorbeeld leidt tot gegevensverlies, loopt vast, beveiligingsproblemen en mag NIET worden gekopieerd. Het voorgaande voorbeeld wordt gegeven om veelvoorkomende problemen met PipeReader uit te leggen.

PipeWriter

De PipeWriter beheert buffers voor het schrijven namens de beller. PipeWriter implementeert IBufferWriter<byte>. IBufferWriter<byte> maakt het mogelijk om toegang te krijgen tot buffers om schrijfbewerkingen uit te voeren zonder extra bufferkopieën.

C#
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
    // Request at least 5 bytes from the PipeWriter.
    Memory<byte> memory = writer.GetMemory(5);

    // Write directly into the buffer.
    int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span);

    // Tell the writer how many bytes were written.
    writer.Advance(written);

    await writer.FlushAsync(cancellationToken);
}

De vorige code:

  • Vraagt een buffer aan van ten minste 5 bytes van het PipeWriter gebruik GetMemory.
  • Schrijft bytes voor de ASCII-tekenreeks naar de geretourneerde Memory<byte>tekenreeks"Hello".
  • Aanroepen Advance om aan te geven hoeveel bytes naar de buffer zijn geschreven.
  • Hiermee worden de PipeWriterbytes naar het onderliggende apparaat verzonden.

De vorige schrijfmethode maakt gebruik van de buffers die door de PipeWriter. Het kan ook hebben gebruikt PipeWriter.WriteAsync, wat:

  • Kopieert de bestaande buffer naar de PipeWriter.
  • Oproepen GetSpan, Advance indien van toepassing en oproepen FlushAsync.
C#
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
    byte[] helloBytes = Encoding.ASCII.GetBytes("Hello");

    // Write helloBytes to the writer, there's no need to call Advance here
    // (Write does that).
    await writer.WriteAsync(helloBytes, cancellationToken);
}

Opzegging

FlushAsync ondersteunt het doorgeven van een CancellationToken. Het doorgeven van een CancellationToken resultaat in een OperationCanceledException als het token wordt geannuleerd terwijl er een leegmaken in behandeling is. PipeWriter.FlushAsync ondersteunt een manier om de huidige flush-bewerking te annuleren zonder PipeWriter.CancelPendingFlush een uitzondering op te geven. Het aanroepen PipeWriter.CancelPendingFlush veroorzaakt de huidige of volgende oproep naar PipeWriter.FlushAsync of PipeWriter.WriteAsync retourneert een FlushResult met IsCanceled de instelling true. Dit kan nuttig zijn voor het stoppen van de opbrengst op een niet-destructieve en niet-uitzonderlijke manier.

Veelvoorkomende problemen met PipeWriter

  • GetSpan en GetMemory retourneer een buffer met ten minste de aangevraagde hoeveelheid geheugen. Neem niet de exacte buffergrootten aan.
  • Er is geen garantie dat opeenvolgende aanroepen dezelfde buffer of dezelfde buffer met dezelfde grootte retourneren.
  • Er moet een nieuwe buffer worden aangevraagd na het aanroepen Advance om meer gegevens te kunnen schrijven. De eerder verkregen buffer kan niet worden weggeschreven naar.
  • Bellen GetMemory of GetSpan terwijl er een onvolledig gesprek is, FlushAsync is niet veilig.
  • Het aanroepen Complete of CompleteAsync terwijl er niet-geflusheerde gegevens zijn, kan leiden tot beschadiging van het geheugen.

Tips voor het gebruik van PipeReader en PipeWriter

Met de volgende tips kunt u de System.IO.Pipelines klassen gebruiken:

  • Voltooi altijd de PipeReader en PipeWriter, inclusief een uitzondering indien van toepassing.
  • PipeReader.AdvanceTo Bel altijd na het bellenPipeReader.ReadAsync.
  • Regelmatig tijdens await PipeWriter.FlushAsync het schrijven en altijd controleren FlushResult.IsCompleted. Schrijf af als IsCompleted dat zo is true, zoals dat aangeeft dat de lezer is voltooid en geeft niet langer om wat er is geschreven.
  • PipeWriter.FlushAsync Bel na het schrijven van iets waartoe u toegang wilt PipeReader hebben.
  • Roep niet FlushAsync aan als de lezer pas kan beginnen als FlushAsync deze is voltooid, omdat dit een impasse kan veroorzaken.
  • Zorg ervoor dat slechts één context eigenaar is van een PipeReader of PipeWriter of toegang heeft tot deze context. Deze typen zijn niet thread-safe.
  • Nooit toegang krijgen tot een ReadResult.Buffer na het aanroepen AdvanceTo of voltooien van de PipeReader.

IDuplexPipe

Het IDuplexPipe is een contract voor typen die zowel lezen als schrijven ondersteunen. Een netwerkverbinding wordt bijvoorbeeld vertegenwoordigd door een IDuplexPipe.

In tegenstelling tot Pipe, dat een PipeReader en een PipeWriterbevat, IDuplexPipe vertegenwoordigt een enkele zijde van een volledige duplex-verbinding. Dat betekent dat wat naar de PipeWriter brief wordt geschreven, niet wordt gelezen uit de PipeReader.

Stromen

Wanneer u streamgegevens leest of schrijft, leest u doorgaans gegevens met behulp van een de-serializer en schrijft u gegevens met behulp van een serializer. De meeste van deze stream-API's voor lezen en schrijven hebben een Stream parameter. Om het gemakkelijker te maken om te integreren met deze bestaande API's PipeReader en PipeWriter een AsStream methode beschikbaar te maken. AsStream retourneert een Stream implementatie rond de PipeReader of PipeWriter.

Stream-voorbeeld

PipeReader en PipeWriter exemplaren kunnen worden gemaakt met behulp van de statische Create methoden op basis van een Stream object en optionele overeenkomstige aanmaakopties.

De StreamPipeReaderOptions mogelijkheid voor controle over het maken van het PipeReader exemplaar met de volgende parameters:

  • StreamPipeReaderOptions.BufferSize is de minimale buffergrootte in bytes die worden gebruikt bij het huren van geheugen van de pool, en standaard ingesteld op 4096.
  • StreamPipeReaderOptions.LeaveOpen vlag bepaalt of de onderliggende stroom al dan niet open blijft nadat de PipeReader bewerking is voltooid en dat de standaardwaarden zijn ingesteld falseop .
  • StreamPipeReaderOptions.MinimumReadSize vertegenwoordigt de drempelwaarde van resterende bytes in de buffer voordat een nieuwe buffer wordt toegewezen, en wordt standaard ingesteld op 1024.
  • StreamPipeReaderOptions.Pool wordt gebruikt bij het MemoryPool<byte> toewijzen van geheugen en wordt standaard ingesteld op null.

De StreamPipeWriterOptions mogelijkheid voor controle over het maken van het PipeWriter exemplaar met de volgende parameters:

Belangrijk

Bij het maken PipeReader en PipeWriter uitvoeren van exemplaren met behulp van de Create methoden moet u rekening houden met de levensduur van het Stream object. Als u toegang tot de stream nodig hebt nadat de lezer of schrijver ermee klaar is, moet u de LeaveOpen vlag true instellen op de aanmaakopties. Anders wordt de stream gesloten.

In de volgende code ziet u hoe u exemplaren kunt maken en PipeWriter exemplaren gebruikt met behulp van PipeReader de Create methoden uit een stream.

C#
using System.Buffers;
using System.IO.Pipelines;
using System.Text;

class Program
{
    static async Task Main()
    {
        using var stream = File.OpenRead("lorem-ipsum.txt");

        var reader = PipeReader.Create(stream);
        var writer = PipeWriter.Create(
            Console.OpenStandardOutput(), 
            new StreamPipeWriterOptions(leaveOpen: true));

        WriteUserCancellationPrompt();

        var processMessagesTask = ProcessMessagesAsync(reader, writer);
        var userCanceled = false;
        var cancelProcessingTask = Task.Run(() =>
        {
            while (char.ToUpperInvariant(Console.ReadKey().KeyChar) != 'C')
            {
                WriteUserCancellationPrompt();
            }

            userCanceled = true;

            // No exceptions thrown
            reader.CancelPendingRead();
            writer.CancelPendingFlush();
        });

        await Task.WhenAny(cancelProcessingTask, processMessagesTask);

        Console.WriteLine(
            $"\n\nProcessing {(userCanceled ? "cancelled" : "completed")}.\n");
    }

    static void WriteUserCancellationPrompt() =>
        Console.WriteLine("Press 'C' to cancel processing...\n");

    static async Task ProcessMessagesAsync(
        PipeReader reader,
        PipeWriter writer)
    {
        try
        {
            while (true)
            {
                ReadResult readResult = await reader.ReadAsync();
                ReadOnlySequence<byte> buffer = readResult.Buffer;

                try
                {
                    if (readResult.IsCanceled)
                    {
                        break;
                    }

                    if (TryParseLines(ref buffer, out string message))
                    {
                        FlushResult flushResult =
                            await WriteMessagesAsync(writer, message);

                        if (flushResult.IsCanceled || flushResult.IsCompleted)
                        {
                            break;
                        }
                    }

                    if (readResult.IsCompleted)
                    {
                        if (!buffer.IsEmpty)
                        {
                            throw new InvalidDataException("Incomplete message.");
                        }
                        break;
                    }
                }
                finally
                {
                    reader.AdvanceTo(buffer.Start, buffer.End);
                }
            }
        }
        catch (Exception ex)
        {
            Console.Error.WriteLine(ex);
        }
        finally
        {
            await reader.CompleteAsync();
            await writer.CompleteAsync();
        }
    }

    static bool TryParseLines(
        ref ReadOnlySequence<byte> buffer,
        out string message)
    {
        SequencePosition? position;
        StringBuilder outputMessage = new();

        while(true)
        {
            position = buffer.PositionOf((byte)'\n');

            if (!position.HasValue)
                break;

            outputMessage.Append(Encoding.ASCII.GetString(buffer.Slice(buffer.Start, position.Value)))
                        .AppendLine();

            buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
        };

        message = outputMessage.ToString();
        return message.Length != 0;
    }

    static ValueTask<FlushResult> WriteMessagesAsync(
        PipeWriter writer,
        string message) =>
        writer.WriteAsync(Encoding.ASCII.GetBytes(message));
}

De toepassing gebruikt een StreamReader bestand voor het lezen van het lorem-ipsum.txt als een stroom en moet eindigen met een lege regel. De FileStream wordt doorgegeven aan PipeReader.Create, waarmee een PipeReader object wordt geïnstitueerd. De consoletoepassing geeft vervolgens de standaarduitvoerstroom door aan PipeWriter.Create het gebruik van Console.OpenStandardOutput(). In het voorbeeld wordt annulering ondersteund.