Eğitim
Modül
Komutları bir işlem hattına bağlama - Training
Bu modülde komutları bir işlem hattına bağlamayı öğreneceksiniz.
Bu tarayıcı artık desteklenmiyor.
En son özelliklerden, güvenlik güncelleştirmelerinden ve teknik destekten faydalanmak için Microsoft Edge’e yükseltin.
System.IO.Pipelines , .NET'te yüksek performanslı G/Ç yapmayı kolaylaştırmak için tasarlanmış bir kitaplıktır. Bu, tüm .NET uygulamaları üzerinde çalışan .NET Standard'i hedefleyen bir kitaplıktır.
Kitaplığı System.IO.Pipelines Nuget paketinde bulabilirsiniz.
Akış verilerini ayrıştıran uygulamalar, birçok özel ve olağan dışı kod akışına sahip ortak kodlardan oluşur. Ortak ve özel durum kodu karmaşıktır ve bakımı zordur.
System.IO.Pipelines
şu şekilde tasarlandı:
Aşağıdaki kod, bir istemciden satırla sınırlandırılmış iletiler (sınırlandırılmış '\n'
) alan bir TCP sunucusu için tipiktir:
async Task ProcessLinesAsync(NetworkStream stream)
{
var buffer = new byte[1024];
await stream.ReadAsync(buffer, 0, buffer.Length);
// Process a single line from the buffer
ProcessLine(buffer);
}
Yukarıdaki kodda çeşitli sorunlar vardır:
ReadAsync
alınamayabilir.stream.ReadAsync
yoksayıyor. stream.ReadAsync
okunan veri miktarı döndürür.ReadAsync
bir çağrıda birden çok satırın okunması durumunu işlemez.byte
dizi ayırır.Önceki sorunları düzeltmek için aşağıdaki değişiklikler gereklidir:
Yeni bir satır bulunana kadar gelen verileri arabelleğe alın.
Arabellekte döndürülen tüm satırları ayrıştırın.
Çizgi 1 KB'tan (1024 bayt) büyük olabilir. Tam satırı arabelleğe sığdırmak için sınırlayıcı bulunana kadar kodun giriş arabelleğinin yeniden boyutlandırılması gerekir.
Belleği tekrar tekrar ayırmamak için arabellek havuzu kullanmayı göz önünde bulundurun.
Aşağıdaki kod bu sorunlardan bazılarını giderir:
async Task ProcessLinesAsync(NetworkStream stream)
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
var bytesBuffered = 0;
var bytesConsumed = 0;
while (true)
{
// Calculate the amount of bytes remaining in the buffer.
var bytesRemaining = buffer.Length - bytesBuffered;
if (bytesRemaining == 0)
{
// Double the buffer size and copy the previously buffered data into the new buffer.
var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
// Return the old buffer to the pool.
ArrayPool<byte>.Shared.Return(buffer);
buffer = newBuffer;
bytesRemaining = buffer.Length - bytesBuffered;
}
var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
if (bytesRead == 0)
{
// EOF
break;
}
// Keep track of the amount of buffered bytes.
bytesBuffered += bytesRead;
var linePosition = -1;
do
{
// Look for a EOL in the buffered data.
linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
bytesBuffered - bytesConsumed);
if (linePosition >= 0)
{
// Calculate the length of the line based on the offset.
var lineLength = linePosition - bytesConsumed;
// Process the line.
ProcessLine(buffer, bytesConsumed, lineLength);
// Move the bytesConsumed to skip past the line consumed (including \n).
bytesConsumed += lineLength + 1;
}
}
while (linePosition >= 0);
}
}
Önceki kod karmaşıktır ve tanımlanan tüm sorunları gidermez. Yüksek performanslı ağ genellikle performansı en üst düzeye çıkarmak için karmaşık kod yazma anlamına gelir. System.IO.Pipelines
bu tür kod yazmayı kolaylaştırmak için tasarlanmıştır.
sınıfı Pipe bir PipeWriter/PipeReader
çift oluşturmak için kullanılabilir. içine PipeWriter
yazılan tüm veriler içinde PipeReader
kullanılabilir:
var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;
async Task ProcessLinesAsync(Socket socket)
{
var pipe = new Pipe();
Task writing = FillPipeAsync(socket, pipe.Writer);
Task reading = ReadPipeAsync(pipe.Reader);
await Task.WhenAll(reading, writing);
}
async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
const int minimumBufferSize = 512;
while (true)
{
// Allocate at least 512 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(minimumBufferSize);
try
{
int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
if (bytesRead == 0)
{
break;
}
// Tell the PipeWriter how much was read from the Socket.
writer.Advance(bytesRead);
}
catch (Exception ex)
{
LogError(ex);
break;
}
// Make the data available to the PipeReader.
FlushResult result = await writer.FlushAsync();
if (result.IsCompleted)
{
break;
}
}
// By completing PipeWriter, tell the PipeReader that there's no more data coming.
await writer.CompleteAsync();
}
async Task ReadPipeAsync(PipeReader reader)
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
{
// Process the line.
ProcessLine(line);
}
// Tell the PipeReader how much of the buffer has been consumed.
reader.AdvanceTo(buffer.Start, buffer.End);
// Stop reading if there's no more data coming.
if (result.IsCompleted)
{
break;
}
}
// Mark the PipeReader as complete.
await reader.CompleteAsync();
}
bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
// Look for a EOL in the buffer.
SequencePosition? position = buffer.PositionOf((byte)'\n');
if (position == null)
{
line = default;
return false;
}
// Skip the line + the \n.
line = buffer.Slice(0, position.Value);
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
return true;
}
İki döngü vardır:
FillPipeAsync
ve 'den Socket
okur ve öğesine PipeWriter
yazar.ReadPipeAsync
PipeReader
ve gelen satırları ayrıştırarak okur.Ayrılmış açık arabellek yok. Tüm arabellek yönetimi ve PipeWriter
uygulamalarına PipeReader
devredilir. Arabellek yönetimi için temsilci seçmek, kod kullanılmasının yalnızca iş mantığına odaklanmasını kolaylaştırır.
İlk döngüde:
PipeWriter
için çağrılır.PipeReader
.İkinci döngüde PipeReader
, tarafından PipeWriter
yazılan arabellekleri tüketir. Arabellekler yuvadan gelir. çağrısı:PipeReader.ReadAsync
İki önemli bilgi parçası içeren bir ReadResult döndürür:
ReadOnlySequence<byte>
okunan veriler.IsCompleted
değeri.Satır sonu (EOL) sınırlayıcısını bulduktan ve satırı ayrıştırdıktan sonra:
PipeReader.AdvanceTo
, ne kadar verinin tüketildiğini ve incelendiğini söylemek PipeReader
için çağrılır.Okuyucu ve yazıcı döngüleri çağrılarak Complete
sona erer. Complete
temel alınan Kanal'ın ayrılan belleği serbest bırakmasına izin verir.
İdeal olarak, okuma ve ayrıştırma birlikte çalışır:
Ayrıştırma genellikle ağdan veri bloklarını kopyalamaktan daha uzun sürer:
En iyi performans için sık duraklamalar ile daha fazla bellek ayırma arasında bir denge vardır.
Yukarıdaki sorunu çözmek için, Pipe
veri akışını denetlemek için iki ayarı vardır:
PipeWriter.FlushAsync
belirler.Pipe
PauseWriterThreshold
tamamlanmamış ValueTask<FlushResult>
bir değer döndürür.ResumeWriterThreshold
daha düşük olduğunda tamamlanırValueTask<FlushResult>
.Hızlı döngüleri önlemek için iki değer kullanılır ve bu değerlerden biri kullanılırsa ortaya çıkabilir.
// The Pipe will start returning incomplete tasks from FlushAsync until
// the reader examines at least 5 bytes.
var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
var pipe = new Pipe(options);
Genellikle ve await
kullanırkenasync
, zaman uyumsuz kod veya TaskScheduler geçerli SynchronizationContextüzerinde devam eder.
G/Ç yapılırken G/Ç'nin nerede gerçekleştirildiği üzerinde ayrıntılı denetim sahibi olmak önemlidir. Bu denetim, CPU önbelleklerinden etkili bir şekilde yararlanmaya olanak tanır. Verimli önbelleğe alma, web sunucuları gibi yüksek performanslı uygulamalar için kritik öneme sahiptir. PipeScheduler zaman uyumsuz geri çağırmaların nerede çalıştırıldığı üzerinde denetim sağlar. Varsayılan olarak:
SynchronizationContext
yoksa, geri çağırmaları çalıştırmak için iş parçacığı havuzunu kullanır.public static void Main(string[] args)
{
var writeScheduler = new SingleThreadPipeScheduler();
var readScheduler = new SingleThreadPipeScheduler();
// Tell the Pipe what schedulers to use and disable the SynchronizationContext.
var options = new PipeOptions(readerScheduler: readScheduler,
writerScheduler: writeScheduler,
useSynchronizationContext: false);
var pipe = new Pipe(options);
}
// This is a sample scheduler that async callbacks on a single dedicated thread.
public class SingleThreadPipeScheduler : PipeScheduler
{
private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
new BlockingCollection<(Action<object> Action, object State)>();
private readonly Thread _thread;
public SingleThreadPipeScheduler()
{
_thread = new Thread(DoWork);
_thread.Start();
}
private void DoWork()
{
foreach (var item in _queue.GetConsumingEnumerable())
{
item.Action(item.State);
}
}
public override void Schedule(Action<object?> action, object? state)
{
if (state is not null)
{
_queue.Add((action, state));
}
// else log the fact that _queue.Add was not called.
}
}
PipeScheduler.ThreadPool , iş parçacığı havuzuna geri çağırmaları kuyruğa alan uygulamadır PipeScheduler . PipeScheduler.ThreadPool
varsayılan ve genel olarak en iyi seçenektir. PipeScheduler.Inline kilitlenmeler gibi istenmeyen sonuçlara neden olabilir.
Nesneyi yeniden kullanmak sık sık verimlidir Pipe
. Kanalı sıfırlamak için hem hem de PipeReader
tamamlandıktan sonra çağırın.PipeReader Reset PipeWriter
PipeReader çağıranın adına belleği yönetir. çağrısı yaptıktan sonra her zaman arayın PipeReader.AdvanceTo PipeReader.ReadAsync. Bu, çağıranın PipeReader
bellekle ne zaman yapıldığını bilmelerini ve böylece izlenebilmesini sağlar. döndürülen ReadOnlySequence<byte>
PipeReader.ReadAsync
yalnızca çağrısına PipeReader.AdvanceTo
kadar geçerlidir. çağrısı PipeReader.AdvanceTo
yaptıktan sonra kullanmak ReadOnlySequence<byte>
geçersizdir.
PipeReader.AdvanceTo
iki SequencePosition bağımsız değişken alır:
Verileri tüketilen olarak işaretlemek, kanalın belleği temel alınan arabellek havuzuna döndürebileceği anlamına gelir. Verileri gözlemlenen olarak işaretlemek, bir sonraki çağrının PipeReader.ReadAsync
ne yaptığını denetler. Her şeyi gözlemlendi olarak işaretlemek, kanala daha fazla veri yazılana PipeReader.ReadAsync
kadar sonraki çağrısının döndürülmeyeceği anlamına gelir. Diğer tüm değerler, gözlemlenen ve gözlemlenmeyen verilerle hemen döndürülmek üzere PipeReader.ReadAsync
bir sonraki çağrıyı yapar, ancak zaten tüketilmiş olan verileri döndürmez.
Akış verilerini okumaya çalışırken ortaya çıkan birkaç tipik desen vardır:
Aşağıdaki örneklerde, bir ReadOnlySequence<byte>
'den iletileri ayrıştırma yöntemi kullanılırTryParseLines
. TryParseLines
tek bir iletiyi ayrıştırıp giriş arabelleğinden ayrıştırılan iletiyi kırpmak için güncelleştirir. TryParseLines
.NET'in bir parçası değildir, aşağıdaki bölümlerde kullanılan kullanıcı tarafından yazılmış bir yöntemdir.
bool TryParseLines(ref ReadOnlySequence<byte> buffer, out Message message);
Aşağıdaki kod' dan PipeReader
tek bir iletiyi okur ve çağırana döndürür.
async ValueTask<Message?> ReadSingleMessageAsync(PipeReader reader,
CancellationToken cancellationToken = default)
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
// In the event that no message is parsed successfully, mark consumed
// as nothing and examined as the entire buffer.
SequencePosition consumed = buffer.Start;
SequencePosition examined = buffer.End;
try
{
if (TryParseLines(ref buffer, out Message message))
{
// A single message was successfully parsed so mark the start of the
// parsed buffer as consumed. TryParseLines trims the buffer to
// point to the data after the message was parsed.
consumed = buffer.Start;
// Examined is marked the same as consumed here, so the next call
// to ReadSingleMessageAsync will process the next message if there's
// one.
examined = consumed;
return message;
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(consumed, examined);
}
}
return null;
}
Yukarıdaki kod:
SequencePosition
ve incelenen SequencePosition
öğesini kırpılan giriş arabelleğinin başlangıcına işaret eden şekilde güncelleştirir.İki SequencePosition
bağımsız değişken, ayrıştırılmış iletiyi giriş arabelleğinden kaldırdığından TryParseLines
güncelleştirilir. Genellikle, arabellekten tek bir iletiyi ayrıştırırken, incelenen konum aşağıdakilerden biri olmalıdır:
Tek bir ileti durumunda en fazla hata olasılığı vardır. İncelenmesi gereken yanlış değerlerin geçirilmesi bellek yetersiz özel durumu veya sonsuz döngüye neden olabilir. Daha fazla bilgi için bu makaledeki PipeReader yaygın sorunları bölümüne bakın.
Aşağıdaki kod, bir'den PipeReader
gelen tüm iletileri okur ve her birine çağrı ProcessMessageAsync
yapar.
async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseLines(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
PipeReader.ReadAsync
:
CancellationToken
bir OperationCanceledException atar.PipeReader.CancelPendingRead
, geçerli veya sonraki çağrısının PipeReader.ReadAsync
ile olarak IsCanceled
ayarlanmış bir ReadResult döndürmesine true
neden olur. Bu, mevcut okuma döngüsünü yıkıcı olmayan ve istisnai olmayan bir şekilde durdurmak için yararlı olabilir.private PipeReader reader;
public MyConnection(PipeReader reader)
{
this.reader = reader;
}
public void Abort()
{
// Cancel the pending read so the process loop ends without an exception.
reader.CancelPendingRead();
}
public async Task ProcessMessagesAsync()
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
if (result.IsCanceled)
{
// The read was canceled. You can quit without reading the existing data.
break;
}
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseLines(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
Yanlış değerlerin veya değerlerinin geçirilmesi consumed
examined
, zaten okunan verilerin okunmasıyla sonuçlanabilir.
İncelendiği şekilde geçiş buffer.End
aşağıdakilere neden olabilir:
PipeReader.AdvanceTo(position, buffer.End)
arabellekten bir kerede tek bir iletiyi işlerken.Yanlış değerlerin öğesine consumed
geçirilmesi veya examined
sonsuz döngüye neden olabilir. Örneğin, değişmediyse, PipeReader.AdvanceTo(buffer.Start)
buffer.Start
yeni veriler gelmeden hemen önce bir sonraki çağrının döndürülmesi gerekir PipeReader.ReadAsync
.
Yanlış değerlerin 'e consumed
geçirilmesi veya examined
sonsuz arabelleğe alma (nihai OOM) ile sonuçlanabilir.
Çağrıdan PipeReader.AdvanceTo
sonra komutunun ReadOnlySequence<byte>
kullanılması bellek bozulmasına neden olabilir (ücretsiz kullanımdan sonra kullanın).
Çağrı PipeReader.Complete/CompleteAsync
yapılamaması bellek sızıntısına neden olabilir.
Arabelleği işlemeden önce okuma mantığının denetlenip ReadResult.IsCompleted çıkılması veri kaybına neden olur. Döngü çıkış koşulu ve ReadResult.IsCompleted
temel ReadResult.Buffer.IsEmpty
almalıdır. Bunu yanlış yapmak sonsuz döngüye neden olabilir.
❌Veri kaybı
ReadResult
olarak ayarlandığında true
verinin IsCompleted
son kesimini döndürebilir. Okuma döngüsünden çıkmadan önce bu verilerin okunmaması veri kaybına neden olur.
Uyarı
Aşağıdaki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. PipeReader Yaygın sorunlarını açıklamak için aşağıdaki örnek sağlanmıştır.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> dataLossBuffer = result.Buffer;
if (result.IsCompleted)
break;
Process(ref dataLossBuffer, out Message message);
reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End);
}
Uyarı
Önceki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. Önceki örnek, PipeReader Yaygın sorunlarını açıklamak için sağlanmıştır.
❌Sonsuz döngü
Aşağıdaki mantık, ise ancak arabellekte hiçbir zaman tam bir ileti yoksa sonsuz bir döngüye Result.IsCompleted
true
neden olabilir.
Uyarı
Aşağıdaki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. PipeReader Yaygın sorunlarını açıklamak için aşağıdaki örnek sağlanmıştır.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (result.IsCompleted && infiniteLoopBuffer.IsEmpty)
break;
Process(ref infiniteLoopBuffer, out Message message);
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
Uyarı
Önceki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. Önceki örnek, PipeReader Yaygın sorunlarını açıklamak için sağlanmıştır.
Aynı sorunla ilgili başka bir kod parçası aşağıdadır. denetlemeden önce boş olmayan bir arabelleği denetler ReadResult.IsCompleted
. içinde olduğundan, arabellekte else if
hiçbir zaman tam bir ileti yoksa sonsuza kadar döngüye alır.
Uyarı
Aşağıdaki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. PipeReader Yaygın sorunlarını açıklamak için aşağıdaki örnek sağlanmıştır.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (!infiniteLoopBuffer.IsEmpty)
Process(ref infiniteLoopBuffer, out Message message);
else if (result.IsCompleted)
break;
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
Uyarı
Önceki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. Önceki örnek, PipeReader Yaygın sorunlarını açıklamak için sağlanmıştır.
❌Yanıt vermeyen uygulama
konumunda ile buffer.End
examined
koşulsuz çağrılmasıPipeReader.AdvanceTo
, uygulamanın tek bir iletiyi ayrıştırırken yanıt vermemeye başlamasına neden olabilir. Sonraki arama PipeReader.AdvanceTo
şu zamana kadar döndürülmeyecek:
Uyarı
Aşağıdaki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. PipeReader Yaygın sorunlarını açıklamak için aşağıdaki örnek sağlanmıştır.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> hangBuffer = result.Buffer;
Process(ref hangBuffer, out Message message);
if (result.IsCompleted)
break;
reader.AdvanceTo(hangBuffer.Start, hangBuffer.End);
if (message != null)
return message;
}
Uyarı
Önceki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. Önceki örnek, PipeReader Yaygın sorunlarını açıklamak için sağlanmıştır.
❌Yetersiz Bellek (OOM)
Aşağıdaki koşullarla, aşağıdaki kod bir OutOfMemoryException gerçekleşene kadar arabelleğe almayı tutar:
PipeReader
döndürülen veriler tam bir ileti oluşturmaz. Örneğin, diğer taraf büyük bir ileti yazdığından (örneğin, 4 GB'lık bir ileti) tam bir ileti oluşturmaz.Uyarı
Aşağıdaki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. PipeReader Yaygın sorunlarını açıklamak için aşağıdaki örnek sağlanmıştır.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer;
Process(ref thisCouldOutOfMemory, out Message message);
if (result.IsCompleted)
break;
reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End);
if (message != null)
return message;
}
Uyarı
Önceki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. Önceki örnek, PipeReader Yaygın sorunlarını açıklamak için sağlanmıştır.
❌Bellek Bozulması
Arabelleği okuyan yardımcılar yazarken, döndürülen yük çağrılmadan Advance
önce kopyalanmalıdır. Aşağıdaki örnek, atılan belleği Pipe
döndürür ve bir sonraki işlem (okuma/yazma) için yeniden kullanabilir.
Uyarı
Aşağıdaki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. PipeReader Yaygın sorunlarını açıklamak için aşağıdaki örnek sağlanmıştır.
public class Message
{
public ReadOnlySequence<byte> CorruptedPayload { get; set; }
}
Environment.FailFast("This code is terrible, don't use it!");
Message message = null;
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
ReadHeader(ref buffer, out int length);
if (length <= buffer.Length)
{
message = new Message
{
// Slice the payload from the existing buffer
CorruptedPayload = buffer.Slice(0, length)
};
buffer = buffer.Slice(length);
}
if (result.IsCompleted)
break;
reader.AdvanceTo(buffer.Start, buffer.End);
if (message != null)
{
// This code is broken since reader.AdvanceTo() was called with a position *after* the buffer
// was captured.
break;
}
}
return message;
}
Uyarı
Önceki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, kilitlenmelere, güvenlik sorunlarına neden olur ve KOPYALANMAMALIDIR. Önceki örnek, PipeReader Yaygın sorunlarını açıklamak için sağlanmıştır.
, PipeWriter arayan adına yazılacak arabellekleri yönetir. PipeWriter
uygular IBufferWriter<byte>
. IBufferWriter<byte>
ek arabellek kopyaları olmadan yazma işlemleri gerçekleştirmek için arabelleklere erişim elde etmek mümkün hale getirir.
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
// Request at least 5 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(5);
// Write directly into the buffer.
int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span);
// Tell the writer how many bytes were written.
writer.Advance(written);
await writer.FlushAsync(cancellationToken);
}
Önceki kod:
PipeWriter
GetMemory."Hello"
için baytları döndürülen Memory<byte>
öğesine yazar.PipeWriter
temel alınan cihaza gönderen öğesini temizler.Önceki yazma yöntemi tarafından PipeWriter
sağlanan arabellekleri kullanır. Şunu da kullanmış PipeWriter.WriteAsyncolabilir:
PipeWriter
kopyalar.Advance
çağırır GetSpan
ve öğesini çağırırFlushAsync.async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
byte[] helloBytes = Encoding.ASCII.GetBytes("Hello");
// Write helloBytes to the writer, there's no need to call Advance here
// (Write does that).
await writer.WriteAsync(helloBytes, cancellationToken);
}
FlushAsync bir CancellationTokengeçirmeyi destekler. Bekleyen bir CancellationToken
temizleme işlemi varken belirteç iptal edilirse bir sonuç OperationCanceledException
geçirilir. PipeWriter.FlushAsync
, özel durum oluşturmadan aracılığıyla PipeWriter.CancelPendingFlush geçerli temizleme işlemini iptal etmenin bir yolunu destekler. Çağırma PipeWriter.CancelPendingFlush
, veya öğesine yapılan geçerli veya sonraki çağrının PipeWriter.FlushAsync
PipeWriter.WriteAsync
olarak ayarlanmış olarak bir FlushResult IsCanceled
döndürmesine true
neden olur. Bu, boşaltmayı yıkıcı olmayan ve istisnai olmayan bir şekilde durdurmak için yararlı olabilir.
GetMemory
veya GetSpan
tamamlanmamış bir arama FlushAsync
olduğunda güvenli değildir.CompleteAsync
Veya Complete
şişirilmemiş veriler olduğunda çağrılması bellek bozulmasına neden olabilir.Aşağıdaki ipuçları sınıfları başarıyla kullanmanıza System.IO.Pipelines yardımcı olur:
await
PipeWriter.FlushAsync ve her zaman denetleyin FlushResult.IsCompleted. okuyucunun tamamlandığını ve artık ne yazıldığını umursamadığını gösterdiğinden , ise IsCompleted
true
yazmayı durdurun.PipeReader
bir şey yazdıktan sonra arama PipeWriter.FlushAsync yapın.FlushAsync
başlayamazsa aramayın FlushAsync
çünkü bu kilitlenmeye neden olabilir.PipeReader
PipeWriter
olduğundan veya bu bağlamlara eriştiğine emin olun. Bu türler iş parçacığı açısından güvenli değildir.AdvanceTo
veya tamamladıktan sonra hiçbir zaman öğesine ReadResult.Buffer erişemezPipeReader
.IDuplexPipe, hem okumayı hem de yazmayı destekleyen türler için bir sözleşmedir. Örneğin, bir ağ bağlantısı ile IDuplexPipe
temsil edilir.
ve içeren PipeReader
sürümünden farklı Pipe
olarakPipeWriter
, IDuplexPipe
tam çift yönlü bağlantının tek bir tarafını temsil eder. Bu, öğesine PipeWriter
yazılanların'dan PipeReader
okunmayacağı anlamına gelir.
Akış verilerini okurken veya yazarken genellikle seri hale getirici kullanarak verileri okur ve seri hale getirici kullanarak veri yazarsınız. Bu okuma ve yazma akışı API'lerinin çoğunun parametresi Stream
vardır. Bu mevcut API'lerle PipeReader
tümleştirmeyi kolaylaştırmak ve PipeWriter
bir AsStream yöntemi kullanıma sunma. AsStreamveya PipeWriter
çevresinde PipeReader
bir Stream
uygulama döndürür.
PipeReader
ve PipeWriter
örnekleri, bir Stream nesne ve isteğe bağlı olarak ilgili oluşturma seçenekleri verilen statik Create
yöntemler kullanılarak oluşturulabilir.
aşağıdaki StreamPipeReaderOptions parametrelerle örneğin oluşturulması PipeReader
üzerinde denetime izin verir:
4096
olarak kullanılır.PipeReader
açık bırakılıp bırakılmayacağını belirler ve varsayılan olarak false
olarak ayarlanır.1024
.MemoryPool<byte>
bellek ayrılırken kullanılır ve varsayılan olarak null
kullanılır.aşağıdaki StreamPipeWriterOptions parametrelerle örneğin oluşturulması PipeWriter
üzerinde denetime izin verir:
PipeWriter
açık bırakılıp bırakılmayacağını belirler ve varsayılan olarak false
olarak ayarlanır.4096
ve varsayılan olarak olarak kullanılır.MemoryPool<byte>
bellek ayrılırken kullanılır ve varsayılan olarak null
kullanılır.Önemli
yöntemleri kullanarak Create
ve PipeWriter
örnekleri oluştururken PipeReader
nesne ömrünü Stream
göz önünde bulundurmanız gerekir. Okuyucu veya yazıcıyla işlem tamamlandıktan sonra akışa erişmeniz gerekiyorsa, oluşturma seçeneklerinde bayrağını true
olarak ayarlamanız LeaveOpen
gerekir. Aksi takdirde akış kapatılır.
Aşağıdaki kod, bir akıştan yöntemleri kullanarak ve PipeWriter
örneklerinin PipeReader
oluşturulmasını Create
gösterir.
using System.Buffers;
using System.IO.Pipelines;
using System.Text;
class Program
{
static async Task Main()
{
using var stream = File.OpenRead("lorem-ipsum.txt");
var reader = PipeReader.Create(stream);
var writer = PipeWriter.Create(
Console.OpenStandardOutput(),
new StreamPipeWriterOptions(leaveOpen: true));
WriteUserCancellationPrompt();
var processMessagesTask = ProcessMessagesAsync(reader, writer);
var userCanceled = false;
var cancelProcessingTask = Task.Run(() =>
{
while (char.ToUpperInvariant(Console.ReadKey().KeyChar) != 'C')
{
WriteUserCancellationPrompt();
}
userCanceled = true;
// No exceptions thrown
reader.CancelPendingRead();
writer.CancelPendingFlush();
});
await Task.WhenAny(cancelProcessingTask, processMessagesTask);
Console.WriteLine(
$"\n\nProcessing {(userCanceled ? "cancelled" : "completed")}.\n");
}
static void WriteUserCancellationPrompt() =>
Console.WriteLine("Press 'C' to cancel processing...\n");
static async Task ProcessMessagesAsync(
PipeReader reader,
PipeWriter writer)
{
try
{
while (true)
{
ReadResult readResult = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = readResult.Buffer;
try
{
if (readResult.IsCanceled)
{
break;
}
if (TryParseLines(ref buffer, out string message))
{
FlushResult flushResult =
await WriteMessagesAsync(writer, message);
if (flushResult.IsCanceled || flushResult.IsCompleted)
{
break;
}
}
if (readResult.IsCompleted)
{
if (!buffer.IsEmpty)
{
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
catch (Exception ex)
{
Console.Error.WriteLine(ex);
}
finally
{
await reader.CompleteAsync();
await writer.CompleteAsync();
}
}
static bool TryParseLines(
ref ReadOnlySequence<byte> buffer,
out string message)
{
SequencePosition? position;
StringBuilder outputMessage = new();
while(true)
{
position = buffer.PositionOf((byte)'\n');
if (!position.HasValue)
break;
outputMessage.Append(Encoding.ASCII.GetString(buffer.Slice(buffer.Start, position.Value)))
.AppendLine();
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
};
message = outputMessage.ToString();
return message.Length != 0;
}
static ValueTask<FlushResult> WriteMessagesAsync(
PipeWriter writer,
string message) =>
writer.WriteAsync(Encoding.ASCII.GetBytes(message));
}
UygulamaStreamReader, lorem-ipsum.txt dosyasını akış olarak okumak için kullanır ve boş bir satırla bitmelidir. FileStream öğesine geçirilir PipeReader.Createve bu da bir PipeReader
nesnenin örneğini oluşturur. Konsol uygulaması daha sonra standart çıkış akışını kullanarak Console.OpenStandardOutput()öğesine PipeWriter.Create geçirir. Örnek iptali destekler.
.NET geri bildirimi
.NET, açık kaynak bir projedir. Geri bildirim sağlamak için bir bağlantı seçin:
Eğitim
Modül
Komutları bir işlem hattına bağlama - Training
Bu modülde komutları bir işlem hattına bağlamayı öğreneceksiniz.