Load-balancing partitioner with work stealing, part one
Data parallelism is a parallelization technique in which data is separated into independent pieces and distributed across parallel computing nodes. This technique is at the core of Parallel LINQ, which partitions data into segments and executes the query on each segment in parallel. Depending on the scenario and the expected workload distribution, different partitioning schemes can be employed (I highly recommend reading the linked post before going further). They essentially fall into two types:
- Static, where partitioning is done up front. This is quite simple but may perform poorly when the workload distribution is uneven.
- Dynamic, where partitioning is done on demand by handing chunks of work to idle workers. This deals better with uneven workload distribution.
Both types have one thing in common: once a chunk of work is taken by a worker it is never given back until processed, meaning the worker has exclusive responsibility for it. With an uneven workload distribution this may lead to poor performance, because the overall performance is that of the slowest worker. For example, when most of the heavy work is handed to a single worker, the overall computation is not finished until that unlucky worker finishes executing the heavy work sequentially, even though all the other workers have already finished.
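To make the cost of exclusive ownership concrete, here is a minimal sketch of static range partitioning. It is not taken from Parallel LINQ: the names `StaticPartitioningSketch`, `Split` and `Makespan` are hypothetical, and per-item costs are assumed to be known integers purely for illustration.

```csharp
using System;
using System.Linq;

static class StaticPartitioningSketch
{
    // Splits [0, count) into `workers` contiguous ranges [from, to)
    // of near-equal size, decided up front.
    public static Tuple<int, int>[] Split(int count, int workers)
    {
        var ranges = new Tuple<int, int>[workers];
        for (int w = 0; w < workers; w++)
        {
            // Long arithmetic avoids overflow of count * w
            int from = (int)((long)count * w / workers);
            int to = (int)((long)count * (w + 1) / workers);
            ranges[w] = Tuple.Create(from, to);
        }
        return ranges;
    }

    // With exclusive ownership the whole run takes as long as
    // the most loaded worker needs for its own range.
    public static int Makespan(int[] costs, int workers)
    {
        return Split(costs.Length, workers)
            .Max(r => costs.Skip(r.Item1).Take(r.Item2 - r.Item1).Sum());
    }
}
```

With costs { 10, 10, 10, 10, 1, 1, 1, 1 } and two workers, the first worker receives all the heavy items, so `Makespan` yields 40 although the total work is 44 and a perfectly balanced split would need 22.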
To deal with uneven workload distribution, work stealing can be used. Joe Duffy explores in great detail how to build a custom thread pool that uses work stealing to balance the workload among workers. That approach steals one work item at a time, which is sufficient in most cases: a work item usually produces other work items, so chances are that an idle worker will have more work to do once it has processed the stolen item (otherwise it can go and steal another work item, if any).
In data parallelism scenarios, stealing a chunk of work items in one shot may be more beneficial (it avoids high synchronization costs). Work stealing also benefits from an initial work distribution, compared to handing all the work to a single worker and letting the others steal from it. Thus static partitioning with work stealing of chunks of work items is what we are looking for. Static partitioning assumes that the size of the work space is known, which is the case in most scenarios.
Parallel LINQ uses the partitioner concept to abstract the partitioning mechanism. Before developing a custom partitioner (this will be part two of the series), the work stealing part must be in place. It rests on the following assumptions and requirements:
- The work space is known in advance, does not grow over time and provides indexed access
- Thieves must be able to steal work items in contiguous chunks, ideally half of the available items
As the work space is known in advance, it is represented by a range of integer values. So indexes are the subject rather than the elements themselves, as it is quite easy to map an index to the actual element. The range is accessed from the lower bound by its owner, which tries to take one index at a time. The upper part of the range is represented by another range called the steal range. It defines the bounds of the indexes that are eligible for stealing. Thieves contend for the steal range with each other, and with the owner in case the very last item is in the steal range. Essentially the work space can be looked at as [low .. [mid .. high)), where [mid .. high) is the steal range and [low .. high) is the overall work space.
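The layout can be sketched in a few lines. The helper names below (`RangeLayoutSketch`, `StealStart`, `Describe`) are hypothetical; the choice of the upper quarter as the steal range follows the `CreateStealRange` method of the implementation shown later in this post and should be read as one possible ratio, not the only one.

```csharp
using System;

static class RangeLayoutSketch
{
    // Start of the steal range [mid, high) for the work space
    // [low, high). Assumption: the steal range is the upper
    // quarter, as in CreateStealRange from the listing.
    public static int StealStart(int low, int high)
    {
        return (low + 3 * high) / 4;
    }

    // Renders the work space in the [low .. [mid .. high)) notation.
    public static string Describe(int low, int high)
    {
        int mid = StealStart(low, high);
        return "[" + low + " .. [" + mid + " .. " + high + "))";
    }
}
```

For a work space of 100 items, `Describe(0, 100)` yields `[0 .. [75 .. 100))`: the owner walks indexes 0 to 74 while indexes 75 to 99 are open to thieves. A single remaining item, as in `Describe(7, 8)`, gives `[7 .. [7 .. 8))`, which is the case where the owner and the thieves contend for the same index.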
Stealing is the fate of idle workers and thus they take the burden, letting the owner be ignorant of their presence until they get too close:
- Load the available steal range and the lower bound
- If the steal range falls behind the lower bound, meaning no more items are left, return a value that indicates an unsuccessful steal attempt
- Otherwise construct a new steal range and attempt to atomically compare and swap it in
  - If the swap succeeded, the observed range was stolen
  - Otherwise the race was lost: either another thief succeeded, or the owner took the very last item that the range contained, or the owner updated the steal range because, between the moment the thief observed the steal range and now, it consumed a lot of items and came close to the steal range
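The compare-and-swap step relies on the steal range being an immutable object that is replaced as a whole, so both bounds change atomically. Here is a minimal sketch of that idiom in isolation; the class and member names (`PairCasSketch`, `Current`, `TryReplace`) are hypothetical and only the standard `Interlocked.CompareExchange` and `Volatile.Read` calls are used.

```csharp
using System;
using System.Threading;

class PairCasSketch
{
    // Immutable pair: swapping the reference publishes both
    // bounds in a single atomic step.
    private Tuple<int, int> m_range;

    public PairCasSketch(int from, int to)
    {
        m_range = Tuple.Create(from, to);
    }

    public Tuple<int, int> Current
    {
        get { return Volatile.Read(ref m_range); }
    }

    // Succeeds only if nobody has replaced `observed` since it
    // was read. The comparison is by reference, not by value.
    public bool TryReplace(Tuple<int, int> observed, Tuple<int, int> replacement)
    {
        var previous = Interlocked.CompareExchange(ref m_range, replacement, observed);
        return ReferenceEquals(previous, observed);
    }
}
```

Because the comparison is by reference and every update allocates a fresh tuple, a contender holding a stale snapshot always fails, even if the current range happens to have the same bounds as the one it observed.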
The owner must be able to take one item at a time without heavy synchronization, as follows:
- Reserve the item at the lower bound by advancing the bound by one; once reserved, the item cannot be reached by thieves unless it is in the steal range
- Load the available steal range; the order is really important, as otherwise, due to reordering, the same item can be both stolen and taken by the owner
- If the reserved item is not in the observed steal range (it lies below it)
  - If the steal range is too close, try to update it to a smaller one; don't worry if this fails, as the next steal or local take will put it right
  - Return the reserved item as successfully taken
- If the reserved item is in the steal range, it is the very last one, so contend for it with the thieves
  - Try to atomically compare and swap in a steal range that falls behind, to indicate that no more items are left
    - If succeeded, return the reserved item as successfully taken
    - Otherwise the race was lost, so return a value that indicates an unsuccessful take attempt
- Otherwise the last item was stolen before the owner even contended for it
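The "too close" rule is easier to follow on numbers. The single-threaded sketch below simulates an owner draining [0, high) with no thieves around and records where the steal range starts after each take. `OwnerTraceSketch` and `Trace` are hypothetical names; the quarter-size formula and the shrink condition are borrowed from the implementation in this post, and all atomic operations are deliberately left out.

```csharp
using System;
using System.Collections.Generic;

static class OwnerTraceSketch
{
    // Steal range start for [low, high): the upper quarter.
    private static int StealStart(int low, int high)
    {
        return (low + 3 * high) / 4;
    }

    // Returns the steal range start seen after each uncontended
    // take. The loop stops when the reserved index reaches the
    // steal range, i.e. at the last, contended item.
    public static List<int> Trace(int high)
    {
        var observed = new List<int>();
        int mid = StealStart(0, high);
        for (int low = 0; low < mid; low++)
        {
            // Shrink the steal range once the space ahead is at
            // most twice the size of the steal range.
            if (mid - low <= 2 * (high - mid))
                mid = StealStart(low, high);
            observed.Add(mid);
        }
        return observed;
    }
}
```

For `Trace(8)` the recorded starts are 6, 6, 6, 6, 7, 7, 7: the steal range begins as [6 .. 8), shrinks to [7 .. 8) as the owner approaches, and index 7 is the single last item the owner would have to contend for.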
Now that the algorithm is in place, here is the implementation.
using System;
using System.Threading;

class WorkStealingRange
{
    // Holds the range that is available for stealing
    private volatile Tuple<int, int> m_stealRange;
    // Holds the index to be taken next locally
    private volatile int m_low;

    public WorkStealingRange(int low, int high)
    {
        m_low = low;
        m_stealRange = CreateStealRange(low, high);
    }

    // Tries to steal a range of items
    public Tuple<int, int> TryStealRange()
    {
        // Contend for the available steal range
        var oldRange = m_stealRange;
        var mid = oldRange.Item1;
        var low = m_low;
        // If the steal range is behind the lower bound it means
        // no work items are left
        if (low > mid)
            // Return null to indicate a failed steal attempt
            return null;
        // Calculate the new steal range that will replace the
        // current one in case of success
        var newRange = CreateStealRange(low, mid);
        // Contend with other thieves and the owner (in case the
        // steal range consists of the single last item).
        // Passing a volatile field by ref raises warning CS0420,
        // which is harmless for Interlocked calls
        if (Interlocked.CompareExchange(ref m_stealRange, newRange, oldRange) != oldRange)
            // Lost the race, so indicate a failed steal attempt
            return null;
        // Won the contention for the steal range
        return oldRange;
    }

    // Tries to take one item locally
    public int? TryTakeOne()
    {
        var low = m_low;
        // Reserve the item using an exchange to avoid legal
        // reordering with the steal range read below
        Interlocked.Exchange(ref m_low, low + 1);
        // Now that the lowest element is reserved it is either
        // not available to thieves or it is the last one and
        // is in the steal range
        var oldRange = m_stealRange;
        var mid = oldRange.Item1;
        // If the observed steal range is not empty and does not
        // contain the reserved item, it is safe to return it as
        // nobody can reach the reserved item now
        if (low < mid)
        {
            var high = oldRange.Item2;
            // If the space ahead is not more than two times the
            // observed steal range, attempt to shrink the steal
            // range to prevent thieves from stealing more than
            // half of the items
            if (mid - low <= 2 * (high - mid))
            {
                // Try to make the steal range 1/4 of the
                // available work space
                var newRange = CreateStealRange(low, high);
                // Don't worry if this fails, as the next steal
                // or local take will fix it
                Interlocked.CompareExchange(ref m_stealRange, newRange, oldRange);
            }
            // Return the reserved item as it is not reachable
            // by thieves
            return low;
        }
        // If the observed steal range contains the reserved item,
        // contend for it with the thieves
        if (low == mid)
        {
            // Create a new range that falls behind to indicate
            // termination
            var newRange = CreateStealRange(low, low);
            // The steal range contains only the reserved item,
            // so contend with the thieves for it
            if (Interlocked.CompareExchange(ref m_stealRange, newRange, oldRange) != oldRange)
                // Lost the race, return null to indicate that
                // no more items are available
                return null;
            // Won the contention for the last item
            return low;
        }
        // No luck, the last item was stolen
        return null;
    }

    private static Tuple<int, int> CreateStealRange(int low, int high)
    {
        // If the range is not empty create a new one that is
        // 1/4 of the available space
        if (low != high)
            return new Tuple<int, int>((low + 3 * high) / 4, high);
        // Otherwise create an empty range that falls behind
        return new Tuple<int, int>(low - 1, low - 1);
    }
}
Next time: a custom partitioner that uses the work stealing range goes on the surgical table.