F# Parallel Processing and the Command Pattern
In a previous post I talked about a mechanism for Parallel Process Execution with Conditional Completion. This code was a direct translation from a C# pattern. However it is worth noting that the Command pattern is not actually necessary in functional languages.
To demonstrate this here is a version of the ForEachPartial method, from the previous post, that operates on a sequence of functions; rather than a sequence of command types:
type ParallelEx =
// F# wrapper using functions sequence
static member ForEachPartial(inputs : seq<unit -> 'a>, collector : 'a -> bool) =
let exceptions = new ConcurrentQueue<Exception>();
let cancellationToken = new CancellationTokenSource()
let syncLock = new System.Object()
let collectorState (result : 'a) =
lock syncLock (fun () ->
if cancellationToken.IsCancellationRequested = false && (collector result) = false then
cancellationToken.Cancel()
|> ignore)
let operation (input : unit -> 'a) =
try
if cancellationToken.IsCancellationRequested = false then
input()
|> collectorState
with
| _ as e ->
exceptions.Enqueue(e)
if cancellationToken.IsCancellationRequested = false then
cancellationToken.Cancel()
try
inputs
|> PSeq.withCancellation cancellationToken.Token
|> PSeq.iter operation
with
| :? System.OperationCanceledException as e -> ()
if exceptions.Count > 0 then
raise (AggregateException("Processing of the inputs has failed.", exceptions))
As you can see the ForEachPartial method now executes, in parallel using PSeq, the specified functions. This is enabled as F# supports higher order functions, removing the need to wrap the command and parameters into a Command type. A word of not about PSeq. PSeq is based on PLINQ, and as a result a loop state is unavailable. This means a cancellation token is used to terminate future scheduled iterations.
The collectorState function takes the result of the operation execution and passes it into the collector function, performing the necessary locking. The return from the collector operation determines if further iterations are cancelled. The operation function merely wraps the call to the long running process.
If one wanted to support C# clients one could still support an override that operates using the Command pattern.
type ParallelEx =
// C# interface allowing command pattern
static member ForEachPartial(inputs : seq<IOperationInputDefinition<'a>>, collector : 'a -> bool) =
let mapper (input:IOperationInputDefinition<'a>) = fun () -> input.Execute()
let operations = Seq.map mapper inputs
ParallelEx.ForEachPartial(operations, collector)
This override merely converts the sequence of Command types into functions of the signature (unit –> ‘a). The definition of the command types is:
[<AbstractClass>]
type IOperationInputDefinition<'a>() =
// All operations must support command execution
abstract member Execute : unit -> 'a
// input type with 1 parameter
type OperationInputDefinition<'Tinput, 'Tresult>(parameter : 'Tinput, operation : 'Tinput -> 'Tresult) =
inherit IOperationInputDefinition<'Tresult>()
override this.Execute() =
operation parameter
// input type with 2 parameters
type OperationInputDefinition<'Tinput1, 'Tinput2, 'Tresult>(parameter1 : 'Tinput1, parameter2 : 'Tinput2, operation : 'Tinput1 -> 'Tinput2 -> 'Tresult) =
inherit IOperationInputDefinition<'Tresult>()
override this.Execute() =
operation parameter1 parameter2
To provide a more functional interface, one can also provide an extension to PSeq that operations on the sequence of functions:
module PSeq =
let iterWhile (collector : 'a -> bool) (inputs : seq<unit -> 'a>) =
Threading.ParallelEx.ForEachPartial<'a>(inputs, collector)
This way one gets the best of both worlds, a functional call syntax for when programming in F# and an .Net signature that allows one to easily use the code from C#.