F# and Running Parallel Tasks

Recently I have been working a lot with the Task Parallel Library (TPL). When using it from F#, one quickly learns that quite a bit of casting is required. As such, I have found it useful to define some wrapper members that abstract away the casting and calling semantics:

open System
open System.Threading
open System.Threading.Tasks

type ParallelEx =

    static member inline Do (actions:(unit -> unit) array, ?cancellationToken:CancellationToken) =
        match actions with
        | [||] -> ()
        | [|action|] -> action()
        | _ ->
            let tasks =
                match cancellationToken with
                | Some c -> Array.map (fun x -> Task.Factory.StartNew(Action(x), c)) actions
                | None -> Array.map (fun x -> Task.Factory.StartNew(Action(x))) actions
            Task.WaitAll tasks

    static member inline Let (funcs:(unit -> 'T) array, ?cancellationToken:CancellationToken) =
        match funcs with
        | [||] -> Array.empty
        | [|func|] -> [| func() |]
        | _ ->
            let tasks =
                match cancellationToken with
                | Some c -> Array.map (fun x -> Task.Factory.StartNew(Func<'T>(x), c)) funcs
                | None -> Array.map (fun x -> Task.Factory.StartNew(Func<'T>(x))) funcs
            Task.WaitAll [| for task in tasks -> task :> Task |]
            Array.map (fun (t: Task<'T>) -> t.Result) tasks

Each of these members takes an array of functions, runs them in parallel as tasks, and returns only when all tasks have completed (by performing a WaitAll on the tasks). An empty array is a no-op, and a single function is simply called on the current thread. For the Let member, the results are then collected and returned in an array.

These wrappers take arrays rather than lists, mostly because the TPL itself works with arrays. This means one does not have to convert between list and array types.

One thing you will notice in the Let member is the casting of the tasks to type Task when performing the WaitAll. This is because the tasks, when created using Func types, are of type Task<'T>. Whereas C# will automatically perform the upcast, in F# it has to be performed explicitly.
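The explicit upcast can be seen in isolation in the following minimal sketch. The squaring workload and the names typedTasks and untypedTasks are made up for illustration; only the TPL calls themselves come from the wrappers above.

```fsharp
open System
open System.Threading.Tasks

// Hypothetical workload: square a few numbers, one task per number.
let typedTasks : Task<int>[] =
    [| 1; 2; 3 |]
    |> Array.map (fun n -> Task.Factory.StartNew(Func<int>(fun () -> n * n)))

// F# does not implicitly convert Task<int>[] to Task[],
// so each element is upcast with :> before calling WaitAll.
let untypedTasks : Task[] = typedTasks |> Array.map (fun t -> t :> Task)
Task.WaitAll(untypedTasks)

// The typed array is still needed to read the results.
let results = typedTasks |> Array.map (fun t -> t.Result)
```

The typed array is kept alongside the upcast one because Result is only available on Task<'T>.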

These wrappers currently support an optional CancellationToken. They could easily be extended for other Task capabilities such as state and TaskCreationOptions. If I extend these in the near future I will post them up.
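As a rough idea of what such an extension might look like, here is a sketch of a Do-style function that also accepts TaskCreationOptions. The name doWithOptions and its parameter order are assumptions for illustration and are not part of the wrappers above; it relies on the StartNew overload that takes an Action, a CancellationToken, TaskCreationOptions and a TaskScheduler.

```fsharp
open System
open System.Threading
open System.Threading.Tasks

// Hypothetical helper: like Do, but with explicit creation options and token.
let doWithOptions (options: TaskCreationOptions)
                  (token: CancellationToken)
                  (actions: (unit -> unit)[]) =
    let tasks =
        actions
        |> Array.map (fun f ->
            Task.Factory.StartNew(Action(f), token, options, TaskScheduler.Default))
    Task.WaitAll(tasks)

// Usage: two long-running actions that each bump a shared counter.
let counter = ref 0
let work : (unit -> unit)[] =
    [| (fun () -> Interlocked.Increment(counter) |> ignore)
       (fun () -> Interlocked.Increment(counter) |> ignore) |]

doWithOptions TaskCreationOptions.LongRunning CancellationToken.None work
```

A curried function is used here only to keep the sketch short; an optional-argument member in the style of Do would work just as well.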

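A quick word about exceptions. As these members are just wrappers over the TPL, you will need to cater for exceptions thrown by the TPL, including the AggregateException that WaitAll raises when any task fails. In the single-function case the function is called directly, so there the original exception propagates unwrapped.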
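One way of catering for this is sketched below, using the TPL calls directly so the example stands alone. The failing actions and the name messages are invented for illustration.

```fsharp
open System
open System.Threading.Tasks

// Hypothetical actions: one succeeds, two fail.
let actions : (unit -> unit)[] =
    [| (fun () -> ())
       (fun () -> failwith "first failure")
       (fun () -> invalidOp "second failure") |]

// Run the actions as tasks and collect the failure messages, if any.
let messages : string[] =
    try
        let tasks = actions |> Array.map (fun f -> Task.Factory.StartNew(Action(f)))
        Task.WaitAll(tasks)
        [||]
    with
    | :? AggregateException as ex ->
        // Flatten unwraps any nested AggregateException instances.
        ex.Flatten().InnerExceptions
        |> Seq.map (fun e -> e.Message)
        |> Seq.toArray
```

WaitAll waits for every task before throwing, so the AggregateException carries one inner exception per failed task rather than only the first failure.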

To demonstrate using the Do member, here is the call one would use when performing a parallel Quicksort:

ParallelEx.Do [|
    fun () -> quickSortDepth low (pivot - 1) (depth - 1)
    fun () -> quickSortDepth (pivot + 1) high (depth - 1) |]

As you can see, using these wrappers saves one having to always perform Action and Func casts for parallel executions.
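The quickSortDepth function itself is not shown in this post. For context, here is one possible shape for it, as a sketch only: the partition helper (Lomuto scheme), the parallelQuickSort entry point and the int array element type are all assumptions. To stay self-contained it calls Parallel.Invoke with explicit Action delegates rather than the wrapper, which also shows the casting the wrapper removes.

```fsharp
open System
open System.Threading.Tasks

// Hypothetical in-place partition (Lomuto scheme); returns the pivot's final index.
let partition (items: int[]) low high =
    let pivotValue = items.[high]
    let mutable store = low
    for i in low .. high - 1 do
        if items.[i] < pivotValue then
            let tmp = items.[i]
            items.[i] <- items.[store]
            items.[store] <- tmp
            store <- store + 1
    let tmp = items.[high]
    items.[high] <- items.[store]
    items.[store] <- tmp
    store

// Sorts items in place, running the two halves in parallel until depth reaches 0.
let parallelQuickSort (items: int[]) maxDepth =
    let rec quickSortDepth low high depth =
        if low < high then
            let pivot = partition items low high
            if depth > 0 then
                Parallel.Invoke(
                    Action(fun () -> quickSortDepth low (pivot - 1) (depth - 1)),
                    Action(fun () -> quickSortDepth (pivot + 1) high (depth - 1)))
            else
                // Depth budget used up: carry on sequentially.
                quickSortDepth low (pivot - 1) 0
                quickSortDepth (pivot + 1) high 0
    quickSortDepth 0 (items.Length - 1) maxDepth

// Usage: sort a small array, going parallel for the first two levels of recursion.
let data = [| 5; 3; 9; 1; 7; 2; 8; 2 |]
parallelQuickSort data 2
```

The depth parameter limits how many levels of recursion spawn parallel work, so that small partitions are sorted sequentially instead of paying the task overhead.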

If one so desired one could just as easily put a wrapper over Parallel.Invoke, rather than using tasks:

static member inline Invoke (actions:(unit -> unit) array) =
    match actions with
    | [||] -> ()
    | [|action|] -> action()
    | _ ->
        Array.map (fun x -> Action(x)) actions
        |> Parallel.Invoke

The usage semantics for this wrapper would be the same as for the Do member, minus the optional CancellationToken.