Обучение
Модуль
Подключение команд к конвейеру - Training
В этом модуле содержится информация о подключении команд к конвейеру.
Этот браузер больше не поддерживается.
Выполните обновление до Microsoft Edge, чтобы воспользоваться новейшими функциями, обновлениями для системы безопасности и технической поддержкой.
System.IO.Pipelines — это библиотека, предназначенная для упрощения работы с высокопроизводительной операцией ввода-вывода в .NET. Это библиотека, предназначенная для .NET Standard, которая работает во всех реализациях .NET.
Библиотека доступна в пакете Nuget System.IO.Pipelines .
Приложения, которые анализируют потоковые данные, состоят из стандартного кода, имеющего множество специализированных и необычных потоков кода. Стандартный и специальный код сложен, и его трудно поддерживать.
Возможности System.IO.Pipelines
:
Следующий код является типичным для TCP-сервера, который получает сообщения с разделенными строками (разделитель — '\n'
) от клиента:
async Task ProcessLinesAsync(NetworkStream stream)
{
var buffer = new byte[1024];
await stream.ReadAsync(buffer, 0, buffer.Length);
// Process a single line from the buffer
ProcessLine(buffer);
}
В предыдущем коде несколько проблем:
ReadAsync
.stream.ReadAsync
. stream.ReadAsync
возвращает объем считанных данных.ReadAsync
.byte
при каждом чтении.Чтобы устранить предыдущие проблемы, необходимо внести следующие изменения:
Помещение входящих данных в буфер до тех пор, пока не будет найдена новая строка.
Синтаксический анализ всех строк, возвращенных в буфер.
Возможно, длина строки превышает 1 КБ (1024 байт). Код должен изменять размер входного буфера до тех пор, пока не будет найден разделитель, чтобы вместить всю строку в буфере.
Рекомендуется использовать буферные пулы, чтобы избежать повторного выделения памяти.
В следующем коде решаются некоторые из этих проблем:
async Task ProcessLinesAsync(NetworkStream stream)
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
var bytesBuffered = 0;
var bytesConsumed = 0;
while (true)
{
// Calculate the amount of bytes remaining in the buffer.
var bytesRemaining = buffer.Length - bytesBuffered;
if (bytesRemaining == 0)
{
// Double the buffer size and copy the previously buffered data into the new buffer.
var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
// Return the old buffer to the pool.
ArrayPool<byte>.Shared.Return(buffer);
buffer = newBuffer;
bytesRemaining = buffer.Length - bytesBuffered;
}
var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
if (bytesRead == 0)
{
// EOF
break;
}
// Keep track of the amount of buffered bytes.
bytesBuffered += bytesRead;
var linePosition = -1;
do
{
// Look for a EOL in the buffered data.
linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
bytesBuffered - bytesConsumed);
if (linePosition >= 0)
{
// Calculate the length of the line based on the offset.
var lineLength = linePosition - bytesConsumed;
// Process the line.
ProcessLine(buffer, bytesConsumed, lineLength);
// Move the bytesConsumed to skip past the line consumed (including \n).
bytesConsumed += lineLength + 1;
}
}
while (linePosition >= 0);
}
}
Предыдущий код является сложным и не предназначен для устранения всех обнаруженных проблем. Высокопроизводительная сеть обычно означает написание сложного кода для повышения производительности. System.IO.Pipelines
был разработан для упрощения написания этого типа кода.
Класс Pipe можно использовать для создания пары PipeWriter/PipeReader
. Все данные, записанные в PipeWriter
, доступны в PipeReader
:
var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;
async Task ProcessLinesAsync(Socket socket)
{
var pipe = new Pipe();
Task writing = FillPipeAsync(socket, pipe.Writer);
Task reading = ReadPipeAsync(pipe.Reader);
await Task.WhenAll(reading, writing);
}
async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
const int minimumBufferSize = 512;
while (true)
{
// Allocate at least 512 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(minimumBufferSize);
try
{
int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
if (bytesRead == 0)
{
break;
}
// Tell the PipeWriter how much was read from the Socket.
writer.Advance(bytesRead);
}
catch (Exception ex)
{
LogError(ex);
break;
}
// Make the data available to the PipeReader.
FlushResult result = await writer.FlushAsync();
if (result.IsCompleted)
{
break;
}
}
// By completing PipeWriter, tell the PipeReader that there's no more data coming.
await writer.CompleteAsync();
}
async Task ReadPipeAsync(PipeReader reader)
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
{
// Process the line.
ProcessLine(line);
}
// Tell the PipeReader how much of the buffer has been consumed.
reader.AdvanceTo(buffer.Start, buffer.End);
// Stop reading if there's no more data coming.
if (result.IsCompleted)
{
break;
}
}
// Mark the PipeReader as complete.
await reader.CompleteAsync();
}
bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
// Look for a EOL in the buffer.
SequencePosition? position = buffer.PositionOf((byte)'\n');
if (position == null)
{
line = default;
return false;
}
// Skip the line + the \n.
line = buffer.Slice(0, position.Value);
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
return true;
}
Существует два цикла:
FillPipeAsync
считывает из Socket
и выполняет запись в PipeWriter
.ReadPipeAsync
считывает из PipeReader
и анализирует входящие строки.Буферы явно не выделяются. Управление всеми буферами делегируется реализациям PipeReader
и PipeWriter
. Делегирование управления буфером упрощает использование кода, чтобы сосредоточиться только на бизнес-логике.
В первом цикле:
PipeWriter
, сколько данных было записано в буфер.PipeReader
.Во втором цикле PipeReader
использует буферы, записанные PipeWriter
. Буферы поступают из сокета. Вызов PipeReader.ReadAsync
:
Возвращает ReadResult, который содержит два важных элемента информации:
ReadOnlySequence<byte>
.IsCompleted
, указывающее, достигнут ли конец данных (EOF).После нахождения разделителя конца строки (EOL) и синтаксического анализа строки:
PipeReader.AdvanceTo
вызывается, чтобы сообщить PipeReader
, сколько данных было обработано и проверено.Циклы чтения и записи заканчиваются вызовом Complete
. Complete
позволяет базовому каналу освободить выделенную память.
В идеале чтение и анализ работают вместе:
Обычно синтаксический анализ занимает больше времени, чем копирование блоков данных из сети:
Для оптимальной производительности необходим баланс между частыми паузами и выделением большего объема памяти.
Чтобы устранить описанную выше проблему, Pipe
имеет два параметра для управления потоком данных:
PipeWriter.FlushAsync
.ValueTask<FlushResult>
, если объем данных в Pipe
пересекает PauseWriterThreshold
.ValueTask<FlushResult>
, когда он становится меньше ResumeWriterThreshold
.Для предотвращения быстрого цикла, который может возникнуть при одном значении, используются два значения.
// The Pipe will start returning incomplete tasks from FlushAsync until
// the reader examines at least 5 bytes.
var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
var pipe = new Pipe(options);
Обычно при использовании async
и асинхронном коде возобновляется либо в текущем, ни await
в TaskScheduler текущем SynchronizationContext.
При выполнении операций ввода-вывода важно иметь точный контроль над местом выполнения операций ввода-вывода. Этот контроль позволяет эффективно использовать кэш ЦП. Эффективное кэширование является критически важным для высокопроизводительных приложений, таких как веб-серверы. PipeScheduler предоставляет контроль над тем, где выполняются асинхронные обратные вызовы. По умолчанию:
SynchronizationContext
отсутствует, он использует пул потоков для выполнения обратных вызовов.public static void Main(string[] args)
{
var writeScheduler = new SingleThreadPipeScheduler();
var readScheduler = new SingleThreadPipeScheduler();
// Tell the Pipe what schedulers to use and disable the SynchronizationContext.
var options = new PipeOptions(readerScheduler: readScheduler,
writerScheduler: writeScheduler,
useSynchronizationContext: false);
var pipe = new Pipe(options);
}
// This is a sample scheduler that async callbacks on a single dedicated thread.
public class SingleThreadPipeScheduler : PipeScheduler
{
private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
new BlockingCollection<(Action<object> Action, object State)>();
private readonly Thread _thread;
public SingleThreadPipeScheduler()
{
_thread = new Thread(DoWork);
_thread.Start();
}
private void DoWork()
{
foreach (var item in _queue.GetConsumingEnumerable())
{
item.Action(item.State);
}
}
public override void Schedule(Action<object?> action, object? state)
{
if (state is not null)
{
_queue.Add((action, state));
}
// else log the fact that _queue.Add was not called.
}
}
PipeScheduler.ThreadPool — это реализация PipeScheduler, которая позволяет поместить обратные вызовы в очередь пула потоков. PipeScheduler.ThreadPool
является параметром по умолчанию и, как правило, лучшим выбором. PipeScheduler.Inline может привести к непредвиденным последствиям, например к взаимоблокировкам.
Зачастую бывает эффективно повторно использовать объект Pipe
. Чтобы сбросить канал, вызовите PipeReader Reset при завершении PipeReader
и PipeWriter
.
PipeReader управляет памятью от имени вызывающего объекта. Всегда вызывайте PipeReader.AdvanceTo после вызова PipeReader.ReadAsync. Это позволяет PipeReader
узнать, когда вызывающий объект закончил использовать память, чтобы его можно было отследить. ReadOnlySequence<byte>
, возвращенный из PipeReader.ReadAsync
, допустим только до вызова PipeReader.AdvanceTo
. Использование ReadOnlySequence<byte>
после вызова PipeReader.AdvanceTo
недопустимо.
PipeReader.AdvanceTo
принимает два аргумента SequencePosition:
Если пометить данные как потребленные, канал сможет вернуть память в базовый буферный пул. Пометка данных как отмеченных управляет тем, что делает следующий вызов PipeReader.ReadAsync
. Пометка всех элементов как отмеченных означает, что следующий вызов PipeReader.ReadAsync
не вернется, пока в канал не будет записано больше данных. Любое другое значение заставит следующий вызов PipeReader.ReadAsync
вернуться немедленно с отмеченными и неотмеченными, но не с уже потребленными данными.
Существует несколько типичных шаблонов, которые возникают при попытке чтения потоковых данных:
В следующих примерах используется метод TryParseLines
для синтаксического анализа сообщений из ReadOnlySequence<byte>
. TryParseLines
анализирует одно сообщение и обновляет входной буфер, чтобы обрезать проанализированное сообщение из буфера. TryParseLines
не является частью .NET, это пользовательский письменный метод, используемый в следующих разделах.
bool TryParseLines(ref ReadOnlySequence<byte> buffer, out Message message);
Следующий код считывает одно сообщение из PipeReader
и возвращает его вызывающему объекту.
async ValueTask<Message?> ReadSingleMessageAsync(PipeReader reader,
CancellationToken cancellationToken = default)
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
// In the event that no message is parsed successfully, mark consumed
// as nothing and examined as the entire buffer.
SequencePosition consumed = buffer.Start;
SequencePosition examined = buffer.End;
try
{
if (TryParseLines(ref buffer, out Message message))
{
// A single message was successfully parsed so mark the start of the
// parsed buffer as consumed. TryParseLines trims the buffer to
// point to the data after the message was parsed.
consumed = buffer.Start;
// Examined is marked the same as consumed here, so the next call
// to ReadSingleMessageAsync will process the next message if there's
// one.
examined = consumed;
return message;
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(consumed, examined);
}
}
return null;
}
Предыдущий код:
SequencePosition
и изученный SequencePosition
, чтобы указать на начало обрезанного входного буфера.Два аргумента SequencePosition
обновляются, так как TryParseLines
удаляет проанализированное сообщение из входного буфера. Как правило, при синтаксическом анализе одного сообщения из буфера изученное расположение должно быть одним из следующих:
В случае с одним сообщением риск ошибок наиболее велик. Передача неверных значений в изученное может привести к исключению нехватки памяти или бесконечному циклу. Дополнительные сведения см. в разделе Распространенные проблемы PipeReader в этой статье.
Следующий код считывает все сообщения из PipeReader
и вызывает ProcessMessageAsync
для каждого из них.
async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseLines(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
PipeReader.ReadAsync
:
CancellationToken
отменяется при ожидании чтения.PipeReader.CancelPendingRead
приводит к тому, что текущий или следующий вызов PipeReader.ReadAsync
возвращает ReadResult с IsCanceled
со значением true
. Это может быть полезно для остановки существующего цикла чтения без сбоев и исключений.private PipeReader reader;
public MyConnection(PipeReader reader)
{
this.reader = reader;
}
public void Abort()
{
// Cancel the pending read so the process loop ends without an exception.
reader.CancelPendingRead();
}
public async Task ProcessMessagesAsync()
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
if (result.IsCanceled)
{
// The read was canceled. You can quit without reading the existing data.
break;
}
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseLines(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
Передача неверных значений в consumed
или examined
может привести к считыванию уже считанных данных.
Передача buffer.End
как изученного может привести к следующему:
PipeReader.AdvanceTo(position, buffer.End)
при обработке одного сообщения за раз из буфера.Передача неверных значений в consumed
или examined
может привести к бесконечному циклу. Например, PipeReader.AdvanceTo(buffer.Start)
, если buffer.Start
не изменился, приведет к тому, что следующий вызов PipeReader.ReadAsync
вернется немедленно перед поступлением новых данных.
Передача неверных значений в consumed
или examined
может привести к бесконечной буферизации (и нехватке памяти в конечном итоге).
Использование ReadOnlySequence<byte>
после вызова PipeReader.AdvanceTo
может привести к повреждению памяти (используйте после освобождения).
Отсутствие вызова PipeReader.Complete/CompleteAsync
может привести к утечке памяти.
Проверка ReadResult.IsCompleted и выход из логики чтения до обработки буфера приводит к потере данных. Условие выхода цикла должно основываться на ReadResult.Buffer.IsEmpty
и ReadResult.IsCompleted
. Неправильное выполнение этого действия может привести к бесконечному циклу.
❌Потеря данных
ReadResult
может возвращать окончательный сегмент данных, если IsCompleted
имеет значение true
. Если не считать эти данные до выхода из цикла чтения, данные будут утеряны.
Предупреждение
НЕ используйте следующий код. Использование этого примера приведет к утрате данных, зависанию и проблемам безопасности. Его НЕ следует копировать. Ниже приведен пример, поясняющий Распространенные проблемы с PipeReader.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> dataLossBuffer = result.Buffer;
if (result.IsCompleted)
break;
Process(ref dataLossBuffer, out Message message);
reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End);
}
Предупреждение
НЕ используйте приведенный выше код. Использование этого примера приведет к утрате данных, зависанию и проблемам безопасности. Его НЕ следует копировать. Выше приведен пример, поясняющий Распространенные проблемы с PipeReader.
❌Бесконечный цикл
Следующая логика может привести к бесконечному циклу, если Result.IsCompleted
равно true
, но в буфере никогда нет полного сообщения.
Предупреждение
НЕ используйте следующий код. Использование этого примера приведет к утрате данных, зависанию и проблемам безопасности. Его НЕ следует копировать. Ниже приведен пример, поясняющий Распространенные проблемы с PipeReader.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (result.IsCompleted && infiniteLoopBuffer.IsEmpty)
break;
Process(ref infiniteLoopBuffer, out Message message);
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
Предупреждение
НЕ используйте приведенный выше код. Использование этого примера приведет к утрате данных, зависанию и проблемам безопасности. Его НЕ следует копировать. Выше приведен пример, поясняющий Распространенные проблемы с PipeReader.
Вот еще один фрагмент кода с такой же проблемой. Проверяется наличие непустого буфера перед проверкой ReadResult.IsCompleted
. Так как он находится в else if
, цикл будет повторяться бесконечно, если в буфере никогда не будет полного сообщения.
Предупреждение
НЕ используйте следующий код. Использование этого примера приведет к утрате данных, зависанию и проблемам безопасности. Его НЕ следует копировать. Ниже приведен пример, поясняющий Распространенные проблемы с PipeReader.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (!infiniteLoopBuffer.IsEmpty)
Process(ref infiniteLoopBuffer, out Message message);
else if (result.IsCompleted)
break;
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
Предупреждение
НЕ используйте приведенный выше код. Использование этого примера приведет к утрате данных, зависанию и проблемам безопасности. Его НЕ следует копировать. Выше приведен пример, поясняющий Распространенные проблемы с PipeReader.
❌Неответственное приложение
Без каких-то условий вызов PipeReader.AdvanceTo
с buffer.End
examined
позицией может привести к тому, что приложение не отвечает при анализе одного сообщения. Следующий вызов PipeReader.AdvanceTo
вернет значение только в следующих условиях:
Предупреждение
НЕ используйте следующий код. Использование этого примера приведет к утрате данных, зависанию и проблемам безопасности. Его НЕ следует копировать. Ниже приведен пример, поясняющий Распространенные проблемы с PipeReader.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> hangBuffer = result.Buffer;
Process(ref hangBuffer, out Message message);
if (result.IsCompleted)
break;
reader.AdvanceTo(hangBuffer.Start, hangBuffer.End);
if (message != null)
return message;
}
Предупреждение
НЕ используйте приведенный выше код. Использование этого примера приведет к утрате данных, зависанию и проблемам безопасности. Его НЕ следует копировать. Выше приведен пример, поясняющий Распространенные проблемы с PipeReader.
❌Нехватка памяти
В следующих случаях приведенный ниже код сохраняет буферизацию до тех пор, пока не произойдет OutOfMemoryException:
PipeReader
, не составляют полное сообщение. Например, они не составляют полное сообщение, поскольку другая сторона записывает большое сообщение (например, сообщение размером 4 ГБ).Предупреждение
НЕ используйте следующий код. Использование этого примера приведет к утрате данных, зависанию и проблемам безопасности. Его НЕ следует копировать. Ниже приведен пример, поясняющий Распространенные проблемы с PipeReader.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer;
Process(ref thisCouldOutOfMemory, out Message message);
if (result.IsCompleted)
break;
reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End);
if (message != null)
return message;
}
Предупреждение
НЕ используйте приведенный выше код. Использование этого примера приведет к утрате данных, зависанию и проблемам безопасности. Его НЕ следует копировать. Выше приведен пример, поясняющий Распространенные проблемы с PipeReader.
❌Повреждение памяти
При написании вспомогательных методов, считывающих буфер, все возвращенные полезные данные должны быть скопированы перед вызовом Advance
. В следующем примере возвращается память, которая была удалена Pipe
и может использоваться повторно для следующей операции (чтение или запись).
Предупреждение
НЕ используйте следующий код. Использование этого примера приведет к утрате данных, зависанию и проблемам безопасности. Его НЕ следует копировать. Ниже приведен пример, поясняющий Распространенные проблемы с PipeReader.
public class Message
{
public ReadOnlySequence<byte> CorruptedPayload { get; set; }
}
Environment.FailFast("This code is terrible, don't use it!");
Message message = null;
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
ReadHeader(ref buffer, out int length);
if (length <= buffer.Length)
{
message = new Message
{
// Slice the payload from the existing buffer
CorruptedPayload = buffer.Slice(0, length)
};
buffer = buffer.Slice(length);
}
if (result.IsCompleted)
break;
reader.AdvanceTo(buffer.Start, buffer.End);
if (message != null)
{
// This code is broken since reader.AdvanceTo() was called with a position *after* the buffer
// was captured.
break;
}
}
return message;
}
Предупреждение
НЕ используйте приведенный выше код. Использование этого примера приведет к утрате данных, зависанию и проблемам безопасности. Его НЕ следует копировать. Выше приведен пример, поясняющий Распространенные проблемы с PipeReader.
PipeWriter управляет буферами для записи от имени вызывающего объекта. PipeWriter
реализует IBufferWriter<byte>
. IBufferWriter<byte>
позволяет получить доступ к буферам для выполнения операций записи без дополнительных копий буфера.
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
// Request at least 5 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(5);
// Write directly into the buffer.
int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span);
// Tell the writer how many bytes were written.
writer.Advance(written);
await writer.FlushAsync(cancellationToken);
}
Предыдущий код:
PipeWriter
с помощью GetMemory."Hello"
в возвращенный Memory<byte>
.PipeWriter
, который отправляет байты на базовое устройство.Предыдущий метод записи использует буферы, предоставленные PipeWriter
. Он также мог бы использовать PipeWriter.WriteAsync, что:
PipeWriter
.GetSpan
, выполняет Advance
надлежащим образом и вызывает FlushAsync.async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
byte[] helloBytes = Encoding.ASCII.GetBytes("Hello");
// Write helloBytes to the writer, there's no need to call Advance here
// (Write does that).
await writer.WriteAsync(helloBytes, cancellationToken);
}
FlushAsync поддерживает передачу CancellationToken. Передача CancellationToken
приводит к исключению OperationCanceledException
, если маркер отменяется в ожидании освобождения. PipeWriter.FlushAsync
поддерживает способ отмены текущей операции освобождения с помощью PipeWriter.CancelPendingFlush, не вызывая исключение. Вызов PipeWriter.CancelPendingFlush
приводит к тому, что текущий или следующий вызов PipeWriter.FlushAsync
или PipeWriter.WriteAsync
возвращает FlushResult с IsCanceled
со значением true
. Это может быть полезно для остановки освобождения без сбоев и исключений.
GetMemory
или GetSpan
при наличии незавершенного вызова FlushAsync
не является надежным.Complete
или CompleteAsync
при наличии неосвобожденных данных может привести к повреждению памяти.Следующие советы помогут вам успешно использовать System.IO.Pipelines классы:
await
PipeWriter.FlushAsync и всегда проверяйте FlushResult.IsCompleted. Прерывание записи, если IsCompleted
это true
означает, что читатель завершен и больше не заботится о том, что пишется.PipeReader
получить доступ.FlushAsync
, если читатель не может начать работу до FlushAsync
завершения, так как это может привести к взаимоблокировке.PipeReader
или PipeWriter
обращается к ним. Эти типы не являются потокобезопасными.AdvanceTo
или завершения PipeReader
работы.IDuplexPipe является контрактом для типов, поддерживающих чтение и запись. Например, сетевое подключение будет представлено IDuplexPipe
.
В отличие от Pipe
, который содержит PipeReader
и PipeWriter
, IDuplexPipe
представляет собой одну сторону полного дуплексного подключения. Это означает, что запись в PipeWriter
не будет считываться из PipeReader
.
При чтении или записи потоковых данных данные обычно считываются с помощью десериализатора и записываются с помощью сериализатора. Большая часть API потока чтения и записи имеет параметр Stream
. Чтобы упростить интеграцию с существующими API, PipeReader
и PipeWriter
предоставляют AsStream. AsStream возвращает реализацию Stream
на основе PipeReader
или PipeWriter
.
Экземпляры PipeReader
и PipeWriter
могут быть созданы с помощью статических методов Create
, для которых задан объект Stream и необязательные соответствующие параметры создания.
StreamPipeReaderOptions позволяют контролировать создание экземпляра PipeReader
со следующими параметрами:
4096
.PipeReader
, и его значение по умолчанию — false
.1024
.MemoryPool<byte>
, используемый при выделении памяти, а значение по умолчанию — null
.StreamPipeWriterOptions позволяют контролировать создание экземпляра PipeWriter
со следующими параметрами:
PipeWriter
, и его значение по умолчанию — false
.4096
.MemoryPool<byte>
, используемый при выделении памяти, а значение по умолчанию — null
.Важно!
При создании экземпляров PipeReader
и PipeWriter
с помощью методов Create
необходимо учитывать время существования объекта Stream
. Если требуется доступ к потоку после того, как модуль чтения или записи завершит работу, необходимо установить для флага LeaveOpen
значение true
для параметров создания. В противном случае поток будет закрыт.
В следующем коде показано создание экземпляров PipeReader
и PipeWriter
с помощью методов Create
из потока.
using System.Buffers;
using System.IO.Pipelines;
using System.Text;
class Program
{
static async Task Main()
{
using var stream = File.OpenRead("lorem-ipsum.txt");
var reader = PipeReader.Create(stream);
var writer = PipeWriter.Create(
Console.OpenStandardOutput(),
new StreamPipeWriterOptions(leaveOpen: true));
WriteUserCancellationPrompt();
var processMessagesTask = ProcessMessagesAsync(reader, writer);
var userCanceled = false;
var cancelProcessingTask = Task.Run(() =>
{
while (char.ToUpperInvariant(Console.ReadKey().KeyChar) != 'C')
{
WriteUserCancellationPrompt();
}
userCanceled = true;
// No exceptions thrown
reader.CancelPendingRead();
writer.CancelPendingFlush();
});
await Task.WhenAny(cancelProcessingTask, processMessagesTask);
Console.WriteLine(
$"\n\nProcessing {(userCanceled ? "cancelled" : "completed")}.\n");
}
static void WriteUserCancellationPrompt() =>
Console.WriteLine("Press 'C' to cancel processing...\n");
static async Task ProcessMessagesAsync(
PipeReader reader,
PipeWriter writer)
{
try
{
while (true)
{
ReadResult readResult = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = readResult.Buffer;
try
{
if (readResult.IsCanceled)
{
break;
}
if (TryParseLines(ref buffer, out string message))
{
FlushResult flushResult =
await WriteMessagesAsync(writer, message);
if (flushResult.IsCanceled || flushResult.IsCompleted)
{
break;
}
}
if (readResult.IsCompleted)
{
if (!buffer.IsEmpty)
{
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
catch (Exception ex)
{
Console.Error.WriteLine(ex);
}
finally
{
await reader.CompleteAsync();
await writer.CompleteAsync();
}
}
static bool TryParseLines(
ref ReadOnlySequence<byte> buffer,
out string message)
{
SequencePosition? position;
StringBuilder outputMessage = new();
while(true)
{
position = buffer.PositionOf((byte)'\n');
if (!position.HasValue)
break;
outputMessage.Append(Encoding.ASCII.GetString(buffer.Slice(buffer.Start, position.Value)))
.AppendLine();
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
};
message = outputMessage.ToString();
return message.Length != 0;
}
static ValueTask<FlushResult> WriteMessagesAsync(
PipeWriter writer,
string message) =>
writer.WriteAsync(Encoding.ASCII.GetBytes(message));
}
Приложение использует StreamReader для чтения файла lorem-ipsum.txt в виде потока, и оно должно заканчиваться пустой строкой. FileStream передается в PipeReader.Create, который создает экземпляр объекта PipeReader
. Затем консольное приложение передает стандартный выходной поток в PipeWriter.Create с помощью Console.OpenStandardOutput(). Пример поддерживает отмену.
Отзыв о .NET
.NET — это проект с открытым исходным кодом. Выберите ссылку, чтобы оставить отзыв:
Обучение
Модуль
Подключение команд к конвейеру - Training
В этом модуле содержится информация о подключении команд к конвейеру.