Async and Parallel Design Patterns in F#: Agents

In part 3 of this series, we explore lightweight, reactive agents in F# and look at some typical design patterns associated with these agents, including isolated internal state.

 

Pattern #4 – Your First Agent

Let’s take a look at your first asynchronous agent.

type Agent<'T> = MailboxProcessor<'T>

let agent =

   Agent.Start(fun inbox ->

     async { while true do

               let! msg = inbox.Receive()

               printfn "got message '%s'" msg } )

The agent performs repeated asynchronous waits for messages, and prints each message it receives. In this case, each message is a string, and the type of “agent” is:

                agent : Agent<string>

We can send messages to the agent as follows:

agent.Post "hello!"

which duly prints:

got message 'hello!'

and multiple messages as follows:

for i in 1 .. 10000 do

   agent.Post (sprintf "message %d" i)

which duly prints 10000 responses.

 

You can think of an agent as hiding a message queue (or channel), plus a reaction to be run when a message arrives. An agent often has an asynchronous loop that asynchronously waits for messages and then processes them. In the example above, the code executed by the agent is the “while” loop.

 

Agents will already be familiar to many readers. Erlang, is, of course, built around agents (there called processes). More recently, an experimental .NET incubation language called Axum has highlighted the importance of agent-based programming. Axum has been influential on the design of agents in F#, and vice-versa. Other languages with lightweight threading also emphasize agent-based decomposition and design.

The examples above begin with a type abbreviation to map the name “Agent” to the F# library implementation of in-memory agents called “MailboxProcessor”. You can also use the longer name, if you wish, but I prefer to use the shorter name.

 

Your First 100,000 Agents

Agents are lightweight, because they are based on F# asynchronous programming. For example, you can define hundreds of thousands of agents, or even more, within one .NET process. For example, let’s define 100,000 simple agents:

let agents =

    [ for i in 0 .. 100000 ->

       Agent.Start(fun inbox ->

         async { while true do

                   let! msg = inbox.Receive()

                   if i % 10000 = 0 then

                       printfn "agent %d got message '%s'" i msg } ) ]

You can send a message to each agent as follows:

for agent in agents do

    agent.Post "ping!"

Every 10,000th agent reports when its message has been processed. The collection of agents processes these messages quickly, in a few seconds. Agents and in-memory message processing is very fast.

 

Obviously, agents do not map directly to .NET threads – you can’t create 100,000 threads in a single application (even 1,000 is often too many on a 32-bit O/S). Instead, when an agent is waiting to receive a message it is represented as a simple callback, encompassing a few object allocations and the closure of references held by the agent. After a message is received, the work of the agent is scheduled and executed through a pool of threads (by default, the .NET thread pool).

 

While it is rare to require 100,000 agents in a program, there are compelling examples where 2,000+ are routinely required. We’ll look at some of these below.

 

Requests in Scalable Web Servers

The notion of an asynchronous agent is in fact a design pattern that recurs in many different settings in F# coding. In F#, we often use the word “agent” for any asynchronous computation that is in-flight, though particularly ones with loops, or which consume messages, or which generate results.

 

For example, in future posts we will look at building scalable TCP and HTTP server applications in F#, deploying them to the cloud using EC2 and Windows Azure. One example we’ll use is a quote server, which accepts incoming TCP or HTTP connections and returns a stream of quotes to each client. Each client receives one quote every second. The service is ultimately published as a single URL or REST API.

 

In that implementation, each accepted client request sparks an asynchronous agent, whose definition is as roughly follows (in this case, we repeatedly write the same price for the AAPL stock, for illustrative purposes):

open System.Net.Sockets

/// serve up a stream of quotes

let serveQuoteStream (client: TcpClient) = async {

    let stream = client.GetStream()

    while true do

        do! stream.AsyncWrite( "AAPL 200.38"B )

        do! Async.Sleep 1000.0 // sleep one second

}

Each agent runs until the client disconnects. Because agents are lightweight, the quote service can support many thousands of simultaneous clients within a single machine (and can scale further by using cloud hosting). The actual number of agents in flight depends on the number of clients being served.

 

The above example gives a hint at how sweet asynchronous network protocol programming can be with F# - network protocols become asynchronous agents reading and writing data to streams. We’ll be looking at many more examples of scalable TCP/HTTP programming in F# in later posts.

  

Agents and Isolated State (the imperative way)

One of the keys to successful agent programming in F# is isolation. Isolation means that resources exist which “belong” to a specific agent, and are not accessed except by that agent. This means isolated state is protected from concurrent access and data races.

 

The constructs of F# make it easy to use lexical scoping to achieve isolation within asynchronous agents. For example, the following code uses a dictionary to accumulate a count of the different messages sent to an agent. The internal dictionary is lexically private to the asynchronous agent, and no capability to read or write to the dictionary is made available “outside” of the agent. This means the mutable state in the dictionary is isolated. Non-concurrent, safe access is guaranteed.

 

    type Agent<'T> = MailboxProcessor<'T>

    open System.Collections.Generic

    let agent =

       Agent.Start(fun inbox ->

         async { let strings = Dictionary<string,int>()

                 while true do

                   let! msg = inbox.Receive()

                   if strings.ContainsKey msg then

  strings.[msg] <- strings.[msg] + 1

                   else

                       strings.[msg] <- 0

                   printfn "message '%s' now seen '%d' times" msg strings.[msg] } )

 

State isolation is a fundamental technique in F# and is not just restricted to agents. For example, the following code represents an isolated use of both a stream reader and a ResizeArray (the preferred F# name for System.Collections.Generics.List, and note that an equivalent of this function exists in the .NET libraries as System.IO.File.ReadAllLines)

 

    let readAllLines (file:string) =

        use reader = new System.IO.StreamReader(file)

        let lines = ResizeArray<_>()

        while not reader.EndOfStream do

            lines.Add (reader.ReadLine())

        lines.ToArray()

Here, the reader and ResizeArrray are never used outside the function. In the case of agents and other long-lived computations, isolation is not just temporal – the state is permanently separated and isolated even though other processing is continuing in the program.

 

While isolation is ultimately a dynamic property, it is often enforced lexically. For example, consider the lazy, on-demand version of reading all lines uses an isolated reader:

 

    let readAllLines (file:string) =

        seq { use reader = new System.IO.StreamReader(file)

              while not reader.EndOfStream do

                  yield reader.ReadLine() }

 

Isolated state often includes mutable values. For these, reference cells are used in F#. For example, the code below keeps a count of all messages by using a reference cell.

 

    let agent =

       Agent.Start(fun inbox ->

         async { let count = ref 0

                 while true do

                   let! msg = inbox.Receive()

                   incr count

                   printfn "now seen a total of '%d' messages" !count } )

Again, the mutable state is isolated, and safe, single-threaded access is guaranteed.

 

Agents with Loops and Isolated State (the functional way)

F# programmers know that there are two ways to write loops in F# - using the “imperative” while/for plus mutable accumulators (ref or mutable), or using the “functional” style of one or more recursive functions, passing accumulated state as parameters. For example, a loop to count the lines in a file can be written imperatively:

    let countLines (file:string) =

        use reader = new System.IO.StreamReader(file)

        let count = ref 0

        while not reader.EndOfStream do

            reader.ReadLine() |> ignore

            incr count

        !count

or recursively:

    let countLines (file:string) =

        use reader = new System.IO.StreamReader(file)

        let rec loop n =

            if reader.EndOfStream then n

            else

                reader.ReadLine() |> ignore

                loop (n+1)

        loop 0

For localized uses, either approach is fine: the functional approach is a little more advanced, but greatly reduces the amount of explicit mutation in your code and is often more general. F# programmers are expected to be capable of reading either, and able to translate a “while” loop into an equivalent “let rec” (it makes a good interview question!)

 

Interestingly, this same rule applies to writing asynchronous loops: you can use either the imperative “while”, or the functional “let rec” to define your loops. For example, here is a version of an agent which counts its messages using a recursive asynchronous function:

    let agent =

       Agent.Start(fun inbox ->

         let rec loop n = async {

             let! msg = inbox.Receive()

             printfn "now seen a total of %d messages" (n+1)

             return! loop (n+1)

         }

         loop 0 )

 

Here’s how we post messages to this agent:

for i in 1 .. 10000 do

   agent.Post (sprintf "message %d" i)

 

When we do this, we get the following output:

       now seen a total of 0 messages

       now seen a total of 1 messages

       ....

       now seen a total of 10000 messages

 

To recap, the two common patterns used to define mailbox agents are the imperative:

    let agent =

       Agent.Start(fun inbox ->

         async {

             // isolated imperative state goes here

             ...

             while <condition> do

                 // read messages and respond

                 ...

         })

 

And the recursive/functional:

    let agent =

      Agent.Start(fun inbox ->

         let rec loop arg1 arg2 = async {

             // receive and process messages here

             ...

             return! loop newArg1 newArg2

          }

         loop initialArg1 initialArg2 )

 

Again, either approach is reasonable in F# - using recursive asynchronous functions is slightly more advanced, but more functional and more general.

 

Messages and Union Types

It is common to use a discriminated union for a message type. For example, in an agent-based version of the DirectX sample that I like showing, we use the following message type for the simulation engine: 

 

type Message =

    | PleaseTakeOneStep

    | PleaseAddOneBall of Ball

The simulation engine is then this agent:

    let simulationEngine =

        Agent.Start(fun inbox ->

            async { while true do

                        // Wait for a message

                        let! msg = inbox.Receive()

 

                        // Process a message

                        match msg with

                        | PleaseTakeOneStep -> state.Transform moveBalls

                        | PleaseAddOneBall ball -> state.AddObject ball })

 

Using strongly typed messages like this is a good idea in many circumstances. However, when interoperating with other messaging machinery, you shouldn’t be afraid to use heterogeneous types such as “obj” and “string” as your message type, and have the agent do the decoding with runtime typing and parsing.

 

Parameterizing and Abstracting Agents

Agents are just an F# coding design pattern. This means you can use all the usual techniques of F# code to parameterize, abstract and reuse fragments of agent definitions. For example, you might parameterize the serveQuoteStream function used above by the time between transmitted quotes:

open System.Net.Sockets

/// serve up a stream of quotes

let serveQuoteStream (client: TcpClient, periodMilliseconds: int) = async {

    let stream = client.GetStream()

    while true do

        do! stream.AsyncWrite( "AAPL 439.2"B )

        do! Async.Sleep periodMilliseconds

}

This means different requests in your quote server can have different periods.

 

Similarly, you might abstract whole classes of agents using function parameters:

    let iteratingAgent job =

       Agent.Start(fun inbox ->

         async { while true do

                   let! msg = inbox.Receive()

                   do! job msg })

    let foldingAgent job initialState =

       Agent.Start(fun inbox ->

         let rec loop state = async {

             let! msg = inbox.Receive()

             let! state = job state msg

             return! loop state

           }

         loop initialState )

You can use the first as follows:

    let agent1 = iteratingAgent (fun msg -> async { do printfn "got message '%s'" msg })

and the second:

    let agent2 =

        foldingAgent (fun state msg ->

            async { if state % 1000 = 0 then printfn "count = '%d'" msg;

           return state + 1 }) 0

 

Reporting Results from Agents

In future posts we will look at techniques for accessing partial results from executing agents, for example, using the PostAndAsyncReply method available on each MailboxProcessor agent. These techniques are important when creating networks of communicating agents.

 

However, often that is overkill, and instead we only need to report results to some supervising arena such as a GUI. As such, one simple way of reporting partial results from any agent is to use the design pattern discussed in part 2 of this series. An example is shown below that builds an agent which samples the message stream every 1000th message and routes the sampled events to the GUI or other hosting thread.

// Receive messages and raise an event on each 1000th message

type SamplingAgent() =

    // The event that is raised

    let sample = new Event<string>()

   

    // Capture the synchronization context to allow us to raise events

    // back on the GUI thread

    let syncContext = SynchronizationContext.CaptureCurrent()

    // The internal mailbox processor agent

    let agent =

        new MailboxProcessor<_>(fun inbox ->

            async { let count = ref 0

                    while true do

                        let! msg = inbox.Receive()

                        incr count

                        if !count % 1000 = 0 then

                            syncContext.RaiseEvent sample msg })

    /// Post a message to the agent

    member x.Post msg = agent.Post msg

    /// Start the agent

  member x.Start () = agent.Start()

    /// Raised every 1000'th message

    member x.Sample = sample.Publish

[ Note: this uses the two extension methods SynchronizationContext (CaptureCurrent and RaiseEvent) described in part 2 of this series. ]

 

You can use this agent as follows:

let agent = SamplingAgent()

agent.Sample.Add (fun s -> printfn "sample: %s" s)

agent.Start()

for i = 0 to 10000 do

   agent.Post (sprintf "message %d" i)

 

As expected, this reports a sampling of messages posted to the agent:

sample: message 999

sample: message 1999

sample: message 2999

sample: message 3999

sample: message 4999

sample: message 5999

sample: message 6999

sample: message 7999

sample: message 8999

sample: message 9999

 

Agents and Errors

We all know mistakes and exceptions happen. Good error detection, reporting and logging is essential in agent-based programming. Let’s look at how to detect and forward errors when using F# in-memory agents (mailbox processors).

 

Firstly, part of the magic of F# asynchronous workflows is that exceptions are caught and propagated automatically within async { ... } blocks, even across asynchronous waits and I/O operations. You can also use try/with, try/finally and use constructs within async { ... } blocks to catch exceptions and release resources. This means we only need deal with uncaught errors in agents.

 

When an uncaught error occurs in a mailbox processor agent, the Error event on the agent is raised. A common pattern is to forward all errors to a supervising process, for example:

type Agent<'T> = MailboxProcessor<'T>

let supervisor =

   Agent<System.Exception>.Start(fun inbox ->

     async { while true do

               let! err = inbox.Receive()

               printfn "an error occurred in an agent: %A" err })

let agent =

   new Agent<int>(fun inbox ->

     async { while true do

               let! msg = inbox.Receive()

               if msg % 1000 = 0 then

                   failwith "I don't like that cookie!" })

 

agent.Error.Add(fun error -> supervisor.Post error)

agent.Start()

 

 

It can be convenient to pipeline these configuration operations as follows:

let agent =

   new Agent<int>(fun inbox ->

     async { while true do

               let! msg = inbox.Receive()

               if msg % 1000 = 0 then

                   failwith "I don't like that cookie!" })

   |> Agent.reportErrorsTo supervisor

   |> Agent.start

Using a helper module:

module Agent =

   let reportErrorsTo (supervisor: Agent<exn>) (agent: Agent<_>) =

       agent.Error.Add(fun error -> supervisor.Post error); agent

  

   let start (agent: Agent<_>) = agent.Start(); agent

Here is an example where we create 10000 agents, some of which report errors:

let supervisor =

   Agent<int * System.Exception>.Start(fun inbox ->

     async { while true do

               let! (agentId, err) = inbox.Receive()

               printfn "an error '%s' occurred in agent %d" err.Message agentId })

let agents =

   [ for agentId in 0 .. 10000 ->

        let agent =

            new Agent<string>(fun inbox ->

               async { while true do

                         let! msg = inbox.Receive()

                         if msg.Contains("agent 99") then

                             failwith "I don't like that cookie!" })

        agent.Error.Add(fun error -> supervisor.Post (agentId,error))

        agent.Start()

        (agentId, agent) ]

When sending these messages:

for (agentId, agent) in agents do

   agent.Post (sprintf "message to agent %d" agentId )

We see the following:

an error 'I don't like that cookie!' occurred in agent 99

an error 'I don't like that cookie!' occurred in agent 991

an error 'I don't like that cookie!' occurred in agent 992

an error 'I don't like that cookie!' occurred in agent 993

...

an error 'I don't like that cookie!' occurred in agent 9999

This section has just dealt with errors in F# in-memory MailboxProcessor agents. Other kinds of agents (for example, agents representing server-side requests) should also be designed and architected to handle or forward errors gracefully and restart appropriately.

 

Summary

Isolated agents are a programming pattern that recurs again and again across multiple programming domains, from device driver programming to user interfaces to distributed programming to scalable communication servers. Every time you write an object, thread or asynchronous worker to manage a long-running communication (e.g. sending data to your computer’s speakers, or reading data from the network, or reacting to a stream of incoming events), you are writing a kind of agent. Every time you write an ASP.NET web page handler, you are writing a form of agent (one whose state is reconstituted on each invocation). In all cases, it is normal that some or all of the state associated with the communication is isolated.

 

Isolated agents are a means to an end – e.g. implementing scalable programming algorithms, including scalable request servers and scalable distributed programming algorithms. Like all asynchronous and parallel design patterns, they should not be over used. However, they are an elegant, powerful and efficient technique when used wisely.

 

F# is unique amongst the managed languages that come with Visual Studio 2010 in its integrated support for both lightweight asynchronous computations and in-memory agents. In F#, async agents can be written in a compositional way, without requiring code to be written using callbacks and inverted control. There are some tradeoffs here – for example, in future articles we will be looking at how to publish your agents using the standard APM pattern of the .NET libraries. However, the benefits are high: control, scaling and structured CPU and I/O parallelism when you need it, while keeping the full performance of .NET native code for your synchronous, CPU-bound code.

 

Indeed, there are few other .NET or JVM-based languages that support lightweight reactive agents at all – in early .NET it was said to be “impossible” because of the costs of threads. In this context, F#’s integration of “async { ... }” in 2007 can be seen as somewhat of a breakthrough in applied language design – it allows lightweight, compositional async programming and reactive agents in the context of an industrially accepted and interoperable programming platform. Along with the Axum language prototype (which has been influential on F#), F# has proven that an asynchronous language feature is a feasible way to break through the logjam of “do we make threads lightweight or not” that currently bedevils industrial runtime system design.

 

F# async programming can be seen as an implementation of resumptions, and there are many precursors here, for example OCaml delimited continuations, Haskell embeddings of monadic concurrency and papers emphasising the importance of resumptions with regard to concurrency.

 

You can use F# asynchronous agents on .NET 2.0, 3.5, 4.0, on Linux/Mono/Mac and on Silverlight. Indeed, you can even use F# async programming when F# is translated to Javascript using the WebSharper platform. Enjoy!