F# Array.Parallel sort functions demonstrating a Merge Sort using Barrier
If you follow the excellent Parallel Programming with .NET blog, you will have read a recent post by Emad Omara demonstrating a Parallel Merge Sort using Barrier. While there may be more efficient parallel sorting options, as that post notes, this is a good demonstration of the usage of a Barrier, and presents a reasonable parallel sorting solution. As such I thought it would be useful to present this code in F#.
Before going into the code in detail here are some comparisons of the F# parallel sorting performance on my quad core laptop, for an array of 5 million floats.
Operation | Sort Time (seconds) |
Array.Parallel.sortInPlace | 0.702001 |
Array.sortInPlace | 1.778403 |
Array.Parallel.sort | 0.780001 |
Array.sort | 1.794003 |
Array.Parallel.sortInPlaceWith | 1.903203 |
Array.sortInPlaceWith | 5.538010 |
Array.Parallel.sortInPlaceBy | 0.795601 |
Array.sortInPlaceBy | 1.794003 |
As you can see, the parallel sort performance is, as stated, quite reasonable, at least doubling the base sort performance. The code to be demonstrated will provide an implementation for all six operations: sort, sortInPlace, sortBy, sortInPlaceBy, sortWith, and sortInPlaceWith.
A quick point about the original implementation. In the original code it is the original array that is sorted. F# sort operations support both sorts that return a new array and sorts that operate in place. As such the in-place performance will be slightly faster. The reason for this is that for the non-in-place version the original array is first copied into a secondary array, which is then sorted in place.
So without further ado, what does the code look like? Here is a complete listing:
- namespace MSDN.FSharp.Parallel
- open System
- open System.Threading.Tasks
- open System.Threading
- // Tag passed to getMergeArray to ask for one of the two buffers used while merging
- // (the array being sorted or the auxiliary array); which one is returned also
- // depends on the current value of the swapped flag.
- type private MergeArrayType =
-     | FromArray
-     | ToArray
- type ParallelSort() =
-     // Returns a sorted copy of the input; the caller's array is left untouched
-     // because only the copy is handed to the in-place sort.
-     static member public Sort(array: 'T []) =
-         let result = array |> Array.copy
-         ParallelSort.SortInPlaceInternal(result)
-         result
-     // Returns a new array holding the elements of the input ordered by the key that
-     // projection yields for each element. The input array is not modified.
-     static member public SortBy(array: 'T [], projection: 'T -> 'Key) =
-         let arraySort = Array.copy array
-         // Sort the copy, not the caller's array: sorting `array` here would reorder the
-         // input in place and hand back an unsorted copy.
-         ParallelSort.SortInPlaceInternal(arraySort, projection = projection)
-         arraySort
-     // Returns a new array holding the elements of the input ordered by the supplied
-     // comparison function. The input array is not modified.
-     static member public SortWith(array: 'T [], comparer: 'T -> 'T -> int) =
-         let arraySort = Array.copy array
-         // Sort the copy, not the caller's array: sorting `array` here would reorder the
-         // input in place and hand back an unsorted copy.
-         ParallelSort.SortInPlaceInternal(arraySort, comparer = comparer)
-         arraySort
-     // Reorders the given array in place; no comparer or projection is supplied.
-     static member public SortInPlace(array: 'T []) : unit =
-         ParallelSort.SortInPlaceInternal(array)
-     // Reorders the given array in place by the key that projection yields for each element.
-     static member public SortInPlaceBy(array: 'T [], projection: 'T -> 'Key) : unit =
-         ParallelSort.SortInPlaceInternal(array, projection = projection)
-     // Reorders the given array in place using the supplied comparison function.
-     static member public SortInPlaceWith(array: 'T [], comparer: 'T -> 'T -> int) : unit =
-         ParallelSort.SortInPlaceInternal(array, comparer = comparer)
-     // Private function that is used to control the sorting; every public member ends up here.
-     //   array      : the array to sort; it is reordered in place.
-     //   comparer   : optional comparison function. It is wrapped as an IComparer for Array.Sort
-     //                and is used by the merge step when no projection is given.
-     //   projection : optional key selector. Keys are compared structurally.
-     // An empty array raises an exception with the message "Empty Array". Arrays shorter than
-     // twice the processor count are passed to the sequential Array.sortInPlace* functions.
-     static member private SortInPlaceInternal(array: 'T [], ?comparer: 'T -> 'T -> int, ?projection: 'T -> 'Key) =
-         // used to do the merge and sort comparisons
-         let sortComparer =
-             match comparer with
-             | Some c -> ComparisonIdentity.FromFunction c
-             | _ -> ComparisonIdentity.Structural<'T>
-         let projectionComparer = ComparisonIdentity.Structural<'Key>
-         // Compares two elements: by projected key when a projection exists, otherwise with sortComparer
-         let inline sortComparerResult (item1: 'T) (item2: 'T) =
-             match projection with
-             | Some p -> projectionComparer.Compare(p item1, p item2)
-             | None -> sortComparer.Compare(item1, item2)
-         // The merge of two adjacent sorted runs of fromArray into toArray.
-         // The first run is low1..low2 and the second run is high1..high2 (both inclusive);
-         // the merged output is written to toArray at low1..high2.
-         let merge (toArray: 'T []) (fromArray: 'T []) (low1: int) (low2: int) (high1: int) (high2: int) =
-             let mutable ptr1 = low1
-             let mutable ptr2 = high1
-             for ptr in low1..high2 do
-                 if (ptr1 > low2) then
-                     // first run exhausted
-                     toArray.[ptr] <- fromArray.[ptr2]
-                     ptr2 <- ptr2 + 1
-                 elif (ptr2 > high2) then
-                     // second run exhausted
-                     toArray.[ptr] <- fromArray.[ptr1]
-                     ptr1 <- ptr1 + 1
-                 elif ((sortComparerResult fromArray.[ptr1] fromArray.[ptr2]) <= 0) then
-                     toArray.[ptr] <- fromArray.[ptr1]
-                     ptr1 <- ptr1 + 1
-                 else
-                     toArray.[ptr] <- fromArray.[ptr2]
-                     ptr2 <- ptr2 + 1
-         // define the sort operation
-         let parallelSort (array: 'T []) =
-             // control flow parameters
-             // totalWorkers is 2 raised to the truncated base-2 log of the processor count
-             let totalWorkers = int (2.0 ** float (int (Math.Log(float Environment.ProcessorCount, 2.0))))
-             let auxArray : 'T array = Array.zeroCreate array.Length
-             // one task per worker other than worker 0, which runs on the calling thread
-             let workers : Task array = Array.zeroCreate (totalWorkers - 1)
-             // number of merge passes
-             let iterations = int (Math.Log((float totalWorkers), 2.0))
-             // define a key array if needed for sorting on a projection
-             let keysArray =
-                 match projection with
-                 | Some p -> Array.init array.Length (fun idx -> p array.[idx])
-                 | None -> [||]
-             // Number of elements for each array, if the elements number is not divisible by the workers
-             // the remainders will be added to the first worker (the main thread)
-             let partitionSize = ref (int (array.Length / totalWorkers))
-             let remainder = array.Length % totalWorkers
-             // Define the arrays references for processing as they are swapped during each iteration
-             let swapped = ref false
-             let inline getMergeArray (arrayType: MergeArrayType) =
-                 match (arrayType, !swapped) with
-                 | (FromArray, true) -> auxArray
-                 | (FromArray, false) -> array
-                 | (ToArray, true) -> array
-                 | (ToArray, false) -> auxArray
-             // The post-phase action runs once per completed barrier phase: it doubles the
-             // partition size and flips the swapped flag.
-             use barrier = new Barrier(totalWorkers, fun (b) ->
-                 partitionSize := !partitionSize <<< 1
-                 swapped := not !swapped)
-             // action to perform the sort and merge steps
-             let action (index: int) =
-                 //calculate the partition boundary
-                 let low = index * !partitionSize + match index with | 0 -> 0 | _ -> remainder
-                 let high = (index + 1) * !partitionSize - 1 + remainder
-                 // Sort the specified range - could implement QuickSort here
-                 let sortLen = high - low + 1
-                 match (comparer, projection) with
-                 | (Some _, _) -> Array.Sort(array, low, sortLen, sortComparer)
-                 | (_, Some p) -> Array.Sort(keysArray, array, low, sortLen)
-                 | (_, _) -> Array.Sort(array, low, sortLen)
-                 barrier.SignalAndWait()
-                 // Merge passes: a worker whose (shifted) index is odd drops out of the barrier,
-                 // the others merge their run with the neighbouring one and carry on.
-                 let rec loopArray loopIdx actionIdx loopHigh =
-                     if loopIdx < iterations then
-                         if (actionIdx % 2 = 1) then
-                             barrier.RemoveParticipant()
-                         else
-                             // partitionSize was already doubled by the barrier, so half of it is the
-                             // length of the run being merged in
-                             let newHigh = loopHigh + !partitionSize / 2
-                             // swapped was flipped when the previous phase completed, so at this point
-                             // getMergeArray FromArray is the array written to (merge's toArray) and
-                             // getMergeArray ToArray is the array holding the sorted runs (merge's fromArray)
-                             merge (getMergeArray FromArray) (getMergeArray ToArray) low loopHigh (loopHigh + 1) newHigh
-                             barrier.SignalAndWait()
-                             loopArray (loopIdx + 1) (actionIdx >>> 1) newHigh
-                 loopArray 0 index high
-             for index in 1 .. workers.Length do
-                 workers.[index - 1] <- Task.Factory.StartNew(fun() -> action index)
-             action 0
-             // Wait for every worker task to finish before the barrier is disposed at the end of
-             // this scope; this also surfaces any exception raised inside a worker.
-             Task.WaitAll(workers)
-             // if odd iterations return auxArray otherwise array (swapped will be false)
-             if not (iterations % 2 = 0) then
-                 Array.blit auxArray 0 array 0 array.Length
-         // Perform the sorting
-         match array with
-         | [||] -> failwith "Empty Array"
-         | small when small.Length < (Environment.ProcessorCount * 2) ->
-             // too small to be worth partitioning: use the base sort operations
-             match (comparer, projection) with
-             | (Some c, _) -> Array.sortInPlaceWith c array
-             | (_, Some p) -> Array.sortInPlaceBy p array
-             | (_, _) -> Array.sortInPlace array
-         | _ -> parallelSort array
As you can see, the internal method SortInPlaceInternal is the actual implementation of the sort. The remaining members deal with setting the parameters for calling this function, based on the sort options.
Whereas the nature of this sort is identical to the original implementation, there are some subtle differences. These mostly deal with comparison, the optional projection for sortBy, and array references.
Firstly it is worth talking about the implementation of the Barrier and array references. In this Barrier implementation the partition size is doubled (as before) and a flag is toggled indicating that a swap has occurred. So what does this mean? The swapped flag is used to determine the direction of the merge operation after the initial parallelized sorts. When a merge is performed the From and To arrays are obtained from a function in which the returned array is determined by this swapped flag:
// Returns array or auxArray for the requested role, depending on the swapped flag
let inline getMergeArray (arrayType: MergeArrayType) =
    match (arrayType, !swapped) with
    | (FromArray, true) -> auxArray
    | (FromArray, false) -> array
    | (ToArray, true) -> array
    | (ToArray, false) -> auxArray
So why do this? The rationale behind this was so that the array references between merge passes did not have to change. Instead the merge operation just gathers a reference to the appropriate array.
As mentioned, one of the big differences in this implementation is the processing of comparisons and the optional projection for sortBy operations.
During the sort process there are 2 comparisons that take place. The first is that for the initial Array.Sort, and the second is that used for the merge steps. The reason for this distinction is the use of Array.Sort for sorting the sections of the array in parallel. Array.Sort supports the use of an optional IComparer(T) generic interface. Thus when a comparer is specified it has to be converted to this interface. Luckily F# makes this easy:
// Wrap the supplied comparison function when there is one, else use structural comparison
let sortComparer =
    match comparer with
    | Some c -> ComparisonIdentity.FromFunction c
    | _ -> ComparisonIdentity.Structural<'T>
When dealing with the optional projection for sortBy one could similarly construct an object implementing IComparer using a comparer like:
// Alternative shown for comparison only: compare two elements by their projected keys
// and wrap that comparison function with ComparisonIdentity.FromFunction
let pCompare a b = compare (projection a) (projection b)
let pComparer = ComparisonIdentity.FromFunction pCompare
However, for sorting performance, I found a more efficient approach was to define an array containing the projected keys for the array. This allows the usage of the overload of Array.Sort that takes a set of keys, in addition to the array.
Thus, combining both the comparer and projection requirements for sorting, each parallel range is sorted with the following:
// Sort the inclusive range low..high, choosing the Array.Sort overload from the options given
let sortLen = high - low + 1
match (comparer, projection) with
| (Some _, _) -> Array.Sort(array, low, sortLen, sortComparer)
| (_, Some p) -> Array.Sort(keysArray, array, low, sortLen)
| (_, _) -> Array.Sort(array, low, sortLen)
For merging, if possible structural comparisons are used. Thus the comparison of array elements is defined using the following:
// Comparison used by the merge: projected keys when a projection exists, otherwise sortComparer
let projectionComparer = ComparisonIdentity.Structural<'Key>
let inline sortComparerResult (item1: 'T) (item2: 'T) =
    match projection with
    | Some p -> projectionComparer.Compare(p item1, p item2)
    | None -> sortComparer.Compare(item1, item2)
With the internal sorting process defined all that remains is to actually perform the sort:
// Dispatch: an empty array fails, a small array uses the base sort operations,
// anything else goes to parallelSort
match array with
| [||] -> failwith "Empty Array"
| small when small.Length < (Environment.ProcessorCount * 2) ->
    match (comparer, projection) with
    | (Some c, _) -> Array.sortInPlaceWith c array
    | (_, Some p) -> Array.sortInPlaceBy p array
    | (_, _) -> Array.sortInPlace array
| _ -> parallelSort array
As in the previous example an optimization is in place to ensure that small arrays are sorted using the base sort operations. One can however configure this to suit as necessary; as one can also configure the total worker threads.
As one of the objectives of this exercise was to provide sort operations on the Array.Parallel module, a few extensions are defined:
module Array =
    module Parallel =
        /// Returns a sorted copy of the array (forwards to ParallelSort.Sort).
        let sort (array: 'T []) : 'T [] =
            ParallelSort.Sort array

        /// Returns a copy of the array sorted by the projected key (forwards to ParallelSort.SortBy).
        let sortBy (projection: 'T -> 'Key) (array: 'T []) : 'T [] =
            ParallelSort.SortBy(array, projection)

        /// Returns a copy of the array sorted with the given comparison function (forwards to ParallelSort.SortWith).
        let sortWith (comparer: 'T -> 'T -> int) (array: 'T []) : 'T [] =
            ParallelSort.SortWith(array, comparer)

        /// Sorts the array in place (forwards to ParallelSort.SortInPlace).
        let sortInPlace (array: 'T []) : unit =
            ParallelSort.SortInPlace array

        /// Sorts the array in place by the projected key (forwards to ParallelSort.SortInPlaceBy).
        let sortInPlaceBy (projection: 'T -> 'Key) (array: 'T []) : unit =
            ParallelSort.SortInPlaceBy(array, projection)

        /// Sorts the array in place with the given comparison function (forwards to ParallelSort.SortInPlaceWith).
        let sortInPlaceWith (comparer: 'T -> 'T -> int) (array: 'T []) : unit =
            ParallelSort.SortInPlaceWith(array, comparer)
These definitions allow one to perform parallel sort operations as one would do for base sorting:
// Pipe the array into the parallel in-place sort
array |> Array.Parallel.sortInPlace
So if you have the need to perform parallel sort operations, hopefully this will get you off the ground. If you want to see the code run, here is an fsx source file definition that I have used for testing.
// This file is a script that can be executed with the F# Interactive.
#load "ParallelMergeSort.fs"
open System
open MSDN.FSharp.Parallel
// Small array, intended to exercise the serial (non-parallel) path
let items0 = [| 9; 7; 5; 3; 1 |]
items0 |> Array.Parallel.sortInPlace
printfn "Serial: %A" items0

// Descending integers: first sorted into a new array, then sorted in place
let items1 = [| 10000 .. -1 .. 1 |]
items1 |> Array.Parallel.sort |> printfn "Simple New: %A"
items1 |> Array.Parallel.sortInPlace
printfn "Simple In Place: %A" items1

// Floats sorted in place
let items2 = [| 0.0 .. 0.1 .. 100.0 |] |> Array.map sin
items2 |> Array.Parallel.sortInPlace
printfn "Sorted: %A" items2

// Same data sorted in place by absolute value (projection)
let items3 = [| 0.0 .. 0.1 .. 100.0 |] |> Array.map sin
items3 |> Array.Parallel.sortInPlaceBy (fun item -> abs item)
printfn "Sorted ABS: %A" items3
// Some 5 million item array performance testing
#load "ParallelMergeSort.fs"
open System
open MSDN.FSharp.Parallel
// Random source used to build the benchmark data
let rnd = Random()
/// Runs func once and returns how long it took as a TimeSpan.
/// A collection of all generations is requested first so that garbage left by
/// earlier runs is not collected in the middle of the measurement.
let recordTime func =
    GC.Collect(GC.MaxGeneration)
    GC.WaitForFullGCComplete() |> ignore
    GC.WaitForPendingFinalizers()
    // Stopwatch is used instead of subtracting DateTime.Now values: it is meant for
    // measuring elapsed time and is not affected by changes to the system clock.
    let timer = System.Diagnostics.Stopwatch.StartNew()
    func()
    timer.Stop()
    timer.Elapsed
/// Prints one benchmark result line: the label, the elapsed seconds and the element count.
let writeTime (message:string) (sortCount:int) (timespan : TimeSpan) =
    let seconds = timespan.TotalSeconds
    printfn "%s: Sort took %f seconds : Element count = %i" message seconds sortCount
// Number of elements sorted by every benchmark below; also the count that is reported
let elementCount = 5000000
// Exactly elementCount random doubles in [-800.0, 200.0). The previous range
// 0 .. 1 .. 5000000 is inclusive at both ends and produced one element more than reported.
let itemsBase = Array.init elementCount (fun _ -> (rnd.NextDouble() - 0.8) * 1000.0)

// Base sort
let items8 = Array.copy itemsBase
let items9 = Array.copy itemsBase
recordTime (fun () ->
    items8
    |> Array.Parallel.sortInPlace)
|> writeTime "ParallelInPlace" elementCount
recordTime (fun () ->
    items9
    |> Array.sortInPlace)
|> writeTime "SequentialInPlace" elementCount

// Base sort new array
let items8n = Array.copy itemsBase
let items9n = Array.copy itemsBase
recordTime (fun () ->
    items8n
    |> Array.Parallel.sort
    |> ignore)
|> writeTime "Parallel" elementCount
recordTime (fun () ->
    items9n
    |> Array.sort
    |> ignore)
|> writeTime "Sequential" elementCount

// With sort
let items8w = Array.copy itemsBase
let items9w = Array.copy itemsBase
recordTime (fun () ->
    items8w
    |> Array.Parallel.sortInPlaceWith compare)
|> writeTime "ParallelInPlaceWith" elementCount
recordTime (fun () ->
    items9w
    |> Array.sortInPlaceWith compare)
|> writeTime "SequentialInPlaceWith" elementCount

// By sort
let items8b = Array.copy itemsBase
let items9b = Array.copy itemsBase
recordTime (fun () ->
    items8b
    |> Array.Parallel.sortInPlaceBy (fun item -> abs item))
|> writeTime "ParallelInPlaceBy" elementCount
recordTime (fun () ->
    items9b
    |> Array.sortInPlaceBy (fun item -> abs item))
|> writeTime "SequentialInPlaceBy" elementCount
In future posts I will look at QuickSort options in more detail.