Anmerkung
Der Zugriff auf diese Seite erfordert eine Genehmigung. Du kannst versuchen, dich anzumelden oder die Verzeichnisse zu wechseln.
Der Zugriff auf diese Seite erfordert eine Genehmigung. Du kannst versuchen , die Verzeichnisse zu wechseln.
In diesem Artikel erfahren Sie, wie Sie die TPL-Datenflussbibliothek verwenden, um ein Produzenten-Consumer-Muster zu implementieren. In diesem Muster sendet der Produzent Nachrichten an einen Nachrichtenblock, und der Verbraucher liest Nachrichten aus diesem Block vor.
Hinweis
Die TPL-Datenflussbibliothek (der System.Threading.Tasks.Dataflow Namespace) ist in .NET 6 und höheren Versionen enthalten. Für .NET Framework- und .NET Standard-Projekte müssen Sie das 📦 NuGet-Paket "System.Threading.Tasks.Dataflow" installieren.
Example
Im folgenden Beispiel wird ein einfaches Hersteller-Consumer-Modell veranschaulicht, das Datenfluss verwendet. Die Produce Methode schreibt Arrays, die zufällige Byte von Daten in ein System.Threading.Tasks.Dataflow.ITargetBlock<TInput> Objekt enthalten, und die Consume Methode liest Bytes aus einem System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> Objekt. Indem Sie auf die ISourceBlock<TOutput>- und ITargetBlock<TInput>-Schnittstellen reagieren, anstatt ihrer abgeleiteten Typen, können Sie wiederverwendbaren Code schreiben, der auf eine Vielzahl von Datenflussblocktypen arbeiten kann. In diesem Beispiel wird die BufferBlock<T> Klasse verwendet. Da die BufferBlock<T> Klasse sowohl als Quellblock als auch als Zielblock fungiert, können der Produzent und der Konsument ein freigegebenes Objekt zum Übertragen von Daten verwenden.
Die Produce Methode ruft die Post Methode in einer Schleife auf, um Daten synchron in den Zielblock zu schreiben. Nachdem die Produce Methode alle Daten in den Zielblock geschrieben hat, ruft sie die Complete Methode auf, um anzugeben, dass der Block niemals zusätzliche Daten zur Verfügung hat. Die Consume Methode verwendet die asynchronen und await-Operatoren (Async und Await in Visual Basic), um die Gesamtanzahl der Bytes, die vom ISourceBlock<TOutput> Objekt empfangen werden, asynchron zu berechnen. Um asynchron zu arbeiten, ruft die Consume Methode die OutputAvailableAsync Methode auf, um eine Benachrichtigung zu erhalten, wenn der Quellblock Daten verfügbar hat und wann der Quellblock niemals zusätzliche Daten verfügbar sein wird.
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
class DataflowProducerConsumer
{
static void Produce(ITargetBlock<byte[]> target)
{
var rand = new Random();
for (int i = 0; i < 100; ++ i)
{
var buffer = new byte[1024];
rand.NextBytes(buffer);
target.Post(buffer);
}
target.Complete();
}
static async Task<int> ConsumeAsync(ISourceBlock<byte[]> source)
{
int bytesProcessed = 0;
while (await source.OutputAvailableAsync())
{
byte[] data = await source.ReceiveAsync();
bytesProcessed += data.Length;
}
return bytesProcessed;
}
static async Task Main()
{
var buffer = new BufferBlock<byte[]>();
var consumerTask = ConsumeAsync(buffer);
Produce(buffer);
var bytesProcessed = await consumerTask;
Console.WriteLine($"Processed {bytesProcessed:#,#} bytes.");
}
}
// Sample output:
// Processed 102,400 bytes.
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow
Friend Class DataflowProducerConsumer
Private Shared Sub Produce(ByVal target As ITargetBlock(Of Byte()))
Dim rand As New Random()
For i As Integer = 0 To 99
Dim buffer(1023) As Byte
rand.NextBytes(buffer)
target.Post(buffer)
Next i
target.Complete()
End Sub
Private Shared Async Function ConsumeAsync(
ByVal source As ISourceBlock(Of Byte())) As Task(Of Integer)
Dim bytesProcessed As Integer = 0
Do While Await source.OutputAvailableAsync()
Dim data() As Byte = Await source.ReceiveAsync()
bytesProcessed += data.Length
Loop
Return bytesProcessed
End Function
Shared Sub Main()
Dim buffer = New BufferBlock(Of Byte())()
Dim consumer = ConsumeAsync(buffer)
Produce(buffer)
Dim result = consumer.GetAwaiter().GetResult()
Console.WriteLine($"Processed {result:#,#} bytes.")
End Sub
End Class
' Sample output:
' Processed 102,400 bytes.
Robuste Programmierung
Im vorherigen Beispiel wird nur ein Verbraucher zum Verarbeiten der Quelldaten verwendet. Wenn Sie über mehrere Consumer in Ihrer Anwendung verfügen, verwenden Sie die TryReceive Methode, um Daten aus dem Quellblock zu lesen, wie im folgenden Beispiel gezeigt.
static async Task<int> ConsumeAsync(IReceivableSourceBlock<byte[]> source)
{
int bytesProcessed = 0;
while (await source.OutputAvailableAsync())
{
while (source.TryReceive(out byte[] data))
{
bytesProcessed += data.Length;
}
}
return bytesProcessed;
}
Private Shared Async Function ConsumeAsync(
ByVal source As IReceivableSourceBlock(Of Byte())) As Task(Of Integer)
Dim bytesProcessed As Integer = 0
Do While Await source.OutputAvailableAsync()
Dim data() As Byte
Do While source.TryReceive(data)
bytesProcessed += data.Length
Loop
Loop
Return bytesProcessed
End Function
Die TryReceive Methode gibt zurück False , wenn keine Daten verfügbar sind. Wenn mehrere Verbraucher gleichzeitig auf den Quellblock zugreifen müssen, garantiert dieser Mechanismus, dass Daten nach dem Aufruf OutputAvailableAsyncweiterhin verfügbar sind.