Async and Parallel Design Patterns in F#: Parallelizing CPU and I/O Computations

 

F# is both a parallel and a reactive language. By this we mean that running F# programs can have both multiple active evaluations (e.g. .NET threads actively computing F# results), and multiple pending reactions (e.g. callbacks and agents waiting to react to events and messages).

 

One simple way to write parallel and reactive programs is with F# async expressions. In this and future posts, I will cover some of the basic ways in which you can use F# async programming - roughly speaking, these are design patterns enabled by F# async programming. I assume you already know the basics of using async, e.g. see this introductory guide.

We’ll start with two easy design patterns: Parallel CPU Asyncs and Parallel I/O Asyncs.

  • Part 3 describes lightweight, reactive, isolated agents in F#.

    Pattern #1: Parallel CPU Asyncs

    Let’s take a look at an example of our first pattern: Parallel CPU Asyncs, that is, running a set of CPU-bound computations in parallel. The code below computes the Fibonacci function, and schedules the computations in parallel:

    let rec fib x = if x <= 2 then 1 else fib(x-1) + fib(x-2)

    let fibs =

        Async.Parallel [ for i in 1..40 -> async { return fib(i) } ]

        |> Async.RunSynchronously

    Producing:

       val fibs : int array =

         [|1; 1; 2; 3; 5; 8; 13; 21; 34; 55; 89; 144; 233; 377; 610; 987; 1597; 2584;

           4181; 6765; 10946; 17711; 28657; 46368; 75025; 121393; 196418; 317811;

           514229; 832040; 1346269; 2178309; 3524578; 5702887; 9227465; 14930352;

           24157817; 39088169; 63245986; 102334155|]

    The above code sample shows the elements of the Parallel CPU Asyncs pattern:

    (a) “async { … }” is used to specify a number of CPU tasks

    (b) These are composed in parallel using the fork-join combinator Async.Parallel

    In this case the composition is executed using Async.RunSynchronously, which starts an instance of the async and synchronously waits for the overall result.

    You can use this pattern for many routine CPU parallelization jobs (e.g. dividing and parallelizing a matrix multiply), and for batch processing jobs.
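As a minimal sketch of the matrix-multiply case mentioned above, the following computes each row of the product as its own CPU-bound async and joins the rows with Async.Parallel. The name multiply and the one-task-per-row split are assumptions made for illustration; a real implementation would probably use coarser blocks.

```fsharp
// Multiply two matrices, computing each row of the result as a separate async.
// (multiply is a hypothetical helper written for this illustration.)
let multiply (a: float[,]) (b: float[,]) =
    let n = Array2D.length1 a
    let m = Array2D.length2 a
    let p = Array2D.length2 b
    if Array2D.length1 b <> m then invalidArg "b" "inner dimensions must agree"

    // The CPU-bound work for one cell of the result.
    let cell i j =
        let mutable sum = 0.0
        for k in 0 .. m - 1 do
            sum <- sum + a.[i, k] * b.[k, j]
        sum

    // One CPU task per row of the result.
    let rowTask i = async { return Array.init p (cell i) }

    // Fork-join over all rows, then assemble the rows into a matrix.
    let rows =
        Async.Parallel [ for i in 0 .. n - 1 -> rowTask i ]
        |> Async.RunSynchronously
    Array2D.init n p (fun i j -> rows.[i].[j])

let product =
    multiply (array2D [ [ 1.0; 2.0 ]; [ 3.0; 4.0 ] ])
             (array2D [ [ 5.0; 6.0 ]; [ 7.0; 8.0 ] ])
```

The structure is the same as the Fibonacci example: a list of asyncs, Async.Parallel, then Async.RunSynchronously.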

    Pattern #2: Parallel I/O Asyncs

    So far we have only seen parallel CPU-bound programming with F#. One key thing about F# async programming is that you can use it for both CPU and I/O computations. This leads to our second pattern: Parallel I/O Asyncs, i.e. doing I/O operations in parallel (also known as overlapped I/O). For example, the following requests multiple web pages in parallel and reacts to the responses for each request, and returns the collected results.

    open System

    open System.IO

    open System.Net

    open Microsoft.FSharp.Control.WebExtensions

    let http url =

        async { let req = WebRequest.Create(Uri url)

                use! resp = req.AsyncGetResponse()

                use stream = resp.GetResponseStream()

                use reader = new StreamReader(stream)

                let contents = reader.ReadToEnd()

                return contents }

    let sites = ["http://www.bing.com";

                 "http://www.google.com";

                 "http://www.yahoo.com";

                 "http://www.search.com"]

    let htmlOfSites =

        Async.Parallel [for site in sites -> http site ]

        |> Async.RunSynchronously

    The above code sample shows the essence of the Parallel I/O Asyncs pattern:

    (a) “async { … }” is used to write tasks which include some asynchronous I/O.

    (b) These are composed in parallel using the fork-join combinator Async.Parallel

    In this case, the composition is executed using Async.RunSynchronously, which synchronously waits for the overall result.

    Using let! (or its resource-disposing equivalent use!) is one basic way of composing asyncs. A line such as

    let! resp = req.AsyncGetResponse()

     

    causes a “reaction” to occur when a response to the HTTP GET occurs. That is, the rest of the async { … } runs when the AsyncGetResponse operation completes. However, no .NET or operating system thread is blocked while waiting for this reaction: only active CPU computations use an underlying .NET or O/S thread. In contrast, pending reactions (for example, callbacks, event handlers and agents) are relatively cheap, often as cheap as a single registered object. As a result you can have thousands or even millions of pending reactions. For example, a typical GUI application has many registered event handlers, and a typical web crawler has a registered handler for each outstanding web request.

     

    In the above, "use!" replaces "let!" and indicates that the resource associated with the web request should be disposed at the end of the lexical scope of the variable.

    One of the nice things about I/O parallelization is scaling. With multi-core CPU-bound programming you often see 2x, 4x or 8x speedups if you work hard enough on a many-core machine. With I/O parallel programming you can perform hundreds or thousands of operations in parallel (though actual parallelization depends on your operating system and network connections), giving speedups of 10x, 100x, 1000x or more, even on a single-core machine. For example, see the use of F# asyncs in this nice sample, ultimately called from an IronPython application.
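To make the scaling point concrete, here is a small sketch that uses Async.Sleep as a stand-in for an I/O wait, since a pending sleep, like a pending web request, does not hold a thread. The helper simulatedRequest and the figures (1000 requests of 200 ms each) are assumptions chosen for illustration, not measurements from the original samples.

```fsharp
open System.Diagnostics

// Stand-in for an I/O request: Async.Sleep waits on a timer without blocking a thread.
// (simulatedRequest is a hypothetical helper, not a real network call.)
let simulatedRequest (id: int) =
    async {
        do! Async.Sleep 200
        return id
    }

let timer = Stopwatch.StartNew()

// Start all 1000 simulated requests at once and wait for every result.
let results =
    Async.Parallel [ for i in 1 .. 1000 -> simulatedRequest i ]
    |> Async.RunSynchronously

printfn "Completed %d simulated requests in %d ms" results.Length timer.ElapsedMilliseconds
```

Run one after another, the waits alone would add up to about 200 seconds. Composed with Async.Parallel they overlap, so the elapsed time should be far closer to a single wait, even on a single-core machine.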

    Many modern applications are I/O bound so it’s important to be able to recognize and apply this design pattern in practice.

    Starting on the GUI Thread, Finishing on the GUI Thread

    There is an important variation on both of these design patterns. This is where Async.RunSynchronously is replaced by Async.StartWithContinuations. Here the parallel composition is started and you specify three functions to run when the async completes with success, failure or cancellation.

     

    Whenever you face the problem “I need to get the result of an async but I really don’t want to use RunSynchronously”, you should consider either:

    (a) starting the async as part of a larger async by using let! (or use!), or

    (b) starting the async with Async.StartWithContinuations
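The two options can be sketched without any GUI code. In this sketch computeAnswer, larger, finished and outcome are hypothetical names, and the wait handle exists only so that a console script can observe the result; a GUI program would simply return to its event loop.

```fsharp
open System.Threading

// A hypothetical async whose result we want without blocking the caller.
let computeAnswer =
    async {
        do! Async.Sleep 100
        return 42
    }

// (a) Consume the result inside a larger async by binding it with let!.
let larger =
    async {
        let! answer = computeAnswer
        return answer + 1
    }

// (b) Start the async and supply continuations for success, failure and cancellation.
let finished = new ManualResetEvent(false)
let outcome = ref ""

Async.StartWithContinuations(
    larger,
    (fun result ->
        outcome.Value <- sprintf "result: %d" result
        finished.Set() |> ignore),
    (fun (e: exn) ->
        outcome.Value <- sprintf "failed: %s" e.Message
        finished.Set() |> ignore),
    (fun _ ->
        outcome.Value <- "cancelled"
        finished.Set() |> ignore))

// Only this demo script waits for the continuation to run.
finished.WaitOne() |> ignore
printfn "%s" outcome.Value
```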

     

    Async.StartWithContinuations is very useful when starting asyncs on the GUI thread: you never want to block the GUI thread, so instead you schedule some GUI updates to occur when the async completes. For example, this is used in the BingTranslator examples in the F# JAOO Tutorial code. A full version of this sample is shown at the end of this blog post, but the important thing here is to note what happens when the “Translate” button is pressed:

    button.Click.Add(fun args ->

        let text = textBox.Text

        translated.Text <- "Translating..."

        let task =

            async { let! languages = httpLines languageUri

                    let! fromLang = detectLanguage text

                    let! results = Async.Parallel [for lang in languages -> translateText (text, fromLang, lang)]

                    return (fromLang,results) }

        Async.StartWithContinuations(

            task,

            (fun (fromLang,results) ->

                for (toLang, translatedText) in results do

                    translated.Text <- translated.Text + sprintf "\r\n%s --> %s: \"%s\"" fromLang toLang translatedText),

            (fun exn -> MessageBox.Show(sprintf "An error occurred: %A" exn) |> ignore),

            (fun cxn -> MessageBox.Show(sprintf "A cancellation error occurred: %A" cxn) |> ignore)))

     

    In this code, the async is specified first, and it includes a use of Async.Parallel to translate the input text into multiple languages in parallel. The composite async is then started with Async.StartWithContinuations. The call returns as soon as the async hits its first I/O operation, and it specifies three functions to run when the async completes with success, failure or cancellation. Here is a screen shot of the operation after the task completes (no guarantees given about the accuracy of the translation...):

     

     [Screenshot: Bing Translator]

     

    Async.StartWithContinuations has the important property that if the async is started on the GUI thread (i.e. a thread with a non-null SynchronizationContext.Current), then the completion function is called on the GUI thread. This makes it safe to update the results. The F# async library allows you to specify composite I/O tasks and use them from the GUI thread without having to marshal your updates from background threads, a topic we’ll explore in later posts.

     

    Some notes on how Async.Parallel works:

    • When run, asyncs composed with Async.Parallel are initially started through a queue of pending computations. Ultimately this uses QueueUserWorkItem, like most async processing libraries. It is possible to use a separate queue, something we’ll discuss in later posts.

    • There is nothing particularly magical about Async.Parallel: you can define your own async combinators that coordinate asyncs in different ways by using other primitives in the Microsoft.FSharp.Control.Async library such as Async.StartChild. We’ll return to this topic in a later post.
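As a rough illustration of the last note, here is one way a small fork-join combinator could be written with Async.StartChild. The name parallel2 is hypothetical, and this is only a sketch of the idea, not how Async.Parallel itself is implemented.

```fsharp
// A hypothetical two-way fork-join combinator built from Async.StartChild.
let parallel2 (first: Async<'a>) (second: Async<'b>) : Async<'a * 'b> =
    async {
        // StartChild begins each computation and returns an async that waits for its result.
        let! firstChild = Async.StartChild first
        let! secondChild = Async.StartChild second
        // Both children are now running; wait for each result in turn.
        let! firstResult = firstChild
        let! secondResult = secondChild
        return (firstResult, secondResult)
    }

let rec fib x = if x <= 2 then 1 else fib (x - 1) + fib (x - 2)

// One CPU-bound async and one async that only waits, run side by side.
let cpuWork = async { return fib 30 }

let waitWork =
    async {
        do! Async.Sleep 100
        return "done"
    }

let pair = parallel2 cpuWork waitWork |> Async.RunSynchronously
```

Unlike Async.Parallel, this combinator lets the two asyncs have different result types, which is one reason you might define your own.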

    More Examples

    Example uses of these patterns in the F# JAOO Tutorial code are

    • BingTranslator.fsx and BingTranslatorShort.fsx: calling a REST API using F#. The same approach applies to any similar web-based HTTP service. A version of this sample is given below.

    • AsyncImages.fsx: parallel disk I/O and image processing

    • PeriodicTable.fsx: calling a web service, fetching atomic weights in parallel

    Limitations of the Patterns

    The two parallel patterns shown here have some limitations. Notably, an async generated by Async.Parallel is not, when run, “chatty” – for example, it doesn’t report progress or partial results. To handle that we need to build a more chatty object that raises events as partial operations complete. We’ll be looking at that design pattern in later posts.
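Until that pattern is covered, a much simpler workaround is to wrap each job so that it calls back when it finishes. The sketch below is an assumption-laden illustration only: parallelWithProgress and its onProgress callback are hypothetical names, and it uses a plain callback, not the event-raising object described above.

```fsharp
open System.Threading

// Hypothetical helper: run jobs in parallel, calling onProgress as each one finishes.
let parallelWithProgress (onProgress: int -> int -> unit) (jobs: Async<'T> list) : Async<'T[]> =
    async {
        let total = List.length jobs
        // A fresh counter per run, updated atomically because jobs may finish on different threads.
        let completed = ref 0
        let track (job: Async<'T>) =
            async {
                let! result = job
                let count = Interlocked.Increment(completed)
                onProgress count total
                return result
            }
        return! Async.Parallel [ for job in jobs -> track job ]
    }

let squares =
    parallelWithProgress
        (fun count total -> printfn "%d of %d jobs finished" count total)
        [ for i in 1 .. 5 -> async { return i * i } ]
    |> Async.RunSynchronously
```

Note that the callback runs on whichever thread completes the job, so a GUI program would still need to marshal any updates back to the GUI thread.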

     

    Also, Async.Parallel handles a fixed number of jobs. In later posts we'll look at many examples where jobs get generated as work progresses. Another way to look at that is that an async generated by Async.Parallel does not immediately accept incoming messages, i.e. it is not an agent whose progress can be directed, apart from cancellation.

     

    Asyncs generated by Async.Parallel do support cancellation. Cancellation is not effective until all the sub-tasks have completed or been effectively cancelled. This is normally what you want.
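Here is a minimal sketch of cancelling a parallel composition, assuming the jobs are simple Async.Sleep placeholders. The names cts, finished and outcome are hypothetical, and the wait handle is only there so that a console script can observe which continuation ran.

```fsharp
open System.Threading

let cts = new CancellationTokenSource()
let finished = new ManualResetEvent(false)
let outcome = ref "pending"

// A hypothetical slow job; Async.Sleep stands in for real work.
let slowJob (i: int) =
    async {
        do! Async.Sleep 5000
        return i
    }

let jobs = Async.Parallel [ for i in 1 .. 10 -> slowJob i ]

// Start the composite async with a cancellation token that we control.
Async.StartWithContinuations(
    jobs,
    (fun (results: int[]) ->
        outcome.Value <- sprintf "completed %d jobs" results.Length
        finished.Set() |> ignore),
    (fun (e: exn) ->
        outcome.Value <- sprintf "failed: %s" e.Message
        finished.Set() |> ignore),
    (fun _ ->
        outcome.Value <- "cancelled"
        finished.Set() |> ignore),
    cancellationToken = cts.Token)

// Request cancellation; the cancellation continuation runs once all sub-tasks have stopped.
cts.Cancel()
finished.WaitOne() |> ignore
printfn "%s" outcome.Value
```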

    Conclusion

    The Parallel CPU Asyncs and Parallel I/O Asyncs patterns are probably the two simplest design patterns using F# async programming. As is often the case with simple things, they are important and powerful. Note that the only difference between the patterns is that Parallel I/O Asyncs uses asyncs which include (and are often dominated by) I/O requests, plus some CPU processing to create request objects and to do post-processing.

    In future blog posts we’ll be looking at additional design topics for parallel and reactive programming with F# async, including

    • starting asyncs from the GUI thread

    • defining lightweight async agents

    • defining background worker components using async

    • authoring .NET tasks using async

    • authoring the .NET APM patterns using async

    • cancelling asyncs

    BingTranslator Code Sample

     

    Here’s the sample code for the BingTranslator example. You’ll need a Live API 1.1 AppID to run it.

     

    (NOTE: the samples would need to be adjusted for the Bing API 2.0; notably, the language detection API is not present in 2.0. However, the code should still act as a good guide.)

     

    open System

    open System.Net

    open System.IO

    open System.Drawing

    open System.Windows.Forms

    open System.Text

    /// A standard helper to read all the lines of a HTTP request. The actual read of the lines is

    /// synchronous once the HTTP response has been received.

    let httpLines (uri:string) =

      async { let request = WebRequest.Create uri

              use! response = request.AsyncGetResponse()

              use stream = response.GetResponseStream()

              use reader = new StreamReader(stream)

              let lines = [ while not reader.EndOfStream do yield reader.ReadLine() ]

              return lines }

    type System.Net.WebRequest with

        /// An extension member to write content into a WebRequest.

        /// The write of the content is synchronous.

        member req.WriteContent (content:string) =

            let bytes = Encoding.UTF8.GetBytes content

            req.ContentLength <- int64 bytes.Length

            use stream = req.GetRequestStream()

            stream.Write(bytes,0,bytes.Length)

        /// An extension member to read the content from a response to a WebRequest.

        /// The read of the content is synchronous once the response has been received.

        member req.AsyncReadResponse () =

            async { use! response = req.AsyncGetResponse()

                    use responseStream = response.GetResponseStream()

                    use reader = new StreamReader(responseStream)

                    return reader.ReadToEnd() }

    #load @"C:\fsharp\staging\docs\presentations\2009-10-04-jaoo-tutorial\BingAppId.fs"

    //let myAppId = "please set your Bing AppId here"

    /// The URIs for the REST service we are using

    let detectUri = "http://api.microsofttranslator.com/V1/Http.svc/Detect?appId=" + myAppId

    let translateUri = "http://api.microsofttranslator.com/V1/Http.svc/Translate?appId=" + myAppId + "&"

    let languageUri = "http://api.microsofttranslator.com/V1/Http.svc/GetLanguages?appId=" + myAppId

    let languageNameUri = "http://api.microsofttranslator.com/V1/Http.svc/GetLanguageNames?appId=" + myAppId

    /// Create the user interface elements

    let form = new Form (Visible=true, TopMost=true, Height=500, Width=600)

    let textBox = new TextBox (Width=450, Text="Enter some text", Font=new Font("Consolas", 14.0F))

    let button = new Button (Text="Translate", Left = 460)

    let translated = new TextBox (Width = 590, Height = 400, Top = 50, ScrollBars = ScrollBars.Both, Multiline = true, Font=new Font("Consolas", 14.0F))

    form.Controls.Add textBox

    form.Controls.Add button

    form.Controls.Add translated

    /// An async method to call the language detection API

    let detectLanguage text =

      async { let request = WebRequest.Create (detectUri, Method="Post", ContentType="text/plain")

              do request.WriteContent text

              return! request.AsyncReadResponse() }

    /// An async method to call the text translation API

    let translateText (text, fromLang, toLang) =

      async { let uri = sprintf "%sfrom=%s&to=%s" translateUri fromLang toLang

              let request = WebRequest.Create (uri, Method="Post", ContentType="text/plain")

              request.WriteContent text

              let! translatedText = request.AsyncReadResponse()

              return (toLang, translatedText) }

    button.Click.Add(fun args ->

        let text = textBox.Text

        translated.Text <- "Translating..."

        let task =

            async { /// Get the supported languages

                    let! languages = httpLines languageUri

                    /// Detect the language of the input text. This could be done in parallel with the previous step.

                    let! fromLang = detectLanguage text

                    /// Translate into each language, in parallel

                    let! results = Async.Parallel [for lang in languages -> translateText (text, fromLang, lang)]

                    /// Return the results

                    return (fromLang,results) }

        /// Start the task. When it completes, show the results.

        Async.StartWithContinuations(

            task,

            (fun (fromLang,results) ->

                for (toLang, translatedText) in results do

                    translated.Text <- translated.Text + sprintf "\r\n%s --> %s: \"%s\"" fromLang toLang translatedText),

            (fun exn -> MessageBox.Show(sprintf "An error occurred: %A" exn) |> ignore),

            (fun cxn -> MessageBox.Show(sprintf "A cancellation error occurred: %A" cxn) |> ignore)))