Školení
Modul
Připojení příkazů ke kanálu - Training
V tomto modulu se dozvíte, jak připojit příkazy k kanálu.
Tento prohlížeč se už nepodporuje.
Upgradujte na Microsoft Edge, abyste mohli využívat nejnovější funkce, aktualizace zabezpečení a technickou podporu.
System.IO.Pipelines je knihovna, která je navržená tak, aby usnadnila provádění vysoce výkonných vstupně-výstupních operací v .NET. Jedná se o knihovnu, která cílí na .NET Standard, která funguje na všech implementacích .NET.
Knihovna je k dispozici v balíčku NuGet System.IO.Pipelines .
Aplikace, které analyzují streamovaná data, se skládají z často používaného kódu, který má mnoho specializovaných a neobvyklých toků kódu. Často používaný a speciální kód případu je složitý a obtížně udržovatelná.
System.IO.Pipelines
byl navržen tak, aby:
Následující kód je typický pro server TCP, který přijímá zprávy oddělené řádky (oddělené ) '\n'
od klienta:
async Task ProcessLinesAsync(NetworkStream stream)
{
var buffer = new byte[1024];
await stream.ReadAsync(buffer, 0, buffer.Length);
// Process a single line from the buffer
ProcessLine(buffer);
}
Předchozí kód má několik problémů:
ReadAsync
.stream.ReadAsync
. stream.ReadAsync
vrátí, kolik dat bylo přečteno.ReadAsync
volání čte více řádků.byte
pole s každým čtením.Chcete-li vyřešit předchozí problémy, jsou vyžadovány následující změny:
Uložení příchozích dat do vyrovnávací paměti, dokud se nenajde nový řádek.
Parsujte všechny řádky vrácené v vyrovnávací paměti.
Je možné, že čára je větší než 1 kB (1024 bajtů). Kód musí změnit velikost vstupní vyrovnávací paměti, dokud se nenajde oddělovač, aby se do vyrovnávací paměti nevešel celý řádek.
Zvažte použití sdružování vyrovnávacích pamětí, abyste se vyhnuli opakovanému přidělování paměti.
Následující kód řeší některé z těchto problémů:
async Task ProcessLinesAsync(NetworkStream stream)
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
var bytesBuffered = 0;
var bytesConsumed = 0;
while (true)
{
// Calculate the amount of bytes remaining in the buffer.
var bytesRemaining = buffer.Length - bytesBuffered;
if (bytesRemaining == 0)
{
// Double the buffer size and copy the previously buffered data into the new buffer.
var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
// Return the old buffer to the pool.
ArrayPool<byte>.Shared.Return(buffer);
buffer = newBuffer;
bytesRemaining = buffer.Length - bytesBuffered;
}
var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
if (bytesRead == 0)
{
// EOF
break;
}
// Keep track of the amount of buffered bytes.
bytesBuffered += bytesRead;
var linePosition = -1;
do
{
// Look for a EOL in the buffered data.
linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
bytesBuffered - bytesConsumed);
if (linePosition >= 0)
{
// Calculate the length of the line based on the offset.
var lineLength = linePosition - bytesConsumed;
// Process the line.
ProcessLine(buffer, bytesConsumed, lineLength);
// Move the bytesConsumed to skip past the line consumed (including \n).
bytesConsumed += lineLength + 1;
}
}
while (linePosition >= 0);
}
}
Předchozí kód je složitý a neřeší všechny zjištěné problémy. Vysoce výkonné sítě obvykle znamenají psaní složitého kódu pro maximalizaci výkonu. System.IO.Pipelines
byl navržen tak, aby usnadnil psaní tohoto typu kódu.
Třídu Pipe lze použít k vytvoření páru PipeWriter/PipeReader
. Všechna data zapsaná do služby PipeWriter
jsou k dispozici v :PipeReader
var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;
async Task ProcessLinesAsync(Socket socket)
{
var pipe = new Pipe();
Task writing = FillPipeAsync(socket, pipe.Writer);
Task reading = ReadPipeAsync(pipe.Reader);
await Task.WhenAll(reading, writing);
}
async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
const int minimumBufferSize = 512;
while (true)
{
// Allocate at least 512 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(minimumBufferSize);
try
{
int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
if (bytesRead == 0)
{
break;
}
// Tell the PipeWriter how much was read from the Socket.
writer.Advance(bytesRead);
}
catch (Exception ex)
{
LogError(ex);
break;
}
// Make the data available to the PipeReader.
FlushResult result = await writer.FlushAsync();
if (result.IsCompleted)
{
break;
}
}
// By completing PipeWriter, tell the PipeReader that there's no more data coming.
await writer.CompleteAsync();
}
async Task ReadPipeAsync(PipeReader reader)
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
{
// Process the line.
ProcessLine(line);
}
// Tell the PipeReader how much of the buffer has been consumed.
reader.AdvanceTo(buffer.Start, buffer.End);
// Stop reading if there's no more data coming.
if (result.IsCompleted)
{
break;
}
}
// Mark the PipeReader as complete.
await reader.CompleteAsync();
}
bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
// Look for a EOL in the buffer.
SequencePosition? position = buffer.PositionOf((byte)'\n');
if (position == null)
{
line = default;
return false;
}
// Skip the line + the \n.
line = buffer.Slice(0, position.Value);
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
return true;
}
Existují dvě smyčky:
FillPipeAsync
čte z Socket
a zápisů do PipeWriter
.ReadPipeAsync
čte z příchozích PipeReader
řádků a parsuje je.Nejsou přiděleny žádné explicitní vyrovnávací paměti. Veškerá správa vyrovnávací paměti se deleguje na PipeReader
implementace a PipeWriter
implementace. Delegování správy vyrovnávacích pamětí usnadňuje využívání kódu výhradně na obchodní logiku.
V první smyčce:
PipeWriter
kolik dat bylo zapsáno do vyrovnávací paměti.PipeReader
.Ve druhé smyčce PipeReader
spotřebovává vyrovnávací paměti zapsané PipeWriter
pomocí . Vyrovnávací paměti pocházejí ze zásuvky. Volání:PipeReader.ReadAsync
ReadResult Vrátí hodnotu, která obsahuje dva důležité informace:
ReadOnlySequence<byte>
.IsCompleted
, která označuje, jestli bylo dosaženo konce dat (EOF).Po nalezení oddělovače konce řádku (EOL) a parsování řádku:
PipeReader.AdvanceTo
je volána, aby bylo zjistit PipeReader
, kolik dat bylo spotřebováno a zkoumáno.Čtečka a zapisovač smyčky končí voláním Complete
. Complete
umožňuje podkladovému kanálu uvolnit přidělenou paměť.
V ideálním případě spolupracují čtení a analýza:
Analýza obvykle trvá déle než pouhé kopírování bloků dat ze sítě:
Pro zajištění optimálního výkonu existuje rovnováhu mezi častými pozastaveními a přidělením více paměti.
Pokud chcete vyřešit předchozí problém, Pipe
má dvě nastavení pro řízení toku dat:
PipeWriter.FlushAsync
obnovení.ValueTask<FlushResult>
hodnotu, když množství dat v Pipe
křížích PauseWriterThreshold
.ValueTask<FlushResult>
bude nižší než ResumeWriterThreshold
.Dvě hodnoty se používají k prevenci rychlého cyklistiky, ke kterému může dojít, pokud se použije jedna hodnota.
// The Pipe will start returning incomplete tasks from FlushAsync until
// the reader examines at least 5 bytes.
var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
var pipe = new Pipe(options);
Obvykle při použití async
a await
, asynchronní kód pokračuje na nebo TaskScheduler aktuální SynchronizationContext.
Při provádění vstupně-výstupních operací je důležité mít jemně odstupňovanou kontrolu nad tím, kde se vstupně-výstupní operace provádí. Tento ovládací prvek umožňuje efektivně využívat mezipaměti procesoru. Efektivní ukládání do mezipaměti je důležité pro vysoce výkonné aplikace, jako jsou webové servery. PipeScheduler poskytuje kontrolu nad tím, kde se spouští asynchronní zpětná volání. Standardně:
SynchronizationContext
používá fond vláken ke spouštění zpětných volání.public static void Main(string[] args)
{
var writeScheduler = new SingleThreadPipeScheduler();
var readScheduler = new SingleThreadPipeScheduler();
// Tell the Pipe what schedulers to use and disable the SynchronizationContext.
var options = new PipeOptions(readerScheduler: readScheduler,
writerScheduler: writeScheduler,
useSynchronizationContext: false);
var pipe = new Pipe(options);
}
// This is a sample scheduler that async callbacks on a single dedicated thread.
public class SingleThreadPipeScheduler : PipeScheduler
{
private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
new BlockingCollection<(Action<object> Action, object State)>();
private readonly Thread _thread;
public SingleThreadPipeScheduler()
{
_thread = new Thread(DoWork);
_thread.Start();
}
private void DoWork()
{
foreach (var item in _queue.GetConsumingEnumerable())
{
item.Action(item.State);
}
}
public override void Schedule(Action<object?> action, object? state)
{
if (state is not null)
{
_queue.Add((action, state));
}
// else log the fact that _queue.Add was not called.
}
}
PipeScheduler.ThreadPool je PipeScheduler implementace, která zařadí zpětná volání do fondu vláken. PipeScheduler.ThreadPool
je výchozí a obecně nejlepší volbou. PipeScheduler.Inline může způsobit nezamýšlené důsledky, jako jsou zablokování.
Často je efektivní objekt znovu použít Pipe
. Pokud chcete kanál resetovat, zavolejte PipeReader Reset po PipeReader
dokončení i PipeWriter
po dokončení kanálu.
PipeReader spravuje paměť jménem volajícího. Vždy zavolat PipeReader.AdvanceTo po volání PipeReader.ReadAsync. To informuje PipeReader
o tom, kdy je volající hotový s pamětí, aby se mohl sledovat. Vrácená ReadOnlySequence<byte>
hodnota PipeReader.ReadAsync
je platná pouze do volání PipeReader.AdvanceTo
. Je nezákonné používat ReadOnlySequence<byte>
po volání PipeReader.AdvanceTo
.
PipeReader.AdvanceTo
přebírá dva SequencePosition argumenty:
Označení dat jako spotřebovaných znamená, že kanál může vrátit paměť do základního fondu vyrovnávací paměti. Označení dat jako pozorovaných řídí, co má další volání PipeReader.ReadAsync
dělat. Označení všeho, co bylo zjištěno, znamená, že další volání PipeReader.ReadAsync
se nevrátí, dokud se do kanálu nezapíšou další data. Jakákoli jiná hodnota provede další volání, které se PipeReader.ReadAsync
okamžitě vrátí s pozorovanými a neobsazenými daty, ale ne s daty, která už byla spotřebována.
Při pokusu o čtení streamovaných dat se objeví několik typických vzorů:
Následující příklady používají metodu TryParseLines
pro analýzu zpráv z objektu .ReadOnlySequence<byte>
TryParseLines
parsuje jednu zprávu a aktualizuje vstupní vyrovnávací paměť, aby ořízla analyzovanou zprávu z vyrovnávací paměti. TryParseLines
není součástí .NET, jedná se o metodu napsanou uživatelem používanou v následujících částech.
bool TryParseLines(ref ReadOnlySequence<byte> buffer, out Message message);
Následující kód přečte jednu zprávu z volajícího PipeReader
a vrátí ji volajícímu.
async ValueTask<Message?> ReadSingleMessageAsync(PipeReader reader,
CancellationToken cancellationToken = default)
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
// In the event that no message is parsed successfully, mark consumed
// as nothing and examined as the entire buffer.
SequencePosition consumed = buffer.Start;
SequencePosition examined = buffer.End;
try
{
if (TryParseLines(ref buffer, out Message message))
{
// A single message was successfully parsed so mark the start of the
// parsed buffer as consumed. TryParseLines trims the buffer to
// point to the data after the message was parsed.
consumed = buffer.Start;
// Examined is marked the same as consumed here, so the next call
// to ReadSingleMessageAsync will process the next message if there's
// one.
examined = consumed;
return message;
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(consumed, examined);
}
}
return null;
}
Předchozí kód:
SequencePosition
a prověřovanou SequencePosition
, aby ukazovala na začátek oříznuté vstupní vyrovnávací paměti.Tyto dva SequencePosition
argumenty jsou aktualizovány, protože TryParseLines
odebere analyzovanou zprávu ze vstupní vyrovnávací paměti. Obecně platí, že při analýze jedné zprávy z vyrovnávací paměti by hodnocená pozice měla být jedna z těchto věcí:
Jeden případ zprávy má největší potenciál pro chyby. Předání nesprávných hodnot ke zkoumání může vést k výjimce nedostatku paměti nebo nekonečné smyčce. Další informace naleznete v části Běžné problémy PipeReader v tomto článku.
Následující kód načte všechny zprávy z PipeReader
každého z nich a volá ProcessMessageAsync
je.
async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseLines(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
PipeReader.ReadAsync
:
CancellationToken
je zrušena, zatímco čeká na čtení.PipeReader.CancelPendingRead
způsobí, že aktuální nebo další volání PipeReader.ReadAsync
vrátí hodnotu s IsCanceled
nastavenou ReadResult true
hodnotou . To může být užitečné pro zastavení stávající smyčky čtení nedestruktivním a nevýkonným způsobem.private PipeReader reader;
public MyConnection(PipeReader reader)
{
this.reader = reader;
}
public void Abort()
{
// Cancel the pending read so the process loop ends without an exception.
reader.CancelPendingRead();
}
public async Task ProcessMessagesAsync()
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
if (result.IsCanceled)
{
// The read was canceled. You can quit without reading the existing data.
break;
}
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseLines(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
Předání nesprávných hodnot nebo examined
může vést ke consumed
čtení dat, která jsou už přečtená.
Předání buffer.End
, které bylo zkoumáno, může mít za následek:
PipeReader.AdvanceTo(position, buffer.End)
při zpracování jedné zprávy najednou z vyrovnávací paměti.Předání nesprávných hodnot nebo examined
může vést k consumed
nekonečné smyčce. Pokud buffer.Start
se například nezměnila, PipeReader.AdvanceTo(buffer.Start)
další volání PipeReader.ReadAsync
se vrátí okamžitě před příchodem nových dat.
Předání nesprávných hodnot nebo examined
může vést k consumed
nekonečnému ukládání do vyrovnávací paměti (případný OOM).
ReadOnlySequence<byte>
Použití volání po volání PipeReader.AdvanceTo
může vést k poškození paměti (použití po uvolnění).
Selhání volání PipeReader.Complete/CompleteAsync
může způsobit nevrácení paměti.
Kontrola ReadResult.IsCompleted a ukončení logiky čtení před zpracováním vyrovnávací paměti způsobí ztrátu dat. Podmínka ukončení smyčky by měla být založena na ReadResult.Buffer.IsEmpty
a ReadResult.IsCompleted
. Když to uděláte nesprávně, může to vést k nekonečné smyčce.
❌Ztráta dat
Může ReadResult
vrátit poslední segment dat, pokud IsCompleted
je nastavena na true
. Nepřečtení dat před ukončením smyčky čtení způsobí ztrátu dat.
Upozornění
Nepoužívejte následující kód. Použití této ukázky způsobí ztrátu dat, zablokování, problémy se zabezpečením a nemělo by se kopírovat. Následující ukázka je k dispozici pro vysvětlení běžných problémů PipeReader.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> dataLossBuffer = result.Buffer;
if (result.IsCompleted)
break;
Process(ref dataLossBuffer, out Message message);
reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End);
}
Upozornění
Nepoužívejte předchozí kód. Použití této ukázky způsobí ztrátu dat, zablokování, problémy se zabezpečením a nemělo by se kopírovat. Předchozí ukázka je k dispozici k vysvětlení běžných problémů PipeReader.
❌Nekonečná smyčka
Následující logika může vést k nekonečné smyčce, pokud je, Result.IsCompleted
true
ale v vyrovnávací paměti není nikdy úplná zpráva.
Upozornění
Nepoužívejte následující kód. Použití této ukázky způsobí ztrátu dat, zablokování, problémy se zabezpečením a nemělo by se kopírovat. Následující ukázka je k dispozici pro vysvětlení běžných problémů PipeReader.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (result.IsCompleted && infiniteLoopBuffer.IsEmpty)
break;
Process(ref infiniteLoopBuffer, out Message message);
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
Upozornění
Nepoužívejte předchozí kód. Použití této ukázky způsobí ztrátu dat, zablokování, problémy se zabezpečením a nemělo by se kopírovat. Předchozí ukázka je k dispozici k vysvětlení běžných problémů PipeReader.
Tady je další část kódu se stejným problémem. Před kontrolou ReadResult.IsCompleted
kontroluje neprázdnou vyrovnávací paměť . Protože je ve smyčce else if
, bude smyčka navždy, pokud v vyrovnávací paměti nikdy není úplná zpráva.
Upozornění
Nepoužívejte následující kód. Použití této ukázky způsobí ztrátu dat, zablokování, problémy se zabezpečením a nemělo by se kopírovat. Následující ukázka je k dispozici pro vysvětlení běžných problémů PipeReader.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (!infiniteLoopBuffer.IsEmpty)
Process(ref infiniteLoopBuffer, out Message message);
else if (result.IsCompleted)
break;
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
Upozornění
Nepoužívejte předchozí kód. Použití této ukázky způsobí ztrátu dat, zablokování, problémy se zabezpečením a nemělo by se kopírovat. Předchozí ukázka je k dispozici k vysvětlení běžných problémů PipeReader.
❌Nereagující aplikace
Nepodmíněné volání PipeReader.AdvanceTo
na examined
buffer.End
pozici může způsobit, že aplikace přestane reagovat při analýze jedné zprávy. Další volání, které PipeReader.AdvanceTo
se nevrátí, dokud:
Upozornění
Nepoužívejte následující kód. Použití této ukázky způsobí ztrátu dat, zablokování, problémy se zabezpečením a nemělo by se kopírovat. Následující ukázka je k dispozici pro vysvětlení běžných problémů PipeReader.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> hangBuffer = result.Buffer;
Process(ref hangBuffer, out Message message);
if (result.IsCompleted)
break;
reader.AdvanceTo(hangBuffer.Start, hangBuffer.End);
if (message != null)
return message;
}
Upozornění
Nepoužívejte předchozí kód. Použití této ukázky způsobí ztrátu dat, zablokování, problémy se zabezpečením a nemělo by se kopírovat. Předchozí ukázka je k dispozici k vysvětlení běžných problémů PipeReader.
❌Nedostatek paměti (OOM)
S následujícími podmínkami se následující kód ukládá do vyrovnávací paměti, dokud OutOfMemoryException nedojde k:
PipeReader
zprávy neprovádí úplnou zprávu. Například se nevytvářá úplná zpráva, protože druhá strana píše velkou zprávu (například zpráva o velikosti 4 GB).Upozornění
Nepoužívejte následující kód. Použití této ukázky způsobí ztrátu dat, zablokování, problémy se zabezpečením a nemělo by se kopírovat. Následující ukázka je k dispozici pro vysvětlení běžných problémů PipeReader.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer;
Process(ref thisCouldOutOfMemory, out Message message);
if (result.IsCompleted)
break;
reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End);
if (message != null)
return message;
}
Upozornění
Nepoužívejte předchozí kód. Použití této ukázky způsobí ztrátu dat, zablokování, problémy se zabezpečením a nemělo by se kopírovat. Předchozí ukázka je k dispozici k vysvětlení běžných problémů PipeReader.
❌Poškození paměti
Při zápisu pomocných rutin, které čtou vyrovnávací paměť, by se všechny vrácené datové části měly před voláním Advance
zkopírovat . Následující příklad vrátí paměť, která Pipe
byla zahozena a může ji znovu použít pro další operaci (čtení/zápis).
Upozornění
Nepoužívejte následující kód. Použití této ukázky způsobí ztrátu dat, zablokování, problémy se zabezpečením a nemělo by se kopírovat. Následující ukázka je k dispozici pro vysvětlení běžných problémů PipeReader.
public class Message
{
public ReadOnlySequence<byte> CorruptedPayload { get; set; }
}
Environment.FailFast("This code is terrible, don't use it!");
Message message = null;
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
ReadHeader(ref buffer, out int length);
if (length <= buffer.Length)
{
message = new Message
{
// Slice the payload from the existing buffer
CorruptedPayload = buffer.Slice(0, length)
};
buffer = buffer.Slice(length);
}
if (result.IsCompleted)
break;
reader.AdvanceTo(buffer.Start, buffer.End);
if (message != null)
{
// This code is broken since reader.AdvanceTo() was called with a position *after* the buffer
// was captured.
break;
}
}
return message;
}
Upozornění
Nepoužívejte předchozí kód. Použití této ukázky způsobí ztrátu dat, zablokování, problémy se zabezpečením a nemělo by se kopírovat. Předchozí ukázka je k dispozici k vysvětlení běžných problémů PipeReader.
Spravuje PipeWriter vyrovnávací paměti pro zápis jménem volajícího. PipeWriter
implementuje IBufferWriter<byte>
. IBufferWriter<byte>
umožňuje získat přístup k vyrovnávacím pamětím pro provádění zápisů bez dalších kopií vyrovnávací paměti.
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
// Request at least 5 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(5);
// Write directly into the buffer.
int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span);
// Tell the writer how many bytes were written.
writer.Advance(written);
await writer.FlushAsync(cancellationToken);
}
Předchozí kód:
PipeWriter
použití GetMemory."Hello"
ASCII do vrácené Memory<byte>
.PipeWriter
, který odešle bajty do podkladového zařízení.Předchozí metoda zápisu používá vyrovnávací paměti poskytované PipeWriter
. Mohl by také použít PipeWriter.WriteAsync, což:
PipeWriter
souboru .GetSpan
, Advance
podle potřeby a volání FlushAsync.async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
byte[] helloBytes = Encoding.ASCII.GetBytes("Hello");
// Write helloBytes to the writer, there's no need to call Advance here
// (Write does that).
await writer.WriteAsync(helloBytes, cancellationToken);
}
FlushAsyncpodporuje předávání .CancellationToken CancellationToken
Předání výsledku OperationCanceledException
v případě zrušení tokenu během čekání na vyprázdnění. PipeWriter.FlushAsync
podporuje způsob, jak zrušit aktuální operaci vyprázdnění prostřednictvím PipeWriter.CancelPendingFlush bez vyvolání výjimky. Volání PipeWriter.CancelPendingFlush
způsobí, že aktuální nebo další volání PipeWriter.FlushAsync
nebo PipeWriter.WriteAsync
vrátí s IsCanceled
nastavenou FlushResult hodnotou true
. To může být užitečné pro zastavení vyprázdnění nedestruktivním a nevýkonným způsobem.
GetMemory
nebo GetSpan
nekompletní hovor FlushAsync
není bezpečný.Complete
nebo CompleteAsync
v době, kdy dojde k nechyceným datům, může dojít k poškození paměti.Následující tipy vám pomůžou úspěšně používat System.IO.Pipelines třídy:
await
PipeWriter.FlushAsync Pravidelně při psaní a vždy kontrolovat FlushResult.IsCompleted. Přerušte psaní, pokud IsCompleted
je true
, protože to znamená, že čtenář je dokončen a už se nezajímá o to, co je napsané.PipeReader
mít přístup, zavolejtePipeWriter.FlushAsync.FlushAsync
, pokud čtečka nemůže začít do FlushAsync
dokončení, protože to může způsobit zablokování.PipeReader
nebo PipeWriter
k nim přistupuje. Tyto typy nejsou bezpečné pro přístup z více vláken.AdvanceTo
nebo dokončení .PipeReader
Jedná se IDuplexPipe o kontrakt pro typy, které podporují čtení i psaní. Například síťové připojení by reprezentovalo .IDuplexPipe
Na rozdíl od Pipe
, který obsahuje PipeReader
a a PipeWriter
, IDuplexPipe
představuje jednu stranu úplného duplexního připojení. To znamená, co je zapsáno PipeWriter
do nebude čteno z PipeReader
.
Při čtení nebo zápisu dat datového proudu obvykle čtete data pomocí de-serializátoru a zapisujete data pomocí serializátoru. Většina těchto rozhraní API pro čtení a zápis streamu má Stream
parametr. Pro usnadnění integrace s těmito stávajícími rozhraními PipeReader
API a PipeWriter
zveřejnění AsStream metody. AsStreamvrátí implementaci, která Stream
se PipeWriter
PipeReader
bude pohybovat v
PipeReader
a PipeWriter
instance lze vytvořit pomocí statických Create
metod zadaných Stream objektu a volitelných odpovídajících možností vytvoření.
Povolení StreamPipeReaderOptions kontroly nad vytvořením PipeReader
instance s následujícími parametry:
4096
je .PipeReader
nebo ne, a výchozí hodnota false
je .1024
je .MemoryPool<byte>
je použit při přidělování paměti a výchozí hodnota null
.Povolení StreamPipeWriterOptions kontroly nad vytvořením PipeWriter
instance s následujícími parametry:
PipeWriter
nebo ne, a výchozí hodnota false
je .4096
.MemoryPool<byte>
je použit při přidělování paměti a výchozí hodnota null
.Důležité
Při vytváření PipeReader
a PipeWriter
instancí pomocí Create
metod je potřeba vzít v úvahu životnost objektu Stream
. Pokud potřebujete přístup ke streamu po dokončení čtení nebo zápisu, budete muset nastavit LeaveOpen
příznak na true
možnosti vytváření. V opačném případě se datový proud zavře.
Následující kód ukazuje vytvoření PipeReader
a PipeWriter
instance pomocí Create
metod ze streamu.
using System.Buffers;
using System.IO.Pipelines;
using System.Text;
class Program
{
static async Task Main()
{
using var stream = File.OpenRead("lorem-ipsum.txt");
var reader = PipeReader.Create(stream);
var writer = PipeWriter.Create(
Console.OpenStandardOutput(),
new StreamPipeWriterOptions(leaveOpen: true));
WriteUserCancellationPrompt();
var processMessagesTask = ProcessMessagesAsync(reader, writer);
var userCanceled = false;
var cancelProcessingTask = Task.Run(() =>
{
while (char.ToUpperInvariant(Console.ReadKey().KeyChar) != 'C')
{
WriteUserCancellationPrompt();
}
userCanceled = true;
// No exceptions thrown
reader.CancelPendingRead();
writer.CancelPendingFlush();
});
await Task.WhenAny(cancelProcessingTask, processMessagesTask);
Console.WriteLine(
$"\n\nProcessing {(userCanceled ? "cancelled" : "completed")}.\n");
}
static void WriteUserCancellationPrompt() =>
Console.WriteLine("Press 'C' to cancel processing...\n");
static async Task ProcessMessagesAsync(
PipeReader reader,
PipeWriter writer)
{
try
{
while (true)
{
ReadResult readResult = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = readResult.Buffer;
try
{
if (readResult.IsCanceled)
{
break;
}
if (TryParseLines(ref buffer, out string message))
{
FlushResult flushResult =
await WriteMessagesAsync(writer, message);
if (flushResult.IsCanceled || flushResult.IsCompleted)
{
break;
}
}
if (readResult.IsCompleted)
{
if (!buffer.IsEmpty)
{
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
catch (Exception ex)
{
Console.Error.WriteLine(ex);
}
finally
{
await reader.CompleteAsync();
await writer.CompleteAsync();
}
}
static bool TryParseLines(
ref ReadOnlySequence<byte> buffer,
out string message)
{
SequencePosition? position;
StringBuilder outputMessage = new();
while(true)
{
position = buffer.PositionOf((byte)'\n');
if (!position.HasValue)
break;
outputMessage.Append(Encoding.ASCII.GetString(buffer.Slice(buffer.Start, position.Value)))
.AppendLine();
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
};
message = outputMessage.ToString();
return message.Length != 0;
}
static ValueTask<FlushResult> WriteMessagesAsync(
PipeWriter writer,
string message) =>
writer.WriteAsync(Encoding.ASCII.GetBytes(message));
}
Aplikace používá StreamReader ke čtení lorem-ipsum.txt souboru jako streamu a musí končit prázdným řádkem. Objekt FileStream se předá PipeReader.Create, který vytvoří instanci objektu PipeReader
. Konzolová aplikace pak předá standardní výstupní datový proud k PipeWriter.Create použití Console.OpenStandardOutput(). Příklad podporuje zrušení.
Zpětná vazba k produktu .NET
.NET je open source projekt. Vyberte odkaz pro poskytnutí zpětné vazby:
Školení
Modul
Připojení příkazů ke kanálu - Training
V tomto modulu se dozvíte, jak připojit příkazy k kanálu.