如何:实现静态分区程序

下面的示例展示了一种为执行静态分区的 PLINQ 实现简单自定义分区程序的方法。 由于分区程序不支持动态分区,因此无法通过 Parallel.ForEach 使用它。 对于每个元素需要越来越多处理时间的数据源,此分区程序可能会让速度提升(与默认范围分区程序相比)。

示例

// A static range partitioner for sources that require
// a linear increase in processing time for each succeeding element.
// The range sizes are calculated based on the rate of increase
// with the first partition getting the most elements and the
// last partition getting the least.
class MyPartitioner : Partitioner<int>
{
    int[] source;
    double rateOfIncrease = 0;

    public MyPartitioner(int[] source, double rate)
    {
        this.source = source;
        rateOfIncrease = rate;
    }

    public override IEnumerable<int> GetDynamicPartitions()
    {
        throw new NotImplementedException();
    }

    // Not consumable from Parallel.ForEach.
    public override bool SupportsDynamicPartitions
    {
        get
        {
            return false;
        }
    }

    public override IList<IEnumerator<int>> GetPartitions(int partitionCount)
    {
        List<IEnumerator<int>> _list = new List<IEnumerator<int>>();
        int end = 0;
        int start = 0;
        int[] nums = CalculatePartitions(partitionCount, source.Length);

        for (int i = 0; i < nums.Length; i++)
        {
            start = nums[i];
            if (i < nums.Length - 1)
                end = nums[i + 1];
            else
                end = source.Length;

            _list.Add(GetItemsForPartition(start, end));

            // For demonstration.
            Console.WriteLine("start = {0} b (end) = {1}", start, end);
        }
        return (IList<IEnumerator<int>>)_list;
    }
    /*
     *
     *
     *                                                               B
      // Model increasing workloads as a right triangle           /  |
         divided into equal areas along vertical lines.         / |  |
         Each partition  is taller and skinnier               /   |  |
         than the last.                                     / |   |  |
                                                          /   |   |  |
                                                        /     |   |  |
                                                      /  |    |   |  |
                                                    /    |    |   |  |
                                            A     /______|____|___|__| C
     */
    private int[] CalculatePartitions(int partitionCount, int sourceLength)
    {
        // Corresponds to the opposite side of angle A, which corresponds
        // to an index into the source array.
        int[] partitionLimits = new int[partitionCount];
        partitionLimits[0] = 0;

        // Represent total work as rectangle of source length times "most expensive element"
        // Note: RateOfIncrease can be factored out of equation.
        double totalWork = sourceLength * (sourceLength * rateOfIncrease);
        // Divide by two to get the triangle whose slope goes from zero on the left to "most"
        // on the right. Then divide by number of partitions to get area of each partition.
        totalWork /= 2;
        double partitionArea = totalWork / partitionCount;

        // Draw the next partitionLimit on the vertical coordinate that gives
        // an area of partitionArea * currentPartition.
        for (int i = 1; i < partitionLimits.Length; i++)
        {
            double area = partitionArea * i;

           // Solve for base given the area and the slope of the hypotenuse.
            partitionLimits[i] = (int)Math.Floor(Math.Sqrt((2 * area) / rateOfIncrease));
        }
        return partitionLimits;
    }

    IEnumerator<int> GetItemsForPartition(int start, int end)
    {
        // For demonstration purposes. Each thread receives its own enumerator.
        Console.WriteLine("called on thread {0}", Thread.CurrentThread.ManagedThreadId);
        for (int i = start; i < end; i++)
            yield return source[i];
    }
}

class Consumer
{
    public static void Main2()
    {
        var source = Enumerable.Range(0, 10000).ToArray();

        Stopwatch sw = Stopwatch.StartNew();
        MyPartitioner partitioner = new MyPartitioner(source, .5);

        var query = from n in partitioner.AsParallel()
                    select ProcessData(n);

        foreach (var v in query) { }
        Console.WriteLine("Processing time with custom partitioner {0}", sw.ElapsedMilliseconds);

        var source2 = Enumerable.Range(0, 10000).ToArray();

        sw = Stopwatch.StartNew();

        var query2 = from n in source2.AsParallel()
                    select ProcessData(n);

        foreach (var v in query2) { }
        Console.WriteLine("Processing time with default partitioner {0}", sw.ElapsedMilliseconds);
    }

    // Consistent processing time for measurement purposes.
    static int ProcessData(int i)
    {
        Thread.SpinWait(i * 1000);
        return i;
    }
}

此示例中的分区都基于每个元素的处理时间呈线性增加的假设。 实际上,按这种方式预测处理时间可能会很困难。 如果将静态分区程序与特定数据源结合使用,可以优化源的分区公式,也可以添加负载均衡逻辑,或使用区块分区方法(如如何:实现动态分区中所述)。

另请参阅