雖然您可以使用 DataflowBlock.Receive、 DataflowBlock.ReceiveAsync和 DataflowBlock.TryReceive 方法來接收來源區塊的訊息,但您也可以連接消息區塊以形成 數據流管線。 數據流管線是一系列元件或 數據流區塊,每個元件都會執行特定工作,以促成較大的目標。 數據流管線中的每個數據流區塊都會在收到來自另一個數據流區塊的訊息時執行工作。 類比來說,就像汽車製造中的組裝線。 當每輛車通過組裝線時,一個月臺組裝框架,下一個月臺安裝發動機等等。 由於組裝線可同時組裝多輛車,因此可提供比一次組裝完整車輛更好的輸送量。
本文件示範一個數據流管線,此管線會從網站下載荷馬的《伊利亞德》一書,並搜尋文本以將每個單字與其字元反轉的單字匹配。 本文件中數據流管線的形成包含下列步驟:
建立參與管線的數據流區塊。
將每個數據流區塊連接到管線中的下一個區塊。 每個區塊都會將上一個區塊的輸出作為輸入接收。
針對每個數據流區塊,建立接續工作,將下一個區塊設定為上一個區塊完成之後的狀態。
將數據張貼至管線的前端。
將管線的前端標示為已完成。
等候管線完成所有工作。
先決條件
開始本逐步解說之前,請先閱讀 數據流 。
建立主控台應用程式
在 Visual Studio 中,建立 Visual C# 或 Visual Basic 控制台應用程式專案。 安裝 System.Threading.Tasks.Dataflow NuGet 套件。
備註
TPL 資料流連結庫 (System.Threading.Tasks.Dataflow 命名空間) 不會與 .NET 一起散發。 若要在 Visual Studio 中安裝 System.Threading.Tasks.Dataflow 命名空間,請開啟您的專案,從 [專案] 功能表選擇 [管理 NuGet 套件],然後在線搜尋 System.Threading.Tasks.Dataflow
套件。 或者,若要使用 .NET Core CLI安裝它,請執行 dotnet add package System.Threading.Tasks.Dataflow
。
將下列程式代碼新增至您的專案,以建立基本應用程式。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;
// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class Program
{
static void Main()
{
}
}
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow
' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web
' and finds all reversed words that appear in that book.
Module DataflowReversedWords
Sub Main()
End Sub
End Module
建立數據流區塊
將下列程式代碼新增至 Main
方法,以建立參與管線的數據流區塊。 下表摘要說明管線每個成員的角色。
//
// Create the members of the pipeline.
//
// Downloads the requested resource as a string.
var downloadString = new TransformBlock<string, string>(async uri =>
{
Console.WriteLine($"Downloading '{uri}'...");
return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
});
// Separates the specified text into an array of words.
var createWordList = new TransformBlock<string, string[]>(text =>
{
Console.WriteLine("Creating word list...");
// Remove common punctuation by replacing all non-letter characters
// with a space character.
char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
text = new string(tokens);
// Separate the text into an array of words.
return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
});
// Removes short words and duplicates.
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
Console.WriteLine("Filtering word list...");
return words
.Where(word => word.Length > 3)
.Distinct()
.ToArray();
});
// Finds all words in the specified collection whose reverse also
// exists in the collection.
var findReversedWords = new TransformManyBlock<string[], string>(words =>
{
Console.WriteLine("Finding reversed words...");
var wordsSet = new HashSet<string>(words);
return from word in words.AsParallel()
let reverse = new string(word.Reverse().ToArray())
where word != reverse && wordsSet.Contains(reverse)
select word;
});
// Prints the provided reversed words to the console.
var printReversedWords = new ActionBlock<string>(reversedWord =>
{
Console.WriteLine($"Found reversed words {reversedWord}/{new string(reversedWord.Reverse().ToArray())}");
});
'
' Create the members of the pipeline.
'
' Downloads the requested resource as a string.
Dim downloadString = New TransformBlock(Of String, String)(
Async Function(uri)
Console.WriteLine("Downloading '{0}'...", uri)
Return Await New HttpClient().GetStringAsync(uri)
End Function)
' Separates the specified text into an array of words.
Dim createWordList = New TransformBlock(Of String, String())(
Function(text)
Console.WriteLine("Creating word list...")
' Remove common punctuation by replacing all non-letter characters
' with a space character.
Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
text = New String(tokens)
' Separate the text into an array of words.
Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
End Function)
' Removes short words and duplicates.
Dim filterWordList = New TransformBlock(Of String(), String())(
Function(words)
Console.WriteLine("Filtering word list...")
Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
End Function)
' Finds all words in the specified collection whose reverse also
' exists in the collection.
Dim findReversedWords = New TransformManyBlock(Of String(), String)(
Function(words)
Dim wordsSet = New HashSet(Of String)(words)
Return From word In words.AsParallel()
Let reverse = New String(word.Reverse().ToArray())
Where word <> reverse AndAlso wordsSet.Contains(reverse)
Select word
End Function)
' Prints the provided reversed words to the console.
Dim printReversedWords = New ActionBlock(Of String)(
Sub(reversedWord)
Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
End Sub)
會員 | 類型 | 說明 |
---|---|---|
downloadString |
TransformBlock<TInput,TOutput> | 從 Web 下載書籍文字。 |
createWordList |
TransformBlock<TInput,TOutput> | 將書籍文字分隔成由多個單字組成的數組。 |
filterWordList |
TransformBlock<TInput,TOutput> | 從詞彙陣列中移除短字和重複的字。 |
findReversedWords |
TransformManyBlock<TInput,TOutput> | 尋找篩選後的文字陣列集合中的所有單字,其反向也出現在單字陣列中。 |
printReversedWords |
ActionBlock<TInput> | 將單字和對應的反向單字顯示至主控台。 |
雖然您可以將此範例中的數據流管線中的多個步驟合併成一個步驟,但此範例說明撰寫多個獨立數據流工作以執行較大工作的概念。 此範例會使用 TransformBlock<TInput,TOutput> 來讓管線的每個成員在其輸入數據上執行作業,並將結果傳送至管線中的下一個步驟。
findReversedWords
管線的成員是 TransformManyBlock<TInput,TOutput> 對象,因為它會為每個輸入產生多個獨立輸出。 管線 printReversedWords
的尾端是 ActionBlock<TInput> 對象,因為它在其輸入上執行動作,而且不會產生結果。
建立管線
新增下列程式代碼,將每個區塊連接到管線中的下一個區塊。
當您呼叫 LinkTo 方法將源資料流區塊連接到目標資料流區塊時,源資料流區塊會將資料傳播至目標區塊,當資料變得可用時。 如果您也提供 DataflowLinkOptions 並將 PropagateCompletion 設定為 true,則管線中某個區塊的成功或失敗完成會導致管線中下一個區塊的完成。
//
// Connect the dataflow blocks to form a pipeline.
//
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
downloadString.LinkTo(createWordList, linkOptions);
createWordList.LinkTo(filterWordList, linkOptions);
filterWordList.LinkTo(findReversedWords, linkOptions);
findReversedWords.LinkTo(printReversedWords, linkOptions);
'
' Connect the dataflow blocks to form a pipeline.
'
Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}
downloadString.LinkTo(createWordList, linkOptions)
createWordList.LinkTo(filterWordList, linkOptions)
filterWordList.LinkTo(findReversedWords, linkOptions)
findReversedWords.LinkTo(printReversedWords, linkOptions)
將資料發送至管線
新增下列程序代碼,將 Homer 的 Iliad 書籍 URL 張貼至數據流管線的前端。
// Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");
' Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")
這個範例會使用 DataflowBlock.Post 同步方式將數據傳送至管線的前端。 DataflowBlock.SendAsync當您必須以異步方式將數據傳送至數據流節點時,請使用 方法。
正在完成管線活動
新增下列程序代碼,將管線的前端標示為已完成。 管線的前端在處理完所有緩衝訊息後,會宣布其完成。
// Mark the head of the pipeline as complete.
downloadString.Complete();
' Mark the head of the pipeline as complete.
downloadString.Complete()
此範例會透過要處理的數據流管線傳送一個URL。 如果您透過管線傳送多個輸入,請在提交所有輸入之後呼叫 IDataflowBlock.Complete 方法。 如果您的應用程式在有關資料不再可用時沒有明確的定義點,或者應用程式不需要等候管線處理完成,則可以省略此步驟。
等候管線完成
新增下列程式代碼以等候管線完成。 當管線結尾完成時,整體作業就會完成。
// Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait();
' Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait()
您可以等候任何線程或多個線程同時完成數據流。
完整範例
下列範例示範本逐步解說的完整程序代碼。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;
// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class DataflowReversedWords
{
static void Main()
{
//
// Create the members of the pipeline.
//
// Downloads the requested resource as a string.
var downloadString = new TransformBlock<string, string>(async uri =>
{
Console.WriteLine($"Downloading '{uri}'...");
return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
});
// Separates the specified text into an array of words.
var createWordList = new TransformBlock<string, string[]>(text =>
{
Console.WriteLine("Creating word list...");
// Remove common punctuation by replacing all non-letter characters
// with a space character.
char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
text = new string(tokens);
// Separate the text into an array of words.
return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
});
// Removes short words and duplicates.
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
Console.WriteLine("Filtering word list...");
return words
.Where(word => word.Length > 3)
.Distinct()
.ToArray();
});
// Finds all words in the specified collection whose reverse also
// exists in the collection.
var findReversedWords = new TransformManyBlock<string[], string>(words =>
{
Console.WriteLine("Finding reversed words...");
var wordsSet = new HashSet<string>(words);
return from word in words.AsParallel()
let reverse = new string(word.Reverse().ToArray())
where word != reverse && wordsSet.Contains(reverse)
select word;
});
// Prints the provided reversed words to the console.
var printReversedWords = new ActionBlock<string>(reversedWord =>
{
Console.WriteLine($"Found reversed words {reversedWord}/{new string(reversedWord.Reverse().ToArray())}");
});
//
// Connect the dataflow blocks to form a pipeline.
//
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
downloadString.LinkTo(createWordList, linkOptions);
createWordList.LinkTo(filterWordList, linkOptions);
filterWordList.LinkTo(findReversedWords, linkOptions);
findReversedWords.LinkTo(printReversedWords, linkOptions);
// Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");
// Mark the head of the pipeline as complete.
downloadString.Complete();
// Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait();
}
}
/* Sample output:
Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
Creating word list...
Filtering word list...
Finding reversed words...
Found reversed words doom/mood
Found reversed words draw/ward
Found reversed words aera/area
Found reversed words seat/taes
Found reversed words live/evil
Found reversed words port/trop
Found reversed words sleek/keels
Found reversed words area/aera
Found reversed words tops/spot
Found reversed words evil/live
Found reversed words mood/doom
Found reversed words speed/deeps
Found reversed words moor/room
Found reversed words trop/port
Found reversed words spot/tops
Found reversed words spots/stops
Found reversed words stops/spots
Found reversed words reed/deer
Found reversed words keels/sleek
Found reversed words deeps/speed
Found reversed words deer/reed
Found reversed words taes/seat
Found reversed words room/moor
Found reversed words ward/draw
*/
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow
' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web
' and finds all reversed words that appear in that book.
Module DataflowReversedWords
Sub Main()
'
' Create the members of the pipeline.
'
' Downloads the requested resource as a string.
Dim downloadString = New TransformBlock(Of String, String)(
Async Function(uri)
Console.WriteLine("Downloading '{0}'...", uri)
Return Await New HttpClient().GetStringAsync(uri)
End Function)
' Separates the specified text into an array of words.
Dim createWordList = New TransformBlock(Of String, String())(
Function(text)
Console.WriteLine("Creating word list...")
' Remove common punctuation by replacing all non-letter characters
' with a space character.
Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
text = New String(tokens)
' Separate the text into an array of words.
Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
End Function)
' Removes short words and duplicates.
Dim filterWordList = New TransformBlock(Of String(), String())(
Function(words)
Console.WriteLine("Filtering word list...")
Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
End Function)
' Finds all words in the specified collection whose reverse also
' exists in the collection.
Dim findReversedWords = New TransformManyBlock(Of String(), String)(
Function(words)
Dim wordsSet = New HashSet(Of String)(words)
Return From word In words.AsParallel()
Let reverse = New String(word.Reverse().ToArray())
Where word <> reverse AndAlso wordsSet.Contains(reverse)
Select word
End Function)
' Prints the provided reversed words to the console.
Dim printReversedWords = New ActionBlock(Of String)(
Sub(reversedWord)
Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
End Sub)
'
' Connect the dataflow blocks to form a pipeline.
'
Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}
downloadString.LinkTo(createWordList, linkOptions)
createWordList.LinkTo(filterWordList, linkOptions)
filterWordList.LinkTo(findReversedWords, linkOptions)
findReversedWords.LinkTo(printReversedWords, linkOptions)
' Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")
' Mark the head of the pipeline as complete.
downloadString.Complete()
' Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait()
End Sub
End Module
' Sample output:
'Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
'Creating word list...
'Filtering word list...
'Finding reversed words...
'Found reversed words aera/area
'Found reversed words doom/mood
'Found reversed words draw/ward
'Found reversed words live/evil
'Found reversed words seat/taes
'Found reversed words area/aera
'Found reversed words port/trop
'Found reversed words sleek/keels
'Found reversed words tops/spot
'Found reversed words evil/live
'Found reversed words speed/deeps
'Found reversed words mood/doom
'Found reversed words moor/room
'Found reversed words spot/tops
'Found reversed words spots/stops
'Found reversed words trop/port
'Found reversed words stops/spots
'Found reversed words reed/deer
'Found reversed words deeps/speed
'Found reversed words deer/reed
'Found reversed words taes/seat
'Found reversed words keels/sleek
'Found reversed words room/moor
'Found reversed words ward/draw
後續步驟
此範例會傳送一個 URL 以透過資料流管線進行處理。 如果您透過管線傳送多個輸入值,您可以將平行處理原則的形式引入您的應用程式,類似於零件在汽車工廠中移動的方式。 當管線的第一個成員將其結果傳送至第二個成員時,當第二個成員處理第一個結果時,它可以平行處理另一個專案。
使用數據流管線達成的 平行處理原則稱為粗略平行處理原則 ,因為它通常包含較少的較大型工作。 您也可以在數據流管道中使用更精細的並行性,以處理較小而短暫的任務。 在此範例中, findReversedWords
管線的成員會使用 PLINQ 平行處理工作清單中的多個專案。 粗細管線中使用精細平行處理原則可以改善整體輸送量。
您也可以將來源數據流區塊連線到多個目標區塊,以建立 數據流網路。 LinkTo 方法的重載版本會使用 Predicate<T> 物件來定義目標區塊是否根據其值接受每個訊息。 大部分作為來源的數據流區塊類型都會以連線的順序,提供訊息給所有已連線的目標區塊,直到其中一個區塊接受該訊息為止。 藉由使用此篩選機制,您可以建立由資料流區塊組成的系統,以將特定資料引導至一個路徑,而其他資料則透過另一個路徑。 如需使用篩選來建立數據流網路的範例,請參閱 逐步解說:在 Windows Forms 應用程式中使用數據流。