Bagikan melalui


Panduan: Membuat Alur Aliran Data

Meski Anda dapat menggunakan metode DataflowBlock.Receive, DataflowBlock.ReceiveAsync, dan DataflowBlock.TryReceive untuk menerima pesan dari blok sumber, Anda juga dapat menyambungkan blok pesan untuk membentuk alur aliran data. Alur aliran data adalah serangkaian komponen, atau blok aliran data, yang masing-masing melakukan tugas tertentu yang berkontribusi pada tujuan yang lebih besar. Setiap blok aliran data dalam alur aliran data melakukan pekerjaan saat menerima pesan dari blok aliran data lain. Analogi untuk ini adalah jalur assembly untuk manufaktur mobil. Saat setiap kendaraan melewati jalur assembly, satu stasiun merakit rangka, yang berikutnya memasang mesin, dan sebagainya. Karena jalur assembly memungkinkan beberapa kendaraan untuk dirakit pada saat yang sama, jalur ini memberikan throughput yang lebih baik daripada merakit kendaraan lengkap satu per satu.

Dokumen ini menunjukkan alur aliran data yang mengunduh buku The Iliad of Homer dari situs web dan mencari teks untuk mencocokkan kata individual dengan kata-kata yang membalikkan karakter kata pertama. Pembentukan alur aliran data dalam dokumen ini terdiri dari langkah-langkah berikut:

  1. Buat blok aliran data yang berpartisipasi dalam alur.

  2. Sambungkan setiap blok aliran data ke blok berikutnya dalam alur. Setiap blok menerima sebagai input output dari blok sebelumnya dalam alur.

  3. Untuk setiap blok aliran data, buat tugas kelanjutan yang mengatur blok berikutnya ke status selesai setelah blok sebelumnya selesai.

  4. Kirim data ke kepala alur.

  5. Tandai kepala alur sebagai selesai.

  6. Tunggu hingga alur menyelesaikan semua pekerjaan.

Prasyarat

Baca Aliran Data sebelum Anda memulai panduan ini.

Membuat Aplikasi Konsol

Di Visual Studio, buat Visual C# atau proyek Visual Basic Console Application. Instal paket System.Threading.Tasks.Dataflow NuGet.

Catatan

Pustaka Aliran Data TPL (namespace layanan System.Threading.Tasks.Dataflow) tidak didistribusikan dengan .NET. Untuk menginstal namespace layanan System.Threading.Tasks.Dataflow di Visual Studio, buka proyek, pilih Kelola Paket NuGet dari menu Proyek, dan cari paket System.Threading.Tasks.Dataflow secara online. Atau, untuk menginstalnya menggunakan .NET Core CLI, jalankan dotnet add package System.Threading.Tasks.Dataflow.

Tambahkan kode berikut ke proyek Anda untuk membuat aplikasi dasar.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class Program
{
   static void Main()
   {
   }
}
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web 
' and finds all reversed words that appear in that book.
Module DataflowReversedWords

    Sub Main()
    End Sub

End Module

Membuat Blok Aliran Data

Tambahkan kode berikut ke metode Main untuk membuat blok aliran data yang berpartisipasi dalam alur. Tabel berikut ini meringkas peran setiap anggota alur.

//
// Create the members of the pipeline.
//

// Downloads the requested resource as a string.
var downloadString = new TransformBlock<string, string>(async uri =>
{
   Console.WriteLine("Downloading '{0}'...", uri);

   return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
});

// Separates the specified text into an array of words.
var createWordList = new TransformBlock<string, string[]>(text =>
{
   Console.WriteLine("Creating word list...");

   // Remove common punctuation by replacing all non-letter characters
   // with a space character.
   char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
   text = new string(tokens);

   // Separate the text into an array of words.
   return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
});

// Removes short words and duplicates.
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
   Console.WriteLine("Filtering word list...");

   return words
      .Where(word => word.Length > 3)
      .Distinct()
      .ToArray();
});

// Finds all words in the specified collection whose reverse also
// exists in the collection.
var findReversedWords = new TransformManyBlock<string[], string>(words =>
{
   Console.WriteLine("Finding reversed words...");

   var wordsSet = new HashSet<string>(words);

   return from word in words.AsParallel()
          let reverse = new string(word.Reverse().ToArray())
          where word != reverse && wordsSet.Contains(reverse)
          select word;
});

// Prints the provided reversed words to the console.
var printReversedWords = new ActionBlock<string>(reversedWord =>
{
   Console.WriteLine("Found reversed words {0}/{1}",
      reversedWord, new string(reversedWord.Reverse().ToArray()));
});
'
' Create the members of the pipeline.
' 

' Downloads the requested resource as a string.
Dim downloadString = New TransformBlock(Of String, String)(
    Async Function(uri)
        Console.WriteLine("Downloading '{0}'...", uri)

        Return Await New HttpClient().GetStringAsync(uri)
    End Function)

' Separates the specified text into an array of words.
Dim createWordList = New TransformBlock(Of String, String())(
   Function(text)
       Console.WriteLine("Creating word list...")

     ' Remove common punctuation by replacing all non-letter characters 
     ' with a space character.
     Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
       text = New String(tokens)

     ' Separate the text into an array of words.
     Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
   End Function)

' Removes short words and duplicates.
Dim filterWordList = New TransformBlock(Of String(), String())(
   Function(words)
       Console.WriteLine("Filtering word list...")

       Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
   End Function)

' Finds all words in the specified collection whose reverse also 
' exists in the collection.
Dim findReversedWords = New TransformManyBlock(Of String(), String)(
   Function(words)

       Dim wordsSet = New HashSet(Of String)(words)

       Return From word In words.AsParallel()
              Let reverse = New String(word.Reverse().ToArray())
              Where word <> reverse AndAlso wordsSet.Contains(reverse)
              Select word
   End Function)

' Prints the provided reversed words to the console.    
Dim printReversedWords = New ActionBlock(Of String)(
   Sub(reversedWord)
       Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
   End Sub)
Anggota Jenis Deskripsi
downloadString TransformBlock<TInput,TOutput> Mengunduh teks buku dari Web.
createWordList TransformBlock<TInput,TOutput> Memisahkan teks buku menjadi array kata- kata.
filterWordList TransformBlock<TInput,TOutput> Menghapus kata-kata pendek dan duplikat dari array kata.
findReversedWords TransformManyBlock<TInput,TOutput> Menemukan semua kata dalam kumpulan array kata yang difilter yang kebalikannya juga terjadi dalam array kata.
printReversedWords ActionBlock<TInput> Menampilkan kata-kata dan kata terbalik yang sesuai ke konsol.

Meski Anda dapat menggabungkan beberapa langkah dalam alur aliran data dalam contoh ini menjadi satu langkah, contoh tersebut menggambarkan konsep menyusun beberapa tugas aliran data independen untuk melakukan tugas yang lebih besar. Contohnya menggunakan TransformBlock<TInput,TOutput> untuk memungkinkan setiap anggota alur melakukan operasi pada data inputnya dan mengirim hasilnya ke langkah berikutnya dalam alur. Anggota findReversedWords alur adalah TransformManyBlock<TInput,TOutput> objek karena menghasilkan beberapa output independen untuk setiap input. Ekor alur, printReversedWords, adalah ActionBlock<TInput> objek karena melakukan tindakan pada inputnya, dan tidak menghasilkan hasil.

Membentuk Alur

Tambahkan kode berikut untuk menyambungkan setiap blok ke blok berikutnya dalam alur.

Saat Anda memanggil metode LinkTo untuk menyambungkan blok aliran data sumber ke blok aliran data target, blok aliran data sumber menyebarkan data ke blok target saat data tersedia. Jika Anda juga menyediakan DataflowLinkOptions dengan PropagateCompletion diatur ke true, berhasil, atau gagal menyelesaikan satu blok dalam alur akan menyebabkan penyelesaian blok berikutnya dalam alur.

//
// Connect the dataflow blocks to form a pipeline.
//

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

downloadString.LinkTo(createWordList, linkOptions);
createWordList.LinkTo(filterWordList, linkOptions);
filterWordList.LinkTo(findReversedWords, linkOptions);
findReversedWords.LinkTo(printReversedWords, linkOptions);
'
' Connect the dataflow blocks to form a pipeline.
'

Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}

downloadString.LinkTo(createWordList, linkOptions)
createWordList.LinkTo(filterWordList, linkOptions)
filterWordList.LinkTo(findReversedWords, linkOptions)
findReversedWords.LinkTo(printReversedWords, linkOptions)

Mengirim Data ke Alur

Tambahkan kode berikut untuk memposting URL buku The Iliad of Homer ke kepala alur aliran data.

// Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");
' Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")

Contoh ini menggunakan DataflowBlock.Post untuk mengirim data secara sinkron ke kepala alur. Gunakan metode DataflowBlock.SendAsync saat Anda harus mengirim data secara asinkron ke node aliran data.

Menyelesaikan Aktivitas Alur

Tambahkan kode berikut untuk menandai kepala alur sebagai selesai. Kepala alur menyebarluaskan penyelesaiannya setelah memproses semua pesan yang di-buffer.

// Mark the head of the pipeline as complete.
downloadString.Complete();
' Mark the head of the pipeline as complete.
downloadString.Complete()

Contoh ini mengirimkan satu URL melalui alur aliran data yang akan diproses. Jika Anda mengirim lebih dari satu input melalui alur, panggil metode IDataflowBlock.Complete setelah Anda mengirimkan semua input. Anda dapat menghilangkan langkah ini jika aplikasi Anda tidak memiliki titik yang ditentukan dengan baik di mana data tidak lagi tersedia atau aplikasi tidak perlu menunggu alur selesai.

Menunggu Alur Selesai

Tambahkan kode berikut untuk menunggu alur selesai. Operasi keseluruhan selesai saat ekor alur selesai.

// Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait();
' Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait()

Anda dapat menunggu penyelesaian aliran data dari utas apa pun atau dari beberapa utas secara bersamaan.

Contoh Lengkap

Contoh berikut menunjukkan kode lengkap untuk panduan ini.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class DataflowReversedWords
{
   static void Main()
   {
      //
      // Create the members of the pipeline.
      //

      // Downloads the requested resource as a string.
      var downloadString = new TransformBlock<string, string>(async uri =>
      {
         Console.WriteLine("Downloading '{0}'...", uri);

         return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
      });

      // Separates the specified text into an array of words.
      var createWordList = new TransformBlock<string, string[]>(text =>
      {
         Console.WriteLine("Creating word list...");

         // Remove common punctuation by replacing all non-letter characters
         // with a space character.
         char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
         text = new string(tokens);

         // Separate the text into an array of words.
         return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
      });

      // Removes short words and duplicates.
      var filterWordList = new TransformBlock<string[], string[]>(words =>
      {
         Console.WriteLine("Filtering word list...");

         return words
            .Where(word => word.Length > 3)
            .Distinct()
            .ToArray();
      });

      // Finds all words in the specified collection whose reverse also
      // exists in the collection.
      var findReversedWords = new TransformManyBlock<string[], string>(words =>
      {
         Console.WriteLine("Finding reversed words...");

         var wordsSet = new HashSet<string>(words);

         return from word in words.AsParallel()
                let reverse = new string(word.Reverse().ToArray())
                where word != reverse && wordsSet.Contains(reverse)
                select word;
      });

      // Prints the provided reversed words to the console.
      var printReversedWords = new ActionBlock<string>(reversedWord =>
      {
         Console.WriteLine("Found reversed words {0}/{1}",
            reversedWord, new string(reversedWord.Reverse().ToArray()));
      });

      //
      // Connect the dataflow blocks to form a pipeline.
      //

      var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

      downloadString.LinkTo(createWordList, linkOptions);
      createWordList.LinkTo(filterWordList, linkOptions);
      filterWordList.LinkTo(findReversedWords, linkOptions);
      findReversedWords.LinkTo(printReversedWords, linkOptions);

      // Process "The Iliad of Homer" by Homer.
      downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");

      // Mark the head of the pipeline as complete.
      downloadString.Complete();

      // Wait for the last block in the pipeline to process all messages.
      printReversedWords.Completion.Wait();
   }
}
/* Sample output:
   Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
   Creating word list...
   Filtering word list...
   Finding reversed words...
   Found reversed words doom/mood
   Found reversed words draw/ward
   Found reversed words aera/area
   Found reversed words seat/taes
   Found reversed words live/evil
   Found reversed words port/trop
   Found reversed words sleek/keels
   Found reversed words area/aera
   Found reversed words tops/spot
   Found reversed words evil/live
   Found reversed words mood/doom
   Found reversed words speed/deeps
   Found reversed words moor/room
   Found reversed words trop/port
   Found reversed words spot/tops
   Found reversed words spots/stops
   Found reversed words stops/spots
   Found reversed words reed/deer
   Found reversed words keels/sleek
   Found reversed words deeps/speed
   Found reversed words deer/reed
   Found reversed words taes/seat
   Found reversed words room/moor
   Found reversed words ward/draw
*/
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web 
' and finds all reversed words that appear in that book.
Module DataflowReversedWords

    Sub Main()
        '
        ' Create the members of the pipeline.
        ' 

        ' Downloads the requested resource as a string.
        Dim downloadString = New TransformBlock(Of String, String)(
            Async Function(uri)
                Console.WriteLine("Downloading '{0}'...", uri)

                Return Await New HttpClient().GetStringAsync(uri)
            End Function)

        ' Separates the specified text into an array of words.
        Dim createWordList = New TransformBlock(Of String, String())(
           Function(text)
               Console.WriteLine("Creating word list...")

             ' Remove common punctuation by replacing all non-letter characters 
             ' with a space character.
             Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
               text = New String(tokens)

             ' Separate the text into an array of words.
             Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
           End Function)

        ' Removes short words and duplicates.
        Dim filterWordList = New TransformBlock(Of String(), String())(
           Function(words)
               Console.WriteLine("Filtering word list...")

               Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
           End Function)

        ' Finds all words in the specified collection whose reverse also 
        ' exists in the collection.
        Dim findReversedWords = New TransformManyBlock(Of String(), String)(
           Function(words)

               Dim wordsSet = New HashSet(Of String)(words)

               Return From word In words.AsParallel()
                      Let reverse = New String(word.Reverse().ToArray())
                      Where word <> reverse AndAlso wordsSet.Contains(reverse)
                      Select word
           End Function)

        ' Prints the provided reversed words to the console.    
        Dim printReversedWords = New ActionBlock(Of String)(
           Sub(reversedWord)
               Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
           End Sub)

        '
        ' Connect the dataflow blocks to form a pipeline.
        '

        Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}

        downloadString.LinkTo(createWordList, linkOptions)
        createWordList.LinkTo(filterWordList, linkOptions)
        filterWordList.LinkTo(findReversedWords, linkOptions)
        findReversedWords.LinkTo(printReversedWords, linkOptions)

        ' Process "The Iliad of Homer" by Homer.
        downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")

        ' Mark the head of the pipeline as complete.
        downloadString.Complete()

        ' Wait for the last block in the pipeline to process all messages.
        printReversedWords.Completion.Wait()
    End Sub

End Module

' Sample output:
'Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
'Creating word list...
'Filtering word list...
'Finding reversed words...
'Found reversed words aera/area
'Found reversed words doom/mood
'Found reversed words draw/ward
'Found reversed words live/evil
'Found reversed words seat/taes
'Found reversed words area/aera
'Found reversed words port/trop
'Found reversed words sleek/keels
'Found reversed words tops/spot
'Found reversed words evil/live
'Found reversed words speed/deeps
'Found reversed words mood/doom
'Found reversed words moor/room
'Found reversed words spot/tops
'Found reversed words spots/stops
'Found reversed words trop/port
'Found reversed words stops/spots
'Found reversed words reed/deer
'Found reversed words deeps/speed
'Found reversed words deer/reed
'Found reversed words taes/seat
'Found reversed words keels/sleek
'Found reversed words room/moor
'Found reversed words ward/draw

Langkah berikutnya

Contoh ini mengirimkan satu URL proses melalui alur aliran data. Jika Anda mengirim lebih dari satu nilai input melalui alur, Anda dapat memperkenalkan bentuk paralelisme ke dalam aplikasi Anda yang menyerupai bagaimana bagian mungkin bergerak melalui pabrik mobil. Saat anggota pertama alur mengirimkan hasilnya ke anggota kedua, itu dapat memproses item lain secara paralel saat anggota kedua memproses hasil pertama.

Paralelisme yang dicapai dengan menggunakan alur aliran data dikenal sebagai paralelisme kasar karena biasanya terdiri dari tugas yang lebih sedikit dan lebih besar. Anda juga dapat menggunakan paralelisme yang lebih halus dari tugas yang lebih kecil dan berjalan pendek dalam alur aliran data. Dalam contoh ini, findReversedWords anggota alur menggunakan PLINQ untuk memproses beberapa item dalam daftar kerja secara paralel. Penggunaan paralelisme halus dalam alur kasar dapat meningkatkan throughput keseluruhan.

Anda juga dapat menyambungkan blok aliran data sumber ke beberapa blok target untuk membuat jaringan aliran data. Versi metode LinkTo yang overload mengambil Predicate<T> objek yang menentukan apakah blok target menerima setiap pesan berdasarkan nilainya. Sebagian besar jenis blok aliran data yang bertindak sebagai sumber menawarkan pesan ke semua blok target yang terhubung, dalam urutan di mana mereka tersambung, hingga salah satu blok menerima pesan tersebut. Dengan menggunakan mekanisme pemfilteran ini, Anda dapat membuat sistem blok aliran data yang terhubung yang mengarahkan data tertentu melalui satu jalur dan data lainnya melalui jalur lain. Untuk contoh yang membuat jaringan aliran data yang lebih kompleks, lihat Panduan: Menggunakan Aliran Data dalam Aplikasi Formulir Windows.

Lihat juga