Training
Modul
Verknüpfen von Befehlen in einer Pipeline - Training
In diesem Modul erfahren Sie, wie Sie Befehle in einer Pipeline verknüpfen.
Dieser Browser wird nicht mehr unterstützt.
Führen Sie ein Upgrade auf Microsoft Edge durch, um die neuesten Features, Sicherheitsupdates und den technischen Support zu nutzen.
System.IO.Pipelines ist eine Bibliothek, die entwickelt wurde, um die Ausführung von Hochleistungs-E/A in .NET zu erleichtern. Dabei handelt es sich um eine Bibliothek für .NET Standard, die mit allen .NET-Implementierungen kompatibel ist.
Die Bibliothek ist im Nuget-Paket System.IO.Pipelines verfügbar.
Apps, die Streamingdaten analysieren, bestehen aus Codebausteinen, die über viele spezialisierte und ungewöhnliche Codeflows verfügen. Die Codebausteine und der Sonderfallcode sind komplex und schwer zu verwalten.
System.IO.Pipelines
soll Folgendes ermöglichen:
Der folgende Code ist typisch für einen TCP-Server, der durch Zeilen getrennte Nachrichten (getrennt durch '\n'
) von einem Client empfängt:
async Task ProcessLinesAsync(NetworkStream stream)
{
var buffer = new byte[1024];
await stream.ReadAsync(buffer, 0, buffer.Length);
// Process a single line from the buffer
ProcessLine(buffer);
}
Der vorstehende Code weist mehrere Probleme auf:
ReadAsync
empfangen.stream.ReadAsync
wird ignoriert. stream.ReadAsync
gibt zurück, wie viele Daten gelesen wurden.ReadAsync
gelesen werden, wird nicht verarbeitet.byte
-Array zugeordnet.Um die oben beschriebenen Probleme zu beheben, sind die folgenden Änderungen erforderlich:
Puffern der eingehenden Daten, bis eine neue Zeile gefunden wird.
Analysieren aller zurückgegebenen Zeilen im Puffer.
Es ist möglich, dass die Zeile größer als 1 KB (1.024 Bytes) ist. Der Code muss die Größe des Eingabepuffers ändern, bis das Trennzeichen gefunden wird, damit die gesamte Zeile in den Puffer passt.
Verwenden Sie ggf. Pufferpools, um zu vermeiden, dass wiederholt Speicher zugeteilt wird.
Der folgende Code behandelt einige dieser Probleme:
async Task ProcessLinesAsync(NetworkStream stream)
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
var bytesBuffered = 0;
var bytesConsumed = 0;
while (true)
{
// Calculate the amount of bytes remaining in the buffer.
var bytesRemaining = buffer.Length - bytesBuffered;
if (bytesRemaining == 0)
{
// Double the buffer size and copy the previously buffered data into the new buffer.
var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
// Return the old buffer to the pool.
ArrayPool<byte>.Shared.Return(buffer);
buffer = newBuffer;
bytesRemaining = buffer.Length - bytesBuffered;
}
var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
if (bytesRead == 0)
{
// EOF
break;
}
// Keep track of the amount of buffered bytes.
bytesBuffered += bytesRead;
var linePosition = -1;
do
{
// Look for a EOL in the buffered data.
linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
bytesBuffered - bytesConsumed);
if (linePosition >= 0)
{
// Calculate the length of the line based on the offset.
var lineLength = linePosition - bytesConsumed;
// Process the line.
ProcessLine(buffer, bytesConsumed, lineLength);
// Move the bytesConsumed to skip past the line consumed (including \n).
bytesConsumed += lineLength + 1;
}
}
while (linePosition >= 0);
}
}
Der oben gezeigte Code ist komplex und behandelt nicht alle identifizierten Probleme. Hochleistungsnetzwerke bedeuten in der Regel das Schreiben von komplexem Code, um die Leistung zu maximieren. System.IO.Pipelines
wurde entworfen, um das Schreiben dieser Art von Code zu vereinfachen.
Die Pipe-Klasse kann verwendet werden, um ein PipeWriter/PipeReader
-Paar zu erstellen. Alle Daten, die in PipeWriter
geschrieben werden, sind in PipeReader
verfügbar:
var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;
async Task ProcessLinesAsync(Socket socket)
{
var pipe = new Pipe();
Task writing = FillPipeAsync(socket, pipe.Writer);
Task reading = ReadPipeAsync(pipe.Reader);
await Task.WhenAll(reading, writing);
}
async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
const int minimumBufferSize = 512;
while (true)
{
// Allocate at least 512 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(minimumBufferSize);
try
{
int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
if (bytesRead == 0)
{
break;
}
// Tell the PipeWriter how much was read from the Socket.
writer.Advance(bytesRead);
}
catch (Exception ex)
{
LogError(ex);
break;
}
// Make the data available to the PipeReader.
FlushResult result = await writer.FlushAsync();
if (result.IsCompleted)
{
break;
}
}
// By completing PipeWriter, tell the PipeReader that there's no more data coming.
await writer.CompleteAsync();
}
async Task ReadPipeAsync(PipeReader reader)
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
{
// Process the line.
ProcessLine(line);
}
// Tell the PipeReader how much of the buffer has been consumed.
reader.AdvanceTo(buffer.Start, buffer.End);
// Stop reading if there's no more data coming.
if (result.IsCompleted)
{
break;
}
}
// Mark the PipeReader as complete.
await reader.CompleteAsync();
}
bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
// Look for a EOL in the buffer.
SequencePosition? position = buffer.PositionOf((byte)'\n');
if (position == null)
{
line = default;
return false;
}
// Skip the line + the \n.
line = buffer.Slice(0, position.Value);
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
return true;
}
Es gibt zwei Schleifen:
FillPipeAsync
liest aus Socket
und schreibt in PipeWriter
.ReadPipeAsync
liest aus PipeReader
und analysiert eingehende Zeilen.Es sind keine expliziten Puffer zugeordnet. Die gesamte Pufferverwaltung wird an die PipeReader
- und PipeWriter
-Implementierungen delegiert. Durch die Delegierung der Pufferverwaltung ist es für Daten verarbeitenden Code einfacher, sich ausschließlich auf die Geschäftslogik zu konzentrieren.
In der ersten Schleife geschieht Folgendes:
PipeWriter
mitzuteilen, wie viele Daten in den Puffer geschrieben wurden.PipeReader
verfügbar zu machen.In der zweiten Schleife verarbeitet PipeReader
die von PipeWriter
geschriebenen Puffer. Die Puffer stammen aus dem Socket. Für den Aufruf von PipeReader.ReadAsync
gilt Folgendes:
Er gibt ein ReadResult zurück, das zwei wichtige Informationen enthält:
ReadOnlySequence<byte>
.IsCompleted
, der angibt, ob das Ende der Daten (EOF) erreicht wurde.Nachdem das Zeilenende-Trennzeichen (EOL) gefunden und die Zeile analysiert wurde, geschieht Folgendes:
PipeReader.AdvanceTo
wird aufgerufen, um PipeReader
mitzuteilen, wie viele Daten verarbeitet und untersucht wurden.Die Reader- und Writerschleifen werden durch den Aufruf von Complete
beendet. Complete
ermöglicht der zugrunde liegenden Pipe, den zugeordneten Arbeitsspeicher freizugeben.
Im Idealfall arbeiten die Lese- und Analysevorgänge zusammen:
In der Regel nimmt die Analyse mehr Zeit als das Kopieren von Datenblöcken aus dem Netzwerk in Anspruch:
Für optimale Leistung ist ein Gleichgewicht zwischen häufigen Pausen und der Zuteilung von mehr Speicherplatz vorhanden.
Zum Beheben des oben beschriebenen Problems hat Pipe
zwei Einstellungen, um den Datenfluss zu steuern:
PipeWriter.FlushAsync
fortgesetzt werden.ValueTask<FlushResult>
zurück, wenn die Datenmenge in Pipe
PauseWriterThreshold
erreicht.ValueTask<FlushResult>
ab, wenn der Wert kleiner als ResumeWriterThreshold
wird.Es werden zwei Werte verwendet, um einen schnellen Zyklus zu verhindern, der bei Verwendung nur eines Werts auftreten kann.
// The Pipe will start returning incomplete tasks from FlushAsync until
// the reader examines at least 5 bytes.
var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
var pipe = new Pipe(options);
In der Regel wird der asynchrone Code bei Verwendung von async
und await
entweder für einen TaskScheduler oder den aktuellen SynchronizationContext fortgesetzt.
Wenn E/A-Vorgänge durchgeführt werden, ist es wichtig, genau zu steuern, wo die E/A-Vorgänge durchgeführt werden. Diese Steuerung ermöglicht die effektive Nutzung von CPU-Caches. Effiziente Zwischenspeicherung ist für Hochleistungs-Apps wie Webserver von entscheidender Bedeutung. PipeScheduler bietet die Kontrolle darüber, wo asynchrone Rückrufe ausgeführt werden. Standardmäßig:
SynchronizationContext
vorhanden ist, wird der Threadpool verwendet, um Rückrufe auszuführen.public static void Main(string[] args)
{
var writeScheduler = new SingleThreadPipeScheduler();
var readScheduler = new SingleThreadPipeScheduler();
// Tell the Pipe what schedulers to use and disable the SynchronizationContext.
var options = new PipeOptions(readerScheduler: readScheduler,
writerScheduler: writeScheduler,
useSynchronizationContext: false);
var pipe = new Pipe(options);
}
// This is a sample scheduler that async callbacks on a single dedicated thread.
public class SingleThreadPipeScheduler : PipeScheduler
{
private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
new BlockingCollection<(Action<object> Action, object State)>();
private readonly Thread _thread;
public SingleThreadPipeScheduler()
{
_thread = new Thread(DoWork);
_thread.Start();
}
private void DoWork()
{
foreach (var item in _queue.GetConsumingEnumerable())
{
item.Action(item.State);
}
}
public override void Schedule(Action<object?> action, object? state)
{
if (state is not null)
{
_queue.Add((action, state));
}
// else log the fact that _queue.Add was not called.
}
}
PipeScheduler.ThreadPool ist die PipeScheduler-Implementierung, die Rückrufe des Threadpools der Warteschlange hinzufügt. PipeScheduler.ThreadPool
ist die Standardeinstellung und in der Regel die beste Wahl. Pipescheduler.Inline kann unbeabsichtigte Folgen verursachen, z.B. Deadlocks.
Es ist häufig effizient, das Pipe
-Objekt wiederzuverwenden. Zum Zurücksetzen der Pipe rufen Sie PipeReader Reset auf, wenn PipeReader
sowie PipeWriter
abgeschlossen ist.
PipeReader verwaltet den Arbeitsspeicher im Auftrag des Aufrufers. Rufen Sie immer PipeReader.AdvanceTo auf, nachdem PipeReader.ReadAsync aufgerufen wurde. Dadurch kann PipeReader
erkennen, wenn der Aufrufer Vorgänge im Arbeitsspeicher abgeschlossen hat, damit sie nachverfolgt werden können. Das PipeReader.ReadAsync
, das von ReadOnlySequence<byte>
zurückgegeben wird, ist nur bis zum Aufruf von PipeReader.AdvanceTo
gültig. Es ist unzulässig, ReadOnlySequence<byte>
nach dem Aufruf von PipeReader.AdvanceTo
zu verwenden.
PipeReader.AdvanceTo
nimmt zwei SequencePosition-Argumente an.
Das Markieren von Daten als verarbeitet bedeutet, dass die Pipe den Arbeitsspeicher an den zugrunde liegenden Pufferpool zurückgeben kann. Durch das Markieren von Daten als untersucht wird gesteuert, welche Aktion der nächste Aufruf von PipeReader.ReadAsync
ausführt. Wenn alle Daten als untersucht markiert sind, bedeutet dies, dass der nächste Aufruf von PipeReader.ReadAsync
erst dann eine Rückgabe liefert, wenn weitere Daten in die Pipe geschrieben werden. Jeder andere Wert führt dazu, dass der nächste Aufruf von PipeReader.ReadAsync
sofort die untersuchten und die nicht untersuchten Daten zurückgibt. Es handelt sich jedoch nicht um bereits verarbeitete Daten.
Beim Versuch, Streamingdaten zu lesen, gibt es einige typische Muster:
In den folgenden Beispielen wird die TryParseLines
-Methode zum Analysieren von Nachrichten aus ReadOnlySequence<byte>
verwendet. TryParseLines
analysiert eine einzelne Nachricht und aktualisiert den Eingabepuffer, um die analysierte Nachricht aus dem Puffer zu kürzen. TryParseLines
ist nicht Bestandteil von .NET, sondern eine vom Benutzer erstellte Methode, die in den folgenden Abschnitten verwendet wird.
bool TryParseLines(ref ReadOnlySequence<byte> buffer, out Message message);
Der folgende Code liest eine einzelne Nachricht aus PipeReader
und gibt sie an den Aufrufer zurück.
async ValueTask<Message?> ReadSingleMessageAsync(PipeReader reader,
CancellationToken cancellationToken = default)
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
// In the event that no message is parsed successfully, mark consumed
// as nothing and examined as the entire buffer.
SequencePosition consumed = buffer.Start;
SequencePosition examined = buffer.End;
try
{
if (TryParseLines(ref buffer, out Message message))
{
// A single message was successfully parsed so mark the start of the
// parsed buffer as consumed. TryParseLines trims the buffer to
// point to the data after the message was parsed.
consumed = buffer.Start;
// Examined is marked the same as consumed here, so the next call
// to ReadSingleMessageAsync will process the next message if there's
// one.
examined = consumed;
return message;
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(consumed, examined);
}
}
return null;
}
Der vorangehende Code:
SequencePosition
und die untersuchte SequencePosition
, um auf den Anfang des gekürzten Eingabepuffers zu verweisen.Die beiden SequencePosition
-Argumente werden aktualisiert, weil TryParseLines
die analysierte Nachricht aus dem Eingabepuffer entfernt. Wenn eine einzelne Nachricht aus dem Puffer analysiert wird, sollte die untersuchte Position im Allgemeinen eine der folgenden Positionen sein:
Das Szenario mit einer einzelnen Nachricht weist das größte Fehlerpotenzial auf. Wenn Sie die falschen Werte an examined (untersucht) übergeben, kann dies zu einer Ausnahme des Typs „Nicht genügend Arbeitsspeicher“ oder einer Endlosschleife führen. Weitere Informationen finden Sie im Abschnitt Allgemeine PipeReader-Probleme in diesem Artikel.
Der folgende Code liest alle Nachrichten aus einem PipeReader
und ruft für jede Nachricht ProcessMessageAsync
auf.
async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseLines(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
PipeReader.ReadAsync
:
CancellationToken
abgebrochen wird, während ein Lesevorgang aussteht.PipeReader.CancelPendingRead
bewirkt, dass der aktuelle oder nächste Aufruf von PipeReader.ReadAsync
ein ReadResult zurückgibt, wobei IsCanceled
auf true
festgelegt ist. Dies kann nützlich sein, um die vorhandene Leseschleife auf nicht destruktive Weise und ohne Auslösen einer Ausnahme anzuhalten.private PipeReader reader;
public MyConnection(PipeReader reader)
{
this.reader = reader;
}
public void Abort()
{
// Cancel the pending read so the process loop ends without an exception.
reader.CancelPendingRead();
}
public async Task ProcessMessagesAsync()
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
if (result.IsCanceled)
{
// The read was canceled. You can quit without reading the existing data.
break;
}
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseLines(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
Wenn die falschen Werte an consumed
oder examined
übergeben werden, können bereits gelesene Daten erneut gelesen werden.
Wenn Sie buffer.End
als untersucht übergeben, kann dies zu folgenden Ergebnissen führen:
PipeReader.AdvanceTo(position, buffer.End)
, wenn jeweils eine einzelne Nachricht aus dem Puffer verarbeitet wird.Wenn die falschen Werte an consumed
oder examined
übergeben werden, kann dies zu einer Endlosschleife führen. PipeReader.AdvanceTo(buffer.Start)
, wenn sich buffer.Start
nicht geändert hat, bewirkt beispielsweise, dass der nächste Aufruf von PipeReader.ReadAsync
sofort vor dem Eintreffen neuer Daten zurückgegeben wird.
Wenn die falschen Werte an consumed
oder examined
übergeben werden, kann dies zu einer Endlospufferung führen (möglicherweise OOM).
Wenn Sie ReadOnlySequence<byte>
nach dem Aufruf von PipeReader.AdvanceTo
verwenden, kann dies zu einer Beschädigung des Arbeitsspeichers führen (Verwendung nach dem Freigeben).
Wenn PipeReader.Complete/CompleteAsync
nicht aufgerufen wird, kann dies zu einem Arbeitsspeicherverlust führen.
Das Überprüfen von ReadResult.IsCompleted und das Beenden der Leselogik vor dem Verarbeiten der Pufferergebnisse führt zu Datenverlusten. Die Beendigungsbedingung der Schleife sollte auf ReadResult.Buffer.IsEmpty
und ReadResult.IsCompleted
basieren. Wenn dieser Vorgang nicht ordnungsgemäß erfolgt, kann dies zu einer Endlosschleife führen.
❌Datenverlust
ReadResult
kann das finale Datensegment zurückgeben, wenn IsCompleted
auf true
festgelegt ist. Das Nichtlesen dieser Daten vor dem Beenden der Leseschleife führt zu Datenverlusten.
Warnung
Verwenden Sie NICHT den folgenden Code. Das Verwenden dieses Beispiels führt zu Datenverlusten, nicht reagierenden Apps, Sicherheitsproblemen und sollte NICHT kopiert werden. Das folgende Beispiel wird bereitgestellt, um häufige PipeReader-Probleme zu verdeutlichen.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> dataLossBuffer = result.Buffer;
if (result.IsCompleted)
break;
Process(ref dataLossBuffer, out Message message);
reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End);
}
Warnung
Verwenden Sie den oben gezeigten Code NICHT. Das Verwenden dieses Beispiels führt zu Datenverlusten, nicht reagierenden Apps, Sicherheitsproblemen und sollte NICHT kopiert werden. Das Beispiel oben wird bereitgestellt, um häufige PipeReader-Probleme zu verdeutlichen.
❌Endlosschleife
Die folgende Logik kann zu einer Endlosschleife führen, wenn Result.IsCompleted
true
ist, im Puffer aber niemals eine vollständige Nachricht vorhanden ist.
Warnung
Verwenden Sie NICHT den folgenden Code. Das Verwenden dieses Beispiels führt zu Datenverlusten, nicht reagierenden Apps, Sicherheitsproblemen und sollte NICHT kopiert werden. Das folgende Beispiel wird bereitgestellt, um häufige PipeReader-Probleme zu verdeutlichen.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (result.IsCompleted && infiniteLoopBuffer.IsEmpty)
break;
Process(ref infiniteLoopBuffer, out Message message);
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
Warnung
Verwenden Sie den oben gezeigten Code NICHT. Das Verwenden dieses Beispiels führt zu Datenverlusten, nicht reagierenden Apps, Sicherheitsproblemen und sollte NICHT kopiert werden. Das Beispiel oben wird bereitgestellt, um häufige PipeReader-Probleme zu verdeutlichen.
Im Folgenden finden Sie einen weiteren Codeausschnitt mit dem gleichen Problem. Der Code überprüft, ob ein nicht leerer Puffer vorhanden ist, bevor ReadResult.IsCompleted
überprüft wird. Da dies in einer else if
-Anweisung stattfindet, ist das Ergebnis eine Endlosschleife, wenn im Puffer niemals eine vollständige Nachricht vorhanden ist.
Warnung
Verwenden Sie NICHT den folgenden Code. Das Verwenden dieses Beispiels führt zu Datenverlusten, nicht reagierenden Apps, Sicherheitsproblemen und sollte NICHT kopiert werden. Das folgende Beispiel wird bereitgestellt, um häufige PipeReader-Probleme zu verdeutlichen.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (!infiniteLoopBuffer.IsEmpty)
Process(ref infiniteLoopBuffer, out Message message);
else if (result.IsCompleted)
break;
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
Warnung
Verwenden Sie den oben gezeigten Code NICHT. Das Verwenden dieses Beispiels führt zu Datenverlusten, nicht reagierenden Apps, Sicherheitsproblemen und sollte NICHT kopiert werden. Das Beispiel oben wird bereitgestellt, um häufige PipeReader-Probleme zu verdeutlichen.
❌Anwendung reagiert nicht
Der Aufruf von PipeReader.AdvanceTo
mit buffer.End
an der examined
-Position ohne Bedingungen kann beim Analysieren einer einzelnen Nachricht dazu führen, dass die Anwendung nicht mehr reagiert. Der nächste Aufruf von PipeReader.AdvanceTo
liefert erst eine Rückgabe, wenn:
Warnung
Verwenden Sie NICHT den folgenden Code. Das Verwenden dieses Beispiels führt zu Datenverlusten, nicht reagierenden Apps, Sicherheitsproblemen und sollte NICHT kopiert werden. Das folgende Beispiel wird bereitgestellt, um häufige PipeReader-Probleme zu verdeutlichen.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> hangBuffer = result.Buffer;
Process(ref hangBuffer, out Message message);
if (result.IsCompleted)
break;
reader.AdvanceTo(hangBuffer.Start, hangBuffer.End);
if (message != null)
return message;
}
Warnung
Verwenden Sie den oben gezeigten Code NICHT. Das Verwenden dieses Beispiels führt zu Datenverlusten, nicht reagierenden Apps, Sicherheitsproblemen und sollte NICHT kopiert werden. Das Beispiel oben wird bereitgestellt, um häufige PipeReader-Probleme zu verdeutlichen.
❌Nicht genügend Arbeitsspeicher (Out of Memory, OOM)
Mit den folgenden Bedingungen behält der folgende Code die Pufferung bei, bis eine OutOfMemoryException auftritt:
PipeReader
zurückgegebenen Daten bilden keine vollständige Nachricht. Beispielsweise ergibt sich keine vollständige Nachricht, weil die andere Seite eine große Nachricht schreibt (z.B. eine Nachricht mit 4 GB).Warnung
Verwenden Sie NICHT den folgenden Code. Das Verwenden dieses Beispiels führt zu Datenverlusten, nicht reagierenden Apps, Sicherheitsproblemen und sollte NICHT kopiert werden. Das folgende Beispiel wird bereitgestellt, um häufige PipeReader-Probleme zu verdeutlichen.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer;
Process(ref thisCouldOutOfMemory, out Message message);
if (result.IsCompleted)
break;
reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End);
if (message != null)
return message;
}
Warnung
Verwenden Sie den oben gezeigten Code NICHT. Das Verwenden dieses Beispiels führt zu Datenverlusten, nicht reagierenden Apps, Sicherheitsproblemen und sollte NICHT kopiert werden. Das Beispiel oben wird bereitgestellt, um häufige PipeReader-Probleme zu verdeutlichen.
❌Speicherbeschädigung
Beim Schreiben von Hilfsprogrammen, die den Puffer lesen, sollte jede zurückgegebene Nutzlast kopiert werden, bevor Advance
aufgerufen wird. Im folgenden Beispiel wird der Arbeitsspeicher zurückgegeben, der von Pipe
verworfen wurde. Er kann für den nächsten Vorgang (Lese-/Schreibzugriff) wiederverwendet werden.
Warnung
Verwenden Sie NICHT den folgenden Code. Das Verwenden dieses Beispiels führt zu Datenverlusten, nicht reagierenden Apps, Sicherheitsproblemen und sollte NICHT kopiert werden. Das folgende Beispiel wird bereitgestellt, um häufige PipeReader-Probleme zu verdeutlichen.
public class Message
{
public ReadOnlySequence<byte> CorruptedPayload { get; set; }
}
Environment.FailFast("This code is terrible, don't use it!");
Message message = null;
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
ReadHeader(ref buffer, out int length);
if (length <= buffer.Length)
{
message = new Message
{
// Slice the payload from the existing buffer
CorruptedPayload = buffer.Slice(0, length)
};
buffer = buffer.Slice(length);
}
if (result.IsCompleted)
break;
reader.AdvanceTo(buffer.Start, buffer.End);
if (message != null)
{
// This code is broken since reader.AdvanceTo() was called with a position *after* the buffer
// was captured.
break;
}
}
return message;
}
Warnung
Verwenden Sie den oben gezeigten Code NICHT. Das Verwenden dieses Beispiels führt zu Datenverlusten, nicht reagierenden Apps, Sicherheitsproblemen und sollte NICHT kopiert werden. Das Beispiel oben wird bereitgestellt, um häufige PipeReader-Probleme zu verdeutlichen.
PipeWriter verwaltet Puffer zum Schreiben im Auftrag des Aufrufers. PipeWriter
implementiert IBufferWriter<byte>
. IBufferWriter<byte>
ermöglicht es, Zugriff auf Puffer zu erhalten, um Schreibvorgänge ohne zusätzliche Pufferkopien auszuführen.
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
// Request at least 5 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(5);
// Write directly into the buffer.
int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span);
// Tell the writer how many bytes were written.
writer.Advance(written);
await writer.FlushAsync(cancellationToken);
}
Der vorherige Code:
PipeWriter
unter Verwendung von GetMemory an."Hello"
in das zurückgegebene Memory<byte>
-Element.PipeWriter
-Element, das die Bytes an das zugrunde liegende Gerät sendet.Die vorherige Methode zum Schreiben verwendet die Puffer, die von PipeWriter
bereitgestellt werden. Es hätte auch PipeWriter.WriteAsync verwendet werden können, das:
PipeWriter
.GetSpan
, Advance
nach Bedarf auf und ruft dann FlushAsync auf.async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
byte[] helloBytes = Encoding.ASCII.GetBytes("Hello");
// Write helloBytes to the writer, there's no need to call Advance here
// (Write does that).
await writer.WriteAsync(helloBytes, cancellationToken);
}
FlushAsync unterstützt das Übergeben eines CancellationToken-Elements. Das Übergeben eines CancellationToken
führt zu einer OperationCanceledException
, wenn das Token abgebrochen wird, während ein Leerungsvorgang aussteht. PipeWriter.FlushAsync
unterstützt die Möglichkeit, den aktuellen Leerungsvorgang über PipeWriter.CancelPendingFlush abzubrechen, ohne eine Ausnahme auszulösen. Der Aufruf von PipeWriter.CancelPendingFlush
bewirkt, dass der aktuelle oder nächste Aufruf von PipeWriter.FlushAsync
oder PipeWriter.WriteAsync
ein FlushResult zurückgibt, wobei IsCanceled
auf true
festgelegt ist. Dies kann nützlich sein, um die sich ergebende Leerung auf nicht destruktive Weise und ohne Auslösen einer Ausnahme anzuhalten.
GetMemory
oder GetSpan
ist während eines unvollständiger Aufrufs von FlushAsync
nicht sicher.Complete
oder CompleteAsync
aufrufen, während nicht geleerte Daten vorhanden sind, kann dies zu einer Speicherbeschädigung führen.Die folgenden Tipps helfen Ihnen bei der erfolgreichen Verwendung der System.IO.Pipelines-Klassen:
await
PipeWriter.FlushAsync, und überprüfen Sie immer FlushResult.IsCompleted. Abbrechen des Schreibens, wenn IsCompleted
ist true
, da dies angibt, dass der Reader abgeschlossen ist und nicht mehr überwacht, was geschrieben wird.PipeReader
Zugriff haben soll.FlushAsync
, wenn der Reader erst starten kann, wenn FlushAsync
abgeschlossen ist, da dies zu einem Deadlock führen kann.PipeReader
oder PipeWriter
„besitzt“ oder auf sie zugreift. Diese Typen sind nicht threadsicher.AdvanceTo
aufgerufen oder PipeReader
abgeschlossen haben.IDuplexPipe ist ein Vertrag für Typen, die sowohl Lese- als auch Schreibvorgänge unterstützen. Eine Netzwerkverbindung würde z.B. durch eine IDuplexPipe
dargestellt werden.
Im Gegensatz zum Pipe
-Element, das eine PipeReader
- und eine PipeWriter
-Klasse enthält, stellt IDuplexPipe
nur eine Seite einer vollständigen Duplexverbindung dar. Dies bedeutet, dass die in PipeWriter
geschriebenen Informationen von PipeReader
gelesen werden.
Beim Lesen oder Schreiben von Streamdaten lesen Sie Daten in der Regel mithilfe eines Deserialisierers und schreiben Daten mit einem Serialisierer. Die meisten dieser APIs zum Lesen und Schreiben eines Datenstrom verfügen über einen Stream
-Parameter. Um die Integration mit diesen vorhandenen APIs zu vereinfachen, machen PipeReader
und PipeWriter
eine AsStream-Methode verfügbar. AsStream gibt eine Stream
-Implementierung um PipeReader
oder PipeWriter
zurück.
PipeReader
- und PipeWriter
-Instanzen können mithilfe der statischen Methode Create
erstellt werden, sofern ein Stream-Objekt und optional entsprechende Erstellungsoptionen vorhanden sind.
Mit StreamPipeReaderOptions kann die Erstellung einer PipeReader
-Instanz gesteuert werden, indem folgende Parameter verwendet werden:
4096
.PipeReader
geöffnet bleibt. Der Standardwert ist false
.1024
.MemoryPool<byte>
-Klasse. Der Standardwert ist null
.Mit StreamPipeWriterOptions kann die Erstellung einer PipeWriter
-Instanz gesteuert werden, indem folgende Parameter verwendet werden:
PipeWriter
geöffnet bleibt. Der Standardwert ist false
.4096
verwendet werden soll. Der Standardwert ist Pool.MemoryPool<byte>
-Klasse. Der Standardwert ist null
.Wichtig
Wenn Sie PipeReader
- und PipeWriter
-Instanzen mithilfe von Create
-Methoden erstellen, müssen Sie die Lebensdauer des Stream
-Objekts berücksichtigen. Wenn Sie Zugriff auf den Stream benötigen, nachdem der Reader oder Writer beendet wurde, müssen Sie das Flag in den Erstellungsoptionen LeaveOpen
auf true
festlegen. Andernfalls wird der Stream geschlossen.
Im folgenden Code wird die Erstellung von PipeReader
- und PipeWriter
-Instanzen mithilfe der Create
-Methoden über einen Stream veranschaulicht.
using System.Buffers;
using System.IO.Pipelines;
using System.Text;
class Program
{
static async Task Main()
{
using var stream = File.OpenRead("lorem-ipsum.txt");
var reader = PipeReader.Create(stream);
var writer = PipeWriter.Create(
Console.OpenStandardOutput(),
new StreamPipeWriterOptions(leaveOpen: true));
WriteUserCancellationPrompt();
var processMessagesTask = ProcessMessagesAsync(reader, writer);
var userCanceled = false;
var cancelProcessingTask = Task.Run(() =>
{
while (char.ToUpperInvariant(Console.ReadKey().KeyChar) != 'C')
{
WriteUserCancellationPrompt();
}
userCanceled = true;
// No exceptions thrown
reader.CancelPendingRead();
writer.CancelPendingFlush();
});
await Task.WhenAny(cancelProcessingTask, processMessagesTask);
Console.WriteLine(
$"\n\nProcessing {(userCanceled ? "cancelled" : "completed")}.\n");
}
static void WriteUserCancellationPrompt() =>
Console.WriteLine("Press 'C' to cancel processing...\n");
static async Task ProcessMessagesAsync(
PipeReader reader,
PipeWriter writer)
{
try
{
while (true)
{
ReadResult readResult = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = readResult.Buffer;
try
{
if (readResult.IsCanceled)
{
break;
}
if (TryParseLines(ref buffer, out string message))
{
FlushResult flushResult =
await WriteMessagesAsync(writer, message);
if (flushResult.IsCanceled || flushResult.IsCompleted)
{
break;
}
}
if (readResult.IsCompleted)
{
if (!buffer.IsEmpty)
{
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
catch (Exception ex)
{
Console.Error.WriteLine(ex);
}
finally
{
await reader.CompleteAsync();
await writer.CompleteAsync();
}
}
static bool TryParseLines(
ref ReadOnlySequence<byte> buffer,
out string message)
{
SequencePosition? position;
StringBuilder outputMessage = new();
while(true)
{
position = buffer.PositionOf((byte)'\n');
if (!position.HasValue)
break;
outputMessage.Append(Encoding.ASCII.GetString(buffer.Slice(buffer.Start, position.Value)))
.AppendLine();
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
};
message = outputMessage.ToString();
return message.Length != 0;
}
static ValueTask<FlushResult> WriteMessagesAsync(
PipeWriter writer,
string message) =>
writer.WriteAsync(Encoding.ASCII.GetBytes(message));
}
Die Anwendung verwendet ein StreamReader an, um die lorem-ipsum.txt-Datei als Stream zu lesen, und sie muss mit einer leeren Zeile enden. FileStream wird an die PipeReader.Create-Methode übergeben, die ein PipeReader
-Objekt instanziiert. Die Konsolenanwendung übergibt dann ihren Standardausgabestream an Console.OpenStandardOutput(). Sie verwendet hierzu PipeWriter.Create. In diesem Beispiel wird ein Abbruch unterstützt.
Feedback zu .NET
.NET ist ein Open Source-Projekt. Wählen Sie einen Link aus, um Feedback zu geben:
Training
Modul
Verknüpfen von Befehlen in einer Pipeline - Training
In diesem Modul erfahren Sie, wie Sie Befehle in einer Pipeline verknüpfen.