Dela via


F# Parallel Processing and the Command Pattern

In a previous post I talked about a mechanism for Parallel Process Execution with Conditional Completion. This code was a direct translation from a C# pattern. However it is worth noting that the Command pattern is not actually necessary in functional languages.

To demonstrate this here is a version of the ForEachPartial method, from the previous post, that operates on a sequence of functions; rather than a sequence of command types:

static member ForEachPartial(inputs : seq<unit -> 'a>, collector : 'a -> bool) =

    let exceptions = new ConcurrentQueue<Exception>();
    let cancellationToken = new CancellationTokenSource()
    let syncLock = new System.Object()

    let collectorState (result : 'a) =
        lock syncLock (fun () ->
            if cancellationToken.IsCancellationRequested = false && (collector result) = false then
                cancellationToken.Cancel()
                |> ignore)

    let operation (input : unit -> 'a) =
        try
            if cancellationToken.IsCancellationRequested = false then
                input()
                |> collectorState
        with
            | _ as e ->
                exceptions.Enqueue(e)
                if cancellationToken.IsCancellationRequested = false then
                    cancellationToken.Cancel()

    try
        inputs
        |> PSeq.withCancellation cancellationToken.Token
        |> PSeq.iter operation
    with
        | :? System.OperationCanceledException as e -> ()

    if exceptions.Count > 0 then
        raise (AggregateException("Processing of the inputs has failed.", exceptions))

As you can see the ForEachPartial method now executes, in parallel using PSeq, the specified functions. This is enabled as F# supports higher order functions, removing the need to wrap the command and parameters into a Command type.

If one wanted to support C# clients one could still support an override that operates using the Command pattern.

static member ForEachPartial(inputs : seq<IOperationInputDefinition<'a>>, collector : 'a -> bool) =

    let mapper (input:IOperationInputDefinition<'a>) = fun () -> input.Execute()
    let operations = Seq.map mapper inputs
    ParallelEx.ForEachPartial(operations, collector)

This override merely converts the sequence of Command types into functions of the signature (unit –> ‘a).

To provide a more functional interface, one can also provide an extension to PSeq that operations on the sequence of functions:

module PSeq =

    let iterWhile (collector : 'a -> bool) (inputs : seq<unit -> 'a>) =
        Threading.ParallelEx.ForEachPartial<'a>(inputs, collector)

This way one gets the best of both worlds, a functional call syntax for when programming in F# and an .Net signature that allows one to easily use the code from C#.

Written by Carl Nolan