System.IO.Pipelines v .NET
System.IO.Pipelines je knihovna, která je navržená tak, aby usnadnila provádění vysoce výkonných vstupně-výstupních operací v .NET. Jedná se o knihovnu určenou pro .NET Standard, která funguje ve všech implementacích .NET.
Knihovna je k dispozici v balíčku NuGet System.IO.Pipelines .
Jaký problém řeší System.IO.Pipelines?
Aplikace, které parsují streamovaná data, se skládají z často používaného kódu s mnoha specializovanými a neobvyklými toky kódu. Často používaný kód a kód speciálního případu je složitý a obtížně se udržuje.
System.IO.Pipelines
byl navržen tak, aby:
- Mají vysoký výkon při analýze streamovaných dat.
- Snižte složitost kódu.
Následující kód je typický pro server TCP, který od klienta přijímá zprávy oddělené řádky (oddělené znakem '\n'
) :
async Task ProcessLinesAsync(NetworkStream stream)
{
var buffer = new byte[1024];
await stream.ReadAsync(buffer, 0, buffer.Length);
// Process a single line from the buffer
ProcessLine(buffer);
}
Předchozí kód má několik problémů:
- Při jednom volání
ReadAsync
nemusí být přijata celá zpráva (konec řádku) . - Ignoruje výsledek
stream.ReadAsync
.stream.ReadAsync
vrátí, kolik dat bylo přečteno. - Nepracuje s případem, kdy se při jednom
ReadAsync
volání přečte více řádků. - Přiděluje
byte
pole s každým čtením.
Chcete-li vyřešit předchozí problémy, jsou vyžadovány následující změny:
Příchozí data se budou ukládat do vyrovnávací paměti, dokud se nenajde nový řádek.
Parsujte všechny řádky vrácené ve vyrovnávací paměti.
Je možné, že čára je větší než 1 kB (1024 bajtů). Kód musí změnit velikost vstupní vyrovnávací paměti, dokud se nenajde oddělovač, aby se celý řádek vešel dovnitř vyrovnávací paměti.
- Pokud se změní velikost vyrovnávací paměti, vytvoří se více kopií vyrovnávací paměti, jakmile se ve vstupu zobrazí delší čáry.
- Pokud chcete zmenšit plýtvání místem, zkomprimujte vyrovnávací paměť používanou pro čtení řádků.
Zvažte použití fondu vyrovnávacích pamětí, abyste se vyhnuli opakovanému přidělování paměti.
Následující kód řeší některé z těchto problémů:
async Task ProcessLinesAsync(NetworkStream stream)
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
var bytesBuffered = 0;
var bytesConsumed = 0;
while (true)
{
// Calculate the amount of bytes remaining in the buffer.
var bytesRemaining = buffer.Length - bytesBuffered;
if (bytesRemaining == 0)
{
// Double the buffer size and copy the previously buffered data into the new buffer.
var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
// Return the old buffer to the pool.
ArrayPool<byte>.Shared.Return(buffer);
buffer = newBuffer;
bytesRemaining = buffer.Length - bytesBuffered;
}
var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
if (bytesRead == 0)
{
// EOF
break;
}
// Keep track of the amount of buffered bytes.
bytesBuffered += bytesRead;
var linePosition = -1;
do
{
// Look for a EOL in the buffered data.
linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
bytesBuffered - bytesConsumed);
if (linePosition >= 0)
{
// Calculate the length of the line based on the offset.
var lineLength = linePosition - bytesConsumed;
// Process the line.
ProcessLine(buffer, bytesConsumed, lineLength);
// Move the bytesConsumed to skip past the line consumed (including \n).
bytesConsumed += lineLength + 1;
}
}
while (linePosition >= 0);
}
}
Předchozí kód je složitý a neřeší všechny zjištěné problémy. Vysoce výkonné sítě obvykle znamenají psaní složitého kódu, který maximalizuje výkon. System.IO.Pipelines
byla navržena tak, aby usnadnila psaní tohoto typu kódu.
Potrubí
Třídu Pipe lze použít k vytvoření páru PipeWriter/PipeReader
. Všechna data zapsaná do jsou PipeWriter
k dispozici v :PipeReader
var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;
Základní využití potrubí
async Task ProcessLinesAsync(Socket socket)
{
var pipe = new Pipe();
Task writing = FillPipeAsync(socket, pipe.Writer);
Task reading = ReadPipeAsync(pipe.Reader);
await Task.WhenAll(reading, writing);
}
async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
const int minimumBufferSize = 512;
while (true)
{
// Allocate at least 512 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(minimumBufferSize);
try
{
int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
if (bytesRead == 0)
{
break;
}
// Tell the PipeWriter how much was read from the Socket.
writer.Advance(bytesRead);
}
catch (Exception ex)
{
LogError(ex);
break;
}
// Make the data available to the PipeReader.
FlushResult result = await writer.FlushAsync();
if (result.IsCompleted)
{
break;
}
}
// By completing PipeWriter, tell the PipeReader that there's no more data coming.
await writer.CompleteAsync();
}
async Task ReadPipeAsync(PipeReader reader)
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
{
// Process the line.
ProcessLine(line);
}
// Tell the PipeReader how much of the buffer has been consumed.
reader.AdvanceTo(buffer.Start, buffer.End);
// Stop reading if there's no more data coming.
if (result.IsCompleted)
{
break;
}
}
// Mark the PipeReader as complete.
await reader.CompleteAsync();
}
bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
// Look for a EOL in the buffer.
SequencePosition? position = buffer.PositionOf((byte)'\n');
if (position == null)
{
line = default;
return false;
}
// Skip the line + the \n.
line = buffer.Slice(0, position.Value);
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
return true;
}
Existují dvě smyčky:
FillPipeAsync
čte zSocket
a zapisuje doPipeWriter
.ReadPipeAsync
čte zPipeReader
a parsuje příchozí řádky.
Nejsou přidělovány žádné explicitní vyrovnávací paměti. Veškerá správa vyrovnávací paměti je delegovaná na PipeReader
implementace a PipeWriter
. Delegování správy vyrovnávací paměti usnadňuje využívání kódu soustředit se výhradně na obchodní logiku.
V první smyčce:
- PipeWriter.GetMemory(Int32) je volána k získání paměti ze základního zapisovače.
- PipeWriter.Advance(Int32) Je volána, aby bylo sdělovat
PipeWriter
, kolik dat bylo zapsáno do vyrovnávací paměti. - PipeWriter.FlushAsync je volána, aby byla data k dispozici pro
PipeReader
.
Ve druhé smyčce PipeReader
využívá vyrovnávací paměti zapsané nástrojem PipeWriter
. Vyrovnávací paměti pocházejí ze zásuvky. Volání metody PipeReader.ReadAsync
:
Vrátí hodnotu , ReadResult která obsahuje dvě důležité informace:
- Data, která byla přečtená ve tvaru
ReadOnlySequence<byte>
. - Logická hodnota
IsCompleted
označující, jestli bylo dosaženo konce dat (EOF).
- Data, která byla přečtená ve tvaru
Po vyhledání oddělovače konce řádku (EOL) a analýze čáry:
- Logika zpracovává vyrovnávací paměť tak, aby přeskočí to, co už je zpracované.
PipeReader.AdvanceTo
Je volána, aby bylo sdělovánoPipeReader
, kolik dat se spotřebovalo a prozkoumalo.
Smyčky čtečky a zapisovače končí voláním Complete
. Complete
umožní podkladovému kanálu uvolnit přidělenou paměť.
Zpětný tlak a řízení toku
V ideálním případě by čtení a analýza fungovaly společně:
- Vlákno čtení využívá data ze sítě a vkládá je do vyrovnávacích pamětí.
- Vlákno analýzy je zodpovědné za vytvoření příslušných datových struktur.
Parsování obvykle trvá déle než kopírování bloků dat ze sítě:
- Vlákno čtení se dostane před vlákno analýzy.
- Vlákno čtení musí buď zpomalit, nebo přidělit více paměti k uložení dat pro vlákno analýzy.
Pro zajištění optimálního výkonu existuje rovnováhu mezi častými pauzami a přidělením více paměti.
Pokud chcete vyřešit předchozí problém, má nástroj Pipe
dvě nastavení pro řízení toku dat:
- PauseWriterThreshold: Určuje, kolik dat má být uloženo do vyrovnávací paměti před voláním k FlushAsync pozastavení.
- ResumeWriterThreshold: Určuje, kolik dat musí čtenář sledovat před voláním k
PipeWriter.FlushAsync
obnovení.
- Vrátí neúplnou
ValueTask<FlushResult>
hodnotu při překročeníPauseWriterThreshold
množství dat v objektuPipe
. ValueTask<FlushResult>
Dokončí se, když se změní na nižší hodnotu nežResumeWriterThreshold
.
Dvě hodnoty se používají k prevenci rychlého cyklování, ke kterému může dojít, pokud se použije jedna hodnota.
Příklady
// The Pipe will start returning incomplete tasks from FlushAsync until
// the reader examines at least 5 bytes.
var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
var pipe = new Pipe(options);
PipeScheduler
Při použití async
a await
se asynchronní kód obvykle obnoví na nebo TaskScheduler aktuální SynchronizationContext.
Při provádění vstupně-výstupních operací je důležité mít přesnou kontrolu nad tím, kde se vstupně-výstupní operace provádějí. Tento ovládací prvek umožňuje efektivní využití mezipamětí procesoru. Efektivní ukládání do mezipaměti je důležité pro vysoce výkonné aplikace, jako jsou webové servery. PipeScheduler poskytuje kontrolu nad tím, kde se asynchronní zpětná volání spouští. Ve výchozím nastavení:
- Použije se aktuální.SynchronizationContext
- Pokud neexistuje
SynchronizationContext
, použije fond vláken ke spouštění zpětných volání.
public static void Main(string[] args)
{
var writeScheduler = new SingleThreadPipeScheduler();
var readScheduler = new SingleThreadPipeScheduler();
// Tell the Pipe what schedulers to use and disable the SynchronizationContext.
var options = new PipeOptions(readerScheduler: readScheduler,
writerScheduler: writeScheduler,
useSynchronizationContext: false);
var pipe = new Pipe(options);
}
// This is a sample scheduler that async callbacks on a single dedicated thread.
public class SingleThreadPipeScheduler : PipeScheduler
{
private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
new BlockingCollection<(Action<object> Action, object State)>();
private readonly Thread _thread;
public SingleThreadPipeScheduler()
{
_thread = new Thread(DoWork);
_thread.Start();
}
private void DoWork()
{
foreach (var item in _queue.GetConsumingEnumerable())
{
item.Action(item.State);
}
}
public override void Schedule(Action<object?> action, object? state)
{
if (state is not null)
{
_queue.Add((action, state));
}
// else log the fact that _queue.Add was not called.
}
}
PipeScheduler.ThreadPool je PipeScheduler implementace, která zařadí zpětná volání do fondu vláken. PipeScheduler.ThreadPool
je výchozí a obecně nejlepší volba. PipeScheduler.Inline může způsobit nezamýšlené důsledky, jako je například zablokování.
Resetování potrubí
Opakované použití objektu Pipe
je často efektivní. Pokud chcete kanál resetovat, zavolejte PipeReaderReset po dokončení i PipeReader
PipeWriter
.
Čtečka potrubí
PipeReader spravuje paměť jménem volajícího. Vždy volejte PipeReader.AdvanceTo po volání PipeReader.ReadAsync. To informuje volajícího PipeReader
o dokončení práce s pamětí, aby bylo možné ji sledovat. Hodnota vrácená ReadOnlySequence<byte>
z PipeReader.ReadAsync
je platná pouze do volání PipeReader.AdvanceTo
metody . Jeho použití ReadOnlySequence<byte>
po volání PipeReader.AdvanceTo
je neplatné.
PipeReader.AdvanceTo
používá dva SequencePosition argumenty:
- První argument určuje, kolik paměti se spotřebovalo.
- Druhý argument určuje, jaká část vyrovnávací paměti byla pozorována.
Označení dat jako spotřebovaných znamená, že kanál může vrátit paměť do základního fondu vyrovnávacích pamětí. Označení dat jako pozorovaných řídí, co další volání PipeReader.ReadAsync
provede. Označení všeho jako pozorovaného znamená, že další volání PipeReader.ReadAsync
se nevrátí, dokud se do kanálu nezapíšou další data. Jakákoli jiná hodnota provede další volání, které PipeReader.ReadAsync
okamžitě vrátí pozorovaná a nepozorovaná data, ale ne data, která už byla spotřebována.
Čtení scénářů streamovaných dat
Při pokusu o čtení streamovaných dat se objevuje několik typických vzorů:
- Vzhledem k datovému proudu parsujte jednu zprávu.
- Vzhledem k datovému proudu dat parsujte všechny dostupné zprávy.
Následující příklady používají metodu TryParseLines
pro analýzu zpráv z ReadOnlySequence<byte>
. TryParseLines
Parsuje jednu zprávu a aktualizuje vstupní vyrovnávací paměť tak, aby parsovanou zprávu z vyrovnávací paměti ořízla. TryParseLines
není součástí .NET, je to metoda napsaná uživatelem, která se používá v následujících částech.
bool TryParseLines(ref ReadOnlySequence<byte> buffer, out Message message);
Přečtení jedné zprávy
Následující kód přečte jednu zprávu z PipeReader
a vrátí ji volajícímu.
async ValueTask<Message?> ReadSingleMessageAsync(PipeReader reader,
CancellationToken cancellationToken = default)
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
// In the event that no message is parsed successfully, mark consumed
// as nothing and examined as the entire buffer.
SequencePosition consumed = buffer.Start;
SequencePosition examined = buffer.End;
try
{
if (TryParseLines(ref buffer, out Message message))
{
// A single message was successfully parsed so mark the start of the
// parsed buffer as consumed. TryParseLines trims the buffer to
// point to the data after the message was parsed.
consumed = buffer.Start;
// Examined is marked the same as consumed here, so the next call
// to ReadSingleMessageAsync will process the next message if there's
// one.
examined = consumed;
return message;
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(consumed, examined);
}
}
return null;
}
Předchozí kód:
- Parsuje jednu zprávu.
- Aktualizace spotřebované a vyšetřené
SequencePosition
SequencePosition
tak, aby ukazovaly na začátek oříznuté vstupní vyrovnávací paměti.
Oba SequencePosition
argumenty se aktualizují, protože TryParseLines
odeberou analyzovanou zprávu ze vstupní vyrovnávací paměti. Obecně platí, že při analýze jedné zprávy z vyrovnávací paměti by zkoumaná pozice měla být jedna z následujících:
- Konec zprávy.
- Konec přijaté vyrovnávací paměti, pokud nebyla nalezena žádná zpráva.
Případ jedné zprávy má největší potenciál k chybám. Předání nesprávných hodnot ke zkoumání může vést k výjimce kvůli nedostatku paměti nebo nekonečné smyčce. Další informace najdete v části Věnované běžným problémům s PipeReaderem v tomto článku.
Čtení více zpráv
Následující kód čte všechny zprávy z PipeReader
a a na každé z nich volá ProcessMessageAsync
.
async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseLines(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
Zrušení
PipeReader.ReadAsync
:
- Podporuje předávání .CancellationToken
- Vyvolá chybu, OperationCanceledException
CancellationToken
pokud se zruší, zatímco čeká na čtení. - Podporuje způsob, jak zrušit aktuální operaci čtení přes PipeReader.CancelPendingRead, čímž se zabrání vyvolání výjimky. Volání
PipeReader.CancelPendingRead
způsobí, že aktuální nebo další voláníPipeReader.ReadAsync
vrátí hodnotu s nastaveným ReadResultIsCanceled
natrue
. To může být užitečné pro zastavení stávající smyčky čtení nedestruktivním a nevýjimným způsobem.
private PipeReader reader;
public MyConnection(PipeReader reader)
{
this.reader = reader;
}
public void Abort()
{
// Cancel the pending read so the process loop ends without an exception.
reader.CancelPendingRead();
}
public async Task ProcessMessagesAsync()
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
if (result.IsCanceled)
{
// The read was canceled. You can quit without reading the existing data.
break;
}
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseLines(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
Běžné problémy s PipeReaderem
Předání nesprávných hodnot do nebo
examined
může vést keconsumed
čtení již načtených dat.Předání
buffer.End
v rámci prověření může vést k:- Zablokovaná data
- Pokud se data nespotřebovávají, může dojít k případné výjimce mimo paměť (OOM). Například
PipeReader.AdvanceTo(position, buffer.End)
při zpracování jedné zprávy najednou z vyrovnávací paměti.
Předání nesprávných hodnot do nebo
examined
může vést kconsumed
nekonečné smyčce. Pokud se například nezměnil, způsobí to,PipeReader.AdvanceTo(buffer.Start)
buffer.Start
že se další voláníPipeReader.ReadAsync
vrátí okamžitě před příchodem nových dat.Předání nesprávných hodnot do nebo
examined
může vést kconsumed
nekonečnému ukládání do vyrovnávací paměti (případná OOM).ReadOnlySequence<byte>
Použití následujícího voláníPipeReader.AdvanceTo
může vést k poškození paměti (použití po volném volání).Selhání volání
PipeReader.Complete/CompleteAsync
může způsobit nevracení paměti.Kontrola ReadResult.IsCompleted a ukončení logiky čtení před zpracováním vyrovnávací paměti způsobí ztrátu dat. Podmínka ukončení smyčky by měla být založena na
ReadResult.Buffer.IsEmpty
aReadResult.IsCompleted
. Pokud to uděláte nesprávně, může dojít k nekonečné smyčce.
Problematický kód
❌Ztráta dat
Pokud ReadResult
je nastavená hodnota true
, může vrátit poslední segment datIsCompleted
. Nečte tato data před ukončením smyčky čtení, dojde ke ztrátě dat.
Upozornění
Nepoužívejte následující kód. Použití této ukázky způsobí ztrátu dat, zablokování, problémy se zabezpečením a nemělo by se kopírovat. Následující ukázka vysvětluje běžné problémy s PipeReaderem.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> dataLossBuffer = result.Buffer;
if (result.IsCompleted)
break;
Process(ref dataLossBuffer, out Message message);
reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End);
}
Upozornění
Nepoužívejte předchozí kód. Použití této ukázky způsobí ztrátu dat, zablokování, problémy se zabezpečením a nemělo by se kopírovat. Předchozí ukázka je k dispozici k vysvětlení běžných problémů PipeReader.
❌Nekonečná smyčka
Následující logika může vést k nekonečné smyčce, pokud Result.IsCompleted
je true
, ale nikdy není ve vyrovnávací paměti úplná zpráva.
Upozornění
Nepoužívejte následující kód. Použití této ukázky způsobí ztrátu dat, zablokování, problémy se zabezpečením a nemělo by se kopírovat. Následující ukázka vysvětluje běžné problémy s PipeReaderem.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (result.IsCompleted && infiniteLoopBuffer.IsEmpty)
break;
Process(ref infiniteLoopBuffer, out Message message);
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
Upozornění
Nepoužívejte předchozí kód. Použití této ukázky způsobí ztrátu dat, zablokování, problémy se zabezpečením a nemělo by se kopírovat. Předchozí ukázka je k dispozici k vysvětlení běžných problémů PipeReader.
Tady je další část kódu se stejným problémem. Před kontrolou kontroluje neprázdnou vyrovnávací paměť ReadResult.IsCompleted
. Vzhledem k tomu, že je ve smyčce else if
, bude se opakovat, pokud ve vyrovnávací paměti nikdy nebude úplná zpráva.
Upozornění
Nepoužívejte následující kód. Použití této ukázky způsobí ztrátu dat, zablokování, problémy se zabezpečením a nemělo by se kopírovat. Následující ukázka vysvětluje běžné problémy s PipeReaderem.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (!infiniteLoopBuffer.IsEmpty)
Process(ref infiniteLoopBuffer, out Message message);
else if (result.IsCompleted)
break;
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
Upozornění
Nepoužívejte předchozí kód. Použití této ukázky způsobí ztrátu dat, zablokování, problémy se zabezpečením a nemělo by se kopírovat. Předchozí ukázka je k dispozici k vysvětlení běžných problémů PipeReader.
❌Nereagující aplikace
Bezpodmínečné volání PipeReader.AdvanceTo
s buffer.End
v examined
pozici může vést k tomu, že aplikace přestane reagovat při analýze jedné zprávy. Další volání se PipeReader.AdvanceTo
nevrátí, dokud:
- Do kanálu se zapisuje více dat.
- A nová data se předtím nezkoumala.
Upozornění
Nepoužívejte následující kód. Použití této ukázky způsobí ztrátu dat, zablokování, problémy se zabezpečením a nemělo by se kopírovat. Následující ukázka vysvětluje běžné problémy s PipeReaderem.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> hangBuffer = result.Buffer;
Process(ref hangBuffer, out Message message);
if (result.IsCompleted)
break;
reader.AdvanceTo(hangBuffer.Start, hangBuffer.End);
if (message != null)
return message;
}
Upozornění
Nepoužívejte předchozí kód. Použití této ukázky způsobí ztrátu dat, zablokování, problémy se zabezpečením a nemělo by se kopírovat. Předchozí ukázka je k dispozici k vysvětlení běžných problémů PipeReader.
❌Nedostatek paměti (OOM)
Za následujících podmínek se následující kód ukládá do vyrovnávací paměti, dokud nedojde k chybě OutOfMemoryException :
- Neexistuje žádná maximální velikost zprávy.
- Data vrácená z nástroje
PipeReader
nevytvářou úplnou zprávu. Například neudělá úplnou zprávu, protože druhá strana píše velkou zprávu (například zprávu o velikosti 4 GB).
Upozornění
Nepoužívejte následující kód. Použití této ukázky způsobí ztrátu dat, zablokování, problémy se zabezpečením a nemělo by se kopírovat. Následující ukázka vysvětluje běžné problémy s PipeReaderem.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer;
Process(ref thisCouldOutOfMemory, out Message message);
if (result.IsCompleted)
break;
reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End);
if (message != null)
return message;
}
Upozornění
Nepoužívejte předchozí kód. Použití této ukázky způsobí ztrátu dat, zablokování, problémy se zabezpečením a nemělo by se kopírovat. Předchozí ukázka je k dispozici k vysvětlení běžných problémů PipeReader.
❌Poškození paměti
Při zápisu pomocných rutin, které čtou vyrovnávací paměť, by se všechny vrácené datové části měly před voláním Advance
zkopírovat. Následující příklad vrátí paměť, kterou Pipe
zahodil, a může ji znovu použít pro další operaci (čtení/zápis).
Upozornění
NEPOUŽÍVEJTE následující kód. Použití této ukázky způsobí ztrátu dat, zablokování, problémy se zabezpečením a nemělo by se kopírovat. Následující ukázka vysvětluje běžné problémy PipeReader.
public class Message
{
public ReadOnlySequence<byte> CorruptedPayload { get; set; }
}
Environment.FailFast("This code is terrible, don't use it!");
Message message = null;
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
ReadHeader(ref buffer, out int length);
if (length <= buffer.Length)
{
message = new Message
{
// Slice the payload from the existing buffer
CorruptedPayload = buffer.Slice(0, length)
};
buffer = buffer.Slice(length);
}
if (result.IsCompleted)
break;
reader.AdvanceTo(buffer.Start, buffer.End);
if (message != null)
{
// This code is broken since reader.AdvanceTo() was called with a position *after* the buffer
// was captured.
break;
}
}
return message;
}
Upozornění
NEPOUŽÍVEJTE předchozí kód. Použití této ukázky způsobí ztrátu dat, zablokování, problémy se zabezpečením a nemělo by se kopírovat. Předchozí ukázka je k dispozici k vysvětlení PipeReader Běžné problémy.
PipeWriter
Spravuje PipeWriter vyrovnávací paměti pro zápis jménem volajícího. PipeWriter
implementuje IBufferWriter<byte>
. IBufferWriter<byte>
umožňuje získat přístup k vyrovnávacím pamětím, aby bylo možné provádět zápisy bez dalších kopií vyrovnávací paměti.
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
// Request at least 5 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(5);
// Write directly into the buffer.
int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span);
// Tell the writer how many bytes were written.
writer.Advance(written);
await writer.FlushAsync(cancellationToken);
}
Předchozí kód:
- Vyžádá si vyrovnávací paměť nejméně 5 bajtů z
PipeWriter
pomocí GetMemory. - Zapíše bajty pro řetězec
"Hello"
ASCII do vrácenéMemory<byte>
. - Volání Advance k určení, kolik bajtů bylo zapsáno do vyrovnávací paměti.
- Vyprázdní
PipeWriter
objekt , který odešle bajty do základního zařízení.
Předchozí metoda zápisu používá vyrovnávací paměti poskytované objektem PipeWriter
. Mohl také použít PipeWriter.WriteAsync, který:
- Zkopíruje existující vyrovnávací paměť do .
PipeWriter
Advance
Podle potřeby zavoláGetSpan
a zavolá .FlushAsync
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
byte[] helloBytes = Encoding.ASCII.GetBytes("Hello");
// Write helloBytes to the writer, there's no need to call Advance here
// (Write does that).
await writer.WriteAsync(helloBytes, cancellationToken);
}
Zrušení
FlushAsyncpodporuje předání .CancellationToken Předáním výsledku CancellationToken
se zobrazí , OperationCanceledException
pokud je token zrušený, zatímco čeká na vyprázdnění. PipeWriter.FlushAsync
podporuje způsob, jak zrušit aktuální operaci vyprázdnění prostřednictvím bez PipeWriter.CancelPendingFlush vyvolání výjimky. Volání PipeWriter.CancelPendingFlush
způsobí, že aktuální nebo další volání PipeWriter.FlushAsync
nebo PipeWriter.WriteAsync
vrátí hodnotu s nastaveným FlushResultIsCanceled
na true
. To může být užitečné pro zastavení nedestruktivního a nevyjímačného způsobu.
PipeWriter – běžné problémy
- GetSpan a GetMemory vrátit vyrovnávací paměť s alespoň požadovanou velikostí paměti. Nepředpokládejte přesné velikosti vyrovnávací paměti.
- Není zaručeno, že po sobě jdoucí volání vrátí stejnou vyrovnávací paměť nebo vyrovnávací paměť stejné velikosti.
- Po volání Advance se musí vyžádat nová vyrovnávací paměť, aby bylo možné pokračovat v zápisu dalších dat. Dříve získanou vyrovnávací paměť nelze zapisovat do.
- Volání
GetMemory
neboGetSpan
neúplné volání neníFlushAsync
bezpečné. - Volání
Complete
neboCompleteAsync
v době, kdy jsou k dispozici nevytřebná data, může vést k poškození paměti.
Tipy pro používání PipeReader a PipeWriter
Následující tipy vám pomůžou úspěšně používat System.IO.Pipelines třídy:
- Vždy vyplňte PipeReader a PipeWriter, včetně výjimky tam, kde je to možné.
- Vždy volejte PipeReader.AdvanceTo po volání PipeReader.ReadAsync.
- Během psaní pravidelně
await
PipeWriter.FlushAsync a vždy kontrolujte FlushResult.IsCompleted. Přerušit psaní, pokudIsCompleted
jetrue
, protože to znamená, že čtenář je dokončený a už se nestará o to, co je napsané. - Volejte PipeWriter.FlushAsync poté, co napíšete něco, ke kterému chcete
PipeReader
mít přístup. - Nevolejte
FlushAsync
, pokud čtečka nemůže začít, dokudFlushAsync
se nedokončí, protože to může způsobit vzájemné zablokování. - Ujistěte se, že pouze jeden kontext "vlastní"
PipeReader
neboPipeWriter
k nim přistupuje. Tyto typy nejsou bezpečné pro přístup z více vláken. - Po volání
AdvanceTo
nebo dokončeníPipeReader
nikdy nepřistupujte k ReadResult.Buffer .
IDuplexPipe
Je IDuplexPipe kontrakt pro typy, které podporují čtení i zápis. Například síťové připojení by bylo reprezentováno objektem IDuplexPipe
.
Na rozdíl od Pipe
, který obsahuje PipeReader
a PipeWriter
, IDuplexPipe
představuje jednu stranu plně duplexního připojení. To znamená, že to, co je zapsáno do PipeWriter
nebude přečteno z PipeReader
.
Streamy
Při čtení nebo zápisu dat datového proudu obvykle číst data pomocí de-serializátoru a zapisovat data pomocí serializátoru. Většina těchto rozhraní API streamu čtení a zápisu Stream
má parametr. Aby se usnadnila integrace s těmito stávajícími rozhraními PipeReader
API a PipeWriter
zveřejnění AsStream metody. AsStream vrátí implementaci Stream
PipeReader
kolem nebo PipeWriter
.
Příklad streamu
PipeReader
instance a PipeWriter
lze vytvořit pomocí statických Create
metod zadaných objektu Stream a volitelných odpovídajících možností vytvoření.
Umožňuje StreamPipeReaderOptions kontrolu nad vytvořením PipeReader
instance s následujícími parametry:
- StreamPipeReaderOptions.BufferSize je minimální velikost vyrovnávací paměti v bajtech používaná při pronajímání paměti z fondu a výchozí hodnota
4096
je . - StreamPipeReaderOptions.LeaveOpen flag určuje, jestli je podkladový datový proud po
PipeReader
dokončení otevřený, a výchozí hodnotafalse
je . - StreamPipeReaderOptions.MinimumReadSize představuje prahovou hodnotu zbývajících bajtů ve vyrovnávací paměti před přidělením nové vyrovnávací paměti a výchozí
1024
hodnota je . - StreamPipeReaderOptions.Pool
MemoryPool<byte>
se používá při přidělování paměti a výchozí hodnotanull
je .
Umožňuje StreamPipeWriterOptions kontrolu nad vytvořením PipeWriter
instance s následujícími parametry:
- StreamPipeWriterOptions.LeaveOpen flag určuje, jestli je podkladový datový proud po
PipeWriter
dokončení otevřený, a výchozí hodnotafalse
je . - StreamPipeWriterOptions.MinimumBufferSize představuje minimální velikost vyrovnávací paměti, která se má použít při pronajímání paměti z objektu Pool, a výchozí hodnota
4096
je . - StreamPipeWriterOptions.Pool
MemoryPool<byte>
se používá při přidělování paměti a výchozí hodnotanull
je .
Důležité
Při vytváření PipeReader
instancí a PipeWriter
pomocí Create
těchto metod je potřeba vzít v úvahu životnost objektu Stream
. Pokud potřebujete přístup ke streamu poté, co ho čtenář nebo autor dokončí, budete muset u možností vytvoření nastavit LeaveOpen
příznak na true
. Jinak se datový proud zavře.
Následující kód ukazuje vytvoření PipeReader
instancí a PipeWriter
pomocí Create
metod z datového proudu.
using System.Buffers;
using System.IO.Pipelines;
using System.Text;
class Program
{
static async Task Main()
{
using var stream = File.OpenRead("lorem-ipsum.txt");
var reader = PipeReader.Create(stream);
var writer = PipeWriter.Create(
Console.OpenStandardOutput(),
new StreamPipeWriterOptions(leaveOpen: true));
WriteUserCancellationPrompt();
var processMessagesTask = ProcessMessagesAsync(reader, writer);
var userCanceled = false;
var cancelProcessingTask = Task.Run(() =>
{
while (char.ToUpperInvariant(Console.ReadKey().KeyChar) != 'C')
{
WriteUserCancellationPrompt();
}
userCanceled = true;
// No exceptions thrown
reader.CancelPendingRead();
writer.CancelPendingFlush();
});
await Task.WhenAny(cancelProcessingTask, processMessagesTask);
Console.WriteLine(
$"\n\nProcessing {(userCanceled ? "cancelled" : "completed")}.\n");
}
static void WriteUserCancellationPrompt() =>
Console.WriteLine("Press 'C' to cancel processing...\n");
static async Task ProcessMessagesAsync(
PipeReader reader,
PipeWriter writer)
{
try
{
while (true)
{
ReadResult readResult = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = readResult.Buffer;
try
{
if (readResult.IsCanceled)
{
break;
}
if (TryParseLines(ref buffer, out string message))
{
FlushResult flushResult =
await WriteMessagesAsync(writer, message);
if (flushResult.IsCanceled || flushResult.IsCompleted)
{
break;
}
}
if (readResult.IsCompleted)
{
if (!buffer.IsEmpty)
{
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
catch (Exception ex)
{
Console.Error.WriteLine(ex);
}
finally
{
await reader.CompleteAsync();
await writer.CompleteAsync();
}
}
static bool TryParseLines(
ref ReadOnlySequence<byte> buffer,
out string message)
{
SequencePosition? position;
StringBuilder outputMessage = new();
while(true)
{
position = buffer.PositionOf((byte)'\n');
if (!position.HasValue)
break;
outputMessage.Append(Encoding.ASCII.GetString(buffer.Slice(buffer.Start, position.Value)))
.AppendLine();
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
};
message = outputMessage.ToString();
return message.Length != 0;
}
static ValueTask<FlushResult> WriteMessagesAsync(
PipeWriter writer,
string message) =>
writer.WriteAsync(Encoding.ASCII.GetBytes(message));
}
Aplikace použije StreamReader ke čtení souborulorem-ipsum.txt datový proud a musí končit prázdným řádkem. Předá FileStream se objektu PipeReader.Create, který vytvoří instanci objektu PipeReader
. Konzolová aplikace pak předá svůj standardní výstupní stream pomocí PipeWriter.Create příkazu Console.OpenStandardOutput(). Příklad podporuje zrušení.
Váš názor
Odeslat a zobrazit názory pro