Formazione
Modulo
Connettere i comandi in una pipeline - Training
In questo modulo si apprenderà come connettere i comandi in una pipeline.
Questo browser non è più supportato.
Esegui l'aggiornamento a Microsoft Edge per sfruttare i vantaggi di funzionalità più recenti, aggiornamenti della sicurezza e supporto tecnico.
System.IO.Pipelines è una libreria progettata per semplificare l'esecuzione di operazioni di I/O ad alte prestazioni in .NET. È una libreria destinata a .NET Standard che funziona su tutte le implementazioni .NET.
Questa libreria è disponibile nel pacchetto NuGet System.IO.Pipelines.
Le app che analizzano i dati di streaming sono costituite da codice boilerplate con molti flussi di codice specializzati e insoliti. Il boilerplate e il codice caso speciale sono complessi e difficili da gestire.
System.IO.Pipelines
è stato progettato per:
Il codice seguente è tipico per un server TCP che riceve messaggi delimitati da riga (delimitati da '\n'
) da un client:
async Task ProcessLinesAsync(NetworkStream stream)
{
var buffer = new byte[1024];
await stream.ReadAsync(buffer, 0, buffer.Length);
// Process a single line from the buffer
ProcessLine(buffer);
}
Il codice precedente presenta diversi problemi:
ReadAsync
.stream.ReadAsync
. stream.ReadAsync
restituisce la quantità di dati letti.ReadAsync
.byte
a ogni lettura.Per risolvere i problemi precedenti, sono necessarie le seguenti modifiche:
Eseguire il buffer dei dati in ingresso fino a quando non viene trovata una nuova riga.
Analizzare tutte le righe restituite nel buffer.
È possibile che la riga sia maggiore di 1 kB (1024 byte). Il codice deve ridimensionare il buffer di input fino a quando non viene trovato il delimitatore per adattare la riga completa all'interno del buffer.
Valutare l'uso del pool di buffer per evitare di allocare memoria ripetutamente.
Il seguente codice risolve alcuni di questi problemi:
async Task ProcessLinesAsync(NetworkStream stream)
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
var bytesBuffered = 0;
var bytesConsumed = 0;
while (true)
{
// Calculate the amount of bytes remaining in the buffer.
var bytesRemaining = buffer.Length - bytesBuffered;
if (bytesRemaining == 0)
{
// Double the buffer size and copy the previously buffered data into the new buffer.
var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
// Return the old buffer to the pool.
ArrayPool<byte>.Shared.Return(buffer);
buffer = newBuffer;
bytesRemaining = buffer.Length - bytesBuffered;
}
var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
if (bytesRead == 0)
{
// EOF
break;
}
// Keep track of the amount of buffered bytes.
bytesBuffered += bytesRead;
var linePosition = -1;
do
{
// Look for a EOL in the buffered data.
linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
bytesBuffered - bytesConsumed);
if (linePosition >= 0)
{
// Calculate the length of the line based on the offset.
var lineLength = linePosition - bytesConsumed;
// Process the line.
ProcessLine(buffer, bytesConsumed, lineLength);
// Move the bytesConsumed to skip past the line consumed (including \n).
bytesConsumed += lineLength + 1;
}
}
while (linePosition >= 0);
}
}
Il codice precedente è complesso e non risolve tutti i problemi identificati. La rete ad alte prestazioni implica in genere la scrittura di codice complesso per ottimizzare le prestazioni. System.IO.Pipelines
è stato progettato per semplificare la scrittura di questo tipo di codice.
La classe Pipe può essere usata per creare una coppia PipeWriter/PipeReader
. Tutti i dati scritti in PipeWriter
sono disponibili in PipeReader
:
var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;
async Task ProcessLinesAsync(Socket socket)
{
var pipe = new Pipe();
Task writing = FillPipeAsync(socket, pipe.Writer);
Task reading = ReadPipeAsync(pipe.Reader);
await Task.WhenAll(reading, writing);
}
async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
const int minimumBufferSize = 512;
while (true)
{
// Allocate at least 512 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(minimumBufferSize);
try
{
int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
if (bytesRead == 0)
{
break;
}
// Tell the PipeWriter how much was read from the Socket.
writer.Advance(bytesRead);
}
catch (Exception ex)
{
LogError(ex);
break;
}
// Make the data available to the PipeReader.
FlushResult result = await writer.FlushAsync();
if (result.IsCompleted)
{
break;
}
}
// By completing PipeWriter, tell the PipeReader that there's no more data coming.
await writer.CompleteAsync();
}
async Task ReadPipeAsync(PipeReader reader)
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
{
// Process the line.
ProcessLine(line);
}
// Tell the PipeReader how much of the buffer has been consumed.
reader.AdvanceTo(buffer.Start, buffer.End);
// Stop reading if there's no more data coming.
if (result.IsCompleted)
{
break;
}
}
// Mark the PipeReader as complete.
await reader.CompleteAsync();
}
bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
// Look for a EOL in the buffer.
SequencePosition? position = buffer.PositionOf((byte)'\n');
if (position == null)
{
line = default;
return false;
}
// Skip the line + the \n.
line = buffer.Slice(0, position.Value);
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
return true;
}
Esistono due cicli:
FillPipeAsync
legge da Socket
e scrive in PipeWriter
.ReadPipeAsync
legge da PipeReader
e analizza le righe in ingresso.Non sono stati allocati buffer espliciti. Tutta la gestione del buffer viene delegata alle implementazioni PipeReader
e PipeWriter
. La delega della gestione del buffer semplifica l'utilizzo del codice per concentrarsi esclusivamente sulla logica di business.
Nel primo ciclo:
PipeWriter
la quantità di dati scritti nel buffer.PipeReader
.Nel secondo ciclo, PipeReader
utilizza i buffer scritti da PipeWriter
. I buffer provengono dal socket. Chiamata a PipeReader.ReadAsync
:
Restituisce un oggetto ReadResult contenente due informazioni importanti:
ReadOnlySequence<byte>
.IsCompleted
che indica se è stata raggiunta la fine dei dati (EOF).Dopo aver trovato il delimitatore di fine riga (EOL) e aver analizzato la riga:
PipeReader.AdvanceTo
viene chiamato per indicare a PipeReader
la quantità di dati utilizzati ed esaminati.I cicli del lettore e del writer terminano chiamando Complete
. Complete
consente alla pipe sottostante di rilasciare la memoria allocata.
Idealmente, le operazioni di lettura e analisi funzionano insieme:
In genere, l'analisi richiede più tempo rispetto alla semplice copia di blocchi di dati dalla rete:
Per ottenere prestazioni ottimali, è necessario trovare un equilibrio tra pause frequenti e allocazione di più memoria.
Per risolvere il problema precedente, Pipe
dispone di due impostazioni per controllare il flusso di dati:
PipeWriter.FlushAsync
.ValueTask<FlushResult>
quando la quantità di dati nell'oggetto Pipe
interseca PauseWriterThreshold
.ValueTask<FlushResult>
quando diventa inferiore a ResumeWriterThreshold
.Due valori vengono usati per evitare il ciclo rapido, che può verificarsi se viene usato un valore.
// The Pipe will start returning incomplete tasks from FlushAsync until
// the reader examines at least 5 bytes.
var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
var pipe = new Pipe(options);
In genere, quando si usano async
e await
, il codice asincrono riprende su un oggetto TaskScheduler o sull'oggetto corrente SynchronizationContext.
Quando si esegue l'I/O, è importante avere un controllo dettagliato sulla posizione in cui viene eseguito l'I/O. Questo controllo consente di sfruttare in modo efficace le cache della CPU. Una memorizzazione nella cache efficiente è fondamentale per le app ad alte prestazioni come i server Web. PipeScheduler fornisce il controllo sulla posizione in cui vengono eseguiti i callback asincroni. Per impostazione predefinita:
SynchronizationContext
, usa il pool di thread per eseguire i callback.public static void Main(string[] args)
{
var writeScheduler = new SingleThreadPipeScheduler();
var readScheduler = new SingleThreadPipeScheduler();
// Tell the Pipe what schedulers to use and disable the SynchronizationContext.
var options = new PipeOptions(readerScheduler: readScheduler,
writerScheduler: writeScheduler,
useSynchronizationContext: false);
var pipe = new Pipe(options);
}
// This is a sample scheduler that async callbacks on a single dedicated thread.
public class SingleThreadPipeScheduler : PipeScheduler
{
private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
new BlockingCollection<(Action<object> Action, object State)>();
private readonly Thread _thread;
public SingleThreadPipeScheduler()
{
_thread = new Thread(DoWork);
_thread.Start();
}
private void DoWork()
{
foreach (var item in _queue.GetConsumingEnumerable())
{
item.Action(item.State);
}
}
public override void Schedule(Action<object?> action, object? state)
{
if (state is not null)
{
_queue.Add((action, state));
}
// else log the fact that _queue.Add was not called.
}
}
PipeScheduler.ThreadPool è l'implementazione di PipeScheduler che accoda i callback nel pool di thread. PipeScheduler.ThreadPool
è il valore predefinito e in genere la scelta migliore. PipeScheduler.Inline può causare conseguenze impreviste, ad esempio deadlock.
Spesso è efficace riutilizzare l'oggetto Pipe
. Per reimpostare la pipe, chiamare PipeReader Reset al termine di PipeReader
e PipeWriter
.
PipeReader gestisce la memoria per conto del chiamante. Chiamare sempre PipeReader.AdvanceTo dopo aver chiamato PipeReader.ReadAsync. Ciò consente a PipeReader
di sapere quando il chiamante viene eseguito con la memoria in modo che possa essere monitorato. L'oggetto ReadOnlySequence<byte>
restituito da PipeReader.ReadAsync
è valido solo fino alla chiamata a PipeReader.AdvanceTo
. Non è consentito usare ReadOnlySequence<byte>
dopo aver chiamato PipeReader.AdvanceTo
.
PipeReader.AdvanceTo
accetta due argomenti SequencePosition:
Contrassegnare i dati come utilizzati significa che la pipe può restituire la memoria al pool di buffer sottostante. Contrassegnare i dati come osservati controlla le operazioni della chiamata successiva a PipeReader.ReadAsync
. Contrassegnare tutto come osservato significa che la chiamata successiva a PipeReader.ReadAsync
non restituirà risultati fino a quando non sono presenti altri dati scritti nella pipe. Qualsiasi altro valore farà sì che la chiamata successiva a PipeReader.ReadAsync
restituisca immediatamente i dati osservati e non osservati, ma non i dati che sono già stati consumati.
Esistono due modelli tipici che emergono quando si tenta di leggere i dati di streaming:
Negli esempi seguenti viene usato il metodo TryParseLines
per analizzare i messaggi da un oggetto ReadOnlySequence<byte>
. TryParseLines
analizza un singolo messaggio e aggiorna il buffer di input per tagliare il messaggio analizzato dal buffer. TryParseLines
non fa parte di .NET, è un metodo scritto dall'utente usato nelle sezioni seguenti.
bool TryParseLines(ref ReadOnlySequence<byte> buffer, out Message message);
Il codice seguente legge un singolo messaggio da un oggetto PipeReader
e lo restituisce al chiamante.
async ValueTask<Message?> ReadSingleMessageAsync(PipeReader reader,
CancellationToken cancellationToken = default)
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
// In the event that no message is parsed successfully, mark consumed
// as nothing and examined as the entire buffer.
SequencePosition consumed = buffer.Start;
SequencePosition examined = buffer.End;
try
{
if (TryParseLines(ref buffer, out Message message))
{
// A single message was successfully parsed so mark the start of the
// parsed buffer as consumed. TryParseLines trims the buffer to
// point to the data after the message was parsed.
consumed = buffer.Start;
// Examined is marked the same as consumed here, so the next call
// to ReadSingleMessageAsync will process the next message if there's
// one.
examined = consumed;
return message;
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(consumed, examined);
}
}
return null;
}
Il codice precedente:
SequencePosition
e esaminato SequencePosition
in modo da puntare all'inizio del buffer di input tagliato.I due argomenti SequencePosition
vengono aggiornati perché TryParseLines
rimuove il messaggio analizzato dal buffer di input. In genere, quando si analizza un singolo messaggio dal buffer, la posizione esaminata deve essere una delle seguenti:
Il caso di messaggio singolo presenta il maggior rischio di errori. Passare i valori errati in esaminati può comportare un'eccezione di memoria insufficiente o un ciclo infinito. Per ottenere ulteriori informazioni, vedere la sezione Problemi comuni di PipeReader in questo articolo.
Il codice seguente legge tutti i messaggi da un oggetto PipeReader
e chiama ProcessMessageAsync
su ciascuno di essi.
async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseLines(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
PipeReader.ReadAsync
:
CancellationToken
viene annullato mentre è presente una lettura in sospeso.PipeReader.CancelPendingRead
fa sì che la chiamata corrente o successiva a PipeReader.ReadAsync
restituisca un oggetto ReadResult con IsCanceled
impostato su true
. Ciò può essere utile per interrompere il ciclo di lettura esistente in modo non distruttivo e non eccezionale.private PipeReader reader;
public MyConnection(PipeReader reader)
{
this.reader = reader;
}
public void Abort()
{
// Cancel the pending read so the process loop ends without an exception.
reader.CancelPendingRead();
}
public async Task ProcessMessagesAsync()
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
if (result.IsCanceled)
{
// The read was canceled. You can quit without reading the existing data.
break;
}
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseLines(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
Il passaggio dei valori errati a consumed
o examined
può comportare la lettura di dati già letti.
Il passaggio di buffer.End
in esaminato può comportare:
PipeReader.AdvanceTo(position, buffer.End)
quando si elabora un singolo messaggio alla volta dal buffer.Il passaggio dei valori errati a consumed
o examined
può comportare un ciclo infinito. Ad esempio, PipeReader.AdvanceTo(buffer.Start)
se buffer.Start
non è stato modificato, la chiamata successiva a PipeReader.ReadAsync
verrà restituita immediatamente prima dell'arrivo dei nuovi dati.
Il passaggio dei valori errati a consumed
o examined
può comportare un buffer infinito (OOM finale).
L'uso di ReadOnlySequence<byte>
dopo la chiamata PipeReader.AdvanceTo
può causare un danneggiamento della memoria (uso dopo il rilascio).
Se non si riesce a chiamare PipeReader.Complete/CompleteAsync
, potrebbe verificarsi una perdita di memoria.
Controllare ReadResult.IsCompleted e uscire dalla logica di lettura prima dell'elaborazione del buffer comportano la perdita di dati. La condizione di uscita del ciclo deve essere basata su ReadResult.Buffer.IsEmpty
e ReadResult.IsCompleted
. L'esecuzione errata di questa operazione potrebbe comportare un ciclo infinito.
❌Perdita di dati
ReadResult
può restituire il segmento finale dei dati quando IsCompleted
è impostato su true
. La mancata lettura dei dati prima dell'uscita dal ciclo di lettura comporterà la perdita di dati.
Avviso
Non usare il codice seguente. L'uso di questo esempio comporterà la perdita di dati, i blocchi, i problemi di sicurezza; pertanto, non deve essere copiato. L'esempio seguente viene fornito per spiegare i problemi comuni di PipeReader.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> dataLossBuffer = result.Buffer;
if (result.IsCompleted)
break;
Process(ref dataLossBuffer, out Message message);
reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End);
}
Avviso
Non usare il codice precedente. L'uso di questo esempio comporterà la perdita di dati, i blocchi, i problemi di sicurezza; pertanto, non deve essere copiato. L'esempio seguente viene fornito per spiegare i problemi comuni di PipeReader.
❌Cicli infiniti
La logica seguente può comportare un ciclo infinito se l'oggetto Result.IsCompleted
è true
ma non è mai presente un messaggio completo nel buffer.
Avviso
Non usare il codice seguente. L'uso di questo esempio comporterà la perdita di dati, i blocchi, i problemi di sicurezza; pertanto, non deve essere copiato. L'esempio seguente viene fornito per spiegare i problemi comuni di PipeReader.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (result.IsCompleted && infiniteLoopBuffer.IsEmpty)
break;
Process(ref infiniteLoopBuffer, out Message message);
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
Avviso
Non usare il codice precedente. L'uso di questo esempio comporterà la perdita di dati, i blocchi, i problemi di sicurezza; pertanto, non deve essere copiato. L'esempio seguente viene fornito per spiegare i problemi comuni di PipeReader.
Ecco un altro frammento di codice che presenta lo stesso problema. Verifica la presenza di un buffer non vuoto prima di controllare ReadResult.IsCompleted
. Poiché si trova in un oggetto else if
, il ciclo verrà ripetuto all'infinito se non è mai presente un messaggio completo nel buffer.
Avviso
Non usare il codice seguente. L'uso di questo esempio comporterà la perdita di dati, i blocchi, i problemi di sicurezza; pertanto, non deve essere copiato. L'esempio seguente viene fornito per spiegare i problemi comuni di PipeReader.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (!infiniteLoopBuffer.IsEmpty)
Process(ref infiniteLoopBuffer, out Message message);
else if (result.IsCompleted)
break;
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
Avviso
Non usare il codice precedente. L'uso di questo esempio comporterà la perdita di dati, i blocchi, i problemi di sicurezza; pertanto, non deve essere copiato. L'esempio seguente viene fornito per spiegare i problemi comuni di PipeReader.
❌Applicazione non rispondente
La chiamata PipeReader.AdvanceTo
senza condizioni con buffer.End
in posizioneexamined
può comportare la mancata risposta dell'applicazione durante l'analisi di un singolo messaggio. La chiamata successiva a PipeReader.AdvanceTo
non verrà restituita fino a quando:
Avviso
Non usare il codice seguente. L'uso di questo esempio comporterà la perdita di dati, i blocchi, i problemi di sicurezza; pertanto, non deve essere copiato. L'esempio seguente viene fornito per spiegare i problemi comuni di PipeReader.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> hangBuffer = result.Buffer;
Process(ref hangBuffer, out Message message);
if (result.IsCompleted)
break;
reader.AdvanceTo(hangBuffer.Start, hangBuffer.End);
if (message != null)
return message;
}
Avviso
Non usare il codice precedente. L'uso di questo esempio comporterà la perdita di dati, i blocchi, i problemi di sicurezza; pertanto, non deve essere copiato. L'esempio seguente viene fornito per spiegare i problemi comuni di PipeReader.
❌Memoria insufficiente (OOM)
Con le seguenti condizioni, il codice seguente mantiene il buffering fino a quando non si verifica un oggetto OutOfMemoryException:
PipeReader
non creano un messaggio completo. Ad esempio, non crea un messaggio completo perché l'altro lato sta scrivendo un messaggio di grandi dimensioni (ad esempio, un messaggio da 4 GB).Avviso
Non usare il codice seguente. L'uso di questo esempio comporterà la perdita di dati, i blocchi, i problemi di sicurezza; pertanto, non deve essere copiato. L'esempio seguente viene fornito per spiegare i problemi comuni di PipeReader.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer;
Process(ref thisCouldOutOfMemory, out Message message);
if (result.IsCompleted)
break;
reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End);
if (message != null)
return message;
}
Avviso
Non usare il codice precedente. L'uso di questo esempio comporterà la perdita di dati, i blocchi, i problemi di sicurezza; pertanto, non deve essere copiato. L'esempio seguente viene fornito per spiegare i problemi comuni di PipeReader.
❌Danneggiamento della memoria
Durante la scrittura di helper che leggono il buffer, è necessario copiare qualsiasi payload restituito prima di chiamare Advance
. Nell'esempio seguente viene restituita la memoria eliminata da Pipe
e che può riutilizzarla per l'operazione successiva (lettura/scrittura).
Avviso
Non usare il codice seguente. L'uso di questo esempio comporterà la perdita di dati, i blocchi, i problemi di sicurezza; pertanto, non deve essere copiato. L'esempio seguente viene fornito per spiegare i problemi comuni di PipeReader.
public class Message
{
public ReadOnlySequence<byte> CorruptedPayload { get; set; }
}
Environment.FailFast("This code is terrible, don't use it!");
Message message = null;
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
ReadHeader(ref buffer, out int length);
if (length <= buffer.Length)
{
message = new Message
{
// Slice the payload from the existing buffer
CorruptedPayload = buffer.Slice(0, length)
};
buffer = buffer.Slice(length);
}
if (result.IsCompleted)
break;
reader.AdvanceTo(buffer.Start, buffer.End);
if (message != null)
{
// This code is broken since reader.AdvanceTo() was called with a position *after* the buffer
// was captured.
break;
}
}
return message;
}
Avviso
Non usare il codice precedente. L'uso di questo esempio comporterà la perdita di dati, i blocchi, i problemi di sicurezza; pertanto, non deve essere copiato. L'esempio seguente viene fornito per spiegare i problemi comuni di PipeReader.
PipeWriter gestisce i buffer per la scrittura per conto del chiamante. PipeWriter
implementa IBufferWriter<byte>
. IBufferWriter<byte>
consente di ottenere l'accesso ai buffer per eseguire operazioni di scrittura senza copie di buffer aggiuntive.
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
// Request at least 5 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(5);
// Write directly into the buffer.
int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span);
// Tell the writer how many bytes were written.
writer.Advance(written);
await writer.FlushAsync(cancellationToken);
}
Il codice precedente:
PipeWriter
utilizzando GetMemory."Hello"
nell'oggetto Memory<byte>
restituito.PipeWriter
, che invia i byte al dispositivo sottostante.Il precedente metodo di scrittura usa il buffer fornito da PipeWriter
. Potrebbe anche essere stato usato PipeWriter.WriteAsync, che:
PipeWriter
.GetSpan
, Advance
in base alle esigenze e chiama FlushAsync.async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
byte[] helloBytes = Encoding.ASCII.GetBytes("Hello");
// Write helloBytes to the writer, there's no need to call Advance here
// (Write does that).
await writer.WriteAsync(helloBytes, cancellationToken);
}
FlushAsync supporta il passaggio di un oggetto CancellationToken. Se il token viene annullato mentre è presente uno scaricamento in sospeso, passare CancellationToken
risulterà in OperationCanceledException
. PipeWriter.FlushAsync
supporta un modo per annullare l'operazione di scaricamento corrente tramite PipeWriter.CancelPendingFlush senza generare un'eccezione. La chiamata PipeWriter.CancelPendingFlush
fa sì che la chiamata corrente o successiva a PipeWriter.FlushAsync
o PipeWriter.WriteAsync
restituisca un oggetto FlushResult con IsCanceled
impostato su true
. Ciò può essere utile per interrompere lo scaricamento della resa in modo non distruttivo e non eccezionale.
GetMemory
o GetSpan
mentre c'è una chiamata incompleta a FlushAsync
non è sicura.Complete
o CompleteAsync
mentre sono presenti dati non scaricati può causare un danneggiamento della memoria.I suggerimenti seguenti consentono di usare correttamente le classi System.IO.Pipelines:
await
PipeWriter.FlushAsync durante la scrittura e controllare sempre FlushResult.IsCompleted. Interrompere la scrittura se IsCompleted
è true
, poiché ciò indica che il lettore è completato e non si preoccupa più di ciò che viene scritto.PipeReader
abbia accesso.FlushAsync
se il lettore non può iniziare fino al termine di FlushAsync
, perché ciò potrebbe causare un deadlock.PipeReader
o PipeWriter
oppure che vi acceda. Questi tipi non sono thread-safe.AdvanceTo
o completato l'oggetto PipeReader
.IDuplexPipe è un contratto per i tipi che supportano sia la lettura che la scrittura. Ad esempio, una connessione di rete sarebbe rappresentata da un oggetto IDuplexPipe
.
A differenza di Pipe
, che contiene PipeReader
e PipeWriter
, IDuplexPipe
rappresenta un singolo lato di una connessione duplex completa. Ciò significa che ciò che viene scritto in PipeWriter
non verrà letto da PipeReader
.
Durante la lettura o la scrittura di dati di flusso, in genere si leggono i dati usando un deserializzatore e si scrivono dati usando un serializzatore. La maggior parte di queste API di flusso di lettura e scrittura ha un parametro Stream
. Per semplificare l'integrazione con queste API esistenti, PipeReader
e PipeWriter
espongono un metodo AsStream. AsStream restituisce un'implementazione Stream
intorno a PipeReader
o PipeWriter
.
Le istanze PipeReader
e PipeWriter
possono essere create usando i metodi statici Create
forniti da un oggetto Stream e le opzioni di creazione corrispondenti facoltative.
StreamPipeReaderOptions consente di controllare la creazione dell'istanza PipeReader
con i parametri seguenti:
4096
.PipeReader
e l'impostazione predefinita è false
.1024
.MemoryPool<byte>
utilizzato per l'allocazione della memoria e l'impostazione predefinita è null
.StreamPipeWriterOptions consente di controllare la creazione dell'istanza PipeWriter
con i parametri seguenti:
PipeWriter
e l'impostazione predefinita è false
.4096
.MemoryPool<byte>
utilizzato per l'allocazione della memoria e l'impostazione predefinita è null
.Importante
Quando si creano le istanze PipeReader
e PipeWriter
si usano i metodi Create
, è necessario prendere in considerazione la durata dell'oggetto Stream
. Se è necessario accedere al flusso dopo che il lettore o il writer ha finito di utilizzarlo, sarà necessario impostare il flag LeaveOpen
su true
per le opzioni di creazione. In caso contrario, il flusso verrà chiuso.
Il codice seguente illustra la creazione di istanze PipeReader
e PipeWriter
usando i metodi Create
di un flusso.
using System.Buffers;
using System.IO.Pipelines;
using System.Text;
class Program
{
static async Task Main()
{
using var stream = File.OpenRead("lorem-ipsum.txt");
var reader = PipeReader.Create(stream);
var writer = PipeWriter.Create(
Console.OpenStandardOutput(),
new StreamPipeWriterOptions(leaveOpen: true));
WriteUserCancellationPrompt();
var processMessagesTask = ProcessMessagesAsync(reader, writer);
var userCanceled = false;
var cancelProcessingTask = Task.Run(() =>
{
while (char.ToUpperInvariant(Console.ReadKey().KeyChar) != 'C')
{
WriteUserCancellationPrompt();
}
userCanceled = true;
// No exceptions thrown
reader.CancelPendingRead();
writer.CancelPendingFlush();
});
await Task.WhenAny(cancelProcessingTask, processMessagesTask);
Console.WriteLine(
$"\n\nProcessing {(userCanceled ? "cancelled" : "completed")}.\n");
}
static void WriteUserCancellationPrompt() =>
Console.WriteLine("Press 'C' to cancel processing...\n");
static async Task ProcessMessagesAsync(
PipeReader reader,
PipeWriter writer)
{
try
{
while (true)
{
ReadResult readResult = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = readResult.Buffer;
try
{
if (readResult.IsCanceled)
{
break;
}
if (TryParseLines(ref buffer, out string message))
{
FlushResult flushResult =
await WriteMessagesAsync(writer, message);
if (flushResult.IsCanceled || flushResult.IsCompleted)
{
break;
}
}
if (readResult.IsCompleted)
{
if (!buffer.IsEmpty)
{
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
catch (Exception ex)
{
Console.Error.WriteLine(ex);
}
finally
{
await reader.CompleteAsync();
await writer.CompleteAsync();
}
}
static bool TryParseLines(
ref ReadOnlySequence<byte> buffer,
out string message)
{
SequencePosition? position;
StringBuilder outputMessage = new();
while(true)
{
position = buffer.PositionOf((byte)'\n');
if (!position.HasValue)
break;
outputMessage.Append(Encoding.ASCII.GetString(buffer.Slice(buffer.Start, position.Value)))
.AppendLine();
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
};
message = outputMessage.ToString();
return message.Length != 0;
}
static ValueTask<FlushResult> WriteMessagesAsync(
PipeWriter writer,
string message) =>
writer.WriteAsync(Encoding.ASCII.GetBytes(message));
}
L'applicazione usa un oggetto StreamReader per leggere il file lorem-ipsum.txt come flusso e deve terminare con una riga vuota. Viene FileStream passato a PipeReader.Create, che crea un'istanza di un oggetto PipeReader
. L'applicazione console passa quindi il flusso di output standard a PipeWriter.Create utilizzando Console.OpenStandardOutput(). L'esempio supporta l'annullamento.
Feedback su .NET
.NET è un progetto di open source. Selezionare un collegamento per fornire feedback:
Formazione
Modulo
Connettere i comandi in una pipeline - Training
In questo modulo si apprenderà come connettere i comandi in una pipeline.
Documentazione
I/O ad alte prestazioni con System.IO.Pipelines
System.IO.Pipelines è nato dal lavoro svolto dal team di .NET Core per semplificare l'esecuzione di operazioni di I/O ad alte prestazioni in .NET.In questo episodio, è possibile visualizzare lo show Diymets (@Pakrym) e David Fowler (@davidfowl) per offrire una panoramica del funzionamento del modello di programmazione Pipelines, oltre a mostrare alcune demo su come usare l'API.[00:26] - Qual è la logica di System.IO.Pipelines?[02:10] - Confronto delle prestazioni tra pipe e flussi[04:17] - Problemi relativi
Altre informazioni su: Usare buffer in .NET