Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Meskipun Anda dapat menggunakan metode DataflowBlock.Receive, DataflowBlock.ReceiveAsync, dan DataflowBlock.TryReceive untuk menerima pesan dari blok sumber, Anda juga dapat menghubungkan blok pesan untuk membentuk alur aliran data . Alur aliran data adalah serangkaian komponen, atau blok aliran data, yang masing-masing melakukan tugas tertentu yang berkontribusi pada tujuan yang lebih besar. Setiap blok aliran data dalam alur aliran data melakukan pekerjaan saat menerima pesan dari blok aliran data lain. Analogi untuk ini adalah jalur perakitan untuk manufaktur mobil. Ketika setiap kendaraan melewati jalur perakitan, satu stasiun merakitan bingkai, yang berikutnya memasang mesin, dan sebagainya. Karena jalur perakitan memungkinkan beberapa kendaraan untuk dirakit pada saat yang sama, jalur ini memberikan throughput yang lebih baik daripada merakit kendaraan lengkap satu per satu.
Dokumen ini menunjukkan alur aliran data yang mengunduh buku The Iliad of Homer dari situs web dan mencari teks untuk mencocokkan kata individual dengan kata-kata yang membalikkan karakter kata pertama. Pembentukan alur aliran data dalam dokumen ini terdiri dari langkah-langkah berikut:
Buat blok aliran data yang berpartisipasi dalam pipa data.
Sambungkan setiap blok aliran data ke blok berikutnya dalam alur. Setiap blok menerima output blok sebelumnya sebagai input dalam alur kerja.
Untuk setiap blok aliran data, buat tugas kelanjutan yang mengatur blok berikutnya ke status selesai setelah blok sebelumnya selesai.
Mengirim data ke ujung depan alur.
Tandai ujung alur kerja sebagai telah selesai.
Tunggu hingga proses selesai semua pekerjaan.
Prasyarat
Baca Aliran Data sebelum Anda memulai panduan ini.
Membuat Aplikasi Konsol
Di Visual Studio, buat proyek Aplikasi Visual C# atau Visual Basic Console. Instal paket System.Threading.Tasks.Dataflow NuGet.
Nota
Pustaka Aliran Data TPL (namespace System.Threading.Tasks.Dataflow) tidak didistribusikan dengan .NET. Untuk menginstal namespace System.Threading.Tasks.Dataflow di Visual Studio, buka proyek Anda, pilih Kelola Paket NuGet dari menu Project, dan cari paket System.Threading.Tasks.Dataflow secara online. Atau, untuk menginstalnya menggunakan .NET Core CLI , jalankan dotnet add package System.Threading.Tasks.Dataflow.
Tambahkan kode berikut ke proyek Anda untuk membuat aplikasi dasar.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;
// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class Program
{
static void Main()
{
}
}
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow
' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web
' and finds all reversed words that appear in that book.
Module DataflowReversedWords
Sub Main()
End Sub
End Module
Membuat Blok Aliran Data
Tambahkan kode berikut ke metode Main untuk membuat blok aliran data yang berpartisipasi dalam alur. Tabel berikut ini meringkas peran setiap anggota alur.
//
// Create the members of the pipeline.
//
// Downloads the requested resource as a string.
var downloadString = new TransformBlock<string, string>(async uri =>
{
Console.WriteLine($"Downloading '{uri}'...");
return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
});
// Separates the specified text into an array of words.
var createWordList = new TransformBlock<string, string[]>(text =>
{
Console.WriteLine("Creating word list...");
// Remove common punctuation by replacing all non-letter characters
// with a space character.
char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
text = new string(tokens);
// Separate the text into an array of words.
return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
});
// Removes short words and duplicates.
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
Console.WriteLine("Filtering word list...");
return words
.Where(word => word.Length > 3)
.Distinct()
.ToArray();
});
// Finds all words in the specified collection whose reverse also
// exists in the collection.
var findReversedWords = new TransformManyBlock<string[], string>(words =>
{
Console.WriteLine("Finding reversed words...");
var wordsSet = new HashSet<string>(words);
return from word in words.AsParallel()
let reverse = new string(word.Reverse().ToArray())
where word != reverse && wordsSet.Contains(reverse)
select word;
});
// Prints the provided reversed words to the console.
var printReversedWords = new ActionBlock<string>(reversedWord =>
{
Console.WriteLine($"Found reversed words {reversedWord}/{new string(reversedWord.Reverse().ToArray())}");
});
'
' Create the members of the pipeline.
'
' Downloads the requested resource as a string.
Dim downloadString = New TransformBlock(Of String, String)(
Async Function(uri)
Console.WriteLine("Downloading '{0}'...", uri)
Return Await New HttpClient().GetStringAsync(uri)
End Function)
' Separates the specified text into an array of words.
Dim createWordList = New TransformBlock(Of String, String())(
Function(text)
Console.WriteLine("Creating word list...")
' Remove common punctuation by replacing all non-letter characters
' with a space character.
Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
text = New String(tokens)
' Separate the text into an array of words.
Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
End Function)
' Removes short words and duplicates.
Dim filterWordList = New TransformBlock(Of String(), String())(
Function(words)
Console.WriteLine("Filtering word list...")
Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
End Function)
' Finds all words in the specified collection whose reverse also
' exists in the collection.
Dim findReversedWords = New TransformManyBlock(Of String(), String)(
Function(words)
Dim wordsSet = New HashSet(Of String)(words)
Return From word In words.AsParallel()
Let reverse = New String(word.Reverse().ToArray())
Where word <> reverse AndAlso wordsSet.Contains(reverse)
Select word
End Function)
' Prints the provided reversed words to the console.
Dim printReversedWords = New ActionBlock(Of String)(
Sub(reversedWord)
Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
End Sub)
| Anggota | Tipe | Deskripsi |
|---|---|---|
downloadString |
TransformBlock<TInput,TOutput> | Mengunduh teks buku dari Web. |
createWordList |
TransformBlock<TInput,TOutput> | Memisahkan teks buku menjadi sebuah array kata. |
filterWordList |
TransformBlock<TInput,TOutput> | Menghapus kata-kata pendek dan duplikat dari array kata. |
findReversedWords |
TransformManyBlock<TInput,TOutput> | Menemukan semua kata dalam kumpulan array kata yang difilter yang kebalikannya juga ada dalam array kata. |
printReversedWords |
ActionBlock<TInput> | Menampilkan kata-kata dan kata terbalik yang sesuai ke konsol. |
Meskipun Anda dapat menggabungkan beberapa langkah dalam alur aliran data dalam contoh ini menjadi satu langkah, contoh tersebut mengilustrasikan konsep pembuatan beberapa tugas aliran data independen untuk melakukan tugas yang lebih besar. Contoh ini menggunakan TransformBlock<TInput,TOutput> untuk memungkinkan setiap anggota alur melakukan operasi pada data inputnya dan mengirim hasilnya ke langkah berikutnya dalam alur. Anggota findReversedWords alur adalah objek TransformManyBlock<TInput,TOutput> karena menghasilkan beberapa output independen untuk setiap input. Ekor alur, printReversedWords, adalah objek ActionBlock<TInput> karena melakukan tindakan pada inputnya, dan tidak menghasilkan hasil.
Membentuk Alur
Tambahkan kode berikut untuk menyambungkan setiap blok ke blok berikutnya dalam alur.
Saat Anda memanggil metode LinkTo untuk menyambungkan blok aliran data sumber ke blok aliran data target, blok aliran data sumber menyebarkan data ke blok target saat data tersedia. Jika Anda juga menyediakan DataflowLinkOptions dengan PropagateCompletion diatur ke true, penyelesaian satu blok yang berhasil atau gagal dalam pipa data akan mengakibatkan penyelesaian blok berikutnya dalam pipa data.
//
// Connect the dataflow blocks to form a pipeline.
//
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
downloadString.LinkTo(createWordList, linkOptions);
createWordList.LinkTo(filterWordList, linkOptions);
filterWordList.LinkTo(findReversedWords, linkOptions);
findReversedWords.LinkTo(printReversedWords, linkOptions);
'
' Connect the dataflow blocks to form a pipeline.
'
Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}
downloadString.LinkTo(createWordList, linkOptions)
createWordList.LinkTo(filterWordList, linkOptions)
filterWordList.LinkTo(findReversedWords, linkOptions)
findReversedWords.LinkTo(printReversedWords, linkOptions)
Memposting Data ke Alur Kerja
Tambahkan kode berikut untuk memposting URL buku Iliad of Homer ke kepala alur aliran data.
// Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");
' Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")
Contoh ini menggunakan DataflowBlock.Post untuk mengirim data secara sinkron ke kepala alur. Gunakan metode DataflowBlock.SendAsync saat Anda harus mengirim data secara asinkron ke simpul aliran data.
Menyelesaikan Aktivitas Alur
Tambahkan kode berikut untuk menandai ujung alur sebagai telah selesai. Kepala saluran mengumumkan penyelesaiannya setelah memproses semua pesan yang disimpan di buffer.
// Mark the head of the pipeline as complete.
downloadString.Complete();
' Mark the head of the pipeline as complete.
downloadString.Complete()
Contoh ini mengirimkan satu URL melalui alur aliran data yang akan diproses. Jika Anda mengirim lebih dari satu input melalui alur, panggil metode IDataflowBlock.Complete setelah Anda mengirimkan semua input. Anda dapat menghilangkan langkah ini jika aplikasi Anda tidak memiliki titik yang ditentukan dengan baik di mana data tidak lagi tersedia atau aplikasi tidak perlu menunggu alur selesai.
Menunggu Alur Selesai
Tambahkan kode berikut untuk menunggu hingga pipeline selesai. Operasi keseluruhan selesai ketika ekor pipa selesai.
// Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait();
' Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait()
Anda dapat menunggu penyelesaian aliran data dari utas apa pun atau dari beberapa utas secara bersamaan.
Contoh Lengkap
Contoh berikut menunjukkan kode lengkap untuk panduan ini.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;
// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class DataflowReversedWords
{
static void Main()
{
//
// Create the members of the pipeline.
//
// Downloads the requested resource as a string.
var downloadString = new TransformBlock<string, string>(async uri =>
{
Console.WriteLine($"Downloading '{uri}'...");
return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
});
// Separates the specified text into an array of words.
var createWordList = new TransformBlock<string, string[]>(text =>
{
Console.WriteLine("Creating word list...");
// Remove common punctuation by replacing all non-letter characters
// with a space character.
char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
text = new string(tokens);
// Separate the text into an array of words.
return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
});
// Removes short words and duplicates.
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
Console.WriteLine("Filtering word list...");
return words
.Where(word => word.Length > 3)
.Distinct()
.ToArray();
});
// Finds all words in the specified collection whose reverse also
// exists in the collection.
var findReversedWords = new TransformManyBlock<string[], string>(words =>
{
Console.WriteLine("Finding reversed words...");
var wordsSet = new HashSet<string>(words);
return from word in words.AsParallel()
let reverse = new string(word.Reverse().ToArray())
where word != reverse && wordsSet.Contains(reverse)
select word;
});
// Prints the provided reversed words to the console.
var printReversedWords = new ActionBlock<string>(reversedWord =>
{
Console.WriteLine($"Found reversed words {reversedWord}/{new string(reversedWord.Reverse().ToArray())}");
});
//
// Connect the dataflow blocks to form a pipeline.
//
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
downloadString.LinkTo(createWordList, linkOptions);
createWordList.LinkTo(filterWordList, linkOptions);
filterWordList.LinkTo(findReversedWords, linkOptions);
findReversedWords.LinkTo(printReversedWords, linkOptions);
// Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");
// Mark the head of the pipeline as complete.
downloadString.Complete();
// Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait();
}
}
/* Sample output:
Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
Creating word list...
Filtering word list...
Finding reversed words...
Found reversed words doom/mood
Found reversed words draw/ward
Found reversed words aera/area
Found reversed words seat/taes
Found reversed words live/evil
Found reversed words port/trop
Found reversed words sleek/keels
Found reversed words area/aera
Found reversed words tops/spot
Found reversed words evil/live
Found reversed words mood/doom
Found reversed words speed/deeps
Found reversed words moor/room
Found reversed words trop/port
Found reversed words spot/tops
Found reversed words spots/stops
Found reversed words stops/spots
Found reversed words reed/deer
Found reversed words keels/sleek
Found reversed words deeps/speed
Found reversed words deer/reed
Found reversed words taes/seat
Found reversed words room/moor
Found reversed words ward/draw
*/
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow
' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web
' and finds all reversed words that appear in that book.
Module DataflowReversedWords
Sub Main()
'
' Create the members of the pipeline.
'
' Downloads the requested resource as a string.
Dim downloadString = New TransformBlock(Of String, String)(
Async Function(uri)
Console.WriteLine("Downloading '{0}'...", uri)
Return Await New HttpClient().GetStringAsync(uri)
End Function)
' Separates the specified text into an array of words.
Dim createWordList = New TransformBlock(Of String, String())(
Function(text)
Console.WriteLine("Creating word list...")
' Remove common punctuation by replacing all non-letter characters
' with a space character.
Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
text = New String(tokens)
' Separate the text into an array of words.
Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
End Function)
' Removes short words and duplicates.
Dim filterWordList = New TransformBlock(Of String(), String())(
Function(words)
Console.WriteLine("Filtering word list...")
Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
End Function)
' Finds all words in the specified collection whose reverse also
' exists in the collection.
Dim findReversedWords = New TransformManyBlock(Of String(), String)(
Function(words)
Dim wordsSet = New HashSet(Of String)(words)
Return From word In words.AsParallel()
Let reverse = New String(word.Reverse().ToArray())
Where word <> reverse AndAlso wordsSet.Contains(reverse)
Select word
End Function)
' Prints the provided reversed words to the console.
Dim printReversedWords = New ActionBlock(Of String)(
Sub(reversedWord)
Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
End Sub)
'
' Connect the dataflow blocks to form a pipeline.
'
Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}
downloadString.LinkTo(createWordList, linkOptions)
createWordList.LinkTo(filterWordList, linkOptions)
filterWordList.LinkTo(findReversedWords, linkOptions)
findReversedWords.LinkTo(printReversedWords, linkOptions)
' Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")
' Mark the head of the pipeline as complete.
downloadString.Complete()
' Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait()
End Sub
End Module
' Sample output:
'Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
'Creating word list...
'Filtering word list...
'Finding reversed words...
'Found reversed words aera/area
'Found reversed words doom/mood
'Found reversed words draw/ward
'Found reversed words live/evil
'Found reversed words seat/taes
'Found reversed words area/aera
'Found reversed words port/trop
'Found reversed words sleek/keels
'Found reversed words tops/spot
'Found reversed words evil/live
'Found reversed words speed/deeps
'Found reversed words mood/doom
'Found reversed words moor/room
'Found reversed words spot/tops
'Found reversed words spots/stops
'Found reversed words trop/port
'Found reversed words stops/spots
'Found reversed words reed/deer
'Found reversed words deeps/speed
'Found reversed words deer/reed
'Found reversed words taes/seat
'Found reversed words keels/sleek
'Found reversed words room/moor
'Found reversed words ward/draw
Langkah Berikutnya
Contoh ini mengirimkan satu URL untuk diproses melalui alur aliran data. Jika Anda mengirim lebih dari satu nilai input melalui alur, Anda dapat memperkenalkan bentuk paralelisme ke dalam aplikasi Anda yang menyerupai bagaimana bagian mungkin bergerak melalui pabrik mobil. Ketika anggota pertama alur mengirimkan hasilnya ke anggota kedua, itu dapat memproses item lain secara paralel saat anggota kedua memproses hasil pertama.
Paralelisme yang dicapai dengan menggunakan alur aliran data dikenal sebagai paralelisme kasar karena biasanya terdiri dari tugas yang lebih sedikit dan lebih besar. Anda juga dapat menggunakan paralelisme yang lebih terperinci dengan tugas-tugas kecil dan waktu eksekusi singkat dalam pipeline aliran data. Dalam contoh ini, anggota findReversedWords alur menggunakan PLINQ untuk memproses beberapa item dalam daftar kerja secara paralel. Penggunaan paralelisme butir halus dalam pemrosesan butir kasar dapat meningkatkan throughput keseluruhan.
Anda juga dapat menyambungkan blok aliran data sumber ke beberapa blok target untuk membuat jaringan aliran data . Versi metode LinkTo yang kelebihan beban mengambil objek Predicate<T> yang menentukan apakah blok target menerima setiap pesan berdasarkan nilainya. Sebagian besar jenis blok aliran data yang bertindak sebagai sumber menawarkan pesan ke semua blok target yang terhubung, dalam urutan di mana mereka terhubung, hingga salah satu blok menerima pesan tersebut. Dengan menggunakan mekanisme pemfilteran ini, Anda dapat membuat sistem blok aliran data yang terhubung yang mengarahkan data tertentu melalui satu jalur dan data lainnya melalui jalur lain. Untuk contoh yang menggunakan pemfilteran untuk membuat jaringan aliran data, lihat panduan : Menggunakan Aliran Data di Aplikasi Formulir Windows.