.NET'te System.IO.Pipelines
System.IO.Pipelines , .NET'te yüksek performanslı G/Ç yapmayı kolaylaştırmak için tasarlanmış bir kitaplıktır. Bu, tüm .NET uygulamaları üzerinde çalışan .NET Standard'i hedefleyen bir kitaplıktır.
Kitaplığı System.IO.Pipelines Nuget paketinde bulabilirsiniz.
System.IO.Pipelines hangi sorunu çözer?
Akış verilerini ayrıştıran uygulamalar, birçok özel ve olağan dışı kod akışına sahip ortak kodlardan oluşur. Ortak ve özel durum kodu karmaşıktır ve bakımı zordur.
System.IO.Pipelines
şu şekilde tasarlandı:
- Yüksek performanslı ayrıştırma akış verilerine sahip olun.
- Kod karmaşıklığını azaltın.
Aşağıdaki kod, bir istemciden satırla sınırlandırılmış iletiler (sınırlandırılmış '\n'
) alan bir TCP sunucusu için tipiktir:
async Task ProcessLinesAsync(NetworkStream stream)
{
var buffer = new byte[1024];
await stream.ReadAsync(buffer, 0, buffer.Length);
// Process a single line from the buffer
ProcessLine(buffer);
}
Yukarıdaki kodda çeşitli sorunlar vardır:
- İletinin tamamı (satır sonu) öğesine yapılan tek bir çağrıda
ReadAsync
alınamayabilir. - sonucunu
stream.ReadAsync
yoksayıyor.stream.ReadAsync
okunan veri miktarı döndürür. - Tek
ReadAsync
bir çağrıda birden çok satırın okunması durumunu işlemez. - Her okuma ile bir
byte
dizi ayırır.
Önceki sorunları düzeltmek için aşağıdaki değişiklikler gereklidir:
Yeni bir satır bulunana kadar gelen verileri arabelleğe alın.
Arabellekte döndürülen tüm satırları ayrıştırın.
Çizgi 1 KB'tan (1024 bayt) büyük olabilir. Tam satırı arabelleğe sığdırmak için sınırlayıcı bulunana kadar kodun giriş arabelleğinin yeniden boyutlandırılması gerekir.
- Arabellek yeniden boyutlandırılırsa, girişte daha uzun satırlar göründüğünde daha fazla arabellek kopyası oluşturulur.
- Boşa harcanan alanı azaltmak için satırları okumak için kullanılan arabelleği sıkıştırın.
Belleği tekrar tekrar ayırmamak için arabellek havuzu kullanmayı göz önünde bulundurun.
Aşağıdaki kod bu sorunlardan bazılarını giderir:
async Task ProcessLinesAsync(NetworkStream stream)
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
var bytesBuffered = 0;
var bytesConsumed = 0;
while (true)
{
// Calculate the amount of bytes remaining in the buffer.
var bytesRemaining = buffer.Length - bytesBuffered;
if (bytesRemaining == 0)
{
// Double the buffer size and copy the previously buffered data into the new buffer.
var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
// Return the old buffer to the pool.
ArrayPool<byte>.Shared.Return(buffer);
buffer = newBuffer;
bytesRemaining = buffer.Length - bytesBuffered;
}
var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
if (bytesRead == 0)
{
// EOF
break;
}
// Keep track of the amount of buffered bytes.
bytesBuffered += bytesRead;
var linePosition = -1;
do
{
// Look for a EOL in the buffered data.
linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
bytesBuffered - bytesConsumed);
if (linePosition >= 0)
{
// Calculate the length of the line based on the offset.
var lineLength = linePosition - bytesConsumed;
// Process the line.
ProcessLine(buffer, bytesConsumed, lineLength);
// Move the bytesConsumed to skip past the line consumed (including \n).
bytesConsumed += lineLength + 1;
}
}
while (linePosition >= 0);
}
}
Önceki kod karmaşıktır ve tanımlanan tüm sorunları gidermez. Yüksek performanslı ağ genellikle performansı en üst düzeye çıkarmak için karmaşık kod yazma anlamına gelir. System.IO.Pipelines
bu tür kod yazmayı kolaylaştırmak için tasarlanmıştır.
Boru
sınıfı Pipe bir PipeWriter/PipeReader
çift oluşturmak için kullanılabilir. içine PipeWriter
yazılan tüm veriler içinde PipeReader
kullanılabilir:
var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;
Kanal temel kullanımı
async Task ProcessLinesAsync(Socket socket)
{
var pipe = new Pipe();
Task writing = FillPipeAsync(socket, pipe.Writer);
Task reading = ReadPipeAsync(pipe.Reader);
await Task.WhenAll(reading, writing);
}
async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
const int minimumBufferSize = 512;
while (true)
{
// Allocate at least 512 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(minimumBufferSize);
try
{
int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
if (bytesRead == 0)
{
break;
}
// Tell the PipeWriter how much was read from the Socket.
writer.Advance(bytesRead);
}
catch (Exception ex)
{
LogError(ex);
break;
}
// Make the data available to the PipeReader.
FlushResult result = await writer.FlushAsync();
if (result.IsCompleted)
{
break;
}
}
// By completing PipeWriter, tell the PipeReader that there's no more data coming.
await writer.CompleteAsync();
}
async Task ReadPipeAsync(PipeReader reader)
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
{
// Process the line.
ProcessLine(line);
}
// Tell the PipeReader how much of the buffer has been consumed.
reader.AdvanceTo(buffer.Start, buffer.End);
// Stop reading if there's no more data coming.
if (result.IsCompleted)
{
break;
}
}
// Mark the PipeReader as complete.
await reader.CompleteAsync();
}
bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
// Look for a EOL in the buffer.
SequencePosition? position = buffer.PositionOf((byte)'\n');
if (position == null)
{
line = default;
return false;
}
// Skip the line + the \n.
line = buffer.Slice(0, position.Value);
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
return true;
}
İki döngü vardır:
FillPipeAsync
ve 'denSocket
okur ve öğesinePipeWriter
yazar.ReadPipeAsync
PipeReader
ve gelen satırları ayrıştırarak okur.
Ayrılmış açık arabellek yok. Tüm arabellek yönetimi ve PipeWriter
uygulamalarına PipeReader
devredilir. Arabellek yönetimi için temsilci seçmek, kod kullanılmasının yalnızca iş mantığına odaklanmasını kolaylaştırır.
İlk döngüde:
- PipeWriter.GetMemory(Int32) , temel alınan yazıcıdan bellek almak için çağrılır.
- PipeWriter.Advance(Int32) arabelleğe ne kadar veri yazıldığını söylemek
PipeWriter
için çağrılır. - PipeWriter.FlushAsync , verileri için kullanılabilir hale getirmek için çağrılır
PipeReader
.
İkinci döngüde PipeReader
, tarafından PipeWriter
yazılan arabellekleri tüketir. Arabellekler yuvadan gelir. çağrısı:PipeReader.ReadAsync
İki önemli bilgi parçası içeren bir ReadResult döndürür:
- biçiminde
ReadOnlySequence<byte>
okunan veriler. - Veri sonuna (EOF) ulaşılıp ulaşılmadığını gösteren boole
IsCompleted
değeri.
- biçiminde
Satır sonu (EOL) sınırlayıcısını bulduktan ve satırı ayrıştırdıktan sonra:
- Mantık, zaten işlenmiş olanları atlamak için arabelleği işler.
PipeReader.AdvanceTo
, ne kadar verinin tüketildiğini ve incelendiğini söylemekPipeReader
için çağrılır.
Okuyucu ve yazıcı döngüleri çağrılarak Complete
sona erer. Complete
temel alınan Kanal'ın ayrılan belleği serbest bırakmasına izin verir.
Geri basınç ve akış denetimi
İdeal olarak, okuma ve ayrıştırma birlikte çalışır:
- Okuma iş parçacığı, ağdan verileri tüketir ve arabelleklere yerleştirir.
- Ayrıştırma iş parçacığı uygun veri yapılarını oluşturmakla sorumludur.
Ayrıştırma genellikle ağdan veri bloklarını kopyalamaktan daha uzun sürer:
- Okuma iş parçacığı ayrıştırma iş parçacığının önüne geçer.
- Okuma iş parçacığının ayrıştırma iş parçacığının verilerini depolamak için yavaşlamasına veya daha fazla bellek ayırmasına gerek vardır.
En iyi performans için sık duraklamalar ile daha fazla bellek ayırma arasında bir denge vardır.
Yukarıdaki sorunu çözmek için, Pipe
veri akışını denetlemek için iki ayarı vardır:
- PauseWriterThreshold: Duraklatılacak çağrılardan FlushAsync önce ne kadar verinin arabelleğe alınması gerektiğini belirler.
- ResumeWriterThreshold: Devam etmek için yapılan çağrılardan önce okuyucunun ne kadar veri gözlemlediğini
PipeWriter.FlushAsync
belirler.
- içindeki veri miktarı kesiştiğinde
Pipe
PauseWriterThreshold
tamamlanmamışValueTask<FlushResult>
bir değer döndürür. - değerinden
ResumeWriterThreshold
daha düşük olduğunda tamamlanırValueTask<FlushResult>
.
Hızlı döngüleri önlemek için iki değer kullanılır ve bu değerlerden biri kullanılırsa ortaya çıkabilir.
Örnekler
// The Pipe will start returning incomplete tasks from FlushAsync until
// the reader examines at least 5 bytes.
var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
var pipe = new Pipe(options);
PipeScheduler
Genellikle ve await
kullanırkenasync
, zaman uyumsuz kod veya TaskScheduler geçerli SynchronizationContextüzerinde devam eder.
G/Ç yapılırken G/Ç'nin nerede gerçekleştirildiği üzerinde ayrıntılı denetim sahibi olmak önemlidir. Bu denetim, CPU önbelleklerinden etkili bir şekilde yararlanmaya olanak tanır. Verimli önbelleğe alma, web sunucuları gibi yüksek performanslı uygulamalar için kritik öneme sahiptir. PipeScheduler zaman uyumsuz geri çağırmaların nerede çalıştırıldığı üzerinde denetim sağlar. Varsayılan olarak:
- Geçerli SynchronizationContext kullanılır.
SynchronizationContext
yoksa, geri çağırmaları çalıştırmak için iş parçacığı havuzunu kullanır.
public static void Main(string[] args)
{
var writeScheduler = new SingleThreadPipeScheduler();
var readScheduler = new SingleThreadPipeScheduler();
// Tell the Pipe what schedulers to use and disable the SynchronizationContext.
var options = new PipeOptions(readerScheduler: readScheduler,
writerScheduler: writeScheduler,
useSynchronizationContext: false);
var pipe = new Pipe(options);
}
// This is a sample scheduler that async callbacks on a single dedicated thread.
public class SingleThreadPipeScheduler : PipeScheduler
{
private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
new BlockingCollection<(Action<object> Action, object State)>();
private readonly Thread _thread;
public SingleThreadPipeScheduler()
{
_thread = new Thread(DoWork);
_thread.Start();
}
private void DoWork()
{
foreach (var item in _queue.GetConsumingEnumerable())
{
item.Action(item.State);
}
}
public override void Schedule(Action<object?> action, object? state)
{
if (state is not null)
{
_queue.Add((action, state));
}
// else log the fact that _queue.Add was not called.
}
}
PipeScheduler.ThreadPool , iş parçacığı havuzuna geri çağırmaları kuyruğa alan uygulamadır PipeScheduler . PipeScheduler.ThreadPool
varsayılan ve genel olarak en iyi seçenektir. PipeScheduler.Inline kilitlenmeler gibi istenmeyen sonuçlara neden olabilir.
Kanal sıfırlama
Nesneyi yeniden kullanmak sık sık verimlidir Pipe
. Kanalı sıfırlamak için hem hem de PipeReader
tamamlandıktan sonra çağırın.PipeReaderResetPipeWriter
PipeReader
PipeReader çağıranın adına belleği yönetir. çağrısı yaptıktan sonra her zaman arayın PipeReader.AdvanceToPipeReader.ReadAsync. Bu, çağıranın PipeReader
bellekle ne zaman yapıldığını bilmelerini ve böylece izlenebilmesini sağlar. döndürülen ReadOnlySequence<byte>
PipeReader.ReadAsync
yalnızca çağrısına PipeReader.AdvanceTo
kadar geçerlidir. çağrısı PipeReader.AdvanceTo
yaptıktan sonra kullanmak ReadOnlySequence<byte>
geçersizdir.
PipeReader.AdvanceTo
iki SequencePosition bağımsız değişken alır:
- İlk bağımsız değişken, ne kadar bellek tüketildiğini belirler.
- İkinci bağımsız değişken, arabelleğin ne kadarının gözlemleneceğini belirler.
Verileri tüketilen olarak işaretlemek, kanalın belleği temel alınan arabellek havuzuna döndürebileceği anlamına gelir. Verileri gözlemlenen olarak işaretlemek, bir sonraki çağrının PipeReader.ReadAsync
ne yaptığını denetler. Her şeyi gözlemlendi olarak işaretlemek, kanala daha fazla veri yazılana PipeReader.ReadAsync
kadar sonraki çağrısının döndürülmeyeceği anlamına gelir. Diğer tüm değerler, gözlemlenen ve gözlemlenmeyen verilerle hemen döndürülmek üzere PipeReader.ReadAsync
bir sonraki çağrıyı yapar, ancak zaten tüketilmiş olan verileri döndürmez.
Akış verisi senaryolarını okuma
Akış verilerini okumaya çalışırken ortaya çıkan birkaç tipik desen vardır:
- Veri akışı verilip tek bir iletiyi ayrıştırın.
- Bir veri akışı göz önünde bulundurulduğunda, kullanılabilir tüm iletileri ayrıştırın.
Aşağıdaki örneklerde, bir ReadOnlySequence<byte>
'den iletileri ayrıştırma yöntemi kullanılırTryParseLines
. TryParseLines
tek bir iletiyi ayrıştırıp giriş arabelleğinden ayrıştırılan iletiyi kırpmak için güncelleştirir. TryParseLines
.NET'in bir parçası değildir, aşağıdaki bölümlerde kullanılan kullanıcı tarafından yazılmış bir yöntemdir.
bool TryParseLines(ref ReadOnlySequence<byte> buffer, out Message message);
Tek bir iletiyi okuma
Aşağıdaki kod' dan PipeReader
tek bir iletiyi okur ve çağırana döndürür.
async ValueTask<Message?> ReadSingleMessageAsync(PipeReader reader,
CancellationToken cancellationToken = default)
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
// In the event that no message is parsed successfully, mark consumed
// as nothing and examined as the entire buffer.
SequencePosition consumed = buffer.Start;
SequencePosition examined = buffer.End;
try
{
if (TryParseLines(ref buffer, out Message message))
{
// A single message was successfully parsed so mark the start of the
// parsed buffer as consumed. TryParseLines trims the buffer to
// point to the data after the message was parsed.
consumed = buffer.Start;
// Examined is marked the same as consumed here, so the next call
// to ReadSingleMessageAsync will process the next message if there's
// one.
examined = consumed;
return message;
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(consumed, examined);
}
}
return null;
}
Yukarıdaki kod:
- Tek bir iletiyi ayrıştırıyor.
- Tüketilen
SequencePosition
ve incelenenSequencePosition
Güncelleştirmeler kırpılan giriş arabelleğinin başlangıcına işaret etmek için.
İki SequencePosition
bağımsız değişken, ayrıştırılmış iletiyi giriş arabelleğinden kaldırdığından TryParseLines
güncelleştirilir. Genellikle, arabellekten tek bir iletiyi ayrıştırırken, incelenen konum aşağıdakilerden biri olmalıdır:
- İletinin sonu.
- İleti bulunamazsa alınan arabelleğin sonu.
Tek bir ileti durumunda en fazla hata olasılığı vardır. İncelenmesi gereken yanlış değerlerin geçirilmesi bellek yetersiz özel durumu veya sonsuz döngüye neden olabilir. Daha fazla bilgi için bu makaledeki PipeReader yaygın sorunları bölümüne bakın.
Birden çok iletiyi okuma
Aşağıdaki kod, bir'den PipeReader
gelen tüm iletileri okur ve her birine çağrı ProcessMessageAsync
yapar.
async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseLines(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
İptal
PipeReader.ReadAsync
:
- bir CancellationTokengeçirmeyi destekler.
- bekleyen bir okuma olduğunda iptal edilirse
CancellationToken
bir OperationCanceledException atar. - aracılığıyla PipeReader.CancelPendingReadgeçerli okuma işlemini iptal etmenin bir yolunu destekler ve bu da özel durum oluşturulmasını önler. Çağırma
PipeReader.CancelPendingRead
, geçerli veya sonraki çağrısınınPipeReader.ReadAsync
ile olarakIsCanceled
ayarlanmış bir ReadResult döndürmesinetrue
neden olur. Bu, mevcut okuma döngüsünü yıkıcı olmayan ve istisnai olmayan bir şekilde durdurmak için yararlı olabilir.
private PipeReader reader;
public MyConnection(PipeReader reader)
{
this.reader = reader;
}
public void Abort()
{
// Cancel the pending read so the process loop ends without an exception.
reader.CancelPendingRead();
}
public async Task ProcessMessagesAsync()
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
if (result.IsCanceled)
{
// The read was canceled. You can quit without reading the existing data.
break;
}
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseLines(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
PipeReader yaygın sorunları
Yanlış değerlerin veya değerlerinin geçirilmesi
consumed
examined
, zaten okunan verilerin okunmasıyla sonuçlanabilir.İncelendiği şekilde geçiş
buffer.End
aşağıdakilere neden olabilir:- Durdurulan veriler
- Veriler tüketilmezse büyük olasılıkla son bir Yetersiz Bellek (OOM) özel durumu. Örneğin,
PipeReader.AdvanceTo(position, buffer.End)
arabellekten bir kerede tek bir iletiyi işlerken.
Yanlış değerlerin öğesine
consumed
geçirilmesi veyaexamined
sonsuz döngüye neden olabilir. Örneğin, değişmediyse,PipeReader.AdvanceTo(buffer.Start)
buffer.Start
yeni veriler gelmeden hemen önce bir sonraki çağrının döndürülmesi gerekirPipeReader.ReadAsync
.Yanlış değerlerin 'e
consumed
geçirilmesi veyaexamined
sonsuz arabelleğe alma (nihai OOM) ile sonuçlanabilir.Çağrıdan
PipeReader.AdvanceTo
sonra komutununReadOnlySequence<byte>
kullanılması bellek bozulmasına neden olabilir (ücretsiz kullanımdan sonra kullanın).Çağrı
PipeReader.Complete/CompleteAsync
yapılamaması bellek sızıntısına neden olabilir.Arabelleği işlemeden önce okuma mantığının denetlenip ReadResult.IsCompleted çıkılması veri kaybına neden olur. Döngü çıkış koşulu ve
ReadResult.IsCompleted
temelReadResult.Buffer.IsEmpty
almalıdır. Bunu yanlış yapmak sonsuz döngüye neden olabilir.
Sorunlu kod
❌Veri kaybı
ReadResult
olarak ayarlandığında true
verinin IsCompleted
son kesimini döndürebilir. Okuma döngüsünden çıkmadan önce bu verilerin okunmaması veri kaybına neden olur.
Uyarı
Aşağıdaki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. PipeReader Yaygın sorunlarını açıklamak için aşağıdaki örnek sağlanmıştır.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> dataLossBuffer = result.Buffer;
if (result.IsCompleted)
break;
Process(ref dataLossBuffer, out Message message);
reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End);
}
Uyarı
Önceki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. Önceki örnek, PipeReader Yaygın sorunlarını açıklamak için sağlanmıştır.
❌Sonsuz döngü
Aşağıdaki mantık, ise ancak arabellekte hiçbir zaman tam bir ileti yoksa sonsuz bir döngüye Result.IsCompleted
true
neden olabilir.
Uyarı
Aşağıdaki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. PipeReader Yaygın sorunlarını açıklamak için aşağıdaki örnek sağlanmıştır.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (result.IsCompleted && infiniteLoopBuffer.IsEmpty)
break;
Process(ref infiniteLoopBuffer, out Message message);
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
Uyarı
Önceki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. Önceki örnek, PipeReader Yaygın sorunlarını açıklamak için sağlanmıştır.
Aynı sorunla ilgili başka bir kod parçası aşağıdadır. denetlemeden önce boş olmayan bir arabelleği denetler ReadResult.IsCompleted
. içinde olduğundan, arabellekte else if
hiçbir zaman tam bir ileti yoksa sonsuza kadar döngüye alır.
Uyarı
Aşağıdaki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. PipeReader Yaygın sorunlarını açıklamak için aşağıdaki örnek sağlanmıştır.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (!infiniteLoopBuffer.IsEmpty)
Process(ref infiniteLoopBuffer, out Message message);
else if (result.IsCompleted)
break;
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
Uyarı
Önceki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. Önceki örnek, PipeReader Yaygın sorunlarını açıklamak için sağlanmıştır.
❌Yanıt vermeyen uygulama
konumunda ile buffer.End
examined
koşulsuz çağrılmasıPipeReader.AdvanceTo
, uygulamanın tek bir iletiyi ayrıştırırken yanıt vermemeye başlamasına neden olabilir. Sonraki arama PipeReader.AdvanceTo
şu zamana kadar döndürülmeyecek:
- Kanala yazılan daha fazla veri var.
- Yeni veriler daha önce incelenmedi.
Uyarı
Aşağıdaki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. PipeReader Yaygın sorunlarını açıklamak için aşağıdaki örnek sağlanmıştır.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> hangBuffer = result.Buffer;
Process(ref hangBuffer, out Message message);
if (result.IsCompleted)
break;
reader.AdvanceTo(hangBuffer.Start, hangBuffer.End);
if (message != null)
return message;
}
Uyarı
Önceki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. Önceki örnek, PipeReader Yaygın sorunlarını açıklamak için sağlanmıştır.
❌Yetersiz Bellek (OOM)
Aşağıdaki koşullarla, aşağıdaki kod bir OutOfMemoryException gerçekleşene kadar arabelleğe almayı tutar:
- İleti boyutu üst sınırı yoktur.
- 'dan
PipeReader
döndürülen veriler tam bir ileti oluşturmaz. Örneğin, diğer taraf büyük bir ileti yazdığından (örneğin, 4 GB'lık bir ileti) tam bir ileti oluşturmaz.
Uyarı
Aşağıdaki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. PipeReader Yaygın sorunlarını açıklamak için aşağıdaki örnek sağlanmıştır.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer;
Process(ref thisCouldOutOfMemory, out Message message);
if (result.IsCompleted)
break;
reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End);
if (message != null)
return message;
}
Uyarı
Önceki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. Önceki örnek, PipeReader Yaygın sorunlarını açıklamak için sağlanmıştır.
❌Bellek Bozulması
Arabelleği okuyan yardımcılar yazarken, döndürülen yük çağrılmadan Advance
önce kopyalanmalıdır. Aşağıdaki örnek, atılan belleği Pipe
döndürür ve bir sonraki işlem (okuma/yazma) için yeniden kullanabilir.
Uyarı
Aşağıdaki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. PipeReader Yaygın sorunlarını açıklamak için aşağıdaki örnek sağlanmıştır.
public class Message
{
public ReadOnlySequence<byte> CorruptedPayload { get; set; }
}
Environment.FailFast("This code is terrible, don't use it!");
Message message = null;
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
ReadHeader(ref buffer, out int length);
if (length <= buffer.Length)
{
message = new Message
{
// Slice the payload from the existing buffer
CorruptedPayload = buffer.Slice(0, length)
};
buffer = buffer.Slice(length);
}
if (result.IsCompleted)
break;
reader.AdvanceTo(buffer.Start, buffer.End);
if (message != null)
{
// This code is broken since reader.AdvanceTo() was called with a position *after* the buffer
// was captured.
break;
}
}
return message;
}
Uyarı
Önceki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. Önceki örnek, PipeReader Yaygın sorunlarını açıklamak için sağlanmıştır.
PipeWriter
, PipeWriter arayan adına yazılacak arabellekleri yönetir. PipeWriter
uygular IBufferWriter<byte>
. IBufferWriter<byte>
ek arabellek kopyaları olmadan yazma işlemleri gerçekleştirmek için arabelleklere erişim elde etmek mümkün hale getirir.
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
// Request at least 5 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(5);
// Write directly into the buffer.
int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span);
// Tell the writer how many bytes were written.
writer.Advance(written);
await writer.FlushAsync(cancellationToken);
}
Önceki kod:
- kullanarak 'den en az 5 baytlık bir arabellek istemektedir
PipeWriter
GetMemory. - ASCII dizesi
"Hello"
için baytları döndürülenMemory<byte>
öğesine yazar. - Arabelleğe kaç bayt yazıldığını belirten çağrılar Advance .
- Baytları
PipeWriter
temel alınan cihaza gönderen öğesini temizler.
Önceki yazma yöntemi tarafından PipeWriter
sağlanan arabellekleri kullanır. Şunu da kullanmış PipeWriter.WriteAsyncolabilir:
- Varolan arabelleği öğesine
PipeWriter
kopyalar. - uygun şekilde öğesini
Advance
çağırırGetSpan
ve öğesini çağırırFlushAsync.
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
byte[] helloBytes = Encoding.ASCII.GetBytes("Hello");
// Write helloBytes to the writer, there's no need to call Advance here
// (Write does that).
await writer.WriteAsync(helloBytes, cancellationToken);
}
İptal
FlushAsync bir CancellationTokengeçirmeyi destekler. Bekleyen bir CancellationToken
temizleme işlemi varken belirteç iptal edilirse bir sonuç OperationCanceledException
geçirilir. PipeWriter.FlushAsync
, özel durum oluşturmadan aracılığıyla PipeWriter.CancelPendingFlush geçerli temizleme işlemini iptal etmenin bir yolunu destekler. Çağırma PipeWriter.CancelPendingFlush
, veya öğesine yapılan geçerli veya sonraki çağrının PipeWriter.FlushAsync
PipeWriter.WriteAsync
olarak ayarlanmış olarak bir FlushResultIsCanceled
döndürmesine true
neden olur. Bu, boşaltmayı yıkıcı olmayan ve istisnai olmayan bir şekilde durdurmak için yararlı olabilir.
PipeWriter yaygın sorunları
- GetSpan ve GetMemory en az istenen bellek miktarına sahip bir arabellek döndürür. Tam arabellek boyutlarını varsaymayın .
- Ardışık çağrıların aynı arabelleği veya aynı boyutlu arabelleği döndüreceğinin garantisi yoktur.
- Daha fazla veri yazmaya devam etmek için çağrıldıktan Advance sonra yeni bir arabellek istenmelidir. Daha önce alınan arabellek için yazılamaz.
- Çağrısı
GetMemory
veyaGetSpan
tamamlanmamış bir aramaFlushAsync
olduğunda güvenli değildir. CompleteAsync
VeyaComplete
şişirilmemiş veriler olduğunda çağrılması bellek bozulmasına neden olabilir.
PipeReader ve PipeWriter kullanmak için İpuçları
Aşağıdaki ipuçları sınıfları başarıyla kullanmanıza System.IO.Pipelines yardımcı olur:
- Uygun olduğunda bir özel durum da dahil olmak üzere PipeReader ve PipeWriter'ı her zaman tamamlayın.
- çağrısı yaptıktan sonra her zaman arayın PipeReader.AdvanceToPipeReader.ReadAsync.
- Yazarken düzenli aralıklarla
await
PipeWriter.FlushAsync ve her zaman denetleyin FlushResult.IsCompleted. okuyucunun tamamlandığını ve artık ne yazıldığını umursamadığını gösterdiğinden , iseIsCompleted
true
yazmayı durdurun. - Erişimi olmasını istediğiniz
PipeReader
bir şey yazdıktan sonra arama PipeWriter.FlushAsync yapın. - Okuyucu bitene kadar
FlushAsync
başlayamazsa aramayınFlushAsync
çünkü bu kilitlenmeye neden olabilir. - Veya'ya yalnızca bir bağlam "sahip"
PipeReader
PipeWriter
olduğundan veya bu bağlamlara eriştiğine emin olun. Bu türler iş parçacığı açısından güvenli değildir. - öğesini çağırdıktan
AdvanceTo
veya tamamladıktan sonra hiçbir zaman öğesine ReadResult.Buffer erişemezPipeReader
.
IDuplexPipe
IDuplexPipe, hem okumayı hem de yazmayı destekleyen türler için bir sözleşmedir. Örneğin, bir ağ bağlantısı ile IDuplexPipe
temsil edilir.
ve içeren PipeReader
sürümünden farklı Pipe
olarakPipeWriter
, IDuplexPipe
tam çift yönlü bağlantının tek bir tarafını temsil eder. Bu, öğesine PipeWriter
yazılanların'dan PipeReader
okunmayacağı anlamına gelir.
Akışlar
Akış verilerini okurken veya yazarken genellikle seri hale getirici kullanarak verileri okur ve seri hale getirici kullanarak veri yazarsınız. Bu okuma ve yazma akışı API'lerinin çoğunun parametresi Stream
vardır. Bu mevcut API'lerle PipeReader
tümleştirmeyi kolaylaştırmak ve PipeWriter
bir AsStream yöntemi kullanıma sunma. AsStreamveya PipeWriter
çevresinde PipeReader
bir Stream
uygulama döndürür.
Akış örneği
PipeReader
ve PipeWriter
örnekleri, bir Stream nesne ve isteğe bağlı olarak ilgili oluşturma seçenekleri verilen statik Create
yöntemler kullanılarak oluşturulabilir.
aşağıdaki StreamPipeReaderOptions parametrelerle örneğin oluşturulması PipeReader
üzerinde denetime izin verir:
- StreamPipeReaderOptions.BufferSize , havuzdan bellek kiralarken kullanılan bayt cinsinden en düşük arabellek boyutudur ve varsayılan olarak
4096
olarak kullanılır. - StreamPipeReaderOptions.LeaveOpen bayrağı, temel alınan akışın tamamlandıktan sonra
PipeReader
açık bırakılıp bırakılmayacağını belirler ve varsayılan olarakfalse
olarak ayarlanır. - StreamPipeReaderOptions.MinimumReadSize , yeni bir arabellek ayrılmadan önce arabellekte kalan bayt eşiğini temsil eder ve varsayılan olarak değerine sahiptir
1024
. - StreamPipeReaderOptions.Pool
MemoryPool<byte>
bellek ayrılırken kullanılır ve varsayılan olaraknull
kullanılır.
aşağıdaki StreamPipeWriterOptions parametrelerle örneğin oluşturulması PipeWriter
üzerinde denetime izin verir:
- StreamPipeWriterOptions.LeaveOpen bayrağı, temel alınan akışın tamamlandıktan sonra
PipeWriter
açık bırakılıp bırakılmayacağını belirler ve varsayılan olarakfalse
olarak ayarlanır. - StreamPipeWriterOptions.MinimumBufferSize , 'den Poolbellek kiralarken kullanılacak en düşük arabellek boyutunu temsil eder
4096
ve varsayılan olarak olarak kullanılır. - StreamPipeWriterOptions.Pool
MemoryPool<byte>
bellek ayrılırken kullanılır ve varsayılan olaraknull
kullanılır.
Önemli
yöntemleri kullanarak Create
ve PipeWriter
örnekleri oluştururken PipeReader
nesne ömrünü Stream
göz önünde bulundurmanız gerekir. Okuyucu veya yazıcıyla işlem tamamlandıktan sonra akışa erişmeniz gerekiyorsa, oluşturma seçeneklerinde bayrağını true
olarak ayarlamanız LeaveOpen
gerekir. Aksi takdirde akış kapatılır.
Aşağıdaki kod, bir akıştan yöntemleri kullanarak ve PipeWriter
örneklerinin PipeReader
oluşturulmasını Create
gösterir.
using System.Buffers;
using System.IO.Pipelines;
using System.Text;
class Program
{
static async Task Main()
{
using var stream = File.OpenRead("lorem-ipsum.txt");
var reader = PipeReader.Create(stream);
var writer = PipeWriter.Create(
Console.OpenStandardOutput(),
new StreamPipeWriterOptions(leaveOpen: true));
WriteUserCancellationPrompt();
var processMessagesTask = ProcessMessagesAsync(reader, writer);
var userCanceled = false;
var cancelProcessingTask = Task.Run(() =>
{
while (char.ToUpperInvariant(Console.ReadKey().KeyChar) != 'C')
{
WriteUserCancellationPrompt();
}
userCanceled = true;
// No exceptions thrown
reader.CancelPendingRead();
writer.CancelPendingFlush();
});
await Task.WhenAny(cancelProcessingTask, processMessagesTask);
Console.WriteLine(
$"\n\nProcessing {(userCanceled ? "cancelled" : "completed")}.\n");
}
static void WriteUserCancellationPrompt() =>
Console.WriteLine("Press 'C' to cancel processing...\n");
static async Task ProcessMessagesAsync(
PipeReader reader,
PipeWriter writer)
{
try
{
while (true)
{
ReadResult readResult = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = readResult.Buffer;
try
{
if (readResult.IsCanceled)
{
break;
}
if (TryParseLines(ref buffer, out string message))
{
FlushResult flushResult =
await WriteMessagesAsync(writer, message);
if (flushResult.IsCanceled || flushResult.IsCompleted)
{
break;
}
}
if (readResult.IsCompleted)
{
if (!buffer.IsEmpty)
{
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
catch (Exception ex)
{
Console.Error.WriteLine(ex);
}
finally
{
await reader.CompleteAsync();
await writer.CompleteAsync();
}
}
static bool TryParseLines(
ref ReadOnlySequence<byte> buffer,
out string message)
{
SequencePosition? position;
StringBuilder outputMessage = new();
while(true)
{
position = buffer.PositionOf((byte)'\n');
if (!position.HasValue)
break;
outputMessage.Append(Encoding.ASCII.GetString(buffer.Slice(buffer.Start, position.Value)))
.AppendLine();
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
};
message = outputMessage.ToString();
return message.Length != 0;
}
static ValueTask<FlushResult> WriteMessagesAsync(
PipeWriter writer,
string message) =>
writer.WriteAsync(Encoding.ASCII.GetBytes(message));
}
UygulamaStreamReader, lorem-ipsum.txt dosyasını akış olarak okumak için kullanır ve boş bir satırla bitmelidir. FileStream öğesine geçirilir PipeReader.Createve bu da bir PipeReader
nesnenin örneğini oluşturur. Konsol uygulaması daha sonra standart çıkış akışını kullanarak Console.OpenStandardOutput()öğesine PipeWriter.Create geçirir. Örnek iptali destekler.
Geri Bildirim
https://aka.ms/ContentUserFeedback.
Çok yakında: 2024 boyunca, içerik için geri bildirim mekanizması olarak GitHub Sorunları’nı kullanımdan kaldıracak ve yeni bir geri bildirim sistemiyle değiştireceğiz. Daha fazla bilgi için bkz.Gönderin ve geri bildirimi görüntüleyin