System.IO.Pipelines na platformie .NET
System.IO.Pipelines jest biblioteką zaprojektowaną tak, aby ułatwić wykonywanie operacji we/wy o wysokiej wydajności na platformie .NET. Jest to biblioteka przeznaczona dla platformy .NET Standard, która działa we wszystkich implementacjach platformy .NET.
Biblioteka jest dostępna w pakiecie NuGet System.IO.Pipelines .
Jaki problem rozwiązuje system.IO.Pipelines
Aplikacje, które analizują dane przesyłane strumieniowo, składają się ze standardowego kodu o wielu wyspecjalizowanych i nietypowych przepływach kodu. Standardowy i specjalny kod przypadku jest złożony i trudny do utrzymania.
System.IO.Pipelines
został zaprojektowany w celu:
- Mają wysoką wydajność analizowania danych przesyłanych strumieniowo.
- Zmniejsz złożoność kodu.
Poniższy kod jest typowy dla serwera TCP, który odbiera komunikaty rozdzielane wierszami (rozdzielane przez '\n'
) z klienta:
async Task ProcessLinesAsync(NetworkStream stream)
{
var buffer = new byte[1024];
await stream.ReadAsync(buffer, 0, buffer.Length);
// Process a single line from the buffer
ProcessLine(buffer);
}
Powyższy kod ma kilka problemów:
- Cała wiadomość (koniec wiersza) może nie zostać odebrana w jednym wywołaniu metody
ReadAsync
. - Ignoruje wynik .
stream.ReadAsync
stream.ReadAsync
Zwraca ilość odczytanych danych. - Nie obsługuje przypadku, w którym wiele wierszy jest odczytywanych w jednym
ReadAsync
wywołaniu. - Przydziela tablicę z każdym odczytem
byte
.
Aby rozwiązać powyższe problemy, wymagane są następujące zmiany:
Buforuj dane przychodzące do momentu znalezienia nowego wiersza.
Przeanalizuj wszystkie wiersze zwrócone w buforze.
Możliwe, że wiersz jest większy niż 1 KB (1024 bajty). Kod musi zmienić rozmiar buforu wejściowego do momentu znalezienia ogranicznika w celu dopasowania do kompletnego wiersza wewnątrz buforu.
- Jeśli rozmiar buforu zostanie zmieniony, więcej kopii buforu zostanie wykonanych w miarę wyświetlania dłuższych wierszy w danych wejściowych.
- Aby zmniejszyć zmarnowane miejsce, skompaktuj bufor używany do odczytywania wierszy.
Rozważ użycie buforowania, aby uniknąć wielokrotnego przydzielania pamięci.
Poniższy kod rozwiązuje niektóre z tych problemów:
async Task ProcessLinesAsync(NetworkStream stream)
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
var bytesBuffered = 0;
var bytesConsumed = 0;
while (true)
{
// Calculate the amount of bytes remaining in the buffer.
var bytesRemaining = buffer.Length - bytesBuffered;
if (bytesRemaining == 0)
{
// Double the buffer size and copy the previously buffered data into the new buffer.
var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
// Return the old buffer to the pool.
ArrayPool<byte>.Shared.Return(buffer);
buffer = newBuffer;
bytesRemaining = buffer.Length - bytesBuffered;
}
var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
if (bytesRead == 0)
{
// EOF
break;
}
// Keep track of the amount of buffered bytes.
bytesBuffered += bytesRead;
var linePosition = -1;
do
{
// Look for a EOL in the buffered data.
linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
bytesBuffered - bytesConsumed);
if (linePosition >= 0)
{
// Calculate the length of the line based on the offset.
var lineLength = linePosition - bytesConsumed;
// Process the line.
ProcessLine(buffer, bytesConsumed, lineLength);
// Move the bytesConsumed to skip past the line consumed (including \n).
bytesConsumed += lineLength + 1;
}
}
while (linePosition >= 0);
}
}
Poprzedni kod jest złożony i nie dotyczy wszystkich zidentyfikowanych problemów. Sieć o wysokiej wydajności zwykle oznacza pisanie złożonego kodu w celu zmaksymalizowania wydajności. System.IO.Pipelines
został zaprojektowany tak, aby ułatwić pisanie tego typu kodu.
Fajka
Klasa Pipe może służyć do tworzenia PipeWriter/PipeReader
pary. Wszystkie dane zapisane w obiekcie PipeWriter
są dostępne w pliku PipeReader
:
var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;
Podstawowe użycie potoku
async Task ProcessLinesAsync(Socket socket)
{
var pipe = new Pipe();
Task writing = FillPipeAsync(socket, pipe.Writer);
Task reading = ReadPipeAsync(pipe.Reader);
await Task.WhenAll(reading, writing);
}
async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
const int minimumBufferSize = 512;
while (true)
{
// Allocate at least 512 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(minimumBufferSize);
try
{
int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
if (bytesRead == 0)
{
break;
}
// Tell the PipeWriter how much was read from the Socket.
writer.Advance(bytesRead);
}
catch (Exception ex)
{
LogError(ex);
break;
}
// Make the data available to the PipeReader.
FlushResult result = await writer.FlushAsync();
if (result.IsCompleted)
{
break;
}
}
// By completing PipeWriter, tell the PipeReader that there's no more data coming.
await writer.CompleteAsync();
}
async Task ReadPipeAsync(PipeReader reader)
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
{
// Process the line.
ProcessLine(line);
}
// Tell the PipeReader how much of the buffer has been consumed.
reader.AdvanceTo(buffer.Start, buffer.End);
// Stop reading if there's no more data coming.
if (result.IsCompleted)
{
break;
}
}
// Mark the PipeReader as complete.
await reader.CompleteAsync();
}
bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
// Look for a EOL in the buffer.
SequencePosition? position = buffer.PositionOf((byte)'\n');
if (position == null)
{
line = default;
return false;
}
// Skip the line + the \n.
line = buffer.Slice(0, position.Value);
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
return true;
}
Istnieją dwie pętle:
FillPipeAsync
odczytuje dane z plikuSocket
i zapisu w plikuPipeWriter
.ReadPipeAsync
odczytuje zPipeReader
wierszy przychodzących i analizuje je.
Nie przydzielono jawnych. Wszystkie zarządzanie jest delegowane do PipeReader
implementacji i PipeWriter
. Delegowanie zarządzania ułatwia korzystanie z kodu wyłącznie na logice biznesowej.
W pierwszej pętli:
- PipeWriter.GetMemory(Int32) polecenie jest wywoływane w celu pobrania pamięci z bazowego składnika zapisywania.
- PipeWriter.Advance(Int32) jest wywoływana
PipeWriter
w celu sprawdzenia, ile danych zostało zapisanych w buforze. - PipeWriter.FlushAsyncelement jest wywoływany w celu udostępnienia danych .
PipeReader
W drugiej pętli program PipeReader
zużywa zapisywane przez PipeWriter
program . pochodzą z gniazda. Wywołanie metody :PipeReader.ReadAsync
Zwraca element ReadResult zawierający dwa ważne informacje:
- Dane, które zostały odczytane w postaci
ReadOnlySequence<byte>
. - Wartość logiczna
IsCompleted
wskazująca, czy osiągnięto koniec danych (EOF).
- Dane, które zostały odczytane w postaci
Po znalezieniu ogranicznika końca wiersza (EOL) i przeanalizowaniu wiersza:
- Logika przetwarza bufor, aby pominąć już przetworzone dane.
PipeReader.AdvanceTo
jest wywoływany, aby poinformować,PipeReader
ile danych zostało wykorzystanych i zbadanych.
Pętle czytnika i modułu zapisywania kończą się wywołaniem metody Complete
. Complete
umożliwia bazowej potokowi zwolnienie pamięci przydzielonej.
Backpressure i sterowanie przepływem
W idealnym przypadku czytanie i analizowanie współpracują ze sobą:
- Wątek odczytu zużywa dane z sieci i umieszcza je w.
- Wątek analizy jest odpowiedzialny za konstruowanie odpowiednich struktur danych.
Zazwyczaj analizowanie zajmuje więcej czasu niż tylko kopiowanie bloków danych z sieci:
- Wątek odczytu jest przed wątkiem analizowania.
- Wątek odczytu musi zwolnić lub przydzielić więcej pamięci do przechowywania danych dla wątku analizowania.
W celu uzyskania optymalnej wydajności istnieje równowaga między częstymi wstrzymaniami i przydzielaniem większej ilości pamięci.
Aby rozwiązać powyższy problem, Pipe
ma dwa ustawienia umożliwiające sterowanie przepływem danych:
- PauseWriterThreshold: określa, ile danych należy buforować przed wywołaniami wstrzymywania FlushAsync .
- ResumeWriterThreshold: określa ilość danych, które czytelnik musi obserwować przed wywołaniami w celu wznowienia
PipeWriter.FlushAsync
.
- Zwraca niekompletny
ValueTask<FlushResult>
element, gdy ilość danych wPipe
przecięciuPauseWriterThreshold
. - Kończy
ValueTask<FlushResult>
się, gdy staje się niższy niżResumeWriterThreshold
.
Dwie wartości są używane w celu zapobiegania szybkiemu cyklowi rowerowemu, które mogą wystąpić, jeśli jedna wartość jest używana.
Przykłady
// The Pipe will start returning incomplete tasks from FlushAsync until
// the reader examines at least 5 bytes.
var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
var pipe = new Pipe(options);
PipeScheduler
Zwykle w przypadku używania i async
await
kod asynchroniczny jest wznawiany w TaskScheduler bieżącym SynchronizationContextobiekcie lub .
Podczas wykonywania operacji we/wy ważne jest, aby mieć precyzyjną kontrolę nad miejscem wykonywania operacji we/wy. Ta kontrolka umożliwia efektywne korzystanie z pamięci podręcznych procesora CPU. Wydajne buforowanie ma kluczowe znaczenie dla aplikacji o wysokiej wydajności, takich jak serwery internetowe. PipeScheduler zapewnia kontrolę nad tym, gdzie są uruchamiane wywołania zwrotne asynchroniczne. Domyślnie:
- Jest używany bieżący SynchronizationContext .
- Jeśli nie ma
SynchronizationContext
elementu , używa puli wątków do uruchamiania wywołań zwrotnych.
public static void Main(string[] args)
{
var writeScheduler = new SingleThreadPipeScheduler();
var readScheduler = new SingleThreadPipeScheduler();
// Tell the Pipe what schedulers to use and disable the SynchronizationContext.
var options = new PipeOptions(readerScheduler: readScheduler,
writerScheduler: writeScheduler,
useSynchronizationContext: false);
var pipe = new Pipe(options);
}
// This is a sample scheduler that async callbacks on a single dedicated thread.
public class SingleThreadPipeScheduler : PipeScheduler
{
private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
new BlockingCollection<(Action<object> Action, object State)>();
private readonly Thread _thread;
public SingleThreadPipeScheduler()
{
_thread = new Thread(DoWork);
_thread.Start();
}
private void DoWork()
{
foreach (var item in _queue.GetConsumingEnumerable())
{
item.Action(item.State);
}
}
public override void Schedule(Action<object?> action, object? state)
{
if (state is not null)
{
_queue.Add((action, state));
}
// else log the fact that _queue.Add was not called.
}
}
PipeScheduler.ThreadPool to implementacja PipeScheduler , która kolejkuje wywołania zwrotne do puli wątków. PipeScheduler.ThreadPool
jest domyślnym i ogólnie najlepszym wyborem. PipeScheduler.Inline może powodować niezamierzone konsekwencje, takie jak zakleszczenia.
Resetowanie potoku
Często wydajne jest ponowne użycie Pipe
obiektu. Aby zresetować potok, wywołaj metodę PipeReader Reset po zakończeniu PipeReader
operacji i PipeWriter
.
PipeReader
PipeReader zarządza pamięcią w imieniu wywołującego. Zawsze dzwonij PipeReader.AdvanceTo po wywołaniu metody PipeReader.ReadAsync. Informuje o tym, PipeReader
kiedy obiekt wywołujący jest wykonywany z pamięcią, dzięki czemu można go śledzić. Zwrócony ReadOnlySequence<byte>
element jest PipeReader.ReadAsync
prawidłowy tylko do momentu wywołania PipeReader.AdvanceTo
metody . Jest to niedozwolone do użycia ReadOnlySequence<byte>
po wywołaniu metody PipeReader.AdvanceTo
.
PipeReader.AdvanceTo
przyjmuje dwa SequencePosition argumenty:
- Pierwszy argument określa, ile pamięci zostało zużyte.
- Drugi argument określa, ile buforu zaobserwowano.
Oznaczanie danych jako wykorzystanych oznacza, że potok może zwrócić pamięć do bazowej puli. Oznaczanie danych obserwowanych kontroluje, co robi następne wywołanie PipeReader.ReadAsync
. Oznaczanie wszystkiego, co zaobserwowano oznacza, że następne wywołanie PipeReader.ReadAsync
metody nie zwróci się, dopóki nie będzie więcej danych zapisanych w potoku. Każda inna wartość spowoduje natychmiastowe wywołanie, aby PipeReader.ReadAsync
zwrócić dane obserwowane i nieobserwowane, ale nie dane, które zostały już zużyte.
Scenariusze odczytu danych przesyłanych strumieniowo
Istnieje kilka typowych wzorców, które pojawiają się podczas próby odczytu danych przesyłanych strumieniowo:
- Biorąc pod uwagę strumień danych, przeanalizuj pojedynczy komunikat.
- Biorąc pod uwagę strumień danych, przeanalizuj wszystkie dostępne komunikaty.
W poniższych przykładach użyto TryParseLines
metody analizowania komunikatów z klasy ReadOnlySequence<byte>
. TryParseLines
Analizuje pojedynczy komunikat i aktualizuje bufor wejściowy w celu przycinania przeanalizowanego komunikatu z buforu. TryParseLines
nie jest częścią platformy .NET, jest to metoda napisana przez użytkownika używana w poniższych sekcjach.
bool TryParseLines(ref ReadOnlySequence<byte> buffer, out Message message);
Odczytywanie pojedynczej wiadomości
Poniższy kod odczytuje pojedynczy komunikat z obiektu PipeReader
i zwraca go do obiektu wywołującego.
async ValueTask<Message?> ReadSingleMessageAsync(PipeReader reader,
CancellationToken cancellationToken = default)
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
// In the event that no message is parsed successfully, mark consumed
// as nothing and examined as the entire buffer.
SequencePosition consumed = buffer.Start;
SequencePosition examined = buffer.End;
try
{
if (TryParseLines(ref buffer, out Message message))
{
// A single message was successfully parsed so mark the start of the
// parsed buffer as consumed. TryParseLines trims the buffer to
// point to the data after the message was parsed.
consumed = buffer.Start;
// Examined is marked the same as consumed here, so the next call
// to ReadSingleMessageAsync will process the next message if there's
// one.
examined = consumed;
return message;
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(consumed, examined);
}
}
return null;
}
Powyższy kod ma następujące działanie:
- Analizuje pojedynczy komunikat.
- Aktualizuje zużyte
SequencePosition
i zbadaneSequencePosition
, aby wskazać początek przyciętego buforu wejściowego.
Dwa SequencePosition
argumenty są aktualizowane, ponieważ TryParseLines
usuwa przeanalizowany komunikat z buforu wejściowego. Ogólnie rzecz biorąc, podczas analizowania pojedynczego komunikatu z buforu zbadane stanowisko powinno być jednym z następujących elementów:
- Koniec wiadomości.
- Koniec odebranego buforu, jeśli nie znaleziono żadnego komunikatu.
Pojedynczy przypadek komunikatu ma największy potencjał błędów. Przekazanie nieprawidłowych wartości do zbadania może spowodować wyjątek braku pamięci lub nieskończoną pętlę. Aby uzyskać więcej informacji, zobacz sekcję Typowe problemy PipeReader w tym artykule.
Odczytywanie wielu komunikatów
Poniższy kod odczytuje wszystkie komunikaty z obiektu PipeReader
i wywołuje poszczególne wywołania ProcessMessageAsync
.
async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseLines(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
Anulowanie
PipeReader.ReadAsync
:
- Obsługuje przekazywanie elementu CancellationToken.
- Zgłasza błąd OperationCanceledException , jeśli
CancellationToken
element zostanie anulowany podczas oczekiwania na odczyt. - Obsługuje sposób anulowania bieżącej operacji odczytu za pomocą PipeReader.CancelPendingReadpolecenia , co pozwala uniknąć zgłaszania wyjątku. Wywołanie powoduje, że bieżące lub następne wywołanie zwraca element
PipeReader.ReadAsync
z ustawioną wartościątrue
ReadResultIsCanceled
.PipeReader.CancelPendingRead
Może to być przydatne w przypadku zatrzymania istniejącej pętli odczytu w sposób niedrukcyjny i nietypowy.
private PipeReader reader;
public MyConnection(PipeReader reader)
{
this.reader = reader;
}
public void Abort()
{
// Cancel the pending read so the process loop ends without an exception.
reader.CancelPendingRead();
}
public async Task ProcessMessagesAsync()
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
if (result.IsCanceled)
{
// The read was canceled. You can quit without reading the existing data.
break;
}
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseLines(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
Typowe problemy z usługą PipeReader
Przekazywanie nieprawidłowych wartości do
consumed
lubexamined
może spowodować odczytywanie już odczytanych danych.Przekazanie
buffer.End
w wyniku zbadania może spowodować:- Zatrzymane dane
- Prawdopodobnie ewentualny wyjątek braku pamięci (OOM), jeśli dane nie są używane. Na przykład
PipeReader.AdvanceTo(position, buffer.End)
podczas przetwarzania pojedynczego komunikatu w czasie z buforu.
Przekazywanie nieprawidłowych wartości do
consumed
lubexamined
może spowodować nieskończoną pętlę. Na przykładPipeReader.AdvanceTo(buffer.Start)
, jeślibuffer.Start
nie zmieniono, spowoduje, że następne wywołanie , aby powrócićPipeReader.ReadAsync
bezpośrednio przed nadejściem nowych danych.Przekazywanie nieprawidłowych wartości do
consumed
lubexamined
może spowodować nieskończone buforowanie (ostateczne OOM).ReadOnlySequence<byte>
Użycie metody po wywołaniuPipeReader.AdvanceTo
może spowodować uszkodzenie pamięci (użycie po wolnym użyciu).Niepowodzenie wywołania
PipeReader.Complete/CompleteAsync
może spowodować wyciek pamięci.Sprawdzanie ReadResult.IsCompleted i zamykanie logiki odczytu przed przetworzeniem buforu powoduje utratę danych. Warunek zakończenia pętli powinien być oparty na elementach
ReadResult.Buffer.IsEmpty
iReadResult.IsCompleted
. Wykonanie tego błędnie może spowodować nieskończoną pętlę.
Problematyczny kod
❌Utrata danych
Element ReadResult
może zwrócić końcowy segment danych, gdy IsCompleted
jest ustawiony na true
wartość . Nie można odczytać tych danych przed zamknięciem pętli odczytu, co spowoduje utratę danych.
Ostrzeżenie
NIE używaj następującego kodu. Użycie tego przykładu spowoduje utratę danych, zawieszanie się, problemy z zabezpieczeniami i nie powinny być kopiowane. Poniższy przykład został przedstawiony w celu wyjaśnienia typowych problemów usługi PipeReader.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> dataLossBuffer = result.Buffer;
if (result.IsCompleted)
break;
Process(ref dataLossBuffer, out Message message);
reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End);
}
Ostrzeżenie
Nie używaj powyższego kodu. Użycie tego przykładu spowoduje utratę danych, zawieszanie się, problemy z zabezpieczeniami i nie powinny być kopiowane. Powyższy przykład jest dostarczany w celu wyjaśnienia typowych problemów PipeReader.
❌Pętla nieskończona
Poniższa logika może spowodować nieskończoną pętlę, jeśli Result.IsCompleted
element jest true
, ale nigdy nie ma pełnego komunikatu w buforze.
Ostrzeżenie
NIE używaj następującego kodu. Użycie tego przykładu spowoduje utratę danych, zawieszanie się, problemy z zabezpieczeniami i nie powinny być kopiowane. Poniższy przykład został przedstawiony w celu wyjaśnienia typowych problemów usługi PipeReader.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (result.IsCompleted && infiniteLoopBuffer.IsEmpty)
break;
Process(ref infiniteLoopBuffer, out Message message);
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
Ostrzeżenie
Nie używaj powyższego kodu. Użycie tego przykładu spowoduje utratę danych, zawieszanie się, problemy z zabezpieczeniami i nie powinny być kopiowane. Powyższy przykład jest dostarczany w celu wyjaśnienia typowych problemów PipeReader.
Oto kolejny fragment kodu z tym samym problemem. Sprawdza on, czy bufor nie jest pusty przed sprawdzeniem ReadResult.IsCompleted
wartości . Ponieważ znajduje się w elemecie else if
, pętla będzie zawsze występować, jeśli w buforze nigdy nie ma pełnego komunikatu.
Ostrzeżenie
NIE używaj następującego kodu. Użycie tego przykładu spowoduje utratę danych, zawieszanie się, problemy z zabezpieczeniami i nie powinny być kopiowane. Poniższy przykład został przedstawiony w celu wyjaśnienia typowych problemów usługi PipeReader.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (!infiniteLoopBuffer.IsEmpty)
Process(ref infiniteLoopBuffer, out Message message);
else if (result.IsCompleted)
break;
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
Ostrzeżenie
Nie używaj powyższego kodu. Użycie tego przykładu spowoduje utratę danych, zawieszanie się, problemy z zabezpieczeniami i nie powinny być kopiowane. Powyższy przykład jest dostarczany w celu wyjaśnienia typowych problemów PipeReader.
❌Aplikacja nie odpowiada
Bezwarunkowe wywoływanie PipeReader.AdvanceTo
elementu w examined
buffer.End
pozycji może spowodować, że aplikacja przestaje odpowiadać podczas analizowania pojedynczego komunikatu. Następne wywołanie metody PipeReader.AdvanceTo
nie zostanie zwrócone do czasu:
- W potoku jest więcej danych zapisanych w potoku.
- Nowe dane nie zostały wcześniej zbadane.
Ostrzeżenie
NIE używaj następującego kodu. Użycie tego przykładu spowoduje utratę danych, zawieszanie się, problemy z zabezpieczeniami i nie powinny być kopiowane. Poniższy przykład został przedstawiony w celu wyjaśnienia typowych problemów usługi PipeReader.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> hangBuffer = result.Buffer;
Process(ref hangBuffer, out Message message);
if (result.IsCompleted)
break;
reader.AdvanceTo(hangBuffer.Start, hangBuffer.End);
if (message != null)
return message;
}
Ostrzeżenie
Nie używaj powyższego kodu. Użycie tego przykładu spowoduje utratę danych, zawieszanie się, problemy z zabezpieczeniami i nie powinny być kopiowane. Powyższy przykład jest dostarczany w celu wyjaśnienia typowych problemów PipeReader.
❌Brak pamięci (OOM)
W następujących warunkach następujący kod będzie buforowy do momentu OutOfMemoryException wystąpienia:
- Nie ma maksymalnego rozmiaru komunikatu.
- Dane zwrócone z elementu nie tworzą kompletnego komunikatu
PipeReader
. Na przykład nie tworzy pełnego komunikatu, ponieważ druga strona zapisuje duży komunikat (na przykład komunikat o rozmiarze 4 GB).
Ostrzeżenie
NIE używaj następującego kodu. Użycie tego przykładu spowoduje utratę danych, zawieszanie się, problemy z zabezpieczeniami i nie powinny być kopiowane. Poniższy przykład został przedstawiony w celu wyjaśnienia typowych problemów usługi PipeReader.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer;
Process(ref thisCouldOutOfMemory, out Message message);
if (result.IsCompleted)
break;
reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End);
if (message != null)
return message;
}
Ostrzeżenie
Nie używaj powyższego kodu. Użycie tego przykładu spowoduje utratę danych, zawieszanie się, problemy z zabezpieczeniami i nie powinny być kopiowane. Powyższy przykład jest dostarczany w celu wyjaśnienia typowych problemów PipeReader.
❌Uszkodzenie pamięci
Podczas zapisywania pomocników odczytujących bufor należy skopiować dowolny zwrócony ładunek przed wywołaniem metody Advance
. W poniższym przykładzie zostanie zwrócona pamięć, która Pipe
została odrzucona i może zostać ponownie użyta do następnej operacji (odczyt/zapis).
Ostrzeżenie
NIE używaj następującego kodu. Użycie tego przykładu spowoduje utratę danych, zawieszanie się, problemy z zabezpieczeniami i nie powinny być kopiowane. Poniższy przykład został przedstawiony w celu wyjaśnienia typowych problemów usługi PipeReader.
public class Message
{
public ReadOnlySequence<byte> CorruptedPayload { get; set; }
}
Environment.FailFast("This code is terrible, don't use it!");
Message message = null;
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
ReadHeader(ref buffer, out int length);
if (length <= buffer.Length)
{
message = new Message
{
// Slice the payload from the existing buffer
CorruptedPayload = buffer.Slice(0, length)
};
buffer = buffer.Slice(length);
}
if (result.IsCompleted)
break;
reader.AdvanceTo(buffer.Start, buffer.End);
if (message != null)
{
// This code is broken since reader.AdvanceTo() was called with a position *after* the buffer
// was captured.
break;
}
}
return message;
}
Ostrzeżenie
Nie używaj powyższego kodu. Użycie tego przykładu spowoduje utratę danych, zawieszanie się, problemy z zabezpieczeniami i nie powinny być kopiowane. Powyższy przykład jest dostarczany w celu wyjaśnienia typowych problemów PipeReader.
PipeWriter
Zarządza PipeWriter do zapisywania w imieniu wywołującego. PipeWriter
implementuje IBufferWriter<byte>
. IBufferWriter<byte>
umożliwia uzyskanie dostępu do w celu wykonywania operacji zapisu bez dodatkowych kopii buforu.
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
// Request at least 5 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(5);
// Write directly into the buffer.
int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span);
// Tell the writer how many bytes were written.
writer.Advance(written);
await writer.FlushAsync(cancellationToken);
}
Poprzedni kod:
- Żąda buforu o rozmiarze co najmniej 5 bajtów z
PipeWriter
obiektu using GetMemory. - Zapisuje bajty dla ciągu
"Hello"
ASCII zwracanegoMemory<byte>
elementu . - Wywołania Advance wskazujące liczbę bajtów zapisanych w buforze.
- Opróżnia element
PipeWriter
, który wysyła bajty do urządzenia bazowego.
Poprzednia metoda zapisu używa dostarczonych przez PipeWriter
program . Można również użyć polecenia PipeWriter.WriteAsync, które:
- Kopiuje istniejący bufor do .
PipeWriter
- Wywołuje
GetSpan
metodę ,Advance
zgodnie z potrzebami i wywołuje metodę FlushAsync.
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
byte[] helloBytes = Encoding.ASCII.GetBytes("Hello");
// Write helloBytes to the writer, there's no need to call Advance here
// (Write does that).
await writer.WriteAsync(helloBytes, cancellationToken);
}
Anulowanie
FlushAsync program obsługuje przekazywanie elementu CancellationToken. Przekazywanie CancellationToken
wyników w OperationCanceledException
przypadku anulowania tokenu podczas oczekiwania na opróżnienie. PipeWriter.FlushAsync
obsługuje sposób anulowania bieżącej operacji opróżniania za pośrednictwem PipeWriter.CancelPendingFlush bez zgłaszania wyjątku. Wywołanie powoduje, że bieżące lub następne wywołanie metody lub PipeWriter.WriteAsync
zwraca element z ustawioną wartością true
IsCanceled
FlushResult .PipeWriter.CancelPendingFlush
PipeWriter.FlushAsync
Może to być przydatne do zatrzymania wypłukiwania w sposób niedrukcyjny i nietypowy.
Typowe problemy z potokami PipeWriter
- GetSpan i GetMemory zwraca bufor z co najmniej żądaną ilością pamięci. Nie zakładaj dokładnych rozmiarów buforu.
- Nie ma gwarancji, że kolejne wywołania będą zwracać ten sam bufor lub bufor o takim samym rozmiarze.
- Po wywołaniu Advance polecenia należy zażądać nowego buforu, aby kontynuować zapisywanie większej ilości danych. Nie można zapisać wcześniej uzyskanego buforu.
- Nawiązywanie połączenia
GetMemory
lubGetSpan
gdy istnieje niekompletne połączenieFlushAsync
, nie jest bezpieczne. - Wywoływanie
Complete
lubCompleteAsync
w przypadku braku pływu danych może spowodować uszkodzenie pamięci.
Porady dotyczące używania elementów PipeReader i PipeWriter
Poniższe porady pomogą Ci pomyślnie użyć System.IO.Pipelines klas:
- Zawsze należy ukończyć potoki PipeReader i PipeWriter, łącznie z wyjątkiem, jeśli ma to zastosowanie.
- Zawsze dzwonij PipeReader.AdvanceTo po wywołaniu metody PipeReader.ReadAsync.
await
PipeWriter.FlushAsync Okresowo podczas pisania i zawsze sprawdzaj wartość FlushResult.IsCompleted. Przerwij pisanie, jeśliIsCompleted
ma wartośćtrue
, ponieważ oznacza to, że czytelnik jest ukończony i nie dba już o to, co zostało napisane.- Wykonaj połączenie PipeWriter.FlushAsync po napisaniu czegoś, do czego chcesz
PipeReader
mieć dostęp. - Nie należy wywoływać
FlushAsync
, jeśli czytelnik nie może rozpocząć doFlushAsync
zakończenia, ponieważ może to spowodować zakleszczenie. - Upewnij się, że tylko jeden kontekst jest właścicielem elementu
PipeReader
lubPipeWriter
lub uzyskuje do nich dostęp. Te typy nie są bezpieczne wątkowo. - Nigdy nie uzyskaj dostępu do elementu ReadResult.Buffer po wywołaniu
AdvanceTo
lub ukończeniu .PipeReader
IDuplexPipe
Jest IDuplexPipe to umowa dla typów, które obsługują zarówno odczytywanie, jak i pisanie. Na przykład połączenie sieciowe będzie reprezentowane przez element IDuplexPipe
.
W przeciwieństwie do Pipe
elementu , który zawiera element PipeReader
i PipeWriter
, IDuplexPipe
reprezentuje jedną stronę pełnego połączenia dwustronnego. Oznacza to, że to, co jest zapisywane w obiekcie PipeWriter
, nie będzie odczytywane z elementu PipeReader
.
Strumienie
Podczas odczytywania lub zapisywania danych strumienia zwykle odczytuje się dane przy użyciu desemizatora i zapisu danych przy użyciu serializatora. Większość z tych interfejsów API odczytu i zapisu strumienia ma Stream
parametr . Aby ułatwić integrację z tymi istniejącymi interfejsami PipeReader
API i PipeWriter
uwidocznić metodę AsStream . AsStream zwraca implementację Stream
wokół klasy PipeReader
lub PipeWriter
.
Przykład usługi Stream
PipeReader
wystąpienia i PipeWriter
można tworzyć przy użyciu metod statycznych Create
przy użyciu Stream obiektu i opcjonalnych odpowiednich opcji tworzenia.
Zezwalaj StreamPipeReaderOptions na kontrolę nad tworzeniem PipeReader
wystąpienia przy użyciu następujących parametrów:
- StreamPipeReaderOptions.BufferSize to minimalny rozmiar buforu w bajtach używany podczas wynajmowania pamięci z puli, a wartość domyślna to
4096
. - StreamPipeReaderOptions.LeaveOpen flaga określa, czy podstawowy strumień jest pozostawiony otwarty po zakończeniu
PipeReader
, a wartość domyślna tofalse
. - StreamPipeReaderOptions.MinimumReadSize reprezentuje próg pozostałych bajtów w buforze przed przydzieleniu nowego buforu, a wartość domyślna to
1024
. - StreamPipeReaderOptions.Pool
MemoryPool<byte>
jest używany podczas przydzielania pamięci, a wartość domyślna tonull
.
Zezwalaj StreamPipeWriterOptions na kontrolę nad tworzeniem PipeWriter
wystąpienia przy użyciu następujących parametrów:
- StreamPipeWriterOptions.LeaveOpen flaga określa, czy podstawowy strumień jest pozostawiony otwarty po zakończeniu
PipeWriter
, a wartość domyślna tofalse
. - StreamPipeWriterOptions.MinimumBufferSize reprezentuje minimalny rozmiar buforu, który ma być używany podczas wynajmowania pamięci z Poolwartości , i domyślnie na
4096
wartość . - StreamPipeWriterOptions.Pool
MemoryPool<byte>
jest używany podczas przydzielania pamięci, a wartość domyślna tonull
.
Ważne
Podczas tworzenia PipeReader
wystąpień i PipeWriter
przy użyciu Create
metod należy wziąć pod uwagę Stream
okres istnienia obiektu. Jeśli potrzebujesz dostępu do strumienia po zakończeniu działania czytnika lub składnika zapisywania, musisz ustawić flagę LeaveOpen
na true
wartość w opcjach tworzenia. W przeciwnym razie strumień zostanie zamknięty.
Poniższy kod demonstruje tworzenie PipeReader
wystąpień i PipeWriter
przy użyciu Create
metod ze strumienia.
using System.Buffers;
using System.IO.Pipelines;
using System.Text;
class Program
{
static async Task Main()
{
using var stream = File.OpenRead("lorem-ipsum.txt");
var reader = PipeReader.Create(stream);
var writer = PipeWriter.Create(
Console.OpenStandardOutput(),
new StreamPipeWriterOptions(leaveOpen: true));
WriteUserCancellationPrompt();
var processMessagesTask = ProcessMessagesAsync(reader, writer);
var userCanceled = false;
var cancelProcessingTask = Task.Run(() =>
{
while (char.ToUpperInvariant(Console.ReadKey().KeyChar) != 'C')
{
WriteUserCancellationPrompt();
}
userCanceled = true;
// No exceptions thrown
reader.CancelPendingRead();
writer.CancelPendingFlush();
});
await Task.WhenAny(cancelProcessingTask, processMessagesTask);
Console.WriteLine(
$"\n\nProcessing {(userCanceled ? "cancelled" : "completed")}.\n");
}
static void WriteUserCancellationPrompt() =>
Console.WriteLine("Press 'C' to cancel processing...\n");
static async Task ProcessMessagesAsync(
PipeReader reader,
PipeWriter writer)
{
try
{
while (true)
{
ReadResult readResult = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = readResult.Buffer;
try
{
if (readResult.IsCanceled)
{
break;
}
if (TryParseLines(ref buffer, out string message))
{
FlushResult flushResult =
await WriteMessagesAsync(writer, message);
if (flushResult.IsCanceled || flushResult.IsCompleted)
{
break;
}
}
if (readResult.IsCompleted)
{
if (!buffer.IsEmpty)
{
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
catch (Exception ex)
{
Console.Error.WriteLine(ex);
}
finally
{
await reader.CompleteAsync();
await writer.CompleteAsync();
}
}
static bool TryParseLines(
ref ReadOnlySequence<byte> buffer,
out string message)
{
SequencePosition? position;
StringBuilder outputMessage = new();
while(true)
{
position = buffer.PositionOf((byte)'\n');
if (!position.HasValue)
break;
outputMessage.Append(Encoding.ASCII.GetString(buffer.Slice(buffer.Start, position.Value)))
.AppendLine();
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
};
message = outputMessage.ToString();
return message.Length != 0;
}
static ValueTask<FlushResult> WriteMessagesAsync(
PipeWriter writer,
string message) =>
writer.WriteAsync(Encoding.ASCII.GetBytes(message));
}
Aplikacja używa elementu do StreamReader odczytywania pliku lorem-ipsum.txt jako strumienia i musi kończyć się pustym wierszem. Element FileStream jest przekazywany do PipeReader.Createobiektu , który tworzy wystąpienie PipeReader
obiektu. Następnie aplikacja konsolowa przekazuje standardowy strumień wyjściowy do PipeWriter.Create programu Console.OpenStandardOutput(). Przykład obsługuje anulowanie.