F# Parallel Processing and the Command Pattern

In a previous post I talked about a mechanism for Parallel Process Execution with Conditional Completion. This code was a direct translation from a C# pattern. However it is worth noting that the Command pattern is not actually necessary in functional languages.

To demonstrate this here is a version of the ForEachPartial method, from the previous post, that operates on a sequence of functions; rather than a sequence of command types:

type ParallelEx =

    // F# wrapper using functions sequence
    static member ForEachPartial(inputs : seq<unit -> 'a>, collector : 'a -> bool) =

        let exceptions = new ConcurrentQueue<Exception>();
        let cancellationToken = new CancellationTokenSource()
        let syncLock = new System.Object()

        let collectorState (result : 'a) =
            lock syncLock (fun () ->
                if cancellationToken.IsCancellationRequested = false && (collector result) = false then
                    cancellationToken.Cancel()
                    |> ignore)

        let operation (input : unit -> 'a) =
            try
                if cancellationToken.IsCancellationRequested = false then
                    input()
                    |> collectorState
            with
                | _ as e ->
                    exceptions.Enqueue(e)
                    if cancellationToken.IsCancellationRequested = false then
                        cancellationToken.Cancel()

        try
            inputs
            |> PSeq.withCancellation cancellationToken.Token
            |> PSeq.iter operation
        with
            | :? System.OperationCanceledException as e -> ()

        if exceptions.Count > 0 then
            raise (AggregateException("Processing of the inputs has failed.", exceptions))

As you can see the ForEachPartial method now executes, in parallel using PSeq, the specified functions. This is enabled as F# supports higher order functions, removing the need to wrap the command and parameters into a Command type. A word of not about PSeq. PSeq is based on PLINQ, and as a result a loop state is unavailable. This means a cancellation token is used to terminate future scheduled iterations.

The collectorState function takes the result of the operation execution and passes it into the collector function, performing the necessary locking. The return from the collector operation determines if further iterations are cancelled. The operation function merely wraps the call to the long running process.

If one wanted to support C# clients one could still support an override that operates using the Command pattern.

type ParallelEx =

    // C# interface allowing command pattern
    static member ForEachPartial(inputs : seq<IOperationInputDefinition<'a>>, collector : 'a -> bool) =

        let mapper (input:IOperationInputDefinition<'a>) = fun () -> input.Execute()
        let operations = Seq.map mapper inputs
        ParallelEx.ForEachPartial(operations, collector)

This override merely converts the sequence of Command types into functions of the signature (unit –> ‘a). The definition of the command types is:

[<AbstractClass>]
type IOperationInputDefinition<'a>() =        
    // All operations must support command execution
    abstract member Execute : unit -> 'a

// input type with 1 parameter
type OperationInputDefinition<'Tinput, 'Tresult>(parameter : 'Tinput, operation : 'Tinput -> 'Tresult) =
    inherit IOperationInputDefinition<'Tresult>()

    override this.Execute() =
        operation parameter

// input type with 2 parameters
type OperationInputDefinition<'Tinput1, 'Tinput2, 'Tresult>(parameter1 : 'Tinput1, parameter2 : 'Tinput2, operation : 'Tinput1 -> 'Tinput2 -> 'Tresult) =
    inherit IOperationInputDefinition<'Tresult>()

    override this.Execute() =
        operation parameter1 parameter2

To provide a more functional interface, one can also provide an extension to PSeq that operations on the sequence of functions:

module PSeq =

    let iterWhile (collector : 'a -> bool) (inputs : seq<unit -> 'a>) =
        Threading.ParallelEx.ForEachPartial<'a>(inputs, collector)

This way one gets the best of both worlds, a functional call syntax for when programming in F# and an .Net signature that allows one to easily use the code from C#.