Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Die Task Parallel Library (TPL) stellt Datenflusskomponenten bereit, um die Stabilität von parallelen Anwendungen zu erhöhen. Diese Datenflusskomponenten werden gemeinsam als TPL-Datenflussbibliothek bezeichnet. Dieses Datenflussmodell fördert die akteurbasierte Programmierung, indem prozessinterne Nachrichtenübergaben für grobkörnige Datenflow- und Pipeliningaufgaben bereitgestellt werden. Die Dataflow-Komponenten basieren auf den Typen und der Planungsinfrastruktur der TPL und integrieren sie in die C#-, Visual Basic- und F#-Sprachunterstützung für die asynchrone Programmierung. Diese Datenflusskomponenten sind nützlich, wenn Sie über mehrere Vorgänge verfügen, die asynchron miteinander kommunizieren müssen oder wenn Sie Daten verarbeiten möchten, sobald sie verfügbar sind. Betrachten Sie beispielsweise eine Anwendung, die Bilddaten von einer Webkamera verarbeitet. Mithilfe des Datenflussmodells kann die Anwendung Bildframes verarbeiten, sobald sie verfügbar sind. Wenn die Anwendung Bildrahmen verbessert, z. B. durch Lichtkorrektur oder Rote-Augen-Reduzierung, können Sie eine Pipeline von Datenflusskomponenten erstellen. Jede Phase der Pipeline kann grober strukturierte Parallelitätsfunktionen verwenden, wie z. B. die von der TPL bereitgestellten Funktionen zum Transformieren des Bilds.
Dieses Dokument bietet eine Übersicht über die TPL-Datenflussbibliothek. Es beschreibt das Programmiermodell, die vordefinierten Datenflussblocktypen und das Konfigurieren von Datenflussblöcken, um die spezifischen Anforderungen Ihrer Anwendungen zu erfüllen.
Hinweis
Die TPL Dataflow Library (der System.Threading.Tasks.Dataflow Namespace) wird nicht mit .NET verteilt. Um den System.Threading.Tasks.Dataflow Namespace in Visual Studio zu installieren, öffnen Sie Ihr Projekt, wählen Sie "NuGet-Pakete verwalten " im Menü "Projekt " aus, und suchen Sie online nach dem System.Threading.Tasks.Dataflow
Paket. Führen Sie alternativ das .NET Core CLI aus, um es zu installieren, indem Sie dotnet add package System.Threading.Tasks.Dataflow
ausführen.
Programmiermodell
Die TPL-Datenflussbibliothek bietet eine Grundlage für das Übergeben und Parallelisieren von CPU-intensiven und E/O-intensiven Anwendungen mit hohem Durchsatz und geringer Latenz. Außerdem erhalten Sie explizite Kontrolle darüber, wie Daten gepuffert und um das System verschoben werden. Um das Datenflussprogrammierungsmodell besser zu verstehen, sollten Sie eine Anwendung in Betracht ziehen, die Bilder asynchron vom Datenträger lädt und eine Kombination dieser Bilder erstellt. Herkömmliche Programmiermodelle erfordern in der Regel, dass Sie Rückrufe und Synchronisierungsobjekte wie Sperren verwenden, um Aufgaben zu koordinieren und auf freigegebene Daten zuzugreifen. Mithilfe des Datenflussprogrammiermodells können Sie Datenflussobjekte erstellen, die Bilder verarbeiten, während sie vom Datenträger gelesen werden. Unter dem Datenflussmodell deklarieren Sie, wie Daten verarbeitet werden, wenn sie verfügbar sind, sowie alle Abhängigkeiten zwischen Daten. Da die Laufzeit Abhängigkeiten zwischen Daten verwaltet, können Sie häufig die Anforderung vermeiden, den Zugriff auf freigegebene Daten zu synchronisieren. Zusätzlich kann der Datenfluss die Reaktionsfähigkeit und den Durchsatz verbessern, da die Laufzeitumgebung Arbeiten auf der Grundlage der asynchronen Ankunft von Daten plant und die zugrunde liegenden Threads effizient verwaltet. Ein Beispiel, das das Datenflussprogrammierungsmodell zum Implementieren der Bildverarbeitung in einer Windows Forms-Anwendung verwendet, finden Sie unter Walkthrough: Using Dataflow in a Windows Forms Application.
Quellen und Ziele
Die TPL-Datenflussbibliothek besteht aus Datenflussblöcken, die Datenstrukturen sind, die Daten puffern und verarbeiten. Die TPL definiert drei Arten von Datenflussblöcken: Quellblöcke, Zielblöcke und Verteilungsblöcke. Ein Quellblock fungiert als Datenquelle und kann ausgelesen werden. Ein Zielblock fungiert als Empfänger von Daten und kann in diese geschrieben werden. Ein Verteilungsblock fungiert sowohl als Quellblock als auch als Zielblock und kann ausgelesen und in diesen geschrieben werden. Die TPL definiert die System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> Schnittstelle zum Darstellen von Quellen, System.Threading.Tasks.Dataflow.ITargetBlock<TInput> zum Darstellen von Zielen und System.Threading.Tasks.Dataflow.IPropagatorBlock<TInput,TOutput> zum Darstellen von Verteilungsoren. IPropagatorBlock<TInput,TOutput> erbt von beiden ISourceBlock<TOutput>und ITargetBlock<TInput>.
Die TPL-Datenflussbibliothek bietet mehrere vordefinierte Datenflussblocktypen, die die ISourceBlock<TOutput>, ITargetBlock<TInput>und IPropagatorBlock<TInput,TOutput> Schnittstellen implementieren. Diese Datenflussblocktypen werden in diesem Dokument im Abschnitt Vordefinierte Dataflow-Blocktypen beschrieben.
Verbinden von Blöcken
Sie können Datenflussblöcke mit Pipelines verbinden, bei denen es sich um lineare Sequenzen von Datenflussblöcken oder Netzwerken handelt, bei denen es sich um Diagramme von Datenflussblöcken handelt. Eine Pipeline ist eine Form des Netzwerks. In einer Pipeline oder einem Netzwerk verteilen Quellen daten asynchron an Ziele, sobald diese Daten verfügbar sind. Die ISourceBlock<TOutput>.LinkTo Methode verknüpft einen Quelldatenflussblock mit einem Zielblock. Eine Quelle kann mit null oder mehr Zielen verknüpft werden; Ziele können aus null oder mehr Quellen verknüpft werden. Sie können Datenflussblöcke gleichzeitig zu einer Pipeline oder einem Netzwerk hinzufügen oder daraus entfernen. Die vordefinierten Dataflow-Blocktypen behandeln alle Threadsicherheitsaspekte von Verknüpfungen und Aufheben der Verknüpfung.
Ein Beispiel zum Verbinden von Datenflussblöcken mit einer einfachen Pipeline finden Sie unter Walkthrough: Creating a Dataflow Pipeline. Ein Beispiel, das Datenflussblöcke mit komplexeren Netzwerken verbindet, finden Sie unter Walkthrough: Using Dataflow in a Windows Forms Application. Ein Beispiel, das die Verknüpfung eines Ziels aus einer Quelle aufhebt, nachdem die Quelle dem Ziel eine Nachricht angezeigt hat, finden Sie unter How to: Unlink Dataflow Blocks.
Filterung
Wenn Sie die ISourceBlock<TOutput>.LinkTo Methode aufrufen, um eine Quelle mit einem Ziel zu verknüpfen, können Sie einen Delegaten angeben, der bestimmt, ob der Zielblock eine Nachricht basierend auf dem Wert dieser Nachricht akzeptiert oder ablehnt. Dieser Filtermechanismus ist eine nützliche Möglichkeit, um sicherzustellen, dass ein Datenflussblock nur bestimmte Werte empfängt. Bei den meisten vordefinierten Datenflussblocktypen, wenn ein Quellblock mit mehreren Zielblöcken verbunden ist, wenn ein Zielblock eine Nachricht ablehnt, bietet die Quelle diese Nachricht an das nächste Ziel an. Die Reihenfolge, in der eine Quelle Nachrichten für Ziele anbietet, wird durch die Quelle definiert und kann je nach Typ der Quelle variieren. Die meisten Quellblocktypen beenden das Anbieten einer Nachricht, nachdem ein Ziel diese Nachricht akzeptiert hat. Eine Ausnahme dieser Regel ist die BroadcastBlock<T> Klasse, die jede Nachricht für alle Ziele anbietet, auch wenn einige Ziele die Nachricht ablehnen. Ein Beispiel, das filtert, um nur bestimmte Nachrichten zu verarbeiten, finden Sie unter Walkthrough: Using Dataflow in a Windows Forms Application.
Von Bedeutung
Da jeder vordefinierte Quelldatenflussblocktyp garantiert, dass Nachrichten in der Reihenfolge verteilt werden, in der sie empfangen werden, muss jede Nachricht aus dem Quellblock gelesen werden, bevor der Quellblock die nächste Nachricht verarbeiten kann. Stellen Sie daher beim Filtern zum Verbinden mehrerer Ziele mit einer Quelle sicher, dass mindestens ein Zielblock jede Nachricht empfängt. Andernfalls kann bei der Anwendung ein Deadlock auftreten.
Nachrichtenübergabe
Das Datenflussprogrammierungsmodell bezieht sich auf das Konzept der Nachrichtenübergabe, bei dem unabhängige Komponenten eines Programms miteinander kommunizieren, indem Nachrichten gesendet werden. Eine Möglichkeit zum Verteilen von Nachrichten zwischen Anwendungskomponenten besteht darin, die Post (synchronen) und SendAsync (asynchronen) Methoden zum Senden von Nachrichten an Zieldatenflussblöcke sowie die ReceiveMethoden ReceiveAsynczum TryReceive Empfangen von Nachrichten aus Quellblöcken aufzurufen. Sie können diese Methoden mit Datenflusspipelines oder Netzwerken kombinieren, indem Sie Eingabedaten an den Kopfknoten (einen Zielblock) senden und Ausgabedaten vom Terminalknoten der Pipeline oder den Terminalknoten des Netzwerks empfangen (eine oder mehrere Quellblöcke). Sie können die Choose-Methode auch verwenden, um aus der ersten der bereitgestellten Quellen zu lesen, die Daten verfügbar hat, und Aktionen für diese Daten durchzuführen.
Quellblöcke bieten Daten für Zielblöcke durch Aufrufen der ITargetBlock<TInput>.OfferMessage Methode. Der Zielblock antwortet auf eine angebotene Nachricht auf eine von drei Arten: Er kann die Nachricht annehmen, die Nachricht ablehnen oder die Nachricht verschieben. Wenn das Ziel die Nachricht akzeptiert, gibt die OfferMessage Methode zurück Accepted. Wenn das Ziel die Nachricht ablehnt, gibt die OfferMessage Methode zurück Declined. Wenn das Ziel erfordert, dass es keine Nachrichten mehr von der Quelle empfängt, OfferMessage wird zurückgegeben DecliningPermanently. Die vordefinierten Quellblocktypen bieten keine Nachrichten an verknüpfte Ziele, nachdem ein solcher Rückgabewert empfangen wurde, und sie heben die Verknüpfung von solchen Zielen automatisch auf.
Wenn ein Zielblock die Nachricht zur späteren Verwendung verschiebt, gibt die OfferMessage Methode zurück Postponed. Ein Zielblock, der eine Nachricht verschiebt, kann später die ISourceBlock<TOutput>.ReserveMessage Methode aufrufen, um zu versuchen, die angebotene Nachricht zu reservieren. An diesem Punkt ist die Nachricht entweder noch verfügbar und kann vom Zielblock verwendet werden, oder die Nachricht wurde von einem anderen Ziel übernommen. Wenn der Zielblock später die Nachricht benötigt oder sie nicht mehr benötigt, wird die ISourceBlock<TOutput>.ConsumeMessage oder die ReleaseReservation Methode aufgerufen. Die Nachrichtenreservierung wird in der Regel von den Datenflussblocktypen verwendet, die im nicht gierigen Modus ausgeführt werden. "Nicht gieriger Modus" wird weiter unten in diesem Dokument erläutert. Anstatt eine verschobene Nachricht zu reservieren, kann ein Zielblock auch die ISourceBlock<TOutput>.ConsumeMessage Methode verwenden, um die verschobene Nachricht direkt zu nutzen.
Abschluss des Datenflussblocks
Dataflow-Blöcke unterstützen auch das Konzept der Fertigstellung. Ein Datenflussblock, der sich im abgeschlossenen Zustand befindet, führt keine weiteren Arbeiten aus. Jeder Datenflussblock verfügt über ein zugeordnetes System.Threading.Tasks.Task Objekt, das als Abschlussaufgabe bezeichnet wird und den Abschlussstatus des Blocks darstellt. Da Sie warten können, bis ein Task Objekt abgeschlossen ist, können Sie mithilfe von Abschlussaufgaben warten, bis ein oder mehrere Terminalknoten eines Datenflussnetzwerks abgeschlossen sind. Die IDataflowBlock Schnittstelle definiert die Complete Methode, die den Datenflussblock über eine Anfrage zu seiner Fertigstellung benachrichtigt, sowie die Eigenschaft Completion, die die Abschlussaufgabe für den Datenflussblock zurückgibt. Sowohl ISourceBlock<TOutput> als auch ITargetBlock<TInput> erben die IDataflowBlock Schnittstelle.
Es gibt zwei Möglichkeiten, um zu ermitteln, ob ein Datenflussblock ohne Fehler abgeschlossen wurde, einen oder mehrere Fehler aufgetreten ist oder abgebrochen wurde. Die erste Möglichkeit besteht darin, die Task.Wait Methode für die Fertigstellungsaufgabe in einemtry
-catch
Block (Try
-Catch
in Visual Basic) aufzurufen. Im folgenden Beispiel wird ein ActionBlock<TInput>-Objekt erstellt, das ArgumentOutOfRangeException auslöst, wenn der Eingabewert kleiner als 0 ist.
AggregateException wird ausgelöst, wenn in diesem Beispiel Wait für die Abschlussaufgabe aufgerufen wird. Der ArgumentOutOfRangeException Zugriff erfolgt über die InnerExceptions Eigenschaft des AggregateException Objekts.
// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
Console.WriteLine($"n = {n}");
if (n < 0)
{
throw new ArgumentOutOfRangeException();
}
});
// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();
// Wait for completion in a try/catch block.
try
{
throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
// If an unhandled exception occurs during dataflow processing, all
// exceptions are propagated through an AggregateException object.
ae.Handle(e =>
{
Console.WriteLine($"Encountered {e.GetType().Name}: {e.Message}");
return true;
});
}
/* Output:
n = 0
n = -1
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
Console.WriteLine("n = {0}", n)
If n < 0 Then
Throw New ArgumentOutOfRangeException()
End If
End Sub)
' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()
' Wait for completion in a try/catch block.
Try
throwIfNegative.Completion.Wait()
Catch ae As AggregateException
' If an unhandled exception occurs during dataflow processing, all
' exceptions are propagated through an AggregateException object.
ae.Handle(Function(e)
Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
Return True
End Function)
End Try
' Output:
' n = 0
' n = -1
' Encountered ArgumentOutOfRangeException: Specified argument was out of the range
' of valid values.
'
In diesem Beispiel wird der Fall veranschaulicht, in dem eine Ausnahme im Delegat eines Ausführungsdatenflussblocks nicht behandelt wird. Es wird empfohlen, Ausnahmen in den Codetexten solcher Blöcke zu behandeln. Sollten Sie dies jedoch nicht tun können, verhält sich der Block, als wäre er abgebrochen worden und verarbeitet eingehende Nachrichten nicht.
Wenn ein Datenflussblock explizit abgebrochen wird, enthält das AggregateException-Objekt OperationCanceledException in der InnerExceptions-Eigenschaft. Weitere Informationen zum Abbruch des Datenflusses finden Sie im Abschnitt "Abbruch aktivieren ".
Die zweite Möglichkeit zum Ermitteln des Abschlussstatus eines Datenflussblocks besteht darin, eine Fortsetzung der Fertigstellungsaufgabe zu verwenden oder die asynchronen Sprachfeatures von C# und Visual Basic zu verwenden, um asynchron auf die Fertigstellungsaufgabe zu warten. Der Delegat, den Sie für die Task.ContinueWith Methode bereitstellen, empfängt ein Task Objekt, das die vorherige Aufgabe darstellt. Im Fall der Completion-Eigenschaft nimmt der Delegat für die Fortsetzung die Abschlussaufgabe selbst an. Das folgende Beispiel ähnelt dem vorherigen, mit der Ausnahme, dass es auch die ContinueWith Methode verwendet, um eine Fortsetzungsaufgabe zu erstellen, die den Status des gesamten Datenflussvorgangs druckt.
// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
Console.WriteLine($"n = {n}");
if (n < 0)
{
throw new ArgumentOutOfRangeException();
}
});
// Create a continuation task that prints the overall
// task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(task =>
{
Console.WriteLine($"The status of the completion task is '{task.Status}'.");
});
// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();
// Wait for completion in a try/catch block.
try
{
throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
// If an unhandled exception occurs during dataflow processing, all
// exceptions are propagated through an AggregateException object.
ae.Handle(e =>
{
Console.WriteLine($"Encountered {e.GetType().Name}: {e.Message}");
return true;
});
}
/* Output:
n = 0
n = -1
The status of the completion task is 'Faulted'.
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
Console.WriteLine("n = {0}", n)
If n < 0 Then
Throw New ArgumentOutOfRangeException()
End If
End Sub)
' Create a continuation task that prints the overall
' task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(Sub(task) Console.WriteLine("The status of the completion task is '{0}'.", task.Status))
' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()
' Wait for completion in a try/catch block.
Try
throwIfNegative.Completion.Wait()
Catch ae As AggregateException
' If an unhandled exception occurs during dataflow processing, all
' exceptions are propagated through an AggregateException object.
ae.Handle(Function(e)
Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
Return True
End Function)
End Try
' Output:
' n = 0
' n = -1
' The status of the completion task is 'Faulted'.
' Encountered ArgumentOutOfRangeException: Specified argument was out of the range
' of valid values.
'
Sie können auch Eigenschaften wie IsCanceled im Textkörper der Fortsetzungsaufgabe verwenden, um zusätzliche Informationen zum Abschlussstatus eines Datenflussblocks zu ermitteln. Weitere Informationen zu Fortsetzungsaufgaben und deren Beziehung zur Abbruch- und Fehlerbehandlung finden Sie unter Verketten von Aufgaben mithilfe von Fortsetzungsaufgaben, Aufgabenabbruch und Ausnahmebehandlung.
Vordefinierte Dataflow-Blocktypen
Die TPL-Datenflussbibliothek bietet mehrere vordefinierte Datenflussblocktypen. Diese Typen sind in drei Kategorien unterteilt: Pufferblöcke, Ausführungsblöcke und Gruppierungsblöcke. In den folgenden Abschnitten werden die Blocktypen beschrieben, aus denen diese Kategorien bestehen.
Pufferblöcke
Pufferungsblöcke enthalten Daten zur Verwendung durch Datenkonsumenten. Die TPL-Datenflussbibliothek bietet drei Pufferblocktypen: System.Threading.Tasks.Dataflow.BufferBlock<T>, , System.Threading.Tasks.Dataflow.BroadcastBlock<T>und System.Threading.Tasks.Dataflow.WriteOnceBlock<T>.
BufferBlock<T>
Die BufferBlock<T> Klasse stellt eine allgemeine asynchrone Messagingstruktur dar. Diese Klasse speichert eine FIFO-Nachrichtenwarteschlange (First In, First Out), in die mehrere Quellen Nachrichten schreiben oder aus der mehrere Ziele Nachrichten auslesen können. Wenn ein Ziel eine Nachricht von einem BufferBlock<T> Objekt empfängt, wird diese Nachricht aus der Nachrichtenwarteschlange entfernt. Daher kann zwar ein BufferBlock<T> Objekt mehrere Ziele haben, aber jede Nachricht wird nur an ein einziges Ziel gesendet. Die BufferBlock<T> Klasse ist nützlich, wenn Sie mehrere Nachrichten an eine andere Komponente übergeben möchten, und diese Komponente muss jede Nachricht empfangen.
Im folgenden grundlegenden Beispiel werden mehrere Int32 Werte in ein BufferBlock<T> Objekt gepostet, und dann werden diese Werte wieder aus diesem Objekt gelesen.
// Create a BufferBlock<int> object.
var bufferBlock = new BufferBlock<int>();
// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
bufferBlock.Post(i);
}
// Receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
Console.WriteLine(bufferBlock.Receive());
}
/* Output:
0
1
2
*/
' Create a BufferBlock<int> object.
Dim bufferBlock = New BufferBlock(Of Integer)()
' Post several messages to the block.
For i As Integer = 0 To 2
bufferBlock.Post(i)
Next i
' Receive the messages back from the block.
For i As Integer = 0 To 2
Console.WriteLine(bufferBlock.Receive())
Next i
' Output:
' 0
' 1
' 2
'
Ein vollständiges Beispiel, das veranschaulicht, wie Nachrichten in ein BufferBlock<T> Objekt geschrieben und gelesen werden, finden Sie unter How to: Write Messages to and Read Messages from a Dataflow Block.
BroadcastBlock<T>
Die BroadcastBlock<T> Klasse ist nützlich, wenn Sie mehrere Nachrichten an eine andere Komponente übergeben müssen, diese Komponente benötigt jedoch nur den letzten Wert. Diese Klasse ist auch hilfreich, wenn Sie eine Nachricht an mehrere Komponenten übertragen möchten.
Im folgenden einfachen Beispiel wird ein Double Wert in ein BroadcastBlock<T> Objekt gepostet, und dieser Wert wird dann mehrmals aus diesem Objekt gelesen. Da Werte nach dem Lesen nicht aus BroadcastBlock<T> Objekten entfernt werden, ist derselbe Wert jedes Mal verfügbar.
// Create a BroadcastBlock<double> object.
var broadcastBlock = new BroadcastBlock<double>(null);
// Post a message to the block.
broadcastBlock.Post(Math.PI);
// Receive the messages back from the block several times.
for (int i = 0; i < 3; i++)
{
Console.WriteLine(broadcastBlock.Receive());
}
/* Output:
3.14159265358979
3.14159265358979
3.14159265358979
*/
' Create a BroadcastBlock<double> object.
Dim broadcastBlock = New BroadcastBlock(Of Double)(Nothing)
' Post a message to the block.
broadcastBlock.Post(Math.PI)
' Receive the messages back from the block several times.
For i As Integer = 0 To 2
Console.WriteLine(broadcastBlock.Receive())
Next i
' Output:
' 3.14159265358979
' 3.14159265358979
' 3.14159265358979
'
Ein vollständiges Beispiel, das veranschaulicht, wie Sie mit BroadcastBlock<T> eine Nachricht an mehrere Zielblöcke senden, finden Sie im Abschnitt How to: Specify a Task Scheduler in a Dataflow Block.
WriteOnceBlock<T>
Die WriteOnceBlock<T> Klasse ähnelt der BroadcastBlock<T> Klasse, mit der Ausnahme, dass ein WriteOnceBlock<T> Objekt nur einmal geschrieben werden kann. Sie können sich WriteOnceBlock<T> ähnlich wie das C#-readonly-Schlüsselwort (ReadOnly in Visual Basic) vorstellen, mit der Ausnahme, dass ein WriteOnceBlock<T>-Objekt unveränderlich wird, nachdem es einen Wert erhält, anstatt bei der Konstruktion. Wie die BroadcastBlock<T> Klasse, wenn ein Ziel eine Nachricht von einem WriteOnceBlock<T> Objekt empfängt, wird diese Nachricht nicht aus diesem Objekt entfernt. Daher erhalten mehrere Ziele eine Kopie der Nachricht. Die WriteOnceBlock<T> Klasse ist nützlich, wenn Sie nur die erste von mehreren Nachrichten weitergeben möchten.
Im folgenden grundlegenden Beispiel werden mehrere String Werte an ein WriteOnceBlock<T> Objekt übermittelt und dann der Wert aus dem Objekt gelesen. Da ein WriteOnceBlock<T> Objekt nur einmal geschrieben werden kann, nachdem ein WriteOnceBlock<T> Objekt eine Nachricht empfängt, verwirft es nachfolgende Nachrichten.
// Create a WriteOnceBlock<string> object.
var writeOnceBlock = new WriteOnceBlock<string>(null);
// Post several messages to the block in parallel. The first
// message to be received is written to the block.
// Subsequent messages are discarded.
Parallel.Invoke(
() => writeOnceBlock.Post("Message 1"),
() => writeOnceBlock.Post("Message 2"),
() => writeOnceBlock.Post("Message 3"));
// Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive());
/* Sample output:
Message 2
*/
' Create a WriteOnceBlock<string> object.
Dim writeOnceBlock = New WriteOnceBlock(Of String)(Nothing)
' Post several messages to the block in parallel. The first
' message to be received is written to the block.
' Subsequent messages are discarded.
Parallel.Invoke(Function() writeOnceBlock.Post("Message 1"), Function() writeOnceBlock.Post("Message 2"), Function() writeOnceBlock.Post("Message 3"))
' Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive())
' Sample output:
' Message 2
'
Ein vollständiges Beispiel, das zeigt, wie Sie WriteOnceBlock<T> verwenden, um den Wert des zuerst abgeschlossenen Vorgangs zu erhalten, finden Sie unter How to: Unlink Dataflow Blocks.
Ausführungsblöcke
Ausführungsblöcke rufen einen vom Benutzer bereitgestellten Delegat für jeden empfangenen Datenabschnitt auf. Die TPL-Datenflussbibliothek bietet drei Ausführungsblocktypen: ActionBlock<TInput>, , System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput>und System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput>.
ActionBlock<T>
Die ActionBlock<TInput> Klasse ist ein Zielblock, der einen Delegaten aufruft, wenn er Daten empfängt. Stellen Sie sich ein ActionBlock<TInput> Objekt als Stellvertretung vor, die asynchron ausgeführt wird, wenn Daten verfügbar werden. Die Stellvertretung, die Sie für ein ActionBlock<TInput> Objekt bereitstellen, kann vom Typ Action<T> oder Typ System.Func<TInput, Task>
sein. Wenn Sie ein ActionBlock<TInput> Objekt mit Action<T> verwenden, gilt die Verarbeitung jedes Eingabeelements als abgeschlossen, wenn der Delegat zurückkehrt. Wenn Sie ein ActionBlock<TInput> Objekt mit System.Func<TInput, Task>
verwenden, gilt die Verarbeitung jedes Eingabeelements nur dann als abgeschlossen, wenn das zurückgegebene Task Objekt abgeschlossen ist. Mithilfe dieser beiden Mechanismen können Sie sowohl für die synchrone als auch für die asynchrone Verarbeitung jedes Eingabeelements verwenden ActionBlock<TInput> .
Im folgenden grundlegenden Beispiel werden mehrere Int32 Werte an ein ActionBlock<TInput> Objekt bereitgestellt. Das ActionBlock<TInput> Objekt druckt diese Werte in der Konsole. In diesem Beispiel wird der Block dann auf den abgeschlossenen Zustand festgelegt und wartet auf den Abschluss aller Datenflussaufgaben.
// Create an ActionBlock<int> object that prints values
// to the console.
var actionBlock = new ActionBlock<int>(n => Console.WriteLine(n));
// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
actionBlock.Post(i * 10);
}
// Set the block to the completed state and wait for all
// tasks to finish.
actionBlock.Complete();
actionBlock.Completion.Wait();
/* Output:
0
10
20
*/
' Create an ActionBlock<int> object that prints values
' to the console.
Dim actionBlock = New ActionBlock(Of Integer)(Function(n) WriteLine(n))
' Post several messages to the block.
For i As Integer = 0 To 2
actionBlock.Post(i * 10)
Next i
' Set the block to the completed state and wait for all
' tasks to finish.
actionBlock.Complete()
actionBlock.Completion.Wait()
' Output:
' 0
' 10
' 20
'
Vollständige Beispiele, die erklären, wie man Delegaten mit der ActionBlock<TInput> Klasse verwendet, finden Sie unter Vorgehensweise: Ausführen einer Aktion, wenn ein Datenflussblock Daten empfängt.
TransformBlock<TInput, TOutput>
Die TransformBlock<TInput,TOutput> Klasse ähnelt der ActionBlock<TInput> Klasse, mit der Ausnahme, dass sie sowohl als Quelle als auch als Ziel fungiert. Der Delegat, den Sie an ein TransformBlock<TInput,TOutput>-Objekt übergeben, gibt einen Wert vom Typ TOutput
zurück. Die Stellvertretung, die Sie für ein TransformBlock<TInput,TOutput> Objekt bereitstellen, kann vom Typ System.Func<TInput, TOutput>
oder Typ System.Func<TInput, Task<TOutput>>
sein. Wenn Sie ein TransformBlock<TInput,TOutput> Objekt mit System.Func<TInput, TOutput>
verwenden, gilt die Verarbeitung jedes Eingabeelements als abgeschlossen, wenn der Delegat zurückkehrt. Wenn Sie ein TransformBlock<TInput,TOutput> Objekt verwenden, das mit System.Func<TInput, Task<TOutput>>
verwendet wird, gilt die Verarbeitung jedes Eingabeelements nur dann als abgeschlossen, wenn das zurückgegebene Task<TResult> Objekt abgeschlossen ist. Wie bei der Verwendung dieser beiden Mechanismen können Sie ActionBlock<TInput> sowohl für die synchrone als auch für die asynchrone Verarbeitung jedes Eingabeelements verwenden.
Im folgenden grundlegenden Beispiel wird ein TransformBlock<TInput,TOutput> Objekt erstellt, das die Quadratwurzel der Eingabe berechnet. Das TransformBlock<TInput,TOutput> Objekt akzeptiert Int32 Werte als Eingabe und erzeugt Double Werte als Ausgabe.
// Create a TransformBlock<int, double> object that
// computes the square root of its input.
var transformBlock = new TransformBlock<int, double>(n => Math.Sqrt(n));
// Post several messages to the block.
transformBlock.Post(10);
transformBlock.Post(20);
transformBlock.Post(30);
// Read the output messages from the block.
for (int i = 0; i < 3; i++)
{
Console.WriteLine(transformBlock.Receive());
}
/* Output:
3.16227766016838
4.47213595499958
5.47722557505166
*/
' Create a TransformBlock<int, double> object that
' computes the square root of its input.
Dim transformBlock = New TransformBlock(Of Integer, Double)(Function(n) Math.Sqrt(n))
' Post several messages to the block.
transformBlock.Post(10)
transformBlock.Post(20)
transformBlock.Post(30)
' Read the output messages from the block.
For i As Integer = 0 To 2
Console.WriteLine(transformBlock.Receive())
Next i
' Output:
' 3.16227766016838
' 4.47213595499958
' 5.47722557505166
'
Vollständige Beispiele, die TransformBlock<TInput,TOutput> in einem Netzwerk von Datenflussblöcken verwenden, das die Bildverarbeitung in einer Windows Forms-Anwendung ausführt, finden Sie unter Walkthrough: Using Dataflow in a Windows Forms Application.
TransformManyBlock<TInput, TOutput>
Die TransformManyBlock<TInput,TOutput> Klasse ähnelt der TransformBlock<TInput,TOutput> Klasse, mit der Ausnahme, dass TransformManyBlock<TInput,TOutput> für jeden Eingabewert null oder mehr Ausgabewerte erzeugt werden, statt nur einen Ausgabewert für jeden Eingabewert. Die Stellvertretung, die Sie für ein TransformManyBlock<TInput,TOutput> Objekt bereitstellen, kann vom Typ System.Func<TInput, IEnumerable<TOutput>>
oder Typ System.Func<TInput, Task<IEnumerable<TOutput>>>
sein. Wenn Sie ein TransformManyBlock<TInput,TOutput> Objekt mit System.Func<TInput, IEnumerable<TOutput>>
verwenden, gilt die Verarbeitung jedes Eingabeelements als abgeschlossen, wenn der Delegat zurückkehrt. Wenn Sie ein TransformManyBlock<TInput,TOutput> Objekt mit System.Func<TInput, Task<IEnumerable<TOutput>>>
verwenden, gilt die Verarbeitung jedes Eingabeelements nur dann als abgeschlossen, wenn das zurückgegebene System.Threading.Tasks.Task<IEnumerable<TOutput>>
Objekt abgeschlossen ist.
Im folgenden einfachen Beispiel wird ein TransformManyBlock<TInput,TOutput> Objekt erstellt, das Zeichenfolgen in ihre einzelnen Zeichensequenzen aufteilt. Das TransformManyBlock<TInput,TOutput> Objekt akzeptiert String Werte als Eingabe und erzeugt Char Werte als Ausgabe.
// Create a TransformManyBlock<string, char> object that splits
// a string into its individual characters.
var transformManyBlock = new TransformManyBlock<string, char>(
s => s.ToCharArray());
// Post two messages to the first block.
transformManyBlock.Post("Hello");
transformManyBlock.Post("World");
// Receive all output values from the block.
for (int i = 0; i < ("Hello" + "World").Length; i++)
{
Console.WriteLine(transformManyBlock.Receive());
}
/* Output:
H
e
l
l
o
W
o
r
l
d
*/
' Create a TransformManyBlock<string, char> object that splits
' a string into its individual characters.
Dim transformManyBlock = New TransformManyBlock(Of String, Char)(Function(s) s.ToCharArray())
' Post two messages to the first block.
transformManyBlock.Post("Hello")
transformManyBlock.Post("World")
' Receive all output values from the block.
For i As Integer = 0 To ("Hello" & "World").Length - 1
Console.WriteLine(transformManyBlock.Receive())
Next i
' Output:
' H
' e
' l
' l
' o
' W
' o
' r
' l
' d
'
Vollständige Beispiele, in denen TransformManyBlock<TInput,TOutput> verwendet wird, um mehrere unabhängige Ausgaben für jede Eingabe in einer Datenflusspipeline zu erzeugen, finden Sie unter Walkthrough: Creating a Dataflow Pipeline.
Grad der Parallelität
Jedes ActionBlock<TInput>, TransformBlock<TInput,TOutput> und TransformManyBlock<TInput,TOutput>-Objekt puffert Eingabemeldungen, bis der Block bereit ist, sie zu verarbeiten. Standardmäßig verarbeiten diese Klassen Nachrichten in der Reihenfolge, in der sie empfangen werden, eine Nachricht nach der anderen. Sie können auch den Grad der Parallelität angeben, um es ActionBlock<TInput>, TransformBlock<TInput,TOutput> und TransformManyBlock<TInput,TOutput> Objekten zu ermöglichen, mehrere Nachrichten gleichzeitig zu verarbeiten. Weitere Informationen zur gleichzeitigen Ausführung finden Sie im Abschnitt "Angeben des Grads der Parallelität" weiter unten in diesem Dokument. Ein Beispiel, das den Grad der Parallelität festlegt, damit ein Ausführungsdatenflussblock mehrere Nachrichten gleichzeitig verarbeiten kann, finden Sie unter How to: Specify the Degree of Parallelism in a Dataflow Block.
Übersicht über Delegattypen
In der folgenden Tabelle sind die Delegattypen zusammengefasst, die Sie für ActionBlock<TInput>, TransformBlock<TInput,TOutput>und TransformManyBlock<TInput,TOutput> Objekte bereitstellen können. In dieser Tabelle wird auch angegeben, ob der Delegattyp synchron oder asynchron arbeitet.
Typ | Synchroner Delegattyp | Asynchroner Delegattyp |
---|---|---|
ActionBlock<TInput> | System.Action |
System.Func<TInput, Task> |
TransformBlock<TInput,TOutput> | System.Func<TInput, TOutput> |
System.Func<TInput, Task<TOutput>> |
TransformManyBlock<TInput,TOutput> | System.Func<TInput, IEnumerable<TOutput>> |
System.Func<TInput, Task<IEnumerable<TOutput>>> |
Sie können auch Lambda-Ausdrücke verwenden, wenn Sie mit Ausführungsblocktypen arbeiten. Ein Beispiel für die Verwendung eines Lambda-Ausdrucks mit einem Ausführungsblock finden Sie unter Vorgehensweise: Ausführen einer Aktion, wenn ein Datenflussblock Daten empfängt.
Gruppieren von Blöcken
Gruppierungsblöcke kombinieren Daten aus einer oder mehreren Quellen und unter verschiedenen Einschränkungen. Die TPL-Datenflussbibliothek bietet drei Verknüpfungsblocktypen: BatchBlock<T>, , JoinBlock<T1,T2>und BatchedJoinBlock<T1,T2>.
BatchBlock<T>
Die BatchBlock<T> Klasse kombiniert Eingabedatensätze, die als Batches bezeichnet werden, in Arrays von Ausgabedaten. Beim Erstellen eines BatchBlock<T> Objekts geben Sie die Größe jedes Batches an. Wenn das BatchBlock<T> Objekt die angegebene Anzahl von Eingabeelementen empfängt, verteilt es asynchron ein Array, das diese Elemente enthält. Wenn ein BatchBlock<T> Objekt auf den abgeschlossenen Zustand festgelegt ist, aber nicht genügend Elemente enthält, um einen Batch zu bilden, verteilt es ein endgültiges Array, das die verbleibenden Eingabeelemente enthält.
Die BatchBlock<T> Klasse arbeitet entweder im gierigen oder nicht gierigen Modus. Im gierigen Modus, der standardmäßig aktiv ist, akzeptiert ein BatchBlock<T> Objekt jede Nachricht, die ihm angeboten wird, und gibt ein Array aus, nachdem es die angegebene Anzahl von Elementen erhält. Im nicht gierigen Modus stellt ein BatchBlock<T>-Objekt alle eingehenden Nachrichten zurück, bis dem Block genügend Nachrichten von Quellen angeboten wurden, um einen Batch zu bilden. Der gierige Modus führt in der Regel besser aus als der nicht gierige Modus, da weniger Verarbeitungsaufwand erforderlich ist. Sie können jedoch den nicht-gierigen Modus verwenden, wenn Sie den Verbrauch aus mehreren Quellen auf atomische Weise koordinieren müssen. Geben Sie den Nicht-Giermodus an, indem Sie Greedy auf False
im dataflowBlockOptions
-Parameter des BatchBlock<T>-Konstruktors setzen.
Im folgenden grundlegenden Beispiel werden mehrere Int32-Werte an ein BatchBlock<T>-Objekt gesendet, das zehn Elemente in einem Batch enthält. Um sicherzustellen, dass alle Werte aus dem BatchBlock<T> propagiert werden, ruft dieses Beispiel die Complete-Methode auf. Die Complete Methode legt das BatchBlock<T> Objekt auf den abgeschlossenen Zustand fest, und daher verteilt das BatchBlock<T> Objekt alle verbleibenden Elemente als endgültigen Batch.
// Create a BatchBlock<int> object that holds ten
// elements per batch.
var batchBlock = new BatchBlock<int>(10);
// Post several values to the block.
for (int i = 0; i < 13; i++)
{
batchBlock.Post(i);
}
// Set the block to the completed state. This causes
// the block to propagate out any remaining
// values as a final batch.
batchBlock.Complete();
// Print the sum of both batches.
Console.WriteLine($"The sum of the elements in batch 1 is {batchBlock.Receive().Sum()}.");
Console.WriteLine($"The sum of the elements in batch 2 is {batchBlock.Receive().Sum()}.");
/* Output:
The sum of the elements in batch 1 is 45.
The sum of the elements in batch 2 is 33.
*/
' Create a BatchBlock<int> object that holds ten
' elements per batch.
Dim batchBlock = New BatchBlock(Of Integer)(10)
' Post several values to the block.
For i As Integer = 0 To 12
batchBlock.Post(i)
Next i
' Set the block to the completed state. This causes
' the block to propagate out any remaining
' values as a final batch.
batchBlock.Complete()
' Print the sum of both batches.
Console.WriteLine("The sum of the elements in batch 1 is {0}.", batchBlock.Receive().Sum())
Console.WriteLine("The sum of the elements in batch 2 is {0}.", batchBlock.Receive().Sum())
' Output:
' The sum of the elements in batch 1 is 45.
' The sum of the elements in batch 2 is 33.
'
Ein vollständiges Beispiel zur BatchBlock<T> Verbesserung der Effizienz von Datenbankeinfügeoperationen finden Sie unter Anleitung: Verwenden von BatchBlock und BatchedJoinBlock zur Effizienzsteigerung.
JoinBlock<T1, T2, ...>
Die Klassen JoinBlock<T1,T2> und JoinBlock<T1,T2,T3> sammeln Eingabeelemente und propagieren System.Tuple<T1,T2>- oder System.Tuple<T1,T2,T3>-Objekte, die diese Elemente enthalten. Die Klassen JoinBlock<T1,T2> und JoinBlock<T1,T2,T3> erben nicht von ITargetBlock<TInput>. Stattdessen stellen sie Eigenschaften, Target1, Target2 und Target3 bereit, die ITargetBlock<TInput> implementieren.
Wie BatchBlock<T>, arbeiten JoinBlock<T1,T2> und JoinBlock<T1,T2,T3> entweder im Greedy- oder im Non-Greedy-Modus. Im gierigen Modus, der standardmäßig eingestellt ist, akzeptiert ein JoinBlock<T1,T2> oder JoinBlock<T1,T2,T3>-Objekt jede Nachricht, die ihm angeboten wird, und verteilt ein Tupel, nachdem jedes seiner Ziele mindestens eine Nachricht empfangen hat. Im nicht-gierigen Modus verschiebt ein JoinBlock<T1,T2>- oder JoinBlock<T1,T2,T3>-Objekt alle eingehenden Nachrichten, bis alle Ziele die Daten angeboten bekommen haben, die zum Erstellen eines Tupels erforderlich sind. An diesem Punkt initiiert der Block ein Zweiphasencommit-Protokoll, um alle erforderlichen Elemente aus den Quellen atomar abzurufen. Diese Verschiebung ermöglicht es einer anderen Entität, die Daten in der Zwischenzeit zu nutzen, damit das gesamte System Fortschritte erzielt.
Im folgenden grundlegenden Beispiel wird ein Fall veranschaulicht, in dem ein JoinBlock<T1,T2,T3> Objekt mehrere Daten zum Berechnen eines Werts benötigt. In diesem Beispiel wird ein JoinBlock<T1,T2,T3> Objekt erstellt, das zwei Int32 Werte und einen Char Wert zum Ausführen einer arithmetischen Operation erfordert.
// Create a JoinBlock<int, int, char> object that requires
// two numbers and an operator.
var joinBlock = new JoinBlock<int, int, char>();
// Post two values to each target of the join.
joinBlock.Target1.Post(3);
joinBlock.Target1.Post(6);
joinBlock.Target2.Post(5);
joinBlock.Target2.Post(4);
joinBlock.Target3.Post('+');
joinBlock.Target3.Post('-');
// Receive each group of values and apply the operator part
// to the number parts.
for (int i = 0; i < 2; i++)
{
var data = joinBlock.Receive();
switch (data.Item3)
{
case '+':
Console.WriteLine($"{data.Item1} + {data.Item2} = {data.Item1 + data.Item2}");
break;
case '-':
Console.WriteLine($"{data.Item1} - {data.Item2} = {data.Item1 - data.Item2}");
break;
default:
Console.WriteLine($"Unknown operator '{data.Item3}'.");
break;
}
}
/* Output:
3 + 5 = 8
6 - 4 = 2
*/
' Create a JoinBlock<int, int, char> object that requires
' two numbers and an operator.
Dim joinBlock = New JoinBlock(Of Integer, Integer, Char)()
' Post two values to each target of the join.
joinBlock.Target1.Post(3)
joinBlock.Target1.Post(6)
joinBlock.Target2.Post(5)
joinBlock.Target2.Post(4)
joinBlock.Target3.Post("+"c)
joinBlock.Target3.Post("-"c)
' Receive each group of values and apply the operator part
' to the number parts.
For i As Integer = 0 To 1
Dim data = joinBlock.Receive()
Select Case data.Item3
Case "+"c
Console.WriteLine("{0} + {1} = {2}", data.Item1, data.Item2, data.Item1 + data.Item2)
Case "-"c
Console.WriteLine("{0} - {1} = {2}", data.Item1, data.Item2, data.Item1 - data.Item2)
Case Else
Console.WriteLine("Unknown operator '{0}'.", data.Item3)
End Select
Next i
' Output:
' 3 + 5 = 8
' 6 - 4 = 2
'
Ein vollständiges Beispiel, das Objekte im nicht-gierigen Modus verwendet, um eine Ressource kooperativ zu teilen, finden Sie unter JoinBlock<T1,T2>.
BatchedJoinBlock<T1, T2, ...>
Die BatchedJoinBlock<T1,T2>- und BatchedJoinBlock<T1,T2,T3>-Klassen sammeln Batches von Eingabeelementen und verbreiten System.Tuple(IList(T1), IList(T2))
- oder System.Tuple(IList(T1), IList(T2), IList(T3))
-Objekte, die diese Elemente enthalten.
BatchedJoinBlock<T1,T2> Denken Sie an eine Kombination von BatchBlock<T> und JoinBlock<T1,T2>. Geben Sie die Größe jedes Batches an, wenn Sie ein BatchedJoinBlock<T1,T2> Objekt erstellen.
BatchedJoinBlock<T1,T2> bietet außerdem die Eigenschaften, Target1 und Target2, die ITargetBlock<TInput> implementieren. Wenn die angegebene Anzahl von Eingabeelementen von allen Zielen empfangen wird, verteilt das BatchedJoinBlock<T1,T2> Objekt asynchron ein System.Tuple(IList(T1), IList(T2))
Objekt, das diese Elemente enthält.
Im folgenden grundlegenden Beispiel wird ein BatchedJoinBlock<T1,T2>-Objekt erstellt, das Ergebnisse, Int32-Werte und Fehler enthält, die Exception-Objekte sind. In diesem Beispiel werden mehrere Vorgänge ausgeführt und Ergebnisse in die Target1 Eigenschaft und Fehler in die Target2 Eigenschaft des BatchedJoinBlock<T1,T2> Objekts geschrieben. Da die Anzahl der erfolgreichen und fehlgeschlagenen Vorgänge im Voraus unbekannt ist, ermöglichen es die IList<T> Objekte, dass jedes Ziel null oder mehr Werte empfangen kann.
// For demonstration, create a Func<int, int> that
// returns its argument, or throws ArgumentOutOfRangeException
// if the argument is less than zero.
Func<int, int> DoWork = n =>
{
if (n < 0)
throw new ArgumentOutOfRangeException();
return n;
};
// Create a BatchedJoinBlock<int, Exception> object that holds
// seven elements per batch.
var batchedJoinBlock = new BatchedJoinBlock<int, Exception>(7);
// Post several items to the block.
foreach (int i in new int[] { 5, 6, -7, -22, 13, 55, 0 })
{
try
{
// Post the result of the worker to the
// first target of the block.
batchedJoinBlock.Target1.Post(DoWork(i));
}
catch (ArgumentOutOfRangeException e)
{
// If an error occurred, post the Exception to the
// second target of the block.
batchedJoinBlock.Target2.Post(e);
}
}
// Read the results from the block.
var results = batchedJoinBlock.Receive();
// Print the results to the console.
// Print the results.
foreach (int n in results.Item1)
{
Console.WriteLine(n);
}
// Print failures.
foreach (Exception e in results.Item2)
{
Console.WriteLine(e.Message);
}
/* Output:
5
6
13
55
0
Specified argument was out of the range of valid values.
Specified argument was out of the range of valid values.
*/
' For demonstration, create a Func<int, int> that
' returns its argument, or throws ArgumentOutOfRangeException
' if the argument is less than zero.
Dim DoWork As Func(Of Integer, Integer) = Function(n)
If n < 0 Then
Throw New ArgumentOutOfRangeException()
End If
Return n
End Function
' Create a BatchedJoinBlock<int, Exception> object that holds
' seven elements per batch.
Dim batchedJoinBlock = New BatchedJoinBlock(Of Integer, Exception)(7)
' Post several items to the block.
For Each i As Integer In New Integer() {5, 6, -7, -22, 13, 55, 0}
Try
' Post the result of the worker to the
' first target of the block.
batchedJoinBlock.Target1.Post(DoWork(i))
Catch e As ArgumentOutOfRangeException
' If an error occurred, post the Exception to the
' second target of the block.
batchedJoinBlock.Target2.Post(e)
End Try
Next i
' Read the results from the block.
Dim results = batchedJoinBlock.Receive()
' Print the results to the console.
' Print the results.
For Each n As Integer In results.Item1
Console.WriteLine(n)
Next n
' Print failures.
For Each e As Exception In results.Item2
Console.WriteLine(e.Message)
Next e
' Output:
' 5
' 6
' 13
' 55
' 0
' Specified argument was out of the range of valid values.
' Specified argument was out of the range of valid values.
'
Ein vollständiges Beispiel, bei dem mit BatchedJoinBlock<T1,T2> die Ergebnisse und Ausnahmen erfasst werden, die beim Lesen des Programms aus einer Datenbank auftreten, finden Sie unter Exemplarische Vorgehensweise: Effizienzverbesserung durch Verwendung von BatchBlock und BatchedJoinBlock.
Konfigurieren des Datenflussblockverhaltens
Sie können zusätzliche Optionen aktivieren, indem Sie dem Konstruktor von Datenflussblocktypen ein System.Threading.Tasks.Dataflow.DataflowBlockOptions Objekt bereitstellen. Diese Optionen dienen zur Steuerung des Verhaltens, wie etwa des Schedulers, der die zugrunde liegende Aufgabe und den Grad der Parallelität verwaltet. Das DataflowBlockOptions verfügt zudem über abgeleitete Typen, die das Verhalten definieren, das für bestimmte Datenflussblocktypen spezifisch ist. In der folgenden Tabelle wird zusammengefasst, welcher Optionstyp jedem Datenflussblocktyp zugeordnet ist.
In den folgenden Abschnitten finden Sie zusätzliche Informationen zu den wichtigen Arten von Datenflussblockoptionen, die über die Klassen System.Threading.Tasks.Dataflow.DataflowBlockOptions, System.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptions und System.Threading.Tasks.Dataflow.GroupingDataflowBlockOptions verfügbar sind.
Festlegen des Aufgabenplaners
Jeder vordefinierte Datenflussblock verwendet den TPL-Aufgabenplanungsmechanismus, um Aktivitäten wie das Verteilen von Daten an ein Ziel, das Empfangen von Daten aus einer Quelle und das Ausführen von benutzerdefinierten Stellvertretungen auszuführen, wenn Daten verfügbar werden. TaskScheduler ist eine abstrakte Klasse, die einen Aufgabenplaner darstellt, der Aufgaben in Threads in die Warteschlange stellt. Der standardmäßige Aufgabenplaner Default verwendet die ThreadPool Klasse zum Einreihen und Ausführen von Aufgaben. Sie können den Standardaufgabenplaner überschreiben, indem Sie die TaskScheduler Eigenschaft festlegen, wenn Sie ein Dataflow-Blockobjekt erstellen.
Wenn derselbe Aufgabenplaner mehrere Datenflussblöcke verwaltet, kann er Richtlinien für alle anwenden. Wenn beispielsweise mehrere Datenflussblöcke für den exklusiven Zeitplan desselben ConcurrentExclusiveSchedulerPair Objekts konfiguriert sind, werden alle Vorgänge, die über diese Blöcke hinweg ausgeführt werden, serialisiert. Wenn diese Blöcke so konfiguriert sind, dass sie den konkurrierenden Scheduler desselben ConcurrentExclusiveSchedulerPair Objekts anvisieren und dass für diesen Scheduler eine maximale Parallelitätsstufe festgelegt wurde, ist die gesamte Arbeit von diesen Blöcken auf diese Anzahl gleichzeitiger Vorgänge beschränkt. Ein Beispiel, in dem die ConcurrentExclusiveSchedulerPair Klasse verwendet wird, um Lesevorgänge parallel auszuführen, während Schreibvorgänge exklusiv und unabhängig von allen anderen Vorgängen erfolgen, finden Sie unter How to: Specify a Task Scheduler in a Dataflow Block. Weitere Informationen zu Aufgabenplanern in der TPL finden Sie im TaskScheduler Kursthema.
Angeben des Grads der Parallelität
Standardmäßig verarbeiten die drei Ausführungsblocktypen, die die TPL Dataflow Library bereitstellt, ActionBlock<TInput>, TransformBlock<TInput,TOutput> und TransformManyBlock<TInput,TOutput> jeweils eine Nachricht gleichzeitig. Diese Datenflussblocktypen verarbeiten auch Nachrichten in der Reihenfolge, in der sie empfangen werden. Wenn Sie diese Datenflussblöcke zum gleichzeitigen Verarbeiten von Nachrichten aktivieren möchten, legen Sie die ExecutionDataflowBlockOptions.MaxDegreeOfParallelism Eigenschaft fest, wenn Sie das Datenflussblockobjekt erstellen.
Der Standardwert von MaxDegreeOfParallelism ist 1, was garantiert, dass der Datenflussblock jeweils eine Nachricht verarbeitet. Wenn Sie diese Eigenschaft auf einen Wert festlegen, der größer als 1 ist, kann der Datenflussblock mehrere Nachrichten gleichzeitig verarbeiten. Durch Festlegen dieser Eigenschaft DataflowBlockOptions.Unbounded kann der zugrunde liegende Vorgangszeitplaner den maximalen Grad der Parallelität verwalten.
Von Bedeutung
Wenn Sie einen maximalen Grad an Parallelität angeben, der größer als 1 ist, werden mehrere Nachrichten gleichzeitig verarbeitet, und daher werden Nachrichten möglicherweise nicht in der Reihenfolge verarbeitet, in der sie empfangen werden. Die Reihenfolge, in der die Nachrichten aus dem Block ausgegeben werden, ist jedoch dieselbe, in der sie empfangen werden.
Da die MaxDegreeOfParallelism Eigenschaft den maximalen Grad an Parallelität darstellt, kann der Datenflussblock mit einem geringeren Grad an Parallelität ausgeführt werden, als Sie angeben. Der Datenflussblock kann einen geringeren Grad an Parallelität verwenden, um seine funktionalen Anforderungen zu erfüllen oder weil es an verfügbaren Systemressourcen mangelt. Ein Datenflussblock wählt niemals mehr Parallelität aus, als Sie angeben.
Der Wert der MaxDegreeOfParallelism Eigenschaft ist exklusiv für jedes Dataflow-Blockobjekt. Wenn beispielsweise vier Datenflussblockobjekte jeweils 1 für den maximalen Grad an Parallelität angeben, können alle vier Datenflussblockobjekte potenziell parallel ausgeführt werden.
Ein Beispiel, das den maximalen Grad der Parallelität festlegt, um langwierige Vorgänge parallel zu ermöglichen, finden Sie unter How to: Specify the Degree of Parallelism in a Dataflow Block.
Angeben der Anzahl von Nachrichten pro Aufgabe
Die vordefinierten Datenflussblocktypen verwenden Aufgaben zum Verarbeiten mehrerer Eingabeelemente. Dadurch wird die Anzahl der Aufgabenobjekte minimiert, die zum Verarbeiten von Daten erforderlich sind, wodurch Anwendungen effizienter ausgeführt werden können. Wenn die Aufgaben aus einer Reihe von Datenflussblöcken jedoch Daten verarbeiten, müssen die Aufgaben aus anderen Datenflussblöcken möglicherweise auf die Verarbeitungszeit warten, indem Nachrichten in die Warteschlange gestellt werden. Um eine bessere Fairness zwischen Datenflussaufgaben zu ermöglichen, legen Sie die MaxMessagesPerTask Eigenschaft fest. Wenn MaxMessagesPerTask auf DataflowBlockOptions.Unbounded gesetzt ist, was der Standardwert ist, verarbeitet die Aufgabe, die von einem Datenflussblock verwendet wird, so viele Nachrichten, wie verfügbar sind. Wenn MaxMessagesPerTask auf einen anderen Wert als Unbounded festgelegt wird, verarbeitet der Datenflussblock höchstens diese Anzahl von Nachrichten pro Task-Objekt. Obwohl das Festlegen der MaxMessagesPerTask Eigenschaft die Fairness zwischen Aufgaben erhöhen kann, kann es dazu führen, dass das System mehr Aufgaben als erforderlich erstellt, was die Leistung verringern kann.
Aktivieren des Abbruchs
Die TPL bietet einen Mechanismus, durch den Aufgaben Abbrüche kooperative koordinieren können. Um Datenflussblöcke an diesem Abbruchmechanismus teilnehmen zu lassen, legen Sie die CancellationToken Eigenschaft fest. Wenn dieses CancellationToken Objekt auf den abgebrochenen Zustand festgelegt ist, werden alle Datenflussblöcke, die dieses Token überwachen, die Ausführung des aktuellen Elements beenden, die Verarbeitung nachfolgender Elemente jedoch nicht starten. Diese Datenflussblöcke löschen auch alle gepufferten Nachrichten, geben Verbindungen zu beliebigen Quell- und Zielblöcken frei und wechseln zum abgebrochenen Zustand. Durch den Übergang in den abgebrochenen Zustand, wird die Completion-Eigenschaft der Status-Eigenschaft auf Canceled festgelegt, wenn während der Verarbeitung keine Aufnahme aufgetreten ist. In diesem Fall wird Status auf Faulted gesetzt.
Ein Beispiel zur Verwendung des Abbruchs in einer Windows Forms-Anwendung finden Sie unter How to: Cancel a Dataflow Block. Weitere Informationen über Abbrüche in der TPL finden Sie unter Aufgabenabbruch.
Festlegen von gierigem Verhalten im Vergleich zu nicht gierigem Verhalten
Mehrere Gruppierungsdatenfluss-Blocktypen können entweder im gierigen oder nicht gierigen Modus ausgeführt werden. In der Standardeinstellung arbeiten die vordefinierten Datenflussblocktypen im gierigen Modus.
Bei Verknüpfungsblocktypen wie JoinBlock<T1,T2> bedeutet der Giermodus, dass der Block sofort Daten akzeptiert, auch wenn die entsprechenden Daten, mit denen verknüpft werden soll, noch nicht verfügbar sind. Nicht gieriger Modus bedeutet, dass der Block alle eingehenden Nachrichten zurückstellt, bis eine Nachricht an jedem der zugehörigen Ziele verfügbar ist, um die Gruppierung zu vervollständigen. Wenn eine der verschobenen Nachrichten nicht mehr verfügbar ist, gibt der Verknüpfungsblock alle verschobenen Nachrichten frei und startet den Prozess neu. Für die BatchBlock<T> Klasse ist das gierige und nicht-gierige Verhalten ähnlich, mit der Ausnahme, dass ein BatchBlock<T> Objekt im nicht-gierigen Modus alle eingehenden Nachrichten zurückstellt, bis genügend aus unterschiedlichen Quellen verfügbar sind, um eine Charge abzuschließen.
Legen Sie Greedy auf False
fest, um den nicht gierigen Modus für einen Datenflussblock festzulegen. Ein Beispiel, das veranschaulicht, wie der Nicht-Giermodus verwendet wird, um mehrere Verknüpfungsblöcke zum effizienteren Freigeben einer Datenquelle zu ermöglichen, finden Sie unter How to: Use JoinBlock to Read Data From Multiple Sources.
Benutzerdefinierte Datenflussblöcke
Obwohl die TPL-Datenflussbibliothek viele vordefinierte Blocktypen bereitstellt, können Sie zusätzliche Blocktypen erstellen, die benutzerdefiniertes Verhalten ausführen. Implementieren Sie die ISourceBlock<TOutput>- oder ITargetBlock<TInput>-Schnittstellen direkt oder verwenden Sie exakt die Encapsulate-Methode, um einen komplexen Block zu erstellen, der das Verhalten verfügbarer Blocktypen kapselt. Beispiele, die zeigen, wie benutzerdefinierte Dataflow-Blockfunktionen implementiert werden, finden Sie unter Walkthrough: Creating a Custom Dataflow Block Type.
Verwandte Themen
Titel | BESCHREIBUNG |
---|---|
Vorgehensweise: Schreiben von Nachrichten in und Lesen von Nachrichten aus einem Datenflussblock | Veranschaulicht, wie Nachrichten in ein BufferBlock<T> Objekt geschrieben und gelesen werden. |
Vorgehensweise: Implementieren eines Producer-Consumer Dataflow-Musters | Beschreibt die Verwendung des Datenflussmodells zum Implementieren eines Produzenten-Consumer-Musters, bei dem der Produzent Nachrichten an einen Datenflussblock sendet und der Verbraucher Nachrichten aus diesem Block liest. |
Vorgehensweise: Ausführen einer Aktion, wenn ein Datenflussblock Daten empfängt | Beschreibt, wie Delegierte für die Ausführungsdatenflussblocktypen ActionBlock<TInput>, TransformBlock<TInput,TOutput> und TransformManyBlock<TInput,TOutput> bereitgestellt werden. |
Exemplarische Vorgehensweise: Erstellen einer Datenflusspipeline | Beschreibt, wie Eine Datenflusspipeline erstellt wird, die Text aus dem Web herunterlädt und Vorgänge für diesen Text ausführt. |
Vorgehensweise: Aufheben der Verknüpfung von Datenflussblöcken | Veranschaulicht, wie die LinkTo Methode verwendet wird, um die Verknüpfung eines Zielblocks von seiner Quelle aufzuheben, nachdem die Quelle dem Ziel eine Nachricht angezeigt hat. |
Exemplarische Vorgehensweise: Verwenden von Dataflow in einer Windows Forms-Anwendung | Veranschaulicht das Erstellen eines Netzwerks von Datenflussblöcken, die die Bildverarbeitung in einer Windows Forms-Anwendung ausführen. |
Vorgehensweise: Abbrechen eines Datenflussblocks | Zeigt, wie man das Abbrechen in einer Windows Forms-Anwendung verwendet. |
Vorgehensweise: Verwenden von JoinBlock zum Lesen von Daten aus mehreren Quellen | Erläutert, wie Sie mithilfe der JoinBlock<T1,T2> Klasse einen Vorgang ausführen können, wenn Daten aus mehreren Quellen verfügbar sind, und wie Sie den Nicht-Gier-Modus verwenden, um mehrere Verknüpfungsblöcke zum effizienteren Freigeben einer Datenquelle zu ermöglichen. |
Vorgehensweise: Angeben des Grads der Parallelität in einem Datenflussblock | Beschreibt, wie die MaxDegreeOfParallelism Eigenschaft so festgelegt wird, dass ein Ausführungsdatenflussblock mehrere Nachrichten gleichzeitig verarbeitet. |
Vorgehensweise: Angeben eines Aufgabenplaners in einem Datenflussblock | Veranschaulicht, wie Sie einen bestimmten Aufgabenplaner zuordnen, wenn Sie Datenflüsse in Ihrer Anwendung verwenden. |
Exemplarische Vorgehensweise: Verwenden von BatchBlock und BatchedJoinBlock zur Verbesserung der Effizienz | Beschreibt, wie die BatchBlock<T> Klasse verwendet wird, um die Effizienz von Datenbankeinfügungsvorgängen zu verbessern und wie die BatchedJoinBlock<T1,T2> Klasse zum Erfassen der Ergebnisse und aller Ausnahmen verwendet wird, die während des Lesens des Programms aus einer Datenbank auftreten. |
Exemplarische Vorgehensweise: Erstellen eines benutzerdefinierten Datenflussblocktyps | Veranschaulicht zwei Möglichkeiten zum Erstellen eines Datenflussblocktyps, der benutzerdefiniertes Verhalten implementiert. |
Task Parallel Library (TPL) | Führt die TPL ein, eine Bibliothek, die die parallele und gleichzeitige Programmierung in .NET Framework-Anwendungen vereinfacht. |