LAgent: an agent framework in F# – Part VIII – Implementing MapReduce (user model)


Download framework here.

    For this post I use a newer version of the framework that I just uploaded to CodeGallery. While using LAgent I grew more and more unhappy with the weakly typed way of sending messages. The code that implements that feature is nasty: full of upcasts and downcasts. I was losing faith in it. Bugs were cropping up in all sorts of scenarios (e.g. using generic union types as messages).

    In the end I decided to re-architect the framework to make it strongly typed. In essence, each agent can now receive messages of only a single type. The limitations this design choice introduces (e.g. more limited hot swapping) are compensated by catching errors at compile time and by more streamlined code. I left the old framework on the site in case you disagree with me.

    In any case, today’s post is about MapReduce. It assumes that you know what MapReduce is (the original Google paper that served as inspiration is here: Google Research Publication: MapReduce). What would it take to implement an in-memory MapReduce using my agent framework?

    Let’s start with the user model.

    let mapReduce   (inputs:seq<'in_key * 'in_value>)
                    (map:'in_key -> 'in_value -> seq<'out_key * 'out_value>)
                    (reduce:'out_key -> seq<'out_value> -> seq<'reducedValues>)
                    outputAgent
                    M R partitionF =                

    mapReduce takes seven parameters:

    1. inputs: a sequence of input key/value pairs.
    2. map: this function operates on each input key/value pair. It returns a sequence of output key/value pairs. The output keys and values can have different types from the input ones.
    3. reduce: this function operates on an output key and all the values associated with it. It returns a sequence of reduced values (e.g. the average of all the values for this key).
    4. outputAgent: this is the agent that gets notified every time a new output key has been reduced, and once more when the whole operation ends.
    5. M: how many mapper agents to instantiate
    6. R: how many reducer agents to instantiate
    7. partitionF: the partition function used to choose which of the reducers is associated with a key
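    To pin down what the parameters mean, here is a minimal sequential model of the same computation, written with plain Seq functions instead of agents. It is a sketch under stated assumptions: the name mapReduceSeq is hypothetical, it ignores M, R, partitionF and the output agent, and it returns the reduced results rather than sending them to an agent.

```fsharp
// Hypothetical sequential model of mapReduce: no agents, no parallelism.
// It ignores M, R, partitionF and outputAgent and returns the results directly.
let mapReduceSeq (inputs: seq<'inKey * 'inValue>)
                 (map: 'inKey -> 'inValue -> seq<'outKey * 'outValue>)
                 (reduce: 'outKey -> seq<'outValue> -> seq<'reduced>) =
    inputs
    // map phase: apply map to every input pair and concatenate the outputs
    |> Seq.collect (fun (k, v) -> map k v)
    // shuffle phase: gather all intermediate values that share an output key
    |> Seq.groupBy fst
    // reduce phase: pass each key and its values to reduce
    |> Seq.map (fun (key, pairs) -> key, List.ofSeq (reduce key (Seq.map snd pairs)))
    |> List.ofSeq

// Usage: count words in two tiny "files".
let wordCounts =
    mapReduceSeq ["f1", "a b a"; "f2", "b a"]
                 (fun _ (text: string) -> text.Split([|' '|]) |> Seq.map (fun w -> w, 1))
                 (fun _ values -> Seq.singleton (Seq.sum values))
```

    The agent-based version distributes exactly these three phases across M mapper agents and R reducer agents, and uses partitionF to decide which reducer performs the grouping for each key.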

    Let’s look at how to use this function to find how often each word is used in a set of files. First, a simple partition function can be defined as:

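    let partitionF = fun key M -> (hash key &&& 0x7fffffff) % M

    Given a key and the number of buckets, it picks one of the buckets. Its type is 'a -> int -> int (when 'a : equality, because of hash), so it’s fairly reusable. Masking off the sign bit keeps the bucket index non-negative for any hash value.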
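    The property that makes a partition function work is that equal keys always land in the same bucket, so every value for a given key reaches the same reducer. The sketch below shows how the words of the first test sentence would be spread over two reducers. The names partitionBucket and reducers are illustrative, and the sign-bit mask is an assumption of this sketch that keeps the index non-negative.

```fsharp
// Illustrative partition function with the same shape as partitionF:
// key -> number of buckets -> bucket index in [0, buckets).
// Masking the sign bit keeps the index non-negative for any hash value.
let partitionBucket key buckets = (hash key &&& 0x7fffffff) % buckets

let reducers = 2
let words = "I was going to the airport when I saw someone crossing".Split([|' '|])

// Group the distinct words by the reducer that would receive them.
let routing =
    words
    |> Array.distinct
    |> Array.groupBy (fun w -> partitionBucket w reducers)
    |> Map.ofArray

for KeyValue(bucket, ws) in routing do
    printfn "reducer %d gets %A" bucket ws
```

    Which bucket a word lands in depends on the hash, so the exact split may vary; only the guarantee that a key always maps to the same bucket matters.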

    Let’s also create a basic agent that just prints out the reduced values:

    let printer = spawnWorker (fun msg ->
                                match msg with
                                | Reduced(key, value)   -> printfn "%A %A" key value
                                | MapReduceDone         -> printfn "All done!!")

    The agent gets notified whenever a new key is reduced or the algorithm ends. It is useful to be notified immediately instead of waiting for everything to be done. If I hadn’t written this code using agents, I would not have realized that possibility; I would simply have framed the problem as a function that takes an input and returns an output. Agents force you to think explicitly about the parallelism in your app. That’s a good thing.
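    For readers who want to try the same idea without LAgent, here is a minimal sketch of an equivalent output agent built on F#'s standard MailboxProcessor. The post does not show how Reduced and MapReduceDone are defined, so MapReduceMsg below is an assumed message type, and startPrinter with its log callback is a hypothetical name. Each Reduced message is reported as soon as it is received.

```fsharp
// Assumed message type; LAgent's actual definition is not shown in this post.
type MapReduceMsg<'key, 'value> =
    | Reduced of 'key * 'value
    | MapReduceDone

// Hypothetical output agent on the standard MailboxProcessor (not LAgent's spawnWorker).
// log receives one formatted line per message, so the caller decides where output goes.
let startPrinter (log: string -> unit) =
    MailboxProcessor<MapReduceMsg<string, int list>>.Start(fun inbox ->
        let rec loop () = async {
            let! msg = inbox.Receive()
            match msg with
            | Reduced(key, value) ->
                // report each reduced key immediately, without waiting for the rest
                log (sprintf "%A %A" key value)
                return! loop ()
            | MapReduceDone ->
                log "All done!!"
                return () }   // stop processing after the final notification
        loop ())

// Usage: print to the console.
let printer = startPrinter (printfn "%s")
printer.Post(Reduced("I", [4]))
printer.Post(MapReduceDone)
```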

    The mapping function simply splits the content of a file into words and adds a word/1 pair to the list for each word. I know that there are much better ways to do this (e.g. using regular expressions for the parsing and summing word counts inside the function), but I wanted to test the basic framework capabilities, and this approach exercises them better.

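    open System
    open System.Collections.Generic

    let map = fun (fileName:string) (fileContent:string) ->
                let l = new List<string * int>()
                let wordDelims = [|' ';',';';';'.';':';'?';'!';'(';')';'\n';'\t';'\f';'\r';'\b'|]
                // RemoveEmptyEntries drops the empty strings produced by consecutive delimiters
                fileContent.Split(wordDelims, StringSplitOptions.RemoveEmptyEntries)
                |> Seq.iter (fun word -> l.Add((word, 1)))
                l :> seq<string * int>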
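    The better approach mentioned above could look roughly like the following sketch: words are extracted with a regular expression and counted inside the mapper, so each file emits one pair per distinct word instead of one pair per occurrence. The name mapWithCounts is hypothetical, and treating runs of \w characters as words is an assumption. The output type is still seq<string * int>, so the summing reducer works unchanged.

```fsharp
open System.Text.RegularExpressions

// Hypothetical alternative mapper: regex tokenization plus local counting.
// Emits (word, countInThisFile) pairs, so less data reaches the reducers.
let mapWithCounts (fileName: string) (fileContent: string) : seq<string * int> =
    Regex.Matches(fileContent, @"\w+")
    |> Seq.cast<Match>
    |> Seq.countBy (fun m -> m.Value)

// Usage
let sample = mapWithCounts "File1" "I was going to the airport when I saw someone crossing"
sample |> Seq.iter (printfn "%A")
```

    The trade-off is that the mapper does more work per file, in exchange for fewer messages flowing from mappers to reducers.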

    The reducer function simply sums the various word statistics sent by the mappers:

    let reduce = fun key (values:seq<int>) -> [values |> Seq.sum] :> seq<int>
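    Because reduce may return a different type from the values it receives, the "average of all the values for this key" example from the parameter list fits the same shape. A small sketch; averageReduce is a hypothetical name. Seq.averageBy throws on an empty sequence, which is acceptable here only because every key handed to a reducer has at least one value.

```fsharp
// Hypothetical reducer: average of all the values for a key.
// Same shape as reduce, but produces seq<float> instead of seq<int>.
let averageReduce (key: string) (values: seq<int>) : seq<float> =
    Seq.singleton (Seq.averageBy float values)

// Usage
printfn "%A" (List.ofSeq (averageReduce "I" [1; 2; 3; 4]))
```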

    Now we can create some fake input to check that it works:

    let testInput = ["File1", "I was going to the airport when I saw someone crossing";
                     "File2", "I was going home when I saw you coming toward me"]

    And execute the mapReduce:

    mapReduce testInput map reduce printer 2 2 partitionF

    On my machine I get the following. You might get a different order because of the async/parallel processing involved. If I wanted a stable order I would need to change the printer agent to cache results on Reduced and process them on MapReduceDone (see next post).

    "I" [4]
    "crossing" [1]
    "going" [2]
    "home" [1]
    "me" [1]
    "the" [1]
    "toward" [1]
    "airport" [1]
    "coming" [1]
    "saw" [2]
    "someone" [1]
    "to" [1]
    "was" [2]
    "when" [2]
    "you" [1]

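    The stable-order printer mentioned before the listing could be sketched like this, again on F#'s standard MailboxProcessor rather than LAgent: Reduced results are buffered, and on MapReduceDone they are sorted by key and printed. The MapReduceMsg type and the startSortedPrinter name are assumptions, since the post does not show LAgent's message definitions.

```fsharp
// Assumed message type; LAgent's actual definition is not shown in this post.
type MapReduceMsg<'key, 'value> =
    | Reduced of 'key * 'value
    | MapReduceDone

// Hypothetical stable-order output agent: caches results until MapReduceDone,
// then emits them sorted by key (F# compares strings ordinally).
let startSortedPrinter (log: string -> unit) =
    MailboxProcessor<MapReduceMsg<string, int list>>.Start(fun inbox ->
        let rec loop (cache: (string * int list) list) = async {
            let! msg = inbox.Receive()
            match msg with
            | Reduced(key, value) ->
                // remember the result instead of printing it right away
                return! loop ((key, value) :: cache)
            | MapReduceDone ->
                cache
                |> List.sortBy fst
                |> List.iter (fun (key, value) -> log (sprintf "%A %A" key value))
                log "All done!!"
                return () }
        loop [])

// Usage: results arrive out of order but are printed sorted.
let sortedPrinter = startSortedPrinter (printfn "%s")
sortedPrinter.Post(Reduced("was", [2]))
sortedPrinter.Post(Reduced("I", [4]))
sortedPrinter.Post(MapReduceDone)
```

    Sorting at the end trades latency for determinism: nothing is printed until the whole job has finished.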
    In the next post we’ll process some real books …

    Comments (4)

    1. Gary Davidson says:

      I cannot compile in VS 2008 or VS2010.

      VS2010 says the build failed because of the method FromContinuations in AgentSystem.fs:

      type AsyncResultCell<'T>() =
          let source = new TaskCompletionSource<'T>()
          member this.RegisterResult r = source.SetResult(r)
          member this.AsyncWaitResult =
              Async.FromContinuations(fun (cont,_,_) ->
                  let y = fun (t:Task<'T>) -> cont (t.Result)
                  source.Task.ContinueWith(y) |> ignore)

    2. lucabol says:

      Oh sorry,

      it was called Primitive in VS10 B1. I’m using my dev machine to code this, which has B2.

      Just change the code and it should work (or wait for B2 to show up 🙂 )  

    3. Gary Davidson says:

      Still having compile issues, sigh!!! So who do you have to sleep with to get b2? lol

    4. Fanboy says:

      If you replace it with Primitive you get:

      Error 1 The method 'ContinueWith' is overloaded. Possible matches are shown below (or in the Error List window) D:\Agents\AgentSystem.fs 169 32 Agents
      Error 2 Possible overload: 'Task.ContinueWith(continuationAction: Action<Task<'T>>) : Task'. D:\Agents\AgentSystem.fs 169 32 Agents
      Error 3 Possible overload: 'Task.ContinueWith<'TNewResult>(continuationFunction: Func<Task<'T>,'TNewResult>) : Task<'TNewResult>'. D:\Agents\AgentSystem.fs 169 32 Agents
      Error 4 Possible overload: 'Task.ContinueWith<'TNewResult>(continuationFunction: Func<Task<'T>,Task<'TNewResult>>) : Task<'TNewResult>'. D:\Agents\AgentSystem.fs 169 32 Agents
      Error 5 Possible overload: 'Task.ContinueWith(continuationAction: Action<Task>) : Task'. D:\Agents\AgentSystem.fs 169 32 Agents
      Error 6 Possible overload: 'Task.ContinueWith<'TResult>(continuationFunction: Func<Task,'TResult>) : Task<'TResult>'. D:\Agents\AgentSystem.fs 169 32 Agents
      Error 7 Possible overload: 'Task.ContinueWith<'TResult>(continuationFunction: Func<Task,Task<'TResult>>) : Task<'TResult>'. D:\Agents\AgentSystem.fs 169 32 Agents
      Error 8 Possible overload: 'Task.ContinueWith(continuationAction: Action<Task<'T>>, scheduler: TaskScheduler) : Task'. D:\Agents\AgentSystem.fs 169 32 Agents
      Error 9 Possible overload: 'Task.ContinueWith(continuationAction: Action<Task<'T>>, continuationOptions: TaskContinuationOptions) : Task'. D:\Agents\AgentSystem.fs 169 32 Agents
      Error 10 Possible overload: 'Task.ContinueWith<'TNewResult>(continuationFunction: Func<Task<'T>,'TNewResult>, scheduler: TaskScheduler) : Task<'TNewResult>'. D:\Agents\AgentSystem.fs 169 32 Agents
      Error 11 Possible overload: 'Task.ContinueWith<'TNewResult>(continuationFunction: Func<Task<'T>,'TNewResult>, continuationOptions: TaskContinuationOptions) : Task<'TNewResult>'. D:\Agents\AgentSystem.fs 169 32 Agents
      Error 12 Possible overload: 'Task.ContinueWith<'TNewResult>(continuationFunction: Func<Task<'T>,Task<'TNewResult>>, scheduler: TaskScheduler) : Task<'TNewResult>'. D:\Agents\AgentSystem.fs 169 32 Agents
      Error 13 Possible overload: 'Task.ContinueWith<'TNewResult>(continuationFunction: Func<Task<'T>,Task<'TNewResult>>, continuationOptions: TaskContinuationOptions) : Task<'TNewResult>'. D:\Agents\AgentSystem.fs 169 32 Agents
      Error 14 Possible overload: 'Task.ContinueWith(continuationAction: Action<Task>, scheduler: TaskScheduler) : Task'. D:\Agents\AgentSystem.fs 169 32 Agents
      Error 15 Possible overload: 'Task.ContinueWith(continuationAction: Action<Task>, continuationOptions: TaskContinuationOptions) : Task'. D:\Agents\AgentSystem.fs 169 32 Agents
      Error 16 Possible overload: 'Task.ContinueWith<'TResult>(continuationFunction: Func<Task,'TResult>, scheduler: TaskScheduler) : Task<'TResult>'. D:\Agents\AgentSystem.fs 169 32 Agents
      Error 17 Possible overload: 'Task.ContinueWith<'TResult>(continuationFunction: Func<Task,'TResult>, continuationOptions: TaskContinuationOptions) : Task<'TResult>'. D:\Agents\AgentSystem.fs 169 32 Agents
      Error 18 Possible overload: 'Task.ContinueWith<'TResult>(continuationFunction: Func<Task,Task<'TResult>>, scheduler: TaskScheduler) : Task<'TResult>'. D:\Agents\AgentSystem.fs 169 32 Agents
      Error 19 Possible overload: 'Task.ContinueWith<'TResult>(continuationFunction: Func<Task,Task<'TResult>>, continuationOptions: TaskContinuationOptions) : Task<'TResult>'. D:\Agents\AgentSystem.fs 169 32 Agents
      Error 20 Possible overload: 'Task.ContinueWith(continuationAction: Action<Task<'T>>, continuationOptions: TaskContinuationOptions, scheduler: TaskScheduler) : Task'. D:\Agents\AgentSystem.fs 169 32 Agents
      Error 21 Possible overload: 'Task.ContinueWith<'TNewResult>(continuationFunction: Func<Task<'T>,'TNewResult>, continuationOptions: TaskContinuationOptions, scheduler: TaskScheduler) : Task<'TNewResult>'. D:\Agents\AgentSystem.fs 169 32 Agents
      Error 22 Possible overload: 'Task.ContinueWith<'TNewResult>(continuationFunction: Func<Task<'T>,Task<'TNewResult>>, continuationOptions: TaskContinuationOptions, scheduler: TaskScheduler) : Task<'TNewResult>'. D:\Agents\AgentSystem.fs 169 32 Agents
      Error 23 Possible overload: 'Task.ContinueWith(continuationAction: Action<Task>, continuationOptions: TaskContinuationOptions, scheduler: TaskScheduler) : Task'. D:\Agents\AgentSystem.fs 169 32 Agents
      Error 24 Possible overload: 'Task.ContinueWith<'TResult>(continuationFunction: Func<Task,'TResult>, continuationOptions: TaskContinuationOptions, scheduler: TaskScheduler) : Task<'TResult>'. D:\Agents\AgentSystem.fs 169 32 Agents
      Error 25 Possible overload: 'Task.ContinueWith<'TResult>(continuationFunction: Func<Task,Task<'TResult>>, continuationOptions: TaskContinuationOptions, scheduler: TaskScheduler) : Task<'TResult>'. D:\Agents\AgentSystem.fs 169 32 Agents