System.IO.Pipelines a .NET-ben

System.IO.Pipelines egy olyan kódtár, amelynek célja, hogy megkönnyítse a nagy teljesítményű I/O használatát a .NET-ben. Ez egy .NET Standardot célzó kódtár, amely minden .NET-implementáción működik.

A kódtár a System.IO.Pipelines Nuget csomagban érhető el.

Milyen problémát old meg a System.IO.Pipelines?

A streamelési adatokat elemző alkalmazások olyan sablonkódból állnak, amely számos speciális és szokatlan kódfolyamattal rendelkezik. A kazánlemez és a speciális esetkód összetett és nehezen karbantartható.

System.IO.Pipelines a következőre lett kitalálták:

  • Nagy teljesítményű streamelési adatok elemzése.
  • A kód összetettségének csökkentése.

Az alábbi kód jellemzően egy olyan TCP-kiszolgálóra jellemző, amely sorhatárolt üzeneteket fogad egy ügyféltől (tagolt) '\n'az ügyféltől:

async Task ProcessLinesAsync(NetworkStream stream)
{
    var buffer = new byte[1024];
    await stream.ReadAsync(buffer, 0, buffer.Length);

    // Process a single line from the buffer
    ProcessLine(buffer);
}

Az előző kódnak több problémája is van:

  • Előfordulhat, hogy a teljes üzenet (a sor vége) nem érkezik meg egyetlen hívásban.ReadAsync
  • Figyelmen kívül hagyja a .stream.ReadAsync stream.ReadAsync azt adja vissza, hogy mennyi adat lett beolvasva.
  • Nem kezeli azt az esetet, amikor egyetlen ReadAsync hívásban több sor van beolvasva.
  • Egy tömböt byte foglal le minden egyes olvasással.

Az előző problémák megoldásához a következő módosítások szükségesek:

  • Pufferelje a bejövő adatokat, amíg új sort nem talál.

  • Elemezni kell a pufferben visszaadott összes sort.

  • Lehetséges, hogy a sor nagyobb, mint 1 KB (1024 bájt). A kódnak át kell méreteznie a bemeneti puffert, amíg meg nem találja a határolót, hogy elférjen a teljes sor a pufferben.

    • Ha a puffer átméretezve van, a rendszer további puffermásolatokat készít, amint hosszabb sorok jelennek meg a bemenetben.
    • Az elpazarolt hely csökkentése érdekében tömörítse az olvasóvonalakhoz használt puffert.
  • Fontolja meg a pufferkészletezés használatát a memória ismételt kiosztásának elkerülése érdekében.

  • Az alábbi kód az alábbi problémák némelyikét oldja meg:

async Task ProcessLinesAsync(NetworkStream stream)
{
    byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
    var bytesBuffered = 0;
    var bytesConsumed = 0;

    while (true)
    {
        // Calculate the amount of bytes remaining in the buffer.
        var bytesRemaining = buffer.Length - bytesBuffered;

        if (bytesRemaining == 0)
        {
            // Double the buffer size and copy the previously buffered data into the new buffer.
            var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
            Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
            // Return the old buffer to the pool.
            ArrayPool<byte>.Shared.Return(buffer);
            buffer = newBuffer;
            bytesRemaining = buffer.Length - bytesBuffered;
        }

        var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
        if (bytesRead == 0)
        {
            // EOF
            break;
        }

        // Keep track of the amount of buffered bytes.
        bytesBuffered += bytesRead;
        var linePosition = -1;

        do
        {
            // Look for a EOL in the buffered data.
            linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
                                         bytesBuffered - bytesConsumed);

            if (linePosition >= 0)
            {
                // Calculate the length of the line based on the offset.
                var lineLength = linePosition - bytesConsumed;

                // Process the line.
                ProcessLine(buffer, bytesConsumed, lineLength);

                // Move the bytesConsumed to skip past the line consumed (including \n).
                bytesConsumed += lineLength + 1;
            }
        }
        while (linePosition >= 0);
    }
}

Az előző kód összetett, és nem kezeli az összes azonosított problémát. A nagy teljesítményű hálózatkezelés általában összetett kód írását jelenti a teljesítmény maximalizálása érdekében. System.IO.Pipelines úgy lett kialakítva, hogy megkönnyítse az ilyen típusú kód írását.

Cső

Az Pipe osztály használható párok PipeWriter/PipeReader létrehozására. A következő helyen található minden, a következőbe PipeWriterPipeReaderírt adat:

var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;

Cső alapszintű használata

async Task ProcessLinesAsync(Socket socket)
{
    var pipe = new Pipe();
    Task writing = FillPipeAsync(socket, pipe.Writer);
    Task reading = ReadPipeAsync(pipe.Reader);

    await Task.WhenAll(reading, writing);
}

async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
    const int minimumBufferSize = 512;

    while (true)
    {
        // Allocate at least 512 bytes from the PipeWriter.
        Memory<byte> memory = writer.GetMemory(minimumBufferSize);
        try
        {
            int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
            if (bytesRead == 0)
            {
                break;
            }
            // Tell the PipeWriter how much was read from the Socket.
            writer.Advance(bytesRead);
        }
        catch (Exception ex)
        {
            LogError(ex);
            break;
        }

        // Make the data available to the PipeReader.
        FlushResult result = await writer.FlushAsync();

        if (result.IsCompleted)
        {
            break;
        }
    }

     // By completing PipeWriter, tell the PipeReader that there's no more data coming.
    await writer.CompleteAsync();
}

async Task ReadPipeAsync(PipeReader reader)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync();
        ReadOnlySequence<byte> buffer = result.Buffer;

        while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
        {
            // Process the line.
            ProcessLine(line);
        }

        // Tell the PipeReader how much of the buffer has been consumed.
        reader.AdvanceTo(buffer.Start, buffer.End);

        // Stop reading if there's no more data coming.
        if (result.IsCompleted)
        {
            break;
        }
    }

    // Mark the PipeReader as complete.
    await reader.CompleteAsync();
}

bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
    // Look for a EOL in the buffer.
    SequencePosition? position = buffer.PositionOf((byte)'\n');

    if (position == null)
    {
        line = default;
        return false;
    }

    // Skip the line + the \n.
    line = buffer.Slice(0, position.Value);
    buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
    return true;
}

Két hurok van:

  • FillPipeAsync olvasása és Socket írása a PipeWriter.
  • ReadPipeAsync beolvassa a PipeReader bejövő sorokat, és elemzi azokat.

Nincsenek explicit pufferek lefoglalva. Minden pufferkezelés delegálva van az és PipeWriter az PipeReader implementációk számára. A pufferkezelés delegálása megkönnyíti, hogy a kód felhasználása kizárólag az üzleti logikára összpontosítson.

Az első ciklusban:

A második ciklusban a rendszer az PipeReader általa PipeWriterírt puffereket használja fel. A pufferek a foglalatból származnak. A következő hívás:PipeReader.ReadAsync

  • Olyan értéket ReadResult ad vissza, amely két fontos információt tartalmaz:

    • A beolvasott adatok a következő formában ReadOnlySequence<byte>: .
    • Logikai érték IsCompleted , amely azt jelzi, hogy elérte-e az adatok végét (EOF).

A sor végének (EOL) elválasztójának megkeresése és a vonal elemzése után:

  • A logika feldolgozza a puffert, hogy kihagyja a már feldolgozott elemeket.
  • PipeReader.AdvanceTo a meghívásával megállapíthatja, PipeReader hogy mennyi adatot felhasználtak és vizsgáltak meg.

Az olvasó és az író ciklusai a hívással Completevégződnek. Complete lehetővé teszi, hogy a mögöttes cső felszabadítsa a lefoglalt memóriát.

Backpressure és flow control

Ideális esetben az olvasás és az elemzés együtt működik:

  • Az olvasószál a hálózatból származó adatokat használja fel, és pufferekbe helyezi.
  • Az elemzési szál feladata a megfelelő adatstruktúrák létrehozása.

Az elemzés általában több időt vesz igénybe, mint az adatblokkok másolása a hálózatról:

  • Az olvasószál megelőzi az elemzési szálat.
  • Az olvasószálnak lassítania kell vagy több memóriát kell lefoglalnia az elemzési szál adatainak tárolásához.

Az optimális teljesítmény érdekében egyensúly van a gyakori szünetek és a több memória kiosztása között.

Az előző probléma megoldásához két Pipe beállítással szabályozhatja az adatáramlást:

  • PauseWriterThreshold: Meghatározza, hogy mennyi adatot kell pufferelni a szüneteltetni kívánt FlushAsync hívások előtt.
  • ResumeWriterThreshold: Meghatározza, hogy mennyi adatot kell megfigyelnie az olvasónak a folytatáshoz szükséges PipeWriter.FlushAsync hívások előtt.

Diagram with ResumeWriterThreshold and PauseWriterThreshold

PipeWriter.FlushAsync:

  • Hiányos ValueTask<FlushResult> értéket ad vissza, ha a keresztekben PauseWriterThresholdlévő Pipe adatok mennyisége .
  • Akkor fejeződik ValueTask<FlushResult> be, ha kisebb lesz, mint ResumeWriterThreshold.

A gyors kerékpározás megelőzésére két érték szolgál, amelyek egy érték használata esetén fordulhatnak elő.

Példák

// The Pipe will start returning incomplete tasks from FlushAsync until
// the reader examines at least 5 bytes.
var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
var pipe = new Pipe(options);

PipeScheduler

Az aszinkron kód használata és awaithasználata async általában egy vagy az aktuális SynchronizationContextrendszeren TaskScheduler folytatódik.

Az I/O végrehajtásakor fontos, hogy részletesen szabályozva legyen az I/O végrehajtásának helye. Ez a vezérlő lehetővé teszi a cpu-gyorsítótárak hatékony kihasználását. A hatékony gyorsítótárazás kritikus fontosságú a nagy teljesítményű alkalmazások, például a webkiszolgálók esetében. PipeScheduler szabályozza, hogy hol futnak az aszinkron visszahívások. Alapértelmezés szerint:

  • A rendszer az aktuálisat SynchronizationContext használja.
  • Ha nincs SynchronizationContext, a visszahívások futtatásához a szálkészletet használja.
public static void Main(string[] args)
{
    var writeScheduler = new SingleThreadPipeScheduler();
    var readScheduler = new SingleThreadPipeScheduler();

    // Tell the Pipe what schedulers to use and disable the SynchronizationContext.
    var options = new PipeOptions(readerScheduler: readScheduler,
                                  writerScheduler: writeScheduler,
                                  useSynchronizationContext: false);
    var pipe = new Pipe(options);
}

// This is a sample scheduler that async callbacks on a single dedicated thread.
public class SingleThreadPipeScheduler : PipeScheduler
{
    private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
     new BlockingCollection<(Action<object> Action, object State)>();
    private readonly Thread _thread;

    public SingleThreadPipeScheduler()
    {
        _thread = new Thread(DoWork);
        _thread.Start();
    }

    private void DoWork()
    {
        foreach (var item in _queue.GetConsumingEnumerable())
        {
            item.Action(item.State);
        }
    }

    public override void Schedule(Action<object?> action, object? state)
    {
        if (state is not null)
        {
            _queue.Add((action, state));
        }
        // else log the fact that _queue.Add was not called.
    }
}

A PipeScheduler.ThreadPool az a PipeScheduler implementáció, amely várólistára állítja a szálkészlet visszahívásait. PipeScheduler.ThreadPool az alapértelmezett és általában a legjobb választás. A PipeScheduler.Inline nem szándékos következményeket, például holtpontokat okozhat.

Cső alaphelyzetbe állítása

Gyakran hatékony az objektum újrafelhasználása Pipe . A cső alaphelyzetbe állításához hívja meg PipeReaderReset , ha mind a PipeReaderPipeWriter kettő befejeződött.

PipeReader

PipeReader a hívó nevében kezeli a memóriát. Hívás után PipeReader.ReadAsyncmindig hívjonPipeReader.AdvanceTo. Ez tudatja a PipeReader hívóval, hogy mikor fejezték be a memóriát, hogy nyomon lehessen követni. A ReadOnlySequence<byte> visszaadott érték PipeReader.ReadAsync csak a hívásig PipeReader.AdvanceToérvényes. A hívás PipeReader.AdvanceToután nem lehet használniReadOnlySequence<byte>.

PipeReader.AdvanceTo két SequencePosition argumentumot vesz fel:

  • Az első argumentum határozza meg, hogy mennyi memória volt használatban.
  • A második argumentum határozza meg, hogy a puffer mekkora részét figyelték meg.

Az adatok felhasználtként való megjelölése azt jelenti, hogy a cső vissza tudja adni a memóriát a mögöttes pufferkészletnek. Az adatok megfigyeltként való megjelölése szabályozza, hogy a következő hívás mit tesz PipeReader.ReadAsync . A megfigyelt adatok megjelölése azt jelenti, hogy a következő hívás nem fog visszatérni PipeReader.ReadAsync , amíg több adat nem íródott a csőbe. Bármely más érték a következő hívást kezdeményezi, hogy azonnal visszatérjen PipeReader.ReadAsync a megfigyelt és nem figyelt adatokkal, de a már felhasznált adatokkal nem.

Streamelési adatforgatókönyvek olvasása

A streamelési adatok olvasása során néhány tipikus minta jelenik meg:

  • Adatstreamek esetén elemezz egyetlen üzenetet.
  • Az adatstreamek alapján elemezni kell az összes elérhető üzenetet.

Az alábbi példák a metódust használják egy TryParseLines üzenet elemzéséhez.ReadOnlySequence<byte> TryParseLines Egyetlen üzenetet elemez, és frissíti a bemeneti puffert, hogy levágja az elemzett üzenetet a pufferből. TryParseLines nem része a .NET-nek, hanem a következő szakaszokban használt, felhasználó által írt metódus.

bool TryParseLines(ref ReadOnlySequence<byte> buffer, out Message message);

Egyetlen üzenet olvasása

Az alábbi kód egyetlen üzenetet olvas be egy PipeReader üzenetből, és visszaadja a hívónak.

async ValueTask<Message?> ReadSingleMessageAsync(PipeReader reader,
 CancellationToken cancellationToken = default)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync(cancellationToken);
        ReadOnlySequence<byte> buffer = result.Buffer;

        // In the event that no message is parsed successfully, mark consumed
        // as nothing and examined as the entire buffer.
        SequencePosition consumed = buffer.Start;
        SequencePosition examined = buffer.End;

        try
        {
            if (TryParseLines(ref buffer, out Message message))
            {
                // A single message was successfully parsed so mark the start of the
                // parsed buffer as consumed. TryParseLines trims the buffer to
                // point to the data after the message was parsed.
                consumed = buffer.Start;

                // Examined is marked the same as consumed here, so the next call
                // to ReadSingleMessageAsync will process the next message if there's
                // one.
                examined = consumed;

                return message;
            }

            // There's no more data to be processed.
            if (result.IsCompleted)
            {
                if (buffer.Length > 0)
                {
                    // The message is incomplete and there's no more data to process.
                    throw new InvalidDataException("Incomplete message.");
                }

                break;
            }
        }
        finally
        {
            reader.AdvanceTo(consumed, examined);
        }
    }

    return null;
}

A fenti kód a következőket végzi el:

  • Egyetlen üzenet elemzése.
  • Frissítések a felhasznált SequencePosition és megvizsgált SequencePosition adatokat, hogy a levágott bemeneti puffer elejére mutasson.

A két SequencePosition argumentum azért frissül, mert TryParseLines eltávolítja az elemzett üzenetet a bemeneti pufferből. Ha egyetlen üzenetet elemez a pufferből, a vizsgált pozíciónak az alábbiak egyikének kell lennie:

  • Az üzenet vége.
  • Ha nem található üzenet, a fogadott puffer vége.

Az egyetlen üzenetes eset a legtöbb hibalehetőséget rejti magában. A nem megfelelő értékek átadása memóriakivételt vagy végtelen hurkot eredményezhet. További információ: A PipeReader gyakori problémáinak szakasza ebben a cikkben.

Több üzenet olvasása

Az alábbi kód beolvassa az összes üzenetet egy PipeReader üzenetből, és meghívja ProcessMessageAsync az egyes üzeneteket.

async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
{
    try
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync(cancellationToken);
            ReadOnlySequence<byte> buffer = result.Buffer;

            try
            {
                // Process all messages from the buffer, modifying the input buffer on each
                // iteration.
                while (TryParseLines(ref buffer, out Message message))
                {
                    await ProcessMessageAsync(message);
                }

                // There's no more data to be processed.
                if (result.IsCompleted)
                {
                    if (buffer.Length > 0)
                    {
                        // The message is incomplete and there's no more data to process.
                        throw new InvalidDataException("Incomplete message.");
                    }
                    break;
                }
            }
            finally
            {
                // Since all messages in the buffer are being processed, you can use the
                // remaining buffer's Start and End position to determine consumed and examined.
                reader.AdvanceTo(buffer.Start, buffer.End);
            }
        }
    }
    finally
    {
        await reader.CompleteAsync();
    }
}

Érvénytelenítés

PipeReader.ReadAsync:

  • Támogatja a CancellationToken.
  • OperationCanceledException Ha az CancellationToken olvasás függőben van, a megszakítást jelzi.
  • Támogatja az aktuális olvasási művelet PipeReader.CancelPendingReadmegszakításának módját, amely elkerüli a kivételt. A hívás PipeReader.CancelPendingRead hatására az aktuális vagy a következő hívás PipeReader.ReadAsync egy beállított értéket truead vissza ReadResultIsCanceled. Ez hasznos lehet a meglévő olvasási ciklus roncsolásmentes és kivételes módon történő leállításához.
private PipeReader reader;

public MyConnection(PipeReader reader)
{
    this.reader = reader;
}

public void Abort()
{
    // Cancel the pending read so the process loop ends without an exception.
    reader.CancelPendingRead();
}

public async Task ProcessMessagesAsync()
{
    try
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync();
            ReadOnlySequence<byte> buffer = result.Buffer;

            try
            {
                if (result.IsCanceled)
                {
                    // The read was canceled. You can quit without reading the existing data.
                    break;
                }

                // Process all messages from the buffer, modifying the input buffer on each
                // iteration.
                while (TryParseLines(ref buffer, out Message message))
                {
                    await ProcessMessageAsync(message);
                }

                // There's no more data to be processed.
                if (result.IsCompleted)
                {
                    break;
                }
            }
            finally
            {
                // Since all messages in the buffer are being processed, you can use the
                // remaining buffer's Start and End position to determine consumed and examined.
                reader.AdvanceTo(buffer.Start, buffer.End);
            }
        }
    }
    finally
    {
        await reader.CompleteAsync();
    }
}

A PipeReader gyakori problémái

  • Ha rossz értékeket ad át a már beolvasott adatoknak consumed , vagy examined az azt eredményezheti, hogy már beolvassa az adatokat.

  • A vizsgált módon történő buffer.End átadás a következőt eredményezheti:

    • Elakadt adatok
    • Előfordulhat, hogy az adatok nem használnak fel végleges memóriakivételt (OOM). Ha például PipeReader.AdvanceTo(position, buffer.End) egyetlen üzenetet dolgoz fel egyszerre a pufferből.
  • Ha rossz értékeket ad át, consumed az examined végtelen ciklust eredményezhet. Ha például nem változik, PipeReader.AdvanceTo(buffer.Start)buffer.Start a következő hívás PipeReader.ReadAsync azonnal visszatér, mielőtt új adatok érkeznek.

  • Ha rossz értékeket ad át, consumed az examined végtelen pufferelést (végleges OOM) eredményezhet.

  • ReadOnlySequence<byte> A hívás utáni használat PipeReader.AdvanceTo memóriasérülést okozhat (ingyenes használat után).

  • A hívás PipeReader.Complete/CompleteAsync sikertelensége memóriavesztést okozhat.

  • Az olvasási logika ellenőrzése ReadResult.IsCompleted és kilépése a puffer feldolgozása előtt adatvesztést eredményez. A hurok kilépési feltételének a következőn ReadResult.Buffer.IsEmpty kell alapulnia: és ReadResult.IsCompleted. A helytelenül végzett művelet végtelen ciklust eredményezhet.

Problémás kód

Adatvesztés

Az ReadResult adat utolsó szegmensét adja vissza, ha IsCompleted be van állítva true. Ha nem olvassa be az adatokat az olvasási ciklusból való kilépés előtt, az adatvesztéshez vezet.

Figyelmeztetés

Ne használja a következő kódot. A minta használata adatvesztést, lefagyást, biztonsági problémákat eredményez, és NEM másolható. Az alábbi minta a PipeReader gyakori problémáinak magyarázatára szolgál.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> dataLossBuffer = result.Buffer;

    if (result.IsCompleted)
        break;

    Process(ref dataLossBuffer, out Message message);

    reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End);
}

Figyelmeztetés

NE használja az előző kódot. A minta használata adatvesztést, lefagyást, biztonsági problémákat eredményez, és NEM másolható. Az előző minta a PipeReader gyakori problémáinak magyarázatára szolgál.

Végtelen hurok

Az alábbi logika végtelen ciklust eredményezhet, ha az Result.IsCompleted is, true de a pufferben soha nem található teljes üzenet.

Figyelmeztetés

Ne használja a következő kódot. A minta használata adatvesztést, lefagyást, biztonsági problémákat eredményez, és NEM másolható. Az alábbi minta a PipeReader gyakori problémáinak magyarázatára szolgál.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
    if (result.IsCompleted && infiniteLoopBuffer.IsEmpty)
        break;

    Process(ref infiniteLoopBuffer, out Message message);

    reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}

Figyelmeztetés

NE használja az előző kódot. A minta használata adatvesztést, lefagyást, biztonsági problémákat eredményez, és NEM másolható. Az előző minta a PipeReader gyakori problémáinak magyarázatára szolgál.

Íme egy másik kód, ugyanazzal a problémával. Az ellenőrzés ReadResult.IsCompletedelőtt nem üres puffert keres. Mivel egy folyamatban else ifvan, a rendszer örökké hurokba kerül, ha soha nem jelenik meg teljes üzenet a pufferben.

Figyelmeztetés

Ne használja a következő kódot. A minta használata adatvesztést, lefagyást, biztonsági problémákat eredményez, és NEM másolható. Az alábbi minta a PipeReader gyakori problémáinak magyarázatára szolgál.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;

    if (!infiniteLoopBuffer.IsEmpty)
        Process(ref infiniteLoopBuffer, out Message message);

    else if (result.IsCompleted)
        break;

    reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}

Figyelmeztetés

NE használja az előző kódot. A minta használata adatvesztést, lefagyást, biztonsági problémákat eredményez, és NEM másolható. Az előző minta a PipeReader gyakori problémáinak magyarázatára szolgál.

Nem válaszoló alkalmazás

Ha feltétel nélkül hív meg PipeReader.AdvanceTobuffer.End egy adott pozíciót, examined az alkalmazás nem válaszol egyetlen üzenet elemzésekor. A következő hívás nem fog visszatérni PipeReader.AdvanceTo , amíg:

  • Több adat van a csőbe írva.
  • Az új adatokat korábban nem vizsgálták meg.

Figyelmeztetés

Ne használja a következő kódot. A minta használata adatvesztést, lefagyást, biztonsági problémákat eredményez, és NEM másolható. Az alábbi minta a PipeReader gyakori problémáinak magyarázatára szolgál.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> hangBuffer = result.Buffer;

    Process(ref hangBuffer, out Message message);

    if (result.IsCompleted)
        break;

    reader.AdvanceTo(hangBuffer.Start, hangBuffer.End);

    if (message != null)
        return message;
}

Figyelmeztetés

NE használja az előző kódot. A minta használata adatvesztést, lefagyást, biztonsági problémákat eredményez, és NEM másolható. Az előző minta a PipeReader gyakori problémáinak magyarázatára szolgál.

Memóriahiány (OOM)

A következő feltételek mellett a következő kód pufferelése addig tart, amíg a OutOfMemoryException következő nem következik be:

  • Nincs maximális üzenetméret.
  • A visszaadott PipeReader adatok nem teljes üzenetet alkotnak. Például nem készít teljes üzenetet, mert a másik oldal egy nagy üzenetet ír (például egy 4 GB-os üzenetet).

Figyelmeztetés

Ne használja a következő kódot. A minta használata adatvesztést, lefagyást, biztonsági problémákat eredményez, és NEM másolható. Az alábbi minta a PipeReader gyakori problémáinak magyarázatára szolgál.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer;

    Process(ref thisCouldOutOfMemory, out Message message);

    if (result.IsCompleted)
        break;

    reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End);

    if (message != null)
        return message;
}

Figyelmeztetés

NE használja az előző kódot. A minta használata adatvesztést, lefagyást, biztonsági problémákat eredményez, és NEM másolható. Az előző minta a PipeReader gyakori problémáinak magyarázatára szolgál.

Memóriasérülés

A puffert olvasó segítők írásakor a visszaadott hasznos adatokat a hívás Advanceelőtt át kell másolni. Az alábbi példa visszaadja az Pipe elvetett memóriát, és újra felhasználhatja a következő művelethez (olvasási/írási).

Figyelmeztetés

Ne használja a következő kódot. A minta használata adatvesztést, lefagyást, biztonsági problémákat eredményez, és NEM másolható. Az alábbi minta a PipeReader gyakori problémáinak magyarázatára szolgál.

public class Message
{
    public ReadOnlySequence<byte> CorruptedPayload { get; set; }
}
    Environment.FailFast("This code is terrible, don't use it!");
    Message message = null;

    while (true)
    {
        ReadResult result = await reader.ReadAsync(cancellationToken);
        ReadOnlySequence<byte> buffer = result.Buffer;

        ReadHeader(ref buffer, out int length);

        if (length <= buffer.Length)
        {
            message = new Message
            {
                // Slice the payload from the existing buffer
                CorruptedPayload = buffer.Slice(0, length)
            };

            buffer = buffer.Slice(length);
        }

        if (result.IsCompleted)
            break;

        reader.AdvanceTo(buffer.Start, buffer.End);

        if (message != null)
        {
            // This code is broken since reader.AdvanceTo() was called with a position *after* the buffer
            // was captured.
            break;
        }
    }

    return message;
}

Figyelmeztetés

NE használja az előző kódot. A minta használata adatvesztést, lefagyást, biztonsági problémákat eredményez, és NEM másolható. Az előző minta a PipeReader gyakori problémáinak magyarázatára szolgál.

PipeWriter

A PipeWriter pufferek kezelése a hívó nevében történő íráshoz. PipeWriterimplementálja.IBufferWriter<byte> IBufferWriter<byte> lehetővé teszi a pufferekhez való hozzáférést az írások további puffermásolatok nélkül történő végrehajtásához.

async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
    // Request at least 5 bytes from the PipeWriter.
    Memory<byte> memory = writer.GetMemory(5);

    // Write directly into the buffer.
    int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span);

    // Tell the writer how many bytes were written.
    writer.Advance(written);

    await writer.FlushAsync(cancellationToken);
}

Az előző kód:

  • Legalább 5 bájt puffert kér a PipeWriter felhasználótól GetMemory.
  • Bájtokat ír az ASCII-sztringhez "Hello" a visszaadott Memory<byte>értékre.
  • Hívások Advance a pufferbe írt bájtok számának jelzésére.
  • Kiüríti a PipeWriterbájtokat a mögöttes eszközre küldő bájtokat.

Az előző írási módszer a .PipeWriter A következőt is használhatta PipeWriter.WriteAsyncvolna:

  • Másolja a meglévő puffert a PipeWriter.
  • Hívások GetSpan, Advance szükség szerint és hívások FlushAsync.
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
    byte[] helloBytes = Encoding.ASCII.GetBytes("Hello");

    // Write helloBytes to the writer, there's no need to call Advance here
    // (Write does that).
    await writer.WriteAsync(helloBytes, cancellationToken);
}

Érvénytelenítés

FlushAsync támogatja a CancellationToken. Eredmény OperationCanceledException átadásaCancellationToken, ha a jogkivonat megszakad, amíg egy kiürítés függőben van. PipeWriter.FlushAsync lehetővé teszi az aktuális kiürítési művelet PipeWriter.CancelPendingFlush megszakítását kivétel nélkül. A hívás PipeWriter.CancelPendingFlush hatására az aktuális vagy a következő hívás PipeWriter.FlushAsync egy adott értékre IsCanceled van állítvatrue.PipeWriter.WriteAsyncFlushResult Ez hasznos lehet a hozam öblítés non-destruktív és nem kivételes módon történő megállításához.

PipeWriter – gyakori problémák

  • GetSpan és GetMemory adjon vissza egy puffert legalább a kért memóriamennyiséggel. Ne feltételezze a puffer pontos méretét.
  • Nincs garancia arra, hogy az egymást követő hívások ugyanazt a puffert vagy azonos méretű puffert fogják visszaadni.
  • A további adatok írásának folytatásához a hívás Advance után új puffert kell kérni. A korábban beszerzett puffer nem írható be.
  • A hívás GetMemory vagy GetSpan a nem teljes hívás FlushAsync nem biztonságos.
  • A hívás Complete vagy CompleteAsync a nem befolyt adatok memóriasérülést okozhatnak.

Tippek a PipeReader és a PipeWriter használatához

Az alábbi tippek segítenek az System.IO.Pipelines osztályok sikeres használatához:

  • Mindig végezze el a PipeReadert és a PipeWritert, adott esetben kivételt is beleértve.
  • Hívás után PipeReader.ReadAsyncmindig hívjonPipeReader.AdvanceTo.
  • awaitPipeWriter.FlushAsync Rendszeresen írás közben, és mindig ellenőrizzeFlushResult.IsCompleted. Megszakítja az írást, ha IsCompleted van true, mivel ez azt jelzi, hogy az olvasó befejeződött, és már nem érdekli, hogy mi van megírva.
  • PipeWriter.FlushAsync Hívás után írjon valamit, amelyhez hozzá szeretne PipeReader férni.
  • Ne hívjon, FlushAsync ha az olvasó nem tud elindulni, amíg FlushAsync be nem fejeződik, mert ez holtpontot okozhat.
  • Győződjön meg arról, hogy csak egy környezet rendelkezik PipeReaderPipeWriter vagy fér hozzá hozzájuk. Ezek a típusok nem szálbiztosak.
  • Soha ne férhessen hozzá a ReadResult.Buffer hívás AdvanceTo vagy a befejezés után.PipeReader

IDuplexPipe

Ez IDuplexPipe egy olyan szerződés, amely az olvasást és az írást egyaránt támogatja. Egy hálózati kapcsolatot például egy IDuplexPipe.

Ellentétben Pipea teljes kétoldalas kapcsolat egyetlen oldalával, IDuplexPipe amely egy PipeReader és egy PipeWriterelemet tartalmaz. Ez azt jelenti, hogy amit írnak, az PipeWriter nem lesz olvasható a PipeReader.

Adatfolyamok

Streamadatok olvasása vagy írása során általában szerializálóval olvas be adatokat, és szerializálóval adatokat ír. Az olvasási és írási stream API-k többsége rendelkezik paraméterrel Stream . Hogy könnyebben integrálható legyen ezekkel a meglévő API-kkal, PipeReader és PipeWriter közzétehesse a metódust AsStream . AsStreamA egy implementációt Stream ad vissza a (vagyPipeWriter)PipeReader

Példa streamelésre

PipeReader és PipeWriter a példányok létrehozhatók statikus Create metódusokkal, adott Stream objektum és opcionálisan megfelelő létrehozási lehetőségek használatával.

A StreamPipeReaderOptions példány létrehozásának engedélyezése a PipeReader következő paraméterekkel:

  • StreamPipeReaderOptions.BufferSize az a minimális pufferméret bájtban, amelyet a készletből való memóriabérléskor használnak, és az alapértelmezett érték a következő: 4096.
  • StreamPipeReaderOptions.LeaveOpen jelölő határozza meg, hogy a mögöttes stream nyitva marad-e a PipeReader befejezés után, és az alapértelmezett érték a következő lesz false.
  • StreamPipeReaderOptions.MinimumReadSizeA pufferben maradó bájtok küszöbértékét jelöli az új puffer lefoglalása előtt, az alapértelmezett érték pedig a következő.1024
  • StreamPipeReaderOptions.PoolMemoryPool<byte> a memória kiosztásakor használatos, alapértelmezett értéke pedig a következő: null.

A StreamPipeWriterOptions példány létrehozásának engedélyezése a PipeWriter következő paraméterekkel:

Fontos

A metódusok létrehozásakor PipeReader és PipeWriter a példányok használatakor Create figyelembe kell vennie az objektum élettartamát Stream . Ha az olvasó vagy író befejezése után hozzá kell férnie a streamheztrue, a létrehozási beállításoknál be kell állítania a LeaveOpen jelölőt. Ellenkező esetben a stream bezárul.

Az alábbi kód bemutatja, hogy egy stream metódusait PipeReader használva Create hozhat létre példányokat és PipeWriter példányokat.

using System.Buffers;
using System.IO.Pipelines;
using System.Text;

class Program
{
    static async Task Main()
    {
        using var stream = File.OpenRead("lorem-ipsum.txt");

        var reader = PipeReader.Create(stream);
        var writer = PipeWriter.Create(
            Console.OpenStandardOutput(), 
            new StreamPipeWriterOptions(leaveOpen: true));

        WriteUserCancellationPrompt();

        var processMessagesTask = ProcessMessagesAsync(reader, writer);
        var userCanceled = false;
        var cancelProcessingTask = Task.Run(() =>
        {
            while (char.ToUpperInvariant(Console.ReadKey().KeyChar) != 'C')
            {
                WriteUserCancellationPrompt();
            }

            userCanceled = true;

            // No exceptions thrown
            reader.CancelPendingRead();
            writer.CancelPendingFlush();
        });

        await Task.WhenAny(cancelProcessingTask, processMessagesTask);

        Console.WriteLine(
            $"\n\nProcessing {(userCanceled ? "cancelled" : "completed")}.\n");
    }

    static void WriteUserCancellationPrompt() =>
        Console.WriteLine("Press 'C' to cancel processing...\n");

    static async Task ProcessMessagesAsync(
        PipeReader reader,
        PipeWriter writer)
    {
        try
        {
            while (true)
            {
                ReadResult readResult = await reader.ReadAsync();
                ReadOnlySequence<byte> buffer = readResult.Buffer;

                try
                {
                    if (readResult.IsCanceled)
                    {
                        break;
                    }

                    if (TryParseLines(ref buffer, out string message))
                    {
                        FlushResult flushResult =
                            await WriteMessagesAsync(writer, message);

                        if (flushResult.IsCanceled || flushResult.IsCompleted)
                        {
                            break;
                        }
                    }

                    if (readResult.IsCompleted)
                    {
                        if (!buffer.IsEmpty)
                        {
                            throw new InvalidDataException("Incomplete message.");
                        }
                        break;
                    }
                }
                finally
                {
                    reader.AdvanceTo(buffer.Start, buffer.End);
                }
            }
        }
        catch (Exception ex)
        {
            Console.Error.WriteLine(ex);
        }
        finally
        {
            await reader.CompleteAsync();
            await writer.CompleteAsync();
        }
    }

    static bool TryParseLines(
        ref ReadOnlySequence<byte> buffer,
        out string message)
    {
        SequencePosition? position;
        StringBuilder outputMessage = new();

        while(true)
        {
            position = buffer.PositionOf((byte)'\n');

            if (!position.HasValue)
                break;

            outputMessage.Append(Encoding.ASCII.GetString(buffer.Slice(buffer.Start, position.Value)))
                        .AppendLine();

            buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
        };

        message = outputMessage.ToString();
        return message.Length != 0;
    }

    static ValueTask<FlushResult> WriteMessagesAsync(
        PipeWriter writer,
        string message) =>
        writer.WriteAsync(Encoding.ASCII.GetBytes(message));
}

Az alkalmazás a StreamReader lorem-ipsum.txt fájlt adatfolyamként olvassa be, és üres sortal kell végződnie. Az FileStream át lett adva PipeReader.Create, amely létrehoz egy objektumot PipeReader . A konzolalkalmazás ezután átadja a szabványos kimeneti streamet a használatnak PipeWriter.CreateConsole.OpenStandardOutput(). A példa támogatja a lemondást.