Load-balancing partitioner with work stealing, part two

In part one, a work stealing range was introduced that allows work items to be stolen in contiguous chunks of up to half of the available work space. Now it is time for the partitioner itself.
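
For reference, below is a minimal sketch of the work stealing range this post builds on, assuming a simple lock-based implementation; part one's actual version differs in mechanics, but only the member signatures used further down matter: a constructor taking the range bounds, TryTakeOne for the owner and TryStealRange for thieves.

class WorkStealingRange
{
    private readonly object m_lock = new object();
    // Next item to be taken by the owner
    private int m_low;
    // Exclusive upper bound of the range
    private int m_high;

    public WorkStealingRange(int fromInclusive, int toExclusive)
    {
        m_low = fromInclusive;
        m_high = toExclusive;
    }

    // Owner takes a single item from the low end
    public int? TryTakeOne()
    {
        lock (m_lock)
        {
            if (m_low >= m_high)
                return null;
            return m_low++;
        }
    }

    // Thief steals up to half of the remaining items from the
    // high end, returned as [Item1, Item2), or null if there
    // is nothing to steal
    public Tuple<int, int> TryStealRange()
    {
        lock (m_lock)
        {
            var count = (m_high - m_low) / 2;
            if (count == 0)
                return null;
            var oldHigh = m_high;
            m_high -= count;
            return Tuple.Create(m_high, oldHigh);
        }
    }
}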

If you recall, partitioning can be done either statically up front or dynamically on demand. When the work space size is known, handing out chunks of work items dynamically on demand is the usual way to deal with uneven workload distribution. But work stealing is a load balancing mechanism in itself, so static partitioning is used for the initial workload distribution, and work stealing balances it out as the computation proceeds.

Now that the obvious stuff is out of the way, there are two important things to consider: how work stealing is done, and what the termination condition of the whole computation is.

Initial static partitioning feeds workers with contiguous chunks of work items to process. Each worker wraps its chunk into a work stealing range, so that other workers can steal from it if needed, and then takes one item at a time until the range is drained (potentially with help from other workers). At that point the worker has accumulated the number of items it processed, which is required for proper termination detection. The worker then tries to steal from others while returning the processed items (essentially marking them as processed) through a partition list that serves as the coordinator of work stealing and termination detection. Stealing is attempted until it either succeeds or termination is detected. Upon success the worker wraps the stolen range and continues its processing as from the beginning.
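
In outline, each worker's life cycle looks roughly like the fragment below (conceptual only: myRange, coordinator, self and Process stand in for the worker's current range, the partition list, the worker's partition and the per-item work; in the real code further down this logic lives inside Partition.MoveNext and is driven by the consuming loop):

var processed = 0;
while (true)
{
    // Drain the own range first, one item at a time
    var item = myRange.TryTakeOne();
    if (item != null)
    {
        // Count the item toward termination detection
        processed++;
        Process(item.Value);
        continue;
    }
    // Own range is drained: return the processed count and
    // attempt to steal a new range from another worker
    Tuple<int, int> stolen;
    if (!coordinator.TryReturnAndSteal(self, processed, out stolen))
        break; // termination detected, no work left anywhere
    // Continue on the stolen range as from the beginning
    myRange = new WorkStealingRange(stolen.Item1, stolen.Item2);
    processed = 0;
}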

Static partitioning assumes the work space size is known. As the work space doesn't grow over time (a deliberate decision; if growth is needed it can be done in phases, where work items from the current phase produce the work items for the next phase), that size is the basis for termination detection. Initially the remaining count is set to the work space size, and every worker that encounters a drained work stealing range returns the number of items it processed by decreasing the remaining count. Once the count reaches zero, the processing must terminate.
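
For illustration, such phased processing could look like the sketch below (hypothetical: initialItems holds the first phase's items as a list, Expand stands for per-item work that produces next-phase items, and the partitioner is the one defined further down; assumes System.Linq and System.Collections.Concurrent are imported):

// Each phase has a fixed work space size, so termination
// detection still holds within a phase
var current = initialItems;
while (current.Count > 0)
{
    var next = new ConcurrentBag<int>();
    var partitioner = new WorkStealingIndexRangePartitioner(0, current.Count);
    partitioner.AsParallel().ForAll(i =>
    {
        foreach (var produced in Expand(current[i]))
            next.Add(produced);
    });
    current = next.ToList();
}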

Here is the work stealing range partitioner in the flesh.

using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

class WorkStealingIndexRangePartitioner : Partitioner<int>
{
    private readonly int m_fromInclusive;
    private readonly int m_toExclusive;

    public WorkStealingIndexRangePartitioner(int fromInclusive, int toExclusive)
    {
        if (fromInclusive >= toExclusive)
            throw new ArgumentException("fromInclusive must be less than toExclusive");

        m_fromInclusive = fromInclusive;
        m_toExclusive = toExclusive;
    }

    public override IList<IEnumerator<int>> GetPartitions(int partitionCount)
    {
        if (partitionCount <= 0)
            throw new ArgumentException("partitionCount must be positive");
        // Calculate range and partition size
        var rangeSize = m_toExclusive - m_fromInclusive;
        var partitionSize = Math.Max(1, rangeSize / partitionCount);

        var partitionList = new PartitionList(rangeSize);
        var partitions = new Partition[partitionCount];
        // Create partitions by statically dividing work items 
        // into even sized chunks, which is ok even in case of 
        // non-uniform workload distribution as it will be 
        // balanced out through work stealing
        var from = m_fromInclusive;
        for (var i = 0; i < partitionCount; i++)
        {
            // The last partition takes the remainder so that no
            // items are lost when the range doesn't divide evenly
            // (otherwise the remaining count would never reach
            // zero and termination would never be detected)
            var to = i == partitionCount - 1
                ? m_toExclusive
                : Math.Min(from + partitionSize, m_toExclusive);
            partitions[i] = new Partition(partitionList, from, to);
            from = to;
        }
        // Wire them through a coordinator
        partitionList.SetPartitions(partitions);

        return partitions;
    }

    // Partitioning coordinator
    class PartitionList
    {
        // Holds list of available partitions
        private List<Partition> m_partitions;
        // Holds number of remaining items to process
        private int m_remainingCount;

        public PartitionList(int count)
        {
            m_remainingCount = count;
        }

        public void SetPartitions(IEnumerable<Partition> partitions)
        {
            m_partitions = new List<Partition>(partitions);
        }

        // Returns the given number of items as processed and tries 
        // to steal new work items from other partitions
        public bool TryReturnAndSteal(Partition to, int count, out Tuple<int, int> range)
        {
            // Move toward termination condition 
            Interlocked.Add(ref m_remainingCount, -count);
            // Loop until either the termination condition is 
            // reached or a steal attempt succeeds
            range = null;
            while (true)
            {
                // Enumerate through available partitions and try 
                // to steal from them
                foreach (var from in m_partitions)
                {
                    // Check the termination condition; a volatile 
                    // read observes decrements made by other workers
                    if (Volatile.Read(ref m_remainingCount) <= 0)
                        return false;
                    // Skip the requesting partition as its range 
                    // is known to be drained
                    if (from == to)
                        continue;

                    range = from.TrySteal();
                    // Return once a range is stolen; otherwise 
                    // keep trying the other partitions
                    if (range != null)
                        return true;
                }
                // Nothing stealable right now but items are still 
                // in flight; yield before retrying to avoid hot 
                // spinning
                Thread.Yield();
            }
        }
    }

    // Work stealing partition 
    class Partition : IEnumerator<int>
    {
        // Holds range items currently owned
        private WorkStealingRange m_workStealingRange;
        // Holds number of processed items
        private int m_localCount;
        // Holds reference to the partitioning coordinator that 
        // also controls the termination condition
        private readonly PartitionList m_list;
        // Holds the current element, or null if MoveNext hasn't 
        // been called yet or returned false
        private int? m_current;

        public Partition(PartitionList list, int fromInclusive, int toExclusive)
        {
            m_list = list;
            m_workStealingRange = new WorkStealingRange(fromInclusive, toExclusive);
        }

        public int Current
        {
            get
            {
                if (m_current != null)
                    return (int)m_current;

                throw new InvalidOperationException();
            }
        }

        object IEnumerator.Current
        {
            get { return Current; }
        }

        // Tries to steal from the local work stealing range
        public Tuple<int, int> TrySteal()
        {
            return m_workStealingRange.TryStealRange();
        }

        public bool MoveNext()
        {
            // First try to take an item from the locally available range
            var local = m_workStealingRange.TryTakeOne();
            if (local != null)
            {
                // Mark item as processed 
                m_localCount++;
                // and set up current item
                m_current = local;
                return true;
            }
            // Otherwise try to steal from others
            Tuple<int, int> range;
            if (m_list.TryReturnAndSteal(this, m_localCount, out range))
            {
                // Stolen something
                var from = range.Item1;
                var to = range.Item2;
                // Keep the very first element for this partition
                m_localCount = 1;
                m_current = from++;
                // If anything is left, expose it so that others 
                // can steal
                if (to - from > 0)
                    m_workStealingRange = new WorkStealingRange(from, to);

                return true;
            }
            // Termination condition reached so nothing to steal 
            // from others
            return false;
        }

        public void Dispose()
        {
        }

        public void Reset()
        {
            throw new NotSupportedException();
        }
    }
}

Rough benchmarking shows that on random workload input it performs on a par with the standard partitioners for indexed collections, while providing good performance in the worst case scenario of workload distribution.
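
A usage sketch (ProcessItem stands in for the actual per-item work): since only GetPartitions is overridden, the partitioner provides static partitioning, which PLINQ consumes directly; Parallel.ForEach would additionally require dynamic partitioning support.

var partitioner = new WorkStealingIndexRangePartitioner(0, items.Length);
partitioner.AsParallel().ForAll(i => ProcessItem(items[i]));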