Partekatu bidez


Tutorial: Creación de una canalización de flujo de datos

Aunque puede usar los DataflowBlock.Receivemétodos , DataflowBlock.ReceiveAsyncy DataflowBlock.TryReceive para recibir mensajes de bloques de origen, también puede conectar bloques de mensajes para formar una canalización de flujo de datos. Una canalización de flujo de datos es una serie de componentes o bloques de flujo de datos, cada uno de los cuales realiza una tarea específica que contribuye a un objetivo mayor. Cada bloque de flujo de datos de una canalización de flujo de datos funciona cuando recibe un mensaje de otro bloque de flujo de datos. Se podría establecer una analogía de esto con una cadena de montaje en la fabricación de automóviles. A medida que cada vehículo pasa por la línea de montaje, una estación ensambla el marco, la siguiente instala el motor, etc. Dado que una línea de montaje permite ensamblar varios vehículos al mismo tiempo, proporciona un mejor rendimiento que ensamblar vehículos completos de uno en uno.

En este documento se muestra una canalización de flujo de datos que descarga el libro The Iliad of Homer desde un sitio web y busca el texto para que coincida con palabras individuales con palabras que invierten los caracteres de la primera palabra. La formación de la canalización de flujo de datos en este documento consta de los pasos siguientes:

  1. Cree los bloques de flujo de datos que participan en la canalización.

  2. Conecte cada bloque de flujo de datos al siguiente bloque de la canalización. Cada bloque recibe como entrada la salida del bloque anterior en la canalización.

  3. Para cada bloque de flujo de datos, cree una tarea de continuación que establezca el siguiente bloque en el estado completado después de que finalice el bloque anterior.

  4. Publique los datos en el inicio de la canalización.

  5. Marque el inicio de la canalización como completado.

  6. Espere a que la canalización complete todo el trabajo.

Prerrequisitos

Lea Flujo de datos antes de iniciar este tutorial.

Creación de una aplicación de consola

En Visual Studio, cree un proyecto de aplicación de consola de Visual C# o Visual Basic. Instale el paquete NuGet System.Threading.Tasks.Dataflow.

Nota:

La biblioteca de flujos de datos TPL (el System.Threading.Tasks.Dataflow namespace) se incluye en .NET 6 y posteriores. Para los proyectos de .NET Framework y .NET Standard, debe instalar el 📦 paquete NuGet System.Threading.Tasks.Dataflow.

Agregue el código siguiente al proyecto para crear la aplicación básica.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class Program
{
   static void Main()
   {
   }
}
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web 
' and finds all reversed words that appear in that book.
Module DataflowReversedWords

    Sub Main()
    End Sub

End Module

Creación de los bloques de flujo de datos

Agregue el código siguiente al Main método para crear los bloques de flujo de datos que participan en la canalización. En la tabla siguiente se resume el rol de cada miembro de la canalización.

//
// Create the members of the pipeline.
//

// Downloads the requested resource as a string.
var downloadString = new TransformBlock<string, string>(async uri =>
{
   Console.WriteLine($"Downloading '{uri}'...");

   return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
});

// Separates the specified text into an array of words.
var createWordList = new TransformBlock<string, string[]>(text =>
{
   Console.WriteLine("Creating word list...");

   // Remove common punctuation by replacing all non-letter characters
   // with a space character.
   char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
   text = new string(tokens);

   // Separate the text into an array of words.
   return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
});

// Removes short words and duplicates.
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
   Console.WriteLine("Filtering word list...");

   return words
      .Where(word => word.Length > 3)
      .Distinct()
      .ToArray();
});

// Finds all words in the specified collection whose reverse also
// exists in the collection.
var findReversedWords = new TransformManyBlock<string[], string>(words =>
{
   Console.WriteLine("Finding reversed words...");

   var wordsSet = new HashSet<string>(words);

   return from word in words.AsParallel()
          let reverse = new string(word.Reverse().ToArray())
          where word != reverse && wordsSet.Contains(reverse)
          select word;
});

// Prints the provided reversed words to the console.
var printReversedWords = new ActionBlock<string>(reversedWord =>
{
   Console.WriteLine($"Found reversed words {reversedWord}/{new string(reversedWord.Reverse().ToArray())}");
});
'
' Create the members of the pipeline.
' 

' Downloads the requested resource as a string.
Dim downloadString = New TransformBlock(Of String, String)(
    Async Function(uri)
        Console.WriteLine("Downloading '{0}'...", uri)

        Return Await New HttpClient().GetStringAsync(uri)
    End Function)

' Separates the specified text into an array of words.
Dim createWordList = New TransformBlock(Of String, String())(
   Function(text)
       Console.WriteLine("Creating word list...")

     ' Remove common punctuation by replacing all non-letter characters 
     ' with a space character.
     Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
       text = New String(tokens)

     ' Separate the text into an array of words.
     Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
   End Function)

' Removes short words and duplicates.
Dim filterWordList = New TransformBlock(Of String(), String())(
   Function(words)
       Console.WriteLine("Filtering word list...")

       Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
   End Function)

' Finds all words in the specified collection whose reverse also 
' exists in the collection.
Dim findReversedWords = New TransformManyBlock(Of String(), String)(
   Function(words)

       Dim wordsSet = New HashSet(Of String)(words)

       Return From word In words.AsParallel()
              Let reverse = New String(word.Reverse().ToArray())
              Where word <> reverse AndAlso wordsSet.Contains(reverse)
              Select word
   End Function)

' Prints the provided reversed words to the console.    
Dim printReversedWords = New ActionBlock(Of String)(
   Sub(reversedWord)
       Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
   End Sub)
Miembro Tipo Description
downloadString TransformBlock<TInput,TOutput> Descarga el texto del libro de la Web.
createWordList TransformBlock<TInput,TOutput> Separa el texto del libro en una matriz de palabras.
filterWordList TransformBlock<TInput,TOutput> Quita palabras cortas y duplicados de la matriz de palabras.
findReversedWords TransformManyBlock<TInput,TOutput> Busca todas las palabras de la colección de matrices de palabras filtradas cuya inversa también se produce en la matriz de palabras.
printReversedWords ActionBlock<TInput> Muestra las palabras y las palabras inversas correspondientes en la consola.

Aunque puede combinar varios pasos del flujo de datos en este ejemplo en un paso, el ejemplo ilustra el concepto de componer múltiples tareas de flujo de datos independientes para llevar a cabo una tarea más grande. En el ejemplo se usa TransformBlock<TInput,TOutput> para habilitar cada miembro de la canalización para realizar una operación en sus datos de entrada y enviar los resultados al siguiente paso de la canalización. El findReversedWords miembro de la canalización es un TransformManyBlock<TInput,TOutput> objeto porque genera varias salidas independientes para cada entrada. La cola de la canalización, printReversedWords, es un objeto ActionBlock<TInput> porque actúa sobre su entrada y no produce un resultado.

Formación de la canalización

Agregue el código siguiente para conectar cada bloque al siguiente bloque de la canalización.

Cuando se llama al LinkTo método para conectar un bloque de flujo de datos de origen a un bloque de flujo de datos de destino, el bloque de flujo de datos de origen propaga los datos al bloque de destino a medida que los datos están disponibles. Si también proporciona DataflowLinkOptions con PropagateCompletion establecido como verdadero, el éxito o fracaso de un bloque de la canalización hará que se complete el siguiente bloque de la canalización.

//
// Connect the dataflow blocks to form a pipeline.
//

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

downloadString.LinkTo(createWordList, linkOptions);
createWordList.LinkTo(filterWordList, linkOptions);
filterWordList.LinkTo(findReversedWords, linkOptions);
findReversedWords.LinkTo(printReversedWords, linkOptions);
'
' Connect the dataflow blocks to form a pipeline.
'

Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}

downloadString.LinkTo(createWordList, linkOptions)
createWordList.LinkTo(filterWordList, linkOptions)
filterWordList.LinkTo(findReversedWords, linkOptions)
findReversedWords.LinkTo(printReversedWords, linkOptions)

Publicar datos en la canalización

Agregue el código siguiente para publicar la dirección URL del libro The Iliad of Homer (El Iliad de Homer ) al encabezado de la canalización de flujo de datos.

// Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");
' Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")

En este ejemplo se usa DataflowBlock.Post para enviar datos de forma sincrónica al encabezado de la canalización. Use el DataflowBlock.SendAsync método cuando deba enviar datos de forma asincrónica a un nodo de flujo de datos.

Finalización de la actividad de canalización

Agregue el siguiente código para marcar el inicio de la canalización como finalizado. La cabecera de la canalización transmite su finalización después de procesar todos los mensajes en cola.

// Mark the head of the pipeline as complete.
downloadString.Complete();
' Mark the head of the pipeline as complete.
downloadString.Complete()

En este ejemplo, se envía una dirección URL a través de la canalización de flujo de datos para su procesamiento. Si envía más de una entrada a través de una canalización, llame al IDataflowBlock.Complete método después de enviar toda la entrada. Puede omitir este paso si la aplicación no tiene ningún punto bien definido en el que los datos ya no están disponibles o la aplicación no tiene que esperar a que finalice la canalización.

Esperar a que finalice la canalización

Agregue el código siguiente para esperar a que finalice la canalización. La operación general finaliza cuando finaliza la cola de la canalización.

// Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait();
' Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait()

Puede esperar a que se complete el flujo de datos desde cualquier subproceso o desde varios subprocesos al mismo tiempo.

Ejemplo completo

En el ejemplo siguiente se muestra el código completo de este tutorial.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class DataflowReversedWords
{
   static void Main()
   {
      //
      // Create the members of the pipeline.
      //

      // Downloads the requested resource as a string.
      var downloadString = new TransformBlock<string, string>(async uri =>
      {
         Console.WriteLine($"Downloading '{uri}'...");

         return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
      });

      // Separates the specified text into an array of words.
      var createWordList = new TransformBlock<string, string[]>(text =>
      {
         Console.WriteLine("Creating word list...");

         // Remove common punctuation by replacing all non-letter characters
         // with a space character.
         char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
         text = new string(tokens);

         // Separate the text into an array of words.
         return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
      });

      // Removes short words and duplicates.
      var filterWordList = new TransformBlock<string[], string[]>(words =>
      {
         Console.WriteLine("Filtering word list...");

         return words
            .Where(word => word.Length > 3)
            .Distinct()
            .ToArray();
      });

      // Finds all words in the specified collection whose reverse also
      // exists in the collection.
      var findReversedWords = new TransformManyBlock<string[], string>(words =>
      {
         Console.WriteLine("Finding reversed words...");

         var wordsSet = new HashSet<string>(words);

         return from word in words.AsParallel()
                let reverse = new string(word.Reverse().ToArray())
                where word != reverse && wordsSet.Contains(reverse)
                select word;
      });

      // Prints the provided reversed words to the console.
      var printReversedWords = new ActionBlock<string>(reversedWord =>
      {
         Console.WriteLine($"Found reversed words {reversedWord}/{new string(reversedWord.Reverse().ToArray())}");
      });

      //
      // Connect the dataflow blocks to form a pipeline.
      //

      var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

      downloadString.LinkTo(createWordList, linkOptions);
      createWordList.LinkTo(filterWordList, linkOptions);
      filterWordList.LinkTo(findReversedWords, linkOptions);
      findReversedWords.LinkTo(printReversedWords, linkOptions);

      // Process "The Iliad of Homer" by Homer.
      downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");

      // Mark the head of the pipeline as complete.
      downloadString.Complete();

      // Wait for the last block in the pipeline to process all messages.
      printReversedWords.Completion.Wait();
   }
}
/* Sample output:
   Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
   Creating word list...
   Filtering word list...
   Finding reversed words...
   Found reversed words doom/mood
   Found reversed words draw/ward
   Found reversed words aera/area
   Found reversed words seat/taes
   Found reversed words live/evil
   Found reversed words port/trop
   Found reversed words sleek/keels
   Found reversed words area/aera
   Found reversed words tops/spot
   Found reversed words evil/live
   Found reversed words mood/doom
   Found reversed words speed/deeps
   Found reversed words moor/room
   Found reversed words trop/port
   Found reversed words spot/tops
   Found reversed words spots/stops
   Found reversed words stops/spots
   Found reversed words reed/deer
   Found reversed words keels/sleek
   Found reversed words deeps/speed
   Found reversed words deer/reed
   Found reversed words taes/seat
   Found reversed words room/moor
   Found reversed words ward/draw
*/
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web 
' and finds all reversed words that appear in that book.
Module DataflowReversedWords

    Sub Main()
        '
        ' Create the members of the pipeline.
        ' 

        ' Downloads the requested resource as a string.
        Dim downloadString = New TransformBlock(Of String, String)(
            Async Function(uri)
                Console.WriteLine("Downloading '{0}'...", uri)

                Return Await New HttpClient().GetStringAsync(uri)
            End Function)

        ' Separates the specified text into an array of words.
        Dim createWordList = New TransformBlock(Of String, String())(
           Function(text)
               Console.WriteLine("Creating word list...")

             ' Remove common punctuation by replacing all non-letter characters 
             ' with a space character.
             Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
               text = New String(tokens)

             ' Separate the text into an array of words.
             Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
           End Function)

        ' Removes short words and duplicates.
        Dim filterWordList = New TransformBlock(Of String(), String())(
           Function(words)
               Console.WriteLine("Filtering word list...")

               Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
           End Function)

        ' Finds all words in the specified collection whose reverse also 
        ' exists in the collection.
        Dim findReversedWords = New TransformManyBlock(Of String(), String)(
           Function(words)

               Dim wordsSet = New HashSet(Of String)(words)

               Return From word In words.AsParallel()
                      Let reverse = New String(word.Reverse().ToArray())
                      Where word <> reverse AndAlso wordsSet.Contains(reverse)
                      Select word
           End Function)

        ' Prints the provided reversed words to the console.    
        Dim printReversedWords = New ActionBlock(Of String)(
           Sub(reversedWord)
               Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
           End Sub)

        '
        ' Connect the dataflow blocks to form a pipeline.
        '

        Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}

        downloadString.LinkTo(createWordList, linkOptions)
        createWordList.LinkTo(filterWordList, linkOptions)
        filterWordList.LinkTo(findReversedWords, linkOptions)
        findReversedWords.LinkTo(printReversedWords, linkOptions)

        ' Process "The Iliad of Homer" by Homer.
        downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")

        ' Mark the head of the pipeline as complete.
        downloadString.Complete()

        ' Wait for the last block in the pipeline to process all messages.
        printReversedWords.Completion.Wait()
    End Sub

End Module

' Sample output:
'Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
'Creating word list...
'Filtering word list...
'Finding reversed words...
'Found reversed words aera/area
'Found reversed words doom/mood
'Found reversed words draw/ward
'Found reversed words live/evil
'Found reversed words seat/taes
'Found reversed words area/aera
'Found reversed words port/trop
'Found reversed words sleek/keels
'Found reversed words tops/spot
'Found reversed words evil/live
'Found reversed words speed/deeps
'Found reversed words mood/doom
'Found reversed words moor/room
'Found reversed words spot/tops
'Found reversed words spots/stops
'Found reversed words trop/port
'Found reversed words stops/spots
'Found reversed words reed/deer
'Found reversed words deeps/speed
'Found reversed words deer/reed
'Found reversed words taes/seat
'Found reversed words keels/sleek
'Found reversed words room/moor
'Found reversed words ward/draw

Pasos siguientes

En este ejemplo se envía una dirección URL para procesar a través de la canalización de flujo de datos. Si envía más de un valor de entrada a través de una canalización, puede introducir una forma de paralelismo en la aplicación que se parezca a la forma en que las partes pueden moverse a través de una fábrica de automóviles. Cuando el primer miembro de la canalización envía su resultado al segundo miembro, puede procesar otro elemento en paralelo, ya que el segundo miembro procesa el primer resultado.

El paralelismo que se logra mediante canalizaciones de flujo de datos se conoce como paralelismo de grano grueso ya que normalmente consta de tareas más grandes y menos numerosas. También puede usar un paralelismo más específico de tareas más pequeñas y de ejecución corta en una canalización de flujo de datos. En este ejemplo, el findReversedWords miembro de la canalización usa PLINQ para procesar varios elementos de la lista de trabajo en paralelo. El uso del paralelismo de grano fino en un pipeline de grano grueso puede mejorar el rendimiento general.

También puede conectar un bloque de flujo de datos de origen a varios bloques de destino para crear una red de flujo de datos. La versión sobrecargada del LinkTo método toma un Predicate<T> objeto que define si el bloque de destino acepta cada mensaje en función de su valor. La mayoría de los tipos de bloques de flujo de datos que actúan como orígenes ofrecen mensajes a todos los bloques de destino conectados, en el orden en que se conectaron, hasta que uno de los bloques acepta ese mensaje. Mediante este mecanismo de filtrado, puede crear sistemas de bloques de flujo de datos conectados que dirijan determinados datos a través de una ruta de acceso y otros datos a través de otra ruta de acceso. Para obtener un ejemplo que usa el filtrado para crear una red de flujo de datos, consulte Tutorial: Uso del flujo de datos en una aplicación de Windows Forms.

Consulte también