Condividi tramite


Procedura dettagliata: Creazione di una pipeline del flusso di dati

Sebbene sia possibile usare i DataflowBlock.Receivemetodi , DataflowBlock.ReceiveAsynce DataflowBlock.TryReceive per ricevere messaggi dai blocchi di origine, è anche possibile connettere blocchi di messaggi per formare una pipeline del flusso di dati. Una pipeline del flusso di dati è una serie di componenti o blocchi di flussi di dati, ognuno dei quali esegue un'attività specifica che contribuisce a un obiettivo più ampio. Ogni blocco di flussi di dati in una pipeline del flusso di dati esegue operazioni quando riceve un messaggio da un altro blocco di flussi di dati. Un'analogia con questa è una linea di montaggio per la produzione di automobili. Quando ogni veicolo passa attraverso la linea di montaggio, una stazione assembla il telaio, quello successivo installa il motore e così via. Poiché una linea di montaggio consente l'assemblaggio di più veicoli contemporaneamente, offre una velocità effettiva migliore rispetto all'assemblaggio di veicoli completi uno alla volta.

Questo documento illustra una pipeline del flusso di dati che scarica il libro Iliad di Homer da un sito Web e cerca il testo in modo che corrisponda a singole parole con parole che invertono i caratteri della prima parola. La formazione della pipeline del flusso di dati in questo documento è costituita dai passaggi seguenti:

  1. Creare i blocchi del flusso di dati che partecipano alla pipeline.

  2. Connettere ogni blocco del flusso di dati al blocco successivo nella pipeline. Ogni blocco riceve come input l'output del blocco precedente nella pipeline.

  3. Per ogni blocco di flussi di dati, creare un'attività di continuazione che imposta il blocco successivo sullo stato completato al termine del blocco precedente.

  4. Inserire i dati all'inizio della pipeline.

  5. Contrassegnare la testa della pipeline come completata.

  6. Attendere che la pipeline completi tutto il lavoro.

Prerequisiti

Leggere Flusso di dati prima di iniziare questa procedura dettagliata.

Creazione di un'applicazione console

In Visual Studio, creare un progetto di applicazione console Visual C# o Visual Basic. Installare il pacchetto NuGet System.Threading.Tasks.Dataflow.

Annotazioni

La libreria del flusso di dati TPL (spazio dei nomi System.Threading.Tasks.Dataflow) non viene distribuita con .NET. Per installare lo spazio dei nomi System.Threading.Tasks.Dataflow in Visual Studio, aprire il progetto, scegliere Gestisci pacchetti NuGet dal menu Project e cercare online il pacchetto System.Threading.Tasks.Dataflow. In alternativa, per installarlo usando l'interfaccia della riga di comando di .NET Core, eseguire dotnet add package System.Threading.Tasks.Dataflow.

Aggiungere il codice seguente al progetto per creare l'applicazione di base.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class Program
{
   static void Main()
   {
   }
}
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web 
' and finds all reversed words that appear in that book.
Module DataflowReversedWords

    Sub Main()
    End Sub

End Module

Creazione dei blocchi del flusso di dati

Aggiungere il codice seguente al Main metodo per creare i blocchi del flusso di dati che partecipano alla pipeline. La tabella seguente riepiloga il ruolo di ogni membro della pipeline.

//
// Create the members of the pipeline.
//

// Downloads the requested resource as a string.
var downloadString = new TransformBlock<string, string>(async uri =>
{
   Console.WriteLine($"Downloading '{uri}'...");

   return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
});

// Separates the specified text into an array of words.
var createWordList = new TransformBlock<string, string[]>(text =>
{
   Console.WriteLine("Creating word list...");

   // Remove common punctuation by replacing all non-letter characters
   // with a space character.
   char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
   text = new string(tokens);

   // Separate the text into an array of words.
   return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
});

// Removes short words and duplicates.
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
   Console.WriteLine("Filtering word list...");

   return words
      .Where(word => word.Length > 3)
      .Distinct()
      .ToArray();
});

// Finds all words in the specified collection whose reverse also
// exists in the collection.
var findReversedWords = new TransformManyBlock<string[], string>(words =>
{
   Console.WriteLine("Finding reversed words...");

   var wordsSet = new HashSet<string>(words);

   return from word in words.AsParallel()
          let reverse = new string(word.Reverse().ToArray())
          where word != reverse && wordsSet.Contains(reverse)
          select word;
});

// Prints the provided reversed words to the console.
var printReversedWords = new ActionBlock<string>(reversedWord =>
{
   Console.WriteLine($"Found reversed words {reversedWord}/{new string(reversedWord.Reverse().ToArray())}");
});
'
' Create the members of the pipeline.
' 

' Downloads the requested resource as a string.
Dim downloadString = New TransformBlock(Of String, String)(
    Async Function(uri)
        Console.WriteLine("Downloading '{0}'...", uri)

        Return Await New HttpClient().GetStringAsync(uri)
    End Function)

' Separates the specified text into an array of words.
Dim createWordList = New TransformBlock(Of String, String())(
   Function(text)
       Console.WriteLine("Creating word list...")

     ' Remove common punctuation by replacing all non-letter characters 
     ' with a space character.
     Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
       text = New String(tokens)

     ' Separate the text into an array of words.
     Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
   End Function)

' Removes short words and duplicates.
Dim filterWordList = New TransformBlock(Of String(), String())(
   Function(words)
       Console.WriteLine("Filtering word list...")

       Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
   End Function)

' Finds all words in the specified collection whose reverse also 
' exists in the collection.
Dim findReversedWords = New TransformManyBlock(Of String(), String)(
   Function(words)

       Dim wordsSet = New HashSet(Of String)(words)

       Return From word In words.AsParallel()
              Let reverse = New String(word.Reverse().ToArray())
              Where word <> reverse AndAlso wordsSet.Contains(reverse)
              Select word
   End Function)

' Prints the provided reversed words to the console.    
Dim printReversedWords = New ActionBlock(Of String)(
   Sub(reversedWord)
       Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
   End Sub)
Membro TIPO Descrizione
downloadString TransformBlock<TInput,TOutput> Scarica il testo del libro dal Web.
createWordList TransformBlock<TInput,TOutput> Separa il testo del libro in una matrice di parole.
filterWordList TransformBlock<TInput,TOutput> Rimuove parole brevi e duplicati dalla matrice di parole.
findReversedWords TransformManyBlock<TInput,TOutput> Trova tutte le parole nella raccolta di matrici di parole filtrate il cui inverso si verifica anche nella matrice di parole.
printReversedWords ActionBlock<TInput> Visualizza le parole e le parole inverse corrispondenti nella console.

Sebbene sia possibile combinare più passaggi nella pipeline del flusso di dati in questo esempio in un unico passaggio, l'esempio illustra il concetto di composizione di più attività del flusso di dati indipendenti per eseguire un'attività più grande. L'esempio usa TransformBlock<TInput,TOutput> per consentire a ogni membro della pipeline di eseguire un'operazione sui dati di input e inviare i risultati al passaggio successivo della pipeline. Il findReversedWords membro della pipeline è un TransformManyBlock<TInput,TOutput> oggetto perché produce più output indipendenti per ogni input. La parte finale della pipeline, printReversedWords, è un ActionBlock<TInput> oggetto perché esegue un'azione sul relativo input e non produce un risultato.

Creazione della pipeline

Aggiungere il codice seguente per connettere ogni blocco al blocco successivo nella pipeline.

Quando si chiama il LinkTo metodo per connettere un blocco del flusso di dati di origine a un blocco di flussi di dati di destinazione, il blocco del flusso di dati di origine propaga i dati al blocco di destinazione man mano che i dati diventano disponibili. Se si fornisce anche DataflowLinkOptions con PropagateCompletion impostato su true, il completamento riuscito o non riuscito di un blocco nella pipeline completerà il blocco successivo nella pipeline.

//
// Connect the dataflow blocks to form a pipeline.
//

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

downloadString.LinkTo(createWordList, linkOptions);
createWordList.LinkTo(filterWordList, linkOptions);
filterWordList.LinkTo(findReversedWords, linkOptions);
findReversedWords.LinkTo(printReversedWords, linkOptions);
'
' Connect the dataflow blocks to form a pipeline.
'

Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}

downloadString.LinkTo(createWordList, linkOptions)
createWordList.LinkTo(filterWordList, linkOptions)
filterWordList.LinkTo(findReversedWords, linkOptions)
findReversedWords.LinkTo(printReversedWords, linkOptions)

Inserimento di dati nella pipeline

Aggiungere il codice seguente per pubblicare l'URL del libro The Iliad of Homer all'inizio della pipeline del flusso di dati.

// Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");
' Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")

Questo esempio utilizza DataflowBlock.Post per inviare dati in modo sincrono all'inizio della pipeline. Usare il DataflowBlock.SendAsync metodo quando è necessario inviare dati in modo asincrono a un nodo del flusso di dati.

Completamento dell'attività della pipeline

Aggiungi il seguente codice per contrassegnare la testata della pipeline come completata. Il capo della pipeline propaga il suo completamento dopo aver elaborato tutti i messaggi memorizzati nel buffer.

// Mark the head of the pipeline as complete.
downloadString.Complete();
' Mark the head of the pipeline as complete.
downloadString.Complete()

Questo esempio invia un URL tramite la pipeline del flusso di dati da elaborare. Se si inviano più input tramite una pipeline, chiamare il IDataflowBlock.Complete metodo dopo aver inviato tutto l'input. È possibile omettere questo passaggio se l'applicazione non ha un punto ben definito in cui i dati non sono più disponibili o l'applicazione non deve attendere il completamento della pipeline.

In attesa del completamento della pipeline

Aggiungere il codice seguente per attendere il completamento della pipeline. L'operazione complessiva viene completata quando la coda della pipeline termina.

// Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait();
' Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait()

È possibile attendere il completamento del flusso di dati da qualsiasi thread o da più thread contemporaneamente.

Esempio completo

Nell'esempio seguente viene illustrato il codice completo per questa procedura dettagliata.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class DataflowReversedWords
{
   static void Main()
   {
      //
      // Create the members of the pipeline.
      //

      // Downloads the requested resource as a string.
      var downloadString = new TransformBlock<string, string>(async uri =>
      {
         Console.WriteLine($"Downloading '{uri}'...");

         return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
      });

      // Separates the specified text into an array of words.
      var createWordList = new TransformBlock<string, string[]>(text =>
      {
         Console.WriteLine("Creating word list...");

         // Remove common punctuation by replacing all non-letter characters
         // with a space character.
         char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
         text = new string(tokens);

         // Separate the text into an array of words.
         return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
      });

      // Removes short words and duplicates.
      var filterWordList = new TransformBlock<string[], string[]>(words =>
      {
         Console.WriteLine("Filtering word list...");

         return words
            .Where(word => word.Length > 3)
            .Distinct()
            .ToArray();
      });

      // Finds all words in the specified collection whose reverse also
      // exists in the collection.
      var findReversedWords = new TransformManyBlock<string[], string>(words =>
      {
         Console.WriteLine("Finding reversed words...");

         var wordsSet = new HashSet<string>(words);

         return from word in words.AsParallel()
                let reverse = new string(word.Reverse().ToArray())
                where word != reverse && wordsSet.Contains(reverse)
                select word;
      });

      // Prints the provided reversed words to the console.
      var printReversedWords = new ActionBlock<string>(reversedWord =>
      {
         Console.WriteLine($"Found reversed words {reversedWord}/{new string(reversedWord.Reverse().ToArray())}");
      });

      //
      // Connect the dataflow blocks to form a pipeline.
      //

      var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

      downloadString.LinkTo(createWordList, linkOptions);
      createWordList.LinkTo(filterWordList, linkOptions);
      filterWordList.LinkTo(findReversedWords, linkOptions);
      findReversedWords.LinkTo(printReversedWords, linkOptions);

      // Process "The Iliad of Homer" by Homer.
      downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");

      // Mark the head of the pipeline as complete.
      downloadString.Complete();

      // Wait for the last block in the pipeline to process all messages.
      printReversedWords.Completion.Wait();
   }
}
/* Sample output:
   Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
   Creating word list...
   Filtering word list...
   Finding reversed words...
   Found reversed words doom/mood
   Found reversed words draw/ward
   Found reversed words aera/area
   Found reversed words seat/taes
   Found reversed words live/evil
   Found reversed words port/trop
   Found reversed words sleek/keels
   Found reversed words area/aera
   Found reversed words tops/spot
   Found reversed words evil/live
   Found reversed words mood/doom
   Found reversed words speed/deeps
   Found reversed words moor/room
   Found reversed words trop/port
   Found reversed words spot/tops
   Found reversed words spots/stops
   Found reversed words stops/spots
   Found reversed words reed/deer
   Found reversed words keels/sleek
   Found reversed words deeps/speed
   Found reversed words deer/reed
   Found reversed words taes/seat
   Found reversed words room/moor
   Found reversed words ward/draw
*/
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web 
' and finds all reversed words that appear in that book.
Module DataflowReversedWords

    Sub Main()
        '
        ' Create the members of the pipeline.
        ' 

        ' Downloads the requested resource as a string.
        Dim downloadString = New TransformBlock(Of String, String)(
            Async Function(uri)
                Console.WriteLine("Downloading '{0}'...", uri)

                Return Await New HttpClient().GetStringAsync(uri)
            End Function)

        ' Separates the specified text into an array of words.
        Dim createWordList = New TransformBlock(Of String, String())(
           Function(text)
               Console.WriteLine("Creating word list...")

             ' Remove common punctuation by replacing all non-letter characters 
             ' with a space character.
             Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
               text = New String(tokens)

             ' Separate the text into an array of words.
             Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
           End Function)

        ' Removes short words and duplicates.
        Dim filterWordList = New TransformBlock(Of String(), String())(
           Function(words)
               Console.WriteLine("Filtering word list...")

               Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
           End Function)

        ' Finds all words in the specified collection whose reverse also 
        ' exists in the collection.
        Dim findReversedWords = New TransformManyBlock(Of String(), String)(
           Function(words)

               Dim wordsSet = New HashSet(Of String)(words)

               Return From word In words.AsParallel()
                      Let reverse = New String(word.Reverse().ToArray())
                      Where word <> reverse AndAlso wordsSet.Contains(reverse)
                      Select word
           End Function)

        ' Prints the provided reversed words to the console.    
        Dim printReversedWords = New ActionBlock(Of String)(
           Sub(reversedWord)
               Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
           End Sub)

        '
        ' Connect the dataflow blocks to form a pipeline.
        '

        Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}

        downloadString.LinkTo(createWordList, linkOptions)
        createWordList.LinkTo(filterWordList, linkOptions)
        filterWordList.LinkTo(findReversedWords, linkOptions)
        findReversedWords.LinkTo(printReversedWords, linkOptions)

        ' Process "The Iliad of Homer" by Homer.
        downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")

        ' Mark the head of the pipeline as complete.
        downloadString.Complete()

        ' Wait for the last block in the pipeline to process all messages.
        printReversedWords.Completion.Wait()
    End Sub

End Module

' Sample output:
'Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
'Creating word list...
'Filtering word list...
'Finding reversed words...
'Found reversed words aera/area
'Found reversed words doom/mood
'Found reversed words draw/ward
'Found reversed words live/evil
'Found reversed words seat/taes
'Found reversed words area/aera
'Found reversed words port/trop
'Found reversed words sleek/keels
'Found reversed words tops/spot
'Found reversed words evil/live
'Found reversed words speed/deeps
'Found reversed words mood/doom
'Found reversed words moor/room
'Found reversed words spot/tops
'Found reversed words spots/stops
'Found reversed words trop/port
'Found reversed words stops/spots
'Found reversed words reed/deer
'Found reversed words deeps/speed
'Found reversed words deer/reed
'Found reversed words taes/seat
'Found reversed words keels/sleek
'Found reversed words room/moor
'Found reversed words ward/draw

Passaggi successivi

Questo esempio invia un URL da elaborare tramite la pipeline del flusso di dati. Se si inviano più valori di input tramite una pipeline, è possibile introdurre una forma di parallelismo nell'applicazione che assomiglia al modo in cui le parti potrebbero spostarsi in una fabbrica di automobili. Quando il primo membro della pipeline invia il risultato al secondo membro, può elaborare un altro elemento in parallelo mentre il secondo membro elabora il primo risultato.

Il parallelismo ottenuto usando le pipeline del flusso di dati è noto come parallelismo con granularità grossolana perché in genere è costituito da meno attività più grandi. È possibile anche utilizzare un parallelismo con granularità più fine per compiti più piccoli e a esecuzione breve all'interno di una pipeline del flusso di dati. In questo esempio il findReversedWords membro della pipeline usa PLINQ per elaborare più elementi nell'elenco di lavoro in parallelo. L'uso del parallelismo con granularità fine in una pipeline con granularità grossolana può migliorare la velocità effettiva complessiva.

È anche possibile connettere un blocco di flussi di dati di origine a più blocchi di destinazione per creare una rete di flussi di dati. La versione sovraccarica del metodo LinkTo prende un oggetto Predicate<T> che definisce se il blocco di destinazione accetta ogni messaggio in base al suo valore. La maggior parte dei tipi di blocchi di flussi di dati che fungono da origini offrono messaggi a tutti i blocchi di destinazione connessi, nell'ordine in cui sono stati connessi, fino a quando uno dei blocchi accetta tale messaggio. Usando questo meccanismo di filtro, è possibile creare sistemi di blocchi di flussi di dati connessi che indirizzano determinati dati attraverso un percorso e altri dati attraverso un altro percorso. Per un esempio che usa il filtro per creare una rete di flussi di dati, vedere Procedura dettagliata: Uso del flusso di dati in un'applicazione Windows Form.

Vedere anche