Vorgehensweise: Implementieren eines Producer-Consumer-Musters
In diesem Artikel wird beschrieben, wie die TPL-Dataflowbibliothek verwendet wird, um ein Producer-Consumer-Muster zu implementieren. Bei diesem Muster sendet der Producer Nachrichten an einen Nachrichtenblock, während der Consumer Nachrichten aus diesem Block ausliest.
Hinweis
Die TPL-Datenflussbibliothek (System.Threading.Tasks.Dataflow-Namespace) wird nicht mit .NET ausgeliefert. Öffnen Sie zum Installieren des System.Threading.Tasks.Dataflow-Namespace in Visual Studio Ihr Projekt, wählen Sie im Menü Projekt die Option NuGet-Pakete verwalten aus, und suchen Sie online nach dem System.Threading.Tasks.Dataflow
-Paket. Alternativ können Sie es mithilfe der .NET Core-CLI installieren und dazu dotnet add package System.Threading.Tasks.Dataflow
ausführen.
Beispiel
Das folgende Beispiel zeigt ein grundlegendes Producer-Consumer-Modell, das Datenfluss verwendet. Die Produce
-Methode schreibt Arrays, die zufällige Datenbytes enthalten, in ein System.Threading.Tasks.Dataflow.ITargetBlock<TInput>-Objekt, und die Consume
-Methode liest Bytes aus einem System.Threading.Tasks.Dataflow.ISourceBlock<TOutput>-Objekt. Da Sie sich mit den Schnittstellen ISourceBlock<TOutput> und ITargetBlock<TInput> befassen, anstatt mit ihren abgeleiteten Typen, können Sie wiederverwendbaren Code schreiben und diesen auf eine Vielzahl von Datenflussblock-Typen anwenden. In diesem Beispiel wird die BufferBlock<T>-Klasse verwendet. Da die BufferBlock<T>-Klasse sowohl als Quell- als auch als Zielblock fungiert, können der Producer und der Consumer ein freigegebenes Objekt zum Übertragen von Daten verwenden.
Die Produce
-Methode ruft die Post-Methode in einer Schleife auf, um die Daten synchron in den Zielblock zu schreiben. Nachdem die Produce
-Methode alle Daten in den Zielblock geschrieben hat, ruft sie die Complete-Methode auf, um anzugeben, dass der Block niemals über zusätzliche Daten verfügen wird. Die Consume
-Methode verwendet die Operatoren async und await (Async und Await in Visual Basic) zur asynchronen Berechnung der Gesamtzahl von Bytes, die vom ISourceBlock<TOutput>-Objekt empfangen werden. Für die Asynchronität ruft die Consume
-Methode die OutputAvailableAsync-Methode auf, um eine Benachrichtigung zu erhalten, wenn der Quellblock über Daten verfügt und wenn dem Quellblock niemals zusätzliche Daten zur Verfügung stehen werden.
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
class DataflowProducerConsumer
{
static void Produce(ITargetBlock<byte[]> target)
{
var rand = new Random();
for (int i = 0; i < 100; ++ i)
{
var buffer = new byte[1024];
rand.NextBytes(buffer);
target.Post(buffer);
}
target.Complete();
}
static async Task<int> ConsumeAsync(ISourceBlock<byte[]> source)
{
int bytesProcessed = 0;
while (await source.OutputAvailableAsync())
{
byte[] data = await source.ReceiveAsync();
bytesProcessed += data.Length;
}
return bytesProcessed;
}
static async Task Main()
{
var buffer = new BufferBlock<byte[]>();
var consumerTask = ConsumeAsync(buffer);
Produce(buffer);
var bytesProcessed = await consumerTask;
Console.WriteLine($"Processed {bytesProcessed:#,#} bytes.");
}
}
// Sample output:
// Processed 102,400 bytes.
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow
Friend Class DataflowProducerConsumer
Private Shared Sub Produce(ByVal target As ITargetBlock(Of Byte()))
Dim rand As New Random()
For i As Integer = 0 To 99
Dim buffer(1023) As Byte
rand.NextBytes(buffer)
target.Post(buffer)
Next i
target.Complete()
End Sub
Private Shared Async Function ConsumeAsync(
ByVal source As ISourceBlock(Of Byte())) As Task(Of Integer)
Dim bytesProcessed As Integer = 0
Do While Await source.OutputAvailableAsync()
Dim data() As Byte = Await source.ReceiveAsync()
bytesProcessed += data.Length
Loop
Return bytesProcessed
End Function
Shared Sub Main()
Dim buffer = New BufferBlock(Of Byte())()
Dim consumer = ConsumeAsync(buffer)
Produce(buffer)
Dim result = consumer.GetAwaiter().GetResult()
Console.WriteLine($"Processed {result:#,#} bytes.")
End Sub
End Class
' Sample output:
' Processed 102,400 bytes.
Stabile Programmierung
Im obigen Beispiel wird nur ein Consumer verwendet, um die Quelldaten zu verarbeiten. Wenn Sie in Ihrer Anwendung über mehrere Consumer verfügen, verwenden Sie die TryReceive-Methode, um wie im folgenden Beispiel Daten aus dem Quellblock zu lesen.
static async Task<int> ConsumeAsync(IReceivableSourceBlock<byte[]> source)
{
int bytesProcessed = 0;
while (await source.OutputAvailableAsync())
{
while (source.TryReceive(out byte[] data))
{
bytesProcessed += data.Length;
}
}
return bytesProcessed;
}
Private Shared Async Function ConsumeAsync(
ByVal source As IReceivableSourceBlock(Of Byte())) As Task(Of Integer)
Dim bytesProcessed As Integer = 0
Do While Await source.OutputAvailableAsync()
Dim data() As Byte
Do While source.TryReceive(data)
bytesProcessed += data.Length
Loop
Loop
Return bytesProcessed
End Function
Die TryReceive-Methode gibt False
zurück, wenn keine Daten verfügbar sind. Wenn mehrere Consumer gleichzeitig auf den Quellblock zugreifen müssen, gewährleistet dieser Mechanismus, dass Daten nach dem Aufruf von OutputAvailableAsync weiterhin verfügbar sind.