Partilhar via


Passo a passo: Criando um pipeline de fluxo de dados

Embora você possa usar os métodos DataflowBlock.Receive, DataflowBlock.ReceiveAsynce DataflowBlock.TryReceive para receber mensagens de blocos de origem, também pode conectar blocos de mensagens para formar um pipeline de fluxo de dados . Um pipeline de fluxo de dados é uma série de componentes, ou blocos de fluxo de dados, cada um dos quais executa uma tarefa específica que contribui para um objetivo maior. Cada bloco de fluxo de dados numa linha de processamento de dados executa o trabalho quando recebe uma mensagem de outro bloco de fluxo de dados. Uma analogia a isso é uma linha de montagem para fabricação de automóveis. À medida que cada veículo passa pela linha de montagem, uma estação monta o quadro, a próxima instala o motor e assim por diante. Como uma linha de montagem permite que vários veículos sejam montados ao mesmo tempo, ela oferece melhor rendimento do que montar veículos completos um de cada vez.

Este documento demonstra um pipeline de processamento de dados que baixa o livro A Ilíada de Homero de um site e pesquisa o texto, procurando palavras individuais que correspondam a outras palavras cujos caracteres invertam os da primeira palavra. A formação do pipeline de fluxo de dados neste documento consiste nas seguintes etapas:

  1. Crie os blocos de fluxo de dados que participam da canalização.

  2. Conecte cada bloco de fluxo de dados ao bloco seguinte na tubulação. Cada bloco recebe como entrada a saída do bloco anterior no pipeline.

  3. Para cada bloco de fluxo de dados, crie uma tarefa de continuação que defina o próximo bloco para o estado concluído após a conclusão do bloco anterior.

  4. Enviar dados para o início da linha de processamento.

  5. Marque a cabeça do gasoduto como concluída.

  6. Aguarde até que o pipeline conclua todo o trabalho.

Pré-requisitos

Leia Dataflow antes de iniciar este passo a passo.

Criando um aplicativo de console

No Visual Studio, crie um projeto Visual C# ou Visual Basic Console Application. Instale o pacote NuGet System.Threading.Tasks.Dataflow.

Observação

A biblioteca de fluxo de dados TPL (o namespace System.Threading.Tasks.Dataflow) não é distribuída com o .NET. Para instalar o namespace System.Threading.Tasks.Dataflow no Visual Studio, abra o seu projeto, escolha Gerir Pacotes NuGet no menu Projeto e pesquise o pacote System.Threading.Tasks.Dataflow online. Como alternativa, para instalá-lo usando a CLI do .NET Core, execute dotnet add package System.Threading.Tasks.Dataflow.

Adicione o seguinte código ao seu projeto para criar o aplicativo básico.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class Program
{
   static void Main()
   {
   }
}
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web 
' and finds all reversed words that appear in that book.
Module DataflowReversedWords

    Sub Main()
    End Sub

End Module

Criando os blocos de fluxo de dados

Adicione o seguinte código ao método Main para criar os blocos de fluxo de dados que participam do pipeline. A tabela a seguir resume a função de cada membro do pipeline.

//
// Create the members of the pipeline.
//

// Downloads the requested resource as a string.
var downloadString = new TransformBlock<string, string>(async uri =>
{
   Console.WriteLine($"Downloading '{uri}'...");

   return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
});

// Separates the specified text into an array of words.
var createWordList = new TransformBlock<string, string[]>(text =>
{
   Console.WriteLine("Creating word list...");

   // Remove common punctuation by replacing all non-letter characters
   // with a space character.
   char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
   text = new string(tokens);

   // Separate the text into an array of words.
   return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
});

// Removes short words and duplicates.
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
   Console.WriteLine("Filtering word list...");

   return words
      .Where(word => word.Length > 3)
      .Distinct()
      .ToArray();
});

// Finds all words in the specified collection whose reverse also
// exists in the collection.
var findReversedWords = new TransformManyBlock<string[], string>(words =>
{
   Console.WriteLine("Finding reversed words...");

   var wordsSet = new HashSet<string>(words);

   return from word in words.AsParallel()
          let reverse = new string(word.Reverse().ToArray())
          where word != reverse && wordsSet.Contains(reverse)
          select word;
});

// Prints the provided reversed words to the console.
var printReversedWords = new ActionBlock<string>(reversedWord =>
{
   Console.WriteLine($"Found reversed words {reversedWord}/{new string(reversedWord.Reverse().ToArray())}");
});
'
' Create the members of the pipeline.
' 

' Downloads the requested resource as a string.
Dim downloadString = New TransformBlock(Of String, String)(
    Async Function(uri)
        Console.WriteLine("Downloading '{0}'...", uri)

        Return Await New HttpClient().GetStringAsync(uri)
    End Function)

' Separates the specified text into an array of words.
Dim createWordList = New TransformBlock(Of String, String())(
   Function(text)
       Console.WriteLine("Creating word list...")

     ' Remove common punctuation by replacing all non-letter characters 
     ' with a space character.
     Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
       text = New String(tokens)

     ' Separate the text into an array of words.
     Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
   End Function)

' Removes short words and duplicates.
Dim filterWordList = New TransformBlock(Of String(), String())(
   Function(words)
       Console.WriteLine("Filtering word list...")

       Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
   End Function)

' Finds all words in the specified collection whose reverse also 
' exists in the collection.
Dim findReversedWords = New TransformManyBlock(Of String(), String)(
   Function(words)

       Dim wordsSet = New HashSet(Of String)(words)

       Return From word In words.AsParallel()
              Let reverse = New String(word.Reverse().ToArray())
              Where word <> reverse AndAlso wordsSet.Contains(reverse)
              Select word
   End Function)

' Prints the provided reversed words to the console.    
Dim printReversedWords = New ActionBlock(Of String)(
   Sub(reversedWord)
       Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
   End Sub)
Membro Tipo Descrição
downloadString TransformBlock<TInput,TOutput> Descarrega o texto do livro a partir da Web.
createWordList TransformBlock<TInput,TOutput> Separa o texto do livro em uma matriz de palavras.
filterWordList TransformBlock<TInput,TOutput> Remove palavras curtas e duplicados da matriz de palavras.
findReversedWords TransformManyBlock<TInput,TOutput> Localiza todas as palavras na coleção de matriz de palavras filtrada cujo inverso também ocorre na matriz de palavras.
printReversedWords ActionBlock<TInput> Exibe palavras e as palavras inversas correspondentes no console.

Embora você possa combinar várias etapas no pipeline de fluxo de dados neste exemplo em uma etapa, o exemplo ilustra o conceito de compor várias tarefas independentes de fluxo de dados para executar uma tarefa maior. O exemplo usa TransformBlock<TInput,TOutput> para permitir que cada membro do pipeline execute uma operação em seus dados de entrada e envie os resultados para a próxima etapa do pipeline. O membro findReversedWords do pipeline é um objeto TransformManyBlock<TInput,TOutput> porque produz várias saídas independentes para cada entrada. A cauda do pipeline, printReversedWords, é um objeto ActionBlock<TInput> porque executa uma ação na sua entrada e não produz um resultado.

Formando o gasoduto

Adicione o seguinte código para conectar cada bloco ao próximo bloco no pipeline.

Quando você chama o método LinkTo para conectar um bloco de fluxo de dados de origem a um bloco de fluxo de dados de destino, o bloco de fluxo de dados de origem propaga dados para o bloco de destino à medida que os dados ficam disponíveis. Se também forneceres DataflowLinkOptions com PropagateCompletion definido como verdadeiro, a conclusão, seja bem-sucedida ou malsucedida, de um bloco no pipeline fará com que o próximo bloco também se conclua.

//
// Connect the dataflow blocks to form a pipeline.
//

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

downloadString.LinkTo(createWordList, linkOptions);
createWordList.LinkTo(filterWordList, linkOptions);
filterWordList.LinkTo(findReversedWords, linkOptions);
findReversedWords.LinkTo(printReversedWords, linkOptions);
'
' Connect the dataflow blocks to form a pipeline.
'

Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}

downloadString.LinkTo(createWordList, linkOptions)
createWordList.LinkTo(filterWordList, linkOptions)
filterWordList.LinkTo(findReversedWords, linkOptions)
findReversedWords.LinkTo(printReversedWords, linkOptions)

Envio de Dados para o Pipeline

Adicione o seguinte código para postar a URL do livro A Ilíada de Homero no cabeçalho do pipeline de fluxo de dados.

// Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");
' Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")

Este exemplo usa DataflowBlock.Post para enviar dados síncronamente para o início do pipeline. Use o método DataflowBlock.SendAsync quando precisar enviar dados de forma assíncrona para um nó de fluxo de dados.

Conclusão da atividade do pipeline

Adicione o seguinte código para marcar a cabeça do pipeline como concluída. O chefe do pipeline propaga sua conclusão depois de processar todas as mensagens armazenadas em buffer.

// Mark the head of the pipeline as complete.
downloadString.Complete();
' Mark the head of the pipeline as complete.
downloadString.Complete()

Este exemplo envia uma URL para ser processada através do pipeline de fluxo de dados. Se você enviar mais de uma entrada através de um pipeline, chame o método IDataflowBlock.Complete depois de enviar toda a entrada. Você pode omitir essa etapa se seu aplicativo não tiver um ponto bem definido no qual os dados não estejam mais disponíveis ou se o aplicativo não precisar esperar a conclusão do pipeline.

Aguardando a conclusão do pipeline

Adicione o seguinte código para aguardar a conclusão do pipeline. A operação geral é concluída quando a cauda do gasoduto termina.

// Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait();
' Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait()

Você pode aguardar a conclusão do fluxo de dados de qualquer thread ou de vários threads ao mesmo tempo.

O exemplo completo

O exemplo a seguir mostra o código completo para este passo a passo.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class DataflowReversedWords
{
   static void Main()
   {
      //
      // Create the members of the pipeline.
      //

      // Downloads the requested resource as a string.
      var downloadString = new TransformBlock<string, string>(async uri =>
      {
         Console.WriteLine($"Downloading '{uri}'...");

         return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
      });

      // Separates the specified text into an array of words.
      var createWordList = new TransformBlock<string, string[]>(text =>
      {
         Console.WriteLine("Creating word list...");

         // Remove common punctuation by replacing all non-letter characters
         // with a space character.
         char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
         text = new string(tokens);

         // Separate the text into an array of words.
         return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
      });

      // Removes short words and duplicates.
      var filterWordList = new TransformBlock<string[], string[]>(words =>
      {
         Console.WriteLine("Filtering word list...");

         return words
            .Where(word => word.Length > 3)
            .Distinct()
            .ToArray();
      });

      // Finds all words in the specified collection whose reverse also
      // exists in the collection.
      var findReversedWords = new TransformManyBlock<string[], string>(words =>
      {
         Console.WriteLine("Finding reversed words...");

         var wordsSet = new HashSet<string>(words);

         return from word in words.AsParallel()
                let reverse = new string(word.Reverse().ToArray())
                where word != reverse && wordsSet.Contains(reverse)
                select word;
      });

      // Prints the provided reversed words to the console.
      var printReversedWords = new ActionBlock<string>(reversedWord =>
      {
         Console.WriteLine($"Found reversed words {reversedWord}/{new string(reversedWord.Reverse().ToArray())}");
      });

      //
      // Connect the dataflow blocks to form a pipeline.
      //

      var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

      downloadString.LinkTo(createWordList, linkOptions);
      createWordList.LinkTo(filterWordList, linkOptions);
      filterWordList.LinkTo(findReversedWords, linkOptions);
      findReversedWords.LinkTo(printReversedWords, linkOptions);

      // Process "The Iliad of Homer" by Homer.
      downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");

      // Mark the head of the pipeline as complete.
      downloadString.Complete();

      // Wait for the last block in the pipeline to process all messages.
      printReversedWords.Completion.Wait();
   }
}
/* Sample output:
   Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
   Creating word list...
   Filtering word list...
   Finding reversed words...
   Found reversed words doom/mood
   Found reversed words draw/ward
   Found reversed words aera/area
   Found reversed words seat/taes
   Found reversed words live/evil
   Found reversed words port/trop
   Found reversed words sleek/keels
   Found reversed words area/aera
   Found reversed words tops/spot
   Found reversed words evil/live
   Found reversed words mood/doom
   Found reversed words speed/deeps
   Found reversed words moor/room
   Found reversed words trop/port
   Found reversed words spot/tops
   Found reversed words spots/stops
   Found reversed words stops/spots
   Found reversed words reed/deer
   Found reversed words keels/sleek
   Found reversed words deeps/speed
   Found reversed words deer/reed
   Found reversed words taes/seat
   Found reversed words room/moor
   Found reversed words ward/draw
*/
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web 
' and finds all reversed words that appear in that book.
Module DataflowReversedWords

    Sub Main()
        '
        ' Create the members of the pipeline.
        ' 

        ' Downloads the requested resource as a string.
        Dim downloadString = New TransformBlock(Of String, String)(
            Async Function(uri)
                Console.WriteLine("Downloading '{0}'...", uri)

                Return Await New HttpClient().GetStringAsync(uri)
            End Function)

        ' Separates the specified text into an array of words.
        Dim createWordList = New TransformBlock(Of String, String())(
           Function(text)
               Console.WriteLine("Creating word list...")

             ' Remove common punctuation by replacing all non-letter characters 
             ' with a space character.
             Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
               text = New String(tokens)

             ' Separate the text into an array of words.
             Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
           End Function)

        ' Removes short words and duplicates.
        Dim filterWordList = New TransformBlock(Of String(), String())(
           Function(words)
               Console.WriteLine("Filtering word list...")

               Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
           End Function)

        ' Finds all words in the specified collection whose reverse also 
        ' exists in the collection.
        Dim findReversedWords = New TransformManyBlock(Of String(), String)(
           Function(words)

               Dim wordsSet = New HashSet(Of String)(words)

               Return From word In words.AsParallel()
                      Let reverse = New String(word.Reverse().ToArray())
                      Where word <> reverse AndAlso wordsSet.Contains(reverse)
                      Select word
           End Function)

        ' Prints the provided reversed words to the console.    
        Dim printReversedWords = New ActionBlock(Of String)(
           Sub(reversedWord)
               Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
           End Sub)

        '
        ' Connect the dataflow blocks to form a pipeline.
        '

        Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}

        downloadString.LinkTo(createWordList, linkOptions)
        createWordList.LinkTo(filterWordList, linkOptions)
        filterWordList.LinkTo(findReversedWords, linkOptions)
        findReversedWords.LinkTo(printReversedWords, linkOptions)

        ' Process "The Iliad of Homer" by Homer.
        downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")

        ' Mark the head of the pipeline as complete.
        downloadString.Complete()

        ' Wait for the last block in the pipeline to process all messages.
        printReversedWords.Completion.Wait()
    End Sub

End Module

' Sample output:
'Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
'Creating word list...
'Filtering word list...
'Finding reversed words...
'Found reversed words aera/area
'Found reversed words doom/mood
'Found reversed words draw/ward
'Found reversed words live/evil
'Found reversed words seat/taes
'Found reversed words area/aera
'Found reversed words port/trop
'Found reversed words sleek/keels
'Found reversed words tops/spot
'Found reversed words evil/live
'Found reversed words speed/deeps
'Found reversed words mood/doom
'Found reversed words moor/room
'Found reversed words spot/tops
'Found reversed words spots/stops
'Found reversed words trop/port
'Found reversed words stops/spots
'Found reversed words reed/deer
'Found reversed words deeps/speed
'Found reversed words deer/reed
'Found reversed words taes/seat
'Found reversed words keels/sleek
'Found reversed words room/moor
'Found reversed words ward/draw

Próximos passos

Este exemplo envia uma URL para ser processada através do pipeline de fluxo de dados. Se você enviar mais de um valor de entrada por meio de um pipeline, poderá introduzir uma forma de paralelismo em seu aplicativo que se assemelha a como as peças podem se mover por uma fábrica de automóveis. Quando o primeiro membro do pipeline envia seu resultado para o segundo membro, ele pode processar outro item em paralelo enquanto o segundo membro processa o primeiro resultado.

O paralelismo que é obtido usando pipelines de fluxo de dados é conhecido como paralelismo grosseiro porque normalmente consiste em tarefas menores e maiores. Você também pode usar um paralelismo mais refinado de tarefas menores e de execução curta em um pipeline de fluxo de dados. Neste exemplo, o membro findReversedWords do pipeline usa PLINQ para processar vários itens na lista de trabalho em paralelo. O uso de paralelismo de grão fino em um oleoduto de grão grosso pode melhorar o rendimento geral.

Você também pode conectar um bloco de fluxo de dados de origem a vários blocos de destino para criar uma rede de fluxo de dados . A versão sobrecarregada do método LinkTo usa um objeto Predicate<T> que define se o bloco de destino aceita cada mensagem com base em seu valor. A maioria dos tipos de bloco de fluxo de dados que atuam como fontes oferecem mensagens para todos os blocos de destino conectados, na ordem em que foram conectados, até que um dos blocos aceite essa mensagem. Usando esse mecanismo de filtragem, você pode criar sistemas de blocos de fluxo de dados conectados que direcionam determinados dados por um caminho e outros dados por outro caminho. Para obter um exemplo que usa filtragem para criar uma rede de fluxo de dados, consulte Passo a passo: Usando o fluxo de dados em um aplicativo do Windows Forms.

Ver também