Dela via


Genomgång: Skapa en dataflödespipeline

Även om du kan använda DataflowBlock.Receivemetoderna , DataflowBlock.ReceiveAsyncoch DataflowBlock.TryReceive för att ta emot meddelanden från källblock, kan du också ansluta meddelandeblock för att skapa en dataflödespipeline. En dataflödespipeline är en serie komponenter eller dataflödesblock som vart och ett utför en specifik uppgift som bidrar till ett större mål. Varje dataflödesblock i en dataflödespipeline utför arbete när det tar emot ett meddelande från ett annat dataflödesblock. En analogi till detta är en monteringslinje för biltillverkning. När varje fordon passerar genom monteringslinjen monterar en station ramen, nästa installerar motorn och så vidare. Eftersom en monteringslinje gör att flera fordon kan monteras samtidigt ger den bättre dataflöde än att montera kompletta fordon en i taget.

Det här dokumentet visar en dataflödespipeline som laddar ned boken Homers Iliad från en webbplats och söker igenom texten för att matcha enskilda ord med ord som vänder det första ordets tecken. Bildandet av dataflödespipelinen i det här dokumentet består av följande steg:

  1. Skapa de dataflödesblock som ingår i pipelinen.

  2. Anslut varje dataflödesblock till nästa block i pipelinen. Varje block tar emot som indata från föregående block i pipelinen.

  3. För varje dataflödesblock skapar du en fortsättningsaktivitet som anger nästa block till slutfört tillstånd när föregående block har slutförts.

  4. Publicera data till chefen för pipelinen.

  5. Markera pipelinens huvud som slutfört.

  6. Vänta tills pipelinen har slutfört allt arbete.

Krav

Läs Dataflöde innan du påbörjar den här genomgången.

Skapa ett konsolprogram

Skapa ett Visual C# eller Visual Basic Console Application-projekt i Visual Studio. Installera NuGet-paketet System.Threading.Tasks.Dataflow.

Anteckning

TPL-dataflödesbiblioteket System.Threading.Tasks.Dataflow (namnområdet) distribueras inte med .NET. Om du vill installera System.Threading.Tasks.Dataflow namnområdet i Visual Studio öppnar du projektet, väljer Hantera NuGet-paketProject-menyn och söker online efter System.Threading.Tasks.Dataflow paketet. Du kan också installera den med .NET Core CLI genom att köra dotnet add package System.Threading.Tasks.Dataflow.

Lägg till följande kod i projektet för att skapa det grundläggande programmet.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class Program
{
   static void Main()
   {
   }
}
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web 
' and finds all reversed words that appear in that book.
Module DataflowReversedWords

    Sub Main()
    End Sub

End Module

Skapa dataflödesblocken

Lägg till följande kod i Main metoden för att skapa de dataflödesblock som ingår i pipelinen. Tabellen som följer sammanfattar rollen för varje medlem i pipelinen.

//
// Create the members of the pipeline.
//

// Downloads the requested resource as a string.
var downloadString = new TransformBlock<string, string>(async uri =>
{
   Console.WriteLine("Downloading '{0}'...", uri);

   return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
});

// Separates the specified text into an array of words.
var createWordList = new TransformBlock<string, string[]>(text =>
{
   Console.WriteLine("Creating word list...");

   // Remove common punctuation by replacing all non-letter characters
   // with a space character.
   char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
   text = new string(tokens);

   // Separate the text into an array of words.
   return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
});

// Removes short words and duplicates.
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
   Console.WriteLine("Filtering word list...");

   return words
      .Where(word => word.Length > 3)
      .Distinct()
      .ToArray();
});

// Finds all words in the specified collection whose reverse also
// exists in the collection.
var findReversedWords = new TransformManyBlock<string[], string>(words =>
{
   Console.WriteLine("Finding reversed words...");

   var wordsSet = new HashSet<string>(words);

   return from word in words.AsParallel()
          let reverse = new string(word.Reverse().ToArray())
          where word != reverse && wordsSet.Contains(reverse)
          select word;
});

// Prints the provided reversed words to the console.
var printReversedWords = new ActionBlock<string>(reversedWord =>
{
   Console.WriteLine("Found reversed words {0}/{1}",
      reversedWord, new string(reversedWord.Reverse().ToArray()));
});
'
' Create the members of the pipeline.
' 

' Downloads the requested resource as a string.
Dim downloadString = New TransformBlock(Of String, String)(
    Async Function(uri)
        Console.WriteLine("Downloading '{0}'...", uri)

        Return Await New HttpClient().GetStringAsync(uri)
    End Function)

' Separates the specified text into an array of words.
Dim createWordList = New TransformBlock(Of String, String())(
   Function(text)
       Console.WriteLine("Creating word list...")

     ' Remove common punctuation by replacing all non-letter characters 
     ' with a space character.
     Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
       text = New String(tokens)

     ' Separate the text into an array of words.
     Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
   End Function)

' Removes short words and duplicates.
Dim filterWordList = New TransformBlock(Of String(), String())(
   Function(words)
       Console.WriteLine("Filtering word list...")

       Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
   End Function)

' Finds all words in the specified collection whose reverse also 
' exists in the collection.
Dim findReversedWords = New TransformManyBlock(Of String(), String)(
   Function(words)

       Dim wordsSet = New HashSet(Of String)(words)

       Return From word In words.AsParallel()
              Let reverse = New String(word.Reverse().ToArray())
              Where word <> reverse AndAlso wordsSet.Contains(reverse)
              Select word
   End Function)

' Prints the provided reversed words to the console.    
Dim printReversedWords = New ActionBlock(Of String)(
   Sub(reversedWord)
       Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
   End Sub)
Medlem Typ Description
downloadString TransformBlock<TInput,TOutput> Laddar ned boktexten från webben.
createWordList TransformBlock<TInput,TOutput> Separerar boktexten i en matris med ord.
filterWordList TransformBlock<TInput,TOutput> Tar bort korta ord och dubbletter från ordmatrisen.
findReversedWords TransformManyBlock<TInput,TOutput> Söker efter alla ord i den filtrerade ordmatrissamlingen vars omvända också förekommer i ordmatrisen.
printReversedWords ActionBlock<TInput> Visar ord och motsvarande omvända ord i konsolen.

Även om du kan kombinera flera steg i dataflödespipelinen i det här exemplet i ett steg illustrerar exemplet begreppet att skapa flera oberoende dataflödesuppgifter för att utföra en större uppgift. Exemplet används TransformBlock<TInput,TOutput> för att göra det möjligt för varje medlem i pipelinen att utföra en åtgärd på indata och skicka resultaten till nästa steg i pipelinen. Medlemmen findReversedWords i pipelinen är ett TransformManyBlock<TInput,TOutput> objekt eftersom det producerar flera oberoende utdata för varje indata. Pipelinens slut, printReversedWords, är ett ActionBlock<TInput> objekt eftersom det utför en åtgärd på indata och inte ger något resultat.

Skapa pipelinen

Lägg till följande kod för att ansluta varje block till nästa block i pipelinen.

När du anropar LinkTo metoden för att ansluta ett källdataflödesblock till ett måldataflödesblock, sprids data till målblocket när data blir tillgängliga. Om du även anger DataflowLinkOptionsPropagateCompletion sant, lyckat eller misslyckat slutförande av ett block i pipelinen kommer nästa block i pipelinen att slutföras.

//
// Connect the dataflow blocks to form a pipeline.
//

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

downloadString.LinkTo(createWordList, linkOptions);
createWordList.LinkTo(filterWordList, linkOptions);
filterWordList.LinkTo(findReversedWords, linkOptions);
findReversedWords.LinkTo(printReversedWords, linkOptions);
'
' Connect the dataflow blocks to form a pipeline.
'

Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}

downloadString.LinkTo(createWordList, linkOptions)
createWordList.LinkTo(filterWordList, linkOptions)
filterWordList.LinkTo(findReversedWords, linkOptions)
findReversedWords.LinkTo(printReversedWords, linkOptions)

Publicera data i pipelinen

Lägg till följande kod för att publicera URL:en för boken Iliad för Homer till chefen för dataflödespipelinen.

// Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");
' Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")

Det här exemplet använder DataflowBlock.Post för att synkront skicka data till pipelinens huvud. DataflowBlock.SendAsync Använd metoden när du asynkront måste skicka data till en dataflödesnod.

Slutföra pipelineaktivitet

Lägg till följande kod för att markera pipelinehuvudet som slutfört. Pipelinens huvud sprids när alla buffrade meddelanden har bearbetats.

// Mark the head of the pipeline as complete.
downloadString.Complete();
' Mark the head of the pipeline as complete.
downloadString.Complete()

Det här exemplet skickar en URL via dataflödespipelinen som ska bearbetas. Om du skickar fler än en indata via en pipeline anropar IDataflowBlock.Complete du metoden när du har skickat alla indata. Du kan utelämna det här steget om ditt program inte har någon väldefinierad punkt där data inte längre är tillgängliga eller om programmet inte behöver vänta tills pipelinen har slutförts.

Väntar på att pipelinen ska slutföras

Lägg till följande kod för att vänta tills pipelinen har slutförts. Den övergripande åtgärden slutförs när pipelinens slut är klar.

// Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait();
' Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait()

Du kan vänta på att dataflödet ska slutföras från valfri tråd eller från flera trådar samtidigt.

Det fullständiga exemplet

I följande exempel visas den fullständiga koden för den här genomgången.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class DataflowReversedWords
{
   static void Main()
   {
      //
      // Create the members of the pipeline.
      //

      // Downloads the requested resource as a string.
      var downloadString = new TransformBlock<string, string>(async uri =>
      {
         Console.WriteLine("Downloading '{0}'...", uri);

         return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
      });

      // Separates the specified text into an array of words.
      var createWordList = new TransformBlock<string, string[]>(text =>
      {
         Console.WriteLine("Creating word list...");

         // Remove common punctuation by replacing all non-letter characters
         // with a space character.
         char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
         text = new string(tokens);

         // Separate the text into an array of words.
         return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
      });

      // Removes short words and duplicates.
      var filterWordList = new TransformBlock<string[], string[]>(words =>
      {
         Console.WriteLine("Filtering word list...");

         return words
            .Where(word => word.Length > 3)
            .Distinct()
            .ToArray();
      });

      // Finds all words in the specified collection whose reverse also
      // exists in the collection.
      var findReversedWords = new TransformManyBlock<string[], string>(words =>
      {
         Console.WriteLine("Finding reversed words...");

         var wordsSet = new HashSet<string>(words);

         return from word in words.AsParallel()
                let reverse = new string(word.Reverse().ToArray())
                where word != reverse && wordsSet.Contains(reverse)
                select word;
      });

      // Prints the provided reversed words to the console.
      var printReversedWords = new ActionBlock<string>(reversedWord =>
      {
         Console.WriteLine("Found reversed words {0}/{1}",
            reversedWord, new string(reversedWord.Reverse().ToArray()));
      });

      //
      // Connect the dataflow blocks to form a pipeline.
      //

      var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

      downloadString.LinkTo(createWordList, linkOptions);
      createWordList.LinkTo(filterWordList, linkOptions);
      filterWordList.LinkTo(findReversedWords, linkOptions);
      findReversedWords.LinkTo(printReversedWords, linkOptions);

      // Process "The Iliad of Homer" by Homer.
      downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");

      // Mark the head of the pipeline as complete.
      downloadString.Complete();

      // Wait for the last block in the pipeline to process all messages.
      printReversedWords.Completion.Wait();
   }
}
/* Sample output:
   Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
   Creating word list...
   Filtering word list...
   Finding reversed words...
   Found reversed words doom/mood
   Found reversed words draw/ward
   Found reversed words aera/area
   Found reversed words seat/taes
   Found reversed words live/evil
   Found reversed words port/trop
   Found reversed words sleek/keels
   Found reversed words area/aera
   Found reversed words tops/spot
   Found reversed words evil/live
   Found reversed words mood/doom
   Found reversed words speed/deeps
   Found reversed words moor/room
   Found reversed words trop/port
   Found reversed words spot/tops
   Found reversed words spots/stops
   Found reversed words stops/spots
   Found reversed words reed/deer
   Found reversed words keels/sleek
   Found reversed words deeps/speed
   Found reversed words deer/reed
   Found reversed words taes/seat
   Found reversed words room/moor
   Found reversed words ward/draw
*/
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web 
' and finds all reversed words that appear in that book.
Module DataflowReversedWords

    Sub Main()
        '
        ' Create the members of the pipeline.
        ' 

        ' Downloads the requested resource as a string.
        Dim downloadString = New TransformBlock(Of String, String)(
            Async Function(uri)
                Console.WriteLine("Downloading '{0}'...", uri)

                Return Await New HttpClient().GetStringAsync(uri)
            End Function)

        ' Separates the specified text into an array of words.
        Dim createWordList = New TransformBlock(Of String, String())(
           Function(text)
               Console.WriteLine("Creating word list...")

             ' Remove common punctuation by replacing all non-letter characters 
             ' with a space character.
             Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
               text = New String(tokens)

             ' Separate the text into an array of words.
             Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
           End Function)

        ' Removes short words and duplicates.
        Dim filterWordList = New TransformBlock(Of String(), String())(
           Function(words)
               Console.WriteLine("Filtering word list...")

               Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
           End Function)

        ' Finds all words in the specified collection whose reverse also 
        ' exists in the collection.
        Dim findReversedWords = New TransformManyBlock(Of String(), String)(
           Function(words)

               Dim wordsSet = New HashSet(Of String)(words)

               Return From word In words.AsParallel()
                      Let reverse = New String(word.Reverse().ToArray())
                      Where word <> reverse AndAlso wordsSet.Contains(reverse)
                      Select word
           End Function)

        ' Prints the provided reversed words to the console.    
        Dim printReversedWords = New ActionBlock(Of String)(
           Sub(reversedWord)
               Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
           End Sub)

        '
        ' Connect the dataflow blocks to form a pipeline.
        '

        Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}

        downloadString.LinkTo(createWordList, linkOptions)
        createWordList.LinkTo(filterWordList, linkOptions)
        filterWordList.LinkTo(findReversedWords, linkOptions)
        findReversedWords.LinkTo(printReversedWords, linkOptions)

        ' Process "The Iliad of Homer" by Homer.
        downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")

        ' Mark the head of the pipeline as complete.
        downloadString.Complete()

        ' Wait for the last block in the pipeline to process all messages.
        printReversedWords.Completion.Wait()
    End Sub

End Module

' Sample output:
'Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
'Creating word list...
'Filtering word list...
'Finding reversed words...
'Found reversed words aera/area
'Found reversed words doom/mood
'Found reversed words draw/ward
'Found reversed words live/evil
'Found reversed words seat/taes
'Found reversed words area/aera
'Found reversed words port/trop
'Found reversed words sleek/keels
'Found reversed words tops/spot
'Found reversed words evil/live
'Found reversed words speed/deeps
'Found reversed words mood/doom
'Found reversed words moor/room
'Found reversed words spot/tops
'Found reversed words spots/stops
'Found reversed words trop/port
'Found reversed words stops/spots
'Found reversed words reed/deer
'Found reversed words deeps/speed
'Found reversed words deer/reed
'Found reversed words taes/seat
'Found reversed words keels/sleek
'Found reversed words room/moor
'Found reversed words ward/draw

Nästa steg

Det här exemplet skickar en URL för att bearbeta via dataflödespipelinen. Om du skickar mer än ett indatavärde via en pipeline kan du introducera en form av parallellitet i ditt program som liknar hur delar kan röra sig genom en bilfabrik. När den första medlemmen i pipelinen skickar resultatet till den andra medlemmen kan den bearbeta ett annat objekt parallellt när den andra medlemmen bearbetar det första resultatet.

Den parallellitet som uppnås med hjälp av dataflödespipelines kallas för grovkornig parallellitet eftersom den vanligtvis består av färre och större uppgifter. Du kan också använda en mer detaljerad parallellitet med mindre, kortsiktiga uppgifter i en dataflödespipeline. I det här exemplet findReversedWords använder pipelinemedlemmen PLINQ för att bearbeta flera objekt i arbetslistan parallellt. Användningen av detaljerad parallellitet i en grovkornig pipeline kan förbättra det totala dataflödet.

Du kan också ansluta ett källdataflödesblock till flera målblock för att skapa ett dataflödesnätverk. Den överlagrade versionen av LinkTo metoden tar ett Predicate<T> objekt som definierar om målblocket accepterar varje meddelande baserat på dess värde. De flesta typer av dataflödesblock som fungerar som källor erbjuder meddelanden till alla anslutna målblock, i den ordning de var anslutna, tills något av blocken accepterar det meddelandet. Med den här filtreringsmekanismen kan du skapa system med anslutna dataflödesblock som dirigerar vissa data via en sökväg och andra data via en annan sökväg. Ett exempel som använder filtrering för att skapa ett dataflödesnätverk finns i Genomgång: Använda dataflöde i ett Windows Forms program.

Se även