Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
A TPL (Task Parallel Library) fornece componentes de fluxo de dados para ajudar a aumentar a robustez de aplicativos habilitados para simultaneidade. Esses componentes de fluxo de dados são coletivamente chamados de TPL Dataflow Library. Esse modelo de fluxo de dados promove a programação baseada em atores, fornecendo passagem de mensagens dentro do processo para tarefas de fluxo de dados de grão grosso e de pipelining. Os componentes de fluxo de dados se baseiam nos tipos e na infraestrutura de agendamento do TPL e se integram ao suporte às linguagens C#, Visual Basic e F# para programação assíncrona. Esses componentes de fluxo de dados são úteis quando você tem várias operações que devem se comunicar entre si de forma assíncrona ou quando deseja processar dados à medida que eles ficam disponíveis. Por exemplo, considere um aplicativo que processa dados de imagem de uma câmera web. Usando o modelo de fluxo de dados, o aplicativo pode processar quadros de imagem à medida que eles ficam disponíveis. Se o aplicativo aprimorar quadros de imagem, por exemplo, executando correção de luz ou redução de olhos vermelhos, você poderá criar um pipeline de componentes de fluxo de dados. Cada estágio do pipeline pode usar a funcionalidade de paralelismo mais grosseiro, como a funcionalidade fornecida pelo TPL, para transformar a imagem.
Este documento fornece uma visão geral da Biblioteca de Fluxo de Dados TPL. Ele descreve o modelo de programação, os tipos de bloco de fluxo de dados predefinidos e como configurar blocos de fluxo de dados para atender aos requisitos específicos de seus aplicativos.
Observação
A biblioteca de fluxo de dados TPL (o namespace System.Threading.Tasks.Dataflow) não é distribuída com o .NET. Para instalar o namespace System.Threading.Tasks.Dataflow no Visual Studio, abra o seu projeto, escolha Gerir Pacotes NuGet no menu Projeto e pesquise o pacote System.Threading.Tasks.Dataflow online. Como alternativa, para instalá-lo usando a CLI do .NET Core, execute dotnet add package System.Threading.Tasks.Dataflow.
Modelo de Programação
A TPL Dataflow Library fornece uma base para a passagem de mensagens e paralelização de aplicativos com uso intensivo de CPU e E/S que têm alta taxa de transferência e baixa latência. Ele também oferece controle explícito sobre como os dados são armazenados em buffer e se movem pelo sistema. Para entender melhor o modelo de programação de fluxo de dados, considere um aplicativo que carrega imagens do disco de forma assíncrona e cria uma composição dessas imagens. Os modelos de programação tradicionais normalmente exigem que você use retornos de chamada e objetos de sincronização, como bloqueios, para coordenar tarefas e acessar dados compartilhados. Usando o modelo de programação de fluxo de dados, você pode criar objetos de fluxo de dados que processam imagens à medida que são lidas do disco. No modelo de fluxo de dados, você declara como os dados são manipulados quando ficam disponíveis e também quaisquer dependências entre dados. Como o tempo de execução gerencia dependências entre dados, muitas vezes você pode evitar a necessidade de sincronizar o acesso aos dados compartilhados. Além disso, como as agendas de tempo de execução funcionam com base na chegada assíncrona de dados, o fluxo de dados pode melhorar a capacidade de resposta e a taxa de transferência gerenciando com eficiência os threads subjacentes. Para obter um exemplo que usa o modelo de programação de fluxo de dados para implementar o processamento de imagem em um aplicativo do Windows Forms, consulte Passo a passo: Usando o fluxo de dados em um aplicativo do Windows Forms.
Fontes e Alvos
A Biblioteca de Fluxo de Dados TPL consiste em blocos de fluxo de dados, que são estruturas de dados que armazenam em buffer e processam dados. O TPL define três tipos de blocos de fluxo de dados: blocos de origem, blocos de destinoe blocos propagadores. Um bloco de origem atua como uma fonte de dados e pode ser lido. Um bloco de destino atua como um recetor de dados e pode ser gravado. Um bloco propagador atua como um bloco de origem e um bloco de destino, e pode ser lido e gravado. O TPL define a interface System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> para representar fontes, System.Threading.Tasks.Dataflow.ITargetBlock<TInput> para representar alvos e System.Threading.Tasks.Dataflow.IPropagatorBlock<TInput,TOutput> para representar propagadores. IPropagatorBlock<TInput,TOutput> herda de ISourceBlock<TOutput>e ITargetBlock<TInput>.
A Biblioteca de Fluxo de Dados TPL fornece vários tipos de bloco de fluxo de dados predefinidos que implementam as interfaces ISourceBlock<TOutput>, ITargetBlock<TInput>e IPropagatorBlock<TInput,TOutput>. Esses tipos de bloco de fluxo de dados são descritos neste documento na seção Tipos de bloco de fluxo de dados predefinidos.
Conectando blocos
Você pode conectar blocos de fluxo de dados para formar pipelines, que são sequências lineares de blocos de fluxo de dados, ou redes , que são gráficos de blocos de fluxo de dados. Um pipeline é uma forma de rede. Em um pipeline ou rede, as fontes propagam dados de forma assíncrona para destinos à medida que esses dados ficam disponíveis. O método ISourceBlock<TOutput>.LinkTo vincula um bloco de fluxo de dados de origem a um bloco de destino. Uma fonte pode ser vinculada a zero ou mais alvos; os alvos podem ser vinculados a partir de zero ou mais fontes. Você pode adicionar ou remover blocos de fluxo de dados de ou para um pipeline ou rede simultaneamente. Os tipos de blocos de fluxo de dados predefinidos gerem todos os aspetos de segurança de threads na ligação e desligação.
Para obter um exemplo que conecta blocos de fluxo de dados para formar um pipeline básico, consulte Passo a passo: Criando um pipeline de fluxo de dados. Para obter um exemplo que conecta blocos de fluxo de dados para formar uma rede mais complexa, consulte Passo a passo: Usando o fluxo de dados em um aplicativo do Windows Forms. Para obter um exemplo que desvincula um destino de uma fonte depois que a fonte oferece uma mensagem ao destino, consulte Como desvincular blocos de fluxo de dados.
Filtragem
Quando você chama o método ISourceBlock<TOutput>.LinkTo para vincular uma origem a um destino, você pode fornecer um delegado que determina se o bloco de destino aceita ou rejeita uma mensagem com base no valor dessa mensagem. Esse mecanismo de filtragem é uma maneira útil de garantir que um bloco de fluxo de dados receba apenas determinados valores. Para a maioria dos tipos de bloco de fluxo de dados predefinidos, se um bloco de origem estiver conectado a vários blocos de destino, quando um bloco de destino rejeitar uma mensagem, a origem oferecerá essa mensagem ao próximo destino. A ordem em que uma fonte oferece mensagens aos alvos é definida pela fonte e pode variar de acordo com o tipo da fonte. A maioria dos tipos de bloco de origem deixa de oferecer uma mensagem depois que um destino aceita essa mensagem. Uma exceção a essa regra é a classe BroadcastBlock<T>, que oferece cada mensagem a todos os destinos, mesmo que alguns destinos rejeitem a mensagem. Para obter um exemplo que usa filtragem para processar apenas determinadas mensagens, consulte Passo a passo: Usando o fluxo de dados em um aplicativo do Windows Forms.
Importante
Como cada tipo de bloco de fluxo de dados de origem predefinido garante que as mensagens sejam propagadas na ordem em que são recebidas, cada mensagem deve ser lida do bloco de origem antes que o bloco de origem possa processar a próxima mensagem. Portanto, ao usar a filtragem para conectar vários destinos a uma origem, certifique-se de que pelo menos um bloco de destino receba cada mensagem. Caso contrário, seu aplicativo pode ficar bloqueado.
Passagem de mensagens
O modelo de programação de fluxo de dados está relacionado ao conceito de passagem de mensagens, onde componentes independentes de um programa se comunicam entre eles enviando mensagens. Uma maneira de propagar mensagens entre os componentes do aplicativo é chamar os métodos Post (síncrono) e SendAsync (assíncrono) para enviar mensagens aos blocos de fluxo de dados de destino e os métodos Receive, ReceiveAsynce TryReceive para receber mensagens de blocos de origem. Você pode combinar esses métodos com pipelines ou redes de fluxo de dados enviando dados de entrada para o nó principal (um bloco de destino) e recebendo dados de saída do nó terminal do pipeline ou dos nós terminais da rede (um ou mais blocos de origem). Você também pode usar o método Choose para ler a partir da primeira das fontes fornecidas que tem dados disponíveis e executar ações nesses dados.
Os blocos de origem oferecem dados aos blocos de destino chamando o método ITargetBlock<TInput>.OfferMessage. O bloco de destino responde a uma mensagem oferecida de três maneiras: pode aceitar a mensagem, recusar a mensagem ou adiá-la. Quando o destino aceita a mensagem, o método OfferMessage retorna Accepted. Quando o destino recusa a mensagem, o método OfferMessage retorna Declined. Quando o destino exige que não receba mais mensagens da origem, OfferMessage retorna DecliningPermanently. Os tipos de bloco de origem predefinidos não enviam mensagens para alvos ligados após a recepção desse valor de retorno e desvinculam-se automaticamente desses alvos.
Quando um bloco de destino adia a mensagem para uso posterior, o método OfferMessage retorna Postponed. Um bloco de destino que adia uma mensagem pode mais tarde chamar o método ISourceBlock<TOutput>.ReserveMessage para tentar reservar a mensagem oferecida. Neste ponto, a mensagem ainda está disponível e pode ser usada pelo bloco alvo, ou a mensagem foi usada por outro alvo. Quando o bloco de destino mais tarde requer a mensagem ou não precisa mais da mensagem, ele chama o método ISourceBlock<TOutput>.ConsumeMessage ou ReleaseReservation, respectivamente. A reserva de mensagens é normalmente usada pelos tipos de bloco de fluxo de dados que operam no modo não ganancioso. O modo não ganancioso é explicado mais adiante neste documento. Em vez de reservar uma mensagem adiada, um bloco de destino também pode usar o método ISourceBlock<TOutput>.ConsumeMessage para tentar consumir diretamente a mensagem adiada.
Conclusão do bloco de fluxo de dados
Os blocos de fluxo de dados também suportam o conceito de conclusão. Um bloco de fluxo de dados que está no estado concluído não executa nenhum trabalho adicional. Cada bloco de fluxo de dados tem um objeto System.Threading.Tasks.Task associado, conhecido como de tarefa de conclusão de, que representa o status de conclusão do bloco. Como podes esperar que um objeto Task termine, utilizando tarefas de conclusão, podes aguardar a conclusão de um ou mais nós terminais de uma rede de processamento de dados. A interface IDataflowBlock define o método Complete, que informa o bloco de fluxo de dados de uma solicitação para que ele seja concluído, e a propriedade Completion, que retorna a tarefa de conclusão para o bloco de fluxo de dados. Tanto ISourceBlock<TOutput> quanto ITargetBlock<TInput> herdam a interface IDataflowBlock.
Há duas maneiras de determinar se um bloco de fluxo de dados foi concluído sem erros, encontrou um ou mais erros ou foi cancelado. A primeira maneira é chamar o método Task.Wait na tarefa de conclusão em um bloco try-catch (Try-Catch no Visual Basic). O exemplo a seguir cria um objeto ActionBlock<TInput> que lança ArgumentOutOfRangeException se seu valor de entrada for menor que zero.
AggregateException é lançado quando este exemplo chama Wait na tarefa de finalização. O ArgumentOutOfRangeException é acessado através da propriedade InnerExceptions do objeto AggregateException.
// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
Console.WriteLine($"n = {n}");
if (n < 0)
{
throw new ArgumentOutOfRangeException();
}
});
// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();
// Wait for completion in a try/catch block.
try
{
throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
// If an unhandled exception occurs during dataflow processing, all
// exceptions are propagated through an AggregateException object.
ae.Handle(e =>
{
Console.WriteLine($"Encountered {e.GetType().Name}: {e.Message}");
return true;
});
}
/* Output:
n = 0
n = -1
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
Console.WriteLine("n = {0}", n)
If n < 0 Then
Throw New ArgumentOutOfRangeException()
End If
End Sub)
' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()
' Wait for completion in a try/catch block.
Try
throwIfNegative.Completion.Wait()
Catch ae As AggregateException
' If an unhandled exception occurs during dataflow processing, all
' exceptions are propagated through an AggregateException object.
ae.Handle(Function(e)
Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
Return True
End Function)
End Try
' Output:
' n = 0
' n = -1
' Encountered ArgumentOutOfRangeException: Specified argument was out of the range
' of valid values.
'
Este exemplo demonstra o caso em que uma exceção não é tratada no delegado de um bloco de fluxo de dados de execução. Recomenda-se lidar com exceções nos corpos desses blocos. No entanto, se você não conseguir fazer isso, o bloco se comporta como se tivesse sido cancelado e não processa as mensagens recebidas.
Quando um bloco de fluxo de dados é cancelado explicitamente, o objeto AggregateException contém OperationCanceledException na propriedade InnerExceptions. Para obter mais informações sobre o cancelamento do fluxo de dados, consulte a seção Habilitando o Cancelamento.
A segunda maneira de determinar o status de conclusão de um bloco de fluxo de dados é usar uma continuação da tarefa de conclusão ou usar os recursos de linguagem assíncrona de C# e Visual Basic para aguardar assincronamente a tarefa de conclusão. O delegado que você fornece ao método Task.ContinueWith usa um objeto Task que representa a tarefa antecedente. No caso da propriedade Completion, o delegado para a continuação assume a própria tarefa de conclusão. O exemplo a seguir é semelhante ao anterior, exceto que ele também usa o método ContinueWith para criar uma tarefa de continuação que imprime o status da operação de fluxo de dados geral.
// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
Console.WriteLine($"n = {n}");
if (n < 0)
{
throw new ArgumentOutOfRangeException();
}
});
// Create a continuation task that prints the overall
// task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(task =>
{
Console.WriteLine($"The status of the completion task is '{task.Status}'.");
});
// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();
// Wait for completion in a try/catch block.
try
{
throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
// If an unhandled exception occurs during dataflow processing, all
// exceptions are propagated through an AggregateException object.
ae.Handle(e =>
{
Console.WriteLine($"Encountered {e.GetType().Name}: {e.Message}");
return true;
});
}
/* Output:
n = 0
n = -1
The status of the completion task is 'Faulted'.
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
Console.WriteLine("n = {0}", n)
If n < 0 Then
Throw New ArgumentOutOfRangeException()
End If
End Sub)
' Create a continuation task that prints the overall
' task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(Sub(task) Console.WriteLine("The status of the completion task is '{0}'.", task.Status))
' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()
' Wait for completion in a try/catch block.
Try
throwIfNegative.Completion.Wait()
Catch ae As AggregateException
' If an unhandled exception occurs during dataflow processing, all
' exceptions are propagated through an AggregateException object.
ae.Handle(Function(e)
Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
Return True
End Function)
End Try
' Output:
' n = 0
' n = -1
' The status of the completion task is 'Faulted'.
' Encountered ArgumentOutOfRangeException: Specified argument was out of the range
' of valid values.
'
Você também pode usar propriedades como IsCanceled no corpo da tarefa de continuação para determinar informações adicionais sobre o status de conclusão de um bloco de fluxo de dados. Para obter mais informações sobre tarefas de continuação e como elas se relacionam com o cancelamento e o tratamento de erros, consulte Encadeamento de Tarefas Usando Tarefas de Continuação, Cancelamento de Tarefas, e Tratamento de Exceções.
Tipos de bloco de fluxo de dados predefinidos
A Biblioteca de Fluxo de Dados TPL fornece vários tipos de blocos de fluxo de dados predefinidos. Esses tipos são divididos em três categorias: blocos de buffer, blocos de execuçãoe blocos de agrupamento. As seções a seguir descrevem os tipos de bloco que compõem essas categorias.
Blocos de buffer
Os blocos de buffer armazenam dados para uso pelos consumidores de dados. A Biblioteca de Fluxo de Dados TPL fornece três tipos de blocos de buffer: System.Threading.Tasks.Dataflow.BufferBlock<T>, System.Threading.Tasks.Dataflow.BroadcastBlock<T>e System.Threading.Tasks.Dataflow.WriteOnceBlock<T>.
BufferBlock<T>
A classe BufferBlock<T> representa uma estrutura de mensagens assíncrona de uso geral. A classe armazena uma fila FIFO (first in, first out) de mensagens que podem ser escritas por múltiplas fontes ou lidas por múltiplos destinos. Quando um destino recebe uma mensagem de um objeto BufferBlock<T>, essa mensagem é removida da fila de mensagens. Portanto, embora um objeto BufferBlock<T> possa ter vários destinos, apenas um destino receberá cada mensagem. A classe BufferBlock<T> é útil quando você deseja passar várias mensagens para outro componente, e esse componente deve receber cada mensagem.
O exemplo básico a seguir posta vários valores Int32 para um objeto BufferBlock<T> e, em seguida, lê esses valores de volta desse objeto.
// Create a BufferBlock<int> object.
var bufferBlock = new BufferBlock<int>();
// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
bufferBlock.Post(i);
}
// Receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
Console.WriteLine(bufferBlock.Receive());
}
/* Output:
0
1
2
*/
' Create a BufferBlock<int> object.
Dim bufferBlock = New BufferBlock(Of Integer)()
' Post several messages to the block.
For i As Integer = 0 To 2
bufferBlock.Post(i)
Next i
' Receive the messages back from the block.
For i As Integer = 0 To 2
Console.WriteLine(bufferBlock.Receive())
Next i
' Output:
' 0
' 1
' 2
'
Para obter um exemplo completo que demonstra como escrever e ler mensagens de um objeto BufferBlock<T>, consulte Como gravar e ler mensagens de um bloco de fluxo de dados.
BroadcastBlock<T>
A classe BroadcastBlock<T> é útil quando você deve passar várias mensagens para outro componente, mas esse componente precisa apenas do valor mais recente. Essa classe também é útil quando você deseja transmitir uma mensagem para vários componentes.
O exemplo básico a seguir posta um valor Double para um objeto BroadcastBlock<T> e, em seguida, lê esse valor de volta desse objeto várias vezes. Como os valores não são removidos dos objetos depois de BroadcastBlock<T> serem lidos, o mesmo valor está disponível sempre.
// Create a BroadcastBlock<double> object.
var broadcastBlock = new BroadcastBlock<double>(null);
// Post a message to the block.
broadcastBlock.Post(Math.PI);
// Receive the messages back from the block several times.
for (int i = 0; i < 3; i++)
{
Console.WriteLine(broadcastBlock.Receive());
}
/* Output:
3.14159265358979
3.14159265358979
3.14159265358979
*/
' Create a BroadcastBlock<double> object.
Dim broadcastBlock = New BroadcastBlock(Of Double)(Nothing)
' Post a message to the block.
broadcastBlock.Post(Math.PI)
' Receive the messages back from the block several times.
For i As Integer = 0 To 2
Console.WriteLine(broadcastBlock.Receive())
Next i
' Output:
' 3.14159265358979
' 3.14159265358979
' 3.14159265358979
'
Para obter um exemplo completo que demonstra como usar o BroadcastBlock<T> para transmitir uma mensagem para vários blocos de destino, consulte Como especificar um agendador de tarefas em um bloco de fluxo de dados.
WriteOnceBlock<T>
A classe WriteOnceBlock<T> é semelhante à classe BroadcastBlock<T>, exceto que um objeto WriteOnceBlock<T> pode ser gravado apenas uma vez. Você pode pensar em WriteOnceBlock<T> como sendo semelhante à palavra-chave C# readonly (ReadOnly no Visual Basic), exceto que um objeto WriteOnceBlock<T> torna-se imutável depois de receber um valor em vez de na construção. Como a BroadcastBlock<T> classe, quando um destino recebe uma mensagem de um WriteOnceBlock<T> objeto, essa mensagem não é removida desse objeto. Portanto, vários destinos recebem uma cópia da mensagem. A classe WriteOnceBlock<T> é útil quando você deseja propagar apenas a primeira de várias mensagens.
O exemplo básico seguinte envia vários valores de String para um objeto WriteOnceBlock<T> e, em seguida, lê o valor de volta a partir desse objeto. Como um objeto WriteOnceBlock<T> pode ser gravado apenas uma vez, depois que um objeto WriteOnceBlock<T> recebe uma mensagem, ele descarta as mensagens subsequentes.
// Create a WriteOnceBlock<string> object.
var writeOnceBlock = new WriteOnceBlock<string>(null);
// Post several messages to the block in parallel. The first
// message to be received is written to the block.
// Subsequent messages are discarded.
Parallel.Invoke(
() => writeOnceBlock.Post("Message 1"),
() => writeOnceBlock.Post("Message 2"),
() => writeOnceBlock.Post("Message 3"));
// Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive());
/* Sample output:
Message 2
*/
' Create a WriteOnceBlock<string> object.
Dim writeOnceBlock = New WriteOnceBlock(Of String)(Nothing)
' Post several messages to the block in parallel. The first
' message to be received is written to the block.
' Subsequent messages are discarded.
Parallel.Invoke(Function() writeOnceBlock.Post("Message 1"), Function() writeOnceBlock.Post("Message 2"), Function() writeOnceBlock.Post("Message 3"))
' Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive())
' Sample output:
' Message 2
'
Para obter um exemplo completo que demonstra como usar WriteOnceBlock<T> para receber o valor da primeira operação concluída, consulte Como desvincular blocos de fluxo de dados.
Blocos de Execução
Os blocos de execução chamam um delegado especificado pelo utilizador para cada parte de dados recebida. A Biblioteca de Fluxo de Dados TPL fornece três tipos de blocos de execução: ActionBlock<TInput>, System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput>e System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput>.
ActionBlock<T>
A classe ActionBlock<TInput> é um bloco de destino que chama um delegado quando ele recebe dados. Pense em um objeto ActionBlock<TInput> como um delegado que é executado de forma assíncrona quando os dados ficam disponíveis. O delegado que você fornece a um objeto ActionBlock<TInput> pode ser do tipo Action<T> ou do tipo System.Func<TInput, Task>. Quando você usa um objeto ActionBlock<TInput> com Action<T>, o processamento de cada elemento de entrada é considerado concluído quando o delegado retorna. Quando você usa um objeto ActionBlock<TInput> com System.Func<TInput, Task>, o processamento de cada elemento de entrada é considerado concluído somente quando o objeto de Task retornado é concluído. Usando esses dois mecanismos, você pode usar ActionBlock<TInput> para processamento síncrono e assíncrono de cada elemento de entrada.
O exemplo básico a seguir posta vários valores de Int32 para um objeto ActionBlock<TInput>. O objeto ActionBlock<TInput> imprime esses valores no console. Este exemplo define o bloco para o estado concluído e aguarda a conclusão de todas as tarefas de fluxo de dados.
// Create an ActionBlock<int> object that prints values
// to the console.
var actionBlock = new ActionBlock<int>(n => Console.WriteLine(n));
// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
actionBlock.Post(i * 10);
}
// Set the block to the completed state and wait for all
// tasks to finish.
actionBlock.Complete();
actionBlock.Completion.Wait();
/* Output:
0
10
20
*/
' Create an ActionBlock<int> object that prints values
' to the console.
Dim actionBlock = New ActionBlock(Of Integer)(Function(n) WriteLine(n))
' Post several messages to the block.
For i As Integer = 0 To 2
actionBlock.Post(i * 10)
Next i
' Set the block to the completed state and wait for all
' tasks to finish.
actionBlock.Complete()
actionBlock.Completion.Wait()
' Output:
' 0
' 10
' 20
'
Para obter exemplos completos que demonstram como usar delegados com a classe ActionBlock<TInput>, consulte Como: Executar ação quando um bloco de fluxo de dados recebe dados.
TransformBlock<TInput, TOutput>
A classe TransformBlock<TInput,TOutput> se assemelha à classe ActionBlock<TInput>, exceto que ela atua como fonte e como destino. O delegado que passa para um objeto TransformBlock<TInput,TOutput> devolve um valor do tipo TOutput. O delegado que você fornece a um objeto TransformBlock<TInput,TOutput> pode ser do tipo System.Func<TInput, TOutput> ou do tipo System.Func<TInput, Task<TOutput>>. Quando você usa um objeto TransformBlock<TInput,TOutput> com System.Func<TInput, TOutput>, o processamento de cada elemento de entrada é considerado concluído quando o delegado retorna. Quando você usa um objeto TransformBlock<TInput,TOutput> usado com System.Func<TInput, Task<TOutput>>, o processamento de cada elemento de entrada é considerado concluído somente quando o objeto de Task<TResult> retornado é concluído. Assim como ActionBlock<TInput>, usando esses dois mecanismos, você pode usar TransformBlock<TInput,TOutput> para processamento síncrono e assíncrono de cada elemento de entrada.
O exemplo básico a seguir cria um objeto TransformBlock<TInput,TOutput> que calcula a raiz quadrada de sua entrada. O objeto TransformBlock<TInput,TOutput> usa valores Int32 como entrada e produz valores Double como saída.
// Create a TransformBlock<int, double> object that
// computes the square root of its input.
var transformBlock = new TransformBlock<int, double>(n => Math.Sqrt(n));
// Post several messages to the block.
transformBlock.Post(10);
transformBlock.Post(20);
transformBlock.Post(30);
// Read the output messages from the block.
for (int i = 0; i < 3; i++)
{
Console.WriteLine(transformBlock.Receive());
}
/* Output:
3.16227766016838
4.47213595499958
5.47722557505166
*/
' Create a TransformBlock<int, double> object that
' computes the square root of its input.
Dim transformBlock = New TransformBlock(Of Integer, Double)(Function(n) Math.Sqrt(n))
' Post several messages to the block.
transformBlock.Post(10)
transformBlock.Post(20)
transformBlock.Post(30)
' Read the output messages from the block.
For i As Integer = 0 To 2
Console.WriteLine(transformBlock.Receive())
Next i
' Output:
' 3.16227766016838
' 4.47213595499958
' 5.47722557505166
'
Para obter exemplos completos que usam TransformBlock<TInput,TOutput> em uma rede de blocos de fluxo de dados que executa o processamento de imagem em um aplicativo do Windows Forms, consulte Passo a passo: Usando o fluxo de dados em um aplicativo do Windows Forms.
TransformManyBlock<TInput, TOutput>
A classe TransformManyBlock<TInput,TOutput> é semelhante à classe TransformBlock<TInput,TOutput>, exceto que TransformManyBlock<TInput,TOutput> produz zero ou mais valores de saída para cada valor de entrada, em vez de apenas um valor de saída para cada valor de entrada. O delegado que você fornece a um objeto TransformManyBlock<TInput,TOutput> pode ser do tipo System.Func<TInput, IEnumerable<TOutput>> ou do tipo System.Func<TInput, Task<IEnumerable<TOutput>>>. Quando você usa um objeto TransformManyBlock<TInput,TOutput> com System.Func<TInput, IEnumerable<TOutput>>, o processamento de cada elemento de entrada é considerado concluído quando o delegado retorna. Quando você usa um objeto TransformManyBlock<TInput,TOutput> com System.Func<TInput, Task<IEnumerable<TOutput>>>, o processamento de cada elemento de entrada é considerado concluído somente quando o objeto System.Threading.Tasks.Task<IEnumerable<TOutput>> retornado é concluído.
O exemplo básico a seguir cria um objeto TransformManyBlock<TInput,TOutput> que divide cadeias de caracteres em suas sequências de caracteres individuais. O objeto TransformManyBlock<TInput,TOutput> usa valores String como entrada e produz valores Char como saída.
// Create a TransformManyBlock<string, char> object that splits
// a string into its individual characters.
var transformManyBlock = new TransformManyBlock<string, char>(
s => s.ToCharArray());
// Post two messages to the first block.
transformManyBlock.Post("Hello");
transformManyBlock.Post("World");
// Receive all output values from the block.
for (int i = 0; i < ("Hello" + "World").Length; i++)
{
Console.WriteLine(transformManyBlock.Receive());
}
/* Output:
H
e
l
l
o
W
o
r
l
d
*/
' Create a TransformManyBlock<string, char> object that splits
' a string into its individual characters.
Dim transformManyBlock = New TransformManyBlock(Of String, Char)(Function(s) s.ToCharArray())
' Post two messages to the first block.
transformManyBlock.Post("Hello")
transformManyBlock.Post("World")
' Receive all output values from the block.
For i As Integer = 0 To ("Hello" & "World").Length - 1
Console.WriteLine(transformManyBlock.Receive())
Next i
' Output:
' H
' e
' l
' l
' o
' W
' o
' r
' l
' d
'
Para obter exemplos completos que usam TransformManyBlock<TInput,TOutput> para produzir várias saídas independentes para cada entrada em um pipeline de fluxo de dados, consulte Passo a passo: Criando um pipeline de fluxo de dados.
Grau de paralelismo
Cada objeto ActionBlock<TInput>, TransformBlock<TInput,TOutput>e TransformManyBlock<TInput,TOutput> armazena mensagens de entrada em buffer até que o bloco esteja pronto para processá-las. Por padrão, essas classes processam mensagens na ordem em que são recebidas, uma mensagem de cada vez. Você também pode especificar o grau de paralelismo para permitir que ActionBlock<TInput>, TransformBlock<TInput,TOutput> e TransformManyBlock<TInput,TOutput> objetos processem várias mensagens simultaneamente. Para obter mais informações sobre execução simultânea, consulte a seção Especificando o grau de paralelismo mais adiante neste documento. Para obter um exemplo que define o grau de paralelismo para permitir que um bloco de fluxo de dados de execução processe mais de uma mensagem de cada vez, consulte Como especificar o grau de paralelismo em um bloco de fluxo de dados.
Resumo dos tipos de delegados
A tabela a seguir resume os tipos de delegados que você pode fornecer aos objetos ActionBlock<TInput>, TransformBlock<TInput,TOutput>e TransformManyBlock<TInput,TOutput>. Esta tabela também especifica se o tipo de delegado opera de forma síncrona ou assíncrona.
| Tipo | Tipo de delegado síncrono | Tipo de delegado assíncrono |
|---|---|---|
| ActionBlock<TInput> | System.Action |
System.Func<TInput, Task> |
| TransformBlock<TInput,TOutput> | System.Func<TInput, TOutput> |
System.Func<TInput, Task<TOutput>> |
| TransformManyBlock<TInput,TOutput> | System.Func<TInput, IEnumerable<TOutput>> |
System.Func<TInput, Task<IEnumerable<TOutput>>> |
Você também pode usar expressões lambda ao trabalhar com tipos de bloco de execução. Para obter um exemplo que mostra como usar uma expressão lambda com um bloco de execução, consulte Como: Executar ação quando um bloco de fluxo de dados recebe dados.
Agrupamento de Blocos
Os blocos de agrupamento combinam dados de uma ou mais fontes e sob várias restrições. A Biblioteca de Fluxo de Dados TPL fornece três tipos de blocos de junção: BatchBlock<T>, JoinBlock<T1,T2>e BatchedJoinBlock<T1,T2>.
Bloco T<>
A classe BatchBlock<T> combina conjuntos de dados de entrada, que são conhecidos como lotes, em matrizes de dados de saída. Você especifica o tamanho de cada lote ao criar um objeto BatchBlock<T>. Quando o objeto BatchBlock<T> recebe a contagem especificada de elementos de entrada, ele propaga de forma assíncrona uma matriz que contém esses elementos. Se um BatchBlock<T> objeto estiver definido para o estado concluído, mas não contiver elementos suficientes para formar um lote, ele propagará uma matriz final que contém os elementos de entrada restantes.
A classe BatchBlock<T> opera no modo ganancioso ou não ganancioso. No modo ganancioso, que é o padrão, um objeto BatchBlock<T> aceita todas as mensagens que lhe são oferecidas e propaga uma matriz depois de receber a contagem especificada de elementos. No modo não ganancioso, um objeto BatchBlock<T> adia todas as mensagens recebidas até que fontes suficientes tenham oferecido mensagens ao bloco para formar um lote. O modo ganancioso normalmente tem um desempenho melhor do que o modo não ganancioso porque requer menos sobrecarga de processamento. No entanto, você pode usar o modo não ganancioso quando você deve coordenar o consumo de várias fontes de forma atômica. Especifique o modo não ganancioso definindo Greedy como False no parâmetro dataflowBlockOptions no construtor BatchBlock<T>.
O exemplo básico a seguir posta vários valores Int32 para um objeto BatchBlock<T> que contém dez elementos em um lote. Para garantir que todos os valores se propaguem para fora do BatchBlock<T>, este exemplo chama o método Complete. O método Complete define o objeto BatchBlock<T> para o estado concluído e, portanto, o objeto BatchBlock<T> propaga todos os elementos restantes como um lote final.
// Create a BatchBlock<int> object that holds ten
// elements per batch.
var batchBlock = new BatchBlock<int>(10);
// Post several values to the block.
for (int i = 0; i < 13; i++)
{
batchBlock.Post(i);
}
// Set the block to the completed state. This causes
// the block to propagate out any remaining
// values as a final batch.
batchBlock.Complete();
// Print the sum of both batches.
Console.WriteLine($"The sum of the elements in batch 1 is {batchBlock.Receive().Sum()}.");
Console.WriteLine($"The sum of the elements in batch 2 is {batchBlock.Receive().Sum()}.");
/* Output:
The sum of the elements in batch 1 is 45.
The sum of the elements in batch 2 is 33.
*/
' Create a BatchBlock<int> object that holds ten
' elements per batch.
Dim batchBlock = New BatchBlock(Of Integer)(10)
' Post several values to the block.
For i As Integer = 0 To 12
batchBlock.Post(i)
Next i
' Set the block to the completed state. This causes
' the block to propagate out any remaining
' values as a final batch.
batchBlock.Complete()
' Print the sum of both batches.
Console.WriteLine("The sum of the elements in batch 1 is {0}.", batchBlock.Receive().Sum())
Console.WriteLine("The sum of the elements in batch 2 is {0}.", batchBlock.Receive().Sum())
' Output:
' The sum of the elements in batch 1 is 45.
' The sum of the elements in batch 2 is 33.
'
Para obter um exemplo completo que usa BatchBlock<T> para melhorar a eficiência das operações de inserção de banco de dados, consulte passo a passo: Usando BatchBlock e BatchedJoinBlock para melhorar a eficiência.
JoinBlock<T1, T2, ...>
As classes JoinBlock<T1,T2> e JoinBlock<T1,T2,T3> coletam elementos de entrada e propagam objetos System.Tuple<T1,T2> ou System.Tuple<T1,T2,T3> que contêm esses elementos. As JoinBlock<T1,T2> classes e JoinBlock<T1,T2,T3> não herdam de ITargetBlock<TInput>. Em vez disso, eles fornecem propriedades, Target1, Target2e Target3, que implementam ITargetBlock<TInput>.
Como BatchBlock<T>, JoinBlock<T1,T2> e JoinBlock<T1,T2,T3> operam em modo ganancioso ou não ganancioso. No modo ganancioso, que é o padrão, um objeto JoinBlock<T1,T2> ou JoinBlock<T1,T2,T3> aceita todas as mensagens que lhe são oferecidas e propaga uma tupla depois que cada um de seus alvos recebe pelo menos uma mensagem. No modo não ganancioso, um objeto JoinBlock<T1,T2> ou JoinBlock<T1,T2,T3> adia todas as mensagens recebidas até que todos os alvos tenham recebido os dados necessários para criar uma tupla. Neste ponto, o bloco envolve-se num protocolo de confirmação em duas fases para recuperar atomicamente todos os itens necessários das fontes. Este adiamento permite que outra entidade consuma os dados entretanto, para permitir que o sistema global avance para a frente.
O exemplo básico a seguir demonstra um caso em que um objeto JoinBlock<T1,T2,T3> requer vários dados para calcular um valor. Este exemplo cria um objeto JoinBlock<T1,T2,T3> que requer dois valores Int32 e um valor Char para executar uma operação aritmética.
// Create a JoinBlock<int, int, char> object that requires
// two numbers and an operator.
var joinBlock = new JoinBlock<int, int, char>();
// Post two values to each target of the join.
joinBlock.Target1.Post(3);
joinBlock.Target1.Post(6);
joinBlock.Target2.Post(5);
joinBlock.Target2.Post(4);
joinBlock.Target3.Post('+');
joinBlock.Target3.Post('-');
// Receive each group of values and apply the operator part
// to the number parts.
for (int i = 0; i < 2; i++)
{
var data = joinBlock.Receive();
switch (data.Item3)
{
case '+':
Console.WriteLine($"{data.Item1} + {data.Item2} = {data.Item1 + data.Item2}");
break;
case '-':
Console.WriteLine($"{data.Item1} - {data.Item2} = {data.Item1 - data.Item2}");
break;
default:
Console.WriteLine($"Unknown operator '{data.Item3}'.");
break;
}
}
/* Output:
3 + 5 = 8
6 - 4 = 2
*/
' Create a JoinBlock<int, int, char> object that requires
' two numbers and an operator.
Dim joinBlock = New JoinBlock(Of Integer, Integer, Char)()
' Post two values to each target of the join.
joinBlock.Target1.Post(3)
joinBlock.Target1.Post(6)
joinBlock.Target2.Post(5)
joinBlock.Target2.Post(4)
joinBlock.Target3.Post("+"c)
joinBlock.Target3.Post("-"c)
' Receive each group of values and apply the operator part
' to the number parts.
For i As Integer = 0 To 1
Dim data = joinBlock.Receive()
Select Case data.Item3
Case "+"c
Console.WriteLine("{0} + {1} = {2}", data.Item1, data.Item2, data.Item1 + data.Item2)
Case "-"c
Console.WriteLine("{0} - {1} = {2}", data.Item1, data.Item2, data.Item1 - data.Item2)
Case Else
Console.WriteLine("Unknown operator '{0}'.", data.Item3)
End Select
Next i
' Output:
' 3 + 5 = 8
' 6 - 4 = 2
'
Para obter um exemplo completo que usa objetos JoinBlock<T1,T2> no modo não ganancioso para compartilhar cooperativamente um recurso, consulte Como usar o JoinBlock para ler dados de várias fontes.
BatchedJoinBlock<T1, T2, ...>
As classes BatchedJoinBlock<T1,T2> e BatchedJoinBlock<T1,T2,T3> coletam lotes de elementos de entrada e propagam objetos System.Tuple(IList(T1), IList(T2)) ou System.Tuple(IList(T1), IList(T2), IList(T3)) que contêm esses elementos. Pense em BatchedJoinBlock<T1,T2> como uma combinação de BatchBlock<T> e JoinBlock<T1,T2>. Especifique o tamanho de cada lote ao criar um objeto BatchedJoinBlock<T1,T2>.
BatchedJoinBlock<T1,T2> também fornece propriedades, Target1 e Target2, que implementam ITargetBlock<TInput>. Quando a contagem especificada de elementos de entrada é recebida de todos os destinos, o objeto BatchedJoinBlock<T1,T2> propaga de forma assíncrona um objeto System.Tuple(IList(T1), IList(T2)) que contém esses elementos.
O exemplo básico a seguir cria um objeto BatchedJoinBlock<T1,T2> que contém resultados, valores Int32 e erros que são objetos Exception. Este exemplo executa várias operações e grava resultados na propriedade Target1 e erros na propriedade Target2 do objeto BatchedJoinBlock<T1,T2>. Como a contagem de operações bem-sucedidas e com falha é desconhecida antecipadamente, os objetos IList<T> permitem que cada destino receba zero ou mais valores.
// For demonstration, create a Func<int, int> that
// returns its argument, or throws ArgumentOutOfRangeException
// if the argument is less than zero.
Func<int, int> DoWork = n =>
{
if (n < 0)
throw new ArgumentOutOfRangeException();
return n;
};
// Create a BatchedJoinBlock<int, Exception> object that holds
// seven elements per batch.
var batchedJoinBlock = new BatchedJoinBlock<int, Exception>(7);
// Post several items to the block.
foreach (int i in new int[] { 5, 6, -7, -22, 13, 55, 0 })
{
try
{
// Post the result of the worker to the
// first target of the block.
batchedJoinBlock.Target1.Post(DoWork(i));
}
catch (ArgumentOutOfRangeException e)
{
// If an error occurred, post the Exception to the
// second target of the block.
batchedJoinBlock.Target2.Post(e);
}
}
// Read the results from the block.
var results = batchedJoinBlock.Receive();
// Print the results to the console.
// Print the results.
foreach (int n in results.Item1)
{
Console.WriteLine(n);
}
// Print failures.
foreach (Exception e in results.Item2)
{
Console.WriteLine(e.Message);
}
/* Output:
5
6
13
55
0
Specified argument was out of the range of valid values.
Specified argument was out of the range of valid values.
*/
' For demonstration, create a Func<int, int> that
' returns its argument, or throws ArgumentOutOfRangeException
' if the argument is less than zero.
Dim DoWork As Func(Of Integer, Integer) = Function(n)
If n < 0 Then
Throw New ArgumentOutOfRangeException()
End If
Return n
End Function
' Create a BatchedJoinBlock<int, Exception> object that holds
' seven elements per batch.
Dim batchedJoinBlock = New BatchedJoinBlock(Of Integer, Exception)(7)
' Post several items to the block.
For Each i As Integer In New Integer() {5, 6, -7, -22, 13, 55, 0}
Try
' Post the result of the worker to the
' first target of the block.
batchedJoinBlock.Target1.Post(DoWork(i))
Catch e As ArgumentOutOfRangeException
' If an error occurred, post the Exception to the
' second target of the block.
batchedJoinBlock.Target2.Post(e)
End Try
Next i
' Read the results from the block.
Dim results = batchedJoinBlock.Receive()
' Print the results to the console.
' Print the results.
For Each n As Integer In results.Item1
Console.WriteLine(n)
Next n
' Print failures.
For Each e As Exception In results.Item2
Console.WriteLine(e.Message)
Next e
' Output:
' 5
' 6
' 13
' 55
' 0
' Specified argument was out of the range of valid values.
' Specified argument was out of the range of valid values.
'
Para obter um exemplo completo que usa BatchedJoinBlock<T1,T2> para capturar os resultados e quaisquer exceções que ocorram enquanto o programa lê de um banco de dados, consulte Passo a passo: Usando BatchBlock e BatchedJoinBlock para melhorar a eficiência.
Configurando o comportamento do bloco de fluxo de dados
Você pode habilitar opções adicionais fornecendo um objeto System.Threading.Tasks.Dataflow.DataflowBlockOptions para o construtor de tipos de bloco de fluxo de dados. Essas opções controlam o comportamento, como o agendador que gerencia a tarefa subjacente e o grau de paralelismo. O DataflowBlockOptions também tem tipos derivados que especificam o comportamento que é específico para determinados tipos de bloco de fluxo de dados. A tabela a seguir resume qual tipo de opção está associado a cada tipo de bloco de fluxo de dados.
As seções a seguir fornecem informações adicionais sobre os tipos importantes de opções de bloco de fluxo de dados disponíveis por meio das classes System.Threading.Tasks.Dataflow.DataflowBlockOptions, System.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptionse System.Threading.Tasks.Dataflow.GroupingDataflowBlockOptions.
Especificando o Agendador de Tarefas
Cada bloco de fluxo de dados predefinido usa o mecanismo de agendamento de tarefas TPL para executar atividades como propagar dados para um destino, receber dados de uma fonte e executar delegados definidos pelo usuário quando os dados ficam disponíveis. TaskScheduler é uma classe abstrata que representa um agendador de tarefas que enfileira tarefas em threads. O agendador de tarefas padrão, Default, usa a classe ThreadPool para enfileirar e executar trabalho. Você pode substituir o agendador de tarefas padrão definindo a propriedade TaskScheduler ao construir um objeto de bloco de fluxo de dados.
Quando o mesmo agendador de tarefas gerencia vários blocos de fluxo de dados, ele pode impor políticas entre eles. Por exemplo, se vários blocos de fluxo de dados forem configurados para direcionar o agendador exclusivo do mesmo objeto ConcurrentExclusiveSchedulerPair, todo o trabalho executado nesses blocos será serializado. Da mesma forma, se esses blocos estiverem configurados para direcionar o agendador simultâneo do mesmo objeto ConcurrentExclusiveSchedulerPair e esse agendador estiver configurado para ter um nível máximo de simultaneidade, todo o trabalho desses blocos será limitado a esse número de operações simultâneas. Para obter um exemplo que usa a classe ConcurrentExclusiveSchedulerPair para permitir que as operações de leitura ocorram em paralelo, mas as operações de gravação ocorram de forma exclusiva em relação a todas as outras operações, consulte Como especificar um agendador de tarefas em um bloco de fluxo de dados. Para obter mais informações sobre agendadores de tarefas na TPL, consulte o tópico da classe TaskScheduler.
Especificando o grau de paralelismo
Por padrão, os três tipos de bloco de execução que a Biblioteca de Fluxo de Dados TPL fornece, ActionBlock<TInput>, TransformBlock<TInput,TOutput>e TransformManyBlock<TInput,TOutput>, processam uma mensagem de cada vez. Esses tipos de bloco de fluxo de dados também processam mensagens na ordem em que são recebidas. Para permitir que esses blocos de fluxo de dados processem mensagens simultaneamente, defina a propriedade ExecutionDataflowBlockOptions.MaxDegreeOfParallelism ao construir o objeto de bloco de fluxo de dados.
O valor padrão de MaxDegreeOfParallelism é 1, o que garante que o bloco de fluxo de dados processe uma mensagem de cada vez. Definir essa propriedade para um valor maior que 1 permite que o bloco de fluxo de dados processe várias mensagens simultaneamente. Definir essa propriedade como DataflowBlockOptions.Unbounded permite que o agendador de tarefas subjacente gerencie o grau máximo de simultaneidade.
Importante
Quando você especifica um grau máximo de paralelismo maior que 1, várias mensagens são processadas simultaneamente e, portanto, as mensagens podem não ser processadas na ordem em que são recebidas. A ordem em que as mensagens são saídas do bloco é, no entanto, a mesma em que são recebidas.
Como a propriedade MaxDegreeOfParallelism representa o grau máximo de paralelismo, o bloco de fluxo de dados pode ser executado com um grau menor de paralelismo do que o especificado. O bloco de fluxo de dados pode usar um grau menor de paralelismo para atender aos seus requisitos funcionais ou porque há falta de recursos disponíveis do sistema. Um bloco de fluxo de dados nunca escolhe mais paralelismo do que você especifica.
O valor da propriedade MaxDegreeOfParallelism é exclusivo para cada objeto de bloco de fluxo de dados. Por exemplo, se quatro objetos de bloco de fluxo de dados especificarem 1 para o grau máximo de paralelismo, todos os quatro objetos de bloco de fluxo de dados poderão ser executados em paralelo.
Para obter um exemplo que define o grau máximo de paralelismo para permitir que operações longas ocorram em paralelo, consulte Como especificar o grau de paralelismo em um bloco de fluxo de dados.
Especificando o número de mensagens por tarefa
Os tipos de bloco de fluxo de dados predefinidos usam tarefas para processar vários elementos de entrada. Isso ajuda a minimizar o número de objetos de tarefa necessários para processar dados, o que permite que os aplicativos sejam executados com mais eficiência. No entanto, quando as tarefas de um conjunto de blocos de fluxo de dados estão processando dados, as tarefas de outros blocos de fluxo de dados podem precisar aguardar o tempo de processamento enfileirando mensagens. Para permitir uma melhor equidade entre as tarefas de fluxo de dados, defina a propriedade MaxMessagesPerTask. Quando MaxMessagesPerTask é definido como DataflowBlockOptions.Unbounded, que é o padrão, a tarefa usada por um bloco de fluxo de dados processa quantas mensagens estiverem disponíveis. Quando MaxMessagesPerTask é definido como um valor diferente de Unbounded, o bloco de fluxo de dados processa, no máximo, esse número de mensagens por objeto Task. Embora a definição da propriedade MaxMessagesPerTask possa aumentar a equidade entre as tarefas, ela pode fazer com que o sistema crie mais tarefas do que as necessárias, o que pode diminuir o desempenho.
Ativando o cancelamento
A TPL fornece um mecanismo que permite às tarefas coordenarem o cancelamento de maneira cooperativa. Para permitir que os blocos de fluxo de dados participem desse mecanismo de cancelamento, defina a propriedade CancellationToken. Quando esse CancellationToken objeto é definido para o estado cancelado, todos os blocos de fluxo de dados que monitoram esse token concluem a execução de seu item atual, mas não iniciam o processamento de itens subsequentes. Esses blocos de fluxo de dados também limpam todas as mensagens armazenadas em buffer, liberam conexões com qualquer bloco de origem e destino e fazem a transição para o estado cancelado. Ao fazer a transição para o estado cancelado, a propriedade Completion tem a propriedade Status definida como Canceled, a menos que ocorra uma exceção durante o processamento. Nesse caso, Status está definido como Faulted.
Para obter um exemplo que demonstra como usar o cancelamento em um aplicativo do Windows Forms, consulte Como cancelar um bloco de fluxo de dados. Para obter mais informações sobre o cancelamento no TPL, consulte Cancelamento de Tarefas.
Especificando o comportamento ganancioso versus não ganancioso
Vários tipos de bloco de agrupamento de fluxo de dados podem operar em modo ganancioso ou o modo não ganancioso. Por padrão, os tipos de bloco de fluxo de dados predefinidos operam no modo ganancioso.
Para tipos de bloco de junção, como JoinBlock<T1,T2>, o modo ganancioso significa que o bloco aceita imediatamente os dados, mesmo que os dados correspondentes com os quais ingressar ainda não estejam disponíveis. O modo não ganancioso significa que o bloco adia todas as mensagens recebidas até que uma esteja disponível em cada um dos seus alvos para concluir a junção. Se alguma das mensagens adiadas não estiver mais disponível, o bloco de ingresso libera todas as mensagens adiadas e reinicia o processo. Para a classe BatchBlock<T>, o comportamento ganancioso e não ganancioso é semelhante, exceto que, no modo não ganancioso, um objeto BatchBlock<T> adia todas as mensagens recebidas até que estejam disponíveis fontes distintas o suficiente para completar um lote.
Para especificar o modo não ganancioso para um bloco de fluxo de dados, defina Greedy como False. Para obter um exemplo que demonstra como usar o modo não ganancioso para permitir que vários blocos de junção compartilhem uma fonte de dados com mais eficiência, consulte Como usar o JoinBlock para ler dados de várias fontes.
Blocos de fluxo de dados personalizados
Embora a Biblioteca de Fluxo de Dados TPL forneça muitos tipos de bloco predefinidos, você pode criar tipos de bloco adicionais que executam um comportamento personalizado. Implemente as interfaces ISourceBlock<TOutput> ou ITargetBlock<TInput> diretamente ou use o método Encapsulate para criar um bloco complexo que encapsula o comportamento de tipos de bloco existentes. Para obter exemplos que mostram como implementar a funcionalidade de bloco de fluxo de dados personalizado, consulte Passo a passo: Criando um tipo de bloco de fluxo de dados personalizado.
Tópicos relacionados
| Título | Descrição |
|---|---|
| Como escrever e ler mensagens de um bloco de fluxo de dados | Demonstra como escrever e ler mensagens de um objeto BufferBlock<T>. |
| Como implementar um padrão de fluxo de dados Producer-Consumer | Descreve como usar o modelo de fluxo de dados para implementar um padrão produtor-consumidor, onde o produtor envia mensagens para um bloco de fluxo de dados e o consumidor lê mensagens desse bloco. |
| Como: Executar ação quando um bloco de fluxo de dados recebe dados | Descreve como fornecer delegados para os tipos de bloco de fluxo de dados de execução, ActionBlock<TInput>, TransformBlock<TInput,TOutput>e TransformManyBlock<TInput,TOutput>. |
| Passo a passo: Criando um pipeline de fluxo de dados | Descreve como criar um pipeline de fluxo de dados que baixa texto da Web e executa operações nesse texto. |
| Como desvincular blocos de fluxo de dados | Demonstra como usar o método LinkTo para desvincular um bloco de destino de sua origem depois que a fonte oferece uma mensagem para o destino. |
| Passo a passo: Usando o fluxo de dados em um aplicativo do Windows Forms | Demonstra como criar uma rede de blocos de fluxo de dados que executam o processamento de imagem em um aplicativo Windows Forms. |
| pt-PT: Como cancelar um bloco de fluxo de dados | Demonstra como usar o cancelamento em um aplicativo do Windows Forms. |
| Como usar o JoinBlock para ler dados de várias fontes | Explica como usar a classe JoinBlock<T1,T2> para executar uma operação quando os dados estão disponíveis de várias fontes e como usar o modo não ganancioso para permitir que vários blocos de junção compartilhem uma fonte de dados com mais eficiência. |
| Como: Especificar o grau de paralelismo em um bloco de fluxo de dados | Descreve como definir a propriedade MaxDegreeOfParallelism para permitir que um bloco de fluxo de dados de execução processe mais de uma mensagem de cada vez. |
| Como: Especificar um agendador de tarefas em um bloco de fluxo de dados | Demonstra como associar um agendador de tarefas específico quando você usa o fluxo de dados em seu aplicativo. |
| passo a passo: Usando BatchBlock e BatchedJoinBlock para melhorar a eficiência | Descreve como usar a classe BatchBlock<T> para melhorar a eficiência das operações de inserção de banco de dados e como usar a classe BatchedJoinBlock<T1,T2> para capturar os resultados e quaisquer exceções que ocorram enquanto o programa lê de um banco de dados. |
| Passo a passo: Criando um tipo de bloco de fluxo de dados personalizado | Demonstra duas maneiras de criar um tipo de bloco de fluxo de dados que implementa o comportamento personalizado. |
| Biblioteca Paralela de Tarefas (TPL) | Apresenta o TPL, uma biblioteca que simplifica a programação paralela e simultânea em aplicativos .NET Framework. |