Sdílet prostřednictvím


Návod: Vytvoření kanálu toku dat

I když můžete použít DataflowBlock.Receivemetody , DataflowBlock.ReceiveAsynca DataflowBlock.TryReceive k příjmu zpráv ze zdrojových bloků, můžete také připojit bloky zpráv a vytvořit tak kanál toku dat. Kanál toku dat je řada komponent neboli bloků toku dat, z nichž každý provádí určitou úlohu, která přispívá k většímu cíli. Každý blok toku dat v kanálu toku dat provádí práci, když obdrží zprávu z jiného bloku toku dat. Obdobou je montážní linka pro výrobu automobilů. Když každé vozidlo projde montážní linkou, jedna stanice sestaví rám, další nainstaluje motor a tak dále. Vzhledem k tomu, že montážní linka umožňuje montáž více vozidel najednou, poskytuje lepší propustnost než montáž kompletních vozidel po jednom.

Tento dokument ukazuje kanál toku dat, který stáhne z webu knihu Homerův iliad a vyhledá text tak, aby jednotlivá slova odpovídala slovům, která převrací znaky prvního slova. Vytvoření kanálu toku dat v tomto dokumentu se skládá z následujících kroků:

  1. Vytvořte bloky toku dat, které se účastní kanálu.

  2. Připojte každý blok toku dat k dalšímu bloku v kanálu. Každý blok přijme jako vstup výstup předchozího bloku v kanálu.

  3. Pro každý blok toku dat vytvořte úlohu pokračování, která nastaví další blok do stavu dokončení po dokončení předchozího bloku.

  4. Publikujte data do vedoucího kanálu.

  5. Označte vedoucí kanál jako dokončený.

  6. Počkejte, až kanál dokončí veškerou práci.

Požadavky

Než začnete s tímto návodem, přečtěte si téma Tok dat.

Vytvoření konzolové aplikace

V sadě Visual Studio vytvořte projekt konzolové aplikace Visual C# nebo Visual Basic. Nainstalujte balíček NuGet System.Threading.Tasks.Dataflow.

Poznámka

Knihovna toků dat TPL (obor názvů) se s .NET nedistribuuje System.Threading.Tasks.Dataflow . Pokud chcete nainstalovat System.Threading.Tasks.Dataflow obor názvů v sadě Visual Studio, otevřete projekt, v nabídce Projekt zvolte Spravovat balíčky NuGet a vyhledejte System.Threading.Tasks.Dataflow balíček online. Pokud ho chcete nainstalovat pomocí rozhraní příkazového řádku .NET Core, spusťte příkaz dotnet add package System.Threading.Tasks.Dataflow.

Přidejte do projektu následující kód pro vytvoření základní aplikace.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class Program
{
   static void Main()
   {
   }
}
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web 
' and finds all reversed words that appear in that book.
Module DataflowReversedWords

    Sub Main()
    End Sub

End Module

Vytváření bloků toku dat

Do metody přidejte Main následující kód, který vytvoří bloky toku dat, které se účastní kanálu. Následující tabulka shrnuje role jednotlivých členů kanálu.

//
// Create the members of the pipeline.
//

// Downloads the requested resource as a string.
var downloadString = new TransformBlock<string, string>(async uri =>
{
   Console.WriteLine("Downloading '{0}'...", uri);

   return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
});

// Separates the specified text into an array of words.
var createWordList = new TransformBlock<string, string[]>(text =>
{
   Console.WriteLine("Creating word list...");

   // Remove common punctuation by replacing all non-letter characters
   // with a space character.
   char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
   text = new string(tokens);

   // Separate the text into an array of words.
   return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
});

// Removes short words and duplicates.
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
   Console.WriteLine("Filtering word list...");

   return words
      .Where(word => word.Length > 3)
      .Distinct()
      .ToArray();
});

// Finds all words in the specified collection whose reverse also
// exists in the collection.
var findReversedWords = new TransformManyBlock<string[], string>(words =>
{
   Console.WriteLine("Finding reversed words...");

   var wordsSet = new HashSet<string>(words);

   return from word in words.AsParallel()
          let reverse = new string(word.Reverse().ToArray())
          where word != reverse && wordsSet.Contains(reverse)
          select word;
});

// Prints the provided reversed words to the console.
var printReversedWords = new ActionBlock<string>(reversedWord =>
{
   Console.WriteLine("Found reversed words {0}/{1}",
      reversedWord, new string(reversedWord.Reverse().ToArray()));
});
'
' Create the members of the pipeline.
' 

' Downloads the requested resource as a string.
Dim downloadString = New TransformBlock(Of String, String)(
    Async Function(uri)
        Console.WriteLine("Downloading '{0}'...", uri)

        Return Await New HttpClient().GetStringAsync(uri)
    End Function)

' Separates the specified text into an array of words.
Dim createWordList = New TransformBlock(Of String, String())(
   Function(text)
       Console.WriteLine("Creating word list...")

     ' Remove common punctuation by replacing all non-letter characters 
     ' with a space character.
     Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
       text = New String(tokens)

     ' Separate the text into an array of words.
     Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
   End Function)

' Removes short words and duplicates.
Dim filterWordList = New TransformBlock(Of String(), String())(
   Function(words)
       Console.WriteLine("Filtering word list...")

       Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
   End Function)

' Finds all words in the specified collection whose reverse also 
' exists in the collection.
Dim findReversedWords = New TransformManyBlock(Of String(), String)(
   Function(words)

       Dim wordsSet = New HashSet(Of String)(words)

       Return From word In words.AsParallel()
              Let reverse = New String(word.Reverse().ToArray())
              Where word <> reverse AndAlso wordsSet.Contains(reverse)
              Select word
   End Function)

' Prints the provided reversed words to the console.    
Dim printReversedWords = New ActionBlock(Of String)(
   Sub(reversedWord)
       Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
   End Sub)
Člen Typ Description
downloadString TransformBlock<TInput,TOutput> Stáhne text knihy z webu.
createWordList TransformBlock<TInput,TOutput> Rozdělí text knihy na pole slov.
filterWordList TransformBlock<TInput,TOutput> Odebere krátká slova a duplikáty z pole slov.
findReversedWords TransformManyBlock<TInput,TOutput> Vyhledá všechna slova ve filtrované kolekci polí slov, jejichž obrácený text se vyskytuje také v poli slov.
printReversedWords ActionBlock<TInput> Zobrazí slova a odpovídající obrácená slova do konzoly.

I když byste v tomto příkladu mohli zkombinovat několik kroků v kanálu toku dat do jednoho kroku, tento příklad ilustruje koncept vytváření několika nezávislých úloh toku dat k provedení větší úlohy. V příkladu TransformBlock<TInput,TOutput> je možné každému členu kanálu povolit provedení operace se vstupními daty a odeslání výsledků do dalšího kroku v kanálu. Člen findReversedWords kanálu je objekt, TransformManyBlock<TInput,TOutput> protože pro každý vstup vytváří více nezávislých výstupů. Chvost kanálu je ActionBlock<TInput> objekt, printReversedWordsprotože provádí akci se svým vstupem a negeneruje výsledek.

Tvarování potrubí

Přidejte následující kód pro připojení každého bloku k dalšímu bloku v kanálu.

Když zavoláte metodu LinkTo pro připojení zdrojového bloku toku dat k cílovému bloku toku dat, zdrojový blok toku dat rozšíří data do cílového bloku, jakmile budou data k dispozici. Pokud také zadáte hodnotu DataflowLinkOptionsPropagateCompletion nastavenou na true, úspěšné nebo neúspěšné dokončení jednoho bloku v kanálu způsobí dokončení dalšího bloku v kanálu.

//
// Connect the dataflow blocks to form a pipeline.
//

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

downloadString.LinkTo(createWordList, linkOptions);
createWordList.LinkTo(filterWordList, linkOptions);
filterWordList.LinkTo(findReversedWords, linkOptions);
findReversedWords.LinkTo(printReversedWords, linkOptions);
'
' Connect the dataflow blocks to form a pipeline.
'

Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}

downloadString.LinkTo(createWordList, linkOptions)
createWordList.LinkTo(filterWordList, linkOptions)
filterWordList.LinkTo(findReversedWords, linkOptions)
findReversedWords.LinkTo(printReversedWords, linkOptions)

Publikování dat do kanálu

Přidejte následující kód, který publikuje adresu URL knihy The Iliad of Homer (Iliad of Homer ) do vedoucího kanálu toku dat.

// Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");
' Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")

V tomto příkladu se data DataflowBlock.Post synchronně odesílají do vedoucího kanálu. Metodu použijte, DataflowBlock.SendAsync když musíte asynchronně odesílat data do uzlu toku dat.

Dokončení aktivity kanálu

Přidejte následující kód, který označí hlavní část kanálu jako dokončenou. Vedoucí kanálu šíří své dokončení po zpracování všech zpráv ve vyrovnávací paměti.

// Mark the head of the pipeline as complete.
downloadString.Complete();
' Mark the head of the pipeline as complete.
downloadString.Complete()

Tento příklad odešle jednu adresu URL prostřednictvím kanálu toku dat, který se má zpracovat. Pokud prostřednictvím kanálu odesíláte více než jeden vstup, zavolejte metodu IDataflowBlock.Complete po odeslání veškerého vstupu. Tento krok můžete vynechat, pokud vaše aplikace nemá dobře definovaný bod, ve kterém by už data nebyla k dispozici, nebo aplikace nemusí čekat na dokončení kanálu.

Čekání na dokončení kanálu

Přidejte následující kód, který počká na dokončení kanálu. Celková operace je dokončena po dokončení chvostu potrubí.

// Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait();
' Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait()

Můžete počkat na dokončení toku dat z libovolného vlákna nebo z více vláken najednou.

Kompletní příklad

Následující příklad ukazuje úplný kód pro tento názorný postup.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class DataflowReversedWords
{
   static void Main()
   {
      //
      // Create the members of the pipeline.
      //

      // Downloads the requested resource as a string.
      var downloadString = new TransformBlock<string, string>(async uri =>
      {
         Console.WriteLine("Downloading '{0}'...", uri);

         return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
      });

      // Separates the specified text into an array of words.
      var createWordList = new TransformBlock<string, string[]>(text =>
      {
         Console.WriteLine("Creating word list...");

         // Remove common punctuation by replacing all non-letter characters
         // with a space character.
         char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
         text = new string(tokens);

         // Separate the text into an array of words.
         return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
      });

      // Removes short words and duplicates.
      var filterWordList = new TransformBlock<string[], string[]>(words =>
      {
         Console.WriteLine("Filtering word list...");

         return words
            .Where(word => word.Length > 3)
            .Distinct()
            .ToArray();
      });

      // Finds all words in the specified collection whose reverse also
      // exists in the collection.
      var findReversedWords = new TransformManyBlock<string[], string>(words =>
      {
         Console.WriteLine("Finding reversed words...");

         var wordsSet = new HashSet<string>(words);

         return from word in words.AsParallel()
                let reverse = new string(word.Reverse().ToArray())
                where word != reverse && wordsSet.Contains(reverse)
                select word;
      });

      // Prints the provided reversed words to the console.
      var printReversedWords = new ActionBlock<string>(reversedWord =>
      {
         Console.WriteLine("Found reversed words {0}/{1}",
            reversedWord, new string(reversedWord.Reverse().ToArray()));
      });

      //
      // Connect the dataflow blocks to form a pipeline.
      //

      var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

      downloadString.LinkTo(createWordList, linkOptions);
      createWordList.LinkTo(filterWordList, linkOptions);
      filterWordList.LinkTo(findReversedWords, linkOptions);
      findReversedWords.LinkTo(printReversedWords, linkOptions);

      // Process "The Iliad of Homer" by Homer.
      downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");

      // Mark the head of the pipeline as complete.
      downloadString.Complete();

      // Wait for the last block in the pipeline to process all messages.
      printReversedWords.Completion.Wait();
   }
}
/* Sample output:
   Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
   Creating word list...
   Filtering word list...
   Finding reversed words...
   Found reversed words doom/mood
   Found reversed words draw/ward
   Found reversed words aera/area
   Found reversed words seat/taes
   Found reversed words live/evil
   Found reversed words port/trop
   Found reversed words sleek/keels
   Found reversed words area/aera
   Found reversed words tops/spot
   Found reversed words evil/live
   Found reversed words mood/doom
   Found reversed words speed/deeps
   Found reversed words moor/room
   Found reversed words trop/port
   Found reversed words spot/tops
   Found reversed words spots/stops
   Found reversed words stops/spots
   Found reversed words reed/deer
   Found reversed words keels/sleek
   Found reversed words deeps/speed
   Found reversed words deer/reed
   Found reversed words taes/seat
   Found reversed words room/moor
   Found reversed words ward/draw
*/
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web 
' and finds all reversed words that appear in that book.
Module DataflowReversedWords

    Sub Main()
        '
        ' Create the members of the pipeline.
        ' 

        ' Downloads the requested resource as a string.
        Dim downloadString = New TransformBlock(Of String, String)(
            Async Function(uri)
                Console.WriteLine("Downloading '{0}'...", uri)

                Return Await New HttpClient().GetStringAsync(uri)
            End Function)

        ' Separates the specified text into an array of words.
        Dim createWordList = New TransformBlock(Of String, String())(
           Function(text)
               Console.WriteLine("Creating word list...")

             ' Remove common punctuation by replacing all non-letter characters 
             ' with a space character.
             Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
               text = New String(tokens)

             ' Separate the text into an array of words.
             Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
           End Function)

        ' Removes short words and duplicates.
        Dim filterWordList = New TransformBlock(Of String(), String())(
           Function(words)
               Console.WriteLine("Filtering word list...")

               Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
           End Function)

        ' Finds all words in the specified collection whose reverse also 
        ' exists in the collection.
        Dim findReversedWords = New TransformManyBlock(Of String(), String)(
           Function(words)

               Dim wordsSet = New HashSet(Of String)(words)

               Return From word In words.AsParallel()
                      Let reverse = New String(word.Reverse().ToArray())
                      Where word <> reverse AndAlso wordsSet.Contains(reverse)
                      Select word
           End Function)

        ' Prints the provided reversed words to the console.    
        Dim printReversedWords = New ActionBlock(Of String)(
           Sub(reversedWord)
               Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
           End Sub)

        '
        ' Connect the dataflow blocks to form a pipeline.
        '

        Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}

        downloadString.LinkTo(createWordList, linkOptions)
        createWordList.LinkTo(filterWordList, linkOptions)
        filterWordList.LinkTo(findReversedWords, linkOptions)
        findReversedWords.LinkTo(printReversedWords, linkOptions)

        ' Process "The Iliad of Homer" by Homer.
        downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")

        ' Mark the head of the pipeline as complete.
        downloadString.Complete()

        ' Wait for the last block in the pipeline to process all messages.
        printReversedWords.Completion.Wait()
    End Sub

End Module

' Sample output:
'Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
'Creating word list...
'Filtering word list...
'Finding reversed words...
'Found reversed words aera/area
'Found reversed words doom/mood
'Found reversed words draw/ward
'Found reversed words live/evil
'Found reversed words seat/taes
'Found reversed words area/aera
'Found reversed words port/trop
'Found reversed words sleek/keels
'Found reversed words tops/spot
'Found reversed words evil/live
'Found reversed words speed/deeps
'Found reversed words mood/doom
'Found reversed words moor/room
'Found reversed words spot/tops
'Found reversed words spots/stops
'Found reversed words trop/port
'Found reversed words stops/spots
'Found reversed words reed/deer
'Found reversed words deeps/speed
'Found reversed words deer/reed
'Found reversed words taes/seat
'Found reversed words keels/sleek
'Found reversed words room/moor
'Found reversed words ward/draw

Další kroky

Tento příklad odešle jednu adresu URL ke zpracování prostřednictvím kanálu toku dat. Pokud prostřednictvím kanálu odešlete více než jednu vstupní hodnotu, můžete do aplikace zavést formu paralelismu, která se podobá přesouvání dílů v automobilové továrně. Když první člen kanálu odešle výsledek druhému členu, může paralelně zpracovat další položku, protože druhý člen zpracuje první výsledek.

Paralelismus, kterého se dosahuje pomocí kanálů toků dat, se označuje jako hrubě odstupňovaný paralelismus , protože se obvykle skládá z menšího počtu větších úloh. Můžete také použít jemněji odstupňovaný paralelismus menších, krátkodobě běžících úloh v kanálu toku dat. V tomto příkladu findReversedWords člen kanálu používá PLINQ ke zpracování více položek v pracovním seznamu paralelně. Použití jemně odstupňovaného paralelismu v hrubém kanálu může zlepšit celkovou propustnost.

Můžete také připojit zdrojový blok toku dat k více cílovým blokům a vytvořit tak síť toku dat. Přetížená verze LinkTo metody přebírá Predicate<T> objekt, který definuje, zda cílový blok přijímá každou zprávu na základě jeho hodnoty. Většina typů bloků toku dat, které fungují jako zdroje, nabízí zprávy všem připojeným cílovým blokům v pořadí, ve kterém byly připojené, dokud jeden z bloků tuto zprávu přijme. Pomocí tohoto mechanismu filtrování můžete vytvořit systémy propojených bloků toku dat, které směrují určitá data přes jednu cestu a jiná data přes jinou cestu. Příklad, který používá filtrování k vytvoření sítě toku dat, najdete v tématu Návod: Použití toku dat v aplikaci model Windows Forms.

Viz také