Exemplarische Vorgehensweise: Erstellen einer Datenflusspipeline
Obwohl Sie die Methoden DataflowBlock.Receive, DataflowBlock.ReceiveAsync und DataflowBlock.TryReceive verwenden können, um Nachrichten von Quellblöcken zu empfangen, können Sie auch Nachrichtenblöcke verbinden, um eine Datenflusspipeline zu bilden. Eine Datenpipeline besteht aus einer Reihe von Komponenten, oder Datenflussblöcken, von denen jede eine bestimmte Aufgabe ausführt, die zu einem größeren Ziel beiträgt. Jeder Datenflussblock in einer Datenflusspipeline erledigt eine Aufgabe, wenn er eine Meldung von einem anderen Datenflussblock empfangen hat. Eine Analogie hierzu ist eine Fertigungsstraße eines Fahrzeugherstellers. Jedes Fahrzeug durchläuft die Fertigungsstraße: In einer Station wird das Fahrgestell montiert, in der nächsten wird der Motor eingebaut usw. Da eine Fertigungsstraße ermöglicht, dass mehrere Fahrzeuge gleichzeitig montiert werden, bietet sie einen besseren Durchsatz als eine Einzelmontage eines vollständigen Fahrzeugs.
Dieses Dokument veranschaulicht eine Datenflusspipeline, die das Buch The Iliad of Homer von einer Website herunterlädt und den Text durchsucht, um einzelne Wörter mit Wörtern abzugleichen, in denen die Zeichen des ersten Worts umgekehrt sind. Die Entwicklung der Datenflusspipeline in diesem Dokument besteht aus den folgenden Schritten:
Erstellen Sie die Datenflussblöcke, die Bestandteile der Pipeline sein sollen.
Verbinden Sie jeden Datenflussblock mit dem nächsten Block in der Pipeline. Jeder Block empfängt als Eingabe die Ausgabe des jeweils vorherigen Blocks in der Pipeline.
Erstellen Sie für jeden Datenflussblock eine Fortsetzungsaufgabe, die den nächsten Datenblock in den Zustand Abgeschlossen schaltet, nachdem der vorherige Block beendet wurde.
Senden Sie Daten an den Anfang der Pipeline.
Kennzeichnen Sie den Anfang der Pipeline als abgeschlossen.
Warten Sie, bis die Pipeline alle Aufgaben ausgeführt hat.
Voraussetzungen
Lesen Sie das Thema Datenfluss, bevor Sie mit dieser exemplarischen Vorgehensweise beginnen.
Erstellen einer Konsolenanwendung
Erstellen Sie in Visual Studio ein Visual C#- oder Visual Basic-Projekt des Typs „Konsolenanwendung“. Installieren Sie das NuGet-Paket „System.Threading.Tasks.Dataflow“.
Hinweis
Die TPL-Datenflussbibliothek (System.Threading.Tasks.Dataflow-Namespace) wird nicht mit .NET ausgeliefert. Öffnen Sie zum Installieren des System.Threading.Tasks.Dataflow-Namespace in Visual Studio Ihr Projekt, wählen Sie im Menü Projekt die Option NuGet-Pakete verwalten aus, und suchen Sie online nach dem System.Threading.Tasks.Dataflow
-Paket. Alternativ können Sie es mithilfe der .NET Core-CLI installieren und dazu dotnet add package System.Threading.Tasks.Dataflow
ausführen.
Fügen Sie Ihrem Projekt den folgenden Code hinzu , um die Grundanwendung zu erstellen.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;
// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class Program
{
static void Main()
{
}
}
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow
' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web
' and finds all reversed words that appear in that book.
Module DataflowReversedWords
Sub Main()
End Sub
End Module
Erstellen der Datenflussblöcke
Fügen Sie der Main
-Methode den folgenden Code hinzu, um die Datenflussblöcke zu erstellen, die Bestandteile der Pipeline sein sollen. Die folgende Tabelle enthält eine Übersicht über die Rollen, die die Elemente (Member) der Pipeline haben.
//
// Create the members of the pipeline.
//
// Downloads the requested resource as a string.
var downloadString = new TransformBlock<string, string>(async uri =>
{
Console.WriteLine("Downloading '{0}'...", uri);
return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
});
// Separates the specified text into an array of words.
var createWordList = new TransformBlock<string, string[]>(text =>
{
Console.WriteLine("Creating word list...");
// Remove common punctuation by replacing all non-letter characters
// with a space character.
char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
text = new string(tokens);
// Separate the text into an array of words.
return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
});
// Removes short words and duplicates.
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
Console.WriteLine("Filtering word list...");
return words
.Where(word => word.Length > 3)
.Distinct()
.ToArray();
});
// Finds all words in the specified collection whose reverse also
// exists in the collection.
var findReversedWords = new TransformManyBlock<string[], string>(words =>
{
Console.WriteLine("Finding reversed words...");
var wordsSet = new HashSet<string>(words);
return from word in words.AsParallel()
let reverse = new string(word.Reverse().ToArray())
where word != reverse && wordsSet.Contains(reverse)
select word;
});
// Prints the provided reversed words to the console.
var printReversedWords = new ActionBlock<string>(reversedWord =>
{
Console.WriteLine("Found reversed words {0}/{1}",
reversedWord, new string(reversedWord.Reverse().ToArray()));
});
'
' Create the members of the pipeline.
'
' Downloads the requested resource as a string.
Dim downloadString = New TransformBlock(Of String, String)(
Async Function(uri)
Console.WriteLine("Downloading '{0}'...", uri)
Return Await New HttpClient().GetStringAsync(uri)
End Function)
' Separates the specified text into an array of words.
Dim createWordList = New TransformBlock(Of String, String())(
Function(text)
Console.WriteLine("Creating word list...")
' Remove common punctuation by replacing all non-letter characters
' with a space character.
Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
text = New String(tokens)
' Separate the text into an array of words.
Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
End Function)
' Removes short words and duplicates.
Dim filterWordList = New TransformBlock(Of String(), String())(
Function(words)
Console.WriteLine("Filtering word list...")
Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
End Function)
' Finds all words in the specified collection whose reverse also
' exists in the collection.
Dim findReversedWords = New TransformManyBlock(Of String(), String)(
Function(words)
Dim wordsSet = New HashSet(Of String)(words)
Return From word In words.AsParallel()
Let reverse = New String(word.Reverse().ToArray())
Where word <> reverse AndAlso wordsSet.Contains(reverse)
Select word
End Function)
' Prints the provided reversed words to the console.
Dim printReversedWords = New ActionBlock(Of String)(
Sub(reversedWord)
Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
End Sub)
Member | Typ | Beschreibung |
---|---|---|
downloadString |
TransformBlock<TInput,TOutput> | Lädt den Text des Buchs aus dem Internet herunter. |
createWordList |
TransformBlock<TInput,TOutput> | Trennt den Text des Buchs in ein Array aus einzelnen Wörtern. |
filterWordList |
TransformBlock<TInput,TOutput> | Entfernt kurze Wörter und Duplikate aus dem Wortarray. |
findReversedWords |
TransformManyBlock<TInput,TOutput> | Sucht in der gefilterten Wortarrayauflistung nach allen Wörtern, deren Umkehrung ebenfalls im Wortarray enthalten ist. |
printReversedWords |
ActionBlock<TInput> | Zeigt die jeweiligen Wörter und die entsprechenden Umkehrwörter in der Konsole an. |
Obwohl Sie mehrere Schritte in der Datenflusspipeline in diesem Beispiel in einem einzigen Schritt kombinieren könnten, veranschaulicht dieses Beispiel das Konzept des Zusammenstellens mehrerer unabhängiger Datenflussaufgaben, um eine größere Aufgabe auszuführen. In dem Beispiel wird TransformBlock<TInput,TOutput> verwendet, damit jedes Element der Pipeline einen Vorgang für seine Eingabedaten ausführen und die Ergebnisse an den nächsten Schritt in der Pipeline senden kann. Das findReversedWords
-Element der Pipeline ist ein TransformManyBlock<TInput,TOutput>-Objekt, da es für jede Eingabe mehrere unabhängige Ausgaben erzeugt. Das Ende der Pipeline, printReversedWords
, ist ein ActionBlock<TInput>-Objekt, weil es eine Aktion für seine Eingabe ausführt und kein Ergebnis erzeugt.
Aufbauen der Pipeline
Fügen Sie den folgenden Code ein, um jeden Block mit dem nächsten Block in der Pipeline zu verbinden.
Wenn Sie die LinkTo-Methode aufrufen, um einen Quelldatenflussblock mit einem Zieldatenflussblock zu verbinden, überträgt der Quelldatenflussblock Daten an den Zielblock, sobald Daten verfügbar sind. Wenn Sie auch DataflowLinkOptions mit PropagateCompletion (Wert auf „true“ gesetzt) angeben, führt ein erfolgreicher oder nicht erfolgreicher Abschluss eines Blocks in der Pipeline dazu, dass der nächste Block in der Pipeline abgeschlossen wird.
//
// Connect the dataflow blocks to form a pipeline.
//
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
downloadString.LinkTo(createWordList, linkOptions);
createWordList.LinkTo(filterWordList, linkOptions);
filterWordList.LinkTo(findReversedWords, linkOptions);
findReversedWords.LinkTo(printReversedWords, linkOptions);
'
' Connect the dataflow blocks to form a pipeline.
'
Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}
downloadString.LinkTo(createWordList, linkOptions)
createWordList.LinkTo(filterWordList, linkOptions)
filterWordList.LinkTo(findReversedWords, linkOptions)
findReversedWords.LinkTo(printReversedWords, linkOptions)
Senden von Daten an die Pipeline
Fügen Sie folgenden Code hinzu, um die URL des Buchs The Iliad of Homer an den Anfang der Datenflusspipeline zu senden.
// Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");
' Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")
In diesem Beispiel wird DataflowBlock.Post verwendet, damit die Daten synchron an den Anfang der Pipeline gesendet werden. Verwenden Sie die DataflowBlock.SendAsync-Methode, wenn Sie Daten asynchron an einen Datenflussknoten senden müssen.
Abschließen der Pipelineaktivität
Fügen Sie den folgenden Code ein, um den Anfang der Pipeline als abgeschlossen zu kennzeichnen. Der Anfang der Pipeline überträgt seinen Abschluss, nachdem er alle gepufferten Nachrichten verarbeitet hat.
// Mark the head of the pipeline as complete.
downloadString.Complete();
' Mark the head of the pipeline as complete.
downloadString.Complete()
In diesem Beispiel wird eine URL durch die Datenflusspipeline gesendet, um verarbeitet zu werden. Wenn Sie mehrere Eingaben durch eine Pipeline senden, rufen Sie die IDataflowBlock.Complete-Methode auf, nachdem Sie alle Eingaben übertragen haben. Sie können auf diesen Schritt verzichten, wenn Ihre Anwendung keinen wohldefinierten Punkt hat, an dem keine Daten mehr verfügbar sind, oder wenn die Anwendung nicht warten muss, bis die Pipeline beendet ist.
Warten bis zum Beenden der Pipeline
Fügen Sie den folgenden Code hinzu, damit gewartet wird, bis die Pipeline beendet ist. Der gesamte Vorgang ist abgeschlossen, wenn das Ende der Pipeline beendet ist.
// Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait();
' Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait()
Sie können von jedem Thread oder von mehreren Threads gleichzeitig auf den Datenflussabschluss warten.
Vollständiges Beispiel
Das folgende Beispiel enthält den vollständigen Code für diese exemplarische Vorgehensweise.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;
// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class DataflowReversedWords
{
static void Main()
{
//
// Create the members of the pipeline.
//
// Downloads the requested resource as a string.
var downloadString = new TransformBlock<string, string>(async uri =>
{
Console.WriteLine("Downloading '{0}'...", uri);
return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
});
// Separates the specified text into an array of words.
var createWordList = new TransformBlock<string, string[]>(text =>
{
Console.WriteLine("Creating word list...");
// Remove common punctuation by replacing all non-letter characters
// with a space character.
char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
text = new string(tokens);
// Separate the text into an array of words.
return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
});
// Removes short words and duplicates.
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
Console.WriteLine("Filtering word list...");
return words
.Where(word => word.Length > 3)
.Distinct()
.ToArray();
});
// Finds all words in the specified collection whose reverse also
// exists in the collection.
var findReversedWords = new TransformManyBlock<string[], string>(words =>
{
Console.WriteLine("Finding reversed words...");
var wordsSet = new HashSet<string>(words);
return from word in words.AsParallel()
let reverse = new string(word.Reverse().ToArray())
where word != reverse && wordsSet.Contains(reverse)
select word;
});
// Prints the provided reversed words to the console.
var printReversedWords = new ActionBlock<string>(reversedWord =>
{
Console.WriteLine("Found reversed words {0}/{1}",
reversedWord, new string(reversedWord.Reverse().ToArray()));
});
//
// Connect the dataflow blocks to form a pipeline.
//
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
downloadString.LinkTo(createWordList, linkOptions);
createWordList.LinkTo(filterWordList, linkOptions);
filterWordList.LinkTo(findReversedWords, linkOptions);
findReversedWords.LinkTo(printReversedWords, linkOptions);
// Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");
// Mark the head of the pipeline as complete.
downloadString.Complete();
// Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait();
}
}
/* Sample output:
Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
Creating word list...
Filtering word list...
Finding reversed words...
Found reversed words doom/mood
Found reversed words draw/ward
Found reversed words aera/area
Found reversed words seat/taes
Found reversed words live/evil
Found reversed words port/trop
Found reversed words sleek/keels
Found reversed words area/aera
Found reversed words tops/spot
Found reversed words evil/live
Found reversed words mood/doom
Found reversed words speed/deeps
Found reversed words moor/room
Found reversed words trop/port
Found reversed words spot/tops
Found reversed words spots/stops
Found reversed words stops/spots
Found reversed words reed/deer
Found reversed words keels/sleek
Found reversed words deeps/speed
Found reversed words deer/reed
Found reversed words taes/seat
Found reversed words room/moor
Found reversed words ward/draw
*/
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow
' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web
' and finds all reversed words that appear in that book.
Module DataflowReversedWords
Sub Main()
'
' Create the members of the pipeline.
'
' Downloads the requested resource as a string.
Dim downloadString = New TransformBlock(Of String, String)(
Async Function(uri)
Console.WriteLine("Downloading '{0}'...", uri)
Return Await New HttpClient().GetStringAsync(uri)
End Function)
' Separates the specified text into an array of words.
Dim createWordList = New TransformBlock(Of String, String())(
Function(text)
Console.WriteLine("Creating word list...")
' Remove common punctuation by replacing all non-letter characters
' with a space character.
Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
text = New String(tokens)
' Separate the text into an array of words.
Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
End Function)
' Removes short words and duplicates.
Dim filterWordList = New TransformBlock(Of String(), String())(
Function(words)
Console.WriteLine("Filtering word list...")
Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
End Function)
' Finds all words in the specified collection whose reverse also
' exists in the collection.
Dim findReversedWords = New TransformManyBlock(Of String(), String)(
Function(words)
Dim wordsSet = New HashSet(Of String)(words)
Return From word In words.AsParallel()
Let reverse = New String(word.Reverse().ToArray())
Where word <> reverse AndAlso wordsSet.Contains(reverse)
Select word
End Function)
' Prints the provided reversed words to the console.
Dim printReversedWords = New ActionBlock(Of String)(
Sub(reversedWord)
Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
End Sub)
'
' Connect the dataflow blocks to form a pipeline.
'
Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}
downloadString.LinkTo(createWordList, linkOptions)
createWordList.LinkTo(filterWordList, linkOptions)
filterWordList.LinkTo(findReversedWords, linkOptions)
findReversedWords.LinkTo(printReversedWords, linkOptions)
' Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")
' Mark the head of the pipeline as complete.
downloadString.Complete()
' Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait()
End Sub
End Module
' Sample output:
'Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
'Creating word list...
'Filtering word list...
'Finding reversed words...
'Found reversed words aera/area
'Found reversed words doom/mood
'Found reversed words draw/ward
'Found reversed words live/evil
'Found reversed words seat/taes
'Found reversed words area/aera
'Found reversed words port/trop
'Found reversed words sleek/keels
'Found reversed words tops/spot
'Found reversed words evil/live
'Found reversed words speed/deeps
'Found reversed words mood/doom
'Found reversed words moor/room
'Found reversed words spot/tops
'Found reversed words spots/stops
'Found reversed words trop/port
'Found reversed words stops/spots
'Found reversed words reed/deer
'Found reversed words deeps/speed
'Found reversed words deer/reed
'Found reversed words taes/seat
'Found reversed words keels/sleek
'Found reversed words room/moor
'Found reversed words ward/draw
Nächste Schritte
In diesem Beispiel wird eine zu verarbeitende URL durch die Datenflusspipeline gesendet. Wenn Sie mehrere Eingabewerte durch eine Pipeline senden, können Sie eine Form von Parallelität in Ihrer Anwendung implementieren, die der Art und Weise ähnelt, wie Teile durch eine Automobilfabrik bewegt werden. Wenn das erste Element der Pipeline sein Ergebnis an das zweite Element gesendet hat, kann es parallel ein weiteres Objekt verarbeiten, während das zweite Element das erste Ergebnis verarbeitet.
Die Parallelität, die durch ein Verwenden von Datenflusspipelines erreicht wird, wird als grobmaschige Parallelität bezeichnet, weil sie in der Regel aus weniger, dafür aber größeren Aufgaben besteht. Sie können auch eine feinmaschigere Parallelität von kleineren Aufgaben mit kurzer Ausführungszeit in einer Datenflusspipeline verwenden. In diesem Beispiel wird für das findReversedWords
-Element der Pipeline PLINQ verwendet, um mehrere Elemente in der Arbeitsliste parallel zu verarbeiten. Die Verwendung von feinmaschiger Parallelität in einer grobmaschigen Pipeline kann den Gesamtdurchsatz verbessern.
Sie können auch einen Quelldatenflussblock mit mehreren Zielblöcken verbinden, um ein Datenflussnetzwerk zu erstellen. Die überladene Version der LinkTo-Methode übernimmt ein Predicate<T>-Objekt, das definiert, ob der Zielblock jede Nachricht anhand dessen Wert akzeptiert. Die meisten Datenflussblocktypen, die als Quellen fungieren, senden Nachrichten an alle verbundenen Zielblöcke in der Reihenfolge, in der sie verbunden wurden, bis einer der Blöcke die jeweilige Nachricht akzeptiert hat. Durch Verwenden dieser Filtermechanismus können Sie Systeme von verbundenen Datenflussblöcken erstellen, mit denen bestimmte Daten durch einen Pfad und andere Daten durch einen anderen Pfad geleitet werden. Ein Beispiel, in dem Filterung zum Erstellen eines Datenflussnetzwerks verwendet wird, finden Sie unter Exemplarische Vorgehensweise: Using Dataflow in a Windows Forms Application (Exemplarische Vorgehensweise: Verwenden von Datenflüssen in einer Windows Forms-Anwendung).