Ler em inglês

Partilhar via


Fluxo de dados (biblioteca paralela de tarefas)

A TPL (Task Parallel Library) fornece componentes de fluxo de dados para ajudar a aumentar a robustez de aplicativos habilitados para simultaneidade. Esses componentes de fluxo de dados são coletivamente chamados de Biblioteca de Fluxo de Dados TPL. Esse modelo de fluxo de dados promove a programação baseada em atores, fornecendo passagem de mensagens em processo para tarefas de fluxo de dados e pipelining de grão grosso. Os componentes de fluxo de dados se baseiam nos tipos e na infraestrutura de agendamento do TPL e se integram ao suporte às linguagens C#, Visual Basic e F# para programação assíncrona. Esses componentes de fluxo de dados são úteis quando você tem várias operações que devem se comunicar entre si de forma assíncrona ou quando deseja processar dados à medida que eles ficam disponíveis. Por exemplo, considere um aplicativo que processa dados de imagem de uma câmera web. Usando o modelo de fluxo de dados, o aplicativo pode processar quadros de imagem à medida que eles ficam disponíveis. Se o aplicativo aprimorar quadros de imagem, por exemplo, executando correção de luz ou redução de olhos vermelhos, você poderá criar um pipeline de componentes de fluxo de dados. Cada estágio do pipeline pode usar a funcionalidade de paralelismo de grão mais grosso, como a funcionalidade fornecida pelo TPL, para transformar a imagem.

Este documento fornece uma visão geral da Biblioteca de Fluxo de Dados TPL. Ele descreve o modelo de programação, os tipos de bloco de fluxo de dados predefinidos e como configurar blocos de fluxo de dados para atender aos requisitos específicos de seus aplicativos.

Nota

A biblioteca de fluxo de dados TPL (o namespace) não é distribuída com o System.Threading.Tasks.Dataflow .NET. Para instalar o System.Threading.Tasks.Dataflow namespace no Visual Studio, abra seu projeto, escolha Gerenciar pacotes NuGet no menu Projeto e pesquise o System.Threading.Tasks.Dataflow pacote online. Como alternativa, para instalá-lo usando a CLI do .NET Core, execute dotnet add package System.Threading.Tasks.Dataflow.

Modelo de Programação

A TPL Dataflow Library fornece uma base para a passagem de mensagens e paralelização de aplicativos com uso intensivo de CPU e E/S que têm alta taxa de transferência e baixa latência. Ele também oferece controle explícito sobre como os dados são armazenados em buffer e se movem pelo sistema. Para entender melhor o modelo de programação de fluxo de dados, considere um aplicativo que carrega imagens do disco de forma assíncrona e cria uma composição dessas imagens. Os modelos de programação tradicionais normalmente exigem que você use retornos de chamada e objetos de sincronização, como bloqueios, para coordenar tarefas e acessar dados compartilhados. Usando o modelo de programação de fluxo de dados, você pode criar objetos de fluxo de dados que processam imagens à medida que são lidas do disco. No modelo de fluxo de dados, você declara como os dados são manipulados quando ficam disponíveis e também quaisquer dependências entre dados. Como o tempo de execução gerencia dependências entre dados, muitas vezes você pode evitar a necessidade de sincronizar o acesso aos dados compartilhados. Além disso, como as agendas de tempo de execução funcionam com base na chegada assíncrona de dados, o fluxo de dados pode melhorar a capacidade de resposta e a taxa de transferência gerenciando com eficiência os threads subjacentes. Para obter um exemplo que usa o modelo de programação de fluxo de dados para implementar o processamento de imagem em um aplicativo do Windows Forms, consulte Passo a passo: Usando o fluxo de dados em um aplicativo do Windows Forms.

Fontes e Alvos

A Biblioteca de Fluxo de Dados TPL consiste em blocos de fluxo de dados, que são estruturas de dados que armazenam em buffer e processam dados. O TPL define três tipos de blocos de fluxo de dados: blocos de origem, blocos de destino e blocos de propagador. Um bloco de origem atua como uma fonte de dados e pode ser lido. Um bloco de destino atua como um recetor de dados e pode ser gravado. Um bloco propagador atua como um bloco de origem e um bloco de destino, e pode ser lido e gravado. O TPL define a System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> interface para representar fontes, System.Threading.Tasks.Dataflow.ITargetBlock<TInput> para representar alvos e System.Threading.Tasks.Dataflow.IPropagatorBlock<TInput,TOutput> para representar propagadores. IPropagatorBlock<TInput,TOutput>herda de ambos e ISourceBlock<TOutput>ITargetBlock<TInput>.

A Biblioteca de Fluxo de Dados TPL fornece vários tipos de bloco de fluxo de dados predefinidos que implementam as ISourceBlock<TOutput>interfaces , ITargetBlock<TInput>e IPropagatorBlock<TInput,TOutput> . Esses tipos de bloco de fluxo de dados são descritos neste documento na seção Tipos de bloco de fluxo de dados predefinidos.

Conectando blocos

Você pode conectar blocos de fluxo de dados para formar pipelines, que são sequências lineares de blocos de fluxo de dados, ou redes, que são gráficos de blocos de fluxo de dados. Um pipeline é uma forma de rede. Em um pipeline ou rede, as fontes propagam dados de forma assíncrona para destinos à medida que esses dados ficam disponíveis. O ISourceBlock<TOutput>.LinkTo método vincula um bloco de fluxo de dados de origem a um bloco de destino. Uma fonte pode ser vinculada a zero ou mais alvos; os alvos podem ser vinculados a partir de zero ou mais fontes. Você pode adicionar ou remover blocos de fluxo de dados de ou para um pipeline ou rede simultaneamente. Os tipos de bloco de fluxo de dados predefinidos lidam com todos os aspetos de segurança de thread de vinculação e desvinculação.

Para obter um exemplo que conecta blocos de fluxo de dados para formar um pipeline básico, consulte Passo a passo: Criando um pipeline de fluxo de dados. Para obter um exemplo que conecta blocos de fluxo de dados para formar uma rede mais complexa, consulte Passo a passo: Usando o fluxo de dados em um aplicativo do Windows Forms. Para obter um exemplo que desvincula um destino de uma origem depois que a origem oferece uma mensagem ao destino, consulte Como desvincular blocos de fluxo de dados.

Filtragem

Quando você chama o ISourceBlock<TOutput>.LinkTo método para vincular uma origem a um destino, você pode fornecer um delegado que determina se o bloco de destino aceita ou rejeita uma mensagem com base no valor dessa mensagem. Esse mecanismo de filtragem é uma maneira útil de garantir que um bloco de fluxo de dados receba apenas determinados valores. Para a maioria dos tipos de bloco de fluxo de dados predefinidos, se um bloco de origem estiver conectado a vários blocos de destino, quando um bloco de destino rejeitar uma mensagem, a origem oferecerá essa mensagem ao próximo destino. A ordem em que uma fonte oferece mensagens aos alvos é definida pela fonte e pode variar de acordo com o tipo da fonte. A maioria dos tipos de bloco de origem deixa de oferecer uma mensagem depois que um destino aceita essa mensagem. Uma exceção a essa regra é a BroadcastBlock<T> classe, que oferece cada mensagem a todos os destinos, mesmo que alguns destinos rejeitem a mensagem. Para obter um exemplo que usa filtragem para processar apenas determinadas mensagens, consulte Passo a passo: Usando o fluxo de dados em um aplicativo do Windows Forms.

Importante

Como cada tipo de bloco de fluxo de dados de origem predefinido garante que as mensagens sejam propagadas na ordem em que são recebidas, cada mensagem deve ser lida do bloco de origem antes que o bloco de origem possa processar a próxima mensagem. Portanto, ao usar a filtragem para conectar vários destinos a uma origem, certifique-se de que pelo menos um bloco de destino receba cada mensagem. Caso contrário, seu aplicativo pode ficar bloqueado.

Passagem de mensagens

O modelo de programação de fluxo de dados está relacionado ao conceito de passagem de mensagens, onde componentes independentes de um programa se comunicam entre si enviando mensagens. Uma maneira de propagar mensagens entre componentes de aplicativo é chamar os métodos (síncrono) e (assíncrono Post ) para enviar mensagens para blocos de fluxo de dados de destino e os Receivemétodos , ReceiveAsynce TryReceive para receber mensagens de blocos de origem.SendAsync Você pode combinar esses métodos com pipelines ou redes de fluxo de dados enviando dados de entrada para o nó principal (um bloco de destino) e recebendo dados de saída do nó terminal do pipeline ou dos nós terminais da rede (um ou mais blocos de origem). Você também pode usar o Choose método para ler a partir da primeira das fontes fornecidas que tem dados disponíveis e executar ações sobre esses dados.

Os blocos de origem oferecem dados aos blocos de destino chamando o ITargetBlock<TInput>.OfferMessage método. O bloco de destino responde a uma mensagem oferecida de três maneiras: pode aceitar a mensagem, recusar a mensagem ou adiá-la. Quando o destino aceita a mensagem, o OfferMessage método retorna Accepted. Quando o destino recusa a mensagem, o OfferMessage método retorna Declined. Quando o destino requer que não receba mais mensagens da origem, OfferMessage retorna DecliningPermanently. Os tipos de bloco de origem predefinidos não oferecem mensagens para destinos vinculados depois que esse valor de retorno é recebido e eles se desvinculam automaticamente desses destinos.

Quando um bloco de destino adia a mensagem para uso posterior, o OfferMessage método retorna Postponed. Um bloco de destino que adia uma mensagem pode mais tarde chamar o ISourceBlock<TOutput>.ReserveMessage método para tentar reservar a mensagem oferecida. Neste ponto, a mensagem ainda está disponível e pode ser usada pelo bloco de destino, ou a mensagem foi tomada por outro destino. Quando o bloco de destino mais tarde requer a mensagem ou não precisa mais da mensagem, ele chama o ISourceBlock<TOutput>.ConsumeMessage método or ReleaseReservation , respectivamente. A reserva de mensagens é normalmente usada pelos tipos de bloco de fluxo de dados que operam no modo não ganancioso. O modo não ganancioso é explicado mais adiante neste documento. Em vez de reservar uma mensagem adiada, um bloco de destino também pode usar o ISourceBlock<TOutput>.ConsumeMessage método para tentar consumir diretamente a mensagem adiada.

Conclusão do bloco de fluxo de dados

Os blocos de fluxo de dados também suportam o conceito de conclusão. Um bloco de fluxo de dados que está no estado concluído não executa nenhum trabalho adicional. Cada bloco de fluxo de dados tem um objeto associado System.Threading.Tasks.Task , conhecido como tarefa de conclusão, que representa o status de conclusão do bloco. Como você pode aguardar a conclusão de um Task objeto, usando tarefas de conclusão, você pode aguardar a conclusão de um ou mais nós de terminal de uma rede de fluxo de dados. A IDataflowBlock interface define o Complete método, que informa o bloco de fluxo de dados de uma solicitação para que ele seja concluído, e a Completion propriedade, que retorna a tarefa de conclusão para o bloco de fluxo de dados. Ambos ISourceBlock<TOutput> e ITargetBlock<TInput> herdar a IDataflowBlock interface.

Há duas maneiras de determinar se um bloco de fluxo de dados foi concluído sem erros, encontrou um ou mais erros ou foi cancelado. A primeira maneira é chamar o Task.Wait método na tarefa de conclusão em um-trycatch bloco (Try-Catch no Visual Basic). O exemplo a seguir cria um ActionBlock<TInput> objeto que lança se seu valor de ArgumentOutOfRangeException entrada for menor que zero. AggregateException é lançado quando este exemplo chama Wait a tarefa de conclusão. O ArgumentOutOfRangeException é acessado através da InnerExceptions propriedade do AggregateException objeto.

// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
   Console.WriteLine("n = {0}", n);
   if (n < 0)
   {
      throw new ArgumentOutOfRangeException();
   }
});

// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();

// Wait for completion in a try/catch block.
try
{
   throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
   // If an unhandled exception occurs during dataflow processing, all
   // exceptions are propagated through an AggregateException object.
   ae.Handle(e =>
   {
      Console.WriteLine("Encountered {0}: {1}",
         e.GetType().Name, e.Message);
      return true;
   });
}

/* Output:
n = 0
n = -1
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
 of valid values.
*/

Este exemplo demonstra o caso em que uma exceção não é tratada no delegado de um bloco de fluxo de dados de execução. Recomendamos que você lide com exceções nos corpos desses blocos. No entanto, se você não conseguir fazer isso, o bloco se comporta como se tivesse sido cancelado e não processa as mensagens recebidas.

Quando um bloco de fluxo de dados é cancelado explicitamente, o AggregateException objeto contém OperationCanceledException na InnerExceptions propriedade. Para obter mais informações sobre o cancelamento do fluxo de dados, consulte a seção Habilitando o cancelamento .

A segunda maneira de determinar o status de conclusão de um bloco de fluxo de dados é usar uma continuação da tarefa de conclusão ou usar os recursos de linguagem assíncrona de C# e Visual Basic para aguardar assincronamente a tarefa de conclusão. O delegado que você fornece ao Task.ContinueWith método usa um Task objeto que representa a tarefa antecedente. No caso da Completion propriedade, o delegado para a continuação assume a tarefa de conclusão em si. O exemplo a seguir é semelhante ao anterior, exceto que ele também usa o ContinueWith método para criar uma tarefa de continuação que imprime o status da operação de fluxo de dados geral.

// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
   Console.WriteLine("n = {0}", n);
   if (n < 0)
   {
      throw new ArgumentOutOfRangeException();
   }
});

// Create a continuation task that prints the overall
// task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(task =>
{
   Console.WriteLine("The status of the completion task is '{0}'.",
      task.Status);
});

// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();

// Wait for completion in a try/catch block.
try
{
   throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
   // If an unhandled exception occurs during dataflow processing, all
   // exceptions are propagated through an AggregateException object.
   ae.Handle(e =>
   {
      Console.WriteLine("Encountered {0}: {1}",
         e.GetType().Name, e.Message);
      return true;
   });
}

/* Output:
n = 0
n = -1
The status of the completion task is 'Faulted'.
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
 of valid values.
*/

Você também pode usar propriedades como IsCanceled no corpo da tarefa de continuação para determinar informações adicionais sobre o status de conclusão de um bloco de fluxo de dados. Para obter mais informações sobre tarefas de continuação e como elas se relacionam com cancelamento e tratamento de erros, consulte Encadeando tarefas usando tarefas de continuação, Cancelamento de tarefas e Tratamento de exceções.

Tipos de bloco de fluxo de dados predefinidos

A Biblioteca de Fluxo de Dados TPL fornece vários tipos de blocos de fluxo de dados predefinidos. Esses tipos são divididos em três categorias: blocos de buffer, blocos de execução e blocos de agrupamento. As seções a seguir descrevem os tipos de bloco que compõem essas categorias.

Blocos de buffer

Os blocos de buffer armazenam dados para uso pelos consumidores de dados. A Biblioteca de Fluxo de Dados TPL fornece três tipos de blocos de buffer: System.Threading.Tasks.Dataflow.BufferBlock<T>, System.Threading.Tasks.Dataflow.BroadcastBlock<T>e System.Threading.Tasks.Dataflow.WriteOnceBlock<T>.

BufferBlock<T>

A BufferBlock<T> classe representa uma estrutura de mensagens assíncrona de uso geral. Essa classe armazena uma fila FIFO (first in, first out) de mensagens que podem ser gravadas por várias fontes ou lidas por vários destinos. Quando um destino recebe uma mensagem de um BufferBlock<T> objeto, essa mensagem é removida da fila de mensagens. Portanto, embora um BufferBlock<T> objeto possa ter vários destinos, apenas um destino receberá cada mensagem. A BufferBlock<T> classe é útil quando você deseja passar várias mensagens para outro componente, e esse componente deve receber cada mensagem.

O exemplo básico a seguir posta vários Int32 valores em um BufferBlock<T> objeto e, em seguida, lê esses valores de volta desse objeto.

// Create a BufferBlock<int> object.
var bufferBlock = new BufferBlock<int>();

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
   bufferBlock.Post(i);
}

// Receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(bufferBlock.Receive());
}

/* Output:
   0
   1
   2
 */

Para obter um exemplo completo que demonstra como escrever e ler mensagens de um BufferBlock<T> objeto, consulte Como gravar e ler mensagens de um bloco de fluxo de dados.

BroadcastBlock<T>

A BroadcastBlock<T> classe é útil quando você deve passar várias mensagens para outro componente, mas esse componente precisa apenas do valor mais recente. Essa classe também é útil quando você deseja transmitir uma mensagem para vários componentes.

O exemplo básico a seguir posta um Double valor em um BroadcastBlock<T> objeto e, em seguida, lê esse valor de volta desse objeto várias vezes. Como os valores não são removidos dos objetos depois de BroadcastBlock<T> serem lidos, o mesmo valor está disponível sempre.

// Create a BroadcastBlock<double> object.
var broadcastBlock = new BroadcastBlock<double>(null);

// Post a message to the block.
broadcastBlock.Post(Math.PI);

// Receive the messages back from the block several times.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(broadcastBlock.Receive());
}

/* Output:
   3.14159265358979
   3.14159265358979
   3.14159265358979
 */

Para obter um exemplo completo que demonstra como usar BroadcastBlock<T> para transmitir uma mensagem para vários blocos de destino, consulte Como especificar um agendador de tarefas em um bloco de fluxo de dados.

WriteOnceBlock<T>

A WriteOnceBlock<T> classe é semelhante à BroadcastBlock<T> classe, exceto que um WriteOnceBlock<T> objeto pode ser gravado apenas uma vez. Você pode pensar WriteOnceBlock<T> que é semelhante à palavra-chave somente leitura em C# (ReadOnly no Visual Basic), exceto que um WriteOnceBlock<T> objeto se torna imutável depois de receber um valor em vez de na construção. Como a BroadcastBlock<T> classe, quando um destino recebe uma mensagem de um WriteOnceBlock<T> objeto, essa mensagem não é removida desse objeto. Portanto, vários destinos recebem uma cópia da mensagem. A WriteOnceBlock<T> classe é útil quando você deseja propagar apenas a primeira de várias mensagens.

O exemplo básico a seguir posta vários String valores em um WriteOnceBlock<T> objeto e, em seguida, lê o valor de volta desse objeto. Como um WriteOnceBlock<T> objeto pode ser gravado apenas uma vez, depois que um WriteOnceBlock<T> objeto recebe uma mensagem, ele descarta as mensagens subsequentes.

// Create a WriteOnceBlock<string> object.
var writeOnceBlock = new WriteOnceBlock<string>(null);

// Post several messages to the block in parallel. The first
// message to be received is written to the block.
// Subsequent messages are discarded.
Parallel.Invoke(
   () => writeOnceBlock.Post("Message 1"),
   () => writeOnceBlock.Post("Message 2"),
   () => writeOnceBlock.Post("Message 3"));

// Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive());

/* Sample output:
   Message 2
 */

Para obter um exemplo completo que demonstra como usar WriteOnceBlock<T> para receber o valor da primeira operação concluída, consulte Como desvincular blocos de fluxo de dados.

Blocos de Execução

Os blocos de execução chamam um delegado fornecido pelo usuário para cada parte dos dados recebidos. A Biblioteca de Fluxo de Dados TPL fornece três tipos de blocos de execução: ActionBlock<TInput>, System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput>e System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput>.

ActionBlock<T>

A ActionBlock<TInput> classe é um bloco de destino que chama um delegado quando ele recebe dados. Pense em um ActionBlock<TInput> objeto como um delegado que é executado de forma assíncrona quando os dados ficam disponíveis. O delegado que você fornece a um ActionBlock<TInput> objeto pode ser do tipo Action<T> .System.Func<TInput, Task> Quando você usa um ActionBlock<TInput> objeto com Action<T>, o processamento de cada elemento de entrada é considerado concluído quando o delegado retorna. Quando você usa um ActionBlock<TInput> objeto com System.Func<TInput, Task>, o processamento de cada elemento de entrada é considerado concluído somente quando o objeto retornado Task é concluído. Usando esses dois mecanismos, você pode usar ActionBlock<TInput> para processamento síncrono e assíncrono de cada elemento de entrada.

O exemplo básico a seguir posta vários Int32 valores em um ActionBlock<TInput> objeto. O ActionBlock<TInput> objeto imprime esses valores no console. Este exemplo define o bloco para o estado concluído e aguarda a conclusão de todas as tarefas de fluxo de dados.

// Create an ActionBlock<int> object that prints values
// to the console.
var actionBlock = new ActionBlock<int>(n => Console.WriteLine(n));

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
   actionBlock.Post(i * 10);
}

// Set the block to the completed state and wait for all
// tasks to finish.
actionBlock.Complete();
actionBlock.Completion.Wait();

/* Output:
   0
   10
   20
 */

Para obter exemplos completos que demonstram como usar delegados com a ActionBlock<TInput> classe, consulte Como executar ação quando um bloco de fluxo de dados recebe dados.

TransformBlock<TInput, TOutput>

A TransformBlock<TInput,TOutput> classe se assemelha à ActionBlock<TInput> classe, exceto que ela atua como uma fonte e como um destino. O delegado que você passa para um TransformBlock<TInput,TOutput> objeto retorna um valor do tipo TOutput. O delegado que você fornece a um TransformBlock<TInput,TOutput> objeto pode ser do tipo System.Func<TInput, TOutput> .System.Func<TInput, Task<TOutput>> Quando você usa um TransformBlock<TInput,TOutput> objeto com System.Func<TInput, TOutput>, o processamento de cada elemento de entrada é considerado concluído quando o delegado retorna. Quando você usa um TransformBlock<TInput,TOutput> objeto usado com System.Func<TInput, Task<TOutput>>o , o processamento de cada elemento de entrada é considerado concluído somente quando o objeto retornado Task<TResult> é concluído. Assim como no ActionBlock<TInput>, usando esses dois mecanismos, você pode usar TransformBlock<TInput,TOutput> para processamento síncrono e assíncrono de cada elemento de entrada.

O exemplo básico a seguir cria um TransformBlock<TInput,TOutput> objeto que calcula a raiz quadrada de sua entrada. O TransformBlock<TInput,TOutput> objeto usa Int32 valores como entrada e produz Double valores como saída.

// Create a TransformBlock<int, double> object that
// computes the square root of its input.
var transformBlock = new TransformBlock<int, double>(n => Math.Sqrt(n));

// Post several messages to the block.
transformBlock.Post(10);
transformBlock.Post(20);
transformBlock.Post(30);

// Read the output messages from the block.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(transformBlock.Receive());
}

/* Output:
   3.16227766016838
   4.47213595499958
   5.47722557505166
 */

Para obter exemplos completos que usa TransformBlock<TInput,TOutput> em uma rede de blocos de fluxo de dados que executa o processamento de imagem em um aplicativo do Windows Forms, consulte Passo a passo: Usando o fluxo de dados em um aplicativo do Windows Forms.

TransformManyBlock<TInput, TOutput>

A TransformManyBlock<TInput,TOutput> classe é semelhante à TransformBlock<TInput,TOutput> classe, exceto que TransformManyBlock<TInput,TOutput> produz zero ou mais valores de saída para cada valor de entrada, em vez de apenas um valor de saída para cada valor de entrada. O delegado que você fornece a um TransformManyBlock<TInput,TOutput> objeto pode ser do tipo System.Func<TInput, IEnumerable<TOutput>> .System.Func<TInput, Task<IEnumerable<TOutput>>> Quando você usa um TransformManyBlock<TInput,TOutput> objeto com System.Func<TInput, IEnumerable<TOutput>>, o processamento de cada elemento de entrada é considerado concluído quando o delegado retorna. Quando você usa um TransformManyBlock<TInput,TOutput> objeto com System.Func<TInput, Task<IEnumerable<TOutput>>>, o processamento de cada elemento de entrada é considerado concluído somente quando o objeto retornado System.Threading.Tasks.Task<IEnumerable<TOutput>> é concluído.

O exemplo básico a seguir cria um TransformManyBlock<TInput,TOutput> objeto que divide cadeias de caracteres em suas sequências de caracteres individuais. O TransformManyBlock<TInput,TOutput> objeto usa String valores como entrada e produz Char valores como saída.

// Create a TransformManyBlock<string, char> object that splits
// a string into its individual characters.
var transformManyBlock = new TransformManyBlock<string, char>(
   s => s.ToCharArray());

// Post two messages to the first block.
transformManyBlock.Post("Hello");
transformManyBlock.Post("World");

// Receive all output values from the block.
for (int i = 0; i < ("Hello" + "World").Length; i++)
{
   Console.WriteLine(transformManyBlock.Receive());
}

/* Output:
   H
   e
   l
   l
   o
   W
   o
   r
   l
   d
 */

Para obter exemplos completos que usam TransformManyBlock<TInput,TOutput> para produzir várias saídas independentes para cada entrada em um pipeline de fluxo de dados, consulte Passo a passo: Criando um pipeline de fluxo de dados.

Grau de paralelismo

Cada ActionBlock<TInput>objeto , TransformBlock<TInput,TOutput>e TransformManyBlock<TInput,TOutput> armazena mensagens de entrada em buffer até que o bloco esteja pronto para processá-las. Por padrão, essas classes processam mensagens na ordem em que são recebidas, uma mensagem de cada vez. Você também pode especificar o grau de paralelismo para habilitar ActionBlock<TInput>o , TransformBlock<TInput,TOutput> e TransformManyBlock<TInput,TOutput> objetos para processar várias mensagens simultaneamente. Para obter mais informações sobre execução simultânea, consulte a seção Especificando o grau de paralelismo mais adiante neste documento. Para obter um exemplo que define o grau de paralelismo para permitir que um bloco de fluxo de dados de execução processe mais de uma mensagem de cada vez, consulte Como especificar o grau de paralelismo em um bloco de fluxo de dados.

Resumo dos tipos de delegados

A tabela a seguir resume os tipos de delegados que você pode fornecer a ActionBlock<TInput>, TransformBlock<TInput,TOutput>e TransformManyBlock<TInput,TOutput> objetos. Esta tabela também especifica se o tipo de delegado opera de forma síncrona ou assíncrona.

Type Tipo de delegado síncrono Tipo de delegado assíncrono
ActionBlock<TInput> System.Action System.Func<TInput, Task>
TransformBlock<TInput,TOutput> System.Func<TInput, TOutput> System.Func<TInput, Task<TOutput>>
TransformManyBlock<TInput,TOutput> System.Func<TInput, IEnumerable<TOutput>> System.Func<TInput, Task<IEnumerable<TOutput>>>

Você também pode usar expressões lambda ao trabalhar com tipos de bloco de execução. Para obter um exemplo que mostra como usar uma expressão lambda com um bloco de execução, consulte Como executar uma ação quando um bloco de fluxo de dados recebe dados.

Agrupamento de Blocos

Os blocos de agrupamento combinam dados de uma ou mais fontes e sob várias restrições. A Biblioteca de Fluxo de Dados TPL fornece três tipos de blocos de junção: BatchBlock<T>, JoinBlock<T1,T2>e BatchedJoinBlock<T1,T2>.

Bloco T<>

A BatchBlock<T> classe combina conjuntos de dados de entrada, que são conhecidos como lotes, em matrizes de dados de saída. Você especifica o tamanho de cada lote ao criar um BatchBlock<T> objeto. Quando o BatchBlock<T> objeto recebe a contagem especificada de elementos de entrada, ele propaga de forma assíncrona uma matriz que contém esses elementos. Se um BatchBlock<T> objeto é definido para o estado concluído, mas não contém elementos suficientes para formar um lote, ele propaga uma matriz final que contém os elementos de entrada restantes.

A BatchBlock<T> classe opera no modo ganancioso ou não-ganancioso . No modo ganancioso, que é o padrão, um BatchBlock<T> objeto aceita todas as mensagens que lhe são oferecidas e propaga uma matriz depois de receber a contagem especificada de elementos. No modo não ganancioso, um BatchBlock<T> objeto adia todas as mensagens recebidas até que fontes suficientes tenham oferecido mensagens ao bloco para formar um lote. O modo ganancioso normalmente tem um desempenho melhor do que o modo não ganancioso porque requer menos sobrecarga de processamento. No entanto, você pode usar o modo não ganancioso quando você deve coordenar o consumo de várias fontes de forma atômica. Especifique o dataflowBlockOptions modo não ganancioso definindo Greedy como False no parâmetro no BatchBlock<T> construtor.

O exemplo básico a seguir envia vários Int32 valores para um BatchBlock<T> objeto que contém dez elementos em um lote. Para garantir que todos os valores se propaguem para BatchBlock<T>fora do , este exemplo chama o Complete método. O Complete método define o BatchBlock<T> objeto para o estado concluído e, portanto, o BatchBlock<T> objeto propaga todos os elementos restantes como um lote final.

// Create a BatchBlock<int> object that holds ten
// elements per batch.
var batchBlock = new BatchBlock<int>(10);

// Post several values to the block.
for (int i = 0; i < 13; i++)
{
   batchBlock.Post(i);
}
// Set the block to the completed state. This causes
// the block to propagate out any remaining
// values as a final batch.
batchBlock.Complete();

// Print the sum of both batches.

Console.WriteLine("The sum of the elements in batch 1 is {0}.",
   batchBlock.Receive().Sum());

Console.WriteLine("The sum of the elements in batch 2 is {0}.",
   batchBlock.Receive().Sum());

/* Output:
   The sum of the elements in batch 1 is 45.
   The sum of the elements in batch 2 is 33.
 */

Para obter um exemplo completo que usa BatchBlock<T> para melhorar a eficiência das operações de inserção de banco de dados, consulte Passo a passo: Usando BatchBlock e BatchedJoinBlock para melhorar a eficiência.

JoinBlock<T1, T2, ...>

As JoinBlock<T1,T2> classes e JoinBlock<T1,T2,T3> coletam elementos de entrada e se propagam ou System.Tuple<T1,T2>System.Tuple<T1,T2,T3> objetos que contêm esses elementos. As JoinBlock<T1,T2> classes e JoinBlock<T1,T2,T3> não herdam de ITargetBlock<TInput>. Em vez disso, eles fornecem propriedades, Target1, , e Target3, que implementam ITargetBlock<TInput>Target2.

Como BatchBlock<T>, JoinBlock<T1,T2> e JoinBlock<T1,T2,T3> operar em modo ganancioso ou não-ganancioso. No modo ganancioso, que é o padrão, um JoinBlock<T1,T2> ou JoinBlock<T1,T2,T3> objeto aceita cada mensagem que lhe é oferecida e propaga uma tupla depois que cada um de seus alvos recebe pelo menos uma mensagem. No modo não ganancioso, um JoinBlock<T1,T2> ou JoinBlock<T1,T2,T3> objeto adia todas as mensagens recebidas até que todos os alvos tenham recebido os dados necessários para criar uma tupla. Neste ponto, o bloco se envolve em um protocolo de confirmação de duas fases para recuperar atomicamente todos os itens necessários das fontes. Este adiamento permite que outra entidade consuma os dados entretanto, para permitir que o sistema global avance para a frente.

O exemplo básico a seguir demonstra um caso em que um JoinBlock<T1,T2,T3> objeto requer vários dados para calcular um valor. Este exemplo cria um JoinBlock<T1,T2,T3> objeto que requer dois Int32 valores e um Char valor para executar uma operação aritmética.

// Create a JoinBlock<int, int, char> object that requires
// two numbers and an operator.
var joinBlock = new JoinBlock<int, int, char>();

// Post two values to each target of the join.

joinBlock.Target1.Post(3);
joinBlock.Target1.Post(6);

joinBlock.Target2.Post(5);
joinBlock.Target2.Post(4);

joinBlock.Target3.Post('+');
joinBlock.Target3.Post('-');

// Receive each group of values and apply the operator part
// to the number parts.

for (int i = 0; i < 2; i++)
{
   var data = joinBlock.Receive();
   switch (data.Item3)
   {
      case '+':
         Console.WriteLine("{0} + {1} = {2}",
            data.Item1, data.Item2, data.Item1 + data.Item2);
         break;
      case '-':
         Console.WriteLine("{0} - {1} = {2}",
            data.Item1, data.Item2, data.Item1 - data.Item2);
         break;
      default:
         Console.WriteLine("Unknown operator '{0}'.", data.Item3);
         break;
   }
}

/* Output:
   3 + 5 = 8
   6 - 4 = 2
 */

Para obter um exemplo completo que usa JoinBlock<T1,T2> objetos no modo não ganancioso para compartilhar cooperativamente um recurso, consulte Como usar JoinBlock para ler dados de várias fontes.

BatchedJoinBlock<T1, T2, ...>

As BatchedJoinBlock<T1,T2> classes e BatchedJoinBlock<T1,T2,T3> coletam lotes de elementos de entrada e se propagam ou System.Tuple(IList(T1), IList(T2))System.Tuple(IList(T1), IList(T2), IList(T3)) objetos que contêm esses elementos. Pense em BatchedJoinBlock<T1,T2> como uma combinação de BatchBlock<T> e JoinBlock<T1,T2>. Especifique o tamanho de cada lote ao criar um BatchedJoinBlock<T1,T2> objeto. BatchedJoinBlock<T1,T2> também fornece propriedades, Target1 e Target2, que implementam ITargetBlock<TInput>. Quando a contagem especificada de elementos de entrada é recebida de todos os destinos, o BatchedJoinBlock<T1,T2> objeto propaga de forma assíncrona um System.Tuple(IList(T1), IList(T2)) objeto que contém esses elementos.

O exemplo básico a seguir cria um BatchedJoinBlock<T1,T2> objeto que contém resultados, Int32 valores e erros que são Exception objetos. Este exemplo executa várias operações e grava resultados na Target1 propriedade e erros na Target2 propriedade do BatchedJoinBlock<T1,T2> objeto. Como a contagem de operações bem-sucedidas e com falha é desconhecida antecipadamente, os IList<T> objetos permitem que cada destino receba zero ou mais valores.

// For demonstration, create a Func<int, int> that
// returns its argument, or throws ArgumentOutOfRangeException
// if the argument is less than zero.
Func<int, int> DoWork = n =>
{
   if (n < 0)
      throw new ArgumentOutOfRangeException();
   return n;
};

// Create a BatchedJoinBlock<int, Exception> object that holds
// seven elements per batch.
var batchedJoinBlock = new BatchedJoinBlock<int, Exception>(7);

// Post several items to the block.
foreach (int i in new int[] { 5, 6, -7, -22, 13, 55, 0 })
{
   try
   {
      // Post the result of the worker to the
      // first target of the block.
      batchedJoinBlock.Target1.Post(DoWork(i));
   }
   catch (ArgumentOutOfRangeException e)
   {
      // If an error occurred, post the Exception to the
      // second target of the block.
      batchedJoinBlock.Target2.Post(e);
   }
}

// Read the results from the block.
var results = batchedJoinBlock.Receive();

// Print the results to the console.

// Print the results.
foreach (int n in results.Item1)
{
   Console.WriteLine(n);
}
// Print failures.
foreach (Exception e in results.Item2)
{
   Console.WriteLine(e.Message);
}

/* Output:
   5
   6
   13
   55
   0
   Specified argument was out of the range of valid values.
   Specified argument was out of the range of valid values.
 */

Para obter um exemplo completo que usa BatchedJoinBlock<T1,T2> para capturar os resultados e quaisquer exceções que ocorrem enquanto o programa lê de um banco de dados, consulte Passo a passo: Usando BatchBlock e BatchedJoinBlock para melhorar a eficiência.

Configurando o comportamento do bloco de fluxo de dados

Você pode habilitar opções adicionais fornecendo um System.Threading.Tasks.Dataflow.DataflowBlockOptions objeto para o construtor de tipos de bloco de fluxo de dados. Essas opções controlam o comportamento, como o agendador que gerencia a tarefa subjacente e o grau de paralelismo. O DataflowBlockOptions também tem tipos derivados que especificam o comportamento que é específico para determinados tipos de bloco de fluxo de dados. A tabela a seguir resume qual tipo de opção está associado a cada tipo de bloco de fluxo de dados.

As seções a seguir fornecem informações adicionais sobre os tipos importantes de opções de bloco de fluxo de dados disponíveis por meio das System.Threading.Tasks.Dataflow.DataflowBlockOptionsclasses , System.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptionse System.Threading.Tasks.Dataflow.GroupingDataflowBlockOptions .

Especificando o Agendador de Tarefas

Cada bloco de fluxo de dados predefinido usa o mecanismo de agendamento de tarefas TPL para executar atividades como propagar dados para um destino, receber dados de uma fonte e executar delegados definidos pelo usuário quando os dados ficam disponíveis. TaskScheduler é uma classe abstrata que representa um agendador de tarefas que enfileira tarefas em threads. O agendador de tarefas padrão, Default, usa a ThreadPool classe para enfileirar e executar trabalho. Você pode substituir o agendador de tarefas padrão definindo a TaskScheduler propriedade ao construir um objeto de bloco de fluxo de dados.

Quando o mesmo agendador de tarefas gerencia vários blocos de fluxo de dados, ele pode impor políticas entre eles. Por exemplo, se vários blocos de fluxo de dados forem configurados para direcionar o agendador exclusivo do mesmo ConcurrentExclusiveSchedulerPair objeto, todo o trabalho executado nesses blocos será serializado. Da mesma forma, se esses blocos estiverem configurados para direcionar o agendador simultâneo do mesmo ConcurrentExclusiveSchedulerPair objeto e esse agendador estiver configurado para ter um nível máximo de simultaneidade, todo o trabalho desses blocos será limitado a esse número de operações simultâneas. Para obter um exemplo que usa a ConcurrentExclusiveSchedulerPair classe para permitir que as operações de leitura ocorram em paralelo, mas as operações de gravação ocorram exclusivamente de todas as outras operações, consulte Como especificar um Agendador de Tarefas em um bloco de fluxo de dados. Para obter mais informações sobre agendadores de tarefas no TPL, consulte o tópico da TaskScheduler classe.

Especificando o grau de paralelismo

Por padrão, os três tipos de bloco de execução que a Biblioteca de Fluxo de Dados TPL fornece, ActionBlock<TInput>, TransformBlock<TInput,TOutput>e TransformManyBlock<TInput,TOutput>, processam uma mensagem de cada vez. Esses tipos de bloco de fluxo de dados também processam mensagens na ordem em que são recebidas. Para permitir que esses blocos de fluxo de dados processem mensagens simultaneamente, defina a ExecutionDataflowBlockOptions.MaxDegreeOfParallelism propriedade ao construir o objeto de bloco de fluxo de dados.

O valor padrão de é 1, que garante que o bloco de fluxo de MaxDegreeOfParallelism dados processe uma mensagem de cada vez. Definir essa propriedade para um valor maior que 1 permite que o bloco de fluxo de dados processe várias mensagens simultaneamente. Definir essa propriedade para DataflowBlockOptions.Unbounded permitir que o agendador de tarefas subjacente gerencie o grau máximo de simultaneidade.

Importante

Quando você especifica um grau máximo de paralelismo maior que 1, várias mensagens são processadas simultaneamente e, portanto, as mensagens podem não ser processadas na ordem em que são recebidas. A ordem em que as mensagens são saídas do bloco é, no entanto, a mesma em que são recebidas.

Como a MaxDegreeOfParallelism propriedade representa o grau máximo de paralelismo, o bloco de fluxo de dados pode ser executado com um grau menor de paralelismo do que o especificado. O bloco de fluxo de dados pode usar um grau menor de paralelismo para atender aos seus requisitos funcionais ou porque há falta de recursos disponíveis do sistema. Um bloco de fluxo de dados nunca escolhe mais paralelismo do que você especifica.

O valor da propriedade é exclusivo para cada objeto de bloco de fluxo de MaxDegreeOfParallelism dados. Por exemplo, se quatro objetos de bloco de fluxo de dados especificarem 1 para o grau máximo de paralelismo, todos os quatro objetos de bloco de fluxo de dados poderão ser executados em paralelo.

Para obter um exemplo que define o grau máximo de paralelismo para permitir que operações longas ocorram em paralelo, consulte Como especificar o grau de paralelismo em um bloco de fluxo de dados.

Especificando o número de mensagens por tarefa

Os tipos de bloco de fluxo de dados predefinidos usam tarefas para processar vários elementos de entrada. Isso ajuda a minimizar o número de objetos de tarefa necessários para processar dados, o que permite que os aplicativos sejam executados com mais eficiência. No entanto, quando as tarefas de um conjunto de blocos de fluxo de dados estão processando dados, as tarefas de outros blocos de fluxo de dados podem precisar aguardar o tempo de processamento enfileirando mensagens. Para permitir uma melhor equidade entre as tarefas de fluxo de dados, defina a MaxMessagesPerTask propriedade. Quando MaxMessagesPerTask é definido como DataflowBlockOptions.Unbounded, que é o padrão, a tarefa usada por um bloco de fluxo de dados processa quantas mensagens estiverem disponíveis. Quando MaxMessagesPerTask é definido como um valor diferente de Unbounded, o bloco de fluxo de dados processa, no máximo, esse número de mensagens por Task objeto. Embora a definição da propriedade possa aumentar a MaxMessagesPerTask equidade entre as tarefas, ela pode fazer com que o sistema crie mais tarefas do que as necessárias, o que pode diminuir o desempenho.

Ativando o cancelamento

O TPL fornece um mecanismo que permite que as tarefas coordenem o cancelamento de forma cooperativa. Para permitir que os blocos de fluxo de dados participem desse mecanismo de cancelamento, defina a CancellationToken propriedade. Quando esse CancellationToken objeto é definido para o estado cancelado, todos os blocos de fluxo de dados que monitoram esse token concluem a execução de seu item atual, mas não iniciam o processamento de itens subsequentes. Esses blocos de fluxo de dados também limpam todas as mensagens armazenadas em buffer, liberam conexões com qualquer bloco de origem e destino e fazem a transição para o estado cancelado. Ao fazer a transição para o estado cancelado, a Completion propriedade tem a propriedade definida como , a Canceledmenos que ocorra uma exceção durante o Status processamento. Nesse caso, Status está definido como Faulted.

Para obter um exemplo que demonstra como usar o cancelamento em um aplicativo do Windows Forms, consulte Como cancelar um bloco de fluxo de dados. Para obter mais informações sobre cancelamento no TPL, consulte Cancelamento de tarefas.

Especificando o comportamento ganancioso versus não ganancioso

Vários tipos de blocos de fluxo de dados de agrupamento podem operar no modo ganancioso ou não ganancioso . Por padrão, os tipos de bloco de fluxo de dados predefinidos operam no modo ganancioso.

Para tipos de bloco de junção, como JoinBlock<T1,T2>, o modo ganancioso significa que o bloco aceita imediatamente os dados, mesmo que os dados correspondentes com os quais se juntar ainda não estejam disponíveis. O modo não ganancioso significa que o bloco adia todas as mensagens recebidas até que uma esteja disponível em cada um dos seus alvos para concluir a junção. Se alguma das mensagens adiadas não estiver mais disponível, o bloco de ingresso libera todas as mensagens adiadas e reinicia o processo. Para a classe, o BatchBlock<T> comportamento ganancioso e não ganancioso é semelhante, exceto que, no modo não ganancioso, um BatchBlock<T> objeto adia todas as mensagens recebidas até que estejam disponíveis fontes distintas o suficiente para completar um lote.

Para especificar o modo não ganancioso para um bloco de fluxo de dados, defina Greedy como False. Para obter um exemplo que demonstra como usar o modo não ganancioso para permitir que vários blocos de junção compartilhem uma fonte de dados com mais eficiência, consulte Como usar o JoinBlock para ler dados de várias fontes.

Blocos de fluxo de dados personalizados

Embora a Biblioteca de Fluxo de Dados TPL forneça muitos tipos de bloco predefinidos, você pode criar tipos de bloco adicionais que executam um comportamento personalizado. Implemente as ISourceBlock<TOutput> interfaces ou ITargetBlock<TInput> diretamente ou use o Encapsulate método para criar um bloco complexo que encapsula o comportamento de tipos de bloco existentes. Para obter exemplos que mostram como implementar a funcionalidade de bloco de fluxo de dados personalizado, consulte Passo a passo: Criando um tipo de bloco de fluxo de dados personalizado.

Title Description
Como gravar e ler mensagens de um bloco de fluxo de dados Demonstra como escrever e ler mensagens de um BufferBlock<T> objeto.
Como implementar um padrão de fluxo de dados produtor-consumidor Descreve como usar o modelo de fluxo de dados para implementar um padrão produtor-consumidor, onde o produtor envia mensagens para um bloco de fluxo de dados e o consumidor lê mensagens desse bloco.
Como: Executar ação quando um bloco de fluxo de dados recebe dados Descreve como fornecer delegados para os tipos de bloco de fluxo de dados de execução, ActionBlock<TInput>, TransformBlock<TInput,TOutput>e TransformManyBlock<TInput,TOutput>.
Passo a passo: Criando um pipeline de fluxo de dados Descreve como criar um pipeline de fluxo de dados que baixa texto da Web e executa operações nesse texto.
Como: Desvincular blocos de fluxo de dados Demonstra como usar o LinkTo método para desvincular um bloco de destino de sua origem depois que a fonte oferece uma mensagem para o destino.
Passo a passo: Usando o fluxo de dados em um aplicativo do Windows Forms Demonstra como criar uma rede de blocos de fluxo de dados que executam o processamento de imagem em um aplicativo Windows Forms.
Como: Cancelar um bloco de fluxo de dados Demonstra como usar o cancelamento em um aplicativo do Windows Forms.
Como: Usar JoinBlock para ler dados de várias fontes Explica como usar a classe para executar uma operação quando os dados estão disponíveis de várias fontes e como usar o JoinBlock<T1,T2> modo não ganancioso para permitir que vários blocos de junção compartilhem uma fonte de dados com mais eficiência.
Como: Especificar o grau de paralelismo em um bloco de fluxo de dados Descreve como definir a MaxDegreeOfParallelism propriedade para permitir que um bloco de fluxo de dados de execução processe mais de uma mensagem de cada vez.
Como especificar um Agendador de Tarefas em um bloco de fluxo de dados Demonstra como associar um agendador de tarefas específico quando você usa o fluxo de dados em seu aplicativo.
Passo a passo: Usando BatchBlock e BatchedJoinBlock para melhorar a eficiência Descreve como usar a BatchBlock<T> classe para melhorar a eficiência das operações de inserção de banco de dados e como usar a BatchedJoinBlock<T1,T2> classe para capturar os resultados e quaisquer exceções que ocorrem enquanto o programa lê de um banco de dados.
Passo a passo: Criando um tipo de bloco de fluxo de dados personalizado Demonstra duas maneiras de criar um tipo de bloco de fluxo de dados que implementa o comportamento personalizado.
Biblioteca paralela de tarefas (TPL) Apresenta o TPL, uma biblioteca que simplifica a programação paralela e simultânea em aplicativos .NET Framework.