System.IO.Pipelines v .NET

System.IO.Pipelines je knihovna, která je navržená tak, aby usnadnila provádění vysoce výkonných vstupně-výstupních operací v .NET. Jedná se o knihovnu určenou pro .NET Standard, která funguje ve všech implementacích .NET.

Knihovna je k dispozici v balíčku NuGet System.IO.Pipelines .

Jaký problém řeší System.IO.Pipelines?

Aplikace, které parsují streamovaná data, se skládají z často používaného kódu s mnoha specializovanými a neobvyklými toky kódu. Často používaný kód a kód speciálního případu je složitý a obtížně se udržuje.

System.IO.Pipelines byl navržen tak, aby:

  • Mají vysoký výkon při analýze streamovaných dat.
  • Snižte složitost kódu.

Následující kód je typický pro server TCP, který od klienta přijímá zprávy oddělené řádky (oddělené znakem '\n') :

async Task ProcessLinesAsync(NetworkStream stream)
{
    var buffer = new byte[1024];
    await stream.ReadAsync(buffer, 0, buffer.Length);

    // Process a single line from the buffer
    ProcessLine(buffer);
}

Předchozí kód má několik problémů:

  • Při jednom volání ReadAsyncnemusí být přijata celá zpráva (konec řádku) .
  • Ignoruje výsledek stream.ReadAsync. stream.ReadAsync vrátí, kolik dat bylo přečteno.
  • Nepracuje s případem, kdy se při jednom ReadAsync volání přečte více řádků.
  • Přiděluje byte pole s každým čtením.

Chcete-li vyřešit předchozí problémy, jsou vyžadovány následující změny:

  • Příchozí data se budou ukládat do vyrovnávací paměti, dokud se nenajde nový řádek.

  • Parsujte všechny řádky vrácené ve vyrovnávací paměti.

  • Je možné, že čára je větší než 1 kB (1024 bajtů). Kód musí změnit velikost vstupní vyrovnávací paměti, dokud se nenajde oddělovač, aby se celý řádek vešel dovnitř vyrovnávací paměti.

    • Pokud se změní velikost vyrovnávací paměti, vytvoří se více kopií vyrovnávací paměti, jakmile se ve vstupu zobrazí delší čáry.
    • Pokud chcete zmenšit plýtvání místem, zkomprimujte vyrovnávací paměť používanou pro čtení řádků.
  • Zvažte použití fondu vyrovnávacích pamětí, abyste se vyhnuli opakovanému přidělování paměti.

  • Následující kód řeší některé z těchto problémů:

async Task ProcessLinesAsync(NetworkStream stream)
{
    byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
    var bytesBuffered = 0;
    var bytesConsumed = 0;

    while (true)
    {
        // Calculate the amount of bytes remaining in the buffer.
        var bytesRemaining = buffer.Length - bytesBuffered;

        if (bytesRemaining == 0)
        {
            // Double the buffer size and copy the previously buffered data into the new buffer.
            var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
            Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
            // Return the old buffer to the pool.
            ArrayPool<byte>.Shared.Return(buffer);
            buffer = newBuffer;
            bytesRemaining = buffer.Length - bytesBuffered;
        }

        var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
        if (bytesRead == 0)
        {
            // EOF
            break;
        }

        // Keep track of the amount of buffered bytes.
        bytesBuffered += bytesRead;
        var linePosition = -1;

        do
        {
            // Look for a EOL in the buffered data.
            linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
                                         bytesBuffered - bytesConsumed);

            if (linePosition >= 0)
            {
                // Calculate the length of the line based on the offset.
                var lineLength = linePosition - bytesConsumed;

                // Process the line.
                ProcessLine(buffer, bytesConsumed, lineLength);

                // Move the bytesConsumed to skip past the line consumed (including \n).
                bytesConsumed += lineLength + 1;
            }
        }
        while (linePosition >= 0);
    }
}

Předchozí kód je složitý a neřeší všechny zjištěné problémy. Vysoce výkonné sítě obvykle znamenají psaní složitého kódu, který maximalizuje výkon. System.IO.Pipelines byla navržena tak, aby usnadnila psaní tohoto typu kódu.

Potrubí

Třídu Pipe lze použít k vytvoření páru PipeWriter/PipeReader . Všechna data zapsaná do jsou PipeWriter k dispozici v :PipeReader

var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;

Základní využití potrubí

async Task ProcessLinesAsync(Socket socket)
{
    var pipe = new Pipe();
    Task writing = FillPipeAsync(socket, pipe.Writer);
    Task reading = ReadPipeAsync(pipe.Reader);

    await Task.WhenAll(reading, writing);
}

async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
    const int minimumBufferSize = 512;

    while (true)
    {
        // Allocate at least 512 bytes from the PipeWriter.
        Memory<byte> memory = writer.GetMemory(minimumBufferSize);
        try
        {
            int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
            if (bytesRead == 0)
            {
                break;
            }
            // Tell the PipeWriter how much was read from the Socket.
            writer.Advance(bytesRead);
        }
        catch (Exception ex)
        {
            LogError(ex);
            break;
        }

        // Make the data available to the PipeReader.
        FlushResult result = await writer.FlushAsync();

        if (result.IsCompleted)
        {
            break;
        }
    }

     // By completing PipeWriter, tell the PipeReader that there's no more data coming.
    await writer.CompleteAsync();
}

async Task ReadPipeAsync(PipeReader reader)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync();
        ReadOnlySequence<byte> buffer = result.Buffer;

        while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
        {
            // Process the line.
            ProcessLine(line);
        }

        // Tell the PipeReader how much of the buffer has been consumed.
        reader.AdvanceTo(buffer.Start, buffer.End);

        // Stop reading if there's no more data coming.
        if (result.IsCompleted)
        {
            break;
        }
    }

    // Mark the PipeReader as complete.
    await reader.CompleteAsync();
}

bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
    // Look for a EOL in the buffer.
    SequencePosition? position = buffer.PositionOf((byte)'\n');

    if (position == null)
    {
        line = default;
        return false;
    }

    // Skip the line + the \n.
    line = buffer.Slice(0, position.Value);
    buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
    return true;
}

Existují dvě smyčky:

  • FillPipeAsync čte z Socket a zapisuje do PipeWriter.
  • ReadPipeAsync čte z PipeReader a parsuje příchozí řádky.

Nejsou přidělovány žádné explicitní vyrovnávací paměti. Veškerá správa vyrovnávací paměti je delegovaná na PipeReader implementace a PipeWriter . Delegování správy vyrovnávací paměti usnadňuje využívání kódu soustředit se výhradně na obchodní logiku.

V první smyčce:

Ve druhé smyčce PipeReader využívá vyrovnávací paměti zapsané nástrojem PipeWriter. Vyrovnávací paměti pocházejí ze zásuvky. Volání metody PipeReader.ReadAsync:

  • Vrátí hodnotu , ReadResult která obsahuje dvě důležité informace:

    • Data, která byla přečtená ve tvaru ReadOnlySequence<byte>.
    • Logická hodnota IsCompleted označující, jestli bylo dosaženo konce dat (EOF).

Po vyhledání oddělovače konce řádku (EOL) a analýze čáry:

  • Logika zpracovává vyrovnávací paměť tak, aby přeskočí to, co už je zpracované.
  • PipeReader.AdvanceTo Je volána, aby bylo sdělováno PipeReader , kolik dat se spotřebovalo a prozkoumalo.

Smyčky čtečky a zapisovače končí voláním Complete. Complete umožní podkladovému kanálu uvolnit přidělenou paměť.

Zpětný tlak a řízení toku

V ideálním případě by čtení a analýza fungovaly společně:

  • Vlákno čtení využívá data ze sítě a vkládá je do vyrovnávacích pamětí.
  • Vlákno analýzy je zodpovědné za vytvoření příslušných datových struktur.

Parsování obvykle trvá déle než kopírování bloků dat ze sítě:

  • Vlákno čtení se dostane před vlákno analýzy.
  • Vlákno čtení musí buď zpomalit, nebo přidělit více paměti k uložení dat pro vlákno analýzy.

Pro zajištění optimálního výkonu existuje rovnováhu mezi častými pauzami a přidělením více paměti.

Pokud chcete vyřešit předchozí problém, má nástroj Pipe dvě nastavení pro řízení toku dat:

Diagram s ResumeWriterThreshold a PauseWriterThreshold

PipeWriter.FlushAsync:

  • Vrátí neúplnou ValueTask<FlushResult> hodnotu při překročení PauseWriterThresholdmnožství dat v objektu Pipe .
  • ValueTask<FlushResult> Dokončí se, když se změní na nižší hodnotu než ResumeWriterThreshold.

Dvě hodnoty se používají k prevenci rychlého cyklování, ke kterému může dojít, pokud se použije jedna hodnota.

Příklady

// The Pipe will start returning incomplete tasks from FlushAsync until
// the reader examines at least 5 bytes.
var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
var pipe = new Pipe(options);

PipeScheduler

Při použití async a awaitse asynchronní kód obvykle obnoví na nebo TaskScheduler aktuální SynchronizationContext.

Při provádění vstupně-výstupních operací je důležité mít přesnou kontrolu nad tím, kde se vstupně-výstupní operace provádějí. Tento ovládací prvek umožňuje efektivní využití mezipamětí procesoru. Efektivní ukládání do mezipaměti je důležité pro vysoce výkonné aplikace, jako jsou webové servery. PipeScheduler poskytuje kontrolu nad tím, kde se asynchronní zpětná volání spouští. Ve výchozím nastavení:

  • Použije se aktuální.SynchronizationContext
  • Pokud neexistuje SynchronizationContext, použije fond vláken ke spouštění zpětných volání.
public static void Main(string[] args)
{
    var writeScheduler = new SingleThreadPipeScheduler();
    var readScheduler = new SingleThreadPipeScheduler();

    // Tell the Pipe what schedulers to use and disable the SynchronizationContext.
    var options = new PipeOptions(readerScheduler: readScheduler,
                                  writerScheduler: writeScheduler,
                                  useSynchronizationContext: false);
    var pipe = new Pipe(options);
}

// This is a sample scheduler that async callbacks on a single dedicated thread.
public class SingleThreadPipeScheduler : PipeScheduler
{
    private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
     new BlockingCollection<(Action<object> Action, object State)>();
    private readonly Thread _thread;

    public SingleThreadPipeScheduler()
    {
        _thread = new Thread(DoWork);
        _thread.Start();
    }

    private void DoWork()
    {
        foreach (var item in _queue.GetConsumingEnumerable())
        {
            item.Action(item.State);
        }
    }

    public override void Schedule(Action<object?> action, object? state)
    {
        if (state is not null)
        {
            _queue.Add((action, state));
        }
        // else log the fact that _queue.Add was not called.
    }
}

PipeScheduler.ThreadPool je PipeScheduler implementace, která zařadí zpětná volání do fondu vláken. PipeScheduler.ThreadPool je výchozí a obecně nejlepší volba. PipeScheduler.Inline může způsobit nezamýšlené důsledky, jako je například zablokování.

Resetování potrubí

Opakované použití objektu Pipe je často efektivní. Pokud chcete kanál resetovat, zavolejte PipeReaderReset po dokončení i PipeReaderPipeWriter .

Čtečka potrubí

PipeReader spravuje paměť jménem volajícího. Vždy volejte PipeReader.AdvanceTo po volání PipeReader.ReadAsync. To informuje volajícího PipeReader o dokončení práce s pamětí, aby bylo možné ji sledovat. Hodnota vrácená ReadOnlySequence<byte> z PipeReader.ReadAsync je platná pouze do volání PipeReader.AdvanceTometody . Jeho použití ReadOnlySequence<byte> po volání PipeReader.AdvanceToje neplatné.

PipeReader.AdvanceTo používá dva SequencePosition argumenty:

  • První argument určuje, kolik paměti se spotřebovalo.
  • Druhý argument určuje, jaká část vyrovnávací paměti byla pozorována.

Označení dat jako spotřebovaných znamená, že kanál může vrátit paměť do základního fondu vyrovnávacích pamětí. Označení dat jako pozorovaných řídí, co další volání PipeReader.ReadAsync provede. Označení všeho jako pozorovaného znamená, že další volání PipeReader.ReadAsync se nevrátí, dokud se do kanálu nezapíšou další data. Jakákoli jiná hodnota provede další volání, které PipeReader.ReadAsync okamžitě vrátí pozorovaná a nepozorovaná data, ale ne data, která už byla spotřebována.

Čtení scénářů streamovaných dat

Při pokusu o čtení streamovaných dat se objevuje několik typických vzorů:

  • Vzhledem k datovému proudu parsujte jednu zprávu.
  • Vzhledem k datovému proudu dat parsujte všechny dostupné zprávy.

Následující příklady používají metodu TryParseLines pro analýzu zpráv z ReadOnlySequence<byte>. TryParseLines Parsuje jednu zprávu a aktualizuje vstupní vyrovnávací paměť tak, aby parsovanou zprávu z vyrovnávací paměti ořízla. TryParseLines není součástí .NET, je to metoda napsaná uživatelem, která se používá v následujících částech.

bool TryParseLines(ref ReadOnlySequence<byte> buffer, out Message message);

Přečtení jedné zprávy

Následující kód přečte jednu zprávu z PipeReader a vrátí ji volajícímu.

async ValueTask<Message?> ReadSingleMessageAsync(PipeReader reader,
 CancellationToken cancellationToken = default)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync(cancellationToken);
        ReadOnlySequence<byte> buffer = result.Buffer;

        // In the event that no message is parsed successfully, mark consumed
        // as nothing and examined as the entire buffer.
        SequencePosition consumed = buffer.Start;
        SequencePosition examined = buffer.End;

        try
        {
            if (TryParseLines(ref buffer, out Message message))
            {
                // A single message was successfully parsed so mark the start of the
                // parsed buffer as consumed. TryParseLines trims the buffer to
                // point to the data after the message was parsed.
                consumed = buffer.Start;

                // Examined is marked the same as consumed here, so the next call
                // to ReadSingleMessageAsync will process the next message if there's
                // one.
                examined = consumed;

                return message;
            }

            // There's no more data to be processed.
            if (result.IsCompleted)
            {
                if (buffer.Length > 0)
                {
                    // The message is incomplete and there's no more data to process.
                    throw new InvalidDataException("Incomplete message.");
                }

                break;
            }
        }
        finally
        {
            reader.AdvanceTo(consumed, examined);
        }
    }

    return null;
}

Předchozí kód:

  • Parsuje jednu zprávu.
  • Aktualizace spotřebované a vyšetřené SequencePositionSequencePosition tak, aby ukazovaly na začátek oříznuté vstupní vyrovnávací paměti.

Oba SequencePosition argumenty se aktualizují, protože TryParseLines odeberou analyzovanou zprávu ze vstupní vyrovnávací paměti. Obecně platí, že při analýze jedné zprávy z vyrovnávací paměti by zkoumaná pozice měla být jedna z následujících:

  • Konec zprávy.
  • Konec přijaté vyrovnávací paměti, pokud nebyla nalezena žádná zpráva.

Případ jedné zprávy má největší potenciál k chybám. Předání nesprávných hodnot ke zkoumání může vést k výjimce kvůli nedostatku paměti nebo nekonečné smyčce. Další informace najdete v části Věnované běžným problémům s PipeReaderem v tomto článku.

Čtení více zpráv

Následující kód čte všechny zprávy z PipeReader a a na každé z nich volá ProcessMessageAsync .

async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
{
    try
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync(cancellationToken);
            ReadOnlySequence<byte> buffer = result.Buffer;

            try
            {
                // Process all messages from the buffer, modifying the input buffer on each
                // iteration.
                while (TryParseLines(ref buffer, out Message message))
                {
                    await ProcessMessageAsync(message);
                }

                // There's no more data to be processed.
                if (result.IsCompleted)
                {
                    if (buffer.Length > 0)
                    {
                        // The message is incomplete and there's no more data to process.
                        throw new InvalidDataException("Incomplete message.");
                    }
                    break;
                }
            }
            finally
            {
                // Since all messages in the buffer are being processed, you can use the
                // remaining buffer's Start and End position to determine consumed and examined.
                reader.AdvanceTo(buffer.Start, buffer.End);
            }
        }
    }
    finally
    {
        await reader.CompleteAsync();
    }
}

Zrušení

PipeReader.ReadAsync:

  • Podporuje předávání .CancellationToken
  • Vyvolá chybu, OperationCanceledExceptionCancellationToken pokud se zruší, zatímco čeká na čtení.
  • Podporuje způsob, jak zrušit aktuální operaci čtení přes PipeReader.CancelPendingRead, čímž se zabrání vyvolání výjimky. Volání PipeReader.CancelPendingRead způsobí, že aktuální nebo další volání PipeReader.ReadAsync vrátí hodnotu s nastaveným ReadResultIsCanceled na true. To může být užitečné pro zastavení stávající smyčky čtení nedestruktivním a nevýjimným způsobem.
private PipeReader reader;

public MyConnection(PipeReader reader)
{
    this.reader = reader;
}

public void Abort()
{
    // Cancel the pending read so the process loop ends without an exception.
    reader.CancelPendingRead();
}

public async Task ProcessMessagesAsync()
{
    try
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync();
            ReadOnlySequence<byte> buffer = result.Buffer;

            try
            {
                if (result.IsCanceled)
                {
                    // The read was canceled. You can quit without reading the existing data.
                    break;
                }

                // Process all messages from the buffer, modifying the input buffer on each
                // iteration.
                while (TryParseLines(ref buffer, out Message message))
                {
                    await ProcessMessageAsync(message);
                }

                // There's no more data to be processed.
                if (result.IsCompleted)
                {
                    break;
                }
            }
            finally
            {
                // Since all messages in the buffer are being processed, you can use the
                // remaining buffer's Start and End position to determine consumed and examined.
                reader.AdvanceTo(buffer.Start, buffer.End);
            }
        }
    }
    finally
    {
        await reader.CompleteAsync();
    }
}

Běžné problémy s PipeReaderem

  • Předání nesprávných hodnot do nebo examined může vést ke consumed čtení již načtených dat.

  • Předání buffer.End v rámci prověření může vést k:

    • Zablokovaná data
    • Pokud se data nespotřebovávají, může dojít k případné výjimce mimo paměť (OOM). Například PipeReader.AdvanceTo(position, buffer.End) při zpracování jedné zprávy najednou z vyrovnávací paměti.
  • Předání nesprávných hodnot do nebo examined může vést k consumed nekonečné smyčce. Pokud se například nezměnil, způsobí to, PipeReader.AdvanceTo(buffer.Start)buffer.Start že se další volání PipeReader.ReadAsync vrátí okamžitě před příchodem nových dat.

  • Předání nesprávných hodnot do nebo examined může vést k consumed nekonečnému ukládání do vyrovnávací paměti (případná OOM).

  • ReadOnlySequence<byte> Použití následujícího volání PipeReader.AdvanceTo může vést k poškození paměti (použití po volném volání).

  • Selhání volání PipeReader.Complete/CompleteAsync může způsobit nevracení paměti.

  • Kontrola ReadResult.IsCompleted a ukončení logiky čtení před zpracováním vyrovnávací paměti způsobí ztrátu dat. Podmínka ukončení smyčky by měla být založena na ReadResult.Buffer.IsEmpty a ReadResult.IsCompleted. Pokud to uděláte nesprávně, může dojít k nekonečné smyčce.

Problematický kód

Ztráta dat

Pokud ReadResult je nastavená hodnota true, může vrátit poslední segment datIsCompleted. Nečte tato data před ukončením smyčky čtení, dojde ke ztrátě dat.

Upozornění

Nepoužívejte následující kód. Použití této ukázky způsobí ztrátu dat, zablokování, problémy se zabezpečením a nemělo by se kopírovat. Následující ukázka vysvětluje běžné problémy s PipeReaderem.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> dataLossBuffer = result.Buffer;

    if (result.IsCompleted)
        break;

    Process(ref dataLossBuffer, out Message message);

    reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End);
}

Upozornění

Nepoužívejte předchozí kód. Použití této ukázky způsobí ztrátu dat, zablokování, problémy se zabezpečením a nemělo by se kopírovat. Předchozí ukázka je k dispozici k vysvětlení běžných problémů PipeReader.

Nekonečná smyčka

Následující logika může vést k nekonečné smyčce, pokud Result.IsCompleted je true , ale nikdy není ve vyrovnávací paměti úplná zpráva.

Upozornění

Nepoužívejte následující kód. Použití této ukázky způsobí ztrátu dat, zablokování, problémy se zabezpečením a nemělo by se kopírovat. Následující ukázka vysvětluje běžné problémy s PipeReaderem.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
    if (result.IsCompleted && infiniteLoopBuffer.IsEmpty)
        break;

    Process(ref infiniteLoopBuffer, out Message message);

    reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}

Upozornění

Nepoužívejte předchozí kód. Použití této ukázky způsobí ztrátu dat, zablokování, problémy se zabezpečením a nemělo by se kopírovat. Předchozí ukázka je k dispozici k vysvětlení běžných problémů PipeReader.

Tady je další část kódu se stejným problémem. Před kontrolou kontroluje neprázdnou vyrovnávací paměť ReadResult.IsCompleted. Vzhledem k tomu, že je ve smyčce else if, bude se opakovat, pokud ve vyrovnávací paměti nikdy nebude úplná zpráva.

Upozornění

Nepoužívejte následující kód. Použití této ukázky způsobí ztrátu dat, zablokování, problémy se zabezpečením a nemělo by se kopírovat. Následující ukázka vysvětluje běžné problémy s PipeReaderem.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;

    if (!infiniteLoopBuffer.IsEmpty)
        Process(ref infiniteLoopBuffer, out Message message);

    else if (result.IsCompleted)
        break;

    reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}

Upozornění

Nepoužívejte předchozí kód. Použití této ukázky způsobí ztrátu dat, zablokování, problémy se zabezpečením a nemělo by se kopírovat. Předchozí ukázka je k dispozici k vysvětlení běžných problémů PipeReader.

Nereagující aplikace

Bezpodmínečné volání PipeReader.AdvanceTo s buffer.End v examined pozici může vést k tomu, že aplikace přestane reagovat při analýze jedné zprávy. Další volání se PipeReader.AdvanceTo nevrátí, dokud:

  • Do kanálu se zapisuje více dat.
  • A nová data se předtím nezkoumala.

Upozornění

Nepoužívejte následující kód. Použití této ukázky způsobí ztrátu dat, zablokování, problémy se zabezpečením a nemělo by se kopírovat. Následující ukázka vysvětluje běžné problémy s PipeReaderem.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> hangBuffer = result.Buffer;

    Process(ref hangBuffer, out Message message);

    if (result.IsCompleted)
        break;

    reader.AdvanceTo(hangBuffer.Start, hangBuffer.End);

    if (message != null)
        return message;
}

Upozornění

Nepoužívejte předchozí kód. Použití této ukázky způsobí ztrátu dat, zablokování, problémy se zabezpečením a nemělo by se kopírovat. Předchozí ukázka je k dispozici k vysvětlení běžných problémů PipeReader.

Nedostatek paměti (OOM)

Za následujících podmínek se následující kód ukládá do vyrovnávací paměti, dokud nedojde k chybě OutOfMemoryException :

  • Neexistuje žádná maximální velikost zprávy.
  • Data vrácená z nástroje PipeReader nevytvářou úplnou zprávu. Například neudělá úplnou zprávu, protože druhá strana píše velkou zprávu (například zprávu o velikosti 4 GB).

Upozornění

Nepoužívejte následující kód. Použití této ukázky způsobí ztrátu dat, zablokování, problémy se zabezpečením a nemělo by se kopírovat. Následující ukázka vysvětluje běžné problémy s PipeReaderem.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer;

    Process(ref thisCouldOutOfMemory, out Message message);

    if (result.IsCompleted)
        break;

    reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End);

    if (message != null)
        return message;
}

Upozornění

Nepoužívejte předchozí kód. Použití této ukázky způsobí ztrátu dat, zablokování, problémy se zabezpečením a nemělo by se kopírovat. Předchozí ukázka je k dispozici k vysvětlení běžných problémů PipeReader.

Poškození paměti

Při zápisu pomocných rutin, které čtou vyrovnávací paměť, by se všechny vrácené datové části měly před voláním Advancezkopírovat. Následující příklad vrátí paměť, kterou Pipe zahodil, a může ji znovu použít pro další operaci (čtení/zápis).

Upozornění

NEPOUŽÍVEJTE následující kód. Použití této ukázky způsobí ztrátu dat, zablokování, problémy se zabezpečením a nemělo by se kopírovat. Následující ukázka vysvětluje běžné problémy PipeReader.

public class Message
{
    public ReadOnlySequence<byte> CorruptedPayload { get; set; }
}
    Environment.FailFast("This code is terrible, don't use it!");
    Message message = null;

    while (true)
    {
        ReadResult result = await reader.ReadAsync(cancellationToken);
        ReadOnlySequence<byte> buffer = result.Buffer;

        ReadHeader(ref buffer, out int length);

        if (length <= buffer.Length)
        {
            message = new Message
            {
                // Slice the payload from the existing buffer
                CorruptedPayload = buffer.Slice(0, length)
            };

            buffer = buffer.Slice(length);
        }

        if (result.IsCompleted)
            break;

        reader.AdvanceTo(buffer.Start, buffer.End);

        if (message != null)
        {
            // This code is broken since reader.AdvanceTo() was called with a position *after* the buffer
            // was captured.
            break;
        }
    }

    return message;
}

Upozornění

NEPOUŽÍVEJTE předchozí kód. Použití této ukázky způsobí ztrátu dat, zablokování, problémy se zabezpečením a nemělo by se kopírovat. Předchozí ukázka je k dispozici k vysvětlení PipeReader Běžné problémy.

PipeWriter

Spravuje PipeWriter vyrovnávací paměti pro zápis jménem volajícího. PipeWriter implementuje IBufferWriter<byte>. IBufferWriter<byte> umožňuje získat přístup k vyrovnávacím pamětím, aby bylo možné provádět zápisy bez dalších kopií vyrovnávací paměti.

async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
    // Request at least 5 bytes from the PipeWriter.
    Memory<byte> memory = writer.GetMemory(5);

    // Write directly into the buffer.
    int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span);

    // Tell the writer how many bytes were written.
    writer.Advance(written);

    await writer.FlushAsync(cancellationToken);
}

Předchozí kód:

  • Vyžádá si vyrovnávací paměť nejméně 5 bajtů z PipeWriter pomocí GetMemory.
  • Zapíše bajty pro řetězec "Hello" ASCII do vrácené Memory<byte>.
  • Volání Advance k určení, kolik bajtů bylo zapsáno do vyrovnávací paměti.
  • Vyprázdní PipeWriterobjekt , který odešle bajty do základního zařízení.

Předchozí metoda zápisu používá vyrovnávací paměti poskytované objektem PipeWriter. Mohl také použít PipeWriter.WriteAsync, který:

  • Zkopíruje existující vyrovnávací paměť do .PipeWriter
  • Advance Podle potřeby zavolá GetSpana zavolá .FlushAsync
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
    byte[] helloBytes = Encoding.ASCII.GetBytes("Hello");

    // Write helloBytes to the writer, there's no need to call Advance here
    // (Write does that).
    await writer.WriteAsync(helloBytes, cancellationToken);
}

Zrušení

FlushAsyncpodporuje předání .CancellationToken Předáním výsledku CancellationToken se zobrazí , OperationCanceledException pokud je token zrušený, zatímco čeká na vyprázdnění. PipeWriter.FlushAsync podporuje způsob, jak zrušit aktuální operaci vyprázdnění prostřednictvím bez PipeWriter.CancelPendingFlush vyvolání výjimky. Volání PipeWriter.CancelPendingFlush způsobí, že aktuální nebo další volání PipeWriter.FlushAsync nebo PipeWriter.WriteAsync vrátí hodnotu s nastaveným FlushResultIsCanceled na true. To může být užitečné pro zastavení nedestruktivního a nevyjímačného způsobu.

PipeWriter – běžné problémy

  • GetSpan a GetMemory vrátit vyrovnávací paměť s alespoň požadovanou velikostí paměti. Nepředpokládejte přesné velikosti vyrovnávací paměti.
  • Není zaručeno, že po sobě jdoucí volání vrátí stejnou vyrovnávací paměť nebo vyrovnávací paměť stejné velikosti.
  • Po volání Advance se musí vyžádat nová vyrovnávací paměť, aby bylo možné pokračovat v zápisu dalších dat. Dříve získanou vyrovnávací paměť nelze zapisovat do.
  • Volání GetMemory nebo GetSpan neúplné volání není FlushAsync bezpečné.
  • Volání Complete nebo CompleteAsync v době, kdy jsou k dispozici nevytřebná data, může vést k poškození paměti.

Tipy pro používání PipeReader a PipeWriter

Následující tipy vám pomůžou úspěšně používat System.IO.Pipelines třídy:

  • Vždy vyplňte PipeReader a PipeWriter, včetně výjimky tam, kde je to možné.
  • Vždy volejte PipeReader.AdvanceTo po volání PipeReader.ReadAsync.
  • Během psaní pravidelně awaitPipeWriter.FlushAsync a vždy kontrolujte FlushResult.IsCompleted. Přerušit psaní, pokud IsCompleted je true, protože to znamená, že čtenář je dokončený a už se nestará o to, co je napsané.
  • Volejte PipeWriter.FlushAsync poté, co napíšete něco, ke kterému chcete PipeReader mít přístup.
  • Nevolejte FlushAsync , pokud čtečka nemůže začít, dokud FlushAsync se nedokončí, protože to může způsobit vzájemné zablokování.
  • Ujistěte se, že pouze jeden kontext "vlastní" PipeReader nebo PipeWriter k nim přistupuje. Tyto typy nejsou bezpečné pro přístup z více vláken.
  • Po volání AdvanceTo nebo dokončení PipeReadernikdy nepřistupujte k ReadResult.Buffer .

IDuplexPipe

Je IDuplexPipe kontrakt pro typy, které podporují čtení i zápis. Například síťové připojení by bylo reprezentováno objektem IDuplexPipe.

Na rozdíl od Pipe, který obsahuje PipeReader a PipeWriter, IDuplexPipe představuje jednu stranu plně duplexního připojení. To znamená, že to, co je zapsáno do PipeWriter nebude přečteno z PipeReader.

Streamy

Při čtení nebo zápisu dat datového proudu obvykle číst data pomocí de-serializátoru a zapisovat data pomocí serializátoru. Většina těchto rozhraní API streamu čtení a zápisu Stream má parametr. Aby se usnadnila integrace s těmito stávajícími rozhraními PipeReader API a PipeWriter zveřejnění AsStream metody. AsStream vrátí implementaci StreamPipeReader kolem nebo PipeWriter.

Příklad streamu

PipeReader instance a PipeWriter lze vytvořit pomocí statických Create metod zadaných objektu Stream a volitelných odpovídajících možností vytvoření.

Umožňuje StreamPipeReaderOptions kontrolu nad vytvořením PipeReader instance s následujícími parametry:

Umožňuje StreamPipeWriterOptions kontrolu nad vytvořením PipeWriter instance s následujícími parametry:

Důležité

Při vytváření PipeReader instancí a PipeWriter pomocí Create těchto metod je potřeba vzít v úvahu životnost objektu Stream . Pokud potřebujete přístup ke streamu poté, co ho čtenář nebo autor dokončí, budete muset u možností vytvoření nastavit LeaveOpen příznak na true . Jinak se datový proud zavře.

Následující kód ukazuje vytvoření PipeReader instancí a PipeWriter pomocí Create metod z datového proudu.

using System.Buffers;
using System.IO.Pipelines;
using System.Text;

class Program
{
    static async Task Main()
    {
        using var stream = File.OpenRead("lorem-ipsum.txt");

        var reader = PipeReader.Create(stream);
        var writer = PipeWriter.Create(
            Console.OpenStandardOutput(), 
            new StreamPipeWriterOptions(leaveOpen: true));

        WriteUserCancellationPrompt();

        var processMessagesTask = ProcessMessagesAsync(reader, writer);
        var userCanceled = false;
        var cancelProcessingTask = Task.Run(() =>
        {
            while (char.ToUpperInvariant(Console.ReadKey().KeyChar) != 'C')
            {
                WriteUserCancellationPrompt();
            }

            userCanceled = true;

            // No exceptions thrown
            reader.CancelPendingRead();
            writer.CancelPendingFlush();
        });

        await Task.WhenAny(cancelProcessingTask, processMessagesTask);

        Console.WriteLine(
            $"\n\nProcessing {(userCanceled ? "cancelled" : "completed")}.\n");
    }

    static void WriteUserCancellationPrompt() =>
        Console.WriteLine("Press 'C' to cancel processing...\n");

    static async Task ProcessMessagesAsync(
        PipeReader reader,
        PipeWriter writer)
    {
        try
        {
            while (true)
            {
                ReadResult readResult = await reader.ReadAsync();
                ReadOnlySequence<byte> buffer = readResult.Buffer;

                try
                {
                    if (readResult.IsCanceled)
                    {
                        break;
                    }

                    if (TryParseLines(ref buffer, out string message))
                    {
                        FlushResult flushResult =
                            await WriteMessagesAsync(writer, message);

                        if (flushResult.IsCanceled || flushResult.IsCompleted)
                        {
                            break;
                        }
                    }

                    if (readResult.IsCompleted)
                    {
                        if (!buffer.IsEmpty)
                        {
                            throw new InvalidDataException("Incomplete message.");
                        }
                        break;
                    }
                }
                finally
                {
                    reader.AdvanceTo(buffer.Start, buffer.End);
                }
            }
        }
        catch (Exception ex)
        {
            Console.Error.WriteLine(ex);
        }
        finally
        {
            await reader.CompleteAsync();
            await writer.CompleteAsync();
        }
    }

    static bool TryParseLines(
        ref ReadOnlySequence<byte> buffer,
        out string message)
    {
        SequencePosition? position;
        StringBuilder outputMessage = new();

        while(true)
        {
            position = buffer.PositionOf((byte)'\n');

            if (!position.HasValue)
                break;

            outputMessage.Append(Encoding.ASCII.GetString(buffer.Slice(buffer.Start, position.Value)))
                        .AppendLine();

            buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
        };

        message = outputMessage.ToString();
        return message.Length != 0;
    }

    static ValueTask<FlushResult> WriteMessagesAsync(
        PipeWriter writer,
        string message) =>
        writer.WriteAsync(Encoding.ASCII.GetBytes(message));
}

Aplikace použije StreamReader ke čtení souborulorem-ipsum.txt datový proud a musí končit prázdným řádkem. Předá FileStream se objektu PipeReader.Create, který vytvoří instanci objektu PipeReader . Konzolová aplikace pak předá svůj standardní výstupní stream pomocí PipeWriter.Create příkazu Console.OpenStandardOutput(). Příklad podporuje zrušení.