Particionadores personalizados para PLINQ e TPL

Para paralelizar a uma operação em uma fonte de dados, uma das etapas essenciais é particionar a fonte em várias seções que possam ser acessadas simultaneamente por vários threads. O PLINQ e a TPL (Biblioteca de Paralelismo de Tarefas) fornecem particionadores padrão que funcionam de forma transparente quando você escreve uma consulta paralela ou um loop ForEach. Para cenários mais avançados, você pode conectar seu próprio particionador.

Tipos de particionamento

Há muitas maneiras de particionar uma fonte de dados. Nas abordagens mais eficientes, vários threads cooperam para processar a sequência de origem original, em vez de separar fisicamente a origem em várias subsequências. Para matrizes e outras fontes indexadas como coleções IList em que o tamanho é conhecido com antecedência, o particionamento por intervalos é o tipo mais simples de particionamento. Cada thread recebe índices exclusivos de abertura e fechamento, para que possa processar seu intervalo da origem sem substituir ou ser substituído por qualquer outro thread. A única sobrecarga envolvida no particionamento por intervalos é o trabalho inicial de criação de intervalos. Nenhuma sincronização adicional é necessária depois disso. Portanto, pode fornecer bom desempenho, desde que a carga de trabalho seja dividida igualmente. Uma desvantagem do particionamento por intervalos é que se um thread termina cedo, não pode ajudar os outros threads a concluírem seu trabalho.

Para listas vinculadas ou outras coleções cujo comprimento não é conhecido, você pode usar o particionamento por partes. Na parte de particionamento, cada thread ou tarefa em uma consulta ou um loop paralelo consome alguns elementos de origem em um bloco, processa-os e volta para recuperar elementos adicionais. O particionador garante que todos os elementos sejam distribuídos e que não haja duplicatas. Uma parte pode ser de qualquer tamanho. Por exemplo, o particionador que é demonstrado em Como implementar partições dinâmicas cria blocos que contêm apenas um elemento. Como as partes não são muito grandes, esse tipo de particionamento é, inerentemente, um balanceamento de carga, pois a atribuição de elementos a threads não é predeterminada. No entanto, o particionador causa sobrecarga de sincronização sempre que o thread precisa obter outro bloco. A quantidade de sincronização que ocorre nesses casos é inversamente proporcional ao tamanho das partes.

Em geral, a partição de intervalo só é mais rápida quando o tempo de execução do representante é pequeno a médio e a origem tem um grande número de elementos e o trabalho total de cada partição é aproximadamente equivalente. Portanto, o particionamento de bloco geralmente é mais rápido na maioria dos casos. Em origens com um pequeno número de elementos ou tempos de execução maiores para o representante, o desempenho de bloco e particionamento por intervalos é mais ou menos igual.

Os particionadores de TPL também dão suporte a um número dinâmico de partições. Isso significa que podem criar partições imediatamente, por exemplo, quando o loop ForEach gera uma nova tarefa. Esse recurso permite que o particionador seja dimensionado junto com o próprio loop. Os particionadores dinâmicos também realizam, inerentemente, o balanceamento de carga. Ao criar um particionador personalizado, você deve dar suporte ao particionamento dinâmico para ser consumível de um loop ForEach.

Configuração de particionadores de balanceamento de carga para PLINQ

Algumas sobrecargas do método Partitioner.Create permitem que você crie um particionador para uma matriz ou origem IList e especifique se deve tentar equilibrar a carga de trabalho entre os threads. Quando o particionador está configurado para balanceamento de carga, o particionamento por partes é usado, e os elementos são enviados para cada partição em pequenas partes conforme são solicitados. Essa abordagem ajuda a garantir que todas as partições tenham elementos para processar até que a consulta ou o loop inteiro seja concluído. Uma sobrecarga adicional pode ser usada para fornecer particionamento de balanceamento de carga de qualquer origem de IEnumerable.

Em geral, o balanceamento de carga requer que as partições solicitem elementos relativamente com frequência do particionador. Por outro lado, um particionador que faz o particionamento estático pode atribuir os elementos a cada particionador de uma só vez usando o particionamento por intervalos ou bloco. Isso requer menos sobrecarga de balanceamento de carga, mas pode levar mais tempo para ser executado se um thread acaba com significativamente mais trabalho do que os outros. Por padrão, quando recebe uma IList ou uma matriz, o PLINQ sempre usa o particionamento por intervalos sem balanceamento de carga. Para habilitar o balanceamento de carga para o PLINQ, use o método Partitioner.Create, conforme mostrado no exemplo a seguir.

// Static partitioning requires indexable source. Load balancing
// can use any IEnumerable.
var nums = Enumerable.Range(0, 100000000).ToArray();

// Create a load-balancing partitioner. Or specify false for static partitioning.
Partitioner<int> customPartitioner = Partitioner.Create(nums, true);

// The partitioner is the query's data source.
var q = from x in customPartitioner.AsParallel()
        select x * Math.PI;

q.ForAll((x) =>
{
    ProcessData(x);
});
' Static number of partitions requires indexable source.
Dim nums = Enumerable.Range(0, 100000000).ToArray()

' Create a load-balancing partitioner. Or specify false For  Shared partitioning.
Dim customPartitioner = Partitioner.Create(nums, True)

' The partitioner is the query's data source.
Dim q = From x In customPartitioner.AsParallel()
        Select x * Math.PI

q.ForAll(Sub(x) ProcessData(x))

A melhor maneira de determinar se é preciso usar o balanceamento de carga em qualquer cenário específico é testar e medir o tempo necessário para concluir operações em cargas representativas e configurações de computador. Por exemplo, o particionamento estático pode fornecer um aumento de velocidade significativo em um computador com vários núcleos que tenha apenas alguns núcleos, mas pode resultar em lentidão em computadores que têm relativamente muitos núcleos.

A tabela a seguir lista as sobrecargas disponíveis do método Create. Essas particionadores não estão limitados ao uso somente com PLINQ ou Task. Também podem ser usados com qualquer constructo paralelo personalizado.

Sobrecarga Usa o balanceamento de carga
Create<TSource>(IEnumerable<TSource>) Sempre
Create<TSource>(TSource[], Boolean) Quando o argumento Boolean é especificado como true
Create<TSource>(IList<TSource>, Boolean) Quando o argumento Boolean é especificado como true
Create(Int32, Int32) Nunca
Create(Int32, Int32, Int32) Nunca
Create(Int64, Int64) Nunca
Create(Int64, Int64, Int64) Nunca

Configurando particionadores de intervalo estático para Parallel.ForEach

Em um loop For, o corpo do loop é fornecido para o método como um representante. O custo de invocar esse representante é quase o mesmo que o de uma chamada de método virtual. Em alguns cenários, o corpo de um loop paralelo pode ser pequeno o suficiente para que o custo da invocação do representante em cada iteração do loop se torne significativo. Nessas situações, você pode usar uma das sobrecargas Create para criar um IEnumerable<T> de partições de intervalo sobre os elementos de origem. Em seguida, você pode passar essa coleção de intervalos para um método ForEach cujo corpo consiste em um loop for regular. A vantagem dessa abordagem é que o custo de invocação do representante é incorrido apenas uma vez por intervalo, em vez de uma vez por elemento. O exemplo a seguir demonstra o padrão básico.

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {

        // Source must be array or IList.
        var source = Enumerable.Range(0, 100000).ToArray();

        // Partition the entire source array.
        var rangePartitioner = Partitioner.Create(0, source.Length);

        double[] results = new double[source.Length];

        // Loop over the partitions in parallel.
        Parallel.ForEach(rangePartitioner, (range, loopState) =>
        {
            // Loop over each range element without a delegate invocation.
            for (int i = range.Item1; i < range.Item2; i++)
            {
                results[i] = source[i] * Math.PI;
            }
        });

        Console.WriteLine("Operation complete. Print results? y/n");
        char input = Console.ReadKey().KeyChar;
        if (input == 'y' || input == 'Y')
        {
            foreach(double d in results)
            {
                Console.Write("{0} ", d);
            }
        }
    }
}
Imports System.Threading.Tasks
Imports System.Collections.Concurrent

Module PartitionDemo

    Sub Main()
        ' Source must be array or IList.
        Dim source = Enumerable.Range(0, 100000).ToArray()

        ' Partition the entire source array. 
        ' Let the partitioner size the ranges.
        Dim rangePartitioner = Partitioner.Create(0, source.Length)

        Dim results(source.Length - 1) As Double

        ' Loop over the partitions in parallel. The Sub is invoked
        ' once per partition.
        Parallel.ForEach(rangePartitioner, Sub(range, loopState)

                                               ' Loop over each range element without a delegate invocation.
                                               For i As Integer = range.Item1 To range.Item2 - 1
                                                   results(i) = source(i) * Math.PI
                                               Next
                                           End Sub)
        Console.WriteLine("Operation complete. Print results? y/n")
        Dim input As Char = Console.ReadKey().KeyChar
        If input = "y"c Or input = "Y"c Then
            For Each d As Double In results
                Console.Write("{0} ", d)
            Next
        End If

    End Sub
End Module

Cada thread no loop recebe seu próprio Tuple<T1,T2>, que contém os valores de índice inicial e final no subintervalo especificado. O loop interno for usa os valores fromInclusive e toExclusive para executar um loop sobre a matriz ou o IList diretamente.

Uma das sobrecargas Create permite especificar o tamanho de partições e o número delas. Essa sobrecarga pode ser usada em cenários em que o trabalho por elemento é tão baixo que até mesmo uma chamada de método virtual por elemento tem impacto significativo sobre o desempenho.

Particionadores Personalizados

Em alguns cenários, talvez seja útil ou até mesmo necessário implementar seu próprio particionador. Por exemplo, você pode ter uma classe de coleção personalizada que pode particionar de forma mais eficiente do que os particionadores padrão, com base em seu conhecimento sobre a estrutura interna da classe. Ou, talvez, você queira criar partições de intervalo de tamanhos variados com base em seu conhecimento de quanto tempo levará para processar elementos em locais diferentes na coleção de origem.

Para criar um particionador personalizado básico, derive uma classe de System.Collections.Concurrent.Partitioner<TSource> e substitua os métodos virtuais, conforme descrito na tabela a seguir.

Método Descrição
GetPartitions Esse método é chamado uma vez pelo thread principal e retorna um IList(IEnumerator(TSource)). Cada thread de trabalho na consulta ou no loop pode chamar GetEnumerator na lista para recuperar um IEnumerator<T> por uma partição distinta.
SupportsDynamicPartitions Retorne true se você implementar GetDynamicPartitions; caso contrário, false.
GetDynamicPartitions Se SupportsDynamicPartitions for true, esse método poderá ser opcionalmente chamado, em vez de GetPartitions.

Se os resultados precisarem ser classificáveis ou exigirem acesso indexado aos elementos, derive de System.Collections.Concurrent.OrderablePartitioner<TSource> e substitua seus métodos virtuais, conforme descrito na tabela a seguir.

Método Descrição
GetPartitions Esse método é chamado uma vez pelo thread principal e retorna um IList(IEnumerator(TSource)). Cada thread de trabalho na consulta ou no loop pode chamar GetEnumerator na lista para recuperar um IEnumerator<T> por uma partição distinta.
SupportsDynamicPartitions Retorne true se você implementar GetDynamicPartitions; caso contrário, false.
GetDynamicPartitions Normalmente, isso apenas chama GetOrderableDynamicPartitions.
GetOrderableDynamicPartitions Se SupportsDynamicPartitions for true, esse método poderá ser opcionalmente chamado, em vez de GetPartitions.

A tabela a seguir fornece detalhes adicionais sobre como os três tipos de particionadores de balanceamento de carga implementam a classe OrderablePartitioner<TSource>.

Método/propriedade IList/matriz sem balanceamento de carga IList/matriz com balanceamento de carga IEnumerable
GetOrderablePartitions Usa o particionamento por intervalos Usa o particionamento por partes otimizado para Listas para o partitionCount especificado Usa o particionamento por partes por meio da criação de um número estático de partições.
OrderablePartitioner<TSource>.GetOrderableDynamicPartitions Gera uma exceção sem suporte Usa o particionamento por partes otimizado para Listas e partições dinâmicas Usa o particionamento por partes por meio da criação de um número dinâmico de partições.
KeysOrderedInEachPartition Retorna true Retorna true Retorna true
KeysOrderedAcrossPartitions Retorna true Retorna false Retorna false
KeysNormalized Retorna true Retorna true Retorna true
SupportsDynamicPartitions Retorna false Retorna true Retorna true

Partições dinâmicas

Se desejar que o particionador seja usado em um método ForEach, você deverá ser capaz de retornar um número dinâmico de partições. Isso significa que o particionador pode fornecer um enumerador para uma nova partição sob demanda a qualquer momento durante a execução do loop. Basicamente, sempre que o loop adicionar uma nova tarefa paralela, solicitará uma nova partição para a tarefa. Se for preciso que os dados sejam ordenáveis, derive System.Collections.Concurrent.OrderablePartitioner<TSource> para que seja atribuído um índice exclusivo a cada item em cada partição.

Para saber mais e obter um exemplo, confira Como implementar partições dinâmicas.

Contrato para particionadores

Ao implementar um particionador personalizado, siga estas diretrizes para garantir a interação correta com PLINQ e ForEach na TPL:

  • Se GetPartitions for chamado com um argumento de zero ou menos para partitionsCount, gere ArgumentOutOfRangeException. Embora o PLINQ e o TPL nunca passem um partitionCount igual a 0, mesmo assim, recomendamos que você proteja contra a possibilidade.

  • GetPartitions e GetOrderablePartitions devem retornar sempre partitionsCount partições. Se o particionador ficar sem dados e não puder criar tantas partições quantas foram solicitadas, o método deverá retornar um enumerador vazio para cada uma das partições restantes. Caso contrário, o PLINQ e o TPL gerarão um InvalidOperationException.

  • GetPartitions, GetOrderablePartitions, GetDynamicPartitions e GetOrderableDynamicPartitions nunca devem retornar null (Nothing no Visual Basic). Se isso ocorrer, PLINQ/TPL gerará um InvalidOperationException.

  • Os métodos que retornam partições devem retornar sempre partições que podem enumerar total e exclusivamente a fonte de dados. Não deve haver nenhuma duplicação na fonte de dados nem itens ignorados, a menos que isso seja especificamente necessário para o design do particionador. Se essa regra não for seguida, a ordem de saída poderá ser embaralhada.

  • Os seguintes getters boolianos devem sempre retornar com precisão os seguintes valores para que a ordem de saída não seja embaralhada:

    • KeysOrderedInEachPartition: cada partição retorna elementos com índices de chave crescentes.

    • KeysOrderedAcrossPartitions: para todas as partições que são retornadas, os índices de chave na partição i são maiores do que os índices de chave na partição i-1.

    • KeysNormalized: todos os índices de chave aumentam de forma monotônica sem lacunas, a partir de zero.

  • Todos os índices devem ser exclusivos. Não pode haver índices duplicados. Se essa regra não for seguida, a ordem de saída poderá ser embaralhada.

  • Todos os índices devem ser não negativos. Se essa regra não for seguida, PLINQ/TPL poderão gerar exceções.

Confira também