Udostępnij za pośrednictwem


System.IO.Pipelines na platformie .NET

System.IO.Pipelines jest biblioteką zaprojektowaną tak, aby ułatwić wykonywanie operacji we/wy o wysokiej wydajności na platformie .NET. Jest to biblioteka przeznaczona dla platformy .NET Standard, która działa we wszystkich implementacjach platformy .NET.

Biblioteka jest dostępna w pakiecie NuGet System.IO.Pipelines .

Jaki problem rozwiązuje system.IO.Pipelines

Aplikacje, które analizują dane przesyłane strumieniowo, składają się ze standardowego kodu o wielu wyspecjalizowanych i nietypowych przepływach kodu. Standardowy i specjalny kod przypadku jest złożony i trudny do utrzymania.

System.IO.Pipelines został zaprojektowany w celu:

  • Mają wysoką wydajność analizowania danych przesyłanych strumieniowo.
  • Zmniejsz złożoność kodu.

Poniższy kod jest typowy dla serwera TCP, który odbiera komunikaty rozdzielane wierszami (rozdzielane przez '\n') z klienta:

async Task ProcessLinesAsync(NetworkStream stream)
{
    var buffer = new byte[1024];
    await stream.ReadAsync(buffer, 0, buffer.Length);

    // Process a single line from the buffer
    ProcessLine(buffer);
}

Powyższy kod ma kilka problemów:

  • Cała wiadomość (koniec wiersza) może nie zostać odebrana w jednym wywołaniu metody ReadAsync.
  • Ignoruje wynik .stream.ReadAsync stream.ReadAsync Zwraca ilość odczytanych danych.
  • Nie obsługuje przypadku, w którym wiele wierszy jest odczytywanych w jednym ReadAsync wywołaniu.
  • Przydziela tablicę z każdym odczytem byte .

Aby rozwiązać powyższe problemy, wymagane są następujące zmiany:

  • Buforuj dane przychodzące do momentu znalezienia nowego wiersza.

  • Przeanalizuj wszystkie wiersze zwrócone w buforze.

  • Możliwe, że wiersz jest większy niż 1 KB (1024 bajty). Kod musi zmienić rozmiar buforu wejściowego do momentu znalezienia ogranicznika w celu dopasowania do kompletnego wiersza wewnątrz buforu.

    • Jeśli rozmiar buforu zostanie zmieniony, więcej kopii buforu zostanie wykonanych w miarę wyświetlania dłuższych wierszy w danych wejściowych.
    • Aby zmniejszyć zmarnowane miejsce, skompaktuj bufor używany do odczytywania wierszy.
  • Rozważ użycie buforowania buforów, aby uniknąć wielokrotnego przydzielania pamięci.

  • Poniższy kod rozwiązuje niektóre z tych problemów:

async Task ProcessLinesAsync(NetworkStream stream)
{
    byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
    var bytesBuffered = 0;
    var bytesConsumed = 0;

    while (true)
    {
        // Calculate the amount of bytes remaining in the buffer.
        var bytesRemaining = buffer.Length - bytesBuffered;

        if (bytesRemaining == 0)
        {
            // Double the buffer size and copy the previously buffered data into the new buffer.
            var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
            Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
            // Return the old buffer to the pool.
            ArrayPool<byte>.Shared.Return(buffer);
            buffer = newBuffer;
            bytesRemaining = buffer.Length - bytesBuffered;
        }

        var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
        if (bytesRead == 0)
        {
            // EOF
            break;
        }

        // Keep track of the amount of buffered bytes.
        bytesBuffered += bytesRead;
        var linePosition = -1;

        do
        {
            // Look for a EOL in the buffered data.
            linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
                                         bytesBuffered - bytesConsumed);

            if (linePosition >= 0)
            {
                // Calculate the length of the line based on the offset.
                var lineLength = linePosition - bytesConsumed;

                // Process the line.
                ProcessLine(buffer, bytesConsumed, lineLength);

                // Move the bytesConsumed to skip past the line consumed (including \n).
                bytesConsumed += lineLength + 1;
            }
        }
        while (linePosition >= 0);
    }
}

Poprzedni kod jest złożony i nie dotyczy wszystkich zidentyfikowanych problemów. Sieć o wysokiej wydajności zwykle oznacza pisanie złożonego kodu w celu zmaksymalizowania wydajności. System.IO.Pipelines został zaprojektowany tak, aby ułatwić pisanie tego typu kodu.

Rury

Klasa Pipe może służyć do tworzenia PipeWriter/PipeReader pary. Wszystkie dane zapisane w obiekcie PipeWriter są dostępne w pliku PipeReader:

var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;

Podstawowe użycie potoku

async Task ProcessLinesAsync(Socket socket)
{
    var pipe = new Pipe();
    Task writing = FillPipeAsync(socket, pipe.Writer);
    Task reading = ReadPipeAsync(pipe.Reader);

    await Task.WhenAll(reading, writing);
}

async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
    const int minimumBufferSize = 512;

    while (true)
    {
        // Allocate at least 512 bytes from the PipeWriter.
        Memory<byte> memory = writer.GetMemory(minimumBufferSize);
        try
        {
            int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
            if (bytesRead == 0)
            {
                break;
            }
            // Tell the PipeWriter how much was read from the Socket.
            writer.Advance(bytesRead);
        }
        catch (Exception ex)
        {
            LogError(ex);
            break;
        }

        // Make the data available to the PipeReader.
        FlushResult result = await writer.FlushAsync();

        if (result.IsCompleted)
        {
            break;
        }
    }

     // By completing PipeWriter, tell the PipeReader that there's no more data coming.
    await writer.CompleteAsync();
}

async Task ReadPipeAsync(PipeReader reader)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync();
        ReadOnlySequence<byte> buffer = result.Buffer;

        while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
        {
            // Process the line.
            ProcessLine(line);
        }

        // Tell the PipeReader how much of the buffer has been consumed.
        reader.AdvanceTo(buffer.Start, buffer.End);

        // Stop reading if there's no more data coming.
        if (result.IsCompleted)
        {
            break;
        }
    }

    // Mark the PipeReader as complete.
    await reader.CompleteAsync();
}

bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
    // Look for a EOL in the buffer.
    SequencePosition? position = buffer.PositionOf((byte)'\n');

    if (position == null)
    {
        line = default;
        return false;
    }

    // Skip the line + the \n.
    line = buffer.Slice(0, position.Value);
    buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
    return true;
}

Istnieją dwie pętle:

  • FillPipeAsync odczytuje dane z pliku Socket i zapisu w pliku PipeWriter.
  • ReadPipeAsync odczytuje z PipeReader wierszy przychodzących i analizuje je.

Nie przydzielono jawnych buforów. Wszystkie zarządzanie buforami jest delegowane do PipeReader implementacji i PipeWriter . Delegowanie zarządzania buforami ułatwia korzystanie z kodu wyłącznie na logice biznesowej.

W pierwszej pętli:

W drugiej pętli program PipeReader zużywa bufory zapisywane przez PipeWriterprogram . Bufory pochodzą z gniazda. Wywołanie metody :PipeReader.ReadAsync

  • Zwraca element ReadResult zawierający dwa ważne informacje:

    • Dane, które zostały odczytane w postaci ReadOnlySequence<byte>.
    • Wartość logiczna IsCompleted wskazująca, czy osiągnięto koniec danych (EOF).

Po znalezieniu ogranicznika końca wiersza (EOL) i przeanalizowaniu wiersza:

  • Logika przetwarza bufor, aby pominąć już przetworzone dane.
  • PipeReader.AdvanceTo jest wywoływany, aby poinformować, PipeReader ile danych zostało wykorzystanych i zbadanych.

Pętle czytnika i modułu zapisywania kończą się wywołaniem metody Complete. Complete umożliwia bazowej potokowi zwolnienie pamięci przydzielonej.

Backpressure i sterowanie przepływem

W idealnym przypadku czytanie i analizowanie współpracują ze sobą:

  • Wątek odczytu zużywa dane z sieci i umieszcza je w buforach.
  • Wątek analizy jest odpowiedzialny za konstruowanie odpowiednich struktur danych.

Zazwyczaj analizowanie zajmuje więcej czasu niż tylko kopiowanie bloków danych z sieci:

  • Wątek odczytu jest przed wątkiem analizowania.
  • Wątek odczytu musi zwolnić lub przydzielić więcej pamięci do przechowywania danych dla wątku analizowania.

W celu uzyskania optymalnej wydajności istnieje równowaga między częstymi wstrzymaniami i przydzielaniem większej ilości pamięci.

Aby rozwiązać powyższy problem, Pipe ma dwa ustawienia umożliwiające sterowanie przepływem danych:

Diagram with ResumeWriterThreshold and PauseWriterThreshold

PipeWriter.FlushAsync:

  • Zwraca niekompletny ValueTask<FlushResult> element, gdy ilość danych w Pipe przecięciu PauseWriterThreshold.
  • Kończy ValueTask<FlushResult> się, gdy staje się niższy niż ResumeWriterThreshold.

Dwie wartości są używane w celu zapobiegania szybkiemu cyklowi rowerowemu, które mogą wystąpić, jeśli jedna wartość jest używana.

Przykłady

// The Pipe will start returning incomplete tasks from FlushAsync until
// the reader examines at least 5 bytes.
var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
var pipe = new Pipe(options);

PipeScheduler

Zwykle w przypadku używania i asyncawaitkod asynchroniczny jest wznawiany w TaskScheduler bieżącym SynchronizationContextobiekcie lub .

Podczas wykonywania operacji we/wy ważne jest, aby mieć precyzyjną kontrolę nad miejscem wykonywania operacji we/wy. Ta kontrolka umożliwia efektywne korzystanie z pamięci podręcznych procesora CPU. Wydajne buforowanie ma kluczowe znaczenie dla aplikacji o wysokiej wydajności, takich jak serwery internetowe. PipeScheduler zapewnia kontrolę nad tym, gdzie są uruchamiane wywołania zwrotne asynchroniczne. Domyślnie:

  • Jest używany bieżący SynchronizationContext .
  • Jeśli nie ma SynchronizationContextelementu , używa puli wątków do uruchamiania wywołań zwrotnych.
public static void Main(string[] args)
{
    var writeScheduler = new SingleThreadPipeScheduler();
    var readScheduler = new SingleThreadPipeScheduler();

    // Tell the Pipe what schedulers to use and disable the SynchronizationContext.
    var options = new PipeOptions(readerScheduler: readScheduler,
                                  writerScheduler: writeScheduler,
                                  useSynchronizationContext: false);
    var pipe = new Pipe(options);
}

// This is a sample scheduler that async callbacks on a single dedicated thread.
public class SingleThreadPipeScheduler : PipeScheduler
{
    private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
     new BlockingCollection<(Action<object> Action, object State)>();
    private readonly Thread _thread;

    public SingleThreadPipeScheduler()
    {
        _thread = new Thread(DoWork);
        _thread.Start();
    }

    private void DoWork()
    {
        foreach (var item in _queue.GetConsumingEnumerable())
        {
            item.Action(item.State);
        }
    }

    public override void Schedule(Action<object?> action, object? state)
    {
        if (state is not null)
        {
            _queue.Add((action, state));
        }
        // else log the fact that _queue.Add was not called.
    }
}

PipeScheduler.ThreadPool to implementacja PipeScheduler , która kolejkuje wywołania zwrotne do puli wątków. PipeScheduler.ThreadPool jest domyślnym i ogólnie najlepszym wyborem. PipeScheduler.Inline może powodować niezamierzone konsekwencje, takie jak zakleszczenia.

Resetowanie potoku

Często wydajne jest ponowne użycie Pipe obiektu. Aby zresetować potok, wywołaj metodę PipeReaderReset po zakończeniu PipeReader operacji i PipeWriter .

PipeReader

PipeReader zarządza pamięcią w imieniu wywołującego. Zawsze dzwonijPipeReader.AdvanceTo po wywołaniu metody PipeReader.ReadAsync. Informuje o tym, PipeReader kiedy obiekt wywołujący jest wykonywany z pamięcią, dzięki czemu można go śledzić. Zwrócony ReadOnlySequence<byte> element jest PipeReader.ReadAsync prawidłowy tylko do momentu wywołania PipeReader.AdvanceTometody . Jest to niedozwolone do użycia ReadOnlySequence<byte> po wywołaniu metody PipeReader.AdvanceTo.

PipeReader.AdvanceTo przyjmuje dwa SequencePosition argumenty:

  • Pierwszy argument określa, ile pamięci zostało zużyte.
  • Drugi argument określa, ile buforu zaobserwowano.

Oznaczanie danych jako wykorzystanych oznacza, że potok może zwrócić pamięć do bazowej puli buforów. Oznaczanie danych obserwowanych kontroluje, co robi następne wywołanie PipeReader.ReadAsync . Oznaczanie wszystkiego, co zaobserwowano oznacza, że następne wywołanie PipeReader.ReadAsync metody nie zwróci się, dopóki nie będzie więcej danych zapisanych w potoku. Każda inna wartość spowoduje natychmiastowe wywołanie, aby PipeReader.ReadAsync zwrócić dane obserwowane i nieobserwowane, ale nie dane, które zostały już zużyte.

Scenariusze odczytu danych przesyłanych strumieniowo

Istnieje kilka typowych wzorców, które pojawiają się podczas próby odczytu danych przesyłanych strumieniowo:

  • Biorąc pod uwagę strumień danych, przeanalizuj pojedynczy komunikat.
  • Biorąc pod uwagę strumień danych, przeanalizuj wszystkie dostępne komunikaty.

W poniższych przykładach użyto TryParseLines metody analizowania komunikatów z klasy ReadOnlySequence<byte>. TryParseLines Analizuje pojedynczy komunikat i aktualizuje bufor wejściowy w celu przycinania przeanalizowanego komunikatu z buforu. TryParseLines nie jest częścią platformy .NET, jest to metoda napisana przez użytkownika używana w poniższych sekcjach.

bool TryParseLines(ref ReadOnlySequence<byte> buffer, out Message message);

Odczytywanie pojedynczej wiadomości

Poniższy kod odczytuje pojedynczy komunikat z obiektu PipeReader i zwraca go do obiektu wywołującego.

async ValueTask<Message?> ReadSingleMessageAsync(PipeReader reader,
 CancellationToken cancellationToken = default)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync(cancellationToken);
        ReadOnlySequence<byte> buffer = result.Buffer;

        // In the event that no message is parsed successfully, mark consumed
        // as nothing and examined as the entire buffer.
        SequencePosition consumed = buffer.Start;
        SequencePosition examined = buffer.End;

        try
        {
            if (TryParseLines(ref buffer, out Message message))
            {
                // A single message was successfully parsed so mark the start of the
                // parsed buffer as consumed. TryParseLines trims the buffer to
                // point to the data after the message was parsed.
                consumed = buffer.Start;

                // Examined is marked the same as consumed here, so the next call
                // to ReadSingleMessageAsync will process the next message if there's
                // one.
                examined = consumed;

                return message;
            }

            // There's no more data to be processed.
            if (result.IsCompleted)
            {
                if (buffer.Length > 0)
                {
                    // The message is incomplete and there's no more data to process.
                    throw new InvalidDataException("Incomplete message.");
                }

                break;
            }
        }
        finally
        {
            reader.AdvanceTo(consumed, examined);
        }
    }

    return null;
}

Powyższy kod ma następujące działanie:

  • Analizuje pojedynczy komunikat.
  • Aktualizacje zużyte SequencePosition i zbadaneSequencePosition, aby wskazać początek przyciętego buforu wejściowego.

Dwa SequencePosition argumenty są aktualizowane, ponieważ TryParseLines usuwa przeanalizowany komunikat z buforu wejściowego. Ogólnie rzecz biorąc, podczas analizowania pojedynczego komunikatu z buforu zbadane stanowisko powinno być jednym z następujących elementów:

  • Koniec wiadomości.
  • Koniec odebranego buforu, jeśli nie znaleziono żadnego komunikatu.

Pojedynczy przypadek komunikatu ma największy potencjał błędów. Przekazanie nieprawidłowych wartości do zbadania może spowodować wyjątek braku pamięci lub nieskończoną pętlę. Aby uzyskać więcej informacji, zobacz sekcję Typowe problemy PipeReader w tym artykule.

Odczytywanie wielu komunikatów

Poniższy kod odczytuje wszystkie komunikaty z obiektu PipeReader i wywołuje poszczególne wywołania ProcessMessageAsync .

async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
{
    try
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync(cancellationToken);
            ReadOnlySequence<byte> buffer = result.Buffer;

            try
            {
                // Process all messages from the buffer, modifying the input buffer on each
                // iteration.
                while (TryParseLines(ref buffer, out Message message))
                {
                    await ProcessMessageAsync(message);
                }

                // There's no more data to be processed.
                if (result.IsCompleted)
                {
                    if (buffer.Length > 0)
                    {
                        // The message is incomplete and there's no more data to process.
                        throw new InvalidDataException("Incomplete message.");
                    }
                    break;
                }
            }
            finally
            {
                // Since all messages in the buffer are being processed, you can use the
                // remaining buffer's Start and End position to determine consumed and examined.
                reader.AdvanceTo(buffer.Start, buffer.End);
            }
        }
    }
    finally
    {
        await reader.CompleteAsync();
    }
}

Anulowanie

PipeReader.ReadAsync:

  • Obsługuje przekazywanie elementu CancellationToken.
  • Zgłasza błąd OperationCanceledException , jeśli CancellationToken element zostanie anulowany podczas oczekiwania na odczyt.
  • Obsługuje sposób anulowania bieżącej operacji odczytu za pomocą PipeReader.CancelPendingReadpolecenia , co pozwala uniknąć zgłaszania wyjątku. Wywołanie powoduje, że bieżące lub następne wywołanie zwraca element PipeReader.ReadAsync z ustawioną wartością trueReadResultIsCanceled .PipeReader.CancelPendingRead Może to być przydatne w przypadku zatrzymania istniejącej pętli odczytu w sposób niedrukcyjny i nietypowy.
private PipeReader reader;

public MyConnection(PipeReader reader)
{
    this.reader = reader;
}

public void Abort()
{
    // Cancel the pending read so the process loop ends without an exception.
    reader.CancelPendingRead();
}

public async Task ProcessMessagesAsync()
{
    try
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync();
            ReadOnlySequence<byte> buffer = result.Buffer;

            try
            {
                if (result.IsCanceled)
                {
                    // The read was canceled. You can quit without reading the existing data.
                    break;
                }

                // Process all messages from the buffer, modifying the input buffer on each
                // iteration.
                while (TryParseLines(ref buffer, out Message message))
                {
                    await ProcessMessageAsync(message);
                }

                // There's no more data to be processed.
                if (result.IsCompleted)
                {
                    break;
                }
            }
            finally
            {
                // Since all messages in the buffer are being processed, you can use the
                // remaining buffer's Start and End position to determine consumed and examined.
                reader.AdvanceTo(buffer.Start, buffer.End);
            }
        }
    }
    finally
    {
        await reader.CompleteAsync();
    }
}

Typowe problemy z usługą PipeReader

  • Przekazywanie nieprawidłowych wartości do consumed lub examined może spowodować odczytywanie już odczytanych danych.

  • Przekazanie buffer.End w wyniku zbadania może spowodować:

    • Zatrzymane dane
    • Prawdopodobnie ewentualny wyjątek braku pamięci (OOM), jeśli dane nie są używane. Na przykład PipeReader.AdvanceTo(position, buffer.End) podczas przetwarzania pojedynczego komunikatu w czasie z buforu.
  • Przekazywanie nieprawidłowych wartości do consumed lub examined może spowodować nieskończoną pętlę. Na przykład PipeReader.AdvanceTo(buffer.Start) , jeśli buffer.Start nie zmieniono, spowoduje, że następne wywołanie , aby powrócić PipeReader.ReadAsync bezpośrednio przed nadejściem nowych danych.

  • Przekazywanie nieprawidłowych wartości do consumed lub examined może spowodować nieskończone buforowanie (ostateczne OOM).

  • ReadOnlySequence<byte> Użycie metody po wywołaniu PipeReader.AdvanceTo może spowodować uszkodzenie pamięci (użycie po wolnym użyciu).

  • Niepowodzenie wywołania PipeReader.Complete/CompleteAsync może spowodować wyciek pamięci.

  • Sprawdzanie ReadResult.IsCompleted i zamykanie logiki odczytu przed przetworzeniem buforu powoduje utratę danych. Warunek zakończenia pętli powinien być oparty na elementach ReadResult.Buffer.IsEmpty i ReadResult.IsCompleted. Wykonanie tego błędnie może spowodować nieskończoną pętlę.

Problematyczny kod

Utrata danych

Element ReadResult może zwrócić końcowy segment danych, gdy IsCompleted jest ustawiony na truewartość . Nie można odczytać tych danych przed zamknięciem pętli odczytu, co spowoduje utratę danych.

Ostrzeżenie

NIE używaj następującego kodu. Użycie tego przykładu spowoduje utratę danych, zawieszanie się, problemy z zabezpieczeniami i nie powinny być kopiowane. Poniższy przykład został przedstawiony w celu wyjaśnienia typowych problemów usługi PipeReader.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> dataLossBuffer = result.Buffer;

    if (result.IsCompleted)
        break;

    Process(ref dataLossBuffer, out Message message);

    reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End);
}

Ostrzeżenie

Nie używaj powyższego kodu. Użycie tego przykładu spowoduje utratę danych, zawieszanie się, problemy z zabezpieczeniami i nie powinny być kopiowane. Powyższy przykład jest dostarczany w celu wyjaśnienia typowych problemów PipeReader.

Pętla nieskończona

Poniższa logika może spowodować nieskończoną pętlę, jeśli Result.IsCompleted element jest true , ale nigdy nie ma pełnego komunikatu w buforze.

Ostrzeżenie

NIE używaj następującego kodu. Użycie tego przykładu spowoduje utratę danych, zawieszanie się, problemy z zabezpieczeniami i nie powinny być kopiowane. Poniższy przykład został przedstawiony w celu wyjaśnienia typowych problemów usługi PipeReader.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
    if (result.IsCompleted && infiniteLoopBuffer.IsEmpty)
        break;

    Process(ref infiniteLoopBuffer, out Message message);

    reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}

Ostrzeżenie

Nie używaj powyższego kodu. Użycie tego przykładu spowoduje utratę danych, zawieszanie się, problemy z zabezpieczeniami i nie powinny być kopiowane. Powyższy przykład jest dostarczany w celu wyjaśnienia typowych problemów PipeReader.

Oto kolejny fragment kodu z tym samym problemem. Sprawdza on, czy bufor nie jest pusty przed sprawdzeniem ReadResult.IsCompletedwartości . Ponieważ znajduje się w elemecie else if, pętla będzie zawsze występować, jeśli w buforze nigdy nie ma pełnego komunikatu.

Ostrzeżenie

NIE używaj następującego kodu. Użycie tego przykładu spowoduje utratę danych, zawieszanie się, problemy z zabezpieczeniami i nie powinny być kopiowane. Poniższy przykład został przedstawiony w celu wyjaśnienia typowych problemów usługi PipeReader.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;

    if (!infiniteLoopBuffer.IsEmpty)
        Process(ref infiniteLoopBuffer, out Message message);

    else if (result.IsCompleted)
        break;

    reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}

Ostrzeżenie

Nie używaj powyższego kodu. Użycie tego przykładu spowoduje utratę danych, zawieszanie się, problemy z zabezpieczeniami i nie powinny być kopiowane. Powyższy przykład jest dostarczany w celu wyjaśnienia typowych problemów PipeReader.

Aplikacja nie odpowiada

Bezwarunkowe wywoływanie PipeReader.AdvanceTo elementu w examinedbuffer.End pozycji może spowodować, że aplikacja przestaje odpowiadać podczas analizowania pojedynczego komunikatu. Następne wywołanie metody PipeReader.AdvanceTo nie zostanie zwrócone do czasu:

  • W potoku jest więcej danych zapisanych w potoku.
  • Nowe dane nie zostały wcześniej zbadane.

Ostrzeżenie

NIE używaj następującego kodu. Użycie tego przykładu spowoduje utratę danych, zawieszanie się, problemy z zabezpieczeniami i nie powinny być kopiowane. Poniższy przykład został przedstawiony w celu wyjaśnienia typowych problemów usługi PipeReader.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> hangBuffer = result.Buffer;

    Process(ref hangBuffer, out Message message);

    if (result.IsCompleted)
        break;

    reader.AdvanceTo(hangBuffer.Start, hangBuffer.End);

    if (message != null)
        return message;
}

Ostrzeżenie

Nie używaj powyższego kodu. Użycie tego przykładu spowoduje utratę danych, zawieszanie się, problemy z zabezpieczeniami i nie powinny być kopiowane. Powyższy przykład jest dostarczany w celu wyjaśnienia typowych problemów PipeReader.

Brak pamięci (OOM)

W następujących warunkach następujący kod będzie buforowy do momentu OutOfMemoryException wystąpienia:

  • Nie ma maksymalnego rozmiaru komunikatu.
  • Dane zwrócone z elementu nie tworzą kompletnego komunikatu PipeReader . Na przykład nie tworzy pełnego komunikatu, ponieważ druga strona zapisuje duży komunikat (na przykład komunikat o rozmiarze 4 GB).

Ostrzeżenie

NIE używaj następującego kodu. Użycie tego przykładu spowoduje utratę danych, zawieszanie się, problemy z zabezpieczeniami i nie powinny być kopiowane. Poniższy przykład został przedstawiony w celu wyjaśnienia typowych problemów usługi PipeReader.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer;

    Process(ref thisCouldOutOfMemory, out Message message);

    if (result.IsCompleted)
        break;

    reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End);

    if (message != null)
        return message;
}

Ostrzeżenie

Nie używaj powyższego kodu. Użycie tego przykładu spowoduje utratę danych, zawieszanie się, problemy z zabezpieczeniami i nie powinny być kopiowane. Powyższy przykład jest dostarczany w celu wyjaśnienia typowych problemów PipeReader.

Uszkodzenie pamięci

Podczas zapisywania pomocników odczytujących bufor należy skopiować dowolny zwrócony ładunek przed wywołaniem metody Advance. W poniższym przykładzie zostanie zwrócona pamięć, która Pipe została odrzucona i może zostać ponownie użyta do następnej operacji (odczyt/zapis).

Ostrzeżenie

NIE używaj następującego kodu. Użycie tego przykładu spowoduje utratę danych, zawieszanie się, problemy z zabezpieczeniami i nie powinny być kopiowane. Poniższy przykład został przedstawiony w celu wyjaśnienia typowych problemów usługi PipeReader.

public class Message
{
    public ReadOnlySequence<byte> CorruptedPayload { get; set; }
}
    Environment.FailFast("This code is terrible, don't use it!");
    Message message = null;

    while (true)
    {
        ReadResult result = await reader.ReadAsync(cancellationToken);
        ReadOnlySequence<byte> buffer = result.Buffer;

        ReadHeader(ref buffer, out int length);

        if (length <= buffer.Length)
        {
            message = new Message
            {
                // Slice the payload from the existing buffer
                CorruptedPayload = buffer.Slice(0, length)
            };

            buffer = buffer.Slice(length);
        }

        if (result.IsCompleted)
            break;

        reader.AdvanceTo(buffer.Start, buffer.End);

        if (message != null)
        {
            // This code is broken since reader.AdvanceTo() was called with a position *after* the buffer
            // was captured.
            break;
        }
    }

    return message;
}

Ostrzeżenie

Nie używaj powyższego kodu. Użycie tego przykładu spowoduje utratę danych, zawieszanie się, problemy z zabezpieczeniami i nie powinny być kopiowane. Powyższy przykład jest dostarczany w celu wyjaśnienia typowych problemów PipeReader.

PipeWriter

Zarządza PipeWriter buforami do zapisywania w imieniu wywołującego. PipeWriter implementuje IBufferWriter<byte>. IBufferWriter<byte> umożliwia uzyskanie dostępu do buforów w celu wykonywania operacji zapisu bez dodatkowych kopii buforu.

async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
    // Request at least 5 bytes from the PipeWriter.
    Memory<byte> memory = writer.GetMemory(5);

    // Write directly into the buffer.
    int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span);

    // Tell the writer how many bytes were written.
    writer.Advance(written);

    await writer.FlushAsync(cancellationToken);
}

Poprzedni kod:

  • Żąda buforu o rozmiarze co najmniej 5 bajtów z PipeWriter obiektu using GetMemory.
  • Zapisuje bajty dla ciągu "Hello" ASCII zwracanego Memory<byte>elementu .
  • Wywołania Advance wskazujące liczbę bajtów zapisanych w buforze.
  • Opróżnia element PipeWriter, który wysyła bajty do urządzenia bazowego.

Poprzednia metoda zapisu używa buforów dostarczonych przez PipeWriterprogram . Można również użyć polecenia PipeWriter.WriteAsync, które:

  • Kopiuje istniejący bufor do .PipeWriter
  • Wywołuje GetSpanmetodę , Advance zgodnie z potrzebami i wywołuje metodę FlushAsync.
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
    byte[] helloBytes = Encoding.ASCII.GetBytes("Hello");

    // Write helloBytes to the writer, there's no need to call Advance here
    // (Write does that).
    await writer.WriteAsync(helloBytes, cancellationToken);
}

Anulowanie

FlushAsync program obsługuje przekazywanie elementu CancellationToken. Przekazywanie CancellationToken wyników w OperationCanceledException przypadku anulowania tokenu podczas oczekiwania na opróżnienie. PipeWriter.FlushAsync obsługuje sposób anulowania bieżącej operacji opróżniania za pośrednictwem PipeWriter.CancelPendingFlush bez zgłaszania wyjątku. Wywołanie powoduje, że bieżące lub następne wywołanie metody lub PipeWriter.WriteAsync zwraca element z ustawioną wartością trueIsCanceledFlushResult .PipeWriter.CancelPendingFlushPipeWriter.FlushAsync Może to być przydatne do zatrzymania wypłukiwania w sposób niedrukcyjny i nietypowy.

Typowe problemy z potokami PipeWriter

  • GetSpan i GetMemory zwraca bufor z co najmniej żądaną ilością pamięci. Nie zakładaj dokładnych rozmiarów buforu.
  • Nie ma gwarancji, że kolejne wywołania będą zwracać ten sam bufor lub bufor o takim samym rozmiarze.
  • Po wywołaniu Advance polecenia należy zażądać nowego buforu, aby kontynuować zapisywanie większej ilości danych. Nie można zapisać wcześniej uzyskanego buforu.
  • Nawiązywanie połączenia GetMemory lub GetSpan gdy istnieje niekompletne połączenie FlushAsync , nie jest bezpieczne.
  • Wywoływanie Complete lub CompleteAsync w przypadku braku pływu danych może spowodować uszkodzenie pamięci.

Wskazówki do używania elementów PipeReader i PipeWriter

Poniższe porady pomogą Ci pomyślnie użyć System.IO.Pipelines klas:

  • Zawsze należy ukończyć potoki PipeReader i PipeWriter, łącznie z wyjątkiem, jeśli ma to zastosowanie.
  • Zawsze dzwonij PipeReader.AdvanceTo po wywołaniu metody PipeReader.ReadAsync.
  • awaitPipeWriter.FlushAsync Okresowo podczas pisania i zawsze sprawdzaj wartość FlushResult.IsCompleted. Przerwij pisanie, jeśli IsCompleted ma wartość true, ponieważ oznacza to, że czytelnik jest ukończony i nie dba już o to, co zostało napisane.
  • Wykonaj połączenie PipeWriter.FlushAsync po napisaniu czegoś, do czego chcesz PipeReader mieć dostęp.
  • Nie należy wywoływać FlushAsync , jeśli czytelnik nie może rozpocząć do FlushAsync zakończenia, ponieważ może to spowodować zakleszczenie.
  • Upewnij się, że tylko jeden kontekst jest właścicielem elementu PipeReader lub PipeWriter lub uzyskuje do nich dostęp. Te typy nie są bezpieczne wątkowo.
  • Nigdy nie uzyskaj dostępu do elementu ReadResult.Buffer po wywołaniu AdvanceTo lub ukończeniu .PipeReader

IDuplexPipe

Jest IDuplexPipe to umowa dla typów, które obsługują zarówno odczytywanie, jak i pisanie. Na przykład połączenie sieciowe będzie reprezentowane przez element IDuplexPipe.

W przeciwieństwie do Pipeelementu , który zawiera element PipeReader i PipeWriter, IDuplexPipe reprezentuje jedną stronę pełnego połączenia dwustronnego. Oznacza to, że to, co jest zapisywane w obiekcie PipeWriter , nie będzie odczytywane z elementu PipeReader.

Strumienie

Podczas odczytywania lub zapisywania danych strumienia zwykle odczytuje się dane przy użyciu desemizatora i zapisu danych przy użyciu serializatora. Większość z tych interfejsów API odczytu i zapisu strumienia ma Stream parametr . Aby ułatwić integrację z tymi istniejącymi interfejsami PipeReader API i PipeWriter uwidocznić metodę AsStream . AsStream zwraca implementację Stream wokół klasy PipeReader lub PipeWriter.

Przykład usługi Stream

PipeReader wystąpienia i PipeWriter można tworzyć przy użyciu metod statycznych Create przy użyciu Stream obiektu i opcjonalnych odpowiednich opcji tworzenia.

Zezwalaj StreamPipeReaderOptions na kontrolę nad tworzeniem PipeReader wystąpienia przy użyciu następujących parametrów:

Zezwalaj StreamPipeWriterOptions na kontrolę nad tworzeniem PipeWriter wystąpienia przy użyciu następujących parametrów:

Ważne

Podczas tworzenia PipeReader wystąpień i PipeWriter przy użyciu Create metod należy wziąć pod uwagę Stream okres istnienia obiektu. Jeśli potrzebujesz dostępu do strumienia po zakończeniu działania czytnika lub składnika zapisywania, musisz ustawić flagę LeaveOpen na true wartość w opcjach tworzenia. W przeciwnym razie strumień zostanie zamknięty.

Poniższy kod demonstruje tworzenie PipeReader wystąpień i PipeWriter przy użyciu Create metod ze strumienia.

using System.Buffers;
using System.IO.Pipelines;
using System.Text;

class Program
{
    static async Task Main()
    {
        using var stream = File.OpenRead("lorem-ipsum.txt");

        var reader = PipeReader.Create(stream);
        var writer = PipeWriter.Create(
            Console.OpenStandardOutput(), 
            new StreamPipeWriterOptions(leaveOpen: true));

        WriteUserCancellationPrompt();

        var processMessagesTask = ProcessMessagesAsync(reader, writer);
        var userCanceled = false;
        var cancelProcessingTask = Task.Run(() =>
        {
            while (char.ToUpperInvariant(Console.ReadKey().KeyChar) != 'C')
            {
                WriteUserCancellationPrompt();
            }

            userCanceled = true;

            // No exceptions thrown
            reader.CancelPendingRead();
            writer.CancelPendingFlush();
        });

        await Task.WhenAny(cancelProcessingTask, processMessagesTask);

        Console.WriteLine(
            $"\n\nProcessing {(userCanceled ? "cancelled" : "completed")}.\n");
    }

    static void WriteUserCancellationPrompt() =>
        Console.WriteLine("Press 'C' to cancel processing...\n");

    static async Task ProcessMessagesAsync(
        PipeReader reader,
        PipeWriter writer)
    {
        try
        {
            while (true)
            {
                ReadResult readResult = await reader.ReadAsync();
                ReadOnlySequence<byte> buffer = readResult.Buffer;

                try
                {
                    if (readResult.IsCanceled)
                    {
                        break;
                    }

                    if (TryParseLines(ref buffer, out string message))
                    {
                        FlushResult flushResult =
                            await WriteMessagesAsync(writer, message);

                        if (flushResult.IsCanceled || flushResult.IsCompleted)
                        {
                            break;
                        }
                    }

                    if (readResult.IsCompleted)
                    {
                        if (!buffer.IsEmpty)
                        {
                            throw new InvalidDataException("Incomplete message.");
                        }
                        break;
                    }
                }
                finally
                {
                    reader.AdvanceTo(buffer.Start, buffer.End);
                }
            }
        }
        catch (Exception ex)
        {
            Console.Error.WriteLine(ex);
        }
        finally
        {
            await reader.CompleteAsync();
            await writer.CompleteAsync();
        }
    }

    static bool TryParseLines(
        ref ReadOnlySequence<byte> buffer,
        out string message)
    {
        SequencePosition? position;
        StringBuilder outputMessage = new();

        while(true)
        {
            position = buffer.PositionOf((byte)'\n');

            if (!position.HasValue)
                break;

            outputMessage.Append(Encoding.ASCII.GetString(buffer.Slice(buffer.Start, position.Value)))
                        .AppendLine();

            buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
        };

        message = outputMessage.ToString();
        return message.Length != 0;
    }

    static ValueTask<FlushResult> WriteMessagesAsync(
        PipeWriter writer,
        string message) =>
        writer.WriteAsync(Encoding.ASCII.GetBytes(message));
}

Aplikacja używa elementu do StreamReader odczytywania pliku lorem-ipsum.txt jako strumienia i musi kończyć się pustym wierszem. Element FileStream jest przekazywany do PipeReader.Createobiektu , który tworzy wystąpienie PipeReader obiektu. Następnie aplikacja konsolowa przekazuje standardowy strumień wyjściowy do PipeWriter.Create programu Console.OpenStandardOutput(). Przykład obsługuje anulowanie.