Async and Parallel Design Patterns in F#: Reporting Progress with Events (plus Twitter Sample)

In this post we will look at a common async design pattern I call Reporting Progress With Events. Later in this post we use this design pattern to read a sample stream of tweets from Twitter.

This is the second part of a series covering basic techniques in F# async programming. Some of the samples are drawn from code in the F# JAOO Tutorial.

  • Part 3 describes lightweight, reactive, isolated agents in F#.

Pattern #3: Reporting Progress With Events

Let’s first take a look at an instance of the essence of the design pattern. Below, we define an object to coordinate the parallel execution of a group of asyncs. Each job reports its result as it finishes, rather than waiting for the whole collection of results.

The essence of the design pattern is as follows:

  • The current “synchronization context” is captured from the GUI thread in the Start method of the object. This is a handle that allows us to run code and raise events in the GUI context. A private helper function is defined to trigger any F# event. This is not strictly needed but makes the code much neater.
  • One or more events are defined. The events are published as properties, and annotated with [<CLIEvent>] if the object is to be used from other .NET languages.
  • A background job is started, in this case by specifying an asynchronous workflow which defines the background work to be performed. Async.Start begins an instance of the workflow (though Async.StartWithContinuations is often used instead, as in a later example in this post). The events are raised at appropriate points in the execution of the background job as progress is made.

 

type AsyncWorker<'T>(jobs: seq<Async<'T>>) = 

    // This declares an F# event that we can raise

    let jobCompleted = new Event<int * 'T>()

    /// Start an instance of the work

    member x.Start() =

        // Capture the synchronization context to allow us to raise events back on the GUI thread

        let syncContext = SynchronizationContext.CaptureCurrent()

        // Mark up the jobs with numbers

        let jobs = jobs |> Seq.mapi (fun i job -> (job,i+1))

        let work =

            Async.Parallel

               [ for (job,jobNumber) in jobs ->

                   async { let! result = job

                           syncContext.RaiseEvent jobCompleted (jobNumber,result)

                           return result } ]

        Async.Start(work |> Async.Ignore)

    /// Raised when a particular job completes

    member x.JobCompleted = jobCompleted.Publish

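This code uses two helper extension methods on System.Threading.SynchronizationContext which we will use frequently in this series of articles. Because F# requires definitions to appear before their use, these extensions need to be defined ahead of the AsyncWorker type, with System.Threading opened. They are shown below: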

 

type SynchronizationContext with

    /// A standard helper extension method to raise an event on the GUI thread

    member syncContext.RaiseEvent (event: Event<_>) args =

      syncContext.Post((fun _ -> event.Trigger args),state=null)

    /// A standard helper extension method to capture the current synchronization context.

    /// If none is present, use a context that executes work in the thread pool.

    static member CaptureCurrent () =

        match SynchronizationContext.Current with

        | null -> new SynchronizationContext()

        | ctxt -> ctxt
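As a rough illustration of the fallback branch, the sketch below assumes it runs in a console or script session where SynchronizationContext.Current is null. In that case CaptureCurrent returns a default context, whose Post queues the callback to the thread pool, so the handler runs on a pool thread rather than a GUI thread. The names demoEvent, received and lastMessage are illustrative only and are not part of the original sample.

```fsharp
open System.Threading

type SynchronizationContext with
    /// Raise an event through the captured context
    member syncContext.RaiseEvent (event: Event<_>) args =
        syncContext.Post((fun _ -> event.Trigger args), state=null)

    /// Capture the current context, or fall back to a default (thread pool) context
    static member CaptureCurrent () =
        match SynchronizationContext.Current with
        | null -> new SynchronizationContext()
        | ctxt -> ctxt

// Illustrative names (assumptions for this sketch)
let demoEvent = new Event<string>()
let received = new ManualResetEventSlim(false)
let lastMessage = ref ""

demoEvent.Publish.Add(fun msg ->
    lastMessage.Value <- msg
    received.Set())

let ctxt = SynchronizationContext.CaptureCurrent()
ctxt.RaiseEvent demoEvent "hello"

// The default context posts asynchronously, so wait for the handler to run
received.Wait(5000) |> ignore
printfn "received: %s" lastMessage.Value
```

In a Windows Forms or WPF application the same call would instead marshal the handler back to the GUI thread, which is the case the pattern is designed for.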

 

You can now use this component to supervise the execution of a collection of CPU-intensive asyncs:

    let rec fib i = if i < 2 then 1 else fib (i-1) + fib (i-2)   

    let worker =

        new AsyncWorker<_>( [ for i in 1 .. 100 -> async { return fib (i % 40) } ] )

    worker.JobCompleted.Add(fun (jobNumber, result) ->

        printfn "job %d completed with result %A" jobNumber result)

    worker.Start()

When run, the progress is reported as each job completes:

job 1 completed with result 1

job 2 completed with result 2

...

job 39 completed with result 102334155

job 77 completed with result 39088169

job 79 completed with result 102334155

There are a number of ways to report results from running background processes. For 90% of cases, the easiest way is that shown above: report results by raising .NET events back on a GUI (or ASP.NET Page Load) thread. This technique fully hides the use of background threading and makes use of entirely standard .NET idioms that will be familiar to any .NET programmer. This ensures that the techniques used to implement your parallel programming are appropriately encapsulated.

Reporting Progress of I/O Asyncs

The Reporting Progress With Events pattern can also be used with I/O asyncs. For example, consider this set of I/O tasks:

    open System.IO

    open System.Net

    open Microsoft.FSharp.Control.WebExtensions

    /// Fetch the contents of a web page, asynchronously.

    let httpAsync(url:string) =

        async { let req = WebRequest.Create(url)

                use! resp = req.AsyncGetResponse()

                use stream = resp.GetResponseStream()

                use reader = new StreamReader(stream)

                let text = reader.ReadToEnd()

                return text }

    let urls =

        [ "http://www.live.com";

          "http://news.live.com";

          "http://www.yahoo.com";

          "http://news.yahoo.com";

          "http://www.google.com";

          "http://news.google.com"; ]

    let jobs = [ for url in urls -> httpAsync url ]

   

    let worker = new AsyncWorker<_>(jobs)

    worker.JobCompleted.Add(fun (jobNumber, result) ->

        printfn "job %d completed with result %A" jobNumber result.Length)

    worker.Start()

When run, the progressive results are reported, showing the length of each web page:

job 5 completed with result 8521

job 6 completed with result 155767

job 3 completed with result 117778

job 1 completed with result 16490

job 4 completed with result 175186

job 2 completed with result 70362

 

Some Jobs May Report Multiple, Different Events

In this design pattern, one reason we use an object to encapsulate and supervise the execution of a parallel composition of asyncs is that it makes it simple to enrich the API of the supervisor with further events. For example, the code below adds additional events that are raised when all jobs complete, when an error is detected among any of the jobs, or when the overall composition is successfully cancelled before completion. Each event is declared, raised and published.

open System

open System.Threading

open System.IO

open Microsoft.FSharp.Control.WebExtensions

type AsyncWorker<'T>(jobs: seq<Async<'T>>) =

    // Each of these lines declares an F# event that we can raise

    let allCompleted = new Event<'T[]>()

    let error = new Event<System.Exception>()

    let canceled = new Event<System.OperationCanceledException>()

    let jobCompleted = new Event<int * 'T>()

    let cancellationCapability = new CancellationTokenSource()

    /// Start an instance of the work

    member x.Start() =

        // Capture the synchronization context to allow us to raise events back on the GUI thread

        let syncContext = SynchronizationContext.CaptureCurrent()

        // Mark up the jobs with numbers

        let jobs = jobs |> Seq.mapi (fun i job -> (job,i+1))

        let work =

            Async.Parallel

               [ for (job,jobNumber) in jobs ->

                   async { let! result = job

                           syncContext.RaiseEvent jobCompleted (jobNumber,result)

                           return result } ]

        Async.StartWithContinuations

            ( work,

              (fun res -> syncContext.RaiseEvent allCompleted res),

              (fun exn -> syncContext.RaiseEvent error exn),

              (fun exn -> syncContext.RaiseEvent canceled exn),

              cancellationCapability.Token)

    member x.CancelAsync() =

       cancellationCapability.Cancel()

       

    /// Raised when a particular job completes

    member x.JobCompleted = jobCompleted.Publish

    /// Raised when all jobs complete

    member x.AllCompleted = allCompleted.Publish

    /// Raised when the composition is cancelled successfully

    member x.Canceled = canceled.Publish

    /// Raised when the composition exhibits an error

    member x.Error = error.Publish

We can make use of these additional events in the usual way, e.g.

    let worker = new AsyncWorker<_>(jobs)

    worker.JobCompleted.Add(fun (jobNumber, result) ->

        printfn "job %d completed with result %A" jobNumber result.Length)

    worker.AllCompleted.Add(fun results ->

        printfn "all done, results = %A" results )

    worker.Start()

The supervised async workflow can support cancellation, as shown in the example above.
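To make the cancellation path concrete, here is a minimal, self-contained sketch of the same Async.StartWithContinuations shape, without the AsyncWorker wrapper. It assumes a single slow job built from Async.Sleep, and the names outcome, finished and finish are hypothetical helpers introduced only for this illustration. Cancelling the token source causes the third (cancellation) continuation to run, which is where the worker above raises its Canceled event.

```fsharp
open System
open System.Threading

let cts = new CancellationTokenSource()
let finished = new ManualResetEventSlim(false)
let outcome = ref "pending"

// Record which continuation ran, then signal the waiting thread
let finish (text: string) =
    outcome.Value <- text
    finished.Set()

// A long-running job; Async.Sleep observes the cancellation token
let slowJob =
    async { do! Async.Sleep 60000
            return 42 }

Async.StartWithContinuations(
    slowJob,
    (fun result -> finish (sprintf "completed with %d" result)),
    (fun (e: exn) -> finish ("error: " + e.Message)),
    (fun (_: OperationCanceledException) -> finish "canceled"),
    cts.Token)

// Plays the role of worker.CancelAsync() in the component above
cts.Cancel()

finished.Wait(5000) |> ignore
printfn "outcome = %s" outcome.Value
```

With the AsyncWorker type, the equivalent usage would be to subscribe to worker.Canceled before calling worker.Start(), and then call worker.CancelAsync().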

Tweet Tweet, Tweet Tweet

The Reporting Progress With Events pattern can be applied to pretty much any background processing component which reports results along the way. In the following example, we use the pattern to encapsulate the background read of a stream of tweets from Twitter (see the Twitter API pages). The sample requires a Twitter account and password. Only one event is raised in this case, though the sample could be extended to raise other events in other conditions.

A version of this sample is included in the F# JAOO Tutorial.

// F# Twitter Feed Sample using F# Async Programming and Event processing

//

#r "System.Web.dll"

#r "System.Windows.Forms.dll"

#r "System.Xml.dll"

open System

open System.Globalization

open System.IO

open System.Net

open System.Web

open System.Threading

open Microsoft.FSharp.Control.WebExtensions

/// A component which listens to tweets in the background and raises an

/// event each time a tweet is observed

type TwitterStreamSample(userName:string, password:string) =

    let tweetEvent = new Event<_>()

    let streamSampleUrl = "http://stream.twitter.com/1/statuses/sample.xml?delimited=length"

    /// The cancellation condition

    let mutable group = new CancellationTokenSource()

    /// Start listening to a stream of tweets

    member this.StartListening() =

                                                       

        // Capture the synchronization context to allow us to raise events back on the GUI thread

        let syncContext = SynchronizationContext.CaptureCurrent()

        /// The background process

 

        let listener (syncContext: SynchronizationContext) =

            async { let credentials = NetworkCredential(userName, password)

                    let req = WebRequest.Create(streamSampleUrl, Credentials=credentials)

                    use! resp = req.AsyncGetResponse()

                    use stream = resp.GetResponseStream()

                    use reader = new StreamReader(stream)

                    let rec loop() =

                        async {

                            let atEnd = reader.EndOfStream

                            if not atEnd then

                                let sizeLine = reader.ReadLine()

                                if String.IsNullOrEmpty sizeLine then return! loop() else

                                let size = int sizeLine

                                let buffer = Array.zeroCreate size

                                let _numRead = reader.ReadBlock(buffer,0,size)

                                let text = new System.String(buffer)

                                syncContext.RaiseEvent tweetEvent text

                                return! loop()

                        }

                    return! loop() }

        Async.Start(listener syncContext, group.Token)

    /// Stop listening to a stream of tweets

    member this.StopListening() =

        group.Cancel();

        group <- new CancellationTokenSource()

    /// Raised when the XML for a tweet arrives

    member this.NewTweet = tweetEvent.Publish

This raises an event each time a tweet occurs from the standard sample stream provided by Twitter, and provides the contents of that tweet. We can listen into this stream as follows:

let userName = "..." // set Twitter user name here

let password = "..." // set Twitter password here

let twitterStream = new TwitterStreamSample(userName, password)

twitterStream.NewTweet

   |> Event.add (fun s -> printfn "%A" s)

twitterStream.StartListening()

twitterStream.StopListening()

When run, a stream of the raw XML for tweets is printed (pretty quickly!). See the Twitter API pages for how this stream is sampled.

If you would like to also parse these tweets, here’s some sample code that does an approximate job of this (though also be aware of the guidance on the Twitter API pages, e.g. that tweets should often be saved or queued before processing when building a high-reliability system).

#r "System.Xml.dll"

#r "System.Xml.Linq.dll"

open System.Xml

open System.Xml.Linq

let xn (s:string) = XName.op_Implicit s

/// The results of the parsed tweet

type UserStatus =

    { UserName : string

      ProfileImage : string

      Status : string

      StatusDate : DateTime }

/// Attempt to parse a tweet

let parseTweet (xml: string) = 

    let document = XDocument.Parse xml

    let node = document.Root

    if node.Element(xn "user") <> null then

        Some { UserName = node.Element(xn "user").Element(xn "screen_name").Value;

               ProfileImage = node.Element(xn "user").Element(xn "profile_image_url").Value;

               Status = node.Element(xn "text").Value |> HttpUtility.HtmlDecode;

               StatusDate = node.Element(xn "created_at").Value |> (fun msg ->

                                   DateTime.ParseExact(msg, "ddd MMM dd HH:mm:ss +0000 yyyy",

                                                       CultureInfo.InvariantCulture)); }

    else

        None

And combinator programming can be used to pipeline from this stream:

twitterStream.NewTweet

   |> Event.choose parseTweet

   |> Event.add (fun s -> printfn "%A" s)

twitterStream.StartListening()
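The same Event.choose pipeline can be tried without a Twitter connection. The sketch below is a stand-in under stated assumptions: source is a locally triggered event playing the role of NewTweet, and tryParseInt is a hypothetical parser standing in for parseTweet. Inputs for which the parser returns None are simply dropped from the downstream event.

```fsharp
open System

// A locally triggered event standing in for twitterStream.NewTweet (assumption)
let source = new Event<string>()
let parsed = ResizeArray<int>()

// Hypothetical parser standing in for parseTweet: Some on success, None on failure
let tryParseInt (s: string) =
    match Int32.TryParse(s) with
    | true, n -> Some n
    | _ -> None

source.Publish
|> Event.choose tryParseInt
|> Event.add (fun n -> parsed.Add n)

source.Trigger "10"
source.Trigger "oops"   // dropped by Event.choose
source.Trigger "32"

printfn "parsed = %A" (List.ofSeq parsed)
```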

And to collect statistics from the stream:

let addToMultiMap key x multiMap =

   let prev = match Map.tryFind key multiMap with None -> [] | Some v -> v

   Map.add key (x::prev) multiMap

/// An event which triggers on every 'n' triggers of the input event

let every n (ev:IEvent<_>) =

   let out = new Event<_>()

   let count = ref 0

   ev.Add (fun arg -> incr count; if !count % n = 0 then out.Trigger arg)

   out.Publish

twitterStream.NewTweet

   |> Event.choose parseTweet

   // Build up the table of tweets indexed by user

   |> Event.scan (fun z x -> addToMultiMap x.UserName x z) Map.empty

   // Take every 20’th index

   |> every 20

   // Listen and display the average of #tweets/user

   |> Event.add (fun s ->

        let avg = s |> Seq.averageBy (fun (KeyValue(_,d)) -> float d.Length)

        printfn "#users = %d, avg tweets = %g" s.Count avg)

twitterStream.StartListening()

This indexes the tweets by user and determines the average number of tweets from each user in this sample stream, reporting results every 20 successfully parsed tweets:

#users = 19, avg tweets = 1.05263

#users = 39, avg tweets = 1.02564

#users = 59, avg tweets = 1.01695

#users = 79, avg tweets = 1.01266

#users = 99, avg tweets = 1.0101

#users = 118, avg tweets = 1.01695

#users = 138, avg tweets = 1.01449

#users = 158, avg tweets = 1.01266

#users = 178, avg tweets = 1.01124

#users = 198, avg tweets = 1.0101

#users = 218, avg tweets = 1.00917

#users = 237, avg tweets = 1.01266

#users = 257, avg tweets = 1.01167

#users = 277, avg tweets = 1.01083

#users = 297, avg tweets = 1.0101

#users = 317, avg tweets = 1.00946

#users = 337, avg tweets = 1.0089

#users = 357, avg tweets = 1.0084

#users = 377, avg tweets = 1.00796

#users = 396, avg tweets = 1.0101

#users = 416, avg tweets = 1.00962

#users = 435, avg tweets = 1.01149

#users = 455, avg tweets = 1.01099

#users = 474, avg tweets = 1.01266

#users = 494, avg tweets = 1.01215

#users = 514, avg tweets = 1.01167

#users = 534, avg tweets = 1.01124

#users = 554, avg tweets = 1.01083

#users = 574, avg tweets = 1.01045

#users = 594, avg tweets = 1.0101
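To see how Event.scan and the every combinator interact, here is a small sketch driven by a synthetic event rather than the tweet stream. The ticks event and the seen list are assumptions made for this illustration, and every is restated here (using count.Value instead of incr and !) so the block is self-contained. Event.scan emits the accumulated state after each input, and every 3 forwards only each third state.

```fsharp
/// An event which triggers on every 'n' triggers of the input event
let every n (ev: IEvent<_>) =
    let out = new Event<_>()
    let count = ref 0
    ev.Add (fun arg ->
        count.Value <- count.Value + 1
        if count.Value % n = 0 then out.Trigger arg)
    out.Publish

// Synthetic source standing in for the parsed tweet stream (assumption)
let ticks = new Event<int>()
let seen = ResizeArray<int>()

ticks.Publish
|> Event.scan (fun total x -> total + x) 0   // running sum: 1, 3, 6, 10, ...
|> every 3                                   // keep the 3rd, 6th, 9th running sums
|> Event.add (fun total -> seen.Add total)

for i in 1 .. 9 do ticks.Trigger i

printfn "seen = %A" (List.ofSeq seen)   // running sums after 3, 6 and 9 inputs
```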

Using a slightly different analysis we can display those users who have tweeted more than once in the sample stream provided by Twitter, along with their latest tweet. This is executed interactively from F# Interactive and uses the F# Interactive data grid view snippet from a previous post:

open System.Drawing

open System.Windows.Forms

let form = new Form(Visible = true, Text = "A Simple F# Form", TopMost = true, Size = Size(600,600))

let data = new DataGridView(Dock = DockStyle.Fill, Text = "F# Programming is Fun!",

                            Font = new Font("Lucida Console",12.0f),

                            ForeColor = Color.DarkBlue)

form.Controls.Add(data)

data.DataSource <- [| (10,10,10) |]

data.Columns.[0].Width <- 200

data.Columns.[2].Width <- 500

twitterStream.NewTweet

   |> Event.choose parseTweet

   // Build up the table of tweets indexed by user

   |> Event.scan (fun z x -> addToMultiMap x.UserName x z) Map.empty

   // Take every 20’th index

   |> every 20

   // Listen and display those with more than one tweet

   |> Event.add (fun s ->

        let moreThanOneMessage = s |> Seq.filter (fun (KeyValue(_,d)) -> d.Length > 1)

        data.DataSource <-

            moreThanOneMessage

            |> Seq.map (fun (KeyValue(user,d)) -> (user, d.Length, d.Head.Status))

            |> Seq.filter (fun (_,n,_) -> n > 1)

            |> Seq.sortBy (fun (_,n,_) -> -n)

            |> Seq.toArray)

twitterStream.StartListening()

Here are some sample results:

[Image: tweets]

Note: In the above example, we have used blocking I/O to read the Twitter stream. This is adequate for two reasons: the Twitter stream is very active (and probably will remain so for some time :-) ), and we can also assume that there are not many outstanding connections to many Twitter streams. In this case there is only one, and in any case it appears Twitter places limitations on how many times you can listen to the sample stream for an account. In a later post we’ll show how to do a non-blocking read of this kind of stream of XML fragments.

F# for Parallel, C#/VB for GUI

The Reporting Progress With Events pattern is highly useful for the case where the F# programmer implements the background computation components, based on some inputs, and the C# or VB programmer uses these components. In this case, the published events should be labeled with [<CLIEvent>] to ensure they appear as .NET events to the C# or VB programmer. For the second example above, you would use:

    /// Raised when a particular job completes

    [<CLIEvent>]

    member x.JobCompleted = jobCompleted.Publish

    /// Raised when all jobs complete

    [<CLIEvent>]

    member x.AllCompleted = allCompleted.Publish

    /// Raised when the composition is cancelled successfully

    [<CLIEvent>]

    member x.Canceled = canceled.Publish

    /// Raised when the composition exhibits an error

    [<CLIEvent>]

    member x.Error = error.Publish

Limitations of the Pattern

The Reporting Progress With Events pattern assumes that a parallel processing component is hosted in a GUI application (e.g. Windows Forms), server-side application (e.g. ASP.NET) or some other context where it is possible to raise events back to some supervisor. It is possible to adjust the pattern to raise events in other ways, e.g. to post a message to a MailboxProcessor or simply to log them. However, be aware that there is still an assumption in the design pattern that some kind of main thread or supervisor exists that is ready to listen to the events at any moment and queue them sensibly.
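One possible shape for the MailboxProcessor variation is sketched below. This is not code from the samples above: the ProgressMessage type, the supervisor agent and the trivial squaring jobs are all assumptions made for illustration. Each job posts a message to the agent when it completes, and the agent plays the role of the supervisor that queues the notifications.

```fsharp
/// Messages understood by the hypothetical supervisor agent
type ProgressMessage =
    | JobCompleted of int * int
    | GetLog of AsyncReplyChannel<(int * int) list>

/// An agent that records completions in the order they arrive
let supervisor =
    MailboxProcessor.Start(fun inbox ->
        let rec loop completed =
            async {
                let! msg = inbox.Receive()
                match msg with
                | JobCompleted (jobNumber, result) ->
                    return! loop ((jobNumber, result) :: completed)
                | GetLog reply ->
                    reply.Reply (List.rev completed)
                    return! loop completed
            }
        loop [])

// Trivial stand-in jobs (assumption)
let jobs = [ for i in 1 .. 5 -> async { return i * i } ]

let work =
    Async.Parallel
        [ for (i, job) in List.indexed jobs ->
            async { let! result = job
                    // Post progress to the agent instead of raising an event
                    supervisor.Post (JobCompleted (i + 1, result))
                    return result } ]

let results = Async.RunSynchronously work
let log = supervisor.PostAndReply (fun channel -> GetLog channel)

printfn "results = %A, notifications received = %d" results log.Length
```

The order of entries in the log depends on scheduling, so only the set of notifications, not their order, should be relied upon.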

The Reporting Progress With Events pattern also assumes that the encapsulating object is able to capture the synchronization context of the GUI thread, normally implicitly (as in the examples above). This is usually a reasonable assumption. Alternatively this context could be given as an explicit parameter, though that is not a very common idiom in .NET programming.
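For comparison, a component that takes the context as an explicit parameter might look roughly like this. Both types here are hypothetical: ProgressReporter is an illustrative component, and InlineContext is a test-style context that runs posted callbacks immediately on the calling thread, used so the sketch behaves deterministically outside a GUI.

```fsharp
open System.Threading

/// A hypothetical context that runs posted callbacks immediately on the calling thread
type InlineContext() =
    inherit SynchronizationContext()
    override this.Post(callback, state) = callback.Invoke(state)

/// A hypothetical component that receives its context explicitly
type ProgressReporter(context: SynchronizationContext) =
    let progress = new Event<int>()

    /// Raise the Progress event through the supplied context
    member this.Report(percent: int) =
        context.Post((fun _ -> progress.Trigger percent), null)

    /// Raised each time progress is reported
    member this.Progress = progress.Publish

let received = ResizeArray<int>()
let reporter = ProgressReporter(InlineContext())
reporter.Progress.Add(fun p -> received.Add p)

reporter.Report 50
reporter.Report 100

printfn "received = %A" (List.ofSeq received)
```

Passing the context explicitly makes the threading behavior easy to substitute in tests, at the cost of a less familiar API for callers.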

For those familiar with the IObservable interface (added in .NET 4.0), you might have considered having the TwitterStreamSample type implement this interface. However, for root sources of events this doesn’t necessarily gain that much. For example, in the future, the TwitterStreamSample may need to provide multiple events, such as reporting auto-reconnections if errors occur, or reporting pauses or delays. In this scenario, simply raising .NET events is adequate, partly to ensure your object looks familiar to many .NET programmers. In F#, all published IEvent<_> values implement IObservable automatically and can be used directly with observable combinators.
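As a small illustration of that last point, the sketch below pipes a published event straight into the Observable module. The tweets event here is a locally triggered stand-in (an assumption), not the TwitterStreamSample type. Unlike Event.add, Observable.subscribe returns an IDisposable, so the listener can be detached again.

```fsharp
// A locally triggered event standing in for NewTweet (assumption)
let tweets = new Event<string>()
let lengths = ResizeArray<int>()

let subscription =
    tweets.Publish
    |> Observable.map (fun text -> text.Length)
    |> Observable.filter (fun len -> len > 5)
    |> Observable.subscribe (fun len -> lengths.Add len)

tweets.Trigger "hi"            // filtered out: length 2
tweets.Trigger "hello world"   // kept: length 11

// Detach the handler; later triggers are no longer observed
subscription.Dispose()
tweets.Trigger "ignored after dispose"

printfn "lengths = %A" (List.ofSeq lengths)
```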

Conclusion 

The Reporting Progress With Events pattern is a powerful and elegant way to encapsulate parallel execution behind a boundary while still reporting results and progress.

From the outside, the AsyncWorker object effectively appears single threaded. Assuming your input asyncs are isolated, this means the component does not expose the rest of your program to multi-threaded race conditions. All users of JavaScript, ASP.NET and GUI frameworks like Windows Forms know that the single-threadedness of those frameworks is both a blessing and a curse: you get simplicity (no data races!), but parallel and asynchronous programming is hard. In .NET programming, I/O and heavy CPU computations have to be offloaded to background threads. The above design pattern gives you the best of both worlds: you get independent, cooperative, “chatty” background processing components, including ones that do parallel processing and I/O, while maintaining the simplicity of single threaded GUI programming for most of your code. These components can be generic and reusable, like the ones shown above. This makes them amenable to independent unit testing.

In future blog posts we’ll be looking at additional design topics for parallel and reactive programming with F# async, including

  • defining lightweight async agents

  • authoring .NET tasks using async

  • authoring the .NET APM patterns using async

  • cancelling asyncs