Anmerkung
Der Zugriff auf diese Seite erfordert eine Genehmigung. Du kannst versuchen, dich anzumelden oder die Verzeichnisse zu wechseln.
Der Zugriff auf diese Seite erfordert eine Genehmigung. Du kannst versuchen , die Verzeichnisse zu wechseln.
Sie können zwar die DataflowBlock.Receive, DataflowBlock.ReceiveAsync und DataflowBlock.TryReceive Methoden verwenden, um Nachrichten von Quellblöcken zu empfangen, aber Sie können auch Nachrichtenblöcke verbinden, um eine Datenflusspipeline zu bilden. Eine Datenflusspipeline ist eine Reihe von Komponenten oder Datenflussblöcken, von denen jeder eine bestimmte Aufgabe ausführt, die zu einem größeren Ziel beiträgt. Jeder Datenflussblock in einer Datenflusspipeline führt Arbeit durch, wenn er eine Nachricht von einem anderen Datenflussblock empfängt. Eine Analogie dazu ist eine Montagelinie für die Automobilherstellung. Wenn jedes Fahrzeug durch die Montagelinie geht, montiert eine Station den Rahmen, das nächste installiert den Motor usw. Da eine Montagelinie mehrere Fahrzeuge gleichzeitig montiert werden kann, bietet sie einen besseren Durchsatz als die Montage kompletter Fahrzeuge gleichzeitig.
Dieses Dokument veranschaulicht eine Datenflusspipeline, die das Buch The Iliad of Homer von einer Website herunterlädt und den Text durchsucht, um einzelne Wörter mit Wörtern abzugleichen, die die Zeichen des ersten Worts umkehren. Die Bildung der Datenflusspipeline in diesem Dokument besteht aus den folgenden Schritten:
Erstellen Sie die Datenflussblöcke, die an der Pipeline teilnehmen.
Verbinden Sie jeden Datenflussblock mit dem nächsten Block in der Pipeline. Jeder Block empfängt als Eingabe die Ausgabe des vorherigen Blocks in der Pipeline.
Erstellen Sie für jeden Datenflussblock eine Fortsetzungsaufgabe, die den nächsten Block nach Abschluss des vorherigen Blocks auf den abgeschlossenen Zustand festlegt.
Senden Sie Daten an den Anfang der Pipeline.
Markieren Sie den Leiter der Pipeline als abgeschlossen.
Warten Sie, bis die Pipeline alle Arbeiten abgeschlossen hat.
Voraussetzungen
Lesen Sie Dataflow, bevor Sie diese Anleitung starten.
Erstellen einer Konsolenanwendung
Erstellen Sie in Visual Studio ein Visual C#- oder Visual Basic-Konsolenanwendungsprojekt. Installieren Sie das System.Threading.Tasks.Dataflow NuGet-Paket.
Hinweis
Die TPL-Datenflussbibliothek (der System.Threading.Tasks.Dataflow Namespace) ist in .NET 6 und höheren Versionen enthalten. Für .NET Framework- und .NET Standard-Projekte müssen Sie das 📦 NuGet-Paket "System.Threading.Tasks.Dataflow" installieren.
Fügen Sie Ihrem Projekt den folgenden Code hinzu, um die einfache Anwendung zu erstellen.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;
// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class Program
{
static void Main()
{
}
}
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow
' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web
' and finds all reversed words that appear in that book.
Module DataflowReversedWords
Sub Main()
End Sub
End Module
Erstellen der Datenflussblöcke
Fügen Sie der Main Methode den folgenden Code hinzu, um die Datenflussblöcke zu erstellen, die an der Pipeline teilnehmen. In der folgenden Tabelle wird die Rolle der einzelnen Mitglieder der Pipeline zusammengefasst.
//
// Create the members of the pipeline.
//
// Downloads the requested resource as a string.
var downloadString = new TransformBlock<string, string>(async uri =>
{
Console.WriteLine($"Downloading '{uri}'...");
return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
});
// Separates the specified text into an array of words.
var createWordList = new TransformBlock<string, string[]>(text =>
{
Console.WriteLine("Creating word list...");
// Remove common punctuation by replacing all non-letter characters
// with a space character.
char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
text = new string(tokens);
// Separate the text into an array of words.
return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
});
// Removes short words and duplicates.
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
Console.WriteLine("Filtering word list...");
return words
.Where(word => word.Length > 3)
.Distinct()
.ToArray();
});
// Finds all words in the specified collection whose reverse also
// exists in the collection.
var findReversedWords = new TransformManyBlock<string[], string>(words =>
{
Console.WriteLine("Finding reversed words...");
var wordsSet = new HashSet<string>(words);
return from word in words.AsParallel()
let reverse = new string(word.Reverse().ToArray())
where word != reverse && wordsSet.Contains(reverse)
select word;
});
// Prints the provided reversed words to the console.
var printReversedWords = new ActionBlock<string>(reversedWord =>
{
Console.WriteLine($"Found reversed words {reversedWord}/{new string(reversedWord.Reverse().ToArray())}");
});
'
' Create the members of the pipeline.
'
' Downloads the requested resource as a string.
Dim downloadString = New TransformBlock(Of String, String)(
Async Function(uri)
Console.WriteLine("Downloading '{0}'...", uri)
Return Await New HttpClient().GetStringAsync(uri)
End Function)
' Separates the specified text into an array of words.
Dim createWordList = New TransformBlock(Of String, String())(
Function(text)
Console.WriteLine("Creating word list...")
' Remove common punctuation by replacing all non-letter characters
' with a space character.
Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
text = New String(tokens)
' Separate the text into an array of words.
Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
End Function)
' Removes short words and duplicates.
Dim filterWordList = New TransformBlock(Of String(), String())(
Function(words)
Console.WriteLine("Filtering word list...")
Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
End Function)
' Finds all words in the specified collection whose reverse also
' exists in the collection.
Dim findReversedWords = New TransformManyBlock(Of String(), String)(
Function(words)
Dim wordsSet = New HashSet(Of String)(words)
Return From word In words.AsParallel()
Let reverse = New String(word.Reverse().ToArray())
Where word <> reverse AndAlso wordsSet.Contains(reverse)
Select word
End Function)
' Prints the provided reversed words to the console.
Dim printReversedWords = New ActionBlock(Of String)(
Sub(reversedWord)
Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
End Sub)
| Mitglied | Typ | Description |
|---|---|---|
downloadString |
TransformBlock<TInput,TOutput> | Lädt den Buchtext aus dem Web herunter. |
createWordList |
TransformBlock<TInput,TOutput> | Trennt den Buchtext in ein Array von Wörtern. |
filterWordList |
TransformBlock<TInput,TOutput> | Entfernt kurze Wörter und Duplikate aus dem Wort-Array. |
findReversedWords |
TransformManyBlock<TInput,TOutput> | Sucht alle Wörter in der gefilterten Wort-Array-Sammlung, deren Umkehrung auch im Wort-Array vorkommt. |
printReversedWords |
ActionBlock<TInput> | Zeigt Wörter und die entsprechenden umgekehrten Wörter auf der Konsole an. |
Obwohl Sie in diesem Beispiel mehrere Schritte in der Datenflusspipeline in einem Schritt kombinieren können, veranschaulicht das Beispiel das Konzept der Erstellung mehrerer unabhängiger Datenflussaufgaben, um eine größere Aufgabe auszuführen. Das Beispiel verwendet TransformBlock<TInput,TOutput> , um jedem Mitglied der Pipeline zu ermöglichen, einen Vorgang für die Eingabedaten auszuführen und die Ergebnisse an den nächsten Schritt in der Pipeline zu senden. Das findReversedWords Element der Pipeline ist ein TransformManyBlock<TInput,TOutput> Objekt, da es für jede Eingabe mehrere unabhängige Ausgaben erzeugt. Das Endstück der Pipeline, printReversedWords, ist ein ActionBlock<TInput> Objekt, da es eine Aktion auf seine Eingabe ausführt, ohne ein Ergebnis zu erzeugen.
Formieren der Pipeline
Fügen Sie den folgenden Code hinzu, um jeden Block mit dem nächsten Block in der Pipeline zu verbinden.
Wenn Sie die LinkTo Methode aufrufen, um einen Quelldatenflussblock mit einem Zieldatenflussblock zu verbinden, verteilt der Quelldatenflussblock Daten an den Zielblock, sobald Daten verfügbar werden. Wenn Sie auch den Wert "true" angeben DataflowLinkOptionsPropagateCompletion , führt der erfolgreiche oder erfolglose Abschluss eines Blocks in der Pipeline zum Abschluss des nächsten Blocks in der Pipeline.
//
// Connect the dataflow blocks to form a pipeline.
//
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
downloadString.LinkTo(createWordList, linkOptions);
createWordList.LinkTo(filterWordList, linkOptions);
filterWordList.LinkTo(findReversedWords, linkOptions);
findReversedWords.LinkTo(printReversedWords, linkOptions);
'
' Connect the dataflow blocks to form a pipeline.
'
Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}
downloadString.LinkTo(createWordList, linkOptions)
createWordList.LinkTo(filterWordList, linkOptions)
filterWordList.LinkTo(findReversedWords, linkOptions)
findReversedWords.LinkTo(printReversedWords, linkOptions)
Senden von Daten an die Pipeline
Fügen Sie den folgenden Code hinzu, um die URL des Buchs The Iliad of Homer an den Kopf der Datenflusspipeline zu posten.
// Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");
' Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")
In diesem Beispiel werden DataflowBlock.Post Daten synchron an den Leiter der Pipeline gesendet. Verwenden Sie die DataflowBlock.SendAsync Methode, wenn Sie Daten asynchron an einen Datenflussknoten senden müssen.
Abschluss der Pipelineaktivität
Fügen Sie den folgenden Code hinzu, um den Kopf der Pipeline als abgeschlossen zu markieren. Der Kopf der Pipeline signalisiert seinen Abschluss, nachdem er alle gepufferten Nachrichten verarbeitet hat.
// Mark the head of the pipeline as complete.
downloadString.Complete();
' Mark the head of the pipeline as complete.
downloadString.Complete()
In diesem Beispiel wird eine URL über die Datenflusspipeline gesendet, die verarbeitet werden soll. Wenn Sie mehrere Eingaben über eine Pipeline senden, rufen Sie die IDataflowBlock.Complete Methode auf, nachdem Sie alle Eingaben übermittelt haben. Sie können diesen Schritt auslassen, wenn Ihre Anwendung keinen gut definierten Punkt aufweist, an dem Daten nicht mehr verfügbar sind oder die Anwendung nicht warten muss, bis die Pipeline abgeschlossen ist.
Warten auf die Fertigstellung der Pipeline
Fügen Sie den folgenden Code hinzu, um auf das Ende der Pipeline zu warten. Der gesamte Vorgang ist abgeschlossen, wenn das Ende der Pipeline abgeschlossen ist.
// Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait();
' Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait()
Sie können auf den Abschluss des Datenflusses von einem beliebigen Thread oder von mehreren Threads gleichzeitig warten.
Vollständiges Beispiel
Das folgende Beispiel zeigt den vollständigen Code für diese exemplarische Vorgehensweise.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;
// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class DataflowReversedWords
{
static void Main()
{
//
// Create the members of the pipeline.
//
// Downloads the requested resource as a string.
var downloadString = new TransformBlock<string, string>(async uri =>
{
Console.WriteLine($"Downloading '{uri}'...");
return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
});
// Separates the specified text into an array of words.
var createWordList = new TransformBlock<string, string[]>(text =>
{
Console.WriteLine("Creating word list...");
// Remove common punctuation by replacing all non-letter characters
// with a space character.
char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
text = new string(tokens);
// Separate the text into an array of words.
return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
});
// Removes short words and duplicates.
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
Console.WriteLine("Filtering word list...");
return words
.Where(word => word.Length > 3)
.Distinct()
.ToArray();
});
// Finds all words in the specified collection whose reverse also
// exists in the collection.
var findReversedWords = new TransformManyBlock<string[], string>(words =>
{
Console.WriteLine("Finding reversed words...");
var wordsSet = new HashSet<string>(words);
return from word in words.AsParallel()
let reverse = new string(word.Reverse().ToArray())
where word != reverse && wordsSet.Contains(reverse)
select word;
});
// Prints the provided reversed words to the console.
var printReversedWords = new ActionBlock<string>(reversedWord =>
{
Console.WriteLine($"Found reversed words {reversedWord}/{new string(reversedWord.Reverse().ToArray())}");
});
//
// Connect the dataflow blocks to form a pipeline.
//
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
downloadString.LinkTo(createWordList, linkOptions);
createWordList.LinkTo(filterWordList, linkOptions);
filterWordList.LinkTo(findReversedWords, linkOptions);
findReversedWords.LinkTo(printReversedWords, linkOptions);
// Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");
// Mark the head of the pipeline as complete.
downloadString.Complete();
// Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait();
}
}
/* Sample output:
Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
Creating word list...
Filtering word list...
Finding reversed words...
Found reversed words doom/mood
Found reversed words draw/ward
Found reversed words aera/area
Found reversed words seat/taes
Found reversed words live/evil
Found reversed words port/trop
Found reversed words sleek/keels
Found reversed words area/aera
Found reversed words tops/spot
Found reversed words evil/live
Found reversed words mood/doom
Found reversed words speed/deeps
Found reversed words moor/room
Found reversed words trop/port
Found reversed words spot/tops
Found reversed words spots/stops
Found reversed words stops/spots
Found reversed words reed/deer
Found reversed words keels/sleek
Found reversed words deeps/speed
Found reversed words deer/reed
Found reversed words taes/seat
Found reversed words room/moor
Found reversed words ward/draw
*/
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow
' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web
' and finds all reversed words that appear in that book.
Module DataflowReversedWords
Sub Main()
'
' Create the members of the pipeline.
'
' Downloads the requested resource as a string.
Dim downloadString = New TransformBlock(Of String, String)(
Async Function(uri)
Console.WriteLine("Downloading '{0}'...", uri)
Return Await New HttpClient().GetStringAsync(uri)
End Function)
' Separates the specified text into an array of words.
Dim createWordList = New TransformBlock(Of String, String())(
Function(text)
Console.WriteLine("Creating word list...")
' Remove common punctuation by replacing all non-letter characters
' with a space character.
Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
text = New String(tokens)
' Separate the text into an array of words.
Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
End Function)
' Removes short words and duplicates.
Dim filterWordList = New TransformBlock(Of String(), String())(
Function(words)
Console.WriteLine("Filtering word list...")
Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
End Function)
' Finds all words in the specified collection whose reverse also
' exists in the collection.
Dim findReversedWords = New TransformManyBlock(Of String(), String)(
Function(words)
Dim wordsSet = New HashSet(Of String)(words)
Return From word In words.AsParallel()
Let reverse = New String(word.Reverse().ToArray())
Where word <> reverse AndAlso wordsSet.Contains(reverse)
Select word
End Function)
' Prints the provided reversed words to the console.
Dim printReversedWords = New ActionBlock(Of String)(
Sub(reversedWord)
Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
End Sub)
'
' Connect the dataflow blocks to form a pipeline.
'
Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}
downloadString.LinkTo(createWordList, linkOptions)
createWordList.LinkTo(filterWordList, linkOptions)
filterWordList.LinkTo(findReversedWords, linkOptions)
findReversedWords.LinkTo(printReversedWords, linkOptions)
' Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")
' Mark the head of the pipeline as complete.
downloadString.Complete()
' Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait()
End Sub
End Module
' Sample output:
'Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
'Creating word list...
'Filtering word list...
'Finding reversed words...
'Found reversed words aera/area
'Found reversed words doom/mood
'Found reversed words draw/ward
'Found reversed words live/evil
'Found reversed words seat/taes
'Found reversed words area/aera
'Found reversed words port/trop
'Found reversed words sleek/keels
'Found reversed words tops/spot
'Found reversed words evil/live
'Found reversed words speed/deeps
'Found reversed words mood/doom
'Found reversed words moor/room
'Found reversed words spot/tops
'Found reversed words spots/stops
'Found reversed words trop/port
'Found reversed words stops/spots
'Found reversed words reed/deer
'Found reversed words deeps/speed
'Found reversed words deer/reed
'Found reversed words taes/seat
'Found reversed words keels/sleek
'Found reversed words room/moor
'Found reversed words ward/draw
Nächste Schritte
In diesem Beispiel wird eine URL gesendet, die über die Datenflusspipeline verarbeitet werden soll. Wenn Sie mehr als einen Eingabewert über eine Pipeline senden, können Sie eine Form von Parallelität in Ihre Anwendung einführen, die ähnelt, wie Teile sich durch eine Automobilfabrik bewegen können. Wenn das erste Mitglied der Pipeline sein Ergebnis an das zweite Element sendet, kann es ein anderes Element parallel verarbeiten, während das zweite Element das erste Ergebnis verarbeitet.
Die Parallelität, die durch die Verwendung von Datenflusspipelines erreicht wird, wird als grobkörniger Parallelismus bezeichnet, da sie in der Regel aus weniger, größeren Vorgängen besteht. Sie können auch einen feineren Parallelismus kleinerer, kurz ausgeführter Aufgaben in einer Datenflusspipeline verwenden. In diesem Beispiel verwendet das findReversedWords Mitglied der Pipeline PLINQ , um mehrere Elemente in der Arbeitsliste parallel zu verarbeiten. Die Verwendung feinkörniger Parallelität in einer grobkörnigen Pipeline kann den Gesamtdurchsatz verbessern.
Sie können auch einen Quelldatenflussblock mit mehreren Zielblöcken verbinden, um ein Datenflussnetzwerk zu erstellen. Die überladene Version der LinkTo Methode verwendet ein Predicate<T> Objekt, das definiert, ob der Zielblock jede Nachricht basierend auf seinem Wert akzeptiert. Die meisten Datenflussblocktypen, die als Quellen fungieren, bieten Nachrichten an alle verbundenen Zielblöcke in der Reihenfolge, in der sie verbunden waren, bis eine der Blöcke diese Nachricht akzeptiert. Mithilfe dieses Filtermechanismus können Sie Systeme mit verbundenen Datenflussblöcken erstellen, die bestimmte Daten über einen Pfad und andere Daten über einen anderen Pfad leiten. Ein Beispiel, das filtert, um ein Datenflussnetzwerk zu erstellen, finden Sie unter Walkthrough: Using Dataflow in a Windows Forms Application.