Jaa


F# Array.Parallel sort functions demonstrating a Merge Sort using Barrier

If you follow the excellent Parallel Programing with .Net blog, you will have read a recent post by Emad Omara demonstrating a Parallel Merge Sort using Barrier. While there may be more efficient parallel sorting options, as this post notes, this is a good demonstration of the usage of a Barrier, and presents a reasonable parallel sorting solution. As such I thought it would be useful to present this code in F#.

Before going into the code in detail here are some comparisons of the F# parallel sorting performance on my quad core laptop, for an array of 5 million floats.

Operation Sort Time (seconds)
Array.Parallel.sortInPlace 0.702001
Array.sortInPlace 1.778403
Array.Parallel.sort 0.780001
Array.sort 1.794003
Array.Parallel.sortInPlaceWith 1.903203
Array.sortInPlaceWith 5.538010
Array.Parallel.sortInPlaceBy 0.795601
Array.sortInPlaceBy 1.794003

As you can see the parallel sort performance is as stated quite reasonable; at least doubling the base sort performance. The code to be demonstrated will provide an implementation for all six operations; sort, sortInPlace, sortBy, sortInPlaceBy, sortWith, and, sortInPlaceWith.

A quick point about the original implementation. In the original code it is the original array that is sorted. F# sort operations support sorts that return a new array and sort that operate InPlace. As such the InPlace performance will be slightly faster. The reason for this is that for the non-InPlace version the original array is copied into a secondary array, which is sorted InPlace.

So without further adieu, what does the code look like? Here is a complete listing:

  1. namespace MSDN.FSharp.Parallel
  2.  
  3. open System
  4. open System.Threading.Tasks
  5. open System.Threading
  6.  
  7. type private MergeArrayType =
  8.     | FromArray
  9.     | ToArray
  10.  
  11. type ParallelSort() =    
  12.     
  13.     static member public Sort(array: 'T []) =
  14.         let arraySort = Array.copy array
  15.         ParallelSort.SortInPlaceInternal(arraySort)
  16.         arraySort
  17.     
  18.     static member public SortBy(array: 'T [], projection: 'T -> 'Key) =
  19.         let arraySort = Array.copy array
  20.         ParallelSort.SortInPlaceInternal(array, projection = projection)
  21.         arraySort
  22.  
  23.     static member public SortWith(array: 'T [], comparer: 'T -> 'T -> int) =
  24.         let arraySort = Array.copy array
  25.         ParallelSort.SortInPlaceInternal(array, comparer = comparer)
  26.         arraySort
  27.  
  28.     static member public SortInPlace(array: 'T []) =
  29.         ParallelSort.SortInPlaceInternal(array)
  30.     
  31.     static member public SortInPlaceBy(array: 'T [], projection: 'T -> 'Key) =
  32.         ParallelSort.SortInPlaceInternal(array, projection = projection)
  33.  
  34.     static member public SortInPlaceWith(array: 'T [], comparer: 'T -> 'T -> int) =
  35.         ParallelSort.SortInPlaceInternal(array, comparer = comparer)
  36.    
  37.     // Private function that is used to control the sorting
  38.     static member private SortInPlaceInternal(array: 'T [], ?comparer: 'T -> 'T -> int, ?projection: 'T -> 'Key) =
  39.  
  40.         // used to do the merge and sort comparisions
  41.         let sortComparer =
  42.             match comparer with
  43.             | Some c -> ComparisonIdentity.FromFunction c
  44.             | _ -> ComparisonIdentity.Structural<'T>
  45.  
  46.         let projectionComparer = ComparisonIdentity.Structural<'Key>
  47.  
  48.         let inline sortComparerResult (item1: 'T) (item2: 'T) =
  49.             match projection with
  50.             | Some p -> projectionComparer.Compare(p item1, p item2)
  51.             | None -> sortComparer.Compare(item1, item2)
  52.  
  53.         // The merge of the two array
  54.         let merge (toArray: 'T []) (fromArray: 'T []) (low1: int) (low2: int) (high1: int) (high2: int) =
  55.             let mutable ptr1 = low1
  56.             let mutable ptr2 = high1
  57.  
  58.             for ptr in low1..high2 do
  59.                 if (ptr1 > low2) then
  60.                     toArray.[ptr] <- fromArray.[ptr2]
  61.                     ptr2 <- ptr2 + 1
  62.                 elif (ptr2 > high2) then
  63.                     toArray.[ptr] <- fromArray.[ptr1]
  64.                     ptr1 <- ptr1 + 1
  65.                 elif ((sortComparerResult fromArray.[ptr1] fromArray.[ptr2]) <= 0) then
  66.                     toArray.[ptr] <- fromArray.[ptr1]
  67.                     ptr1 <- ptr1 + 1
  68.                 else
  69.                     toArray.[ptr] <- fromArray.[ptr2]
  70.                     ptr2 <- ptr2 + 1
  71.  
  72.         // define the sort operation
  73.         let parallelSort (array: 'T []) =              
  74.  
  75.             // control flow parameters
  76.             let totalWorkers = int (2.0 ** float (int (Math.Log(float Environment.ProcessorCount, 2.0))))
  77.             let auxArray : 'T array = Array.zeroCreate array.Length
  78.             let workers : Task array = Array.zeroCreate (totalWorkers - 1)
  79.             let iterations = int (Math.Log((float totalWorkers), 2.0))
  80.  
  81.             // define a key array if needed for sorting on a projection
  82.             let keysArray =
  83.                 match projection with
  84.                 | Some p -> Array.init array.Length (fun idx -> p array.[idx])
  85.                 | None -> [||]
  86.  
  87.             // Number of elements for each array, if the elements number is not divisible by the workers
  88.             // the remainders will be added to the first worker (the main thread)
  89.             let partitionSize = ref (int (array.Length / totalWorkers))
  90.             let remainder = array.Length % totalWorkers
  91.  
  92.             // Define the arrays references for processing as they are swapped during each iteration
  93.             let swapped = ref false
  94.  
  95.             let inline getMergeArray (arrayType: MergeArrayType) =
  96.                 match (arrayType, !swapped) with
  97.                 | (FromArray, true) -> auxArray
  98.                 | (FromArray, false) -> array
  99.                 | (ToArray, true) -> array
  100.                 | (ToArray, false) -> auxArray
  101.  
  102.             use barrier = new Barrier(totalWorkers, fun (b) ->
  103.                 partitionSize := !partitionSize <<< 1
  104.                 swapped := not !swapped)
  105.  
  106.             // action to perform the sort an merge steps
  107.             let action (index: int) =   
  108.                          
  109.                 //calculate the partition boundary
  110.                 let low = index * !partitionSize + match index with | 0 -> 0 | _ -> remainder
  111.                 let high = (index + 1) * !partitionSize - 1 + remainder
  112.  
  113.                 // Sort the specified range - could implement QuickSort here
  114.                 let sortLen = high - low + 1
  115.                 match (comparer, projection) with
  116.                 | (Some _, _) -> Array.Sort(array, low, sortLen, sortComparer)
  117.                 | (_, Some p) -> Array.Sort(keysArray, array, low, sortLen)
  118.                 | (_, _) -> Array.Sort(array, low, sortLen)
  119.  
  120.                 barrier.SignalAndWait()
  121.  
  122.                 let rec loopArray loopIdx actionIdx loopHigh =
  123.                     if loopIdx < iterations then                                  
  124.                         if (actionIdx % 2 = 1) then
  125.                             barrier.RemoveParticipant()  
  126.                         else
  127.                             let newHigh = loopHigh + !partitionSize / 2
  128.                             merge (getMergeArray FromArray) (getMergeArray ToArray) low loopHigh (loopHigh + 1) newHigh
  129.                             barrier.SignalAndWait()
  130.                             loopArray (loopIdx + 1) (actionIdx >>> 1) newHigh
  131.                 loopArray 0 index high
  132.  
  133.             for index in 1 .. workers.Length do
  134.                 workers.[index - 1] <- Task.Factory.StartNew(fun() -> action index)
  135.  
  136.             action 0
  137.  
  138.             // if odd iterations return auxArray otherwise array (swapped will be false)
  139.             if not (iterations % 2 = 0) then  
  140.                 Array.blit auxArray 0 array 0 array.Length
  141.  
  142.         // Perform the sorting
  143.         match array with
  144.         | [||] -> failwith "Empty Array"
  145.         | small when small.Length < (Environment.ProcessorCount * 2) ->
  146.             match (comparer, projection) with
  147.             | (Some c, _) -> Array.sortInPlaceWith c array
  148.             | (_, Some p) -> Array.sortInPlaceBy p array
  149.             | (_, _) -> Array.sortInPlace array
  150.         | _ -> parallelSort array

As you can see, the internal method SortInPlaceInternal is the actual implementation of the sort. The remaining members deal with settings parameters for calling this function, based on the sort options.

Whereas the nature of this sort is identical to the original implementation, there are some subtle differences. These mostly dealing with comparison, optional projection for the sortBy, and array references.

Firstly it is worth talking about the implementation of the Barrier and array references. In this Barrier implementation the partition size is decreased (as before) and a flag is set indicating that a swap has occurred. So what does this mean? The swapped flag is used to determine the direction of the merge operation after the initial parallelized sorts. When a merge is performed the From and To arrays are determined function in which the returned array is determined by this swapped flag:

let inline getMergeArray (arrayType: MergeArrayType) =
    match (arrayType, !swapped) with
    | (FromArray, true) -> auxArray
    | (FromArray, false) -> array
    | (ToArray, true) -> array
    | (ToArray, false) -> auxArray

So why do this? The rational behind this was so that the array references between merge passes did not have to change. Instead the merge operation just gathers a reference to the appropriate array.

As mentioned one of the big differences in this implementations is the processing of comparisons and the optional projection for sortBy operations.

During the sort process there are 2 comparisons that take place. The first is that for the initial Array.Sort, and the second is that used for the merge steps. The reason for this distinction is the use of Array.Sort for sorting the sections of the array in Parallel. Array.Sort supports the optional use of an optional IComparer(T) generic interface. Thus when a comparer is specified it has to be converted to this interface. Luckily F# makes this easy:

let sortComparer =
    match comparer with
    | Some c -> ComparisonIdentity.FromFunction c
    | _ -> ComparisonIdentity.Structural<'T>

When dealing with the optional projection for sortBy one could similarly construct an object implementing IComparer using a comparer like:

let pCompare a b = compare (projection a) (projection b)
let pComparer = ComparisonIdentity.FromFunction pCompare

However, for sorting performance, I found a more efficient approach was to define an array containing the projected keys for the array. This allows the usage of the override for Array.Sort that takes a set of keys, in addition to the array.

Thus combing both the comparer and projection requirements for sorting, each parallel range is sorted with the following:

let sortLen = high - low + 1
match (comparer, projection) with
| (Some _, _) -> Array.Sort(array, low, sortLen, sortComparer)
| (_, Some p) -> Array.Sort(keysArray, array, low, sortLen)
| (_, _) -> Array.Sort(array, low, sortLen)

For merging, if possible structural comparisons are used. Thus the comparison of array elements is defined using the following:

let projectionComparer = ComparisonIdentity.Structural<'Key>

let inline sortComparerResult (item1: 'T) (item2: 'T) =
    match projection with
    | Some p -> projectionComparer.Compare(p item1, p item2)
    | None -> sortComparer.Compare(item1, item2)

With the internal sorting process defined all that remains is to actually perform the sort:

match array with
| [||] -> failwith "Empty Array"
| small when small.Length < (Environment.ProcessorCount * 2) ->
    match (comparer, projection) with
    | (Some c, _) -> Array.sortInPlaceWith c array
    | (_, Some p) -> Array.sortInPlaceBy p array
    | (_, _) -> Array.sortInPlace array
| _ -> parallelSort array

As in the previous example an optimization is in place to ensure that small arrays are sorted using the base sort operations. One can however configure this to suit as necessary; as one can also configure the total worker threads.

As one of objectives of this exercise was to provide sort operations on the Array.Parallel module, a few extensions are defined:

module Array =
    module Parallel =

        let sort (array: 'T []) =
            ParallelSort.Sort(array)

        let sortBy (projection: 'T -> 'Key) (array: 'T []) =
            ParallelSort.SortBy(array, projection)

        let sortWith (comparer: 'T -> 'T -> int) (array: 'T []) =
            ParallelSort.SortWith(array, comparer)

        let sortInPlace (array: 'T []) =
            ParallelSort.SortInPlace(array)

        let sortInPlaceBy (projection: 'T -> 'Key) (array: 'T []) =
            ParallelSort.SortInPlaceBy(array, projection)

        let sortInPlaceWith (comparer: 'T -> 'T -> int) (array: 'T []) =
            ParallelSort.SortInPlaceWith(array, comparer)

These definitions allow one to perform parallel sort operations as one would do for base sorting:

array
|> Array.Parallel.sortInPlace

So if you have the need to perform parallel sort operation hopefully this will get you off the ground. If you want to see the code run, here is an fsx source file definition that I have used for testing.

// This file is a script that can be executed with the F# Interactive.  

#load "ParallelMergeSort.fs"

open System
open MSDN.FSharp.Parallel

// Sort which runs a serial
let items0 = [| 9; 7; 5; 3; 1 |]
items0
|> Array.Parallel.sortInPlace   

printfn "Serial: %A" items0

// Sort simple collection of numbers
let items1 = [| 10000 .. -1 .. 1 |]

items1
|> Array.Parallel.sort
|> printfn "Simple New: %A"

items1
|> Array.Parallel.sortInPlace   

printfn "Simple In Place: %A" items1

// Base parallel sort test
let items2 = [| for f in 0.0 .. 0.1 .. 100.0 -> sin f |]
items2
|> Array.Parallel.sortInPlace   

printfn "Sorted: %A" items2

// Parallel sort with a projection
let items3 = [| for f in 0.0 .. 0.1 .. 100.0 -> sin f |]
items3
|> Array.Parallel.sortInPlaceBy (fun item -> abs item)

printfn "Sorted ABS: %A" items3

// Some 5 million item array performance testing
#load "ParallelMergeSort.fs"

open System
open MSDN.FSharp.Parallel

let rnd = System.Random();

let recordTime func =
    GC.Collect(GC.MaxGeneration)
    GC.WaitForFullGCComplete() |> ignore
    GC.WaitForPendingFinalizers()
    let started = DateTime.Now
    func()
    DateTime.Now - started

let writeTime (message:string) (sortCount:int) (timespan : TimeSpan) =
    printfn "%s: Sort took %f seconds : Element count = %i" message timespan.TotalSeconds sortCount

let itemsBase = [| for f in 0 .. 1 .. 5000000 -> (rnd.NextDouble() - 0.8) * 1000.0 |]

// Base sort
let items8 = Array.copy itemsBase
let items9 = Array.copy itemsBase

recordTime (fun () ->
    items8
    |> Array.Parallel.sortInPlace)
|> writeTime "ParallelInPlace" 5000000

recordTime (fun () ->
    items9
    |> Array.sortInPlace)
|> writeTime "SequentialInPlace" 5000000

// Base sort new array
let items8n = Array.copy itemsBase
let items9n = Array.copy itemsBase

recordTime (fun () ->
    items8n
    |> Array.Parallel.sort
    |> ignore)
|> writeTime "Parallel" 5000000

recordTime (fun () ->
    items9n
    |> Array.sort
    |> ignore)
|> writeTime "Sequential" 5000000

// With sort
let items8w = Array.copy itemsBase
let items9w = Array.copy itemsBase

recordTime (fun () ->
    items8w
    |> Array.Parallel.sortInPlaceWith compare)
|> writeTime "ParallelInPlaceWith" 5000000

recordTime (fun () ->
    items9w
    |> Array.sortInPlaceWith compare)
|> writeTime "SequentialInPlaceWith" 5000000

// By sort
let items8b = Array.copy itemsBase
let items9b = Array.copy itemsBase

recordTime (fun () ->
    items8b
    |> Array.Parallel.sortInPlaceBy (fun item -> abs item))
|> writeTime "ParallelInPlaceBy" 5000000

recordTime (fun () ->
    items9b
    |> Array.sortInPlaceBy (fun item -> abs item))
|> writeTime "SequentialInPlaceBy" 5000000

In future posts I will look at QuickSort options in more detail.