F# Parallel Process Execution with Conditional Completion

In a previous posting, available here, I discussed a pattern for running long running processes synchronously and in parallel, but terminating processing when a certain condition was met. As an F# exercise I decided to write a similar code pattern using PSeq, an F#-style API for parallel operations on sequences that are part in .NET 4.0 as System.Linq.ParallelEnumerable class, available in the F# Power Pack.

A follow-up article demonstrating this code without the need for the Command patter can be found here.

Once again, as this is only a small amount of code, here is a full listing:

namespace MSDN.FSharp

open System
open System.Collections.Concurrent
open System.Collections.Generic
open System.Linq
open System.Threading

open MSDN.FSharp.Collections

module Threading =

    [<AbstractClass>]
    type OperationInputDefinitionBase<'a>() =        
        // All operations must support command execution
        abstract member Execute : unit -> 'a

    // input type with 1 parameter
    type OperationInputDefinition<'Tinput, 'Tresult>(parameter : 'Tinput, operation : 'Tinput -> 'Tresult) =
        inherit OperationInputDefinitionBase<'Tresult>()

        override this.Execute() =
            operation parameter

    // input type with 2 parameters
    type OperationInputDefinition<'Tinput1, 'Tinput2, 'Tresult>(parameter1 : 'Tinput1, parameter2 : 'Tinput2, operation : 'Tinput1 -> 'Tinput2 -> 'Tresult) =
        inherit OperationInputDefinitionBase<'Tresult>()

        override this.Execute() =
            operation parameter1 parameter2

    // parallel execution method
    type ParallelEx() =
        static member ForEachPartial<'a>(inputs : seq<OperationInputDefinitionBase<'a>>, collector : 'a -> bool) =

            let exceptions = new ConcurrentQueue<Exception>();
            let cancellationToken = new CancellationTokenSource()
            let syncLock = new System.Object()

            let collectorState (result : 'a) =
                lock syncLock (fun () ->
                    if cancellationToken.IsCancellationRequested = false && (collector result) = false then
                        cancellationToken.Cancel()
                        |> ignore)

            let operation (input : OperationInputDefinitionBase<'a>) =
                try
                    if cancellationToken.IsCancellationRequested = false then
                        input.Execute()
                        |> collectorState
                with
                    | _ as e ->
                        exceptions.Enqueue(e)
                        if cancellationToken.IsCancellationRequested = false then
                            cancellationToken.Cancel()

            try
                inputs
                |> PSeq.withCancellation cancellationToken.Token
                |> PSeq.iter operation
            with
                | :? System.OperationCanceledException as e -> ()

            if exceptions.Count > 0 then
                raise (AggregateException("Processing of the inputs has failed.", exceptions))

A little explanation of this code is warranted. As in the previous C# implementation, the operation input definitions (based on the Command pattern) allow for the definition of an operation to be performed with associated parameters.

The ForEachPartial extension method once again takes in a seq of operations, and a results collector operation. It is this results collector operation that informs the parallel processing whether any further iterations are required.

One of the big differences in this implementation is the use of PSeq, which is based on PLINQ, and as a result a loop state is unavailable. This means the cancellation token is used to terminate future scheduled iterations. The other difference is the definition of the collectorState and operation functions. The collectorState function takes the result of the operation execution and passes it into the collector, with the necessary locking. The return from the collector operation determines if further iterations are cancelled. The operation function merely wraps the call to the long running process.

To perform some testing here are some simple definitions:

let operation (value : int) =
    Thread.Sleep(2000)
    value * value

let mutable summation : int = 0

let collector value =
    summation <- summation + value
    if summation > 400 then false
    else true
    
let inputs : seq<OperationInputDefinitionBase<int>> = seq {
    for idx in 1..100 do
        yield new OperationInputDefinition<int, int>(idx, operation) :> OperationInputDefinitionBase<int>}

The operation merely returns the square of a number, after sleeping for 2 seconds, and the collector terminates when the collected results top a given value.

So to call this code, in an object based fashion one can use:

ParallelEx.ForEachPartial<int>(inputs, collector)

However this does not really follow the F# calling schematics. To support this the F# code includes the following extension to PSeq:

module PSeq =
    let iterWhile (collector : 'a -> bool) (inputs : seq<Threading.OperationInputDefinitionBase<'a>>) =
        Threading.ParallelEx.ForEachPartial<'a>(inputs, collector)

 Using this function one can now write, in a much more functional style:

inputs
|> PSeq.iterWhile collector

To finalize the testing code, one of the testing tasks is to record the operation timings and output the result to the console. Using the power of F# function I have defined the following two functions:

let writeTime (timespan : TimeSpan) =
    Console.WriteLine(sprintf "Operations took %f seconds : summation = %i" timespan.TotalSeconds summation)

let recordTime func =
    let started = DateTime.Now
    func()
    DateTime.Now - started

Thus to call the operations in parallel, record and display the timing we can write:

recordTime (fun () ->
    inputs
    |> PSeq.iterWhile collector)
|> writeTime

If you are using F# hopefully you will find this code useful.

Written by Carl Nolan