Introducing F# Asynchronous Workflows

[ Update: Robert pickering has a very nice summary of using asynchonous workflows with web services ]

F# 1.9.2.9 includes a pre-release of F# asynchronous workflows. In this blog post we'll take a look at asynchronous workflows and how you might use them in practice. Asynchronous workflows are an application of F#'s computation expression syntax.

Below is a simple example of two asynchronous workflows and how you can run these in parallel:

    let task1 = async { return 10+10 }

    let task2 = async { return 20+20 }

    Async.Run (Async.Parallel [ task1; task2 ])

Here:

  • The expression "async { return 10+10 }" generates an object of type Async<int>.  
  • These values are not actual results: they are specifications of tasks to run.
  • The expression "Async.Parallel [ task1; task2 ]" composes two taks and forms a new task.
  • This generates a new value of type Async<int[]>.
  • Async.Run takes this and runs it, returning the array [| 20; 40 |] .
  • For the technically minded, the identifier async refers to a builder for a computation expression. You can also dig into the details of the F# implementation for more details on this.

You can try this example in F# Interactive from Visual Studio.

The above example is a bit misleading: asynchronous workflows are not primarily about parallelization of synchronous computations (they can be used for that, but you will probably want PLINQ and Futures). Instead they are for writing concurrent and reactive programs that perform asynchronous I/O where you don't want to block threads. Let's take a look at this in more detail.

Perhaps the most common asynchronous operation we're all do these days is to fetch web pages. Normally we use a browser for this, but we can also do it  programmatically. A synchronous HTTP GET is implemented in F# as follows:

    #light

 

    open System.IO

    open System.Net

   

    let SyncHttp(url:string) =

        // Create the web request object

        let req = WebRequest.Create(url)

        // Get the response, synchronously

        let rsp = req.GetResponse()

        // Grab the response stream and a reader. Clean up when we're done

        use stream = rsp.GetResponseStream()

        use reader = new StreamReader(stream)

        // Synchronous read-to-end, returning the result

        reader.ReadToEnd()

You can run this using:

    SyncHttp "maps.google.com"

    SyncHttp "maps.live.com"

But what if we want to read multiple web pages in parallel, i.e. asynchronously? Here is how we can do this using asynchronous workflows:

    let AsyncHttp(url:string) =

        async { // Create the web request object

                 let req = WebRequest.Create(url)

                

                 // Get the response, asynchronously

                 let! rsp = req.GetResponseAsync()

                

                 // Grab the response stream and a reader. Clean up when we're done

                 use stream = rsp.GetResponseStream()

                 use reader = new System.IO.StreamReader(stream)

                 // synchronous read-to-end

                 return reader.ReadToEnd() }

[ Note: This sample requires some helper code, defined at the end of this blog post, partly because one fuction called BuildPrimitive didn't make it into the 1.9.2.9 release. ]

Here AsyncHttp has type:

    val AsyncHttp : string -> Async<string>

This function accepts a URL and returns a Async task which, when run, will eventually generate a string for the HTML of the page we've requested. We can now get four web pages in parallel as follows:

    Async.Run

        (Async.Parallel [ AsyncHttp "www.live.com";

          AsyncHttp "www.google.com";

                          AsyncHttp "maps.live.com";

                          AsyncHttp "maps.google.com"; ])

How does this work? Let's add some print statements to take a closer look:

    let AsyncHttp(url:string) =

        async { do printfn "Created web request for %s" url

                 // Create the web request object

                 let req = WebRequest.Create(url)

                

                 do printfn "Getting response for %s" url

                 // Get the response, asynchronously

                 let! rsp = req.GetResponseAsync()

                

                 do printfn "Reading response for %s" url

                 // Grab the response stream and a reader. Clean up when we're done

                 use stream = rsp.GetResponseStream()

                 use reader = new System.IO.StreamReader(stream)

                 // synchronous read-to-end

                 return reader.ReadToEnd() }

When we run we now get the following output:

Created web request for www.live.com

Created web request for www.google.com

Getting response for www.live.com

Getting response for www.google.com

Created web request for maps.live.com

Created web request for maps.google.com

Getting response for maps.google.com

Getting response for maps.live.com

Reading response for maps.google.com

Reading response for www.google.com

Reading response for www.live.com

Reading response for maps.live.com

As can be seen from the above, there are multiple web requests in flight simultaneously, and indeed you may see the diagnostics output interleaved. Obviously, multiple threads of execution are being used to handle the requests. However, the key observation is that threads are not blocked during the execution of each asynchronous workflow. This means we can, in principle, have thousands of outstanding web requests: the limit being the number supproted by the machine, not the number of threads used to host them.

In the current underlying implementation, most of these web requests will be paused in the GetResponseAsync call. The magic of F# workflows is always in the "let!" operator. In this case this should be interpreted as "run the asynchronous computation on the right and wait for its result. If necessary suspend the rest of the workflow as a callback awaiting some system event."

The remainder of the asynchronous workflow is suspended as an I/O completion item in the .NET thread pool waiting on an event. Thus one advantage of asynchronous workflows is that they let you combine event based systems with portions of thread-based programming.

It is illuminating to augment the diagnostics with a thread id: this can be done by changing printfn to use the following:

    let tprintfn fmt =

        printf "[%d]" System.Threading.Thread.CurrentThread.ManagedThreadId

        printfn fmt

The output then becomes:

[9]Created web request for www.live.com

[9]Getting response for www.live.com

[4]Created web request for www.google.com

[4]Getting response for www.google.com

[9]Created web request for maps.live.com

[9]Getting response for maps.live.com

[9]Created web request for maps.google.com

[9]Getting response for maps.google.com

[12]Reading response for maps.google.com

[13]Reading response for www.google.com

[13]Reading response for www.live.com

[13]Reading response for maps.live.com

Note that the execution of the asynchronous workflow to fetch www.live.com "hopped" between different threads. This is characteristic of asynchronous workflows. As each step of the workflow completes the remainder of the workflow is executed as a callback.

The Microsoft.FSharp.Control.Async library type has a number of other interesting combinators and ways of specifying asynchronous computations. We'll be looking at some of these in future blog posts.Also, one solution to the asynchronous I/O puzzle is to use methods such as WebRequest.BeginGetResponse and WebRequest.EndGetResponse directly, or for streams use Stream.BeginRead and Stream.EndRead. You can see uses of these methods in the MSDN .NET sample of bulk asynchronous image processing that runs to about 190 lines. In a future blog post we'll look at how this program becomes a rather elegant 20 liner in F#, largely due to the use of async workflows.

Asynchronous workflows are essentially a way of writing simple continuation passing programs in a nice, linear syntax. Importantly standard control operators such as try/finally, use, while, if/then/else and for can be used inside these workflow specifications. Furthermore this style of writing agents matches well with functional programming: agents that are state machines can often be defined as recursive functions, and the actual information carried in each state passed as immutable data. Mutable data such as hash tables can also be used locally within a workflow as long as it is not transferred to other agents. Finally, message passing agents are particularly sweet in this style, and we'll lok at those in later blog posts.

One important topic in this kind of programming is exceptions. In reality, each asynchronous workflow runs with two continuations: one for success and one for failure. In later blog posts we'll take a look at how errors are handled and propagated by asynchronous workflows, or you can play around with the 1.9.2.9 implementation today.

In summary, we've seen above that asynchronous workflows are one promising syntactic device you can use to help tame the asynchronous and reactive parts of the asynchronous/reactive/concurrent/parallel programming landscape. They can be seen as a nice, fluid F#-specific surface syntax for common compositional patterns of accessing user-level task scheduling algorithms and libraries. They are also a primary use of the monadic techniques that underpin computation expressions and LINQ, and similar techniques have been used in Haskell (see Koen Classen's classic 1999 paper, and related work was reported by Peng Li and Steve Zdancewic at PLDI and by Chris Waterson at CUFP this year). 

I'll be talking more about asynchronous workflows at TechEd Europe 2007 in Barcelona, and they are also covered in Chapter 13 of Expert F#, which is entering the final stages of production as I write.

Some examples of the underlying techniques that might be used to execute portions of asynchronous workflows now or in the future are the .NET Thread Pool (used in F# 1.9.2.9), Futures and the CCR, all of which incorporate many advanced algorithms essential to good performance and reliability in these areas.  As we move ahead with the F# design in this space we will ensure that asynchronous workflows can be used effectively with all of these.

Enjoy!

----------------------

Finally, here is the extra code required for the web sample above. These functions will be included in future release of F#.

module Async =

let trylet f x = (try Choice2_1 (f x) with exn -> Choice2_2(exn))

let protect econt f x cont =

match trylet f x with

| Choice2_1 v -> cont v

| Choice2_2 exn -> econt exn

let BuildPrimitive(beginFunc,endFunc) =

Async.Primitive(fun (cont,econt) ->

(beginFunc(System.AsyncCallback(fun iar -> protect econt endFunc iar cont),

(null:obj)) : System.IAsyncResult) |> ignore)

type System.Net.WebRequest with

member x.GetResponseAsync() =

Async.BuildPrimitive(x.BeginGetResponse, x.EndGetResponse)