System.IO.Pipelines a .NET-ben
System.IO.Pipelines egy olyan kódtár, amelynek célja, hogy megkönnyítse a nagy teljesítményű I/O használatát a .NET-ben. Ez egy .NET Standardot célzó kódtár, amely minden .NET-implementáción működik.
A kódtár a System.IO.Pipelines Nuget csomagban érhető el.
Milyen problémát old meg a System.IO.Pipelines?
A streamelési adatokat elemző alkalmazások olyan sablonkódból állnak, amely számos speciális és szokatlan kódfolyamattal rendelkezik. A kazánlemez és a speciális esetkód összetett és nehezen karbantartható.
System.IO.Pipelines
a következőre lett kitalálták:
- Nagy teljesítményű streamelési adatok elemzése.
- A kód összetettségének csökkentése.
Az alábbi kód jellemzően egy olyan TCP-kiszolgálóra jellemző, amely sorhatárolt üzeneteket fogad egy ügyféltől (tagolt) '\n'
az ügyféltől:
async Task ProcessLinesAsync(NetworkStream stream)
{
var buffer = new byte[1024];
await stream.ReadAsync(buffer, 0, buffer.Length);
// Process a single line from the buffer
ProcessLine(buffer);
}
Az előző kódnak több problémája is van:
- Előfordulhat, hogy a teljes üzenet (a sor vége) nem érkezik meg egyetlen hívásban.
ReadAsync
- Figyelmen kívül hagyja a .
stream.ReadAsync
stream.ReadAsync
azt adja vissza, hogy mennyi adat lett beolvasva. - Nem kezeli azt az esetet, amikor egyetlen
ReadAsync
hívásban több sor van beolvasva. - Egy tömböt
byte
foglal le minden egyes olvasással.
Az előző problémák megoldásához a következő módosítások szükségesek:
Pufferelje a bejövő adatokat, amíg új sort nem talál.
Elemezni kell a pufferben visszaadott összes sort.
Lehetséges, hogy a sor nagyobb, mint 1 KB (1024 bájt). A kódnak át kell méreteznie a bemeneti puffert, amíg meg nem találja a határolót, hogy elférjen a teljes sor a pufferben.
- Ha a puffer átméretezve van, a rendszer további puffermásolatokat készít, amint hosszabb sorok jelennek meg a bemenetben.
- Az elpazarolt hely csökkentése érdekében tömörítse az olvasóvonalakhoz használt puffert.
Fontolja meg a pufferkészletezés használatát a memória ismételt kiosztásának elkerülése érdekében.
Az alábbi kód az alábbi problémák némelyikét oldja meg:
async Task ProcessLinesAsync(NetworkStream stream)
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
var bytesBuffered = 0;
var bytesConsumed = 0;
while (true)
{
// Calculate the amount of bytes remaining in the buffer.
var bytesRemaining = buffer.Length - bytesBuffered;
if (bytesRemaining == 0)
{
// Double the buffer size and copy the previously buffered data into the new buffer.
var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
// Return the old buffer to the pool.
ArrayPool<byte>.Shared.Return(buffer);
buffer = newBuffer;
bytesRemaining = buffer.Length - bytesBuffered;
}
var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
if (bytesRead == 0)
{
// EOF
break;
}
// Keep track of the amount of buffered bytes.
bytesBuffered += bytesRead;
var linePosition = -1;
do
{
// Look for a EOL in the buffered data.
linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
bytesBuffered - bytesConsumed);
if (linePosition >= 0)
{
// Calculate the length of the line based on the offset.
var lineLength = linePosition - bytesConsumed;
// Process the line.
ProcessLine(buffer, bytesConsumed, lineLength);
// Move the bytesConsumed to skip past the line consumed (including \n).
bytesConsumed += lineLength + 1;
}
}
while (linePosition >= 0);
}
}
Az előző kód összetett, és nem kezeli az összes azonosított problémát. A nagy teljesítményű hálózatkezelés általában összetett kód írását jelenti a teljesítmény maximalizálása érdekében. System.IO.Pipelines
úgy lett kialakítva, hogy megkönnyítse az ilyen típusú kód írását.
Cső
Az Pipe osztály használható párok PipeWriter/PipeReader
létrehozására. A következő helyen található minden, a következőbe PipeWriter
PipeReader
írt adat:
var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;
Cső alapszintű használata
async Task ProcessLinesAsync(Socket socket)
{
var pipe = new Pipe();
Task writing = FillPipeAsync(socket, pipe.Writer);
Task reading = ReadPipeAsync(pipe.Reader);
await Task.WhenAll(reading, writing);
}
async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
const int minimumBufferSize = 512;
while (true)
{
// Allocate at least 512 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(minimumBufferSize);
try
{
int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
if (bytesRead == 0)
{
break;
}
// Tell the PipeWriter how much was read from the Socket.
writer.Advance(bytesRead);
}
catch (Exception ex)
{
LogError(ex);
break;
}
// Make the data available to the PipeReader.
FlushResult result = await writer.FlushAsync();
if (result.IsCompleted)
{
break;
}
}
// By completing PipeWriter, tell the PipeReader that there's no more data coming.
await writer.CompleteAsync();
}
async Task ReadPipeAsync(PipeReader reader)
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
{
// Process the line.
ProcessLine(line);
}
// Tell the PipeReader how much of the buffer has been consumed.
reader.AdvanceTo(buffer.Start, buffer.End);
// Stop reading if there's no more data coming.
if (result.IsCompleted)
{
break;
}
}
// Mark the PipeReader as complete.
await reader.CompleteAsync();
}
bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
// Look for a EOL in the buffer.
SequencePosition? position = buffer.PositionOf((byte)'\n');
if (position == null)
{
line = default;
return false;
}
// Skip the line + the \n.
line = buffer.Slice(0, position.Value);
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
return true;
}
Két hurok van:
FillPipeAsync
olvasása ésSocket
írása aPipeWriter
.ReadPipeAsync
beolvassa aPipeReader
bejövő sorokat, és elemzi azokat.
Nincsenek explicit pufferek lefoglalva. Minden pufferkezelés delegálva van az és PipeWriter
az PipeReader
implementációk számára. A pufferkezelés delegálása megkönnyíti, hogy a kód felhasználása kizárólag az üzleti logikára összpontosítson.
Az első ciklusban:
- PipeWriter.GetMemory(Int32) a rendszer a mögöttes író memóriájának lekérésére hívja meg.
- PipeWriter.Advance(Int32) a rendszer meghívja, hogy elmondja,
PipeWriter
mennyi adatot írtak a pufferbe. - PipeWriter.FlushAsync az adatok elérhetővé tétele a
PipeReader
.
A második ciklusban a rendszer az PipeReader
általa PipeWriter
írt puffereket használja fel. A pufferek a foglalatból származnak. A következő hívás:PipeReader.ReadAsync
Olyan értéket ReadResult ad vissza, amely két fontos információt tartalmaz:
- A beolvasott adatok a következő formában
ReadOnlySequence<byte>
: . - Logikai érték
IsCompleted
, amely azt jelzi, hogy elérte-e az adatok végét (EOF).
- A beolvasott adatok a következő formában
A sor végének (EOL) elválasztójának megkeresése és a vonal elemzése után:
- A logika feldolgozza a puffert, hogy kihagyja a már feldolgozott elemeket.
PipeReader.AdvanceTo
a meghívásával megállapíthatja,PipeReader
hogy mennyi adatot felhasználtak és vizsgáltak meg.
Az olvasó és az író ciklusai a hívással Complete
végződnek. Complete
lehetővé teszi, hogy a mögöttes cső felszabadítsa a lefoglalt memóriát.
Backpressure és flow control
Ideális esetben az olvasás és az elemzés együtt működik:
- Az olvasószál a hálózatból származó adatokat használja fel, és pufferekbe helyezi.
- Az elemzési szál feladata a megfelelő adatstruktúrák létrehozása.
Az elemzés általában több időt vesz igénybe, mint az adatblokkok másolása a hálózatról:
- Az olvasószál megelőzi az elemzési szálat.
- Az olvasószálnak lassítania kell vagy több memóriát kell lefoglalnia az elemzési szál adatainak tárolásához.
Az optimális teljesítmény érdekében egyensúly van a gyakori szünetek és a több memória kiosztása között.
Az előző probléma megoldásához két Pipe
beállítással szabályozhatja az adatáramlást:
- PauseWriterThreshold: Meghatározza, hogy mennyi adatot kell pufferelni a szüneteltetni kívánt FlushAsync hívások előtt.
- ResumeWriterThreshold: Meghatározza, hogy mennyi adatot kell megfigyelnie az olvasónak a folytatáshoz szükséges
PipeWriter.FlushAsync
hívások előtt.
- Hiányos
ValueTask<FlushResult>
értéket ad vissza, ha a keresztekbenPauseWriterThreshold
lévőPipe
adatok mennyisége . - Akkor fejeződik
ValueTask<FlushResult>
be, ha kisebb lesz, mintResumeWriterThreshold
.
A gyors kerékpározás megelőzésére két érték szolgál, amelyek egy érték használata esetén fordulhatnak elő.
Példák
// The Pipe will start returning incomplete tasks from FlushAsync until
// the reader examines at least 5 bytes.
var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
var pipe = new Pipe(options);
PipeScheduler
Az aszinkron kód használata és await
használata async
általában egy vagy az aktuális SynchronizationContextrendszeren TaskScheduler folytatódik.
Az I/O végrehajtásakor fontos, hogy részletesen szabályozva legyen az I/O végrehajtásának helye. Ez a vezérlő lehetővé teszi a cpu-gyorsítótárak hatékony kihasználását. A hatékony gyorsítótárazás kritikus fontosságú a nagy teljesítményű alkalmazások, például a webkiszolgálók esetében. PipeScheduler szabályozza, hogy hol futnak az aszinkron visszahívások. Alapértelmezés szerint:
- A rendszer az aktuálisat SynchronizationContext használja.
- Ha nincs
SynchronizationContext
, a visszahívások futtatásához a szálkészletet használja.
public static void Main(string[] args)
{
var writeScheduler = new SingleThreadPipeScheduler();
var readScheduler = new SingleThreadPipeScheduler();
// Tell the Pipe what schedulers to use and disable the SynchronizationContext.
var options = new PipeOptions(readerScheduler: readScheduler,
writerScheduler: writeScheduler,
useSynchronizationContext: false);
var pipe = new Pipe(options);
}
// This is a sample scheduler that async callbacks on a single dedicated thread.
public class SingleThreadPipeScheduler : PipeScheduler
{
private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
new BlockingCollection<(Action<object> Action, object State)>();
private readonly Thread _thread;
public SingleThreadPipeScheduler()
{
_thread = new Thread(DoWork);
_thread.Start();
}
private void DoWork()
{
foreach (var item in _queue.GetConsumingEnumerable())
{
item.Action(item.State);
}
}
public override void Schedule(Action<object?> action, object? state)
{
if (state is not null)
{
_queue.Add((action, state));
}
// else log the fact that _queue.Add was not called.
}
}
A PipeScheduler.ThreadPool az a PipeScheduler implementáció, amely várólistára állítja a szálkészlet visszahívásait. PipeScheduler.ThreadPool
az alapértelmezett és általában a legjobb választás. A PipeScheduler.Inline nem szándékos következményeket, például holtpontokat okozhat.
Cső alaphelyzetbe állítása
Gyakran hatékony az objektum újrafelhasználása Pipe
. A cső alaphelyzetbe állításához hívja meg PipeReaderReset , ha mind a PipeReader
PipeWriter
kettő befejeződött.
PipeReader
PipeReader a hívó nevében kezeli a memóriát. Hívás után PipeReader.ReadAsyncmindig hívjonPipeReader.AdvanceTo. Ez tudatja a PipeReader
hívóval, hogy mikor fejezték be a memóriát, hogy nyomon lehessen követni. A ReadOnlySequence<byte>
visszaadott érték PipeReader.ReadAsync
csak a hívásig PipeReader.AdvanceTo
érvényes. A hívás PipeReader.AdvanceTo
után nem lehet használniReadOnlySequence<byte>
.
PipeReader.AdvanceTo
két SequencePosition argumentumot vesz fel:
- Az első argumentum határozza meg, hogy mennyi memória volt használatban.
- A második argumentum határozza meg, hogy a puffer mekkora részét figyelték meg.
Az adatok felhasználtként való megjelölése azt jelenti, hogy a cső vissza tudja adni a memóriát a mögöttes pufferkészletnek. Az adatok megfigyeltként való megjelölése szabályozza, hogy a következő hívás mit tesz PipeReader.ReadAsync
. A megfigyelt adatok megjelölése azt jelenti, hogy a következő hívás nem fog visszatérni PipeReader.ReadAsync
, amíg több adat nem íródott a csőbe. Bármely más érték a következő hívást kezdeményezi, hogy azonnal visszatérjen PipeReader.ReadAsync
a megfigyelt és nem figyelt adatokkal, de a már felhasznált adatokkal nem.
Streamelési adatforgatókönyvek olvasása
A streamelési adatok olvasása során néhány tipikus minta jelenik meg:
- Adatstreamek esetén elemezz egyetlen üzenetet.
- Az adatstreamek alapján elemezni kell az összes elérhető üzenetet.
Az alábbi példák a metódust használják egy TryParseLines
üzenet elemzéséhez.ReadOnlySequence<byte>
TryParseLines
Egyetlen üzenetet elemez, és frissíti a bemeneti puffert, hogy levágja az elemzett üzenetet a pufferből. TryParseLines
nem része a .NET-nek, hanem a következő szakaszokban használt, felhasználó által írt metódus.
bool TryParseLines(ref ReadOnlySequence<byte> buffer, out Message message);
Egyetlen üzenet olvasása
Az alábbi kód egyetlen üzenetet olvas be egy PipeReader
üzenetből, és visszaadja a hívónak.
async ValueTask<Message?> ReadSingleMessageAsync(PipeReader reader,
CancellationToken cancellationToken = default)
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
// In the event that no message is parsed successfully, mark consumed
// as nothing and examined as the entire buffer.
SequencePosition consumed = buffer.Start;
SequencePosition examined = buffer.End;
try
{
if (TryParseLines(ref buffer, out Message message))
{
// A single message was successfully parsed so mark the start of the
// parsed buffer as consumed. TryParseLines trims the buffer to
// point to the data after the message was parsed.
consumed = buffer.Start;
// Examined is marked the same as consumed here, so the next call
// to ReadSingleMessageAsync will process the next message if there's
// one.
examined = consumed;
return message;
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(consumed, examined);
}
}
return null;
}
A fenti kód a következőket végzi el:
- Egyetlen üzenet elemzése.
- Frissítések a felhasznált
SequencePosition
és megvizsgáltSequencePosition
adatokat, hogy a levágott bemeneti puffer elejére mutasson.
A két SequencePosition
argumentum azért frissül, mert TryParseLines
eltávolítja az elemzett üzenetet a bemeneti pufferből. Ha egyetlen üzenetet elemez a pufferből, a vizsgált pozíciónak az alábbiak egyikének kell lennie:
- Az üzenet vége.
- Ha nem található üzenet, a fogadott puffer vége.
Az egyetlen üzenetes eset a legtöbb hibalehetőséget rejti magában. A nem megfelelő értékek átadása memóriakivételt vagy végtelen hurkot eredményezhet. További információ: A PipeReader gyakori problémáinak szakasza ebben a cikkben.
Több üzenet olvasása
Az alábbi kód beolvassa az összes üzenetet egy PipeReader
üzenetből, és meghívja ProcessMessageAsync
az egyes üzeneteket.
async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseLines(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
Érvénytelenítés
PipeReader.ReadAsync
:
- Támogatja a CancellationToken.
- OperationCanceledException Ha az
CancellationToken
olvasás függőben van, a megszakítást jelzi. - Támogatja az aktuális olvasási művelet PipeReader.CancelPendingReadmegszakításának módját, amely elkerüli a kivételt. A hívás
PipeReader.CancelPendingRead
hatására az aktuális vagy a következő hívásPipeReader.ReadAsync
egy beállított értékettrue
ad vissza ReadResultIsCanceled
. Ez hasznos lehet a meglévő olvasási ciklus roncsolásmentes és kivételes módon történő leállításához.
private PipeReader reader;
public MyConnection(PipeReader reader)
{
this.reader = reader;
}
public void Abort()
{
// Cancel the pending read so the process loop ends without an exception.
reader.CancelPendingRead();
}
public async Task ProcessMessagesAsync()
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
if (result.IsCanceled)
{
// The read was canceled. You can quit without reading the existing data.
break;
}
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseLines(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
A PipeReader gyakori problémái
Ha rossz értékeket ad át a már beolvasott adatoknak
consumed
, vagyexamined
az azt eredményezheti, hogy már beolvassa az adatokat.A vizsgált módon történő
buffer.End
átadás a következőt eredményezheti:- Elakadt adatok
- Előfordulhat, hogy az adatok nem használnak fel végleges memóriakivételt (OOM). Ha például
PipeReader.AdvanceTo(position, buffer.End)
egyetlen üzenetet dolgoz fel egyszerre a pufferből.
Ha rossz értékeket ad át,
consumed
azexamined
végtelen ciklust eredményezhet. Ha például nem változik,PipeReader.AdvanceTo(buffer.Start)
buffer.Start
a következő hívásPipeReader.ReadAsync
azonnal visszatér, mielőtt új adatok érkeznek.Ha rossz értékeket ad át,
consumed
azexamined
végtelen pufferelést (végleges OOM) eredményezhet.ReadOnlySequence<byte>
A hívás utáni használatPipeReader.AdvanceTo
memóriasérülést okozhat (ingyenes használat után).A hívás
PipeReader.Complete/CompleteAsync
sikertelensége memóriavesztést okozhat.Az olvasási logika ellenőrzése ReadResult.IsCompleted és kilépése a puffer feldolgozása előtt adatvesztést eredményez. A hurok kilépési feltételének a következőn
ReadResult.Buffer.IsEmpty
kell alapulnia: ésReadResult.IsCompleted
. A helytelenül végzett művelet végtelen ciklust eredményezhet.
Problémás kód
❌Adatvesztés
Az ReadResult
adat utolsó szegmensét adja vissza, ha IsCompleted
be van állítva true
. Ha nem olvassa be az adatokat az olvasási ciklusból való kilépés előtt, az adatvesztéshez vezet.
Figyelmeztetés
Ne használja a következő kódot. A minta használata adatvesztést, lefagyást, biztonsági problémákat eredményez, és NEM másolható. Az alábbi minta a PipeReader gyakori problémáinak magyarázatára szolgál.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> dataLossBuffer = result.Buffer;
if (result.IsCompleted)
break;
Process(ref dataLossBuffer, out Message message);
reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End);
}
Figyelmeztetés
NE használja az előző kódot. A minta használata adatvesztést, lefagyást, biztonsági problémákat eredményez, és NEM másolható. Az előző minta a PipeReader gyakori problémáinak magyarázatára szolgál.
❌Végtelen hurok
Az alábbi logika végtelen ciklust eredményezhet, ha az Result.IsCompleted
is, true
de a pufferben soha nem található teljes üzenet.
Figyelmeztetés
Ne használja a következő kódot. A minta használata adatvesztést, lefagyást, biztonsági problémákat eredményez, és NEM másolható. Az alábbi minta a PipeReader gyakori problémáinak magyarázatára szolgál.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (result.IsCompleted && infiniteLoopBuffer.IsEmpty)
break;
Process(ref infiniteLoopBuffer, out Message message);
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
Figyelmeztetés
NE használja az előző kódot. A minta használata adatvesztést, lefagyást, biztonsági problémákat eredményez, és NEM másolható. Az előző minta a PipeReader gyakori problémáinak magyarázatára szolgál.
Íme egy másik kód, ugyanazzal a problémával. Az ellenőrzés ReadResult.IsCompleted
előtt nem üres puffert keres. Mivel egy folyamatban else if
van, a rendszer örökké hurokba kerül, ha soha nem jelenik meg teljes üzenet a pufferben.
Figyelmeztetés
Ne használja a következő kódot. A minta használata adatvesztést, lefagyást, biztonsági problémákat eredményez, és NEM másolható. Az alábbi minta a PipeReader gyakori problémáinak magyarázatára szolgál.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (!infiniteLoopBuffer.IsEmpty)
Process(ref infiniteLoopBuffer, out Message message);
else if (result.IsCompleted)
break;
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
Figyelmeztetés
NE használja az előző kódot. A minta használata adatvesztést, lefagyást, biztonsági problémákat eredményez, és NEM másolható. Az előző minta a PipeReader gyakori problémáinak magyarázatára szolgál.
❌Nem válaszoló alkalmazás
Ha feltétel nélkül hív meg PipeReader.AdvanceTo
buffer.End
egy adott pozíciót, examined
az alkalmazás nem válaszol egyetlen üzenet elemzésekor. A következő hívás nem fog visszatérni PipeReader.AdvanceTo
, amíg:
- Több adat van a csőbe írva.
- Az új adatokat korábban nem vizsgálták meg.
Figyelmeztetés
Ne használja a következő kódot. A minta használata adatvesztést, lefagyást, biztonsági problémákat eredményez, és NEM másolható. Az alábbi minta a PipeReader gyakori problémáinak magyarázatára szolgál.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> hangBuffer = result.Buffer;
Process(ref hangBuffer, out Message message);
if (result.IsCompleted)
break;
reader.AdvanceTo(hangBuffer.Start, hangBuffer.End);
if (message != null)
return message;
}
Figyelmeztetés
NE használja az előző kódot. A minta használata adatvesztést, lefagyást, biztonsági problémákat eredményez, és NEM másolható. Az előző minta a PipeReader gyakori problémáinak magyarázatára szolgál.
❌Memóriahiány (OOM)
A következő feltételek mellett a következő kód pufferelése addig tart, amíg a OutOfMemoryException következő nem következik be:
- Nincs maximális üzenetméret.
- A visszaadott
PipeReader
adatok nem teljes üzenetet alkotnak. Például nem készít teljes üzenetet, mert a másik oldal egy nagy üzenetet ír (például egy 4 GB-os üzenetet).
Figyelmeztetés
Ne használja a következő kódot. A minta használata adatvesztést, lefagyást, biztonsági problémákat eredményez, és NEM másolható. Az alábbi minta a PipeReader gyakori problémáinak magyarázatára szolgál.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer;
Process(ref thisCouldOutOfMemory, out Message message);
if (result.IsCompleted)
break;
reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End);
if (message != null)
return message;
}
Figyelmeztetés
NE használja az előző kódot. A minta használata adatvesztést, lefagyást, biztonsági problémákat eredményez, és NEM másolható. Az előző minta a PipeReader gyakori problémáinak magyarázatára szolgál.
❌Memóriasérülés
A puffert olvasó segítők írásakor a visszaadott hasznos adatokat a hívás Advance
előtt át kell másolni. Az alábbi példa visszaadja az Pipe
elvetett memóriát, és újra felhasználhatja a következő művelethez (olvasási/írási).
Figyelmeztetés
Ne használja a következő kódot. A minta használata adatvesztést, lefagyást, biztonsági problémákat eredményez, és NEM másolható. Az alábbi minta a PipeReader gyakori problémáinak magyarázatára szolgál.
public class Message
{
public ReadOnlySequence<byte> CorruptedPayload { get; set; }
}
Environment.FailFast("This code is terrible, don't use it!");
Message message = null;
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
ReadHeader(ref buffer, out int length);
if (length <= buffer.Length)
{
message = new Message
{
// Slice the payload from the existing buffer
CorruptedPayload = buffer.Slice(0, length)
};
buffer = buffer.Slice(length);
}
if (result.IsCompleted)
break;
reader.AdvanceTo(buffer.Start, buffer.End);
if (message != null)
{
// This code is broken since reader.AdvanceTo() was called with a position *after* the buffer
// was captured.
break;
}
}
return message;
}
Figyelmeztetés
NE használja az előző kódot. A minta használata adatvesztést, lefagyást, biztonsági problémákat eredményez, és NEM másolható. Az előző minta a PipeReader gyakori problémáinak magyarázatára szolgál.
PipeWriter
A PipeWriter pufferek kezelése a hívó nevében történő íráshoz. PipeWriter
implementálja.IBufferWriter<byte>
IBufferWriter<byte>
lehetővé teszi a pufferekhez való hozzáférést az írások további puffermásolatok nélkül történő végrehajtásához.
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
// Request at least 5 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(5);
// Write directly into the buffer.
int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span);
// Tell the writer how many bytes were written.
writer.Advance(written);
await writer.FlushAsync(cancellationToken);
}
Az előző kód:
- Legalább 5 bájt puffert kér a
PipeWriter
felhasználótól GetMemory. - Bájtokat ír az ASCII-sztringhez
"Hello"
a visszaadottMemory<byte>
értékre. - Hívások Advance a pufferbe írt bájtok számának jelzésére.
- Kiüríti a
PipeWriter
bájtokat a mögöttes eszközre küldő bájtokat.
Az előző írási módszer a .PipeWriter
A következőt is használhatta PipeWriter.WriteAsyncvolna:
- Másolja a meglévő puffert a
PipeWriter
. - Hívások
GetSpan
,Advance
szükség szerint és hívások FlushAsync.
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
byte[] helloBytes = Encoding.ASCII.GetBytes("Hello");
// Write helloBytes to the writer, there's no need to call Advance here
// (Write does that).
await writer.WriteAsync(helloBytes, cancellationToken);
}
Érvénytelenítés
FlushAsync támogatja a CancellationToken. Eredmény OperationCanceledException
átadásaCancellationToken
, ha a jogkivonat megszakad, amíg egy kiürítés függőben van. PipeWriter.FlushAsync
lehetővé teszi az aktuális kiürítési művelet PipeWriter.CancelPendingFlush megszakítását kivétel nélkül. A hívás PipeWriter.CancelPendingFlush
hatására az aktuális vagy a következő hívás PipeWriter.FlushAsync
egy adott értékre IsCanceled
van állítvatrue
.PipeWriter.WriteAsync
FlushResult Ez hasznos lehet a hozam öblítés non-destruktív és nem kivételes módon történő megállításához.
PipeWriter – gyakori problémák
- GetSpan és GetMemory adjon vissza egy puffert legalább a kért memóriamennyiséggel. Ne feltételezze a puffer pontos méretét.
- Nincs garancia arra, hogy az egymást követő hívások ugyanazt a puffert vagy azonos méretű puffert fogják visszaadni.
- A további adatok írásának folytatásához a hívás Advance után új puffert kell kérni. A korábban beszerzett puffer nem írható be.
- A hívás
GetMemory
vagyGetSpan
a nem teljes hívásFlushAsync
nem biztonságos. - A hívás
Complete
vagyCompleteAsync
a nem befolyt adatok memóriasérülést okozhatnak.
Tippek a PipeReader és a PipeWriter használatához
Az alábbi tippek segítenek az System.IO.Pipelines osztályok sikeres használatához:
- Mindig végezze el a PipeReadert és a PipeWritert, adott esetben kivételt is beleértve.
- Hívás után PipeReader.ReadAsyncmindig hívjonPipeReader.AdvanceTo.
await
PipeWriter.FlushAsync Rendszeresen írás közben, és mindig ellenőrizzeFlushResult.IsCompleted. Megszakítja az írást, haIsCompleted
vantrue
, mivel ez azt jelzi, hogy az olvasó befejeződött, és már nem érdekli, hogy mi van megírva.- PipeWriter.FlushAsync Hívás után írjon valamit, amelyhez hozzá szeretne
PipeReader
férni. - Ne hívjon,
FlushAsync
ha az olvasó nem tud elindulni, amígFlushAsync
be nem fejeződik, mert ez holtpontot okozhat. - Győződjön meg arról, hogy csak egy környezet rendelkezik
PipeReader
PipeWriter
vagy fér hozzá hozzájuk. Ezek a típusok nem szálbiztosak. - Soha ne férhessen hozzá a ReadResult.Buffer hívás
AdvanceTo
vagy a befejezés után.PipeReader
IDuplexPipe
Ez IDuplexPipe egy olyan szerződés, amely az olvasást és az írást egyaránt támogatja. Egy hálózati kapcsolatot például egy IDuplexPipe
.
Ellentétben Pipe
a teljes kétoldalas kapcsolat egyetlen oldalával, IDuplexPipe
amely egy PipeReader
és egy PipeWriter
elemet tartalmaz. Ez azt jelenti, hogy amit írnak, az PipeWriter
nem lesz olvasható a PipeReader
.
Adatfolyamok
Streamadatok olvasása vagy írása során általában szerializálóval olvas be adatokat, és szerializálóval adatokat ír. Az olvasási és írási stream API-k többsége rendelkezik paraméterrel Stream
. Hogy könnyebben integrálható legyen ezekkel a meglévő API-kkal, PipeReader
és PipeWriter
közzétehesse a metódust AsStream . AsStreamA egy implementációt Stream
ad vissza a (vagyPipeWriter
)PipeReader
Példa streamelésre
PipeReader
és PipeWriter
a példányok létrehozhatók statikus Create
metódusokkal, adott Stream objektum és opcionálisan megfelelő létrehozási lehetőségek használatával.
A StreamPipeReaderOptions példány létrehozásának engedélyezése a PipeReader
következő paraméterekkel:
- StreamPipeReaderOptions.BufferSize az a minimális pufferméret bájtban, amelyet a készletből való memóriabérléskor használnak, és az alapértelmezett érték a következő:
4096
. - StreamPipeReaderOptions.LeaveOpen jelölő határozza meg, hogy a mögöttes stream nyitva marad-e a
PipeReader
befejezés után, és az alapértelmezett érték a következő leszfalse
. - StreamPipeReaderOptions.MinimumReadSizeA pufferben maradó bájtok küszöbértékét jelöli az új puffer lefoglalása előtt, az alapértelmezett érték pedig a következő.
1024
- StreamPipeReaderOptions.Pool
MemoryPool<byte>
a memória kiosztásakor használatos, alapértelmezett értéke pedig a következő:null
.
A StreamPipeWriterOptions példány létrehozásának engedélyezése a PipeWriter
következő paraméterekkel:
- StreamPipeWriterOptions.LeaveOpen jelölő határozza meg, hogy a mögöttes stream nyitva marad-e a
PipeWriter
befejezés után, és az alapértelmezett érték a következő leszfalse
. - StreamPipeWriterOptions.MinimumBufferSize A minimális pufferméretet jelöli, amelyet a memória a helyről való bérbeadásakor használ, és az alapértelmezett érték a Poolkövetkező:
4096
. - StreamPipeWriterOptions.Pool
MemoryPool<byte>
a memória kiosztásakor használatos, alapértelmezett értéke pedig a következő:null
.
Fontos
A metódusok létrehozásakor PipeReader
és PipeWriter
a példányok használatakor Create
figyelembe kell vennie az objektum élettartamát Stream
. Ha az olvasó vagy író befejezése után hozzá kell férnie a streamheztrue
, a létrehozási beállításoknál be kell állítania a LeaveOpen
jelölőt. Ellenkező esetben a stream bezárul.
Az alábbi kód bemutatja, hogy egy stream metódusait PipeReader
használva Create
hozhat létre példányokat és PipeWriter
példányokat.
using System.Buffers;
using System.IO.Pipelines;
using System.Text;
class Program
{
static async Task Main()
{
using var stream = File.OpenRead("lorem-ipsum.txt");
var reader = PipeReader.Create(stream);
var writer = PipeWriter.Create(
Console.OpenStandardOutput(),
new StreamPipeWriterOptions(leaveOpen: true));
WriteUserCancellationPrompt();
var processMessagesTask = ProcessMessagesAsync(reader, writer);
var userCanceled = false;
var cancelProcessingTask = Task.Run(() =>
{
while (char.ToUpperInvariant(Console.ReadKey().KeyChar) != 'C')
{
WriteUserCancellationPrompt();
}
userCanceled = true;
// No exceptions thrown
reader.CancelPendingRead();
writer.CancelPendingFlush();
});
await Task.WhenAny(cancelProcessingTask, processMessagesTask);
Console.WriteLine(
$"\n\nProcessing {(userCanceled ? "cancelled" : "completed")}.\n");
}
static void WriteUserCancellationPrompt() =>
Console.WriteLine("Press 'C' to cancel processing...\n");
static async Task ProcessMessagesAsync(
PipeReader reader,
PipeWriter writer)
{
try
{
while (true)
{
ReadResult readResult = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = readResult.Buffer;
try
{
if (readResult.IsCanceled)
{
break;
}
if (TryParseLines(ref buffer, out string message))
{
FlushResult flushResult =
await WriteMessagesAsync(writer, message);
if (flushResult.IsCanceled || flushResult.IsCompleted)
{
break;
}
}
if (readResult.IsCompleted)
{
if (!buffer.IsEmpty)
{
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
catch (Exception ex)
{
Console.Error.WriteLine(ex);
}
finally
{
await reader.CompleteAsync();
await writer.CompleteAsync();
}
}
static bool TryParseLines(
ref ReadOnlySequence<byte> buffer,
out string message)
{
SequencePosition? position;
StringBuilder outputMessage = new();
while(true)
{
position = buffer.PositionOf((byte)'\n');
if (!position.HasValue)
break;
outputMessage.Append(Encoding.ASCII.GetString(buffer.Slice(buffer.Start, position.Value)))
.AppendLine();
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
};
message = outputMessage.ToString();
return message.Length != 0;
}
static ValueTask<FlushResult> WriteMessagesAsync(
PipeWriter writer,
string message) =>
writer.WriteAsync(Encoding.ASCII.GetBytes(message));
}
Az alkalmazás a StreamReader lorem-ipsum.txt fájlt adatfolyamként olvassa be, és üres sortal kell végződnie. Az FileStream át lett adva PipeReader.Create, amely létrehoz egy objektumot PipeReader
. A konzolalkalmazás ezután átadja a szabványos kimeneti streamet a használatnak PipeWriter.CreateConsole.OpenStandardOutput(). A példa támogatja a lemondást.
Visszajelzés
https://aka.ms/ContentUserFeedback.
Hamarosan elérhető: 2024-ben fokozatosan kivezetjük a GitHub-problémákat a tartalom visszajelzési mechanizmusaként, és lecseréljük egy új visszajelzési rendszerre. További információ:Visszajelzés küldése és megtekintése a következőhöz: