Panduan: Membuat Alur Aliran Data
Meski Anda dapat menggunakan metode DataflowBlock.Receive, DataflowBlock.ReceiveAsync, dan DataflowBlock.TryReceive untuk menerima pesan dari blok sumber, Anda juga dapat menyambungkan blok pesan untuk membentuk alur aliran data. Alur aliran data adalah serangkaian komponen, atau blok aliran data, yang masing-masing melakukan tugas tertentu yang berkontribusi pada tujuan yang lebih besar. Setiap blok aliran data dalam alur aliran data melakukan pekerjaan saat menerima pesan dari blok aliran data lain. Analogi untuk ini adalah jalur assembly untuk manufaktur mobil. Saat setiap kendaraan melewati jalur assembly, satu stasiun merakit rangka, yang berikutnya memasang mesin, dan sebagainya. Karena jalur assembly memungkinkan beberapa kendaraan untuk dirakit pada saat yang sama, jalur ini memberikan throughput yang lebih baik daripada merakit kendaraan lengkap satu per satu.
Dokumen ini menunjukkan alur aliran data yang mengunduh buku The Iliad of Homer dari situs web dan mencari teks untuk mencocokkan kata individual dengan kata-kata yang membalikkan karakter kata pertama. Pembentukan alur aliran data dalam dokumen ini terdiri dari langkah-langkah berikut:
Buat blok aliran data yang berpartisipasi dalam alur.
Sambungkan setiap blok aliran data ke blok berikutnya dalam alur. Setiap blok menerima sebagai input output dari blok sebelumnya dalam alur.
Untuk setiap blok aliran data, buat tugas kelanjutan yang mengatur blok berikutnya ke status selesai setelah blok sebelumnya selesai.
Kirim data ke kepala alur.
Tandai kepala alur sebagai selesai.
Tunggu hingga alur menyelesaikan semua pekerjaan.
Prasyarat
Baca Aliran Data sebelum Anda memulai panduan ini.
Membuat Aplikasi Konsol
Di Visual Studio, buat Visual C# atau proyek Visual Basic Console Application. Instal paket System.Threading.Tasks.Dataflow NuGet.
Catatan
Pustaka Aliran Data TPL (namespace layanan System.Threading.Tasks.Dataflow) tidak didistribusikan dengan .NET. Untuk menginstal namespace layanan System.Threading.Tasks.Dataflow di Visual Studio, buka proyek, pilih Kelola Paket NuGet dari menu Proyek, dan cari paket System.Threading.Tasks.Dataflow
secara online. Atau, untuk menginstalnya menggunakan .NET Core CLI, jalankan dotnet add package System.Threading.Tasks.Dataflow
.
Tambahkan kode berikut ke proyek Anda untuk membuat aplikasi dasar.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;
// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class Program
{
static void Main()
{
}
}
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow
' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web
' and finds all reversed words that appear in that book.
Module DataflowReversedWords
Sub Main()
End Sub
End Module
Membuat Blok Aliran Data
Tambahkan kode berikut ke metode Main
untuk membuat blok aliran data yang berpartisipasi dalam alur. Tabel berikut ini meringkas peran setiap anggota alur.
//
// Create the members of the pipeline.
//
// Downloads the requested resource as a string.
var downloadString = new TransformBlock<string, string>(async uri =>
{
Console.WriteLine("Downloading '{0}'...", uri);
return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
});
// Separates the specified text into an array of words.
var createWordList = new TransformBlock<string, string[]>(text =>
{
Console.WriteLine("Creating word list...");
// Remove common punctuation by replacing all non-letter characters
// with a space character.
char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
text = new string(tokens);
// Separate the text into an array of words.
return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
});
// Removes short words and duplicates.
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
Console.WriteLine("Filtering word list...");
return words
.Where(word => word.Length > 3)
.Distinct()
.ToArray();
});
// Finds all words in the specified collection whose reverse also
// exists in the collection.
var findReversedWords = new TransformManyBlock<string[], string>(words =>
{
Console.WriteLine("Finding reversed words...");
var wordsSet = new HashSet<string>(words);
return from word in words.AsParallel()
let reverse = new string(word.Reverse().ToArray())
where word != reverse && wordsSet.Contains(reverse)
select word;
});
// Prints the provided reversed words to the console.
var printReversedWords = new ActionBlock<string>(reversedWord =>
{
Console.WriteLine("Found reversed words {0}/{1}",
reversedWord, new string(reversedWord.Reverse().ToArray()));
});
'
' Create the members of the pipeline.
'
' Downloads the requested resource as a string.
Dim downloadString = New TransformBlock(Of String, String)(
Async Function(uri)
Console.WriteLine("Downloading '{0}'...", uri)
Return Await New HttpClient().GetStringAsync(uri)
End Function)
' Separates the specified text into an array of words.
Dim createWordList = New TransformBlock(Of String, String())(
Function(text)
Console.WriteLine("Creating word list...")
' Remove common punctuation by replacing all non-letter characters
' with a space character.
Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
text = New String(tokens)
' Separate the text into an array of words.
Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
End Function)
' Removes short words and duplicates.
Dim filterWordList = New TransformBlock(Of String(), String())(
Function(words)
Console.WriteLine("Filtering word list...")
Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
End Function)
' Finds all words in the specified collection whose reverse also
' exists in the collection.
Dim findReversedWords = New TransformManyBlock(Of String(), String)(
Function(words)
Dim wordsSet = New HashSet(Of String)(words)
Return From word In words.AsParallel()
Let reverse = New String(word.Reverse().ToArray())
Where word <> reverse AndAlso wordsSet.Contains(reverse)
Select word
End Function)
' Prints the provided reversed words to the console.
Dim printReversedWords = New ActionBlock(Of String)(
Sub(reversedWord)
Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
End Sub)
Anggota | Jenis | Deskripsi |
---|---|---|
downloadString |
TransformBlock<TInput,TOutput> | Mengunduh teks buku dari Web. |
createWordList |
TransformBlock<TInput,TOutput> | Memisahkan teks buku menjadi array kata- kata. |
filterWordList |
TransformBlock<TInput,TOutput> | Menghapus kata-kata pendek dan duplikat dari array kata. |
findReversedWords |
TransformManyBlock<TInput,TOutput> | Menemukan semua kata dalam kumpulan array kata yang difilter yang kebalikannya juga terjadi dalam array kata. |
printReversedWords |
ActionBlock<TInput> | Menampilkan kata-kata dan kata terbalik yang sesuai ke konsol. |
Meski Anda dapat menggabungkan beberapa langkah dalam alur aliran data dalam contoh ini menjadi satu langkah, contoh tersebut menggambarkan konsep menyusun beberapa tugas aliran data independen untuk melakukan tugas yang lebih besar. Contohnya menggunakan TransformBlock<TInput,TOutput> untuk memungkinkan setiap anggota alur melakukan operasi pada data inputnya dan mengirim hasilnya ke langkah berikutnya dalam alur. Anggota findReversedWords
alur adalah TransformManyBlock<TInput,TOutput> objek karena menghasilkan beberapa output independen untuk setiap input. Ekor alur, printReversedWords
, adalah ActionBlock<TInput> objek karena melakukan tindakan pada inputnya, dan tidak menghasilkan hasil.
Membentuk Alur
Tambahkan kode berikut untuk menyambungkan setiap blok ke blok berikutnya dalam alur.
Saat Anda memanggil metode LinkTo untuk menyambungkan blok aliran data sumber ke blok aliran data target, blok aliran data sumber menyebarkan data ke blok target saat data tersedia. Jika Anda juga menyediakan DataflowLinkOptions dengan PropagateCompletion diatur ke true, berhasil, atau gagal menyelesaikan satu blok dalam alur akan menyebabkan penyelesaian blok berikutnya dalam alur.
//
// Connect the dataflow blocks to form a pipeline.
//
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
downloadString.LinkTo(createWordList, linkOptions);
createWordList.LinkTo(filterWordList, linkOptions);
filterWordList.LinkTo(findReversedWords, linkOptions);
findReversedWords.LinkTo(printReversedWords, linkOptions);
'
' Connect the dataflow blocks to form a pipeline.
'
Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}
downloadString.LinkTo(createWordList, linkOptions)
createWordList.LinkTo(filterWordList, linkOptions)
filterWordList.LinkTo(findReversedWords, linkOptions)
findReversedWords.LinkTo(printReversedWords, linkOptions)
Mengirim Data ke Alur
Tambahkan kode berikut untuk memposting URL buku The Iliad of Homer ke kepala alur aliran data.
// Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");
' Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")
Contoh ini menggunakan DataflowBlock.Post untuk mengirim data secara sinkron ke kepala alur. Gunakan metode DataflowBlock.SendAsync saat Anda harus mengirim data secara asinkron ke node aliran data.
Menyelesaikan Aktivitas Alur
Tambahkan kode berikut untuk menandai kepala alur sebagai selesai. Kepala alur menyebarluaskan penyelesaiannya setelah memproses semua pesan yang di-buffer.
// Mark the head of the pipeline as complete.
downloadString.Complete();
' Mark the head of the pipeline as complete.
downloadString.Complete()
Contoh ini mengirimkan satu URL melalui alur aliran data yang akan diproses. Jika Anda mengirim lebih dari satu input melalui alur, panggil metode IDataflowBlock.Complete setelah Anda mengirimkan semua input. Anda dapat menghilangkan langkah ini jika aplikasi Anda tidak memiliki titik yang ditentukan dengan baik di mana data tidak lagi tersedia atau aplikasi tidak perlu menunggu alur selesai.
Menunggu Alur Selesai
Tambahkan kode berikut untuk menunggu alur selesai. Operasi keseluruhan selesai saat ekor alur selesai.
// Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait();
' Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait()
Anda dapat menunggu penyelesaian aliran data dari utas apa pun atau dari beberapa utas secara bersamaan.
Contoh Lengkap
Contoh berikut menunjukkan kode lengkap untuk panduan ini.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;
// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class DataflowReversedWords
{
static void Main()
{
//
// Create the members of the pipeline.
//
// Downloads the requested resource as a string.
var downloadString = new TransformBlock<string, string>(async uri =>
{
Console.WriteLine("Downloading '{0}'...", uri);
return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
});
// Separates the specified text into an array of words.
var createWordList = new TransformBlock<string, string[]>(text =>
{
Console.WriteLine("Creating word list...");
// Remove common punctuation by replacing all non-letter characters
// with a space character.
char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
text = new string(tokens);
// Separate the text into an array of words.
return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
});
// Removes short words and duplicates.
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
Console.WriteLine("Filtering word list...");
return words
.Where(word => word.Length > 3)
.Distinct()
.ToArray();
});
// Finds all words in the specified collection whose reverse also
// exists in the collection.
var findReversedWords = new TransformManyBlock<string[], string>(words =>
{
Console.WriteLine("Finding reversed words...");
var wordsSet = new HashSet<string>(words);
return from word in words.AsParallel()
let reverse = new string(word.Reverse().ToArray())
where word != reverse && wordsSet.Contains(reverse)
select word;
});
// Prints the provided reversed words to the console.
var printReversedWords = new ActionBlock<string>(reversedWord =>
{
Console.WriteLine("Found reversed words {0}/{1}",
reversedWord, new string(reversedWord.Reverse().ToArray()));
});
//
// Connect the dataflow blocks to form a pipeline.
//
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
downloadString.LinkTo(createWordList, linkOptions);
createWordList.LinkTo(filterWordList, linkOptions);
filterWordList.LinkTo(findReversedWords, linkOptions);
findReversedWords.LinkTo(printReversedWords, linkOptions);
// Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");
// Mark the head of the pipeline as complete.
downloadString.Complete();
// Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait();
}
}
/* Sample output:
Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
Creating word list...
Filtering word list...
Finding reversed words...
Found reversed words doom/mood
Found reversed words draw/ward
Found reversed words aera/area
Found reversed words seat/taes
Found reversed words live/evil
Found reversed words port/trop
Found reversed words sleek/keels
Found reversed words area/aera
Found reversed words tops/spot
Found reversed words evil/live
Found reversed words mood/doom
Found reversed words speed/deeps
Found reversed words moor/room
Found reversed words trop/port
Found reversed words spot/tops
Found reversed words spots/stops
Found reversed words stops/spots
Found reversed words reed/deer
Found reversed words keels/sleek
Found reversed words deeps/speed
Found reversed words deer/reed
Found reversed words taes/seat
Found reversed words room/moor
Found reversed words ward/draw
*/
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow
' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web
' and finds all reversed words that appear in that book.
Module DataflowReversedWords
Sub Main()
'
' Create the members of the pipeline.
'
' Downloads the requested resource as a string.
Dim downloadString = New TransformBlock(Of String, String)(
Async Function(uri)
Console.WriteLine("Downloading '{0}'...", uri)
Return Await New HttpClient().GetStringAsync(uri)
End Function)
' Separates the specified text into an array of words.
Dim createWordList = New TransformBlock(Of String, String())(
Function(text)
Console.WriteLine("Creating word list...")
' Remove common punctuation by replacing all non-letter characters
' with a space character.
Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
text = New String(tokens)
' Separate the text into an array of words.
Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
End Function)
' Removes short words and duplicates.
Dim filterWordList = New TransformBlock(Of String(), String())(
Function(words)
Console.WriteLine("Filtering word list...")
Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
End Function)
' Finds all words in the specified collection whose reverse also
' exists in the collection.
Dim findReversedWords = New TransformManyBlock(Of String(), String)(
Function(words)
Dim wordsSet = New HashSet(Of String)(words)
Return From word In words.AsParallel()
Let reverse = New String(word.Reverse().ToArray())
Where word <> reverse AndAlso wordsSet.Contains(reverse)
Select word
End Function)
' Prints the provided reversed words to the console.
Dim printReversedWords = New ActionBlock(Of String)(
Sub(reversedWord)
Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
End Sub)
'
' Connect the dataflow blocks to form a pipeline.
'
Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}
downloadString.LinkTo(createWordList, linkOptions)
createWordList.LinkTo(filterWordList, linkOptions)
filterWordList.LinkTo(findReversedWords, linkOptions)
findReversedWords.LinkTo(printReversedWords, linkOptions)
' Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")
' Mark the head of the pipeline as complete.
downloadString.Complete()
' Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait()
End Sub
End Module
' Sample output:
'Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
'Creating word list...
'Filtering word list...
'Finding reversed words...
'Found reversed words aera/area
'Found reversed words doom/mood
'Found reversed words draw/ward
'Found reversed words live/evil
'Found reversed words seat/taes
'Found reversed words area/aera
'Found reversed words port/trop
'Found reversed words sleek/keels
'Found reversed words tops/spot
'Found reversed words evil/live
'Found reversed words speed/deeps
'Found reversed words mood/doom
'Found reversed words moor/room
'Found reversed words spot/tops
'Found reversed words spots/stops
'Found reversed words trop/port
'Found reversed words stops/spots
'Found reversed words reed/deer
'Found reversed words deeps/speed
'Found reversed words deer/reed
'Found reversed words taes/seat
'Found reversed words keels/sleek
'Found reversed words room/moor
'Found reversed words ward/draw
Langkah berikutnya
Contoh ini mengirimkan satu URL proses melalui alur aliran data. Jika Anda mengirim lebih dari satu nilai input melalui alur, Anda dapat memperkenalkan bentuk paralelisme ke dalam aplikasi Anda yang menyerupai bagaimana bagian mungkin bergerak melalui pabrik mobil. Saat anggota pertama alur mengirimkan hasilnya ke anggota kedua, itu dapat memproses item lain secara paralel saat anggota kedua memproses hasil pertama.
Paralelisme yang dicapai dengan menggunakan alur aliran data dikenal sebagai paralelisme kasar karena biasanya terdiri dari tugas yang lebih sedikit dan lebih besar. Anda juga dapat menggunakan paralelisme yang lebih halus dari tugas yang lebih kecil dan berjalan pendek dalam alur aliran data. Dalam contoh ini, findReversedWords
anggota alur menggunakan PLINQ untuk memproses beberapa item dalam daftar kerja secara paralel. Penggunaan paralelisme halus dalam alur kasar dapat meningkatkan throughput keseluruhan.
Anda juga dapat menyambungkan blok aliran data sumber ke beberapa blok target untuk membuat jaringan aliran data. Versi metode LinkTo yang overload mengambil Predicate<T> objek yang menentukan apakah blok target menerima setiap pesan berdasarkan nilainya. Sebagian besar jenis blok aliran data yang bertindak sebagai sumber menawarkan pesan ke semua blok target yang terhubung, dalam urutan di mana mereka tersambung, hingga salah satu blok menerima pesan tersebut. Dengan menggunakan mekanisme pemfilteran ini, Anda dapat membuat sistem blok aliran data yang terhubung yang mengarahkan data tertentu melalui satu jalur dan data lainnya melalui jalur lain. Untuk contoh yang membuat jaringan aliran data yang lebih kompleks, lihat Panduan: Menggunakan Aliran Data dalam Aplikasi Formulir Windows.
Lihat juga
Saran dan Komentar
https://aka.ms/ContentUserFeedback.
Segera hadir: Sepanjang tahun 2024 kami akan menghentikan penggunaan GitHub Issues sebagai mekanisme umpan balik untuk konten dan menggantinya dengan sistem umpan balik baru. Untuk mengetahui informasi selengkapnya, lihat:Kirim dan lihat umpan balik untuk