Freigeben über


Exemplarische Vorgehensweise: Erstellen einer Datenflusspipeline

Obwohl Sie die Methoden DataflowBlock.Receive, DataflowBlock.ReceiveAsync und DataflowBlock.TryReceive verwenden können, um Nachrichten von Quellblöcken zu empfangen, können Sie auch Nachrichtenblöcke verbinden, um eine Datenflusspipeline zu bilden. Eine Datenpipeline besteht aus einer Reihe von Komponenten, oder Datenflussblöcken, von denen jede eine bestimmte Aufgabe ausführt, die zu einem größeren Ziel beiträgt. Jeder Datenflussblock in einer Datenflusspipeline erledigt eine Aufgabe, wenn er eine Meldung von einem anderen Datenflussblock empfangen hat. Eine Analogie hierzu ist eine Fertigungsstraße eines Fahrzeugherstellers. Jedes Fahrzeug durchläuft die Fertigungsstraße: In einer Station wird das Fahrgestell montiert, in der nächsten wird der Motor eingebaut usw. Da eine Fertigungsstraße ermöglicht, dass mehrere Fahrzeuge gleichzeitig montiert werden, bietet sie einen besseren Durchsatz als eine Einzelmontage eines vollständigen Fahrzeugs.

Dieses Dokument veranschaulicht eine Datenflusspipeline, die das Buch The Iliad of Homer von einer Website herunterlädt und den Text durchsucht, um einzelne Wörter mit Wörtern abzugleichen, in denen die Zeichen des ersten Worts umgekehrt sind. Die Entwicklung der Datenflusspipeline in diesem Dokument besteht aus den folgenden Schritten:

  1. Erstellen Sie die Datenflussblöcke, die Bestandteile der Pipeline sein sollen.

  2. Verbinden Sie jeden Datenflussblock mit dem nächsten Block in der Pipeline. Jeder Block empfängt als Eingabe die Ausgabe des jeweils vorherigen Blocks in der Pipeline.

  3. Erstellen Sie für jeden Datenflussblock eine Fortsetzungsaufgabe, die den nächsten Datenblock in den Zustand Abgeschlossen schaltet, nachdem der vorherige Block beendet wurde.

  4. Senden Sie Daten an den Anfang der Pipeline.

  5. Kennzeichnen Sie den Anfang der Pipeline als abgeschlossen.

  6. Warten Sie, bis die Pipeline alle Aufgaben ausgeführt hat.

Voraussetzungen

Lesen Sie das Thema Datenfluss, bevor Sie mit dieser exemplarischen Vorgehensweise beginnen.

Erstellen einer Konsolenanwendung

Erstellen Sie in Visual Studio ein Visual C#- oder Visual Basic-Projekt des Typs „Konsolenanwendung“. Installieren Sie das NuGet-Paket „System.Threading.Tasks.Dataflow“.

Hinweis

Die TPL-Datenflussbibliothek (System.Threading.Tasks.Dataflow-Namespace) wird nicht mit .NET ausgeliefert. Öffnen Sie zum Installieren des System.Threading.Tasks.Dataflow-Namespace in Visual Studio Ihr Projekt, wählen Sie im Menü Projekt die Option NuGet-Pakete verwalten aus, und suchen Sie online nach dem System.Threading.Tasks.Dataflow-Paket. Alternativ können Sie es mithilfe der .NET Core-CLI installieren und dazu dotnet add package System.Threading.Tasks.Dataflow ausführen.

Fügen Sie Ihrem Projekt den folgenden Code hinzu , um die Grundanwendung zu erstellen.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class Program
{
   static void Main()
   {
   }
}
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web 
' and finds all reversed words that appear in that book.
Module DataflowReversedWords

    Sub Main()
    End Sub

End Module

Erstellen der Datenflussblöcke

Fügen Sie der Main-Methode den folgenden Code hinzu, um die Datenflussblöcke zu erstellen, die Bestandteile der Pipeline sein sollen. Die folgende Tabelle enthält eine Übersicht über die Rollen, die die Elemente (Member) der Pipeline haben.

//
// Create the members of the pipeline.
//

// Downloads the requested resource as a string.
var downloadString = new TransformBlock<string, string>(async uri =>
{
   Console.WriteLine("Downloading '{0}'...", uri);

   return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
});

// Separates the specified text into an array of words.
var createWordList = new TransformBlock<string, string[]>(text =>
{
   Console.WriteLine("Creating word list...");

   // Remove common punctuation by replacing all non-letter characters
   // with a space character.
   char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
   text = new string(tokens);

   // Separate the text into an array of words.
   return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
});

// Removes short words and duplicates.
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
   Console.WriteLine("Filtering word list...");

   return words
      .Where(word => word.Length > 3)
      .Distinct()
      .ToArray();
});

// Finds all words in the specified collection whose reverse also
// exists in the collection.
var findReversedWords = new TransformManyBlock<string[], string>(words =>
{
   Console.WriteLine("Finding reversed words...");

   var wordsSet = new HashSet<string>(words);

   return from word in words.AsParallel()
          let reverse = new string(word.Reverse().ToArray())
          where word != reverse && wordsSet.Contains(reverse)
          select word;
});

// Prints the provided reversed words to the console.
var printReversedWords = new ActionBlock<string>(reversedWord =>
{
   Console.WriteLine("Found reversed words {0}/{1}",
      reversedWord, new string(reversedWord.Reverse().ToArray()));
});
'
' Create the members of the pipeline.
' 

' Downloads the requested resource as a string.
Dim downloadString = New TransformBlock(Of String, String)(
    Async Function(uri)
        Console.WriteLine("Downloading '{0}'...", uri)

        Return Await New HttpClient().GetStringAsync(uri)
    End Function)

' Separates the specified text into an array of words.
Dim createWordList = New TransformBlock(Of String, String())(
   Function(text)
       Console.WriteLine("Creating word list...")

     ' Remove common punctuation by replacing all non-letter characters 
     ' with a space character.
     Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
       text = New String(tokens)

     ' Separate the text into an array of words.
     Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
   End Function)

' Removes short words and duplicates.
Dim filterWordList = New TransformBlock(Of String(), String())(
   Function(words)
       Console.WriteLine("Filtering word list...")

       Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
   End Function)

' Finds all words in the specified collection whose reverse also 
' exists in the collection.
Dim findReversedWords = New TransformManyBlock(Of String(), String)(
   Function(words)

       Dim wordsSet = New HashSet(Of String)(words)

       Return From word In words.AsParallel()
              Let reverse = New String(word.Reverse().ToArray())
              Where word <> reverse AndAlso wordsSet.Contains(reverse)
              Select word
   End Function)

' Prints the provided reversed words to the console.    
Dim printReversedWords = New ActionBlock(Of String)(
   Sub(reversedWord)
       Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
   End Sub)
Member Typ Beschreibung
downloadString TransformBlock<TInput,TOutput> Lädt den Text des Buchs aus dem Internet herunter.
createWordList TransformBlock<TInput,TOutput> Trennt den Text des Buchs in ein Array aus einzelnen Wörtern.
filterWordList TransformBlock<TInput,TOutput> Entfernt kurze Wörter und Duplikate aus dem Wortarray.
findReversedWords TransformManyBlock<TInput,TOutput> Sucht in der gefilterten Wortarrayauflistung nach allen Wörtern, deren Umkehrung ebenfalls im Wortarray enthalten ist.
printReversedWords ActionBlock<TInput> Zeigt die jeweiligen Wörter und die entsprechenden Umkehrwörter in der Konsole an.

Obwohl Sie mehrere Schritte in der Datenflusspipeline in diesem Beispiel in einem einzigen Schritt kombinieren könnten, veranschaulicht dieses Beispiel das Konzept des Zusammenstellens mehrerer unabhängiger Datenflussaufgaben, um eine größere Aufgabe auszuführen. In dem Beispiel wird TransformBlock<TInput,TOutput> verwendet, damit jedes Element der Pipeline einen Vorgang für seine Eingabedaten ausführen und die Ergebnisse an den nächsten Schritt in der Pipeline senden kann. Das findReversedWords-Element der Pipeline ist ein TransformManyBlock<TInput,TOutput>-Objekt, da es für jede Eingabe mehrere unabhängige Ausgaben erzeugt. Das Ende der Pipeline, printReversedWords, ist ein ActionBlock<TInput>-Objekt, weil es eine Aktion für seine Eingabe ausführt und kein Ergebnis erzeugt.

Aufbauen der Pipeline

Fügen Sie den folgenden Code ein, um jeden Block mit dem nächsten Block in der Pipeline zu verbinden.

Wenn Sie die LinkTo-Methode aufrufen, um einen Quelldatenflussblock mit einem Zieldatenflussblock zu verbinden, überträgt der Quelldatenflussblock Daten an den Zielblock, sobald Daten verfügbar sind. Wenn Sie auch DataflowLinkOptions mit PropagateCompletion (Wert auf „true“ gesetzt) angeben, führt ein erfolgreicher oder nicht erfolgreicher Abschluss eines Blocks in der Pipeline dazu, dass der nächste Block in der Pipeline abgeschlossen wird.

//
// Connect the dataflow blocks to form a pipeline.
//

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

downloadString.LinkTo(createWordList, linkOptions);
createWordList.LinkTo(filterWordList, linkOptions);
filterWordList.LinkTo(findReversedWords, linkOptions);
findReversedWords.LinkTo(printReversedWords, linkOptions);
'
' Connect the dataflow blocks to form a pipeline.
'

Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}

downloadString.LinkTo(createWordList, linkOptions)
createWordList.LinkTo(filterWordList, linkOptions)
filterWordList.LinkTo(findReversedWords, linkOptions)
findReversedWords.LinkTo(printReversedWords, linkOptions)

Senden von Daten an die Pipeline

Fügen Sie folgenden Code hinzu, um die URL des Buchs The Iliad of Homer an den Anfang der Datenflusspipeline zu senden.

// Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");
' Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")

In diesem Beispiel wird DataflowBlock.Post verwendet, damit die Daten synchron an den Anfang der Pipeline gesendet werden. Verwenden Sie die DataflowBlock.SendAsync-Methode, wenn Sie Daten asynchron an einen Datenflussknoten senden müssen.

Abschließen der Pipelineaktivität

Fügen Sie den folgenden Code ein, um den Anfang der Pipeline als abgeschlossen zu kennzeichnen. Der Anfang der Pipeline überträgt seinen Abschluss, nachdem er alle gepufferten Nachrichten verarbeitet hat.

// Mark the head of the pipeline as complete.
downloadString.Complete();
' Mark the head of the pipeline as complete.
downloadString.Complete()

In diesem Beispiel wird eine URL durch die Datenflusspipeline gesendet, um verarbeitet zu werden. Wenn Sie mehrere Eingaben durch eine Pipeline senden, rufen Sie die IDataflowBlock.Complete-Methode auf, nachdem Sie alle Eingaben übertragen haben. Sie können auf diesen Schritt verzichten, wenn Ihre Anwendung keinen wohldefinierten Punkt hat, an dem keine Daten mehr verfügbar sind, oder wenn die Anwendung nicht warten muss, bis die Pipeline beendet ist.

Warten bis zum Beenden der Pipeline

Fügen Sie den folgenden Code hinzu, damit gewartet wird, bis die Pipeline beendet ist. Der gesamte Vorgang ist abgeschlossen, wenn das Ende der Pipeline beendet ist.

// Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait();
' Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait()

Sie können von jedem Thread oder von mehreren Threads gleichzeitig auf den Datenflussabschluss warten.

Vollständiges Beispiel

Das folgende Beispiel enthält den vollständigen Code für diese exemplarische Vorgehensweise.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class DataflowReversedWords
{
   static void Main()
   {
      //
      // Create the members of the pipeline.
      //

      // Downloads the requested resource as a string.
      var downloadString = new TransformBlock<string, string>(async uri =>
      {
         Console.WriteLine("Downloading '{0}'...", uri);

         return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
      });

      // Separates the specified text into an array of words.
      var createWordList = new TransformBlock<string, string[]>(text =>
      {
         Console.WriteLine("Creating word list...");

         // Remove common punctuation by replacing all non-letter characters
         // with a space character.
         char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
         text = new string(tokens);

         // Separate the text into an array of words.
         return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
      });

      // Removes short words and duplicates.
      var filterWordList = new TransformBlock<string[], string[]>(words =>
      {
         Console.WriteLine("Filtering word list...");

         return words
            .Where(word => word.Length > 3)
            .Distinct()
            .ToArray();
      });

      // Finds all words in the specified collection whose reverse also
      // exists in the collection.
      var findReversedWords = new TransformManyBlock<string[], string>(words =>
      {
         Console.WriteLine("Finding reversed words...");

         var wordsSet = new HashSet<string>(words);

         return from word in words.AsParallel()
                let reverse = new string(word.Reverse().ToArray())
                where word != reverse && wordsSet.Contains(reverse)
                select word;
      });

      // Prints the provided reversed words to the console.
      var printReversedWords = new ActionBlock<string>(reversedWord =>
      {
         Console.WriteLine("Found reversed words {0}/{1}",
            reversedWord, new string(reversedWord.Reverse().ToArray()));
      });

      //
      // Connect the dataflow blocks to form a pipeline.
      //

      var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

      downloadString.LinkTo(createWordList, linkOptions);
      createWordList.LinkTo(filterWordList, linkOptions);
      filterWordList.LinkTo(findReversedWords, linkOptions);
      findReversedWords.LinkTo(printReversedWords, linkOptions);

      // Process "The Iliad of Homer" by Homer.
      downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");

      // Mark the head of the pipeline as complete.
      downloadString.Complete();

      // Wait for the last block in the pipeline to process all messages.
      printReversedWords.Completion.Wait();
   }
}
/* Sample output:
   Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
   Creating word list...
   Filtering word list...
   Finding reversed words...
   Found reversed words doom/mood
   Found reversed words draw/ward
   Found reversed words aera/area
   Found reversed words seat/taes
   Found reversed words live/evil
   Found reversed words port/trop
   Found reversed words sleek/keels
   Found reversed words area/aera
   Found reversed words tops/spot
   Found reversed words evil/live
   Found reversed words mood/doom
   Found reversed words speed/deeps
   Found reversed words moor/room
   Found reversed words trop/port
   Found reversed words spot/tops
   Found reversed words spots/stops
   Found reversed words stops/spots
   Found reversed words reed/deer
   Found reversed words keels/sleek
   Found reversed words deeps/speed
   Found reversed words deer/reed
   Found reversed words taes/seat
   Found reversed words room/moor
   Found reversed words ward/draw
*/
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web 
' and finds all reversed words that appear in that book.
Module DataflowReversedWords

    Sub Main()
        '
        ' Create the members of the pipeline.
        ' 

        ' Downloads the requested resource as a string.
        Dim downloadString = New TransformBlock(Of String, String)(
            Async Function(uri)
                Console.WriteLine("Downloading '{0}'...", uri)

                Return Await New HttpClient().GetStringAsync(uri)
            End Function)

        ' Separates the specified text into an array of words.
        Dim createWordList = New TransformBlock(Of String, String())(
           Function(text)
               Console.WriteLine("Creating word list...")

             ' Remove common punctuation by replacing all non-letter characters 
             ' with a space character.
             Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
               text = New String(tokens)

             ' Separate the text into an array of words.
             Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
           End Function)

        ' Removes short words and duplicates.
        Dim filterWordList = New TransformBlock(Of String(), String())(
           Function(words)
               Console.WriteLine("Filtering word list...")

               Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
           End Function)

        ' Finds all words in the specified collection whose reverse also 
        ' exists in the collection.
        Dim findReversedWords = New TransformManyBlock(Of String(), String)(
           Function(words)

               Dim wordsSet = New HashSet(Of String)(words)

               Return From word In words.AsParallel()
                      Let reverse = New String(word.Reverse().ToArray())
                      Where word <> reverse AndAlso wordsSet.Contains(reverse)
                      Select word
           End Function)

        ' Prints the provided reversed words to the console.    
        Dim printReversedWords = New ActionBlock(Of String)(
           Sub(reversedWord)
               Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
           End Sub)

        '
        ' Connect the dataflow blocks to form a pipeline.
        '

        Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}

        downloadString.LinkTo(createWordList, linkOptions)
        createWordList.LinkTo(filterWordList, linkOptions)
        filterWordList.LinkTo(findReversedWords, linkOptions)
        findReversedWords.LinkTo(printReversedWords, linkOptions)

        ' Process "The Iliad of Homer" by Homer.
        downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")

        ' Mark the head of the pipeline as complete.
        downloadString.Complete()

        ' Wait for the last block in the pipeline to process all messages.
        printReversedWords.Completion.Wait()
    End Sub

End Module

' Sample output:
'Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
'Creating word list...
'Filtering word list...
'Finding reversed words...
'Found reversed words aera/area
'Found reversed words doom/mood
'Found reversed words draw/ward
'Found reversed words live/evil
'Found reversed words seat/taes
'Found reversed words area/aera
'Found reversed words port/trop
'Found reversed words sleek/keels
'Found reversed words tops/spot
'Found reversed words evil/live
'Found reversed words speed/deeps
'Found reversed words mood/doom
'Found reversed words moor/room
'Found reversed words spot/tops
'Found reversed words spots/stops
'Found reversed words trop/port
'Found reversed words stops/spots
'Found reversed words reed/deer
'Found reversed words deeps/speed
'Found reversed words deer/reed
'Found reversed words taes/seat
'Found reversed words keels/sleek
'Found reversed words room/moor
'Found reversed words ward/draw

Nächste Schritte

In diesem Beispiel wird eine zu verarbeitende URL durch die Datenflusspipeline gesendet. Wenn Sie mehrere Eingabewerte durch eine Pipeline senden, können Sie eine Form von Parallelität in Ihrer Anwendung implementieren, die der Art und Weise ähnelt, wie Teile durch eine Automobilfabrik bewegt werden. Wenn das erste Element der Pipeline sein Ergebnis an das zweite Element gesendet hat, kann es parallel ein weiteres Objekt verarbeiten, während das zweite Element das erste Ergebnis verarbeitet.

Die Parallelität, die durch ein Verwenden von Datenflusspipelines erreicht wird, wird als grobmaschige Parallelität bezeichnet, weil sie in der Regel aus weniger, dafür aber größeren Aufgaben besteht. Sie können auch eine feinmaschigere Parallelität von kleineren Aufgaben mit kurzer Ausführungszeit in einer Datenflusspipeline verwenden. In diesem Beispiel wird für das findReversedWords-Element der Pipeline PLINQ verwendet, um mehrere Elemente in der Arbeitsliste parallel zu verarbeiten. Die Verwendung von feinmaschiger Parallelität in einer grobmaschigen Pipeline kann den Gesamtdurchsatz verbessern.

Sie können auch einen Quelldatenflussblock mit mehreren Zielblöcken verbinden, um ein Datenflussnetzwerk zu erstellen. Die überladene Version der LinkTo-Methode übernimmt ein Predicate<T>-Objekt, das definiert, ob der Zielblock jede Nachricht anhand dessen Wert akzeptiert. Die meisten Datenflussblocktypen, die als Quellen fungieren, senden Nachrichten an alle verbundenen Zielblöcke in der Reihenfolge, in der sie verbunden wurden, bis einer der Blöcke die jeweilige Nachricht akzeptiert hat. Durch Verwenden dieser Filtermechanismus können Sie Systeme von verbundenen Datenflussblöcken erstellen, mit denen bestimmte Daten durch einen Pfad und andere Daten durch einen anderen Pfad geleitet werden. Ein Beispiel, in dem Filterung zum Erstellen eines Datenflussnetzwerks verwendet wird, finden Sie unter Exemplarische Vorgehensweise: Using Dataflow in a Windows Forms Application (Exemplarische Vorgehensweise: Verwenden von Datenflüssen in einer Windows Forms-Anwendung).

Siehe auch