Поделиться через


Пользовательские разделители для PLINQ и TPL

Одним из основных шагов при распараллеливании операции в источнике данных является разделение источника на несколько секций, параллельный доступ к которым имеют сразу несколько потоков. PLINQ и Библиотека параллельных задач (TPL) по умолчанию предоставляют модули разделения, которые работают прозрачно при написании параллельного запроса или цикла ForEach. Для более сложных сценариев можно подключить собственный модуль разделения.

Типы разделения

Существует много способов разделения источника данных. В более рациональных решениях над выполнением исходной последовательности совместно работают несколько потоков, вместо физического разделения источника на несколько подпоследовательностей. Для массивов и других индексированных источников, например, коллекции IList, где длина заранее известна, разделение по диапазонам является самым простым типом разделения. Каждый поток получает уникальные индексы начала и конца для обработки диапазона источника без перезаписи источника, а также без перезаписи потока другим потоком. Только служебные данные, вовлеченные в разделение по диапазонам, служат началом процесса создания диапазонов; после этого никакая дополнительная синхронизация не требуется. Таким образом, хорошая производительность может быть обеспечена только в том случае, когда рабочая нагрузка распределена равномерно. Недостаток разделения по диапазонам состоит в том, что поток, завершивший обработку, не может помочь другим потокам.

Для связанных списков и других коллекций с неизвестной длиной можно использовать разделение по блокам. При разделении по блокам каждый поток или задача в параллельном цикле или запросе в каждом блоке использует некоторое количество исходных элементов, обрабатывает их, а затем возвращается к извлечению дополнительных элементов. Модуль разделения гарантирует распределение всех элементов и отсутствие их дублирования. Блок может иметь любой размер. Например, модуль разделения, показанный в разделе Практическое руководство. Реализация динамических разделов, создает блоки, содержащие только один элемент. Пока блоки небольшие, этот тип разделения по сути позволяет рационально распределить нагрузку, так как назначение элементов потокам не предопределено. Однако модуль разделения выполняет синхронизацию служебных данных каждый раз, при переходе потока к другому блоку. Объем синхронизации, выполняемой в данном случае, обратно пропорционален размеру блоков.

В общем, разделение по диапазонам выполняется быстрее в случае, когда период времени выполнения делегата находится в диапазоне от короткого до умеренного, источник содержит большое количество элементов, а общее количество усилий, затрачиваемых на обработку каждой секции, приблизительно одинаково. Таким образом, разделение по блокам в большинстве случаев ускоряет работу. В источниках с небольшим количеством элементов или длительным временем выполнения делегата производительность разделения по блокам и разделения по диапазонам приблизительно одинакова.

Модули разделения TPL также поддерживают динамичное количество разделов. Это означает, что они могут создавать разделы в реальном времени, например, когда цикл ForEach вызывает новую задачу. Эта функция позволяет модулю разделения масштабироваться вместе с циклом. Динамические модули разделения также служат для распределения нагрузки. При создании пользовательского модуля разделения, необходимо поддерживать динамическое разделение из цикла ForEach.

Настройка модулей распределения нагрузки для PLINQ

Несколько перегрузок метода Partitioner.Create позволяют создать модуль разделения для массива или источника IList, а также указать должен ли он предпринять попытку распределения рабочей нагрузки между потоками. Если модуль разделения настроен на распределение нагрузки, используется разделение по блокам, и элементы передаются в каждый раздел по запросу маленькими блоками. Такой подход гарантирует, что все разделы будут содержать элементы для обработки до тех пор, пока весь цикл или запрос не будет выполнен. Чтобы обеспечить разделение для распределения нагрузки для любого источника IEnumerable можно воспользоваться дополнительной перегрузкой.

Распределение нагрузки требует, чтобы разделы относительно часто запрашивали элементы из модуля разделения. Напротив, модуль разделения, осуществляющий статическое разделение, может назначить каждому модулю разделения сразу все элементы, используя диапазон или разделение по блокам. Для этого необходимо меньшее количество перегрузок, чем для распределения нагрузки, но может потребоваться больше времени для обработки, если один поток обрабатывает значительно большее количество задач, чем другие. По умолчанию, если передается IList или массив, PLINQ всегда использует разделение по диапазонам без распределения нагрузки. Для применения распределения нагрузки к PLINQ воспользуйтесь методом Partitioner.Create, как показано в следующем примере.

        ' Static number of partitions requires indexable source.
        Dim nums = Enumerable.Range(0, 100000000).ToArray()

        ' Create a load-balancing partitioner. Or specify false For  Shared partitioning.
        Dim customPartitioner = Partitioner.Create(nums, True)

        ' The partitioner is the query's data source.
        Dim q = From x In customPartitioner.AsParallel()
                Select x * Math.PI

        q.ForAll(Sub(x) ProcessData(x))

            // Static partitioning requires indexable source. Load balancing
            // can use any IEnumerable.
            var nums = Enumerable.Range(0, 100000000).ToArray();

            // Create a load-balancing partitioner. Or specify false for static partitioning.
            Partitioner<int> customPartitioner = Partitioner.Create(nums, true);

            // The partitioner is the query's data source.
            var q = from x in customPartitioner.AsParallel()
                    select x * Math.PI;

            q.ForAll((x) =>
            {
                ProcessData(x);
            });

Лучшим способом для определения необходимости использования распределения нагрузки в любом исходном сценарии является проверка и измерение длительности выполнения операций при обычной загрузке и конфигурации компьютера. Например, статическое разделение может обеспечить значительное ускорение на многоядерном компьютере с небольшим количеством ядер, но на компьютере с относительно большим количеством ядер такое разделение может привести к замедлению обработки.

В следующей таблице перечислены доступные перегрузки метода Create. Эти модули распределения можно использовать не только для PLINQ или ForEach. Ими также можно воспользоваться в любой пользовательской параллельной конструкции.

Перегрузка

Использует распределение нагрузки

Create<TSource>(IEnumerable<TSource>)

Всегда

Create<TSource>(TSource[], Boolean)

Если логический аргумент имеет значение true

Create<TSource>(IList<TSource>, Boolean)

Если логический аргумент имеет значение true

Create(Int32, Int32)

Никогда

Create(Int32, Int32, Int32)

Никогда

Create(Int64, Int64)

Никогда

Create(Int64, Int64, Int64)

Никогда

Настройка статических модулей разделения по диапазону для Parallel.ForEach

В цикле For, тело цикла предоставляется методу в качестве делегата. Затраты ресурсов при вызове делегата приблизительно такие же, как и при вызове виртуального метода. В некоторых сценариях тело параллельного цикла может быть на столько маленьким, что затраты ресурсов на вызов делегата в каждой итерации цикла становятся значительными. В такой ситуации можно воспользоваться одной из перегрузок Create для создания IEnumerable<T> разделов по диапазонам из исходных элементов. Затем, коллекцию диапазонов можно передать методу ForEach, тело которого состоит из регулярного цикла for. Преимущество такого подхода состоит в том, что затраты ресурсов на вызов делегата происходят только один раз для каждого диапазона, а не один раз для каждого элемента. В следующем примере демонстрируется использование основного шаблона.

Imports System.Threading.Tasks
Imports System.Collections.Concurrent

Module PartitionDemo

    Sub Main()
        ' Source must be array or IList.
        Dim source = Enumerable.Range(0, 100000).ToArray()

        ' Partition the entire source array. 
        ' Let the partitioner size the ranges.
        Dim rangePartitioner = Partitioner.Create(0, source.Length)

        Dim results(source.Length - 1) As Double

        ' Loop over the partitions in parallel. The Sub is invoked
        ' once per partition.
        Parallel.ForEach(rangePartitioner, Sub(range, loopState)

                                               ' Loop over each range element without a delegate invocation.
                                               For i As Integer = range.Item1 To range.Item2 - 1
                                                   results(i) = source(i) * Math.PI
                                               Next
                                           End Sub)
        Console.WriteLine("Operation complete. Print results? y/n")
        Dim input As Char = Console.ReadKey().KeyChar
        If input = "y"c Or input = "Y"c Then
            For Each d As Double In results
                Console.Write("{0} ", d)
            Next
        End If

    End Sub
End Module
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {

        // Source must be array or IList.
        var source = Enumerable.Range(0, 100000).ToArray();

        // Partition the entire source array.
        var rangePartitioner = Partitioner.Create(0, source.Length);

        double[] results = new double[source.Length];

        // Loop over the partitions in parallel.
        Parallel.ForEach(rangePartitioner, (range, loopState) =>
        {
            // Loop over each range element without a delegate invocation.
            for (int i = range.Item1; i < range.Item2; i++)
            {
                results[i] = source[i] * Math.PI;
            }
        });

        Console.WriteLine("Operation complete. Print results? y/n");
        char input = Console.ReadKey().KeyChar;
        if (input == 'y' || input == 'Y')
        {
            foreach(double d in results)
            {
                Console.Write("{0} ", d);
            }           
        }
    }
}

Каждый поток цикла получает собственный Tuple<T1, T2>, содержащий значения индексов начала и конца указанного поддиапазона. Внутренний цикл for использует значения fromInclusive и toExclusive для применения цикла к массиву или непосредственно к IList.

Одна из перегрузок Create позволяет указать размер разделов и их количество. Такая перегрузка может использоваться в сценариях, в которых затраты ресурсов на обработку каждого элемента на столько незначительны, что даже один вызов виртуального метода на каждый элемент оказывает значительное влияние на производительность.

Пользовательские модули разделения

В некоторых сценариях следует или даже необходимо реализовать собственный модуль разделения. Например, пользовательский класс коллекции, на основании знания его внутренней структуры, можно разделить более эффективно, чем с помощью модулей разделения, установленных по умолчанию. Либо, при необходимости создания разделов по диапазонам разного размера, зная время, необходимое на обработку элементов, расположенных в разных местах исходной коллекции.

Чтобы создать базовый пользовательский модуль разделения, наследуйте класс от System.Collections.Concurrent.Partitioner<TSource> и переопределите виртуальные методы, как описано в следующей таблице.

GetPartitions

Этот метод вызывается основным потоком один раз и возвращает IList(IEnumerator(TSource)). Каждый рабочий поток цикла или запроса может вызвать GetEnumerator из списка для извлечения IEnumerator<T> из отдельного раздела.

SupportsDynamicPartitions

Возвращает true при реализации GetDynamicPartitions, в противном случае false.

GetDynamicPartitions

Если значение SupportsDynamicPartitions равно true, то этот метод может быть вызван вместо GetPartitions.

Если результаты должны быть сортируемыми или требуется индексный доступ к элементам, то наследуйте из System.Collections.Concurrent.OrderablePartitioner<TSource> и переопределите его виртуальные методы, как описано в следующей таблице.

GetPartitions

Этот метод вызывается основным потоком один раз и возвращает IList(IEnumerator(TSource)). Каждый рабочий поток цикла или запроса может вызвать GetEnumerator из списка для извлечения IEnumerator<T> из отдельного раздела.

SupportsDynamicPartitions

Возвращает true при реализации GetDynamicPartitions, в противном случае false.

GetDynamicPartitions

Обычно, это вызывает только GetOrderableDynamicPartitions.

GetOrderableDynamicPartitions

Если значение SupportsDynamicPartitions равно true, то этот метод может быть вызван вместо GetPartitions.

В следующей таблице приведены дополнительные сведения о реализации класса OrderablePartitioner<TSource> с помощью трех типов модулей разделения для распределения нагрузки.

Метод/свойство

IList/массив без распределения нагрузки

IList/массив с распределением нагрузки

IEnumerable

GetOrderablePartitions

Использует разделение по диапазонам

Использует разделение по блокам, оптимизированное для списков с указанным partitionCount

Использует разделение по блокам посредством создания статического количества разделов.

OrderablePartitioner<TSource>.GetOrderableDynamicPartitions

Выдает неподдерживаемое исключение

Использует разделение по блокам для списков и динамических разделов

Использует разделение по блокам посредством создания динамического количества разделов.

KeysOrderedInEachPartition

Возвращает true

Возвращает true

Возвращает true

KeysOrderedAcrossPartitions

Возвращает true

Возвращает false

Возвращает false

KeysNormalized

Возвращает true

Возвращает true

Возвращает true

SupportsDynamicPartitions

Возвращает false

Возвращает true

Возвращает true

Динамические разделы

Если модуль разделения планируется использовать в методе ForEach, должна быть возможность вернуть динамическое количество разделов. Это обозначает, что модуль разделения может в любое время в течение цикла по требованию предоставлять перечислитель для нового раздела. По существу, когда бы цикл не добавлял новую параллельную задачу, он для такой задачи запрашивает новый раздел. Если данные должны быть упорядоченными, наследуйте из System.Collections.Concurrent.OrderablePartitioner<TSource>, так чтобы каждому элементу в каждом разделе присваивался уникальный индекс.

Дополнительные сведения и примеры см. в разделе Практическое руководство. Реализация динамических разделов.

Контракт для модулей разделения

При реализации пользовательского модуля разделения следуйте следующим инструкциям для обеспечения правильного взаимодействия с PLINQ и ForEach в TPL:

  • Если GetPartitions вызывается со значением аргумента равным или меньше нуля для partitionsCount, вызовите ArgumentOutOfRangeException. Хотя PLINQ и TPL никогда не передают partitionCount со значением 0, тем не менее, рекомендуем защититься от такой возможности.

  • GetPartitions и GetOrderablePartitions всегда должны возвращать partitionsCount число разделов. Если модули разделения выполняются не зависимо от данных и не могут создавать требуемое количество разделов, то для каждого из оставшихся разделов метод должен возвращать пустой перечислитель. В противном случае, как PLINQ, так и TPL выдадут исключение InvalidOperationException.

  • GetPartitions, GetOrderablePartitions, GetDynamicPartitions и GetOrderableDynamicPartitions никогда не должны возвращать null (Nothing в Visual Basic). В противном случае, PLINQ / TPL выдаст исключение InvalidOperationException.

  • Методы, возвращающие разделы, всегда должны возвращать разделы, которые могут полностью и однозначно перечислить источник данных. В источнике данных не допускается дублирование ли пропущенные элементы, кроме особо необходимых для конструктора модуля разделения. При несоблюдении этого правила, может быть нарушен порядок выхода.

  • Следующие логические получатели всегда должны точно возвращать перечисленные ниже значения, чтобы не нарушать порядок выхода:

    • KeysOrderedInEachPartition. Каждый раздел возвращает элементы с возрастающими ключевыми индексами.

    • KeysOrderedAcrossPartitions. Для всех возвращенных разделов ключевые индексы в разделе i выше, чем ключевые индексы в разделе i-1.

    • KeysNormalized. Все ключевые индексы монотонно возрастают без перерывов, начиная с нуля.

  • Все индексы должны быть уникальными. Не допускается дублирование индексов. При несоблюдении этого правила, может быть нарушен порядок выхода.

  • Все индексы должны быть неотрицательными. При несоблюдении этого правила, PLINQ/TPL может выдать исключение.

См. также

Задачи

Практическое руководство. Реализация динамических разделов

Практическое руководство. Реализация разделителя со статическим числом разделов

Основные понятия

Параллельное программирование в .NET Framework