Datenfluss (Task Parallel Library)
Die Task Parallel Library (TPL) stellt Datenflusskomponenten bereit, um die Stabilität von nebenläufigkeitsfähigen Anwendungen zu erhöhen. Diese Datenflusskomponenten werden zusammen als TPL-Datenflussbibliothek bezeichnet. Dieses Datenflussmodell begünstigt die akteurbasierte Programmierung durch eine prozessinterne Nachrichtenübergabe für simple Datenfluss- und Pipelineaufgaben. Die Datenflusskomponenten basieren auf den Typen und der Planungsinfrastruktur der TPL und sind in die C#-, Visual Basic- und F#-Sprachunterstützung für asynchrone Programmierung integriert. Diese Datenflusskomponenten sind hilfreich, wenn mehrere Vorgänge vorliegen, die asynchron miteinander kommunizieren müssen, oder wenn Sie Daten verarbeiten möchten, die gerade verfügbar werden. Denken Sie beispielsweise an eine Anwendung, die Bilddaten von einer Webcam verarbeitet. Durch das Datenflussmodell kann die Anwendung Bildframes verarbeiten, sobald diese verfügbar sind. Wenn die Anwendung Bildframes beispielsweise durch Lichtkorrektur oder Rote-Augen-Reduktion aufbessert, können Sie eine Pipeline von Datenflusskomponenten erstellen. Jede Phase der Pipeline kann grober strukturierte Parallelitätsfunktionen verwenden, wie z. B. die von der TPL bereitgestellten Funktionen zum Transformieren des Bilds.
Dieses Dokument enthält eine Übersicht über die TPL-Datenflussbibliothek. Es bietet Informationen über das Programmiermodell, die vordefinierten Datenflussblocktypen sowie die Konfiguration der Datenflussblöcke für die speziellen Anforderungen Ihrer Anwendungen.
Hinweis
Die TPL-Datenflussbibliothek (System.Threading.Tasks.Dataflow-Namespace) wird nicht mit .NET ausgeliefert. Öffnen Sie zum Installieren des System.Threading.Tasks.Dataflow-Namespace in Visual Studio Ihr Projekt, wählen Sie im Menü Projekt die Option NuGet-Pakete verwalten aus, und suchen Sie online nach dem System.Threading.Tasks.Dataflow
-Paket. Alternativ können Sie es mithilfe der .NET Core-CLI installieren und dazu dotnet add package System.Threading.Tasks.Dataflow
ausführen.
Programmiermodell
Die TPL-Datenflussbibliothek bietet eine Grundlage für die Nachrichtenübergabe und die Parallelisierung CPU-intensiver und E/A-intensiver Anwendungen mit hohem Durchsatz und niedriger Latenz. Außerdem erhalten Sie damit explizite Kontrolle darüber, wie Daten gepuffert und im System übermittelt werden. Stellen Sie sich zum besseren Verständnis des Datenflussprogrammiermodells eine Anwendung vor, die Bilder asynchron vom Datenträger lädt und ein Kompositum dieser Bilder erstellt. Herkömmliche Programmiermodelle erfordern in der Regel die Verwendung von Rückrufen und Synchronisierungsobjekten wie Sperren, um Aufgaben und den Zugriff auf freigegebene Daten zu koordinieren. Mit dem Datenflussprogrammiermodell können Sie Datenflussobjekte erstellen, die Bilder beim Lesen vom Datenträger verarbeiten. Unter dem Datenflussmodell deklarieren Sie, wie Daten behandelt werden, sobald diese verfügbar werden, und legen außerdem Abhängigkeiten zwischen Daten fest. Da die Laufzeit Abhängigkeiten zwischen Daten verwaltet, können Sie häufig die Anforderung vermeiden, Zugriff auf freigegebene Daten synchronisieren zu müssen. Da die Arbeit von der Laufzeit basierend auf dem asynchronen Eintreffen von Daten geplant wird, können Reaktionsgeschwindigkeit und Durchsatz des Datenflusses verbessert werden, indem die zugrunde liegenden Threads effizient verwaltet werden. Ein Beispiel, bei dem mit dem Datenflussprogrammiermodell eine Bildverarbeitung in einer Windows Forms-Anwendung implementiert wird, finden Sie unter Walkthrough: Using Dataflow in a Windows Forms Application (Exemplarische Vorgehensweise: Verwenden von Datenflüssen in einer Windows Forms-Anwendung).
Quellen und Ziele
Die TPL-Datenflussbibliothek besteht aus Datenflussblöcken, bei denen es sich um Datenstrukturen handelt, die Daten puffern und verarbeiten. Die TPL definiert drei Arten von Datenflussblöcken: Quellblöcke, Zielblöcke und Weitergabeblöcke. Ein Quellblock fungiert als Datenquelle, aus der gelesen werden kann. Ein Zielblock fungiert als Datenempfänger, in den geschrieben werden kann. Ein Weitergabeblock fungiert als Quellblock und als Zielblock, aus dem gelesen und in den geschrieben werden kann. Die TPL definiert die System.Threading.Tasks.Dataflow.ISourceBlock<TOutput>-Schnittstelle, um Quellen darzustellen, System.Threading.Tasks.Dataflow.ITargetBlock<TInput>, um Ziele darzustellen, und System.Threading.Tasks.Dataflow.IPropagatorBlock<TInput,TOutput>, um Weitergaben darzustellen. IPropagatorBlock<TInput,TOutput> erbt von ISourceBlock<TOutput> und ITargetBlock<TInput>.
Die TPL-Datenflussbibliothek enthält mehrere vordefinierte Datenflussblocktypen, die die Schnittstellen ISourceBlock<TOutput>, ITargetBlock<TInput> und IPropagatorBlock<TInput,TOutput> implementieren. Diese Datenflussblocktypen werden in diesem Dokument im Abschnitt Vordefinierte Datenflussblocktypen beschrieben.
Verbinden von Blöcken
Sie können Datenflussblöcke mit Formularpipelines (lineare Sequenzen von Datenflussblöcken) oder Netzwerken (Diagramme von Datenflussblöcken) verbinden. Eine Pipeline ist eine Form von Netzwerk. In einer Pipeline oder einem Netzwerk geben Quellen asynchron Daten an Ziele weiter, sobald diese Daten verfügbar werden. Die ISourceBlock<TOutput>.LinkTo-Methode verknüpft einen Quelldatenflussblock mit einem Zielblock. Eine Quelle kann mit null oder mehr Zielen verknüpft werden. Ziele können mit null oder mehr Quellen verknüpft werden. Sie können Datenflussblöcke in einer Pipeline oder einem Netzwerk gleichzeitig hinzufügen oder entfernen. Die vordefinierten Datenflussblocktypen behandeln alle Threadsicherheitsaspekte bezüglich Verknüpfungen und des Lösens von Verknüpfungen.
Ein Beispiel für die Verbindung von Datenflussblöcken zum Erstellen einer einfachen Pipeline finden Sie unter Walkthrough: Creating a Dataflow Pipeline (Exemplarische Vorgehensweise: Erstellen einer Datenflusspipeline). Ein Beispiel für die Verbindung von Datenflussblöcken zum Erstellen eines komplexeren Netzwerks finden Sie unter Walkthrough: Using Dataflow in a Windows Forms Application (Exemplarische Vorgehensweise: Verwenden von Datenflüssen in einer Windows Forms-Anwendung). Ein Beispiel, bei dem die Verknüpfung eines Ziels mit einer Quelle gelöst wird, nachdem die Quelle dem Ziel eine Nachricht anbietet, finden Sie unter Vorgehensweise: Aufheben der Verknüpfungen von Datenflussblöcken.
Filtern
Wenn Sie die ISourceBlock<TOutput>.LinkTo-Methode aufrufen, um eine Quelle mit einem Ziel zu verknüpfen, können Sie einen Delegaten bereitstellen, der anhand des Werts einer Nachricht bestimmt, ob der Zielblock die Nachricht annimmt oder ablehnt. Dieser Filtermechanismus ist eine hilfreiche Möglichkeit, um sicherzustellen, dass nur bestimmte Werte von einem Datenflussblock empfangen werden. Für die meisten der vordefinierten Datenflussblocktypen gilt, dass eine Nachricht von der Quelle dem nächsten Ziel angeboten wird, wenn ein Quellblock mit mehreren Zielblöcken verbunden ist und ein Zielblock die Nachricht ablehnt. Die Reihenfolge, in der Nachrichten von einer Quelle Zielen angeboten werden, wird durch die Quelle definiert und kann je nach Typ der Quelle variieren. Die meisten Quellblocktypen bieten eine Nachricht nicht weiter an, nachdem diese von einem Ziel akzeptiert wurde. Eine Ausnahme dieser Regel ist die BroadcastBlock<T>-Klasse, die jede Nachricht allen Zielen anbietet, auch wenn einige Ziele die Nachricht ablehnen. Ein Beispiel, bei dem durch Filterung nur bestimmte Nachrichten verarbeitet werden, finden Sie unter Walkthrough: Using Dataflow in a Windows Forms Application (Exemplarische Vorgehensweise: Verwenden von Datenflüssen in einer Windows Forms-Anwendung).
Wichtig
Da jeder vordefinierte Quelldatenflussblocktyp sicherstellt, dass Nachrichten in der Reihenfolge weitergegeben werden, in der sie empfangen werden, muss jede Nachricht vom Quellblock gelesen werden, bevor der Quellblock die nächste Nachricht verarbeiten kann. Wenn Sie daher Filter verwenden, um mehrere Ziele mit einer Quelle zu verbinden, sollten Sie sicherstellen, dass jede Nachricht von mindestens einem Zielblock empfangen wird. Andernfalls kann bei der Anwendung ein Deadlock auftreten.
Nachrichtenübergabe
Das Datenflussprogrammiermodell bezieht sich auf das Konzept der Nachrichtenübergabe, bei dem unabhängige Komponenten eines Programms miteinander kommunizieren, indem sie Nachrichten senden. Eine Möglichkeit zur Weitergabe von Nachrichten zwischen Anwendungskomponenten besteht darin, die Post-Methode (synchron) und die SendAsync-Methode (asynchron) aufzurufen, um Nachrichten an Zieldatenflussblöcke zu senden, und die Methoden Receive, ReceiveAsync und TryReceive aufzurufen, um Nachrichten von Quellblöcken zu empfangen. Sie können diese Methoden mit Datenflusspipelines oder -netzwerken kombinieren, indem Sie Eingabedaten an den Hauptknoten (ein Zielblock) senden und Ausgabedaten vom Terminalknoten der Pipeline bzw. von den Terminalknoten des Netzwerks (ein oder mehrere Quellblöcke) empfangen. Sie können auch die Choose-Methode verwenden, um aus der ersten der bereitgestellten Quellen zu lesen, die Daten zur Verfügung stellt, und Aktionen für diese Daten ausführen.
Quellblöcke bieten Zielblöcken Daten an, indem die ITargetBlock<TInput>.OfferMessage-Methode aufgerufen wird. Der Zielblock reagiert auf eine angebotene Nachricht auf eine von drei Arten: Er kann die Nachricht akzeptieren, ablehnen oder zurückstellen. Wenn das Ziel die Nachricht akzeptiert, gibt die OfferMessage-Methode Accepted zurück. Wenn das Ziel die Nachricht ablehnt, gibt die OfferMessage-Methode Declined zurück. Wenn das Ziel erfordert, dass es keine weiteren Nachrichten mehr von der Quelle empfängt, gibt die OfferMessage-Methode DecliningPermanently zurück. Die vordefinierten Quellblocktypen bieten verknüpften Zielen keine Nachrichten an, nachdem ein solcher Rückgabewert empfangen wird, und die Verknüpfung zu solchen Zielen wird automatisch gelöst.
Wenn ein Zielblock die Nachricht für die spätere Verwendung zurückstellt, gibt die OfferMessage-Methode Postponed zurück. Ein Zielblock, der eine Nachricht zurückstellt, kann später die Methode ISourceBlock<TOutput>.ReserveMessage aufrufen, um zu versuchen, die angebotene Nachricht zu reservieren. In diesem Moment ist die Nachricht entweder weiterhin verfügbar und kann vom Zielblock verwendet werden, oder die Nachricht wurde von einem anderen Ziel angenommen. Wenn der Zielblock die Nachricht später benötigt oder die Nachricht nicht mehr benötigt, wird die ISourceBlock<TOutput>.ConsumeMessage- bzw. die ReleaseReservation-Methode aufgerufen. Nachrichtenreservierung wird in der Regel von Datenflussblocktypen verwendet, die sich im nicht gierigen Modus befinden. "Nicht gieriger Modus" wird weiter unten in diesem Dokument erläutert. Anstatt eine zurückgestellte Nachricht zu reservieren, kann ein Zielblock auch die ISourceBlock<TOutput>.ConsumeMessage-Methode verwenden, um zu versuchen, die zurückgestellte Nachricht direkt zu verarbeiten.
Abschluss von Datenflussblöcken
Datenflussblöcke unterstützen auch das Konzept des Abschlusses. Ein Datenflussblock, der sich im abgeschlossenen Zustand befindet, führt keine weitere Arbeit mehr aus. Jeder Datenflussblock verfügt über ein zugehöriges System.Threading.Tasks.Task-Objekt, das als Abschlussaufgabe bezeichnet wird und den Abschlusszustand des Blocks darstellt. Da Sie mithilfe von Abschlussaufgaben darauf warten können, dass ein Task-Objekt abgeschlossen wird, können Sie auf den Abschluss von einem oder mehreren Terminalknoten eines Datenflussnetzwerks warten. Die IDataflowBlock-Schnittstelle definiert die Complete-Methode, die den Datenflussblock über eine abzuschließende Anforderung informiert, sowie die Completion-Eigenschaft, die die Abschlussaufgabe für den Datenflussblock zurückgibt. ISourceBlock<TOutput> und ITargetBlock<TInput> erben beide von der IDataflowBlock-Schnittstelle.
Es gibt zwei Möglichkeiten, um zu bestimmen, ob ein Datenflussblock ohne Fehler abgeschlossen wurde, ob mindestens ein Fehler aufgetreten ist oder ob er abgebrochen wurde. Die erste Möglichkeit besteht darin, die Task.Wait-Methode für die Abschlussaufgabe in einem try
---Block (catch
Try
Catch
in Visual Basic) aufzurufen. Im folgenden Beispiel wird ein ActionBlock<TInput>-Objekt erstellt, das ArgumentOutOfRangeException auslöst, wenn der Eingabewert kleiner als null ist. AggregateException wird ausgelöst, wenn in diesem Beispiel Wait für die Abschlussaufgabe aufgerufen wird. Der Zugriff auf ArgumentOutOfRangeException erfolgt über die InnerExceptions-Eigenschaft des AggregateException-Objekts.
// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
Console.WriteLine("n = {0}", n);
if (n < 0)
{
throw new ArgumentOutOfRangeException();
}
});
// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();
// Wait for completion in a try/catch block.
try
{
throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
// If an unhandled exception occurs during dataflow processing, all
// exceptions are propagated through an AggregateException object.
ae.Handle(e =>
{
Console.WriteLine("Encountered {0}: {1}",
e.GetType().Name, e.Message);
return true;
});
}
/* Output:
n = 0
n = -1
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
Console.WriteLine("n = {0}", n)
If n < 0 Then
Throw New ArgumentOutOfRangeException()
End If
End Sub)
' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()
' Wait for completion in a try/catch block.
Try
throwIfNegative.Completion.Wait()
Catch ae As AggregateException
' If an unhandled exception occurs during dataflow processing, all
' exceptions are propagated through an AggregateException object.
ae.Handle(Function(e)
Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
Return True
End Function)
End Try
' Output:
' n = 0
' n = -1
' Encountered ArgumentOutOfRangeException: Specified argument was out of the range
' of valid values.
'
In diesem Beispiel wird der Fall erläutert, in dem eine Ausnahme im Delegaten eines Ausführungsdatenflussblocks unbehandelt bleibt. Es wird empfohlen, Ausnahmen in den Codetexten solcher Blöcke zu behandeln. Wenn dies jedoch nicht möglich ist, verhält sich der Block, als ob er abgebrochen wurde, und verarbeitet keine eingehenden Nachrichten.
Wenn ein Datenflussblock explizit abgebrochen wird, enthält das AggregateException-Objekt OperationCanceledException in der InnerExceptions-Eigenschaft. Weitere Informationen zum Abbrechen von Datenflüssen finden Sie im Abschnitt Aktivieren des Abbruchs.
Die zweite Möglichkeit, den Abschlussstatus eines Datenflussblocks zu bestimmen, ist die Verwendung einer Fortsetzung außerhalb der Abschlussaufgabe oder die Verwendung der asynchronen Sprachfunktionen von C# und Visual Basic, um auf die Abschlussaufgabe asynchron zu warten. Der Delegat, den Sie für die Task.ContinueWith-Methode bereitstellen, nimmt ein Task-Objekt an, das die vorangehende Aufgabe darstellt. Im Fall der Completion-Eigenschaft nimmt der Delegat für die Fortsetzung die Abschlussaufgabe selbst an. Das folgende Beispiel ähnelt dem vorherigen, es wird jedoch auch die ContinueWith-Methode verwendet, um eine Abschlussaufgabe zu erstellen, die den Status des gesamten Datenflussvorgangs ausgibt.
// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
Console.WriteLine("n = {0}", n);
if (n < 0)
{
throw new ArgumentOutOfRangeException();
}
});
// Create a continuation task that prints the overall
// task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(task =>
{
Console.WriteLine("The status of the completion task is '{0}'.",
task.Status);
});
// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();
// Wait for completion in a try/catch block.
try
{
throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
// If an unhandled exception occurs during dataflow processing, all
// exceptions are propagated through an AggregateException object.
ae.Handle(e =>
{
Console.WriteLine("Encountered {0}: {1}",
e.GetType().Name, e.Message);
return true;
});
}
/* Output:
n = 0
n = -1
The status of the completion task is 'Faulted'.
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
Console.WriteLine("n = {0}", n)
If n < 0 Then
Throw New ArgumentOutOfRangeException()
End If
End Sub)
' Create a continuation task that prints the overall
' task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(Sub(task) Console.WriteLine("The status of the completion task is '{0}'.", task.Status))
' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()
' Wait for completion in a try/catch block.
Try
throwIfNegative.Completion.Wait()
Catch ae As AggregateException
' If an unhandled exception occurs during dataflow processing, all
' exceptions are propagated through an AggregateException object.
ae.Handle(Function(e)
Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
Return True
End Function)
End Try
' Output:
' n = 0
' n = -1
' The status of the completion task is 'Faulted'.
' Encountered ArgumentOutOfRangeException: Specified argument was out of the range
' of valid values.
'
Sie können auch Eigenschaften wie IsCanceled im Codetext der Fortsetzungsaufgabe verwenden, um zusätzliche Informationen über den Abschlussstatus eines Datenflussblocks zu ermitteln. Weitere Informationen zu Fortsetzungsaufgaben und deren Beziehung zum Abbruch und zur Fehlerbehandlung finden Sie unter Verketten von Aufgaben durch Fortsetzungsaufgaben, Aufgabenabbruch und Ausnahmebehandlung.
Vordefinierte Datenflussblocktypen
Die TPL-Datenflussbibliothek enthält mehrere vordefinierte Datenflussblocktypen. Diese Typen sind in drei Kategorien unterteilt: Pufferblöcke, Ausführungsblöcke und Gruppierungsblöcke. In den folgenden Abschnitten werden die Blocktypen dieser Kategorien beschrieben.
Pufferblöcke
Pufferblöcke enthalten Daten zur Verwendung durch Datenconsumer. Die TPL-Datenflussbibliothek enthält drei Pufferblocktypen: System.Threading.Tasks.Dataflow.BufferBlock<T>, System.Threading.Tasks.Dataflow.BroadcastBlock<T> und System.Threading.Tasks.Dataflow.WriteOnceBlock<T>.
BufferBlock<T>
Die BufferBlock<T>-Klasse stellt eine allgemeine asynchrone Nachrichtenstruktur dar. Diese Klasse speichert eine FIFO-Nachrichtenwarteschlange (First In, First Out), in die mehrere Quellen Nachrichten schreiben oder aus der mehrere Ziele Nachrichten auslesen können. Wenn ein Ziel eine Nachricht von einem BufferBlock<T>-Objekt empfängt, wird diese Nachricht aus der Nachrichtenwarteschlange entfernt. Daher können die einzelnen Nachrichten nur von einem Ziel empfangen werden, obwohl ein BufferBlock<T>-Objekt mehrere Ziele haben kann. Die BufferBlock<T>-Klasse ist hilfreich, wenn Sie mehrere Nachrichten an eine andere Komponente übergeben möchten und diese Komponente alle Nachrichten empfangen muss.
Im folgenden grundlegenden Beispiel werden mehrere Int32-Werte an ein BufferBlock<T>-Objekt gesendet und anschließend wieder aus dem Objekt gelesen.
// Create a BufferBlock<int> object.
var bufferBlock = new BufferBlock<int>();
// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
bufferBlock.Post(i);
}
// Receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
Console.WriteLine(bufferBlock.Receive());
}
/* Output:
0
1
2
*/
' Create a BufferBlock<int> object.
Dim bufferBlock = New BufferBlock(Of Integer)()
' Post several messages to the block.
For i As Integer = 0 To 2
bufferBlock.Post(i)
Next i
' Receive the messages back from the block.
For i As Integer = 0 To 2
Console.WriteLine(bufferBlock.Receive())
Next i
' Output:
' 0
' 1
' 2
'
Ein vollständiges Beispiel zur Veranschaulichung, wie Nachrichten in ein BufferBlock<T>-Objekt geschrieben oder daraus gelesen werden, finden Sie unter Vorgehensweise: Schreiben und Lesen von Nachrichten in einem Datenflussblock.
BroadcastBlock<T>
Die BroadcastBlock<T>-Klasse ist hilfreich, wenn mehrere Nachrichten an eine andere Komponente übergeben werden müssen, diese Komponente jedoch nur den letzten Wert benötigt. Diese Klasse ist darüber hinaus auch hilfreich, wenn Sie eine Nachricht an mehreren Komponenten übertragen möchten.
Im folgenden grundlegenden Beispiel wird ein Double-Wert an ein BroadcastBlock<T>-Objekt gesendet und dann mehrmals aus dem Objekt gelesen. Da Werte nicht aus BroadcastBlock<T>-Objekten entfernt werden, nachdem sie gelesen wurden, ist der gleiche Wert jedes Mal verfügbar.
// Create a BroadcastBlock<double> object.
var broadcastBlock = new BroadcastBlock<double>(null);
// Post a message to the block.
broadcastBlock.Post(Math.PI);
// Receive the messages back from the block several times.
for (int i = 0; i < 3; i++)
{
Console.WriteLine(broadcastBlock.Receive());
}
/* Output:
3.14159265358979
3.14159265358979
3.14159265358979
*/
' Create a BroadcastBlock<double> object.
Dim broadcastBlock = New BroadcastBlock(Of Double)(Nothing)
' Post a message to the block.
broadcastBlock.Post(Math.PI)
' Receive the messages back from the block several times.
For i As Integer = 0 To 2
Console.WriteLine(broadcastBlock.Receive())
Next i
' Output:
' 3.14159265358979
' 3.14159265358979
' 3.14159265358979
'
Ein vollständiges Beispiel zur Veranschaulichung, wie eine Nachricht mit BroadcastBlock<T> an mehrere Zielblöcke gesendet wird, finden Sie unter Vorgehensweise: Angeben eines Taskplaners in einem Datenflussblock.
WriteOnceBlock<T>
Die WriteOnceBlock<T>-Klasse ähnelt der BroadcastBlock<T>-Klasse, ein WriteOnceBlock<T>-Objekt kann jedoch nur einmal hineingeschrieben werden. WriteOnceBlock<T> ähnelt dem readonly-Schlüsselwort in C# (ReadOnly in Visual Basic), ein WriteOnceBlock<T>-Objekt wird jedoch unveränderlich, nachdem es einen Wert anstatt einer Konstruktion empfängt. Wenn ein Ziel eine Nachricht von einem BroadcastBlock<T>-Objekt empfängt, wird diese Nachricht wie bei der WriteOnceBlock<T>-Klasse nicht aus diesem Objekt entfernt. Daher empfangen mehrere Ziele eine Kopie der Nachricht. Die WriteOnceBlock<T>-Klasse ist hilfreich, wenn Sie nur die erste von mehreren Nachrichten weitergeben möchten.
Im folgenden Beispiel werden mehrere String-Werte an ein WriteOnceBlock<T>-Objekt gesendet. Anschließend wird der Wert aus dem Objekt gelesen. Da in ein WriteOnceBlock<T>-Objekt nur einmal geschrieben werden kann, nachdem ein WriteOnceBlock<T>-Objekt eine Nachricht empfängt, werden nachfolgende Nachrichten verworfen.
// Create a WriteOnceBlock<string> object.
var writeOnceBlock = new WriteOnceBlock<string>(null);
// Post several messages to the block in parallel. The first
// message to be received is written to the block.
// Subsequent messages are discarded.
Parallel.Invoke(
() => writeOnceBlock.Post("Message 1"),
() => writeOnceBlock.Post("Message 2"),
() => writeOnceBlock.Post("Message 3"));
// Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive());
/* Sample output:
Message 2
*/
' Create a WriteOnceBlock<string> object.
Dim writeOnceBlock = New WriteOnceBlock(Of String)(Nothing)
' Post several messages to the block in parallel. The first
' message to be received is written to the block.
' Subsequent messages are discarded.
Parallel.Invoke(Function() writeOnceBlock.Post("Message 1"), Function() writeOnceBlock.Post("Message 2"), Function() writeOnceBlock.Post("Message 3"))
' Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive())
' Sample output:
' Message 2
'
Ein vollständiges Beispiel zur Veranschaulichung, wie mit WriteOnceBlock<T> der Wert des ersten abgeschlossenen Vorgangs empfangen wird, finden Sie unter Vorgehensweise: Aufheben der Verknüpfungen von Datenflussblöcken.
Ausführungsblöcke
Ausführungsblöcke rufen einen vom Benutzer bereitgestellten Delegaten für jedes Element empfangener Daten auf. Die TPL-Datenflussbibliothek enthält drei Typen von Ausführungsblöcken: ActionBlock<TInput>, System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput> und System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput>.
ActionBlock<T>
Die ActionBlock<TInput>-Klasse ist ein Zielblock, der einen Delegaten aufruft, wenn er Daten empfängt. Stellen Sie sich ein ActionBlock<TInput>-Objekt als Delegaten vor, der asynchron ausgeführt wird, wenn Daten verfügbar werden. Der Delegat, den Sie für ein ActionBlock<TInput>-Objekt bereitstellen, kann vom Typ Action<T> oder vom Typ System.Func<TInput, Task>
sein. Wenn Sie ein ActionBlock<TInput>-Objekt mit Action<T> verwenden, wird die Verarbeitung jedes Eingabeelements als abgeschlossen betrachtet, wenn der Delegat zurückgegeben wird. Wenn Sie ein ActionBlock<TInput>-Objekt mit System.Func<TInput, Task>
verwenden, wird die Verarbeitung jedes Eingabeelements nur dann als abgeschlossen betrachtet, wenn das zurückgegebene Task-Objekt abgeschlossen ist. Mit diesen beiden Mechanismen können Sie ActionBlock<TInput> zur synchronen und asynchronen Verarbeitung der einzelnen Eingabeelemente verwenden.
Im folgenden grundlegenden Beispiel werden mehrere Int32-Werte an ein ActionBlock<TInput>-Objekt gesendet. Das ActionBlock<TInput>-Objekt gibt diese Werte in der Konsole aus. Im Beispiel wird der Block dann in den abgeschlossenen Zustand versetzt, und es wird gewartet, bis alle Datenflussaufgaben abgeschlossen sind.
// Create an ActionBlock<int> object that prints values
// to the console.
var actionBlock = new ActionBlock<int>(n => Console.WriteLine(n));
// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
actionBlock.Post(i * 10);
}
// Set the block to the completed state and wait for all
// tasks to finish.
actionBlock.Complete();
actionBlock.Completion.Wait();
/* Output:
0
10
20
*/
' Create an ActionBlock<int> object that prints values
' to the console.
Dim actionBlock = New ActionBlock(Of Integer)(Function(n) WriteLine(n))
' Post several messages to the block.
For i As Integer = 0 To 2
actionBlock.Post(i * 10)
Next i
' Set the block to the completed state and wait for all
' tasks to finish.
actionBlock.Complete()
actionBlock.Completion.Wait()
' Output:
' 0
' 10
' 20
'
Ausführliche Beispiele für die Verwendung von Delegaten mit der ActionBlock<TInput>-Klasse finden Sie unter Vorgehensweise: Ausführen einer Aktion, wenn ein Datenflussblock Daten empfängt.
TransformBlock<TInput, TOutput>
Die TransformBlock<TInput,TOutput>-Klasse ähnelt der ActionBlock<TInput>-Klasse, sie fungiert jedoch als Quelle und als Ziel. Der Delegat, den Sie an ein TransformBlock<TInput,TOutput>-Objekt übergeben, gibt einen Wert vom Typ TOutput
zurück. Der Delegat, den Sie für ein TransformBlock<TInput,TOutput>-Objekt bereitstellen, kann vom Typ System.Func<TInput, TOutput>
oder vom Typ System.Func<TInput, Task<TOutput>>
sein. Wenn Sie ein TransformBlock<TInput,TOutput>-Objekt mit System.Func<TInput, TOutput>
verwenden, wird die Verarbeitung jedes Eingabeelements als abgeschlossen betrachtet, wenn der Delegat zurückgegeben wird. Wenn Sie ein TransformBlock<TInput,TOutput>-Objekt mit System.Func<TInput, Task<TOutput>>
verwenden, wird die Verarbeitung jedes Eingabeelements nur dann als abgeschlossen betrachtet, wenn das zurückgegebene Task<TResult>-Objekt abgeschlossen ist. Wie bei ActionBlock<TInput> können Sie mit diesen beiden Mechanismen TransformBlock<TInput,TOutput> zur synchronen und asynchronen Verarbeitung der einzelnen Eingabeelemente verwenden.
Im folgenden grundlegenden Beispiel wird ein TransformBlock<TInput,TOutput>-Objekt erstellt, das die Quadratwurzel seiner Eingabe berechnet. Das TransformBlock<TInput,TOutput>-Objekt nimmt Int32-Werte als Eingabe an und erzeugt Double-Werte als Ausgabe.
// Create a TransformBlock<int, double> object that
// computes the square root of its input.
var transformBlock = new TransformBlock<int, double>(n => Math.Sqrt(n));
// Post several messages to the block.
transformBlock.Post(10);
transformBlock.Post(20);
transformBlock.Post(30);
// Read the output messages from the block.
for (int i = 0; i < 3; i++)
{
Console.WriteLine(transformBlock.Receive());
}
/* Output:
3.16227766016838
4.47213595499958
5.47722557505166
*/
' Create a TransformBlock<int, double> object that
' computes the square root of its input.
Dim transformBlock = New TransformBlock(Of Integer, Double)(Function(n) Math.Sqrt(n))
' Post several messages to the block.
transformBlock.Post(10)
transformBlock.Post(20)
transformBlock.Post(30)
' Read the output messages from the block.
For i As Integer = 0 To 2
Console.WriteLine(transformBlock.Receive())
Next i
' Output:
' 3.16227766016838
' 4.47213595499958
' 5.47722557505166
'
Ausführliche Beispiele für die Verwendung von TransformBlock<TInput,TOutput> in einem Netzwerk von Datenflussblöcken, das eine Bildverarbeitung in einer Windows Forms-Anwendung ausführt, finden Sie unter Walkthrough: Using Dataflow in a Windows Forms Application (Exemplarische Vorgehensweise: Verwenden von Datenflüssen in einer Windows Forms-Anwendung).
TransformManyBlock<TInput, TOutput>
Die TransformManyBlock<TInput,TOutput>-Klasse ähnelt der TransformBlock<TInput,TOutput>-Klasse, außer dass TransformManyBlock<TInput,TOutput> null oder mehr Ausgabewerte für jeden Eingabewert erzeugt, anstatt nur eines Ausgabewerts für jeden Eingabewert. Der Delegat, den Sie für ein TransformManyBlock<TInput,TOutput>-Objekt bereitstellen, kann vom Typ System.Func<TInput, IEnumerable<TOutput>>
oder vom Typ System.Func<TInput, Task<IEnumerable<TOutput>>>
sein. Wenn Sie ein TransformManyBlock<TInput,TOutput>-Objekt mit System.Func<TInput, IEnumerable<TOutput>>
verwenden, wird die Verarbeitung jedes Eingabeelements als abgeschlossen betrachtet, wenn der Delegat zurückgegeben wird. Wenn Sie ein TransformManyBlock<TInput,TOutput>-Objekt mit System.Func<TInput, Task<IEnumerable<TOutput>>>
verwenden, wird die Verarbeitung jedes Eingabeelements nur dann als abgeschlossen betrachtet, wenn das zurückgegebene System.Threading.Tasks.Task<IEnumerable<TOutput>>
-Objekt abgeschlossen ist.
Im folgenden grundlegenden Beispiel wird ein TransformManyBlock<TInput,TOutput>-Objekt erstellt, das Zeichenfolgen in ihre einzelnen Zeichensequenzen aufteilt. Das TransformManyBlock<TInput,TOutput>-Objekt nimmt String-Werte als Eingabe an und erzeugt Char-Werte als Ausgabe.
// Create a TransformManyBlock<string, char> object that splits
// a string into its individual characters.
var transformManyBlock = new TransformManyBlock<string, char>(
s => s.ToCharArray());
// Post two messages to the first block.
transformManyBlock.Post("Hello");
transformManyBlock.Post("World");
// Receive all output values from the block.
for (int i = 0; i < ("Hello" + "World").Length; i++)
{
Console.WriteLine(transformManyBlock.Receive());
}
/* Output:
H
e
l
l
o
W
o
r
l
d
*/
' Create a TransformManyBlock<string, char> object that splits
' a string into its individual characters.
Dim transformManyBlock = New TransformManyBlock(Of String, Char)(Function(s) s.ToCharArray())
' Post two messages to the first block.
transformManyBlock.Post("Hello")
transformManyBlock.Post("World")
' Receive all output values from the block.
For i As Integer = 0 To ("Hello" & "World").Length - 1
Console.WriteLine(transformManyBlock.Receive())
Next i
' Output:
' H
' e
' l
' l
' o
' W
' o
' r
' l
' d
'
Ausführliche Beispiele, bei denen mit TransformManyBlock<TInput,TOutput> mehrere unabhängige Ausgaben für jede Eingabe in einer Datenflusspipeline erzeugt werden, finden Sie unter Walkthrough: Creating a Dataflow Pipeline (Exemplarische Vorgehensweise: Erstellen einer Datenflusspipeline).
Grad der Parallelität
Jedes ActionBlock<TInput>-, TransformBlock<TInput,TOutput>- und TransformManyBlock<TInput,TOutput>-Objekt puffert eingehende Nachrichten, bis der Block bereit für die Verarbeitung ist. In der Standardeinstellung verarbeiten diese Klassen Nachrichten nacheinander in der Reihenfolge, in der sie empfangen werden. Sie können auch den Grad der Parallelität angeben, sodass ActionBlock<TInput>-, TransformBlock<TInput,TOutput>- und TransformManyBlock<TInput,TOutput>-Objekte mehrere Nachrichten gleichzeitig verarbeiten können. Weitere Informationen über die gleichzeitige Ausführung finden Sie im Abschnitt "Festlegen des Grads der Parallelität" weiter unten in diesem Dokument. Ein Beispiel, bei dem der Grad der Parallelität so festgelegt wird, dass ein Ausführungsdatenflussblock mehrere Nachrichten gleichzeitig verarbeiten kann, finden Sie unter Vorgehensweise: Festlegen des Parallelitätsgrads in einem Datenflussblock.
Übersicht über Delegattypen
In der folgenden Tabelle sind die Delegattypen zusammengefasst, die Sie für ActionBlock<TInput>-, TransformBlock<TInput,TOutput>- und TransformManyBlock<TInput,TOutput>-Objekte bereitstellen können. In dieser Tabelle wird auch angegeben, ob der Delegattyp synchron oder asynchron arbeitet.
Typ | Synchroner Delegattyp | Asynchroner Delegattyp |
---|---|---|
ActionBlock<TInput> | System.Action |
System.Func<TInput, Task> |
TransformBlock<TInput,TOutput> | System.Func<TInput, TOutput> |
System.Func<TInput, Task<TOutput>> |
TransformManyBlock<TInput,TOutput> | System.Func<TInput, IEnumerable<TOutput>> |
System.Func<TInput, Task<IEnumerable<TOutput>>> |
Sie können auch Lambda-Ausdrücke verwenden, wenn Sie mit Ausführungsblocktypen arbeiten. Ein Beispiel für die Verwendung eines Lambda-Ausdrucks mit einem Ausführungsblock finden Sie unter Vorgehensweise: Ausführen einer Aktion, wenn ein Datenflussblock Daten empfängt.
Gruppierungsblöcke
Gruppierungsblöcke kombinieren Daten aus einer oder mehreren Quellen und unter verschiedenen Einschränkungen. Die TPL-Datenflussbibliothek enthält drei Typen von Gruppierungsblöcken: BatchBlock<T>, JoinBlock<T1,T2> und BatchedJoinBlock<T1,T2>.
BatchBlock<T>
Die BatchBlock<T>-Klasse kombiniert Sätze von Eingabedaten, die als Batches bezeichnet werden, in Arrays von Ausgabedaten. Beim Erstellen eines BatchBlock<T>-Objekts geben Sie die Größe von jedem Batch an. Wenn das BatchBlock<T>-Objekt die angegebene Anzahl von Eingabeelementen empfängt, wird ein Array mit diesen Elementen asynchron weitergegeben. Wenn ein BatchBlock<T>-Objekt in den abgeschlossenen Zustand versetzt wird, jedoch nicht genügend Elemente zum Bilden eines Batches enthält, wird ein abschließendes Array mit den restlichen Eingabeelementen weitergegeben.
Die BatchBlock<T>-Klasse arbeitet entweder im gierigen oder im nicht gierigen Modus. Im gierigen Modus (Standard) nimmt ein BatchBlock<T>-Objekt jede Nachricht an, die angeboten wird, und gibt ein Array weiter, nachdem die angegebene Anzahl von Elementen empfangen wurde. Im nicht gierigen Modus stellt ein BatchBlock<T>-Objekt alle eingehenden Nachrichten zurück, bis dem Block genügend Nachrichten von Quellen angeboten wurden, um einen Batch zu bilden. Der gierige Modus erzielt in der Regel eine bessere Leistung als der nicht gierige Modus, da er weniger Verarbeitungsaufwand erfordert. Sie können den nicht gierigen Modus jedoch verwenden, wenn Sie die Nutzung von mehreren Quellen auf atomische Weise koordinieren müssen. Legen Sie den nicht gierigen Modus fest, indem Sie Greedy auf False
im dataflowBlockOptions
Parameter im BatchBlock<T>-Konstruktor festlegen.
Im folgenden grundlegenden Beispiel werden mehrere Int32-Werte an ein BatchBlock<T>-Objekt gesendet, das zehn Elemente in einem Batch enthält. Um sicherzustellen, dass alle Werte aus BatchBlock<T> weitergegeben werden, wird in diesem Beispiel die Complete-Methode aufgerufen. Die Complete-Methode legt das BatchBlock<T>-Objekt auf den abgeschlossenen Zustand fest. Daher werden vom BatchBlock<T>-Objekt alle verbleibenden Elemente in einem abschließenden Batch weitergegeben.
// Create a BatchBlock<int> object that holds ten
// elements per batch.
var batchBlock = new BatchBlock<int>(10);
// Post several values to the block.
for (int i = 0; i < 13; i++)
{
batchBlock.Post(i);
}
// Set the block to the completed state. This causes
// the block to propagate out any remaining
// values as a final batch.
batchBlock.Complete();
// Print the sum of both batches.
Console.WriteLine("The sum of the elements in batch 1 is {0}.",
batchBlock.Receive().Sum());
Console.WriteLine("The sum of the elements in batch 2 is {0}.",
batchBlock.Receive().Sum());
/* Output:
The sum of the elements in batch 1 is 45.
The sum of the elements in batch 2 is 33.
*/
' Create a BatchBlock<int> object that holds ten
' elements per batch.
Dim batchBlock = New BatchBlock(Of Integer)(10)
' Post several values to the block.
For i As Integer = 0 To 12
batchBlock.Post(i)
Next i
' Set the block to the completed state. This causes
' the block to propagate out any remaining
' values as a final batch.
batchBlock.Complete()
' Print the sum of both batches.
Console.WriteLine("The sum of the elements in batch 1 is {0}.", batchBlock.Receive().Sum())
Console.WriteLine("The sum of the elements in batch 2 is {0}.", batchBlock.Receive().Sum())
' Output:
' The sum of the elements in batch 1 is 45.
' The sum of the elements in batch 2 is 33.
'
Ein vollständiges Beispiel für die Verwendung von BatchBlock<T> zur Verbesserung der Effizienz von Datenbankeinfügevorgängen finden Sie unter Exemplarische Vorgehensweise: Effizienzverbesserung durch Verwendung von BatchBlock und BatchedJoinBlock.
JoinBlock<T1, T2, ...>
Die JoinBlock<T1,T2>- und die JoinBlock<T1,T2,T3>-Klasse sammeln Eingabeelemente und geben System.Tuple<T1,T2>- oder System.Tuple<T1,T2,T3>-Objekte weiter, die diese Elemente enthalten. Die JoinBlock<T1,T2>- und die JoinBlock<T1,T2,T3>-Klasse erben nicht von ITargetBlock<TInput>. Stattdessen stellen sie die Eigenschaften Target1, Target2 und Target3 bereit, die ITargetBlock<TInput> implementieren.
Wie BatchBlock<T> arbeiten auch JoinBlock<T1,T2> und JoinBlock<T1,T2,T3> entweder im gierigen oder im nicht gierigen Modus. Im gierigen Modus (Standard) nimmt ein JoinBlock<T1,T2>- oder JoinBlock<T1,T2,T3>-Objekt jede Nachricht an, die angeboten wird, und gibt ein Tupel weiter, nachdem jedes der zugehörigen Ziele mindestens eine Nachricht empfangen hat. Im nicht gierigen Modus stellt ein JoinBlock<T1,T2>- oder JoinBlock<T1,T2,T3>-Objekt alle eingehenden Nachrichten zurück, bis allen Zielen die Daten angeboten wurden, die zum Erstellen eines Tupels erforderlich sind. An diesem Punkt initiiert der Block ein Zweiphasencommit-Protokoll, um alle erforderlichen Elemente aus den Quellen atomar abzurufen. Durch diese Zurückstellung ist es möglich, dass eine andere Entität die Daten in der Zwischenzeit nutzt, sodass das gesamte System mit der Verarbeitung fortschreitet.
Das folgende grundlegende Beispiel zeigt einen Fall, in dem ein JoinBlock<T1,T2,T3>-Objekt mehrere Daten benötigt, einen Wert zu berechnen. In diesem Beispiel wird ein JoinBlock<T1,T2,T3>-Objekt erstellt, das zwei Int32-Werte und einen Char-Wert erfordert, um eine arithmetische Operation durchzuführen.
// Create a JoinBlock<int, int, char> object that requires
// two numbers and an operator.
var joinBlock = new JoinBlock<int, int, char>();
// Post two values to each target of the join.
joinBlock.Target1.Post(3);
joinBlock.Target1.Post(6);
joinBlock.Target2.Post(5);
joinBlock.Target2.Post(4);
joinBlock.Target3.Post('+');
joinBlock.Target3.Post('-');
// Receive each group of values and apply the operator part
// to the number parts.
for (int i = 0; i < 2; i++)
{
var data = joinBlock.Receive();
switch (data.Item3)
{
case '+':
Console.WriteLine("{0} + {1} = {2}",
data.Item1, data.Item2, data.Item1 + data.Item2);
break;
case '-':
Console.WriteLine("{0} - {1} = {2}",
data.Item1, data.Item2, data.Item1 - data.Item2);
break;
default:
Console.WriteLine("Unknown operator '{0}'.", data.Item3);
break;
}
}
/* Output:
3 + 5 = 8
6 - 4 = 2
*/
' Create a JoinBlock<int, int, char> object that requires
' two numbers and an operator.
Dim joinBlock = New JoinBlock(Of Integer, Integer, Char)()
' Post two values to each target of the join.
joinBlock.Target1.Post(3)
joinBlock.Target1.Post(6)
joinBlock.Target2.Post(5)
joinBlock.Target2.Post(4)
joinBlock.Target3.Post("+"c)
joinBlock.Target3.Post("-"c)
' Receive each group of values and apply the operator part
' to the number parts.
For i As Integer = 0 To 1
Dim data = joinBlock.Receive()
Select Case data.Item3
Case "+"c
Console.WriteLine("{0} + {1} = {2}", data.Item1, data.Item2, data.Item1 + data.Item2)
Case "-"c
Console.WriteLine("{0} - {1} = {2}", data.Item1, data.Item2, data.Item1 - data.Item2)
Case Else
Console.WriteLine("Unknown operator '{0}'.", data.Item3)
End Select
Next i
' Output:
' 3 + 5 = 8
' 6 - 4 = 2
'
Ein vollständiges Beispiel für die Verwendung von JoinBlock<T1,T2>-Objekten im nicht gierigen Modus zur kooperativen Nutzung einer Ressource finden Sie unter Vorgehensweise: Verwenden von JoinBlock zum Lesen aus mehreren Quellen.
BatchedJoinBlock<T1, T2, ...>
Die BatchedJoinBlock<T1,T2>- und die BatchedJoinBlock<T1,T2,T3>-Klasse sammeln Batches von Eingabeelementen und geben System.Tuple(IList(T1), IList(T2))
- oder System.Tuple(IList(T1), IList(T2), IList(T3))
-Objekte weiter, die diese Elemente enthalten. Stellen Sie sich BatchedJoinBlock<T1,T2> als eine Kombination aus BatchBlock<T> und JoinBlock<T1,T2> vor. Beim Erstellen eines BatchedJoinBlock<T1,T2>-Objekts geben Sie die Größe von jedem Batch an. BatchedJoinBlock<T1,T2> stellt außerdem die Eigenschaften Target1 und Target2 bereit, die ITargetBlock<TInput> implementieren. Wenn die angegebene Anzahl von Eingabeelementen auf allen Zielen empfangen wurde, gibt das BatchedJoinBlock<T1,T2>-Objekt ein System.Tuple(IList(T1), IList(T2))
-Objekt asynchron weiter, das die Elemente enthält.
Im folgenden grundlegenden Beispiel wird ein BatchedJoinBlock<T1,T2>-Objekt erstellt, das Ergebnisse, Int32-Werte und Fehler enthält, bei denen es sich um Exception-Objekte handelt. In diesem Beispiel werden mehrere Vorgänge durchgeführt. Ergebnisse werden in die Target1-Eigenschaft und Fehler in die Target2-Eigenschaft des BatchedJoinBlock<T1,T2>-Objekts geschrieben. Da die Anzahl der erfolgreichen und fehlgeschlagenen Vorgänge im Voraus unbekannt ist, ermöglichen die IList<T>-Objekte, dass jedes Ziel null oder mehr Werte empfängt.
// For demonstration, create a Func<int, int> that
// returns its argument, or throws ArgumentOutOfRangeException
// if the argument is less than zero.
Func<int, int> DoWork = n =>
{
if (n < 0)
throw new ArgumentOutOfRangeException();
return n;
};
// Create a BatchedJoinBlock<int, Exception> object that holds
// seven elements per batch.
var batchedJoinBlock = new BatchedJoinBlock<int, Exception>(7);
// Post several items to the block.
foreach (int i in new int[] { 5, 6, -7, -22, 13, 55, 0 })
{
try
{
// Post the result of the worker to the
// first target of the block.
batchedJoinBlock.Target1.Post(DoWork(i));
}
catch (ArgumentOutOfRangeException e)
{
// If an error occurred, post the Exception to the
// second target of the block.
batchedJoinBlock.Target2.Post(e);
}
}
// Read the results from the block.
var results = batchedJoinBlock.Receive();
// Print the results to the console.
// Print the results.
foreach (int n in results.Item1)
{
Console.WriteLine(n);
}
// Print failures.
foreach (Exception e in results.Item2)
{
Console.WriteLine(e.Message);
}
/* Output:
5
6
13
55
0
Specified argument was out of the range of valid values.
Specified argument was out of the range of valid values.
*/
' For demonstration, create a Func<int, int> that
' returns its argument, or throws ArgumentOutOfRangeException
' if the argument is less than zero.
Dim DoWork As Func(Of Integer, Integer) = Function(n)
If n < 0 Then
Throw New ArgumentOutOfRangeException()
End If
Return n
End Function
' Create a BatchedJoinBlock<int, Exception> object that holds
' seven elements per batch.
Dim batchedJoinBlock = New BatchedJoinBlock(Of Integer, Exception)(7)
' Post several items to the block.
For Each i As Integer In New Integer() {5, 6, -7, -22, 13, 55, 0}
Try
' Post the result of the worker to the
' first target of the block.
batchedJoinBlock.Target1.Post(DoWork(i))
Catch e As ArgumentOutOfRangeException
' If an error occurred, post the Exception to the
' second target of the block.
batchedJoinBlock.Target2.Post(e)
End Try
Next i
' Read the results from the block.
Dim results = batchedJoinBlock.Receive()
' Print the results to the console.
' Print the results.
For Each n As Integer In results.Item1
Console.WriteLine(n)
Next n
' Print failures.
For Each e As Exception In results.Item2
Console.WriteLine(e.Message)
Next e
' Output:
' 5
' 6
' 13
' 55
' 0
' Specified argument was out of the range of valid values.
' Specified argument was out of the range of valid values.
'
Ein vollständiges Beispiel, bei dem mit BatchedJoinBlock<T1,T2> die Ergebnisse und Ausnahmen erfasst werden, die beim Lesen des Programms aus einer Datenbank auftreten, finden Sie unter Exemplarische Vorgehensweise: Effizienzverbesserung durch Verwendung von BatchBlock und BatchedJoinBlock.
Konfigurieren des Datenflussblockverhaltens
Sie können zusätzliche Optionen aktivieren, indem Sie ein System.Threading.Tasks.Dataflow.DataflowBlockOptions-Objekt für den Konstruktor von Datenflussblocktypen bereitstellen. Diese Optionen steuern das Verhalten wie z. B. den Planer, der die zugrunde liegende Aufgabe und den Grad der Parallelität verwaltet. DataflowBlockOptions verfügt außerdem über abgeleitete Typen, die das Verhalten für bestimmte Datenflussblocktypen festlegen. In der folgenden Tabelle ist zusammengefasst, welcher Optionstyp den einzelnen Datenflussblocktypen zugeordnet ist.
Die folgenden Abschnitte enthalten zusätzliche Informationen über die wichtigen Arten von Optionen für Datenflussblöcke, die durch die Klassen System.Threading.Tasks.Dataflow.DataflowBlockOptions, System.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptions und System.Threading.Tasks.Dataflow.GroupingDataflowBlockOptions zur Verfügung stehen.
Festlegen des Aufgabenplaners
Jeder vordefinierte Datenflussblock verwendet den TPL-Aufgabenplanungsmechanismus zum Durchführen von Aktivitäten, wie das Weitergeben von Daten an ein Ziel, das Empfangen von Daten aus einer Quelle und das Ausführen benutzerdefinierter Delegate, wenn Daten verfügbar werden. TaskScheduler ist eine abstrakte Klasse, die einen Aufgabenplaner darstellt, der Aufgaben in Warteschlangen für Threads stellt. Der Standardtaskplaner Default verwendet die ThreadPool-Klasse, um Arbeit in die Warteschlange zu stellen und auszuführen. Sie können den Standardtaskplaner überschreiben, indem Sie die TaskScheduler-Eigenschaft festlegen, wenn Sie ein Datenflussblockobjekt erstellen.
Wenn der gleiche Aufgabenplaner mehrere Datenflussblöcke verwaltet, kann er Richtlinien für sie erzwingen. Wenn beispielsweise für mehrere Datenflussblöcke jeweils der exklusive Planer des gleichen ConcurrentExclusiveSchedulerPair-Objekts als Ziel konfiguriert ist, wird die gesamte Arbeit serialisiert, die auf diesen Blöcken ausgeführt wird. Wenn für diese Blöcke der parallele Planer des gleichen ConcurrentExclusiveSchedulerPair-Objekts als Ziel konfiguriert wird und dieser Planer für die maximale Nebenläufigkeitsebene konfiguriert ist, wird die gesamte Arbeit von diesen Blöcken auf diese Anzahl paralleler Vorgänge beschränkt. Ein Beispiel, bei dem mithilfe der ConcurrentExclusiveSchedulerPair-Klasse Lesevorgänge parallel ausgeführt werden, Schreibvorgänge jedoch exklusiv von allen anderen Vorgängen erfolgen, finden Sie unter Vorgehensweise: Angeben eines Taskplaners in einem Datenflussblock. Weitere Informationen zum Taskplaner in der TPL finden Sie im Thema zur TaskScheduler-Klasse.
Festlegen des Grads der Parallelität
In der Standardeinstellung verarbeitet die drei Ausführungsblocktypen, die die TPL-Datenflussbibliothek bereitstellt, ActionBlock<TInput>, TransformBlock<TInput,TOutput> und TransformManyBlock<TInput,TOutput>, jeweils nur eine Nachricht. Diese Datenflussblocktypen verarbeiten Nachrichten ebenfalls in der Reihenfolge, in der sie empfangen werden. Damit diese Datenflussblöcke Nachrichten gleichzeitig verarbeiten können, muss die ExecutionDataflowBlockOptions.MaxDegreeOfParallelism-Eigenschaft beim Erstellen des Datenflussblockobjekts festgelegt werden.
Der Standardwert von MaxDegreeOfParallelism ist 1. Dadurch wird gewährleistet, dass der Datenflussblock jeweils nur eine Nachricht verarbeitet. Durch das Festlegen dieser Eigenschaft auf einen Wert, der größer als 1 ist, kann der Datenflussblock mehrere Nachrichten gleichzeitig verarbeiten. Durch das Festlegen dieser Eigenschaft auf DataflowBlockOptions.Unbounded kann der zugrunde liegende Aufgabenplaner den maximalen Grad von Nebenläufigkeit verwalten.
Wichtig
Wenn Sie einen maximalen Grad an Parallelität angeben, der größer als 1 ist, werden mehrere Nachrichten gleichzeitig verarbeitet. Daher werden Nachrichten möglicherweise nicht in der Reihenfolge verarbeitet, in der sie empfangen werden. Die Reihenfolge, in der die Nachrichten aus dem Block ausgegeben werden, ist jedoch mit der Reihenfolge identisch, in der sie empfangen werden.
Da die MaxDegreeOfParallelism-Eigenschaft den maximalen Grad an Parallelität darstellt, wird der Datenflussblock u. U. mit einem geringeren Grad an Parallelität ausgeführt als angegeben. Der Datenflussblock kann zum Erfüllen seiner Funktionsanforderungen oder aufgrund eines Mangels an verfügbaren Systemressourcen einen geringeren Grad an Parallelität verwenden. Ein Datenflussblock wird nie mit mehr Parallelität ausgeführt als Sie angeben.
Der Wert der MaxDegreeOfParallelism-Eigenschaft ist exklusiv für jedes Datenflussblockobjekt. Wenn beispielsweise für vier Datenflussblockobjekte als maximaler Grad an Parallelität jeweils 1 angegeben wird, können u. U. alle vier Datenflussblockobjekte parallel ausgeführt werden.
Ein Beispiel, bei dem der maximale Grad an Parallelität so festgelegt wird, dass lang andauernde Operationen parallel ausgeführt werden können, finden Sie unter Vorgehensweise: Festlegen des Parallelitätsgrads in einem Datenflussblock.
Festlegen der Anzahl von Nachrichten pro Aufgabe
Die vordefinierten Datenflussblocktypen verwenden Aufgaben, um mehrere Eingabeelemente zu verarbeiten. Dadurch kann die Anzahl der zum Verarbeiten von Daten erforderlichen Aufgabenobjekte minimiert werden, sodass Anwendungen effizienter ausgeführt werden können. Wenn Daten von den Aufgaben aus einem Satz von Datenflussblöcken verarbeitet werden, müssen die Aufgaben von anderen Datenflussblöcken jedoch möglicherweise auf Verarbeitungszeit warten, indem Nachrichten in die Warteschlange gestellt werden. Legen Sie die Eigenschaft MaxMessagesPerTask fest, um eine bessere Ausgewogenheit zwischen Datenflussaufgaben zu ermöglichen. Wenn MaxMessagesPerTask auf DataflowBlockOptions.Unbounded (Standard) festgelegt ist, verarbeitet die von einem Datenflussblock verwendete Aufgabe alle verfügbaren Nachrichten. Wenn MaxMessagesPerTask auf einen anderen Wert als Unbounded festgelegt wird, verarbeitet der Datenflussblock höchstens diese Anzahl an Nachrichten pro Task-Objekt. Obwohl das Festlegen der MaxMessagesPerTask-Eigenschaft die Ausgewogenheit zwischen Aufgaben erhöhen kann, werden vom System u. U. mehr Aufgaben als notwendig erstellt, sodass die Leistung beeinträchtigt wird.
Aktivieren des Abbruchs
Die TPL bietet einen Mechanismus, durch den Aufgaben Abbrüche kooperative koordinieren können. Legen Sie die CancellationToken-Eigenschaft fest, damit Datenflussblöcke diesen Abbruchmechanismus nutzen. Wenn dieses CancellationToken-Objekt in den abgebrochenen Zustand versetzt wird, beenden alle Datenflussblöcke, die dieses Token überwachen, die Ausführung ihres aktuellen Elements, beginnen jedoch nicht mit der Verarbeitung nachfolgender Elemente. Diese Datenflussblöcke löschen außerdem alle gepufferten Nachrichten, geben Verbindungen zu allen Quell- und Zielblöcken frei und wechseln in den abgebrochenen Zustand. Durch den Übergang in den abgebrochenen Zustand, wird die Completion-Eigenschaft der Status-Eigenschaft auf Canceled festgelegt, wenn während der Verarbeitung keine Aufnahme aufgetreten ist. In diesem Fall wird Status auf Faulted festgelegt.
Ein Beispiel für die Verwendung eines Abbruchs in einer Windows Forms-Anwendung finden Sie unter Vorgehensweise: Abbrechen eines Datenflussblocks. Weitere Informationen über Abbrüche in der TPL finden Sie unter Aufgabenabbruch.
Festlegen von gierigem Verhalten im Vergleich zu nicht gierigem Verhalten
Mehrere Gruppierungsdatenflussblock-Typen können im gierigen oder im nicht gierigen Modus arbeiten. In der Standardeinstellung arbeiten die vordefinierten Datenflussblocktypen im gierigen Modus.
Bei Gruppierungsblocktypen wie JoinBlock<T1,T2> bedeutet der gierige Modus, dass Daten vom Block sofort angenommen werden, auch wenn die entsprechenden Daten, mit denen eine gemeinsame Gruppierung erfolgt, noch nicht verfügbar sind. Nicht gieriger Modus bedeutet, dass der Block alle eingehenden Nachrichten zurückstellt, bis eine Nachricht an jedem der zugehörigen Ziele verfügbar ist, um die Gruppierung zu vervollständigen. Wenn eine der zurückgestellten Nachrichten nicht mehr verfügbar ist, gibt der Gruppierungsblock alle zurückgestellten Nachrichten frei und startet den Prozess neu. Bei der BatchBlock<T>-Klasse ist das gierige und das nicht gierige Verhalten ähnlich, außer dass ein BatchBlock<T>-Objekt im nicht gierigen Modus alle eingehenden Nachrichten zurückstellt, bis genügend Nachrichten aus verschiedenen Quellen verfügbar sind, um einen Batch zu vervollständigen.
Legen Sie Greedy auf False
fest, um den nicht gierigen Modus für einen Datenflussblock festzulegen. Ein Beispiel für die Verwendung des nicht gierigen Modus, sodass mehrere Gruppierungsblöcke eine Datenquelle effizienter gemeinsam verwenden können, finden Sie unter Vorgehensweise: Verwenden von JoinBlock zum Lesen aus mehreren Quellen.
Benutzerdefinierte Datenflussblöcke
Obwohl die TPL-Datenflussbibliothek viele vordefinierte Blocktypen bereitstellt, können Sie zusätzliche Blockstypen mit einem benutzerdefinierten Verhalten erstellen. Implementieren Sie die ISourceBlock<TOutput>- oder die ITargetBlock<TInput>-Schnittstelle direkt, oder verwenden Sie die Encapsulate-Methode, um einen komplexen Block zu erstellen, der das Verhalten vorhandener Blocktypen kapselt. Beispiele für die Implementierung benutzerdefinierter Datenflussblock-Funktionen finden Sie unter Exemplarische Vorgehensweise: Erstellen eines Datenflussblocktyps.
Verwandte Themen
Titel | Beschreibung |
---|---|
How to: Schreiben und Lesen von Nachrichten in einem Datenflussblock | Veranschaulicht, wie Nachrichten in ein BufferBlock<T>-Objekt geschrieben und daraus gelesen werden. |
How to: Implementieren eines Producer-Consumer-Musters | Beschreibt, wie mit dem Datenflussmodell ein Producer-Consumer-Muster implementiert wird, bei dem der Producer Nachrichten an einen Datenflussblock sendet und der Consumer Nachrichten aus diesem Block liest. |
How to: Ausführen einer Aktion, wenn ein Datenflussblock Daten empfängt | Beschreibt, wie Delegaten für die Ausführungsdatenflussblocktypen ActionBlock<TInput>, TransformBlock<TInput,TOutput> und TransformManyBlock<TInput,TOutput> bereitgestellt werden. |
Exemplarische Vorgehensweise: Erstellen einer Datenflusspipeline | Beschreibt, wie eine Datenflusspipeline erstellt wird, die Text aus dem Web herunterlädt und Vorgänge für diesen Text ausführt. |
How to: Aufheben der Verknüpfungen von Datenflussblöcken | Veranschaulicht, wie mit der LinkTo-Methode die Verknüpfung zwischen einem Zielblock und der zugehörigen Quelle aufgelöst wird, nachdem die Quelle dem Ziel eine Nachricht anbietet. |
Exemplarische Vorgehensweise: Verwenden von Dataflow in einer Windows Forms-Anwendung | Veranschaulicht, wie ein Netzwerk von Datenflussblöcken erstellt wird, die eine Bildverarbeitung in einer Windows Forms-Anwendung durchführen. |
How to: Abbrechen eines Datenflussblocks | Veranschaulicht die Verwendung von Abbrüchen in einer Windows Forms-Anwendung. |
How to: Verwenden von JoinBlock zum Lesen aus mehreren Quellen | Erläutert die Verwendung der JoinBlock<T1,T2>-Klasse, um einen Vorgang durchzuführen, wenn Daten aus mehreren Quellen zur Verfügung stehen, sowie die Verwendung des nicht gierigen Modus, sodass mehrere Gruppierungsblöcke eine Datenquelle effizienter gemeinsam verwenden können. |
How to: Festlegen des Parallelitätsgrads in einem Datenflussblock | Beschreibt, wie die MaxDegreeOfParallelism-Eigenschaft festgelegt wird, damit ein Ausführungsdatenflussblock mehrere Nachrichten gleichzeitig verarbeiten kann. |
How to: Angeben eines Taskplaners in einem Datenflussblock | Veranschaulicht, wie ein bestimmter Aufgabenplaner zugeordnet wird, wenn Sie Datenfluss in Ihrer Anwendung verwenden. |
Exemplarische Vorgehensweise: Effizienzverbesserung durch Verwendung von BatchBlock und BatchedJoinBlock | Beschreibt, wie mit der BatchBlock<T>-Klasse die Effizienz von Datenbankeinfügevorgängen verbessert wird und wie mit der BatchedJoinBlock<T1,T2>-Klasse sowohl die Ergebnisse als auch Ausnahmen erfasst werden, die auftreten, während das Programm aus einer Datenbank liest. |
Exemplarische Vorgehensweise: Erstellen eines Datenflussblocktyps | Veranschaulicht zwei Möglichkeiten, einen Datenflussblocktyp zu erstellen, der benutzerdefiniertes Verhalten implementiert. |
Task Parallel Library (TPL) | Stellt die TPL vor, eine Bibliothek, die die parallele Programmierung in .NET Framework-Anwendungen vereinfacht. |