Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
System.IO.Pipelines , .NET'te yüksek performanslı G/Ç'yi kolaylaştırmak için tasarlanmış bir kitaplıktır. Paket geniş uyumluluk, .NET Framework ve modern .NET için .NET Standard'ı hedefler. Modern .NET sürümlerinde, System.IO.Pipelines paylaşılan çerçeveye dahildir ve ayrı bir NuGet paketi gerektirmez.
Kitaplık System.IO.Pipelines NuGet paketi olarak da kullanılabilir.
System.IO.Pipelines hangi sorunu çözer?
Akış verilerini ayrıştıran uygulamalar, birçok özel ve olağan dışı kod akışına sahip ortak kodlardan oluşur. Şablon ve özel durum kodu karmaşıktır ve bakımı zordur.
System.IO.Pipelines şu şekilde tasarlandı:
- Yüksek performanslı ayrıştırma akış verilerine sahip olun.
- Kod karmaşıklığını azaltın.
Bu kod, bir istemciden '\n' ile sınırlandırılmış satır-ayrılmış iletiler alan bir TCP sunucusu için tipiktir.
async Task ProcessLinesAsync(NetworkStream stream)
{
var buffer = new byte[1024];
await stream.ReadAsync(buffer, 0, buffer.Length);
// Process a single line from the buffer
ProcessLine(buffer);
}
Yukarıdaki kodda çeşitli sorunlar vardır:
- İletinin tamamı (satır sonu)
ReadAsyncfonksiyonuna yapılan tek bir çağrıda alınamayabilir. - Bu
stream.ReadAsyncsonucunu yok sayıyor.stream.ReadAsyncokunan veri miktarı döndürür. - Bir
ReadAsyncçağrısında birden fazla satırın okunması durumunu işlemez. - Her bir okuma ile bir
bytedizi ayırır.
Önceki sorunları düzeltmek için şu değişiklikleri yapın:
Yeni bir satır bulunana kadar gelen verileri arabelleğe alın.
Arabellekte döndürülen tüm satırları ayrıştırın.
Satır 1 KB'tan (1024 bayt) büyük olabilir. Tam satırı arabellek içine sığdırmak için, kodun giriş arabelleğini sınırlandırıcı bulunana kadar yeniden boyutlandırması gerekiyor.
- Arabellek yeniden boyutlandırılırsa, girişte daha uzun satırlar göründüğünde daha fazla arabellek kopyası oluşturulur.
- Boşa harcanan alanı azaltmak için satırları okumak için kullanılan arabelleği sıkıştırın.
Belleği tekrar tekrar ayırmamak için arabellek havuzu kullanmayı göz önünde bulundurun.
Bu kod şu sorunlardan bazılarını giderir:
async Task ProcessLinesAsync(NetworkStream stream)
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
var bytesBuffered = 0;
var bytesConsumed = 0;
while (true)
{
// Calculate the amount of bytes remaining in the buffer.
var bytesRemaining = buffer.Length - bytesBuffered;
if (bytesRemaining == 0)
{
// Double the buffer size and copy the previously buffered data into the new buffer.
var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
// Return the old buffer to the pool.
ArrayPool<byte>.Shared.Return(buffer);
buffer = newBuffer;
bytesRemaining = buffer.Length - bytesBuffered;
}
var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
if (bytesRead == 0)
{
// EOF
break;
}
// Keep track of the amount of buffered bytes.
bytesBuffered += bytesRead;
var linePosition = -1;
do
{
// Look for a EOL in the buffered data.
linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
bytesBuffered - bytesConsumed);
if (linePosition >= 0)
{
// Calculate the length of the line based on the offset.
var lineLength = linePosition - bytesConsumed;
// Process the line.
ProcessLine(buffer, bytesConsumed, lineLength);
// Move the bytesConsumed to skip past the line consumed (including \n).
bytesConsumed += lineLength + 1;
}
}
while (linePosition >= 0);
}
}
Önceki kod karmaşıktır ve tanımlanan tüm sorunları gidermez. Yüksek performanslı ağ genellikle performansı en üst düzeye çıkarmak için karmaşık kod yazma anlamına gelir.
System.IO.Pipelines bu tür kod yazmayı kolaylaştırmak için tasarlanmıştır.
Boru
Pipe sınıfını kullanarak bir PipeWriter/PipeReader çifti oluşturun.
PipeWriter içine yazılan tüm veriler PipeReader içinde mevcuttur.
var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;
Boru temel kullanımı
async Task ProcessLinesAsync(Socket socket)
{
var pipe = new Pipe();
Task writing = FillPipeAsync(socket, pipe.Writer);
Task reading = ReadPipeAsync(pipe.Reader);
await Task.WhenAll(reading, writing);
}
async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
const int minimumBufferSize = 512;
while (true)
{
// Allocate at least 512 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(minimumBufferSize);
try
{
int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
if (bytesRead == 0)
{
break;
}
// Tell the PipeWriter how much was read from the Socket.
writer.Advance(bytesRead);
}
catch (Exception ex)
{
LogError(ex);
break;
}
// Make the data available to the PipeReader.
FlushResult result = await writer.FlushAsync();
if (result.IsCompleted)
{
break;
}
}
// By completing PipeWriter, tell the PipeReader that there's no more data coming.
await writer.CompleteAsync();
}
async Task ReadPipeAsync(PipeReader reader)
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
{
// Process the line.
ProcessLine(line);
}
// Tell the PipeReader how much of the buffer has been consumed.
reader.AdvanceTo(buffer.Start, buffer.End);
// Stop reading if there's no more data coming.
if (result.IsCompleted)
{
break;
}
}
// Mark the PipeReader as complete.
await reader.CompleteAsync();
}
bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
// Look for a EOL in the buffer.
SequencePosition? position = buffer.PositionOf((byte)'\n');
if (position == null)
{
line = default;
return false;
}
// Skip the line + the \n.
line = buffer.Slice(0, position.Value);
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
return true;
}
İki döngü okuma ve yazmayı işler:
-
FillPipeAsyncöğesindenSocketokur vePipeWriteröğesine yazar. -
ReadPipeAsync,PipeReader'den okur ve gelen satırları ayrıştırır.
Belirli arabellekler tahsis edilmez. Tüm arabellek yönetimi, PipeReader ve PipeWriter uygulamalarına devredilir. Arabellek yönetimini delege etmek, kodun yalnızca iş mantığına odaklanmasını kolaylaştırır.
İlk döngüde:
- PipeWriter.GetMemory(Int32), temel yazıcıdan bellek almak için çağrılır.
-
PipeWriter.Advance(Int32),
PipeWriter'e arabelleğe ne kadar veri yazıldığını bildirmek için çağrılır. -
PipeWriter.FlushAsync, verileri
PipeReaderiçin kullanılabilir hale getirmek amacıyla çağrılır.
İkinci döngüde PipeReaderPipeWriter tarafından yazılan arabellekleri tüketir. Arabellekler soketten gelir. çağrısı:PipeReader.ReadAsync
İki önemli bilgi parçası içeren bir ReadResult döndürür:
- ReadOnlySequence<T> biçiminde okunan veriler.
- Veri sonuna (EOF) ulaşılıp ulaşılmadığını gösteren boole
IsCompleteddeğeri.
Satır sonu (EOL) sınırlayıcısını bulduktan ve satırı ayrıştırdıktan sonra:
- Mantık, önceden işlenmiş olanları atlamak için arabelleği işler.
-
PipeReader.AdvanceTo, ne kadar verinin tüketildiğini ve incelendiğini
PipeReader'e bildirmek için çağrılır.
Okuyucu ve yazıcı döngüleri, PipeReader.Complete ve PipeWriter.Complete çağrılarıyla sona erer.
Complete çağrısı, Pipe tarafından ayrılmış olan belleği serbest bırakır.
Geri basınç ve akış denetimi
İdeal olarak, okuma ve ayrıştırma birlikte çalışır:
- Okuma iş parçacığı, ağdan verileri alır ve arabelleklere yerleştirir.
- Ayrıştırma iş parçacığı uygun veri yapılarını oluşturmakla sorumludur.
Ayrıştırma genellikle ağdan veri bloklarını kopyalamaktan daha uzun sürer:
- Okuma iş parçacığı, ayrıştırma iş parçacığının önüne geçer.
- Okuma iş parçacığının ayrıştırma iş parçacığının verilerini depolamak için yavaşlamasına veya daha fazla bellek ayırmasına gerek vardır.
En iyi performans için sık duraklamalar ile daha fazla bellek ayırma arasında bir denge vardır.
Yukarıdaki sorunu çözmek için, Pipe veri akışını denetlemek için iki ayarı vardır:
- PauseWriterThreshold: Çağrıların FlushAsync ile duraklatılmasından önce ne kadar verinin arabelleğe alınması gerektiğini belirler.
- ResumeWriterThreshold: Çağrıların PipeWriter.FlushAsync yeniden başlamasından önce okuyucunun ne kadar veri gözlemlemesi gerektiğini belirler.
- Verinin
ValueTask<FlushResult>içindeki miktarıPipesınırını aştığında, tamamlanmamış birPauseWriterThresholddöndürür. -
ValueTask<FlushResult>ResumeWriterThresholddeğerinden daha düşük olduğunda tamamlanır.
Hızlı döngülerin meydana gelmesini önlemek için iki değer kullanılır; bu, yalnızca bir değer kullanıldığında gerçekleşebilir.
Örnekler
// The Pipe will start returning incomplete tasks from FlushAsync until
// the reader examines at least 5 bytes.
var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
var pipe = new Pipe(options);
Boru Zamanlayıcı
Genellikle async ve await kullanırken, uyumsuz kod ya TaskScheduler ya da geçerli SynchronizationContext üzerinde devam eder.
G/Ç yapılırken G/Ç'nin nerede gerçekleştirildiği üzerinde ayrıntılı denetim sahibi olmak önemlidir. Bu denetim, CPU önbelleklerinden etkili bir şekilde yararlanmaya olanak tanır. Verimli önbelleğe alma, web sunucuları gibi yüksek performanslı uygulamalar için kritik öneme sahiptir. PipeScheduler zaman uyumsuz geri çağırmaların nerede çalıştırıldığı üzerinde denetim sağlar. Varsayılan olarak:
- Geçerli SynchronizationContext kullanılır.
- Eğer
SynchronizationContextyoksa, geri çağrımları çalıştırmak için iş parçacığı havuzunu kullanır.
public static void Main(string[] args)
{
var writeScheduler = new SingleThreadPipeScheduler();
var readScheduler = new SingleThreadPipeScheduler();
// Tell the Pipe what schedulers to use and disable the SynchronizationContext.
var options = new PipeOptions(readerScheduler: readScheduler,
writerScheduler: writeScheduler,
useSynchronizationContext: false);
var pipe = new Pipe(options);
}
// This is a sample scheduler that async callbacks on a single dedicated thread.
public class SingleThreadPipeScheduler : PipeScheduler
{
private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
new BlockingCollection<(Action<object> Action, object State)>();
private readonly Thread _thread;
public SingleThreadPipeScheduler()
{
_thread = new Thread(DoWork);
_thread.Start();
}
private void DoWork()
{
foreach (var item in _queue.GetConsumingEnumerable())
{
item.Action(item.State);
}
}
public override void Schedule(Action<object?> action, object? state)
{
if (state is not null)
{
_queue.Add((action, state));
}
// else log the fact that _queue.Add was not called.
}
}
PipeScheduler.ThreadPool iş parçacığı havuzuna geri çağırmaları kuyruğa alan bir uygulamadır PipeScheduler.
PipeScheduler.ThreadPool varsayılan ve genel olarak en iyi seçenektir.
PipeScheduler.Inline kilitlenmeler gibi istenmeyen sonuçlara neden olabilir.
Boru sıfırlama
Nesneyi yeniden kullanma Pipe işlemi genellikle verimlidir. Kanalı sıfırlamak için, hem PipeReader hem de Reset tamamlandıktan sonra PipeReaderPipeWriter çağırın.
PipeReader
PipeReader çağıranın adına belleği yönetir.
Bu, çağıranın belleği kullanmayı bitirdiğini PipeReader'a bildirir, böylece izlenebilir.
ReadOnlySequence<T>
PipeReader.ReadAsync'den döndürülen değer, yalnızca PipeReader.AdvanceTo çağrısına kadar geçerlidir. çağrısı ReadOnlySequence<T>yaptıktan sonra kullanmak PipeReader.AdvanceTo geçersizdir.
PipeReader.AdvanceTo iki SequencePosition parametre alır:
- İlk bağımsız değişken, ne kadar bellek tüketildiğini belirler.
- İkinci bağımsız değişken, arabelleğin ne kadarının gözlemlendiğini belirler.
Verileri tüketilen olarak işaretlemek, boru hattının belleği altındaki arabellek havuzuna geri döndürebileceği anlamına gelir. Verileri gözlemlenmiş olarak işaretlemek, bir sonraki PipeReader.ReadAsync çağrısının ne yapacağını kontrol eder. Her şeyi gözlemlenmiş olarak işaretlemek, boruya daha fazla veri yazılana kadar PipeReader.ReadAsync fonksiyonunun bir sonraki çağrısının geri dönmeyeceği anlamına gelir. Diğer herhangi bir değer, gözlemlenen PipeReader.ReadAsync gözlemlenmeyen verilerle hemen dönmek için bir sonraki çağrıyı yapar, ancak zaten tüketilmiş olan verileri döndürmez.
Akış verisi senaryolarını okuma
Akış verileri okunurken birkaç tipik desen ortaya çıkar:
- Veri akışı verildiğinde, tek bir iletiyi ayrıştırın.
- Bir veri akışı göz önünde bulundurulduğunda, kullanılabilir tüm iletileri ayrıştırın.
Bu örnekler, TryParseLines bir ReadOnlySequence<T>'den iletileri ayrıştırma yöntemini kullanır.
TryParseLines tek bir iletiyi ayrıştırır ve giriş arabelleğini, ayrıştırılan iletiyi arabelleğinden çıkarmak için günceller.
TryParseLines .NET'in bir parçası değildir; bu, aşağıdaki bölümlerde kullanılan kullanıcı tarafından yazılmış bir yöntemdir.
bool TryParseLines(ref ReadOnlySequence<byte> buffer, out Message message);
Tek bir iletiyi okuma
Bu kod, PipeReader'den tek bir mesaj okur ve bunu çağırana döndürür.
async ValueTask<Message?> ReadSingleMessageAsync(PipeReader reader,
CancellationToken cancellationToken = default)
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
// In the event that no message is parsed successfully, mark consumed
// as nothing and examined as the entire buffer.
SequencePosition consumed = buffer.Start;
SequencePosition examined = buffer.End;
try
{
if (TryParseLines(ref buffer, out Message message))
{
// A single message was successfully parsed so mark the start of the
// parsed buffer as consumed. TryParseLines trims the buffer to
// point to the data after the message was parsed.
consumed = buffer.Start;
// Examined is marked the same as consumed here, so the next call
// to ReadSingleMessageAsync will process the next message if there's
// one.
examined = consumed;
return message;
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(consumed, examined);
}
}
return null;
}
Yukarıdaki kod:
- Tek bir iletiyi ayrıştırıyor.
- Tüketilen
SequencePositionve incelenenSequencePositionöğesini kırpılan giriş arabelleğinin başlangıcına işaret eden şekilde güncelleştirir.
İki SequencePosition bağımsız değişken, TryParseLines ayrıştırılmış iletiyi giriş arabelleğinden kaldırdığından güncellenir. Genellikle, arabellekten tek bir iletiyi ayrıştırırken, incelenen konum aşağıdakilerden biri olmalıdır:
- İletinin sonu.
- Alınan arabelleğin sonu, eğer ileti bulunamazsa.
Tek bir ileti durumunda en fazla hata olasılığı vardır. Examined'a yanlış değerlerin geçilmesi bellek yetersizliği özel durumu veya sonsuz döngüyle sonuçlanabilir. Daha fazla bilgi için bu makaledeki PipeReader yaygın sorunları bölümüne bakın.
Önemli
ReadSingleMessageAsync, PipeReader.CompleteAsync çağrısı yapmaz. Çağıran, PipeReader öğesini tamamlamaktan sorumludur.
PipeReader.CompleteAsync'in ReadSingleMessageAsync içinde çağrılması daha fazla veri okunamamasını sinyaller ve sonraki mesajların okunmasını önler.
Birden çok iletiyi okuma
Bu kod, PipeReader'den tüm iletileri okur ve her birine ProcessMessageAsync çağrısı yapar.
async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseLines(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
ProcessMessagesAsync İleti okuma döngüsünün tamamına sahip olduğundan, tamamlandığında çağrısı PipeReader.CompleteAsync yapılır. Tek mesaj durumuyla farklı olarak, çağıranın okuyucuyu tamamlaması gerekmez.
ProcessMessagesAsync, PipeReader'in yaşam döngüsü üzerinde tam sahiplik alır.
İptal
- bir CancellationTokengeçirmeyi destekler.
-
OperationCanceledException ifadesini, bekleyen bir okuma varken
CancellationTokeniptal edilirse atar. - aracılığıyla PipeReader.CancelPendingRead geçerli okuma işlemini iptal etmenin bir yolunu destekler ve bu da istisna oluşturulmasını önler. Çağrı
PipeReader.CancelPendingRead, geçerli veya sonraki PipeReader.ReadAsync çağrısının, ReadResult'ninIsCanceledolarak ayarlandığı birtruedöndürmesine neden olur. Bu, var olan okuma döngüsünü yıkıcı olmayan ve istisnai olmayan bir şekilde durdurmak için kullanışlıdır.
private PipeReader reader;
public MyConnection(PipeReader reader)
{
this.reader = reader;
}
public void Abort()
{
// Cancel the pending read so the process loop ends without an exception.
reader.CancelPendingRead();
}
public async Task ProcessMessagesAsync()
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
if (result.IsCanceled)
{
// The read was canceled. You can quit without reading the existing data.
break;
}
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseLines(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
PipeReader yaygın sorunları
Yanlış değerlerin
consumedveyaexamined'e geçirilmesi, daha önce okunan verilerin tekrar okunmasıyla sonuçlanabilir.İncelendiği gibi geçiş
buffer.Endaşağıdakilere neden olabilir:- Durdurulan veriler
- Veriler tüketilmezse sonunda bellek yetersizliği (OOM) özel durumu oluşabilir. Örneğin,
PipeReader.AdvanceTo(position, buffer.End)arabellekten bir kerede tek bir iletiyi işlerken.
Yanlış değerlerin
consumedveyaexaminedöğesine geçirilmesi sonsuz döngüye neden olabilir. Örneğin,buffer.Startdeğişmediyse, bir sonraki PipeReader.ReadAsync çağrısı, yeni veriler gelmeden hemen önce döner.Yanlış değerlerin
consumedveyaexamined'e geçirilmesi sonsuz arabelleğe alma (sonunda bellek yetersizliği) ile sonuçlanabilir.ReadOnlySequence<T> çağrısından sonra PipeReader.AdvanceTo kullanmak bellek bozulmasına neden olabilir (serbest bırakıldıktan sonra kullanma).
Çağrı Complete/CompleteAsync yapılamaması bellek sızıntısına neden olabilir.
Okuma mantığının kontrol edilip ReadResult.IsCompleted ve arabelleği işlemeye başlamadan önce sonlandırılması, veri kaybına neden olur. Döngü çıkış koşulu,
ReadResult.Buffer.IsEmptyveReadResult.IsCompletedtemel alınarak belirlenmelidir. Bunu yanlış yapmak sonsuz döngüye neden olabilir.
Sorunlu kod
❌ Veri kaybı
ReadResult olarak ayarlandığında IsCompletedverinin true son kesimini döndürebilir. Okuma döngüsünden çıkmadan önce bu verilerin okunmaması veri kaybına neden olur.
Uyarı
Aşağıdaki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. PipeReader ile ilgili yaygın sorunları açıklamak için aşağıdaki örnek sağlanmıştır.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> dataLossBuffer = result.Buffer;
if (result.IsCompleted)
break;
Process(ref dataLossBuffer, out Message message);
reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End);
}
Uyarı
Önceki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. PipeReader ile ilgili yaygın sorunları açıklamak için yukarıdaki örnek sağlanmıştır.
❌ Sonsuz döngü
"Aşağıdaki mantık, eğer Result.IsCompletedtrue ise ve arabellekte hiçbir zaman tam bir ileti yoksa, sonsuz döngüye neden olabilir."
Uyarı
Aşağıdaki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. PipeReader ile ilgili yaygın sorunları açıklamak için aşağıdaki örnek sağlanmıştır.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (result.IsCompleted && infiniteLoopBuffer.IsEmpty)
break;
Process(ref infiniteLoopBuffer, out Message message);
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
Uyarı
Önceki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. PipeReader ile ilgili yaygın sorunları açıklamak için yukarıdaki örnek sağlanmıştır.
Aynı sorunla ilgili başka bir kod parçası aşağıdadır. Boş olmayan bir arabelleği kontrol ettikten sonra ReadResult.IsCompleted’yu kontrol eder.
else if içinde olduğundan, arabellekte asla tam bir ileti yoksa sonsuz döngüde kalır.
Uyarı
Aşağıdaki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. PipeReader ile ilgili yaygın sorunları açıklamak için aşağıdaki örnek sağlanmıştır.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (!infiniteLoopBuffer.IsEmpty)
Process(ref infiniteLoopBuffer, out Message message);
else if (result.IsCompleted)
break;
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
Uyarı
Önceki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. PipeReader ile ilgili yaygın sorunları açıklamak için yukarıdaki örnek sağlanmıştır.
❌ Yanıt vermeyen uygulama
examined konumunda PipeReader.AdvanceTo'ı buffer.End ile koşulsuz çağırmak, uygulamanın tek bir iletiyi ayrıştırırken yanıt vermemeye başlamasına neden olabilir. Sonraki PipeReader.AdvanceTo çağrısı şu ana kadar geri dönmeyecek:
- Kanala yazılan daha fazla veri var.
- Yeni veriler daha önce incelenmedi.
Uyarı
Aşağıdaki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. PipeReader ile ilgili yaygın sorunları açıklamak için aşağıdaki örnek sağlanmıştır.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> hangBuffer = result.Buffer;
Process(ref hangBuffer, out Message message);
if (result.IsCompleted)
break;
reader.AdvanceTo(hangBuffer.Start, hangBuffer.End);
if (message != null)
return message;
}
Uyarı
Önceki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. PipeReader ile ilgili yaygın sorunları açıklamak için yukarıdaki örnek sağlanmıştır.
❌ Yetersiz Bellek (OOM)
Aşağıdaki koşullarla, bu kod bir OutOfMemoryException durumu gerçekleşene kadar arabelleğe almaya devam eder.
- İleti boyutu üst sınırı yoktur.
- 'dan
PipeReaderdöndürülen veriler tam bir ileti oluşturmaz. Örneğin, diğer taraf büyük bir ileti (örneğin, 4 GB ileti) yazıyor.
Uyarı
Aşağıdaki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. PipeReader ile ilgili yaygın sorunları açıklamak için aşağıdaki örnek sağlanmıştır.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer;
Process(ref thisCouldOutOfMemory, out Message message);
if (result.IsCompleted)
break;
reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End);
if (message != null)
return message;
}
Uyarı
Önceki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. PipeReader ile ilgili yaygın sorunları açıklamak için yukarıdaki örnek sağlanmıştır.
❌ Bellek Bozulması
Arabelleği okuyan yardımcıları yazarken, Advance çağrılmadan önce döndürülen veri yükünü kopyalayın. Aşağıdaki örnek, Pipe tarafından atılan belleği geri kazandırır ve belki sonraki işlem (okuma/yazma) için yeniden kullanılabilir.
Uyarı
Aşağıdaki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. PipeReader ile ilgili yaygın sorunları açıklamak için aşağıdaki örnek sağlanmıştır.
public class Message
{
public ReadOnlySequence<byte> CorruptedPayload { get; set; }
}
Environment.FailFast("This code is terrible, don't use it!");
Message message = null;
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
ReadHeader(ref buffer, out int length);
if (length <= buffer.Length)
{
message = new Message
{
// Slice the payload from the existing buffer
CorruptedPayload = buffer.Slice(0, length)
};
buffer = buffer.Slice(length);
}
if (result.IsCompleted)
break;
reader.AdvanceTo(buffer.Start, buffer.End);
if (message != null)
{
// This code is broken since reader.AdvanceTo() was called with a position *after* the buffer
// was captured.
break;
}
}
return message;
}
Uyarı
Önceki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. PipeReader ile ilgili yaygın sorunları açıklamak için yukarıdaki örnek sağlanmıştır.
PipeWriter
, PipeWriter arayan adına yazılacak arabellekleri yönetir.
PipeWriter uygular IBufferWriter<byte>.
IBufferWriter<byte> ek arabellek kopyaları olmadan yazma işlemleri gerçekleştirmek için arabelleklere erişim sağlar.
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
// Request at least 5 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(5);
// Write directly into the buffer.
int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span);
// Tell the writer how many bytes were written.
writer.Advance(written);
await writer.FlushAsync(cancellationToken);
}
Önceki kod:
-
PipeWriterkullanarak GetMemory'den en az 5 baytlık bir arabellek istemektedir. - ASCII dizesi
"Hello"'nin baytlarını döndürülenMemory<byte>öğesine yazar. - Arabelleğe kaç bayt yazıldığını belirtmek için Advance çağrıları yapılır.
-
PipeWriter'yi boşaltır ve baytları alt cihazına gönderir.
Daha önceki yazma yöntemi, PipeWriter tarafından sağlanan arabellekleri kullanır. Ayrıca şunu da kullanabilir PipeWriter.WriteAsync:
- Mevcut arabelleği
PipeWriter'a kopyalar. - GetSpan ve Advance öğelerini uygun şekilde çağırır, ve FlushAsync öğesini çağırır.
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
byte[] helloBytes = Encoding.ASCII.GetBytes("Hello");
// Write helloBytes to the writer, there's no need to call Advance here
// (Write does that).
await writer.WriteAsync(helloBytes, cancellationToken);
}
İptal
FlushAsync bir CancellationTokengeçirmeyi destekler. Bir CancellationToken geçirildiğinde, bekleyen bir temizleme işlemi sırasında belirteç iptal edilirse, bir OperationCanceledException sonucu oluşur.
PipeWriter.FlushAsync, istisna oluşturulmadan PipeWriter.CancelPendingFlush aracılığıyla geçerli temizleme işlemini iptal etmenin bir yolunu destekler.
PipeWriter.CancelPendingFlush fonksiyonu çağrımı, geçerli ya da sonraki PipeWriter.FlushAsync veya PipeWriter.WriteAsync çağrısının, FlushResult olarak ayarlanmış bir IsCanceled ve true döndürmesine neden olur. Bu, boşaltmayı yıkıcı olmayan ve istisnai olmayan bir şekilde durdurmak için yararlıdır.
PipeWriter yaygın sorunları
- GetSpan ve GetMemory en az istenen bellek miktarına sahip bir arabellek döndürür. Kesin arabellek boyutlarını asla varsaymayın.
- Ardışık çağrıların aynı arabelleği veya aynı boyuttaki arabelleği döndürmesi garanti değildir.
- Daha fazla veri yazmaya devam etmek için Advance çağrıldıktan sonra yeni bir arabellek istenmelidir. Daha önce alınan arabellek üzerine yazılamaz.
- GetMemory'ye hâlâ tamamlanmamış bir çağrı varken GetSpan veya FlushAsync çağrısı yapmak güvenli değildir.
- Çağrı Complete veya CompleteAsync boşaltılmamış veriler olduğunda bellek bozulmasına neden olabilir.
PipeReader ve PipeWriter İpuçları
Başarıyla System.IO.Pipelines sınıflarını kullanmak için şu ipuçlarını kullanın:
- Uygun olduğunda bir özel durum da dahil olmak üzere PipeReader ve PipeWriter'ı her zaman tamamlayın.
- PipeReader.AdvanceTo çağrısını yaptıktan sonra her zaman PipeReader.ReadAsync çağrısını yapın.
- Yazarken düzenli aralıklarla
awaitPipeWriter.FlushAsync ve FlushResult.IsCompleted her zaman denetleyin. EğerIsCompletedtrueise, bu durum okuyucunun tamamlandığını ve artık ne yazıldığını umursamadığını gösterdiğinden yazmayı durdurun. - Erişim sahibi olmasını istediğiniz PipeWriter.FlushAsync bir şey yazdıktan sonra arayın
PipeReader. -
FlushAsync'i aramayın; eğer okuyucuFlushAsyncbitene kadar başlayamazsa, bu bir kilitlenmeye neden olabilir. - Bir bağlamın
PipeReaderveyaPipeWriteretiketlerine "sahip olduğundan" veya eriştiğinden emin olun. Bu türler iş parçacığı açısından güvenli değildir. -
ReadResult.Buffer'yı çağırdıktan veya PipeReader.AdvanceTo'yi tamamladıktan sonra asla
PipeReaderöğesine erişmeyin.
IDuplexPipe
IDuplexPipe hem okumayı hem de yazmayı destekleyen türler için bir sözleşmedir. Örneğin, bir ağ bağlantısı ile IDuplexPipetemsil edilir.
Pipe, PipeReader ve PipeWriter içeren sürümden farklı olarak, IDuplexPipe tam çift yönlü bağlantının tek bir tarafını temsil eder.
PipeWriter öğesine yazdıkların PipeReader öğesinden okunmayacak.
Akışlar
Akış verilerini okurken veya yazarken genellikle seri-dışına çıkarıcı kullanarak verileri okur ve seri hale getirici kullanarak veri yazarsınız. Bu okuma ve yazma akışı API'lerinin çoğunun parametresi Stream vardır. Bu mevcut API'lerle tümleştirmeyi kolaylaştırmak için, PipeReader ve PipeWriter bir AsStream yöntemi kullanıma sunar.
AsStreamveya Streamçevresinde PipeReader bir PipeWriter uygulama döndürür.
Akış örneği
Bir Stream nesne ve isteğe bağlı Stream oluşturma seçenekleri ile birlikte, statik Create yöntemleri kullanarak PipeReader ve PipeWriter örnekleri oluşturun.
StreamPipeReaderOptions aşağıdaki parametrelerle PipeReader örneğinin oluşturulması üzerinde denetim sağlar.
-
StreamPipeReaderOptions.BufferSize , havuzdan bellek kiralarken kullanılan bayt cinsinden en düşük arabellek boyutudur ve varsayılan olarak
4096olarak kullanılır. -
StreamPipeReaderOptions.LeaveOpen bayrağı, temel alınan akışın
PipeReadertamamlandıktan sonra açık bırakılıp bırakılmadığını belirler ve varsayılan olarakfalseolarak ayarlanır. -
StreamPipeReaderOptions.MinimumReadSize , yeni bir arabellek ayrılmadan önce arabellekte kalan bayt eşiğini temsil eder ve varsayılan olarak değerine sahiptir
1024. -
StreamPipeReaderOptions.Pool
MemoryPool<byte>bellek ayrılırken kullanılır ve varsayılan olaraknullkullanılır.
StreamPipeWriterOptions aşağıdaki parametrelerle PipeWriter örneğinin oluşturulması üzerinde denetim sağlar.
-
StreamPipeWriterOptions.LeaveOpen bayrağı, temel alınan akışın
PipeWritertamamlandıktan sonra açık bırakılıp bırakılmadığını belirler ve varsayılan olarakfalseolarak ayarlanır. -
StreamPipeWriterOptions.MinimumBufferSize bellek kiralarken kullanılacak en düşük arabellek boyutunu temsil eder ve varsayılan değer `Pool` ile `
4096`'den alınır. -
StreamPipeWriterOptions.Pool
MemoryPool<byte>bellek ayrılırken kullanılır ve varsayılan olaraknullkullanılır.
Önemli
Yöntemleri kullanarak PipeReader ve PipeWriter örnekleri oluştururken Create nesne ömrünü Stream göz önünde bulundurun. Okuyucu veya yazıcıyla işiniz bittikten sonra akışa erişmeniz gerekiyorsa, oluşturma seçeneklerinde LeaveOpen bayrağını true olarak ayarlayın. Aksi takdirde akış kapatılır.
Bu kod, bir akıştan Create yöntemlerini kullanarak PipeReader ve PipeWriter örnekleri oluşturmayı gösterir.
using System.Buffers;
using System.IO.Pipelines;
using System.Text;
class Program
{
static async Task Main()
{
using var stream = File.OpenRead("lorem-ipsum.txt");
var reader = PipeReader.Create(stream);
var writer = PipeWriter.Create(
Console.OpenStandardOutput(),
new StreamPipeWriterOptions(leaveOpen: true));
WriteUserCancellationPrompt();
var processMessagesTask = ProcessMessagesAsync(reader, writer);
var userCanceled = false;
var cancelProcessingTask = Task.Run(() =>
{
while (char.ToUpperInvariant(Console.ReadKey().KeyChar) != 'C')
{
WriteUserCancellationPrompt();
}
userCanceled = true;
// No exceptions thrown
reader.CancelPendingRead();
writer.CancelPendingFlush();
});
await Task.WhenAny(cancelProcessingTask, processMessagesTask);
Console.WriteLine(
$"\n\nProcessing {(userCanceled ? "cancelled" : "completed")}.\n");
}
static void WriteUserCancellationPrompt() =>
Console.WriteLine("Press 'C' to cancel processing...\n");
static async Task ProcessMessagesAsync(
PipeReader reader,
PipeWriter writer)
{
try
{
while (true)
{
ReadResult readResult = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = readResult.Buffer;
try
{
if (readResult.IsCanceled)
{
break;
}
if (TryParseLines(ref buffer, out string message))
{
FlushResult flushResult =
await WriteMessagesAsync(writer, message);
if (flushResult.IsCanceled || flushResult.IsCompleted)
{
break;
}
}
if (readResult.IsCompleted)
{
if (!buffer.IsEmpty)
{
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
catch (Exception ex)
{
Console.Error.WriteLine(ex);
}
finally
{
await reader.CompleteAsync();
await writer.CompleteAsync();
}
}
static bool TryParseLines(
ref ReadOnlySequence<byte> buffer,
out string message)
{
SequencePosition? position;
StringBuilder outputMessage = new();
while(true)
{
position = buffer.PositionOf((byte)'\n');
if (!position.HasValue)
break;
outputMessage.Append(Encoding.ASCII.GetString(buffer.Slice(buffer.Start, position.Value)))
.AppendLine();
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
};
message = outputMessage.ToString();
return message.Length != 0;
}
static ValueTask<FlushResult> WriteMessagesAsync(
PipeWriter writer,
string message) =>
writer.WriteAsync(Encoding.ASCII.GetBytes(message));
}
Uygulama, bir PipeReader nesnesinin örneğini oluşturur. Konsol uygulaması daha sonra standart çıkış akışını PipeWriter.Create kullanarak Console.OpenStandardOutput() öğesine aktarır. Örnek iptali destekler.