Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
A TPL (Biblioteca Paralela de Tarefas) fornece componentes de fluxo de dados para ajudar a aumentar a robustez dos aplicativos habilitados para simultaneidade. Esses componentes de fluxo de dados são chamados coletivamente de da Biblioteca de Fluxo de Dados TPL. Esse modelo de fluxo de dados promove a programação baseada em atores ao fornecer passagem de mensagens dentro do processo para tarefas de fluxo de dados e de pipeline de granularidade grosseira. Os componentes de fluxo de dados se baseiam nos tipos e na infraestrutura de agendamento do TPL e integram-se com o suporte à linguagem C#, Visual Basic e F# para programação assíncrona. Esses componentes de fluxo de dados são úteis quando você tem várias operações que devem se comunicar entre si de forma assíncrona ou quando você deseja processar dados conforme eles ficam disponíveis. Por exemplo, considere um aplicativo que processa dados de imagem de uma câmera da Web. Usando o modelo de fluxo de dados, o aplicativo pode processar quadros de imagem conforme eles ficam disponíveis. Se o aplicativo aprimora os quadros de imagem, por exemplo, executando a redução de olhos vermelhos ou correção de luz, você pode criar um pipeline dos componentes de fluxo de dados. Cada estágio do pipeline pode usar uma funcionalidade de paralelismo mais granular, como a funcionalidade fornecida pelo TPL, para transformar a imagem.
Este documento fornece uma visão geral da Biblioteca de Fluxos de Dados TPL. Ele descreve o modelo de programação, os tipos de bloco de fluxo de dados predefinidos e como configurar blocos de fluxo de dados para atender aos requisitos específicos de seus aplicativos.
Observação
A Biblioteca de Fluxo de Dados TPL (o namespace System.Threading.Tasks.Dataflow) não é distribuída com .NET. Para instalar o namespace System.Threading.Tasks.Dataflow no Visual Studio, abra seu projeto, escolha Gerenciar Pacotes NuGet no menu do Projeto e pesquise online o pacote System.Threading.Tasks.Dataflow
. Como alternativa, instale-o usando a CLI do .NET Core e execute dotnet add package System.Threading.Tasks.Dataflow
.
Modelo de programação
A Biblioteca de Fluxo de Dados TPL fornece uma base para passar mensagens e paralelizar aplicativos com uso intensivo de CPU e uso intensivo de E/S que têm alta taxa de transferência e baixa latência. Ele também fornece controle explícito sobre como os dados são armazenados em buffer e se movem ao redor do sistema. Para entender melhor o modelo de programação de fluxo de dados, considere um aplicativo que carrega as imagens de forma assíncrona do disco e cria uma composição dessas imagens. Os modelos de programação tradicionais normalmente exigem que você use retornos de chamada e objetos de sincronização, como bloqueios, para coordenar tarefas e acesso a dados compartilhados. Usando o modelo de programação de fluxo de dados, você pode criar objetos de fluxo de dados que processam imagens conforme elas são lidas do disco. No modelo de fluxo de dados, você declara como os dados são tratados quando eles ficam disponíveis e também quaisquer dependências entre os dados. Como o runtime gerencia dependências entre dados, muitas vezes você pode evitar o requisito de sincronizar o acesso aos dados compartilhados. Além disso, como os agendamentos de runtime funcionam com base na chegada assíncrona de dados, o fluxo de dados pode melhorar a capacidade de resposta e a taxa de transferência gerenciando com eficiência os threads subjacentes. Para obter um exemplo que usa o modelo de programação de fluxo de dados para implementar o processamento de imagens em um aplicativo do Windows Forms, consulte Passo a passo: usando o fluxo de dados em um aplicativo do Windows Forms.
Fontes e destinos
A Biblioteca de Fluxo de Dados TPL consiste em blocos de fluxo de dados, que são estruturas que armazenam em buffer e processam dados. O TPL define três tipos de blocos de fluxo de dados: blocos de origem, blocos de destinoe blocos propagadores. Um bloco de origem atua como uma fonte de dados e pode ser lido. Um bloco de destino atua como um receptor de dados e pode ser gravado. Um bloco de propagador atua como um bloco de origem e um bloco de destino e pode ser lido e gravado. O TPL define a interface System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> para representar fontes, System.Threading.Tasks.Dataflow.ITargetBlock<TInput> para representar destinos e System.Threading.Tasks.Dataflow.IPropagatorBlock<TInput,TOutput> para representar propagadores. IPropagatorBlock<TInput,TOutput> herda de ISourceBlock<TOutput>e ITargetBlock<TInput>.
A Biblioteca de Fluxos de Dados TPL fornece vários tipos de bloco de fluxo de dados predefinidos que implementam as interfaces ISourceBlock<TOutput>, ITargetBlock<TInput>e IPropagatorBlock<TInput,TOutput>. Esses tipos de bloco de fluxo de dados são descritos neste documento na seção tipos de bloco de fluxo de dados predefinidos.
Conectando blocos
Você pode conectar blocos de fluxo de dados para formar pipelines, que são sequências lineares de blocos de fluxo de dados ou redes, que são grafos de blocos de fluxo de dados. Um pipeline é uma forma de rede. Em um pipeline ou rede, as fontes propagam dados de forma assíncrona para destinos à medida que esses dados ficam disponíveis. O método ISourceBlock<TOutput>.LinkTo vincula um bloco de fluxo de dados de origem a um bloco de destino. Uma fonte pode ser vinculada a zero ou mais alvos; alvos podem ser vinculados a partir de zero ou mais fontes. Você pode adicionar ou remover blocos de fluxo de dados de ou para um pipeline ou rede simultaneamente. Os tipos de bloco de fluxo de dados predefinidos tratam de todos os aspectos de acesso thread-safe de vincular e desvincular.
Para obter um exemplo que conecta blocos de fluxo de dados para formar um pipeline básico, consulte Passo a passo: criando um pipeline de fluxo de dados. Para obter um exemplo que conecta blocos de fluxo de dados para formar uma rede mais complexa, consulte Passo a passo: usando o fluxo de dados em um aplicativo do Windows Forms. Para obter um exemplo que desvincula um destino de uma origem depois que a origem oferece uma mensagem ao destino, consulte Como desvincular blocos de fluxo de dados.
Filtragem
Ao chamar o método ISourceBlock<TOutput>.LinkTo para vincular uma origem a um destino, você pode fornecer um delegado que determina se o bloco de destino aceita ou rejeita uma mensagem com base no valor dessa mensagem. Esse mecanismo de filtragem é uma maneira útil de garantir que um bloco de fluxo de dados receba apenas determinados valores. Para a maioria dos tipos de bloco de fluxo de dados predefinidos, se um bloco de origem estiver conectado a vários blocos de destino, quando um bloco de destino rejeitar uma mensagem, a origem oferecerá essa mensagem para o próximo destino. A ordem na qual uma origem oferece mensagens para destinos é definida pela origem e pode variar de acordo com o tipo da origem. A maioria dos tipos de bloco de origem para de oferecer uma mensagem depois que um destino aceita essa mensagem. Uma exceção a essa regra é a classe BroadcastBlock<T>, que oferece cada mensagem a todos os destinos, mesmo que alguns destinos rejeitem a mensagem. Para obter um exemplo que usa a filtragem para processar apenas determinadas mensagens, consulte Passo a passo: usando o fluxo de dados em um aplicativo do Windows Forms.
Importante
Como cada tipo de bloco de fluxo de dados de origem predefinido garante que as mensagens sejam propagadas na ordem em que são recebidas, cada mensagem deve ser lida do bloco de origem antes que o bloco de origem possa processar a próxima mensagem. Portanto, ao usar a filtragem para conectar vários destinos a uma origem, verifique se pelo menos um bloco de destino recebe cada mensagem. Caso contrário, seu aplicativo poderá ficar em deadlock.
Passagem de Mensagens
O modelo de programação de fluxo de dados está relacionado ao conceito de passagem de mensagens, em que componentes independentes de um programa se comunicam entre si enviando mensagens. Uma maneira de propagar mensagens entre os componentes do aplicativo é chamar os métodos Post (síncrono) e SendAsync (assíncrono) para enviar mensagens para blocos de fluxo de dados de destino e os métodos Receive, ReceiveAsynce TryReceive para receber mensagens de blocos de origem. Você pode combinar esses métodos com pipelines ou redes de fluxo de dados enviando dados de entrada para o nó principal (um bloco de destino) e recebendo dados de saída do nó final do pipeline ou dos nós finais da rede (um ou mais blocos de origem). Você também pode usar o método Choose para ler a partir da primeira fonte fornecida que tenha dados disponíveis e, em seguida, executar ação nesses dados.
Os blocos de origem oferecem dados para blocos de destino chamando o método ITargetBlock<TInput>.OfferMessage. O bloco de destino responde a uma mensagem oferecida de uma das três maneiras: ele pode aceitar a mensagem, recusar a mensagem ou adiar a mensagem. Quando o destino aceita a mensagem, o método OfferMessage retorna Accepted. Quando o destino recusa a mensagem, o método OfferMessage retorna Declined. Quando o destino exige que ele não receba mais mensagens da origem, OfferMessage retorna DecliningPermanently. Os tipos de blocos de origem predefinidos não enviam mensagens para alvos vinculados após o recebimento desse valor de retorno, e eles se desvinculam automaticamente desses alvos.
Quando um bloco de destino adia a mensagem para uso posterior, o método OfferMessage retorna Postponed. Um bloco de destino que adia uma mensagem pode chamar posteriormente o método ISourceBlock<TOutput>.ReserveMessage para tentar reservar a mensagem oferecida. Neste ponto, é possível que a mensagem ainda esteja disponível e possa ser usada pelo bloco de destino ou então que a mensagem tenha sido tomada por outro destino. Quando o bloco de destino mais tarde requer a mensagem ou não precisa mais da mensagem, ele chama o método ISourceBlock<TOutput>.ConsumeMessage ou ReleaseReservation, respectivamente. A reserva de mensagens é usada normalmente pelos tipos de bloco de fluxo de dados que operam no modo não ganancioso. O modo não greedy é explicado mais adiante neste documento. Em vez de reservar uma mensagem adiada, um bloco de destino também pode usar o método ISourceBlock<TOutput>.ConsumeMessage para tentar consumir diretamente a mensagem adiada.
Conclusão do bloco de fluxo de dados
Blocos de fluxo de dados também dão suporte ao conceito de conclusão. Um bloco de fluxo de dados que está no estado concluído não executa nenhum trabalho adicional. Cada bloco de fluxo de dados tem um objeto System.Threading.Tasks.Task associado, conhecido como tarefa de conclusão , que representa o status de conclusão do bloco. Como você pode aguardar a conclusão de um objeto Task, usando tarefas de conclusão, você pode aguardar a conclusão de um ou mais nós terminais de uma rede de fluxo de dados. A interface IDataflowBlock define o método Complete, que informa o bloco de fluxo de dados de uma solicitação para que ele seja concluído e a propriedade Completion, que retorna a tarefa de conclusão do bloco de fluxo de dados. ISourceBlock<TOutput> e ITargetBlock<TInput> herdam a interface IDataflowBlock.
Há duas maneiras de determinar se um bloco de fluxo de dados foi concluído sem erro, encontrou um ou mais erros ou foi cancelado. A primeira maneira é chamar o método Task.Wait na tarefa de conclusão em um bloco de try
-catch
(Try
-Catch
no Visual Basic). O exemplo a seguir cria um objeto ActionBlock<TInput> que gera ArgumentOutOfRangeException se seu valor de entrada for menor que zero.
AggregateException é gerado quando este exemplo chama Wait na tarefa de conclusão. O ArgumentOutOfRangeException é acessado por meio da propriedade InnerExceptions do objeto AggregateException.
// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
Console.WriteLine($"n = {n}");
if (n < 0)
{
throw new ArgumentOutOfRangeException();
}
});
// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();
// Wait for completion in a try/catch block.
try
{
throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
// If an unhandled exception occurs during dataflow processing, all
// exceptions are propagated through an AggregateException object.
ae.Handle(e =>
{
Console.WriteLine($"Encountered {e.GetType().Name}: {e.Message}");
return true;
});
}
/* Output:
n = 0
n = -1
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
Console.WriteLine("n = {0}", n)
If n < 0 Then
Throw New ArgumentOutOfRangeException()
End If
End Sub)
' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()
' Wait for completion in a try/catch block.
Try
throwIfNegative.Completion.Wait()
Catch ae As AggregateException
' If an unhandled exception occurs during dataflow processing, all
' exceptions are propagated through an AggregateException object.
ae.Handle(Function(e)
Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
Return True
End Function)
End Try
' Output:
' n = 0
' n = -1
' Encountered ArgumentOutOfRangeException: Specified argument was out of the range
' of valid values.
'
Este exemplo demonstra o caso em que uma exceção não é tratada no delegado de um bloco de fluxo de dados de execução. É recomendável que você trate exceções nos corpos de tais blocos. No entanto, se você não conseguir fazer isso, o bloco se comportará como se tivesse sido cancelado e não processará mensagens de entrada.
Quando um bloco de fluxo de dados é cancelado explicitamente, o objeto AggregateException contém OperationCanceledException na propriedade InnerExceptions. Para obter mais informações sobre o cancelamento de fluxo de dados, consulte a seção Habilitando o cancelamento.
A segunda maneira de determinar o status de conclusão de um bloco de fluxo de dados é usar uma continuação da tarefa de conclusão ou usar os recursos de linguagem assíncrona do C# e do Visual Basic para aguardar assíncronamente a tarefa de conclusão. O delegado que você fornece ao método Task.ContinueWith usa um objeto Task que representa a tarefa antecedente. No caso da propriedade Completion, o próprio representante para a continuação recebe a tarefa de conclusão. O exemplo a seguir se assemelha ao anterior, exceto que ele também usa o método ContinueWith para criar uma tarefa de continuação que imprime o status da operação de fluxo de dados geral.
// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
Console.WriteLine($"n = {n}");
if (n < 0)
{
throw new ArgumentOutOfRangeException();
}
});
// Create a continuation task that prints the overall
// task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(task =>
{
Console.WriteLine($"The status of the completion task is '{task.Status}'.");
});
// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();
// Wait for completion in a try/catch block.
try
{
throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
// If an unhandled exception occurs during dataflow processing, all
// exceptions are propagated through an AggregateException object.
ae.Handle(e =>
{
Console.WriteLine($"Encountered {e.GetType().Name}: {e.Message}");
return true;
});
}
/* Output:
n = 0
n = -1
The status of the completion task is 'Faulted'.
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
Console.WriteLine("n = {0}", n)
If n < 0 Then
Throw New ArgumentOutOfRangeException()
End If
End Sub)
' Create a continuation task that prints the overall
' task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(Sub(task) Console.WriteLine("The status of the completion task is '{0}'.", task.Status))
' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()
' Wait for completion in a try/catch block.
Try
throwIfNegative.Completion.Wait()
Catch ae As AggregateException
' If an unhandled exception occurs during dataflow processing, all
' exceptions are propagated through an AggregateException object.
ae.Handle(Function(e)
Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
Return True
End Function)
End Try
' Output:
' n = 0
' n = -1
' The status of the completion task is 'Faulted'.
' Encountered ArgumentOutOfRangeException: Specified argument was out of the range
' of valid values.
'
Você também pode usar propriedades como IsCanceled no corpo da tarefa de continuação para determinar informações adicionais sobre o status de conclusão de um bloco de fluxo de dados. Para obter mais informações sobre tarefas de continuação e como elas se relacionam ao cancelamento e ao tratamento de erros, consulte Encadeamento de tarefas usando tarefas de continuação, de cancelamento de tarefa e de tratamento de exceções.
Tipos de bloco de fluxo de dados predefinidos
A Biblioteca de Fluxo de Dados TPL fornece vários tipos de bloco de fluxo de dados predefinidos. Esses tipos são divididos em três categorias: blocos de buffer, blocos de execuçãoe blocos de agrupamento. As seções a seguir descrevem os tipos de bloco que compõem essas categorias.
Blocos de buffer
Os blocos de buffer contêm dados para uso por consumidores de dados. A Biblioteca de Fluxo de Dados TPL fornece três tipos de bloco de buffer: System.Threading.Tasks.Dataflow.BufferBlock<T>, System.Threading.Tasks.Dataflow.BroadcastBlock<T>e System.Threading.Tasks.Dataflow.WriteOnceBlock<T>.
BufferBlock<T>
A classe BufferBlock<T> representa uma estrutura de mensagens assíncrona de uso geral. Essa classe armazena uma fila de mensagens FIFO (primeiro a entrar, primeiro a sair) que pode ser gravada por várias fontes ou lida por vários destinos. Quando um destino recebe uma mensagem de um objeto BufferBlock<T>, essa mensagem é removida da fila de mensagens. Portanto, embora um objeto BufferBlock<T> possa ter vários destinos, apenas um destino receberá cada mensagem. A classe BufferBlock<T> é útil quando você deseja passar várias mensagens para outro componente e esse componente deve receber cada mensagem.
O exemplo básico a seguir posta vários valores Int32 em um objeto BufferBlock<T> e lê esses valores de volta desse objeto.
// Create a BufferBlock<int> object.
var bufferBlock = new BufferBlock<int>();
// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
bufferBlock.Post(i);
}
// Receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
Console.WriteLine(bufferBlock.Receive());
}
/* Output:
0
1
2
*/
' Create a BufferBlock<int> object.
Dim bufferBlock = New BufferBlock(Of Integer)()
' Post several messages to the block.
For i As Integer = 0 To 2
bufferBlock.Post(i)
Next i
' Receive the messages back from the block.
For i As Integer = 0 To 2
Console.WriteLine(bufferBlock.Receive())
Next i
' Output:
' 0
' 1
' 2
'
Para obter um exemplo completo que demonstra como gravar mensagens e ler mensagens de um objeto BufferBlock<T>, consulte Como gravar mensagens e ler mensagens de um bloco de fluxo de dados.
T do BroadcastBlock<>
A classe BroadcastBlock<T> é útil quando você deve passar várias mensagens para outro componente, mas esse componente precisa apenas do valor mais recente. Essa classe também é útil quando você deseja transmitir uma mensagem para vários componentes.
O exemplo básico a seguir posta um valor Double em um objeto BroadcastBlock<T> e lê esse valor de volta desse objeto várias vezes. Como os valores não são removidos dos objetos BroadcastBlock<T> depois de lidos, o mesmo valor está disponível todas as vezes.
// Create a BroadcastBlock<double> object.
var broadcastBlock = new BroadcastBlock<double>(null);
// Post a message to the block.
broadcastBlock.Post(Math.PI);
// Receive the messages back from the block several times.
for (int i = 0; i < 3; i++)
{
Console.WriteLine(broadcastBlock.Receive());
}
/* Output:
3.14159265358979
3.14159265358979
3.14159265358979
*/
' Create a BroadcastBlock<double> object.
Dim broadcastBlock = New BroadcastBlock(Of Double)(Nothing)
' Post a message to the block.
broadcastBlock.Post(Math.PI)
' Receive the messages back from the block several times.
For i As Integer = 0 To 2
Console.WriteLine(broadcastBlock.Receive())
Next i
' Output:
' 3.14159265358979
' 3.14159265358979
' 3.14159265358979
'
Para obter um exemplo completo que demonstra como usar BroadcastBlock<T> para difundir uma mensagem a vários blocos de destino, confira Como especificar um agendador de tarefas em um bloco de fluxo de dados.
WriteOnceBlock<T>
A classe WriteOnceBlock<T> se assemelha à classe BroadcastBlock<T>, exceto que um objeto WriteOnceBlock<T> pode ser gravado apenas uma vez. Você pode pensar em WriteOnceBlock<T> como sendo semelhante à palavra-chave de C# readonly (ReadOnly no Visual Basic), exceto pelo fato de que um objeto WriteOnceBlock<T> torna-se imutável depois de receber um valor em vez de uma construção. Assim como a classe BroadcastBlock<T>, quando um destino recebe uma mensagem de um objeto WriteOnceBlock<T>, essa mensagem não é removida desse objeto. Portanto, vários destinos recebem uma cópia da mensagem. A classe WriteOnceBlock<T> é útil quando você deseja propagar apenas a primeira de várias mensagens.
O exemplo básico a seguir posta diversos valores String em um objeto WriteOnceBlock<T> e, em seguida, lê esse valor de volta a partir desse objeto. Como um objeto WriteOnceBlock<T> pode ser gravado apenas uma vez, depois que um objeto WriteOnceBlock<T> recebe uma mensagem, ele descarta mensagens subsequentes.
// Create a WriteOnceBlock<string> object.
var writeOnceBlock = new WriteOnceBlock<string>(null);
// Post several messages to the block in parallel. The first
// message to be received is written to the block.
// Subsequent messages are discarded.
Parallel.Invoke(
() => writeOnceBlock.Post("Message 1"),
() => writeOnceBlock.Post("Message 2"),
() => writeOnceBlock.Post("Message 3"));
// Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive());
/* Sample output:
Message 2
*/
' Create a WriteOnceBlock<string> object.
Dim writeOnceBlock = New WriteOnceBlock(Of String)(Nothing)
' Post several messages to the block in parallel. The first
' message to be received is written to the block.
' Subsequent messages are discarded.
Parallel.Invoke(Function() writeOnceBlock.Post("Message 1"), Function() writeOnceBlock.Post("Message 2"), Function() writeOnceBlock.Post("Message 3"))
' Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive())
' Sample output:
' Message 2
'
Para obter um exemplo completo que demonstra como usar WriteOnceBlock<T> para receber o valor da primeira operação concluída, consulte Como desvincular blocos de fluxo de dados.
Blocos de execução
Blocos de execução chamam um delegado fornecido pelo usuário para cada parte dos dados recebidos. A Biblioteca de Fluxo de Dados TPL fornece três tipos de bloco de execução: ActionBlock<TInput>, System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput>e System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput>.
Bloco de Ação<T>
A classe ActionBlock<TInput> é um bloco de destino que chama um delegado quando recebe dados. Pense em um objeto ActionBlock<TInput> como um delegado que é executado de forma assíncrona quando os dados ficam disponíveis. O delegado que você fornece a um objeto ActionBlock<TInput> pode ser do tipo Action<T> ou do tipo System.Func<TInput, Task>
. Quando você usa um objeto ActionBlock<TInput> com Action<T>, o processamento de cada elemento de entrada é considerado concluído quando o delegado retorna. Quando você usa um objeto ActionBlock<TInput> com System.Func<TInput, Task>
, o processamento de cada elemento de entrada é considerado concluído somente quando o objeto Task retornado é concluído. Usando esses dois mecanismos, você pode usar ActionBlock<TInput> para processamento síncrono e assíncrono de cada elemento de entrada.
O exemplo básico a seguir posta vários valores Int32 em um objeto ActionBlock<TInput>. O objeto ActionBlock<TInput> imprime esses valores no console. Este exemplo define o bloco como o estado concluído e aguarda a conclusão de todas as tarefas de fluxo de dados.
// Create an ActionBlock<int> object that prints values
// to the console.
var actionBlock = new ActionBlock<int>(n => Console.WriteLine(n));
// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
actionBlock.Post(i * 10);
}
// Set the block to the completed state and wait for all
// tasks to finish.
actionBlock.Complete();
actionBlock.Completion.Wait();
/* Output:
0
10
20
*/
' Create an ActionBlock<int> object that prints values
' to the console.
Dim actionBlock = New ActionBlock(Of Integer)(Function(n) WriteLine(n))
' Post several messages to the block.
For i As Integer = 0 To 2
actionBlock.Post(i * 10)
Next i
' Set the block to the completed state and wait for all
' tasks to finish.
actionBlock.Complete()
actionBlock.Completion.Wait()
' Output:
' 0
' 10
' 20
'
Para obter exemplos completos que demonstram como usar delegados com a classe ActionBlock<TInput>, consulte Como executar a ação quando um bloco de fluxo de dados recebe dados.
TransformBlock<TInput, TOutput>
A classe TransformBlock<TInput,TOutput> é semelhante à classe ActionBlock<TInput>, exceto que ela atua tanto como origem quanto destino. O representante que você passa para um objeto TransformBlock<TInput,TOutput> retorna um valor do tipo TOutput
. O delegado que você fornece a um objeto TransformBlock<TInput,TOutput> pode ser do tipo System.Func<TInput, TOutput>
ou do tipo System.Func<TInput, Task<TOutput>>
. Quando você usa um objeto TransformBlock<TInput,TOutput> com System.Func<TInput, TOutput>
, o processamento de cada elemento de entrada é considerado concluído quando o delegado retorna. Quando você usa um objeto TransformBlock<TInput,TOutput> usado com System.Func<TInput, Task<TOutput>>
, o processamento de cada elemento de entrada é considerado concluído somente quando o objeto Task<TResult> retornado é concluído. Assim como acontece com ActionBlock<TInput>, usando esses dois mecanismos, você pode usar TransformBlock<TInput,TOutput> para processamento síncrono e assíncrono de cada elemento de entrada.
O exemplo básico a seguir cria um objeto TransformBlock<TInput,TOutput> que calcula a raiz quadrada de sua entrada. O objeto TransformBlock<TInput,TOutput> usa Int32 valores como entrada e produz valores Double como saída.
// Create a TransformBlock<int, double> object that
// computes the square root of its input.
var transformBlock = new TransformBlock<int, double>(n => Math.Sqrt(n));
// Post several messages to the block.
transformBlock.Post(10);
transformBlock.Post(20);
transformBlock.Post(30);
// Read the output messages from the block.
for (int i = 0; i < 3; i++)
{
Console.WriteLine(transformBlock.Receive());
}
/* Output:
3.16227766016838
4.47213595499958
5.47722557505166
*/
' Create a TransformBlock<int, double> object that
' computes the square root of its input.
Dim transformBlock = New TransformBlock(Of Integer, Double)(Function(n) Math.Sqrt(n))
' Post several messages to the block.
transformBlock.Post(10)
transformBlock.Post(20)
transformBlock.Post(30)
' Read the output messages from the block.
For i As Integer = 0 To 2
Console.WriteLine(transformBlock.Receive())
Next i
' Output:
' 3.16227766016838
' 4.47213595499958
' 5.47722557505166
'
Para obter exemplos completos que usam TransformBlock<TInput,TOutput> em uma rede de blocos de fluxo de dados que executa o processamento de imagens em um aplicativo do Windows Forms, consulte Passo a passo: usando o fluxo de dados em um aplicativo do Windows Forms.
TransformManyBlock<TInput, TOutput>
A classe TransformManyBlock<TInput,TOutput> se assemelha à classe TransformBlock<TInput,TOutput>, exceto que TransformManyBlock<TInput,TOutput> produz zero ou mais valores de saída para cada valor de entrada, em vez de apenas um valor de saída para cada valor de entrada. O delegado que você fornece a um objeto TransformManyBlock<TInput,TOutput> pode ser do tipo System.Func<TInput, IEnumerable<TOutput>>
ou do tipo System.Func<TInput, Task<IEnumerable<TOutput>>>
. Quando você usa um objeto TransformManyBlock<TInput,TOutput> com System.Func<TInput, IEnumerable<TOutput>>
, o processamento de cada elemento de entrada é considerado concluído quando o delegado retorna. Quando você usa um objeto TransformManyBlock<TInput,TOutput> com System.Func<TInput, Task<IEnumerable<TOutput>>>
, o processamento de cada elemento de entrada é considerado concluído somente quando o objeto System.Threading.Tasks.Task<IEnumerable<TOutput>>
retornado é concluído.
O exemplo básico a seguir cria um objeto TransformManyBlock<TInput,TOutput> que divide cadeias de caracteres em suas sequências de caracteres individuais. O objeto TransformManyBlock<TInput,TOutput> usa String valores como entrada e produz valores Char como saída.
// Create a TransformManyBlock<string, char> object that splits
// a string into its individual characters.
var transformManyBlock = new TransformManyBlock<string, char>(
s => s.ToCharArray());
// Post two messages to the first block.
transformManyBlock.Post("Hello");
transformManyBlock.Post("World");
// Receive all output values from the block.
for (int i = 0; i < ("Hello" + "World").Length; i++)
{
Console.WriteLine(transformManyBlock.Receive());
}
/* Output:
H
e
l
l
o
W
o
r
l
d
*/
' Create a TransformManyBlock<string, char> object that splits
' a string into its individual characters.
Dim transformManyBlock = New TransformManyBlock(Of String, Char)(Function(s) s.ToCharArray())
' Post two messages to the first block.
transformManyBlock.Post("Hello")
transformManyBlock.Post("World")
' Receive all output values from the block.
For i As Integer = 0 To ("Hello" & "World").Length - 1
Console.WriteLine(transformManyBlock.Receive())
Next i
' Output:
' H
' e
' l
' l
' o
' W
' o
' r
' l
' d
'
Para obter exemplos completos que usam TransformManyBlock<TInput,TOutput> para produzir várias saídas independentes para cada entrada em um pipeline de fluxo de dados, consulte Passo a passo: criando um pipeline de fluxo de dados.
Grau de paralelismo
Os buffers de objeto ActionBlock<TInput>, TransformBlock<TInput,TOutput>e TransformManyBlock<TInput,TOutput> armazenam mensagens de entrada até que o bloco esteja pronto para processá-las. Por padrão, essas classes processam mensagens na ordem em que são recebidas, uma mensagem por vez. Você também pode especificar o grau de paralelismo para habilitar objetos ActionBlock<TInput>, TransformBlock<TInput,TOutput> e TransformManyBlock<TInput,TOutput> para processar várias mensagens simultaneamente. Para obter mais informações sobre a execução simultânea, consulte a seção Especificando o grau de paralelismo posteriormente neste documento. Para obter um exemplo que define o grau de paralelismo para permitir que um bloco de fluxo de dados de execução processe mais de uma mensagem por vez, consulte Como especificar o grau de paralelismo em um bloco de fluxo de dados.
Resumo dos tipos de delegado
A tabela a seguir resume os tipos de delegado que você pode fornecer aos objetos ActionBlock<TInput>, TransformBlock<TInput,TOutput>e TransformManyBlock<TInput,TOutput>. Esta tabela também especifica se o tipo delegado opera de forma síncrona ou assíncrona.
Tipo | Tipo de delegado síncrono | Tipo de delegado assíncrono |
---|---|---|
ActionBlock<TInput> | System.Action |
System.Func<TInput, Task> |
TransformBlock<TInput,TOutput> | System.Func<TInput, TOutput> |
System.Func<TInput, Task<TOutput>> |
TransformManyBlock<TInput,TOutput> | System.Func<TInput, IEnumerable<TOutput>> |
System.Func<TInput, Task<IEnumerable<TOutput>>> |
Você também pode usar expressões lambda quando trabalha com tipos de bloco de execução. Para obter um exemplo que mostra como usar uma expressão lambda com um bloco de execução, consulte Como executar a ação quando um bloco de fluxo de dados recebe dados.
Blocos de agrupamento
Os blocos de agrupamento combinam dados de uma ou mais fontes e sob várias restrições. A Biblioteca de Fluxo de Dados TPL fornece três tipos de bloco de junção: BatchBlock<T>, JoinBlock<T1,T2>e BatchedJoinBlock<T1,T2>.
T do BatchBlock<>
A classe BatchBlock<T> combina conjuntos de dados de entrada, que são conhecidos como lotes, em matrizes de dados de saída. Especifique o tamanho de cada lote ao criar um objeto BatchBlock<T>. Quando o objeto BatchBlock<T> recebe a contagem especificada de elementos de entrada, ele propaga de forma assíncrona uma matriz que contém esses elementos. Se um objeto BatchBlock<T> estiver definido como o estado concluído, mas não contiver elementos suficientes para formar um lote, ele propagará uma matriz final que contém os elementos de entrada restantes.
A classe BatchBlock<T> opera em qualquer um dos modos greedy ou não greedy. No modo greedy, que é o padrão, um objeto BatchBlock<T> aceita todas as mensagens oferecidas e propaga uma matriz depois de receber a contagem especificada de elementos. No modo não greedy, um objeto BatchBlock<T> adia todas as mensagens de entrada até que origens suficientes tenham oferecido mensagens para que o bloco forme um lote. O modo guloso geralmente é mais eficiente do que o modo não guloso, pois requer menos sobrecarga de processamento. No entanto, você pode usar o modo não ganancioso quando deve coordenar o consumo de várias fontes de maneira atômica. Especifique o modo não greedy definindo Greedy como False
no parâmetro dataflowBlockOptions
no constructo BatchBlock<T>.
O exemplo básico a seguir posta vários valores Int32 em um objeto BatchBlock<T> que contém dez elementos em um lote. Para garantir que todos os valores sejam propagados fora do BatchBlock<T>, este exemplo chama o método Complete. O método Complete define o objeto BatchBlock<T> para o estado concluído e, portanto, o objeto BatchBlock<T> propaga todos os elementos restantes como um lote final.
// Create a BatchBlock<int> object that holds ten
// elements per batch.
var batchBlock = new BatchBlock<int>(10);
// Post several values to the block.
for (int i = 0; i < 13; i++)
{
batchBlock.Post(i);
}
// Set the block to the completed state. This causes
// the block to propagate out any remaining
// values as a final batch.
batchBlock.Complete();
// Print the sum of both batches.
Console.WriteLine($"The sum of the elements in batch 1 is {batchBlock.Receive().Sum()}.");
Console.WriteLine($"The sum of the elements in batch 2 is {batchBlock.Receive().Sum()}.");
/* Output:
The sum of the elements in batch 1 is 45.
The sum of the elements in batch 2 is 33.
*/
' Create a BatchBlock<int> object that holds ten
' elements per batch.
Dim batchBlock = New BatchBlock(Of Integer)(10)
' Post several values to the block.
For i As Integer = 0 To 12
batchBlock.Post(i)
Next i
' Set the block to the completed state. This causes
' the block to propagate out any remaining
' values as a final batch.
batchBlock.Complete()
' Print the sum of both batches.
Console.WriteLine("The sum of the elements in batch 1 is {0}.", batchBlock.Receive().Sum())
Console.WriteLine("The sum of the elements in batch 2 is {0}.", batchBlock.Receive().Sum())
' Output:
' The sum of the elements in batch 1 is 45.
' The sum of the elements in batch 2 is 33.
'
Para obter um exemplo completo que usa BatchBlock<T> para melhorar a eficiência das operações de inserção de banco de dados, consulte Passo a passo: usando BatchBlock e BatchedJoinBlock para melhorar a eficiência.
JoinBlock<T1, T2, ...>
As classes JoinBlock<T1,T2> e JoinBlock<T1,T2,T3> coletam elementos de entrada e propagam System.Tuple<T1,T2> ou objetos System.Tuple<T1,T2,T3> que contêm esses elementos. As classes JoinBlock<T1,T2> e JoinBlock<T1,T2,T3> não herdam de ITargetBlock<TInput>. Em vez disso, eles fornecem propriedades, Target1, Target2e Target3, que implementam ITargetBlock<TInput>.
Como BatchBlock<T>, JoinBlock<T1,T2> e JoinBlock<T1,T2,T3> operam no modo ganancioso ou não ganancioso. No modo greedy, que é o padrão, um objeto JoinBlock<T1,T2> ou JoinBlock<T1,T2,T3> aceita todas as mensagens que são oferecidas a ele e propaga uma tupla depois de cada um dos seus destinos receber pelo menos uma mensagem. No modo não greedy, um objeto JoinBlock<T1,T2> ou JoinBlock<T1,T2,T3> adia todas as mensagens de entrada até que os dados que são necessários para criar uma tupla tenham sido oferecidos para todos os destinos. Neste ponto, o bloco se envolve em um protocolo 2PC para recuperar atomicamente todos os itens obrigatórios das origens. Esse adiamento possibilita que outra entidade consuma os dados enquanto isso, para permitir que o sistema geral faça progressos.
O exemplo básico a seguir demonstra um caso em que um objeto JoinBlock<T1,T2,T3> requer vários dados para calcular um valor. Este exemplo cria um objeto JoinBlock<T1,T2,T3> que requer dois valores Int32 e um valor Char para executar uma operação aritmética.
// Create a JoinBlock<int, int, char> object that requires
// two numbers and an operator.
var joinBlock = new JoinBlock<int, int, char>();
// Post two values to each target of the join.
joinBlock.Target1.Post(3);
joinBlock.Target1.Post(6);
joinBlock.Target2.Post(5);
joinBlock.Target2.Post(4);
joinBlock.Target3.Post('+');
joinBlock.Target3.Post('-');
// Receive each group of values and apply the operator part
// to the number parts.
for (int i = 0; i < 2; i++)
{
var data = joinBlock.Receive();
switch (data.Item3)
{
case '+':
Console.WriteLine($"{data.Item1} + {data.Item2} = {data.Item1 + data.Item2}");
break;
case '-':
Console.WriteLine($"{data.Item1} - {data.Item2} = {data.Item1 - data.Item2}");
break;
default:
Console.WriteLine($"Unknown operator '{data.Item3}'.");
break;
}
}
/* Output:
3 + 5 = 8
6 - 4 = 2
*/
' Create a JoinBlock<int, int, char> object that requires
' two numbers and an operator.
Dim joinBlock = New JoinBlock(Of Integer, Integer, Char)()
' Post two values to each target of the join.
joinBlock.Target1.Post(3)
joinBlock.Target1.Post(6)
joinBlock.Target2.Post(5)
joinBlock.Target2.Post(4)
joinBlock.Target3.Post("+"c)
joinBlock.Target3.Post("-"c)
' Receive each group of values and apply the operator part
' to the number parts.
For i As Integer = 0 To 1
Dim data = joinBlock.Receive()
Select Case data.Item3
Case "+"c
Console.WriteLine("{0} + {1} = {2}", data.Item1, data.Item2, data.Item1 + data.Item2)
Case "-"c
Console.WriteLine("{0} - {1} = {2}", data.Item1, data.Item2, data.Item1 - data.Item2)
Case Else
Console.WriteLine("Unknown operator '{0}'.", data.Item3)
End Select
Next i
' Output:
' 3 + 5 = 8
' 6 - 4 = 2
'
Para um exemplo completo que utiliza objetos JoinBlock<T1,T2> no modo não ganancioso para compartilhar cooperativamente um recurso, consulte Como usar JoinBlock para ler dados de várias fontes.
BatchedJoinBlock<T1, T2, ...>
As classes BatchedJoinBlock<T1,T2> e BatchedJoinBlock<T1,T2,T3> coletam lotes de elementos de entrada e propagam objetos System.Tuple(IList(T1), IList(T2))
ou System.Tuple(IList(T1), IList(T2), IList(T3))
que contêm esses elementos. Pense em BatchedJoinBlock<T1,T2> como uma combinação de BatchBlock<T> e JoinBlock<T1,T2>. Especifique o tamanho de cada lote ao criar um objeto BatchedJoinBlock<T1,T2>.
BatchedJoinBlock<T1,T2> também fornece propriedades, Target1 e Target2, que implementam ITargetBlock<TInput>. Quando a contagem especificada de elementos de entrada é recebida de todos os destinos, o objeto BatchedJoinBlock<T1,T2> propaga de forma assíncrona um objeto System.Tuple(IList(T1), IList(T2))
que contém esses elementos.
O exemplo básico a seguir cria um objeto BatchedJoinBlock<T1,T2> que contém resultados, valores Int32 e erros que são Exception objetos. Este exemplo executa várias operações e grava resultados na propriedade Target1 e erros na propriedade Target2 do objeto BatchedJoinBlock<T1,T2>. Como a contagem de operações bem-sucedidas e com falha é desconhecida com antecedência, os objetos IList<T> permitem que cada destino receba zero ou mais valores.
// For demonstration, create a Func<int, int> that
// returns its argument, or throws ArgumentOutOfRangeException
// if the argument is less than zero.
Func<int, int> DoWork = n =>
{
if (n < 0)
throw new ArgumentOutOfRangeException();
return n;
};
// Create a BatchedJoinBlock<int, Exception> object that holds
// seven elements per batch.
var batchedJoinBlock = new BatchedJoinBlock<int, Exception>(7);
// Post several items to the block.
foreach (int i in new int[] { 5, 6, -7, -22, 13, 55, 0 })
{
try
{
// Post the result of the worker to the
// first target of the block.
batchedJoinBlock.Target1.Post(DoWork(i));
}
catch (ArgumentOutOfRangeException e)
{
// If an error occurred, post the Exception to the
// second target of the block.
batchedJoinBlock.Target2.Post(e);
}
}
// Read the results from the block.
var results = batchedJoinBlock.Receive();
// Print the results to the console.
// Print the results.
foreach (int n in results.Item1)
{
Console.WriteLine(n);
}
// Print failures.
foreach (Exception e in results.Item2)
{
Console.WriteLine(e.Message);
}
/* Output:
5
6
13
55
0
Specified argument was out of the range of valid values.
Specified argument was out of the range of valid values.
*/
' For demonstration, create a Func<int, int> that
' returns its argument, or throws ArgumentOutOfRangeException
' if the argument is less than zero.
Dim DoWork As Func(Of Integer, Integer) = Function(n)
If n < 0 Then
Throw New ArgumentOutOfRangeException()
End If
Return n
End Function
' Create a BatchedJoinBlock<int, Exception> object that holds
' seven elements per batch.
Dim batchedJoinBlock = New BatchedJoinBlock(Of Integer, Exception)(7)
' Post several items to the block.
For Each i As Integer In New Integer() {5, 6, -7, -22, 13, 55, 0}
Try
' Post the result of the worker to the
' first target of the block.
batchedJoinBlock.Target1.Post(DoWork(i))
Catch e As ArgumentOutOfRangeException
' If an error occurred, post the Exception to the
' second target of the block.
batchedJoinBlock.Target2.Post(e)
End Try
Next i
' Read the results from the block.
Dim results = batchedJoinBlock.Receive()
' Print the results to the console.
' Print the results.
For Each n As Integer In results.Item1
Console.WriteLine(n)
Next n
' Print failures.
For Each e As Exception In results.Item2
Console.WriteLine(e.Message)
Next e
' Output:
' 5
' 6
' 13
' 55
' 0
' Specified argument was out of the range of valid values.
' Specified argument was out of the range of valid values.
'
Para obter um exemplo completo que usa BatchedJoinBlock<T1,T2> para capturar os resultados e quaisquer exceções que ocorram enquanto o programa lê de um banco de dados, consulte Passo a passo: usando BatchBlock e BatchedJoinBlock para melhorar a eficiência.
Configurando o comportamento do bloco de fluxo de dados
Você pode habilitar opções adicionais fornecendo um objeto System.Threading.Tasks.Dataflow.DataflowBlockOptions ao construtor de tipos de bloco de fluxo de dados. Essas opções controlam o comportamento, como o agendador que gerencia a tarefa subjacente e o grau de paralelismo. O DataflowBlockOptions também tem tipos derivados que especificam um comportamento específico para determinados tipos de bloco de fluxo de dados. A tabela a seguir resume qual tipo de opções está associado a cada tipo de bloco de fluxo de dados.
As seções a seguir fornecem informações adicionais sobre os tipos importantes de opções de bloco de fluxo de dados disponíveis por meio das classes System.Threading.Tasks.Dataflow.DataflowBlockOptions, System.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptionse System.Threading.Tasks.Dataflow.GroupingDataflowBlockOptions.
Especificando o agendador de tarefas
Cada bloco de fluxo de dados predefinido usa o mecanismo de agendamento de tarefas TPL para executar atividades como propagar dados para um destino, receber dados de uma origem e executar delegados definidos pelo usuário quando os dados estiverem disponíveis. TaskScheduler é uma classe abstrata que representa um agendador de tarefas que enfileira tarefas em threads. O agendador de tarefas padrão, Default, usa a classe ThreadPool para enfileirar e executar o trabalho. Você pode substituir o agendador de tarefas padrão definindo a propriedade TaskScheduler ao construir um objeto de bloco de fluxo de dados.
Quando o mesmo agendador de tarefas gerencia vários blocos de fluxo de dados, ele pode impor políticas entre eles. Por exemplo, se vários blocos de fluxo de dados forem configurados para direcionar o agendador exclusivo do mesmo objeto ConcurrentExclusiveSchedulerPair, todo o trabalho executado nesses blocos será serializado. Da mesma forma, se esses blocos estiverem configurados para direcionar o agendador simultâneo do mesmo objeto ConcurrentExclusiveSchedulerPair e esse agendador estiver configurado para ter um nível máximo de simultaneidade, todo o trabalho desses blocos será limitado a esse número de operações simultâneas. Para obter um exemplo que usa a classe ConcurrentExclusiveSchedulerPair para habilitar a realização de operações de leitura em paralelo mas mantém as operações de gravação ocorrendo de modo exclusivo com relação a todas as outras operações, confira Como especificar um agendador de tarefas em um bloco de fluxo de dados. Para obter mais informações sobre agendadores de tarefas no TPL, consulte o tópico da classe TaskScheduler.
Especificando o grau de paralelismo
Por padrão, os três tipos de bloco de execução que a Biblioteca de Fluxo de Dados TPL fornece, ActionBlock<TInput>, TransformBlock<TInput,TOutput>e TransformManyBlock<TInput,TOutput>, processam uma mensagem por vez. Esses tipos de bloco de fluxo de dados também processam mensagens na ordem em que são recebidas. Para habilitar esses blocos de fluxo de dados para processar mensagens simultaneamente, defina a propriedade ExecutionDataflowBlockOptions.MaxDegreeOfParallelism ao construir o objeto de bloco de fluxo de dados.
O valor padrão de MaxDegreeOfParallelism é 1, o que garante que o bloco de fluxo de dados processe uma mensagem por vez. Definir essa propriedade como um valor maior que 1 permite que o bloco de fluxo de dados processe várias mensagens simultaneamente. Definir essa propriedade como DataflowBlockOptions.Unbounded permite que o agendador de tarefas subjacente gerencie o grau máximo de simultaneidade.
Importante
Quando você especifica um grau máximo de paralelismo maior que 1, várias mensagens são processadas simultaneamente e, portanto, as mensagens podem não ser processadas na ordem em que são recebidas. A ordem na qual as mensagens são saída do bloco é, no entanto, a mesma em que elas são recebidas.
Como a propriedade MaxDegreeOfParallelism representa o grau máximo de paralelismo, o bloco de fluxo de dados pode ser executado com um grau menor de paralelismo do que você especificar. O bloco de fluxo de dados pode usar um menor grau de paralelismo para atender aos seus requisitos funcionais ou porque há uma falta de recursos do sistema disponíveis. Um bloco de fluxo de dados nunca escolhe mais paralelismo do que você especificar.
O valor da propriedade MaxDegreeOfParallelism é exclusivo para cada objeto de bloco de fluxo de dados. Por exemplo, se quatro objetos de bloco de fluxo de dados especificarem 1 para o grau máximo de paralelismo, todos os quatro objetos de bloco de fluxo de dados poderão ser executados em paralelo.
Para obter um exemplo que define o grau máximo de paralelismo para permitir que operações longas ocorram em paralelo, consulte Como especificar o grau de paralelismo em um bloco de fluxo de dados.
Especificando o número de mensagens por tarefa
Os tipos de bloco de fluxo de dados predefinidos usam tarefas para processar vários elementos de entrada. Isso ajuda a minimizar o número de objetos de tarefa necessários para processar dados, o que permite que os aplicativos sejam executados com mais eficiência. No entanto, quando as tarefas de um conjunto de blocos de fluxo de dados estão processando dados, as tarefas de outros blocos de fluxo de dados podem precisar aguardar o tempo de processamento enfileirando mensagens. Para habilitar uma melhor imparcialidade entre as tarefas de fluxo de dados, defina a propriedade MaxMessagesPerTask. Quando MaxMessagesPerTask é definida como DataflowBlockOptions.Unbounded, que é o padrão, a tarefa usada por um bloco de fluxo de dados processa quantas mensagens estiverem disponíveis. Quando MaxMessagesPerTask é definido como um valor diferente de Unbounded, o bloco de fluxo de dados processa no máximo esse número de mensagens por objeto Task. Embora a configuração da propriedade MaxMessagesPerTask possa aumentar a imparcialidade entre as tarefas, isso pode fazer com que o sistema crie mais tarefas do que o necessário, o que pode diminuir o desempenho.
Habilitando o cancelamento
A TPL fornece um mecanismo que permite que as tarefas coordenem o cancelamento de forma cooperativa. Para permitir que os blocos de fluxo de dados participem desse mecanismo de cancelamento, defina a propriedade CancellationToken. Quando esse objeto CancellationToken é definido como o estado cancelado, todos os blocos de fluxo de dados que monitoram esse token terminam a execução de seu item atual, mas não iniciam o processamento de itens subsequentes. Esses blocos de fluxo de dados também limpam mensagens em buffer, liberam conexões para qualquer bloco de origem e destino e fazem a transição para o estado cancelado. Ao fazer a transição para o estado cancelado, a propriedade Completion tem a propriedade Status definida como Canceled, a menos que ocorra uma exceção durante o processamento. Nesse caso, Status está definido como Faulted.
Para obter um exemplo que demonstra como usar o cancelamento em um aplicativo do Windows Forms, consulte How to: Cancel a Dataflow Block. Para obter mais informações sobre cancelamento na TPL, consulte Cancelamento da tarefa.
Especificando comportamento greedy versus não greedy
Vários tipos de bloco de fluxo de dados de agrupamento podem operar em qualquer um dos modos greedy ou não greedy. Por padrão, os tipos de bloco de fluxo de dados predefinidos operam no modo ganancioso.
Para tipos de bloco de junção como JoinBlock<T1,T2>, o modo greedy significa que o bloco aceita dados imediatamente, mesmo que os dados correspondentes com os quais se pretende realizar a junção não estejam disponíveis. O modo não greedy significa que o bloco adia todas as mensagens de entrada até que uma esteja disponível em cada um de seus destinos para concluir a junção. Se qualquer uma das mensagens adiadas não estiver mais disponível, o bloco de junção liberará todas as mensagens adiadas e reiniciará o processo. Para a classe BatchBlock<T>, um comportamento ganancioso e não ganancioso é semelhante, exceto que, no modo não ganancioso, um objeto BatchBlock<T> posterga todas as mensagens recebidas até que mensagens suficientes estejam disponíveis de fontes distintas para concluir um lote.
Para especificar o modo não ganancioso para um bloco de fluxo de dados, defina Greedy para False
. Para ver um exemplo que demonstra como utilizar o modo de operação não voraz para permitir que múltiplos blocos de junção compartilhem uma fonte de dados de maneira mais eficiente, consulte Como usar JoinBlock para ler dados de várias fontes.
Blocos de fluxo de dados personalizados
Embora a Biblioteca de Fluxo de Dados TPL forneça muitos tipos de bloco predefinidos, você pode criar tipos de bloco adicionais que executam o comportamento personalizado. Implemente as interfaces ISourceBlock<TOutput> ou ITargetBlock<TInput> diretamente ou use o método Encapsulate para criar um bloco complexo que encapsula o comportamento dos tipos de bloco existentes. Para obter exemplos que mostram como implementar a funcionalidade personalizada de bloco de fluxo de dados, consulte Passo a passo: criando um tipo de bloco de fluxo de dados personalizado.
Tópicos relacionados
Título | Descrição |
---|---|
Como gravar mensagens e ler mensagens de um bloco de fluxo de dados | Demonstra como gravar mensagens e ler mensagens de um objeto BufferBlock<T>. |
Como implementar um padrão de fluxo de dados Producer-Consumer | Descreve como usar o modelo de fluxo de dados para implementar um padrão produtor-consumidor, em que o produtor envia mensagens para um bloco de fluxo de dados e o consumidor lê mensagens desse bloco. |
Como executar a ação quando um bloco de fluxo de dados recebe dados | Descreve como fornecer delegados para os tipos de bloco de fluxo de dados de execução, ActionBlock<TInput>, TransformBlock<TInput,TOutput>e TransformManyBlock<TInput,TOutput>. |
Passo a passo: criando um pipeline de fluxo de dados | Descreve como criar um pipeline de fluxo de dados que baixa texto da Web e executa operações nesse texto. |
Como desvincular blocos de fluxo de dados | Demonstra como usar o método LinkTo para desvincular um bloco de destino de sua origem depois que a origem oferece uma mensagem ao destino. |
passo a passo: usando o fluxo de dados em um aplicativo do Windows Forms | Demonstra como criar uma rede de blocos de fluxo de dados que executam o processamento de imagem em um aplicativo do Windows Forms. |
Como cancelar um bloco de fluxo de dados | Demonstra como usar o cancelamento em um aplicativo do Windows Forms. |
Como usar JoinBlock para ler dados de várias fontes | Explica como usar a classe JoinBlock<T1,T2> para executar uma operação quando os dados estão disponíveis de várias fontes e como utilizar o modo não ganancioso para permitir que múltiplos blocos de junção de dados compartilhem uma fonte de dados de forma mais eficiente. |
Como especificar o grau de paralelismo em um bloco de fluxo de dados | Descreve como definir a propriedade MaxDegreeOfParallelism para habilitar um bloco de fluxo de dados de execução para processar mais de uma mensagem por vez. |
Como especificar um agendador de tarefas em um bloco de fluxo de dados | Demonstra como associar um agendador de tarefas específico ao usar o fluxo de dados em seu aplicativo. |
passo a passo: usando BatchBlock e BatchedJoinBlock para melhorar a eficiência | Descreve como usar a classe BatchBlock<T> para melhorar a eficiência das operações de inserção de banco de dados e como usar a classe BatchedJoinBlock<T1,T2> para capturar os resultados e quaisquer exceções que ocorram enquanto o programa lê de um banco de dados. |
passo a passo: criando um tipo de bloco de fluxo de dados personalizado | Demonstra duas maneiras de criar um tipo de bloco de fluxo de dados que implementa o comportamento personalizado. |
Biblioteca Paralela de Tarefas (TPL) | Apresenta o TPL, uma biblioteca que simplifica a programação paralela e simultânea em aplicativos do .NET Framework. |