F# Parallel Processing and the Command Pattern
In a previous post I talked about a mechanism for Parallel Process Execution with Conditional Completion. That code was a direct translation of a C# pattern. However, it is worth noting that the Command pattern is not actually necessary in functional languages, because a function value can carry the operation and its parameters itself.
To demonstrate this, here is a version of the ForEachPartial method from the previous post that operates on a sequence of functions rather than a sequence of command types:
// Assumes System, System.Threading and System.Collections.Concurrent are opened,
// and that PSeq comes from the F# parallel sequence library in use.
static member ForEachPartial(inputs : seq<unit -> 'a>, collector : 'a -> bool) =
    let exceptions = new ConcurrentQueue<Exception>()
    let cancellationToken = new CancellationTokenSource()
    let syncLock = new System.Object()

    // Pass one result to the collector under a lock; cancel when it returns false.
    let collectorState (result : 'a) =
        lock syncLock (fun () ->
            if not cancellationToken.IsCancellationRequested && not (collector result) then
                cancellationToken.Cancel())

    // Run one input function, recording any exception and cancelling the rest.
    let operation (input : unit -> 'a) =
        try
            if not cancellationToken.IsCancellationRequested then
                input()
                |> collectorState
        with
        | e ->
            exceptions.Enqueue(e)
            if not cancellationToken.IsCancellationRequested then
                cancellationToken.Cancel()

    // Cancellation is the expected way to finish early, so it is swallowed here.
    try
        inputs
        |> PSeq.withCancellation cancellationToken.Token
        |> PSeq.iter operation
    with
    | :? System.OperationCanceledException -> ()

    if exceptions.Count > 0 then
        raise (AggregateException("Processing of the inputs has failed.", exceptions))
As you can see, the ForEachPartial method now uses PSeq to execute the specified functions in parallel. This works because F# supports higher-order functions, which removes the need to wrap the command and its parameters in a Command type.
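To make the comparison concrete, here is a minimal sketch of the same deferred operation written both ways. The ICommand interface, SquareCommand type and square function are hypothetical names used only for illustration; they are not part of the code from the previous post.

```fsharp
// Hypothetical command-style interface; it only mirrors the shape of a Command type.
type ICommand<'a> =
    abstract Execute : unit -> 'a

// Command pattern: the parameter is stored in an object that implements the interface.
type SquareCommand(value : int) =
    interface ICommand<int> with
        member this.Execute() = value * value

// Functional equivalent: partial application captures the parameter in a closure.
let square (value : int) () = value * value

let commands : ICommand<int> list = [ for i in 1 .. 3 -> SquareCommand(i) :> ICommand<int> ]
let functions : (unit -> int) list = [ for i in 1 .. 3 -> square i ]

// Both forms defer the work until something invokes them.
let fromCommands = commands |> List.map (fun command -> command.Execute())
let fromFunctions = functions |> List.map (fun f -> f ())
```

Both lists produce the same results when run; the function version simply needs no extra type per operation.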
If one wanted to support C# clients, one could still provide an overload that operates using the Command pattern:
static member ForEachPartial(inputs : seq<IOperationInputDefinition<'a>>, collector : 'a -> bool) =
    let mapper (input : IOperationInputDefinition<'a>) = fun () -> input.Execute()
    let operations = Seq.map mapper inputs
    ParallelEx.ForEachPartial(operations, collector)
This overload merely converts the sequence of Command types into functions of the signature (unit -> 'a).
To provide a more functional interface, one can also provide an extension to PSeq that operates on the sequence of functions:
module PSeq =
    let iterWhile (collector : 'a -> bool) (inputs : seq<unit -> 'a>) =
        Threading.ParallelEx.ForEachPartial<'a>(inputs, collector)
This way one gets the best of both worlds: a functional call syntax when programming in F#, and a .NET signature that makes the code easy to use from C#.
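The pipeline call style and the collector contract can be tried out with a sequential stand-in. The sketch below is an assumption for illustration only: iterWhileSequential and collectThree are hypothetical names, and the stand-in runs the functions one at a time with no parallelism, locking or exception aggregation. With the real PSeq.iterWhile, results would reach the collector in no fixed order.

```fsharp
// Hypothetical sequential stand-in for PSeq.iterWhile: runs each function in order
// and stops as soon as the collector returns false.
let iterWhileSequential (collector : 'a -> bool) (inputs : seq<unit -> 'a>) =
    inputs
    |> Seq.takeWhile (fun input -> collector (input ()))
    |> Seq.iter ignore

let results = System.Collections.Generic.List<int>()

// Collector: keep each result and ask for more until three have been gathered.
let collectThree (result : int) =
    results.Add result
    results.Count < 3

// An unbounded sequence of functions; processing ends when the collector says so.
Seq.initInfinite (fun i -> fun () -> i * i)
|> iterWhileSequential collectThree
```

Because the collector comes first and the sequence last, the function composes with the pipe operator in the same way as the other PSeq functions.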
Written by Carl Nolan