Szkolenie
Moduł
Łączenie poleceń z potokiem - Training
W tym module dowiesz się, jak połączyć polecenia z potokiem.
Ta przeglądarka nie jest już obsługiwana.
Przejdź na przeglądarkę Microsoft Edge, aby korzystać z najnowszych funkcji, aktualizacji zabezpieczeń i pomocy technicznej.
System.IO.Pipelines jest biblioteką zaprojektowaną tak, aby ułatwić wykonywanie operacji we/wy o wysokiej wydajności na platformie .NET. Jest to biblioteka przeznaczona dla platformy .NET Standard, która działa we wszystkich implementacjach platformy .NET.
Biblioteka jest dostępna w pakiecie NuGet System.IO.Pipelines .
Aplikacje, które analizują dane przesyłane strumieniowo, składają się ze standardowego kodu o wielu wyspecjalizowanych i nietypowych przepływach kodu. Standardowy i specjalny kod przypadku jest złożony i trudny do utrzymania.
System.IO.Pipelines
został zaprojektowany w celu:
Poniższy kod jest typowy dla serwera TCP, który odbiera komunikaty rozdzielane wierszami (rozdzielane przez '\n'
) z klienta:
async Task ProcessLinesAsync(NetworkStream stream)
{
var buffer = new byte[1024];
await stream.ReadAsync(buffer, 0, buffer.Length);
// Process a single line from the buffer
ProcessLine(buffer);
}
Powyższy kod ma kilka problemów:
ReadAsync
.stream.ReadAsync
stream.ReadAsync
Zwraca ilość odczytanych danych.ReadAsync
wywołaniu.byte
.Aby rozwiązać powyższe problemy, wymagane są następujące zmiany:
Buforuj dane przychodzące do momentu znalezienia nowego wiersza.
Przeanalizuj wszystkie wiersze zwrócone w buforze.
Możliwe, że wiersz jest większy niż 1 KB (1024 bajty). Kod musi zmienić rozmiar buforu wejściowego do momentu znalezienia ogranicznika w celu dopasowania do kompletnego wiersza wewnątrz buforu.
Rozważ użycie buforowania, aby uniknąć wielokrotnego przydzielania pamięci.
Poniższy kod rozwiązuje niektóre z tych problemów:
async Task ProcessLinesAsync(NetworkStream stream)
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
var bytesBuffered = 0;
var bytesConsumed = 0;
while (true)
{
// Calculate the amount of bytes remaining in the buffer.
var bytesRemaining = buffer.Length - bytesBuffered;
if (bytesRemaining == 0)
{
// Double the buffer size and copy the previously buffered data into the new buffer.
var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
// Return the old buffer to the pool.
ArrayPool<byte>.Shared.Return(buffer);
buffer = newBuffer;
bytesRemaining = buffer.Length - bytesBuffered;
}
var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
if (bytesRead == 0)
{
// EOF
break;
}
// Keep track of the amount of buffered bytes.
bytesBuffered += bytesRead;
var linePosition = -1;
do
{
// Look for a EOL in the buffered data.
linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
bytesBuffered - bytesConsumed);
if (linePosition >= 0)
{
// Calculate the length of the line based on the offset.
var lineLength = linePosition - bytesConsumed;
// Process the line.
ProcessLine(buffer, bytesConsumed, lineLength);
// Move the bytesConsumed to skip past the line consumed (including \n).
bytesConsumed += lineLength + 1;
}
}
while (linePosition >= 0);
}
}
Poprzedni kod jest złożony i nie dotyczy wszystkich zidentyfikowanych problemów. Sieć o wysokiej wydajności zwykle oznacza pisanie złożonego kodu w celu zmaksymalizowania wydajności. System.IO.Pipelines
został zaprojektowany tak, aby ułatwić pisanie tego typu kodu.
Klasa Pipe może służyć do tworzenia PipeWriter/PipeReader
pary. Wszystkie dane zapisane w obiekcie PipeWriter
są dostępne w pliku PipeReader
:
var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;
async Task ProcessLinesAsync(Socket socket)
{
var pipe = new Pipe();
Task writing = FillPipeAsync(socket, pipe.Writer);
Task reading = ReadPipeAsync(pipe.Reader);
await Task.WhenAll(reading, writing);
}
async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
const int minimumBufferSize = 512;
while (true)
{
// Allocate at least 512 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(minimumBufferSize);
try
{
int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
if (bytesRead == 0)
{
break;
}
// Tell the PipeWriter how much was read from the Socket.
writer.Advance(bytesRead);
}
catch (Exception ex)
{
LogError(ex);
break;
}
// Make the data available to the PipeReader.
FlushResult result = await writer.FlushAsync();
if (result.IsCompleted)
{
break;
}
}
// By completing PipeWriter, tell the PipeReader that there's no more data coming.
await writer.CompleteAsync();
}
async Task ReadPipeAsync(PipeReader reader)
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
{
// Process the line.
ProcessLine(line);
}
// Tell the PipeReader how much of the buffer has been consumed.
reader.AdvanceTo(buffer.Start, buffer.End);
// Stop reading if there's no more data coming.
if (result.IsCompleted)
{
break;
}
}
// Mark the PipeReader as complete.
await reader.CompleteAsync();
}
bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
// Look for a EOL in the buffer.
SequencePosition? position = buffer.PositionOf((byte)'\n');
if (position == null)
{
line = default;
return false;
}
// Skip the line + the \n.
line = buffer.Slice(0, position.Value);
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
return true;
}
Istnieją dwie pętle:
FillPipeAsync
odczytuje dane z pliku Socket
i zapisu w pliku PipeWriter
.ReadPipeAsync
odczytuje z PipeReader
wierszy przychodzących i analizuje je.Nie przydzielono jawnych. Wszystkie zarządzanie jest delegowane do PipeReader
implementacji i PipeWriter
. Delegowanie zarządzania ułatwia korzystanie z kodu wyłącznie na logice biznesowej.
W pierwszej pętli:
PipeWriter
w celu sprawdzenia, ile danych zostało zapisanych w buforze.PipeReader
W drugiej pętli program PipeReader
zużywa zapisywane przez PipeWriter
program . pochodzą z gniazda. Wywołanie metody :PipeReader.ReadAsync
Zwraca element ReadResult zawierający dwa ważne informacje:
ReadOnlySequence<byte>
.IsCompleted
wskazująca, czy osiągnięto koniec danych (EOF).Po znalezieniu ogranicznika końca wiersza (EOL) i przeanalizowaniu wiersza:
PipeReader.AdvanceTo
jest wywoływany, aby poinformować, PipeReader
ile danych zostało wykorzystanych i zbadanych.Pętle czytnika i modułu zapisywania kończą się wywołaniem metody Complete
. Complete
umożliwia bazowej potokowi zwolnienie pamięci przydzielonej.
W idealnym przypadku czytanie i analizowanie współpracują ze sobą:
Zazwyczaj analizowanie zajmuje więcej czasu niż tylko kopiowanie bloków danych z sieci:
W celu uzyskania optymalnej wydajności istnieje równowaga między częstymi wstrzymaniami i przydzielaniem większej ilości pamięci.
Aby rozwiązać powyższy problem, Pipe
ma dwa ustawienia umożliwiające sterowanie przepływem danych:
PipeWriter.FlushAsync
.ValueTask<FlushResult>
element, gdy ilość danych w Pipe
przecięciu PauseWriterThreshold
.ValueTask<FlushResult>
się, gdy staje się niższy niż ResumeWriterThreshold
.Dwie wartości są używane w celu zapobiegania szybkiemu cyklowi rowerowemu, które mogą wystąpić, jeśli jedna wartość jest używana.
// The Pipe will start returning incomplete tasks from FlushAsync until
// the reader examines at least 5 bytes.
var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
var pipe = new Pipe(options);
Zwykle w przypadku używania i async
await
kod asynchroniczny jest wznawiany w TaskScheduler bieżącym SynchronizationContextobiekcie lub .
Podczas wykonywania operacji we/wy ważne jest, aby mieć precyzyjną kontrolę nad miejscem wykonywania operacji we/wy. Ta kontrolka umożliwia efektywne korzystanie z pamięci podręcznych procesora CPU. Wydajne buforowanie ma kluczowe znaczenie dla aplikacji o wysokiej wydajności, takich jak serwery internetowe. PipeScheduler zapewnia kontrolę nad tym, gdzie są uruchamiane wywołania zwrotne asynchroniczne. Domyślnie:
SynchronizationContext
elementu , używa puli wątków do uruchamiania wywołań zwrotnych.public static void Main(string[] args)
{
var writeScheduler = new SingleThreadPipeScheduler();
var readScheduler = new SingleThreadPipeScheduler();
// Tell the Pipe what schedulers to use and disable the SynchronizationContext.
var options = new PipeOptions(readerScheduler: readScheduler,
writerScheduler: writeScheduler,
useSynchronizationContext: false);
var pipe = new Pipe(options);
}
// This is a sample scheduler that async callbacks on a single dedicated thread.
public class SingleThreadPipeScheduler : PipeScheduler
{
private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
new BlockingCollection<(Action<object> Action, object State)>();
private readonly Thread _thread;
public SingleThreadPipeScheduler()
{
_thread = new Thread(DoWork);
_thread.Start();
}
private void DoWork()
{
foreach (var item in _queue.GetConsumingEnumerable())
{
item.Action(item.State);
}
}
public override void Schedule(Action<object?> action, object? state)
{
if (state is not null)
{
_queue.Add((action, state));
}
// else log the fact that _queue.Add was not called.
}
}
PipeScheduler.ThreadPool to implementacja PipeScheduler , która kolejkuje wywołania zwrotne do puli wątków. PipeScheduler.ThreadPool
jest domyślnym i ogólnie najlepszym wyborem. PipeScheduler.Inline może powodować niezamierzone konsekwencje, takie jak zakleszczenia.
Często wydajne jest ponowne użycie Pipe
obiektu. Aby zresetować potok, wywołaj metodę PipeReader Reset po zakończeniu PipeReader
operacji i PipeWriter
.
PipeReader zarządza pamięcią w imieniu wywołującego. Zawsze dzwonij PipeReader.AdvanceTo po wywołaniu metody PipeReader.ReadAsync. Informuje o tym, PipeReader
kiedy obiekt wywołujący jest wykonywany z pamięcią, dzięki czemu można go śledzić. Zwrócony ReadOnlySequence<byte>
element jest PipeReader.ReadAsync
prawidłowy tylko do momentu wywołania PipeReader.AdvanceTo
metody . Jest to niedozwolone do użycia ReadOnlySequence<byte>
po wywołaniu metody PipeReader.AdvanceTo
.
PipeReader.AdvanceTo
przyjmuje dwa SequencePosition argumenty:
Oznaczanie danych jako wykorzystanych oznacza, że potok może zwrócić pamięć do bazowej puli. Oznaczanie danych obserwowanych kontroluje, co robi następne wywołanie PipeReader.ReadAsync
. Oznaczanie wszystkiego, co zaobserwowano oznacza, że następne wywołanie PipeReader.ReadAsync
metody nie zwróci się, dopóki nie będzie więcej danych zapisanych w potoku. Każda inna wartość spowoduje natychmiastowe wywołanie, aby PipeReader.ReadAsync
zwrócić dane obserwowane i nieobserwowane, ale nie dane, które zostały już zużyte.
Istnieje kilka typowych wzorców, które pojawiają się podczas próby odczytu danych przesyłanych strumieniowo:
W poniższych przykładach użyto TryParseLines
metody analizowania komunikatów z klasy ReadOnlySequence<byte>
. TryParseLines
Analizuje pojedynczy komunikat i aktualizuje bufor wejściowy w celu przycinania przeanalizowanego komunikatu z buforu. TryParseLines
nie jest częścią platformy .NET, jest to metoda napisana przez użytkownika używana w poniższych sekcjach.
bool TryParseLines(ref ReadOnlySequence<byte> buffer, out Message message);
Poniższy kod odczytuje pojedynczy komunikat z obiektu PipeReader
i zwraca go do obiektu wywołującego.
async ValueTask<Message?> ReadSingleMessageAsync(PipeReader reader,
CancellationToken cancellationToken = default)
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
// In the event that no message is parsed successfully, mark consumed
// as nothing and examined as the entire buffer.
SequencePosition consumed = buffer.Start;
SequencePosition examined = buffer.End;
try
{
if (TryParseLines(ref buffer, out Message message))
{
// A single message was successfully parsed so mark the start of the
// parsed buffer as consumed. TryParseLines trims the buffer to
// point to the data after the message was parsed.
consumed = buffer.Start;
// Examined is marked the same as consumed here, so the next call
// to ReadSingleMessageAsync will process the next message if there's
// one.
examined = consumed;
return message;
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(consumed, examined);
}
}
return null;
}
Powyższy kod ma następujące działanie:
SequencePosition
i zbadane SequencePosition
, aby wskazać początek przyciętego buforu wejściowego.Dwa SequencePosition
argumenty są aktualizowane, ponieważ TryParseLines
usuwa przeanalizowany komunikat z buforu wejściowego. Ogólnie rzecz biorąc, podczas analizowania pojedynczego komunikatu z buforu zbadane stanowisko powinno być jednym z następujących elementów:
Pojedynczy przypadek komunikatu ma największy potencjał błędów. Przekazanie nieprawidłowych wartości do zbadania może spowodować wyjątek braku pamięci lub nieskończoną pętlę. Aby uzyskać więcej informacji, zobacz sekcję Typowe problemy PipeReader w tym artykule.
Poniższy kod odczytuje wszystkie komunikaty z obiektu PipeReader
i wywołuje poszczególne wywołania ProcessMessageAsync
.
async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseLines(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
PipeReader.ReadAsync
:
CancellationToken
element zostanie anulowany podczas oczekiwania na odczyt.PipeReader.ReadAsync
z ustawioną wartością true
ReadResult IsCanceled
.PipeReader.CancelPendingRead
Może to być przydatne w przypadku zatrzymania istniejącej pętli odczytu w sposób niedrukcyjny i nietypowy.private PipeReader reader;
public MyConnection(PipeReader reader)
{
this.reader = reader;
}
public void Abort()
{
// Cancel the pending read so the process loop ends without an exception.
reader.CancelPendingRead();
}
public async Task ProcessMessagesAsync()
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
if (result.IsCanceled)
{
// The read was canceled. You can quit without reading the existing data.
break;
}
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseLines(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
Przekazywanie nieprawidłowych wartości do consumed
lub examined
może spowodować odczytywanie już odczytanych danych.
Przekazanie buffer.End
w wyniku zbadania może spowodować:
PipeReader.AdvanceTo(position, buffer.End)
podczas przetwarzania pojedynczego komunikatu w czasie z buforu.Przekazywanie nieprawidłowych wartości do consumed
lub examined
może spowodować nieskończoną pętlę. Na przykład PipeReader.AdvanceTo(buffer.Start)
, jeśli buffer.Start
nie zmieniono, spowoduje, że następne wywołanie , aby powrócić PipeReader.ReadAsync
bezpośrednio przed nadejściem nowych danych.
Przekazywanie nieprawidłowych wartości do consumed
lub examined
może spowodować nieskończone buforowanie (ostateczne OOM).
ReadOnlySequence<byte>
Użycie metody po wywołaniu PipeReader.AdvanceTo
może spowodować uszkodzenie pamięci (użycie po wolnym użyciu).
Niepowodzenie wywołania PipeReader.Complete/CompleteAsync
może spowodować wyciek pamięci.
Sprawdzanie ReadResult.IsCompleted i zamykanie logiki odczytu przed przetworzeniem buforu powoduje utratę danych. Warunek zakończenia pętli powinien być oparty na elementach ReadResult.Buffer.IsEmpty
i ReadResult.IsCompleted
. Wykonanie tego błędnie może spowodować nieskończoną pętlę.
❌Utrata danych
Element ReadResult
może zwrócić końcowy segment danych, gdy IsCompleted
jest ustawiony na true
wartość . Nie można odczytać tych danych przed zamknięciem pętli odczytu, co spowoduje utratę danych.
Ostrzeżenie
NIE używaj następującego kodu. Użycie tego przykładu spowoduje utratę danych, zawieszanie się, problemy z zabezpieczeniami i nie powinny być kopiowane. Poniższy przykład został przedstawiony w celu wyjaśnienia typowych problemów usługi PipeReader.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> dataLossBuffer = result.Buffer;
if (result.IsCompleted)
break;
Process(ref dataLossBuffer, out Message message);
reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End);
}
Ostrzeżenie
Nie używaj powyższego kodu. Użycie tego przykładu spowoduje utratę danych, zawieszanie się, problemy z zabezpieczeniami i nie powinny być kopiowane. Powyższy przykład jest dostarczany w celu wyjaśnienia typowych problemów PipeReader.
❌Pętla nieskończona
Poniższa logika może spowodować nieskończoną pętlę, jeśli Result.IsCompleted
element jest true
, ale nigdy nie ma pełnego komunikatu w buforze.
Ostrzeżenie
NIE używaj następującego kodu. Użycie tego przykładu spowoduje utratę danych, zawieszanie się, problemy z zabezpieczeniami i nie powinny być kopiowane. Poniższy przykład został przedstawiony w celu wyjaśnienia typowych problemów usługi PipeReader.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (result.IsCompleted && infiniteLoopBuffer.IsEmpty)
break;
Process(ref infiniteLoopBuffer, out Message message);
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
Ostrzeżenie
Nie używaj powyższego kodu. Użycie tego przykładu spowoduje utratę danych, zawieszanie się, problemy z zabezpieczeniami i nie powinny być kopiowane. Powyższy przykład jest dostarczany w celu wyjaśnienia typowych problemów PipeReader.
Oto kolejny fragment kodu z tym samym problemem. Sprawdza on, czy bufor nie jest pusty przed sprawdzeniem ReadResult.IsCompleted
wartości . Ponieważ znajduje się w elemecie else if
, pętla będzie zawsze występować, jeśli w buforze nigdy nie ma pełnego komunikatu.
Ostrzeżenie
NIE używaj następującego kodu. Użycie tego przykładu spowoduje utratę danych, zawieszanie się, problemy z zabezpieczeniami i nie powinny być kopiowane. Poniższy przykład został przedstawiony w celu wyjaśnienia typowych problemów usługi PipeReader.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (!infiniteLoopBuffer.IsEmpty)
Process(ref infiniteLoopBuffer, out Message message);
else if (result.IsCompleted)
break;
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
Ostrzeżenie
Nie używaj powyższego kodu. Użycie tego przykładu spowoduje utratę danych, zawieszanie się, problemy z zabezpieczeniami i nie powinny być kopiowane. Powyższy przykład jest dostarczany w celu wyjaśnienia typowych problemów PipeReader.
❌Aplikacja nie odpowiada
Bezwarunkowe wywoływanie PipeReader.AdvanceTo
elementu w examined
buffer.End
pozycji może spowodować, że aplikacja przestaje odpowiadać podczas analizowania pojedynczego komunikatu. Następne wywołanie metody PipeReader.AdvanceTo
nie zostanie zwrócone do czasu:
Ostrzeżenie
NIE używaj następującego kodu. Użycie tego przykładu spowoduje utratę danych, zawieszanie się, problemy z zabezpieczeniami i nie powinny być kopiowane. Poniższy przykład został przedstawiony w celu wyjaśnienia typowych problemów usługi PipeReader.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> hangBuffer = result.Buffer;
Process(ref hangBuffer, out Message message);
if (result.IsCompleted)
break;
reader.AdvanceTo(hangBuffer.Start, hangBuffer.End);
if (message != null)
return message;
}
Ostrzeżenie
Nie używaj powyższego kodu. Użycie tego przykładu spowoduje utratę danych, zawieszanie się, problemy z zabezpieczeniami i nie powinny być kopiowane. Powyższy przykład jest dostarczany w celu wyjaśnienia typowych problemów PipeReader.
❌Brak pamięci (OOM)
W następujących warunkach następujący kod będzie buforowy do momentu OutOfMemoryException wystąpienia:
PipeReader
. Na przykład nie tworzy pełnego komunikatu, ponieważ druga strona zapisuje duży komunikat (na przykład komunikat o rozmiarze 4 GB).Ostrzeżenie
NIE używaj następującego kodu. Użycie tego przykładu spowoduje utratę danych, zawieszanie się, problemy z zabezpieczeniami i nie powinny być kopiowane. Poniższy przykład został przedstawiony w celu wyjaśnienia typowych problemów usługi PipeReader.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer;
Process(ref thisCouldOutOfMemory, out Message message);
if (result.IsCompleted)
break;
reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End);
if (message != null)
return message;
}
Ostrzeżenie
Nie używaj powyższego kodu. Użycie tego przykładu spowoduje utratę danych, zawieszanie się, problemy z zabezpieczeniami i nie powinny być kopiowane. Powyższy przykład jest dostarczany w celu wyjaśnienia typowych problemów PipeReader.
❌Uszkodzenie pamięci
Podczas zapisywania pomocników odczytujących bufor należy skopiować dowolny zwrócony ładunek przed wywołaniem metody Advance
. W poniższym przykładzie zostanie zwrócona pamięć, która Pipe
została odrzucona i może zostać ponownie użyta do następnej operacji (odczyt/zapis).
Ostrzeżenie
NIE używaj następującego kodu. Użycie tego przykładu spowoduje utratę danych, zawieszanie się, problemy z zabezpieczeniami i nie powinny być kopiowane. Poniższy przykład został przedstawiony w celu wyjaśnienia typowych problemów usługi PipeReader.
public class Message
{
public ReadOnlySequence<byte> CorruptedPayload { get; set; }
}
Environment.FailFast("This code is terrible, don't use it!");
Message message = null;
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
ReadHeader(ref buffer, out int length);
if (length <= buffer.Length)
{
message = new Message
{
// Slice the payload from the existing buffer
CorruptedPayload = buffer.Slice(0, length)
};
buffer = buffer.Slice(length);
}
if (result.IsCompleted)
break;
reader.AdvanceTo(buffer.Start, buffer.End);
if (message != null)
{
// This code is broken since reader.AdvanceTo() was called with a position *after* the buffer
// was captured.
break;
}
}
return message;
}
Ostrzeżenie
Nie używaj powyższego kodu. Użycie tego przykładu spowoduje utratę danych, zawieszanie się, problemy z zabezpieczeniami i nie powinny być kopiowane. Powyższy przykład jest dostarczany w celu wyjaśnienia typowych problemów PipeReader.
Zarządza PipeWriter do zapisywania w imieniu wywołującego. PipeWriter
implementuje IBufferWriter<byte>
. IBufferWriter<byte>
umożliwia uzyskanie dostępu do w celu wykonywania operacji zapisu bez dodatkowych kopii buforu.
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
// Request at least 5 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(5);
// Write directly into the buffer.
int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span);
// Tell the writer how many bytes were written.
writer.Advance(written);
await writer.FlushAsync(cancellationToken);
}
Poprzedni kod:
PipeWriter
obiektu using GetMemory."Hello"
ASCII zwracanego Memory<byte>
elementu .PipeWriter
, który wysyła bajty do urządzenia bazowego.Poprzednia metoda zapisu używa dostarczonych przez PipeWriter
program . Można również użyć polecenia PipeWriter.WriteAsync, które:
PipeWriter
GetSpan
metodę , Advance
zgodnie z potrzebami i wywołuje metodę FlushAsync.async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
byte[] helloBytes = Encoding.ASCII.GetBytes("Hello");
// Write helloBytes to the writer, there's no need to call Advance here
// (Write does that).
await writer.WriteAsync(helloBytes, cancellationToken);
}
FlushAsync program obsługuje przekazywanie elementu CancellationToken. Przekazywanie CancellationToken
wyników w OperationCanceledException
przypadku anulowania tokenu podczas oczekiwania na opróżnienie. PipeWriter.FlushAsync
obsługuje sposób anulowania bieżącej operacji opróżniania za pośrednictwem PipeWriter.CancelPendingFlush bez zgłaszania wyjątku. Wywołanie powoduje, że bieżące lub następne wywołanie metody lub PipeWriter.WriteAsync
zwraca element z ustawioną wartością true
IsCanceled
FlushResult .PipeWriter.CancelPendingFlush
PipeWriter.FlushAsync
Może to być przydatne do zatrzymania wypłukiwania w sposób niedrukcyjny i nietypowy.
GetMemory
lub GetSpan
gdy istnieje niekompletne połączenie FlushAsync
, nie jest bezpieczne.Complete
lub CompleteAsync
w przypadku braku pływu danych może spowodować uszkodzenie pamięci.Poniższe porady pomogą Ci pomyślnie użyć System.IO.Pipelines klas:
await
PipeWriter.FlushAsync Okresowo podczas pisania i zawsze sprawdzaj wartość FlushResult.IsCompleted. Przerwij pisanie, jeśli IsCompleted
ma wartość true
, ponieważ oznacza to, że czytelnik jest ukończony i nie dba już o to, co zostało napisane.PipeReader
mieć dostęp.FlushAsync
, jeśli czytelnik nie może rozpocząć do FlushAsync
zakończenia, ponieważ może to spowodować zakleszczenie.PipeReader
lub PipeWriter
lub uzyskuje do nich dostęp. Te typy nie są bezpieczne wątkowo.AdvanceTo
lub ukończeniu .PipeReader
Jest IDuplexPipe to umowa dla typów, które obsługują zarówno odczytywanie, jak i pisanie. Na przykład połączenie sieciowe będzie reprezentowane przez element IDuplexPipe
.
W przeciwieństwie do Pipe
elementu , który zawiera element PipeReader
i PipeWriter
, IDuplexPipe
reprezentuje jedną stronę pełnego połączenia dwustronnego. Oznacza to, że to, co jest zapisywane w obiekcie PipeWriter
, nie będzie odczytywane z elementu PipeReader
.
Podczas odczytywania lub zapisywania danych strumienia zwykle odczytuje się dane przy użyciu desemizatora i zapisu danych przy użyciu serializatora. Większość z tych interfejsów API odczytu i zapisu strumienia ma Stream
parametr . Aby ułatwić integrację z tymi istniejącymi interfejsami PipeReader
API i PipeWriter
uwidocznić metodę AsStream . AsStream zwraca implementację Stream
wokół klasy PipeReader
lub PipeWriter
.
PipeReader
wystąpienia i PipeWriter
można tworzyć przy użyciu metod statycznych Create
przy użyciu Stream obiektu i opcjonalnych odpowiednich opcji tworzenia.
Zezwalaj StreamPipeReaderOptions na kontrolę nad tworzeniem PipeReader
wystąpienia przy użyciu następujących parametrów:
4096
.PipeReader
, a wartość domyślna to false
.1024
.MemoryPool<byte>
jest używany podczas przydzielania pamięci, a wartość domyślna to null
.Zezwalaj StreamPipeWriterOptions na kontrolę nad tworzeniem PipeWriter
wystąpienia przy użyciu następujących parametrów:
PipeWriter
, a wartość domyślna to false
.4096
wartość .MemoryPool<byte>
jest używany podczas przydzielania pamięci, a wartość domyślna to null
.Ważne
Podczas tworzenia PipeReader
wystąpień i PipeWriter
przy użyciu Create
metod należy wziąć pod uwagę Stream
okres istnienia obiektu. Jeśli potrzebujesz dostępu do strumienia po zakończeniu działania czytnika lub składnika zapisywania, musisz ustawić flagę LeaveOpen
na true
wartość w opcjach tworzenia. W przeciwnym razie strumień zostanie zamknięty.
Poniższy kod demonstruje tworzenie PipeReader
wystąpień i PipeWriter
przy użyciu Create
metod ze strumienia.
using System.Buffers;
using System.IO.Pipelines;
using System.Text;
class Program
{
static async Task Main()
{
using var stream = File.OpenRead("lorem-ipsum.txt");
var reader = PipeReader.Create(stream);
var writer = PipeWriter.Create(
Console.OpenStandardOutput(),
new StreamPipeWriterOptions(leaveOpen: true));
WriteUserCancellationPrompt();
var processMessagesTask = ProcessMessagesAsync(reader, writer);
var userCanceled = false;
var cancelProcessingTask = Task.Run(() =>
{
while (char.ToUpperInvariant(Console.ReadKey().KeyChar) != 'C')
{
WriteUserCancellationPrompt();
}
userCanceled = true;
// No exceptions thrown
reader.CancelPendingRead();
writer.CancelPendingFlush();
});
await Task.WhenAny(cancelProcessingTask, processMessagesTask);
Console.WriteLine(
$"\n\nProcessing {(userCanceled ? "cancelled" : "completed")}.\n");
}
static void WriteUserCancellationPrompt() =>
Console.WriteLine("Press 'C' to cancel processing...\n");
static async Task ProcessMessagesAsync(
PipeReader reader,
PipeWriter writer)
{
try
{
while (true)
{
ReadResult readResult = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = readResult.Buffer;
try
{
if (readResult.IsCanceled)
{
break;
}
if (TryParseLines(ref buffer, out string message))
{
FlushResult flushResult =
await WriteMessagesAsync(writer, message);
if (flushResult.IsCanceled || flushResult.IsCompleted)
{
break;
}
}
if (readResult.IsCompleted)
{
if (!buffer.IsEmpty)
{
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
catch (Exception ex)
{
Console.Error.WriteLine(ex);
}
finally
{
await reader.CompleteAsync();
await writer.CompleteAsync();
}
}
static bool TryParseLines(
ref ReadOnlySequence<byte> buffer,
out string message)
{
SequencePosition? position;
StringBuilder outputMessage = new();
while(true)
{
position = buffer.PositionOf((byte)'\n');
if (!position.HasValue)
break;
outputMessage.Append(Encoding.ASCII.GetString(buffer.Slice(buffer.Start, position.Value)))
.AppendLine();
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
};
message = outputMessage.ToString();
return message.Length != 0;
}
static ValueTask<FlushResult> WriteMessagesAsync(
PipeWriter writer,
string message) =>
writer.WriteAsync(Encoding.ASCII.GetBytes(message));
}
Aplikacja używa elementu do StreamReader odczytywania pliku lorem-ipsum.txt jako strumienia i musi kończyć się pustym wierszem. Element FileStream jest przekazywany do PipeReader.Createobiektu , który tworzy wystąpienie PipeReader
obiektu. Następnie aplikacja konsolowa przekazuje standardowy strumień wyjściowy do PipeWriter.Create programu Console.OpenStandardOutput(). Przykład obsługuje anulowanie.
Opinia o produkcie .NET
.NET to projekt typu open source. Wybierz link, aby przekazać opinię:
Szkolenie
Moduł
Łączenie poleceń z potokiem - Training
W tym module dowiesz się, jak połączyć polecenia z potokiem.