Training
Module
Opdrachten verbinden met een pijplijn - Training
In deze module leert u hoe u opdrachten verbindt met een pijplijn.
Deze browser wordt niet meer ondersteund.
Upgrade naar Microsoft Edge om te profiteren van de nieuwste functies, beveiligingsupdates en technische ondersteuning.
System.IO.Pipelines is een bibliotheek die is ontworpen om het eenvoudiger te maken om I/O met hoge prestaties in .NET uit te voeren. Het is een bibliotheek die is gericht op .NET Standard die werkt op alle .NET-implementaties.
De bibliotheek is beschikbaar in het Nuget-pakket System.IO.Pipelines .
Apps die streaminggegevens parseren, bestaan uit standaardcode met veel gespecialiseerde en ongebruikelijke codestromen. De standaard- en speciale casecode is complex en moeilijk te onderhouden.
System.IO.Pipelines
is ontworpen voor:
De volgende code is gebruikelijk voor een TCP-server die berichten met regelscheidingstekens ontvangt (gescheiden door) '\n'
van een client:
async Task ProcessLinesAsync(NetworkStream stream)
{
var buffer = new byte[1024];
await stream.ReadAsync(buffer, 0, buffer.Length);
// Process a single line from the buffer
ProcessLine(buffer);
}
De voorgaande code heeft verschillende problemen:
ReadAsync
.stream.ReadAsync
. stream.ReadAsync
retourneert hoeveel gegevens zijn gelezen.ReadAsync
aanroep worden gelezen.byte
matrix met elke leesbewerking toegewezen.Om de voorgaande problemen op te lossen, zijn de volgende wijzigingen vereist:
Buffer de binnenkomende gegevens totdat er een nieuwe regel wordt gevonden.
Parseert alle regels die in de buffer worden geretourneerd.
Het is mogelijk dat de lijn groter is dan 1 kB (1024 bytes). De code moet het formaat van de invoerbuffer wijzigen totdat het scheidingsteken wordt gevonden om de volledige lijn in de buffer te kunnen aanpassen.
Overweeg het gebruik van bufferpooling om het toewijzen van geheugen herhaaldelijk te voorkomen.
Met de volgende code worden enkele van deze problemen opgelost:
async Task ProcessLinesAsync(NetworkStream stream)
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
var bytesBuffered = 0;
var bytesConsumed = 0;
while (true)
{
// Calculate the amount of bytes remaining in the buffer.
var bytesRemaining = buffer.Length - bytesBuffered;
if (bytesRemaining == 0)
{
// Double the buffer size and copy the previously buffered data into the new buffer.
var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
// Return the old buffer to the pool.
ArrayPool<byte>.Shared.Return(buffer);
buffer = newBuffer;
bytesRemaining = buffer.Length - bytesBuffered;
}
var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
if (bytesRead == 0)
{
// EOF
break;
}
// Keep track of the amount of buffered bytes.
bytesBuffered += bytesRead;
var linePosition = -1;
do
{
// Look for a EOL in the buffered data.
linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
bytesBuffered - bytesConsumed);
if (linePosition >= 0)
{
// Calculate the length of the line based on the offset.
var lineLength = linePosition - bytesConsumed;
// Process the line.
ProcessLine(buffer, bytesConsumed, lineLength);
// Move the bytesConsumed to skip past the line consumed (including \n).
bytesConsumed += lineLength + 1;
}
}
while (linePosition >= 0);
}
}
De vorige code is complex en heeft geen betrekking op alle geïdentificeerde problemen. Netwerken met hoge prestaties betekenen meestal het schrijven van complexe code om de prestaties te maximaliseren. System.IO.Pipelines
is ontworpen om het schrijven van dit type code eenvoudiger te maken.
De Pipe klasse kan worden gebruikt om een PipeWriter/PipeReader
paar te maken. Alle gegevens die in de PipeWriter
gegevens zijn geschreven, zijn beschikbaar in het PipeReader
volgende:
var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;
async Task ProcessLinesAsync(Socket socket)
{
var pipe = new Pipe();
Task writing = FillPipeAsync(socket, pipe.Writer);
Task reading = ReadPipeAsync(pipe.Reader);
await Task.WhenAll(reading, writing);
}
async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
const int minimumBufferSize = 512;
while (true)
{
// Allocate at least 512 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(minimumBufferSize);
try
{
int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
if (bytesRead == 0)
{
break;
}
// Tell the PipeWriter how much was read from the Socket.
writer.Advance(bytesRead);
}
catch (Exception ex)
{
LogError(ex);
break;
}
// Make the data available to the PipeReader.
FlushResult result = await writer.FlushAsync();
if (result.IsCompleted)
{
break;
}
}
// By completing PipeWriter, tell the PipeReader that there's no more data coming.
await writer.CompleteAsync();
}
async Task ReadPipeAsync(PipeReader reader)
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
{
// Process the line.
ProcessLine(line);
}
// Tell the PipeReader how much of the buffer has been consumed.
reader.AdvanceTo(buffer.Start, buffer.End);
// Stop reading if there's no more data coming.
if (result.IsCompleted)
{
break;
}
}
// Mark the PipeReader as complete.
await reader.CompleteAsync();
}
bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
// Look for a EOL in the buffer.
SequencePosition? position = buffer.PositionOf((byte)'\n');
if (position == null)
{
line = default;
return false;
}
// Skip the line + the \n.
line = buffer.Slice(0, position.Value);
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
return true;
}
Er zijn twee lussen:
FillPipeAsync
leest van de Socket
en schrijft naar de PipeWriter
.ReadPipeAsync
leest van de PipeReader
binnenkomende regels en parseert deze.Er zijn geen expliciete buffers toegewezen. Alle bufferbeheer wordt gedelegeerd aan de PipeReader
en PipeWriter
implementaties. Het delegeren van bufferbeheer maakt het eenvoudiger om code te gebruiken om zich alleen te richten op de bedrijfslogica.
In de eerste lus:
PipeWriter
hoeveel gegevens naar de buffer zijn geschreven.PipeReader
.In de tweede lus verbruikt de PipeReader
buffers die zijn geschreven door PipeWriter
. De buffers zijn afkomstig van de socket. De oproep naar PipeReader.ReadAsync
:
Retourneert een ReadResult met twee belangrijke gegevens:
ReadOnlySequence<byte>
.IsCompleted
die aangeeft of het einde van de gegevens (EOF) is bereikt.Nadat u het einde van het regelscheidingsteken (EOL) hebt gevonden en de regel hebt geparserd:
PipeReader.AdvanceTo
wordt aangeroepen om te vertellen PipeReader
hoeveel gegevens er zijn verbruikt en onderzocht.De lezer- en schrijflussen eindigen door aan te roepen Complete
. Complete
laat de onderliggende pipe het geheugen vrijgeven dat is toegewezen.
In het ideale voorbeeld werken lezen en parseren samen:
Het parseren kost doorgaans meer tijd dan het kopiëren van blokken gegevens uit het netwerk:
Voor optimale prestaties is er een balans tussen frequente pauzes en het toewijzen van meer geheugen.
Om het voorgaande probleem op te lossen, heeft de Pipe
twee instellingen voor het beheren van de gegevensstroom:
PipeWriter.FlushAsync
hervat.ValueTask<FlushResult>
hoeveelheid gegevens in de Pipe
kruisingen PauseWriterThreshold
.ValueTask<FlushResult>
Wordt voltooid wanneer deze lager wordt dan ResumeWriterThreshold
.Er worden twee waarden gebruikt om snelle cyclussen te voorkomen, die kunnen optreden als één waarde wordt gebruikt.
// The Pipe will start returning incomplete tasks from FlushAsync until
// the reader examines at least 5 bytes.
var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
var pipe = new Pipe(options);
Wanneer u doorgaans asynchrone code gebruikt async
en await
asynchroon wordt hervat op een TaskScheduler of de huidige SynchronizationContextcode.
Bij het uitvoeren van I/O is het belangrijk om nauwkeurige controle te hebben over waar de I/O wordt uitgevoerd. Met dit besturingselement kunt u effectief profiteren van CPU-caches. Efficiënte caching is essentieel voor krachtige apps zoals webservers. PipeScheduler biedt controle over waar asynchrone callbacks worden uitgevoerd. Standaard:
SynchronizationContext
is, wordt de thread-pool gebruikt om callbacks uit te voeren.public static void Main(string[] args)
{
var writeScheduler = new SingleThreadPipeScheduler();
var readScheduler = new SingleThreadPipeScheduler();
// Tell the Pipe what schedulers to use and disable the SynchronizationContext.
var options = new PipeOptions(readerScheduler: readScheduler,
writerScheduler: writeScheduler,
useSynchronizationContext: false);
var pipe = new Pipe(options);
}
// This is a sample scheduler that async callbacks on a single dedicated thread.
public class SingleThreadPipeScheduler : PipeScheduler
{
private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
new BlockingCollection<(Action<object> Action, object State)>();
private readonly Thread _thread;
public SingleThreadPipeScheduler()
{
_thread = new Thread(DoWork);
_thread.Start();
}
private void DoWork()
{
foreach (var item in _queue.GetConsumingEnumerable())
{
item.Action(item.State);
}
}
public override void Schedule(Action<object?> action, object? state)
{
if (state is not null)
{
_queue.Add((action, state));
}
// else log the fact that _queue.Add was not called.
}
}
PipeScheduler.ThreadPool is de PipeScheduler implementatie waarmee callbacks naar de thread-pool worden geplaatst. PipeScheduler.ThreadPool
is de standaardinstelling en over het algemeen de beste keuze. PipeScheduler.Inline kan onbedoelde gevolgen veroorzaken, zoals impasses.
Het is vaak efficiënt om het Pipe
object opnieuw te gebruiken. Als u de pipe opnieuw wilt instellen, roept PipeReader Reset u aan wanneer zowel de PipeReader
als PipeWriter
de pijp is voltooid.
PipeReader beheert het geheugen namens de beller. Bel altijd na het bellenPipeReader.ReadAsync.PipeReader.AdvanceTo Hiermee wordt PipeReader
aangegeven wanneer de beller klaar is met het geheugen, zodat deze kan worden bijgehouden. De ReadOnlySequence<byte>
geretourneerde PipeReader.ReadAsync
waarde is alleen geldig tot de aanroep .PipeReader.AdvanceTo
Het is illegaal om te gebruiken ReadOnlySequence<byte>
na het bellen PipeReader.AdvanceTo
.
PipeReader.AdvanceTo
heeft twee SequencePosition argumenten:
Het markeren van gegevens als verbruikt betekent dat de pijp het geheugen kan retourneren naar de onderliggende buffergroep. Als u gegevens markeert zoals waargenomen, bepaalt u wat de volgende aanroep doet PipeReader.ReadAsync
. Als u alles markeert zoals waargenomen, betekent dit dat de volgende aanroep PipeReader.ReadAsync
niet wordt geretourneerd totdat er meer gegevens naar de pijp worden geschreven. Elke andere waarde zal de volgende aanroep maken om onmiddellijk te PipeReader.ReadAsync
retourneren met de waargenomen en niet-geobserveerde gegevens, maar niet met gegevens die al zijn gebruikt.
Er zijn een aantal typische patronen die zich voordoen bij het lezen van streaminggegevens:
In de volgende voorbeelden wordt de methode gebruikt voor het TryParseLines
parseren van berichten uit een ReadOnlySequence<byte>
. TryParseLines
parseert één bericht en werkt de invoerbuffer bij om het geparseerde bericht uit de buffer te knippen. TryParseLines
maakt geen deel uit van .NET, het is een door de gebruiker geschreven methode die in de volgende secties wordt gebruikt.
bool TryParseLines(ref ReadOnlySequence<byte> buffer, out Message message);
De volgende code leest één bericht van een PipeReader
en retourneert het bericht naar de aanroeper.
async ValueTask<Message?> ReadSingleMessageAsync(PipeReader reader,
CancellationToken cancellationToken = default)
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
// In the event that no message is parsed successfully, mark consumed
// as nothing and examined as the entire buffer.
SequencePosition consumed = buffer.Start;
SequencePosition examined = buffer.End;
try
{
if (TryParseLines(ref buffer, out Message message))
{
// A single message was successfully parsed so mark the start of the
// parsed buffer as consumed. TryParseLines trims the buffer to
// point to the data after the message was parsed.
consumed = buffer.Start;
// Examined is marked the same as consumed here, so the next call
// to ReadSingleMessageAsync will process the next message if there's
// one.
examined = consumed;
return message;
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(consumed, examined);
}
}
return null;
}
Met de voorgaande code wordt:
SequencePosition
SequencePosition
gegevens bij zodat deze verwijzen naar het begin van de ingekorte invoerbuffer.De twee SequencePosition
argumenten worden bijgewerkt omdat TryParseLines
het geparseerde bericht uit de invoerbuffer wordt verwijderd. Over het algemeen moet bij het parseren van één bericht uit de buffer de onderzochte positie een van de volgende zijn:
De case met één bericht heeft het meeste potentieel voor fouten. Het doorgeven van de verkeerde waarden die moeten worden onderzocht , kan resulteren in een uitzondering op het geheugen of een oneindige lus. Zie de sectie Algemene problemen van PipeReader in dit artikel voor meer informatie.
Met de volgende code worden alle berichten van een PipeReader
en aanroepen ProcessMessageAsync
op elke code gelezen.
async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseLines(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
PipeReader.ReadAsync
:
CancellationToken
status wordt geannuleerd terwijl er een leesbewerking in behandeling is.PipeReader.CancelPendingRead
zorgt ervoor dat de huidige of volgende aanroep PipeReader.ReadAsync
een ReadResult met IsCanceled
set retourneert.true
Dit kan handig zijn voor het stoppen van de bestaande leeslus op een niet-destructieve en niet-uitzonderlijke manier.private PipeReader reader;
public MyConnection(PipeReader reader)
{
this.reader = reader;
}
public void Abort()
{
// Cancel the pending read so the process loop ends without an exception.
reader.CancelPendingRead();
}
public async Task ProcessMessagesAsync()
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
if (result.IsCanceled)
{
// The read was canceled. You can quit without reading the existing data.
break;
}
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseLines(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
Het doorgeven van de verkeerde waarden aan consumed
of examined
kan leiden tot het lezen van gegevens die al zijn gelezen.
Het doorgeven buffer.End
als onderzocht kan het volgende tot gevolg hebben:
PipeReader.AdvanceTo(position, buffer.End)
het verwerken van één bericht tegelijk vanuit de buffer.Het doorgeven van de verkeerde waarden aan consumed
of examined
kan resulteren in een oneindige lus. Als buffer.Start
dit bijvoorbeeld niet is gewijzigd, PipeReader.AdvanceTo(buffer.Start)
wordt de volgende aanroep PipeReader.ReadAsync
onmiddellijk geretourneerd voordat nieuwe gegevens binnenkomen.
Het doorgeven van de verkeerde waarden aan consumed
of examined
kan resulteren in oneindige buffering (uiteindelijke OOM).
Als u de ReadOnlySequence<byte>
aanroep na aanroep PipeReader.AdvanceTo
gebruikt, kan dit leiden tot beschadiging van het geheugen (gebruik na gratis gebruik).
Het niet aanroepen PipeReader.Complete/CompleteAsync
kan leiden tot een geheugenlek.
Het controleren ReadResult.IsCompleted en afsluiten van de leeslogica voordat de buffer wordt verwerkt, resulteert in gegevensverlies. De voorwaarde voor het afsluiten van de lus moet zijn gebaseerd op ReadResult.Buffer.IsEmpty
en ReadResult.IsCompleted
. Als u dit onjuist doet, kan dit resulteren in een oneindige lus.
❌Gegevensverlies
Het ReadResult
laatste segment van de gegevens kan worden geretourneerd wanneer IsCompleted
deze is ingesteld op true
. Als u deze gegevens niet leest voordat u de leeslus afsluit, gaan gegevens verloren.
Waarschuwing
Gebruik niet de volgende code. Het gebruik van dit voorbeeld leidt tot gegevensverlies, loopt vast, beveiligingsproblemen en mag NIET worden gekopieerd. Het volgende voorbeeld wordt gegeven om veelvoorkomende problemen met PipeReader uit te leggen.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> dataLossBuffer = result.Buffer;
if (result.IsCompleted)
break;
Process(ref dataLossBuffer, out Message message);
reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End);
}
Waarschuwing
Gebruik de voorgaande code NIET . Het gebruik van dit voorbeeld leidt tot gegevensverlies, loopt vast, beveiligingsproblemen en mag NIET worden gekopieerd. Het voorgaande voorbeeld wordt gegeven om veelvoorkomende problemen met PipeReader uit te leggen.
❌Oneindige lus
De volgende logica kan resulteren in een oneindige lus als de Result.IsCompleted
is true
, maar er nooit een volledig bericht in de buffer staat.
Waarschuwing
Gebruik niet de volgende code. Het gebruik van dit voorbeeld leidt tot gegevensverlies, loopt vast, beveiligingsproblemen en mag NIET worden gekopieerd. Het volgende voorbeeld wordt gegeven om veelvoorkomende problemen met PipeReader uit te leggen.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (result.IsCompleted && infiniteLoopBuffer.IsEmpty)
break;
Process(ref infiniteLoopBuffer, out Message message);
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
Waarschuwing
Gebruik de voorgaande code NIET . Het gebruik van dit voorbeeld leidt tot gegevensverlies, loopt vast, beveiligingsproblemen en mag NIET worden gekopieerd. Het voorgaande voorbeeld wordt gegeven om veelvoorkomende problemen met PipeReader uit te leggen.
Hier volgt nog een stukje code met hetzelfde probleem. Er wordt gecontroleerd op een niet-lege buffer voordat deze wordt gecontroleerd ReadResult.IsCompleted
. Omdat het zich in een else if
bevindt, wordt het voor altijd herhaald als er nooit een volledig bericht in de buffer staat.
Waarschuwing
Gebruik niet de volgende code. Het gebruik van dit voorbeeld leidt tot gegevensverlies, loopt vast, beveiligingsproblemen en mag NIET worden gekopieerd. Het volgende voorbeeld wordt gegeven om veelvoorkomende problemen met PipeReader uit te leggen.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (!infiniteLoopBuffer.IsEmpty)
Process(ref infiniteLoopBuffer, out Message message);
else if (result.IsCompleted)
break;
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
Waarschuwing
Gebruik de voorgaande code NIET . Het gebruik van dit voorbeeld leidt tot gegevensverlies, loopt vast, beveiligingsproblemen en mag NIET worden gekopieerd. Het voorgaande voorbeeld wordt gegeven om veelvoorkomende problemen met PipeReader uit te leggen.
❌Niet-reagerende toepassing
Onvoorwaardelijke aanroepen PipeReader.AdvanceTo
met buffer.End
in de examined
positie kan ertoe leiden dat de toepassing niet meer reageert bij het parseren van één bericht. De volgende aanroep om pas terug te PipeReader.AdvanceTo
keren:
Waarschuwing
Gebruik niet de volgende code. Het gebruik van dit voorbeeld leidt tot gegevensverlies, loopt vast, beveiligingsproblemen en mag NIET worden gekopieerd. Het volgende voorbeeld wordt gegeven om veelvoorkomende problemen met PipeReader uit te leggen.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> hangBuffer = result.Buffer;
Process(ref hangBuffer, out Message message);
if (result.IsCompleted)
break;
reader.AdvanceTo(hangBuffer.Start, hangBuffer.End);
if (message != null)
return message;
}
Waarschuwing
Gebruik de voorgaande code NIET . Het gebruik van dit voorbeeld leidt tot gegevensverlies, loopt vast, beveiligingsproblemen en mag NIET worden gekopieerd. Het voorgaande voorbeeld wordt gegeven om veelvoorkomende problemen met PipeReader uit te leggen.
❌Onvoldoende geheugen (OOM)
Met de volgende voorwaarden blijft de volgende code bufferen totdat er een OutOfMemoryException probleem optreedt:
PipeReader
gegevens worden geretourneerd, maken geen volledig bericht. Het maakt bijvoorbeeld geen volledig bericht omdat de andere kant een groot bericht schrijft (bijvoorbeeld een bericht van 4 GB).Waarschuwing
Gebruik niet de volgende code. Het gebruik van dit voorbeeld leidt tot gegevensverlies, loopt vast, beveiligingsproblemen en mag NIET worden gekopieerd. Het volgende voorbeeld wordt gegeven om veelvoorkomende problemen met PipeReader uit te leggen.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer;
Process(ref thisCouldOutOfMemory, out Message message);
if (result.IsCompleted)
break;
reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End);
if (message != null)
return message;
}
Waarschuwing
Gebruik de voorgaande code NIET . Het gebruik van dit voorbeeld leidt tot gegevensverlies, loopt vast, beveiligingsproblemen en mag NIET worden gekopieerd. Het voorgaande voorbeeld wordt gegeven om veelvoorkomende problemen met PipeReader uit te leggen.
❌Geheugenbeschadiging
Bij het schrijven van helpers die de buffer lezen, moet elke geretourneerde nettolading worden gekopieerd voordat u aanroept Advance
. In het volgende voorbeeld wordt geheugen geretourneerd dat is Pipe
verwijderd en opnieuw kan worden gebruikt voor de volgende bewerking (lezen/schrijven).
Waarschuwing
Gebruik niet de volgende code. Het gebruik van dit voorbeeld leidt tot gegevensverlies, loopt vast, beveiligingsproblemen en mag NIET worden gekopieerd. Het volgende voorbeeld wordt gegeven om veelvoorkomende problemen met PipeReader uit te leggen.
public class Message
{
public ReadOnlySequence<byte> CorruptedPayload { get; set; }
}
Environment.FailFast("This code is terrible, don't use it!");
Message message = null;
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
ReadHeader(ref buffer, out int length);
if (length <= buffer.Length)
{
message = new Message
{
// Slice the payload from the existing buffer
CorruptedPayload = buffer.Slice(0, length)
};
buffer = buffer.Slice(length);
}
if (result.IsCompleted)
break;
reader.AdvanceTo(buffer.Start, buffer.End);
if (message != null)
{
// This code is broken since reader.AdvanceTo() was called with a position *after* the buffer
// was captured.
break;
}
}
return message;
}
Waarschuwing
Gebruik de voorgaande code NIET . Het gebruik van dit voorbeeld leidt tot gegevensverlies, loopt vast, beveiligingsproblemen en mag NIET worden gekopieerd. Het voorgaande voorbeeld wordt gegeven om veelvoorkomende problemen met PipeReader uit te leggen.
De PipeWriter beheert buffers voor het schrijven namens de beller. PipeWriter
implementeert IBufferWriter<byte>
. IBufferWriter<byte>
maakt het mogelijk om toegang te krijgen tot buffers om schrijfbewerkingen uit te voeren zonder extra bufferkopieën.
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
// Request at least 5 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(5);
// Write directly into the buffer.
int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span);
// Tell the writer how many bytes were written.
writer.Advance(written);
await writer.FlushAsync(cancellationToken);
}
De vorige code:
PipeWriter
gebruik GetMemory.Memory<byte>
tekenreeks"Hello"
.PipeWriter
bytes naar het onderliggende apparaat verzonden.De vorige schrijfmethode maakt gebruik van de buffers die door de PipeWriter
. Het kan ook hebben gebruikt PipeWriter.WriteAsync, wat:
PipeWriter
.GetSpan
, Advance
indien van toepassing en oproepen FlushAsync.async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
byte[] helloBytes = Encoding.ASCII.GetBytes("Hello");
// Write helloBytes to the writer, there's no need to call Advance here
// (Write does that).
await writer.WriteAsync(helloBytes, cancellationToken);
}
FlushAsync ondersteunt het doorgeven van een CancellationToken. Het doorgeven van een CancellationToken
resultaat in een OperationCanceledException
als het token wordt geannuleerd terwijl er een leegmaken in behandeling is. PipeWriter.FlushAsync
ondersteunt een manier om de huidige flush-bewerking te annuleren zonder PipeWriter.CancelPendingFlush een uitzondering op te geven. Het aanroepen PipeWriter.CancelPendingFlush
veroorzaakt de huidige of volgende oproep naar PipeWriter.FlushAsync
of PipeWriter.WriteAsync
retourneert een FlushResult met IsCanceled
de instelling true
. Dit kan nuttig zijn voor het stoppen van de opbrengst op een niet-destructieve en niet-uitzonderlijke manier.
GetMemory
of GetSpan
terwijl er een onvolledig gesprek is, FlushAsync
is niet veilig.Complete
of CompleteAsync
terwijl er niet-geflusheerde gegevens zijn, kan leiden tot beschadiging van het geheugen.Met de volgende tips kunt u de System.IO.Pipelines klassen gebruiken:
await
PipeWriter.FlushAsync het schrijven en altijd controleren FlushResult.IsCompleted. Schrijf af als IsCompleted
dat zo is true
, zoals dat aangeeft dat de lezer is voltooid en geeft niet langer om wat er is geschreven.PipeReader
hebben.FlushAsync
aan als de lezer pas kan beginnen als FlushAsync
deze is voltooid, omdat dit een impasse kan veroorzaken.PipeReader
of PipeWriter
of toegang heeft tot deze context. Deze typen zijn niet thread-safe.AdvanceTo
of voltooien van de PipeReader
.Het IDuplexPipe is een contract voor typen die zowel lezen als schrijven ondersteunen. Een netwerkverbinding wordt bijvoorbeeld vertegenwoordigd door een IDuplexPipe
.
In tegenstelling tot Pipe
, dat een PipeReader
en een PipeWriter
bevat, IDuplexPipe
vertegenwoordigt een enkele zijde van een volledige duplex-verbinding. Dat betekent dat wat naar de PipeWriter
brief wordt geschreven, niet wordt gelezen uit de PipeReader
.
Wanneer u streamgegevens leest of schrijft, leest u doorgaans gegevens met behulp van een de-serializer en schrijft u gegevens met behulp van een serializer. De meeste van deze stream-API's voor lezen en schrijven hebben een Stream
parameter. Om het gemakkelijker te maken om te integreren met deze bestaande API's PipeReader
en PipeWriter
een AsStream methode beschikbaar te maken. AsStream retourneert een Stream
implementatie rond de PipeReader
of PipeWriter
.
PipeReader
en PipeWriter
exemplaren kunnen worden gemaakt met behulp van de statische Create
methoden op basis van een Stream object en optionele overeenkomstige aanmaakopties.
De StreamPipeReaderOptions mogelijkheid voor controle over het maken van het PipeReader
exemplaar met de volgende parameters:
4096
.PipeReader
bewerking is voltooid en dat de standaardwaarden zijn ingesteld false
op .1024
.MemoryPool<byte>
toewijzen van geheugen en wordt standaard ingesteld op null
.De StreamPipeWriterOptions mogelijkheid voor controle over het maken van het PipeWriter
exemplaar met de volgende parameters:
PipeWriter
bewerking is voltooid en dat de standaardwaarden zijn ingesteld false
op .4096
.MemoryPool<byte>
toewijzen van geheugen en wordt standaard ingesteld op null
.Belangrijk
Bij het maken PipeReader
en PipeWriter
uitvoeren van exemplaren met behulp van de Create
methoden moet u rekening houden met de levensduur van het Stream
object. Als u toegang tot de stream nodig hebt nadat de lezer of schrijver ermee klaar is, moet u de LeaveOpen
vlag true
instellen op de aanmaakopties. Anders wordt de stream gesloten.
In de volgende code ziet u hoe u exemplaren kunt maken en PipeWriter
exemplaren gebruikt met behulp van PipeReader
de Create
methoden uit een stream.
using System.Buffers;
using System.IO.Pipelines;
using System.Text;
class Program
{
static async Task Main()
{
using var stream = File.OpenRead("lorem-ipsum.txt");
var reader = PipeReader.Create(stream);
var writer = PipeWriter.Create(
Console.OpenStandardOutput(),
new StreamPipeWriterOptions(leaveOpen: true));
WriteUserCancellationPrompt();
var processMessagesTask = ProcessMessagesAsync(reader, writer);
var userCanceled = false;
var cancelProcessingTask = Task.Run(() =>
{
while (char.ToUpperInvariant(Console.ReadKey().KeyChar) != 'C')
{
WriteUserCancellationPrompt();
}
userCanceled = true;
// No exceptions thrown
reader.CancelPendingRead();
writer.CancelPendingFlush();
});
await Task.WhenAny(cancelProcessingTask, processMessagesTask);
Console.WriteLine(
$"\n\nProcessing {(userCanceled ? "cancelled" : "completed")}.\n");
}
static void WriteUserCancellationPrompt() =>
Console.WriteLine("Press 'C' to cancel processing...\n");
static async Task ProcessMessagesAsync(
PipeReader reader,
PipeWriter writer)
{
try
{
while (true)
{
ReadResult readResult = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = readResult.Buffer;
try
{
if (readResult.IsCanceled)
{
break;
}
if (TryParseLines(ref buffer, out string message))
{
FlushResult flushResult =
await WriteMessagesAsync(writer, message);
if (flushResult.IsCanceled || flushResult.IsCompleted)
{
break;
}
}
if (readResult.IsCompleted)
{
if (!buffer.IsEmpty)
{
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
catch (Exception ex)
{
Console.Error.WriteLine(ex);
}
finally
{
await reader.CompleteAsync();
await writer.CompleteAsync();
}
}
static bool TryParseLines(
ref ReadOnlySequence<byte> buffer,
out string message)
{
SequencePosition? position;
StringBuilder outputMessage = new();
while(true)
{
position = buffer.PositionOf((byte)'\n');
if (!position.HasValue)
break;
outputMessage.Append(Encoding.ASCII.GetString(buffer.Slice(buffer.Start, position.Value)))
.AppendLine();
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
};
message = outputMessage.ToString();
return message.Length != 0;
}
static ValueTask<FlushResult> WriteMessagesAsync(
PipeWriter writer,
string message) =>
writer.WriteAsync(Encoding.ASCII.GetBytes(message));
}
De toepassing gebruikt een StreamReader bestand voor het lezen van het lorem-ipsum.txt als een stroom en moet eindigen met een lege regel. De FileStream wordt doorgegeven aan PipeReader.Create, waarmee een PipeReader
object wordt geïnstitueerd. De consoletoepassing geeft vervolgens de standaarduitvoerstroom door aan PipeWriter.Create het gebruik van Console.OpenStandardOutput(). In het voorbeeld wordt annulering ondersteund.
.NET-feedback
.NET is een open source project. Selecteer een koppeling om feedback te geven:
Training
Module
Opdrachten verbinden met een pijplijn - Training
In deze module leert u hoe u opdrachten verbindt met een pijplijn.