Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Bu makalede, üretici-tüketici deseni uygulamak için TPL veri akışı kitaplığını kullanmayı öğreneceksiniz. Bu düzende , üretici bir ileti bloğuna ileti gönderir ve tüketici bu bloktaki iletileri okur.
Uyarı
TPL veri akışı kitaplığı ( System.Threading.Tasks.Dataflow ad alanı), .NET 6 ve sonraki sürümlerde bulunur. .NET Framework ve .NET Standard projeleri için System.Threading.Tasks.Dataflow NuGet paketini yüklemeniz📦 gerekir.
Example
Aşağıdaki örnekte veri akışını kullanan temel bir üretici-tüketici modeli gösterilmektedir. yöntemi, Produce bir nesneye System.Threading.Tasks.Dataflow.ITargetBlock<TInput> rastgele veri baytları içeren diziler yazar ve Consume yöntem bir System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> nesneden bayt okur.
ISourceBlock<TOutput> ve ITargetBlock<TInput> arabirimleri üzerinde işlem yaparak, türetilmiş türler yerine, çeşitli veri akışı blok türleri üzerinde çalışabilecek yeniden kullanılabilir kodlar yazabilirsiniz. Bu örnekte sınıfı kullanılır BufferBlock<T> .
BufferBlock<T> sınıfı hem kaynak blok hem de hedef blok olarak davrandığından, üretici ve tüketici verileri aktarmak için paylaşılan bir nesne kullanabilir.
Produce yöntemi, veriyi zaman uyumlu bir şekilde hedef bloğa yazmak için Post yöntemini bir döngüde çağırır.
Produce yöntemi tüm verileri hedef bloğa yazdıktan sonra, bloğun Complete hiçbir zaman kullanılabilir ek veriye sahip olmadığını belirtmek için yöntemini çağırır. Yöntemi, Consume nesnesinden zaman uyumsuz olarak alınan toplam bayt sayısını hesaplamak için async ve await işleçlerini (Async ve Await in Visual Basic) kullanır. Eşzamansız olarak hareket etmek için Consume metodu, kaynak bloğunda kullanılabilir veri olduğunda ve hiçbir zaman ek veri olmayacağında bildirim almak üzere OutputAvailableAsync metodunu çağırır.
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
class DataflowProducerConsumer
{
static void Produce(ITargetBlock<byte[]> target)
{
var rand = new Random();
for (int i = 0; i < 100; ++ i)
{
var buffer = new byte[1024];
rand.NextBytes(buffer);
target.Post(buffer);
}
target.Complete();
}
static async Task<int> ConsumeAsync(ISourceBlock<byte[]> source)
{
int bytesProcessed = 0;
while (await source.OutputAvailableAsync())
{
byte[] data = await source.ReceiveAsync();
bytesProcessed += data.Length;
}
return bytesProcessed;
}
static async Task Main()
{
var buffer = new BufferBlock<byte[]>();
var consumerTask = ConsumeAsync(buffer);
Produce(buffer);
var bytesProcessed = await consumerTask;
Console.WriteLine($"Processed {bytesProcessed:#,#} bytes.");
}
}
// Sample output:
// Processed 102,400 bytes.
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow
Friend Class DataflowProducerConsumer
Private Shared Sub Produce(ByVal target As ITargetBlock(Of Byte()))
Dim rand As New Random()
For i As Integer = 0 To 99
Dim buffer(1023) As Byte
rand.NextBytes(buffer)
target.Post(buffer)
Next i
target.Complete()
End Sub
Private Shared Async Function ConsumeAsync(
ByVal source As ISourceBlock(Of Byte())) As Task(Of Integer)
Dim bytesProcessed As Integer = 0
Do While Await source.OutputAvailableAsync()
Dim data() As Byte = Await source.ReceiveAsync()
bytesProcessed += data.Length
Loop
Return bytesProcessed
End Function
Shared Sub Main()
Dim buffer = New BufferBlock(Of Byte())()
Dim consumer = ConsumeAsync(buffer)
Produce(buffer)
Dim result = consumer.GetAwaiter().GetResult()
Console.WriteLine($"Processed {result:#,#} bytes.")
End Sub
End Class
' Sample output:
' Processed 102,400 bytes.
Sağlam programlama
Yukarıdaki örnek, kaynak verileri işlemek için yalnızca bir tüketici kullanır. Uygulamanızda birden çok tüketici varsa, aşağıdaki örnekte gösterildiği gibi kaynak bloktaki verileri okumak için yöntemini kullanın TryReceive .
static async Task<int> ConsumeAsync(IReceivableSourceBlock<byte[]> source)
{
int bytesProcessed = 0;
while (await source.OutputAvailableAsync())
{
while (source.TryReceive(out byte[] data))
{
bytesProcessed += data.Length;
}
}
return bytesProcessed;
}
Private Shared Async Function ConsumeAsync(
ByVal source As IReceivableSourceBlock(Of Byte())) As Task(Of Integer)
Dim bytesProcessed As Integer = 0
Do While Await source.OutputAvailableAsync()
Dim data() As Byte
Do While source.TryReceive(data)
bytesProcessed += data.Length
Loop
Loop
Return bytesProcessed
End Function
Yöntem TryReceive , kullanılabilir veri olmadığında döndürür False . Birden çok tüketicinin kaynak bloğuna eşzamanlı olarak erişmesi gerektiğinde, bu mekanizma çağrısından OutputAvailableAsyncsonra verilerin hala kullanılabilir olmasını garanti eder.