IronPython + F# + Parallel + Async = A Kittehz Brekfst

funny pictures of cats with captions
more animals

As per the image above, cats and threads do no mix. Python and threads may not mix, either.

Whilst thread-safety, dead-locks, queues and other nasties are nasty, Microsoft sees the world of n-cores as a software problem to be solved, not ignored. Both at a low-level and a high-level, well constructed and debugged Parallel libraries are appearing to make the splitting of tasks easier.

IronPython implements the Python language on the DLR and subsequently on the CLR, but this does not automagically provide IronPython with threading and parallelism. Nor could I suggest that IronPython is the silver-bullet for clean parallelism with Python. Various projects such as ParallelPython (including cluster support), Stackless Python and the recent python-multiprocessing package are appearing to move CPython into today’s world of n-cores.

IronPython does have kissing-cousin languages like F#. From F#, and the Microsoft Parallel Extensions Jun08 CTP – a parallel library is a .dll away. Step 1: create a .dll from Luca’s demonstration from PDC2008 specifically the MathLibrary/StockAnalyzer. Combining both the async/parallel versions of the F# code with the single-threaded version was easy.

From IronPython:

    1:  import clr
    2:  clr.AddReferenceToFile("MathLibrary.dll") 
    3:  import StockLibrary as StockLibrary
    4:  import sys, time
    5:  tickers = ['msft','adbe','aapl','ebay']

Line 1 imports the CLR so your code can import a .NET DLL (clr.AddReferenceToFile). The namespace is StockLibrary. Now methods within this DLL can be called. The fifth line sets up the four test Nasdaq codes we are analyzing.

    6:  analysersA = StockLibrary.StockAnalyzerParallel.GetAnalyzers (tickers, 365)
    7:  print ">> starting async"
    8:  start = time.clock()
    9:  for a in analysersA:
    10:      print a.Return
    11:  print "> seconds taken: ", time.clock() - start
    12:  print ">> completed"
  

The analysersA instantiates the async/parallel version of the F# methods. Here I have wrapped the code with a simple time.clock() timer

    13:  analysersP = StockLibrary.StockAnalyzer.GetAnalyzers (tickers, 365)
    14:  print ">> starting normal"
    15:  start = time.clock() # get current time in seconds
    16:  for a in analysersP:
    17:      print a.Return
    18:  print "> seconds taken: ", time.clock() - start
    19:  print ">> completed"

The analysersP (p for plain) uses the single-threaded version in the library.

Results:

On General Melchett, my Intel Quadcore, I was able to execute the parallel version in 0.5s vs 11.0s in single-threaded mode. I suggest the 22x speed improvement comes from the Asyc calls to the network methods more than the calculations involved. More tests would be required to confirm the dramatic difference. 

In my example I am treating the F# DLL as a simple black box: a list of parameters in and single results out. In the real-world, careful consideration would need to be taken on the immutability of any Python objects/variables passed in; and the resulting factoring of code in F# vs. IronPython.

(this was a simple 1 minute demonstration at the Edge of the Web Conference in Perth expanded into a blog post)

For reference: My version of Luca’s F#: MathLibrary.fs

 #light

// ASYNC version
// note: you need to ensure the project has a reference to FSharp.PowerPack to work
// async is difficult, here we change 4 lines, add some curly braces and we have async+parallel

open System.Net
open System.IO

let internal loadPricesAsync ticker = async { // now returns async token
    let url = "https://ichart.finance.yahoo.com/table.csv?s=" + ticker + "&d=10&e=3&f=2008&g=d&a=2&b=13&c=1986&ignore=.csv"

    let req = WebRequest.Create(url)
    let! resp = req.AsyncGetResponse()                      // holds the thread waiting for the network, blocked: so parallelize + asynch
    let stream = resp.GetResponseStream()
    let reader = new StreamReader(stream)
    let! csv = reader.AsyncReadToEnd()                      // holds the thread waiting for the network, blocked: so parallelize + asynch

    let prices =                                        
        csv.Split([|'\n'|])                             
        |> Seq.skip 1
        |> Seq.map ( fun line -> line.Split([|','|]) )    
        |> Seq.filter (fun values -> values |> Seq.length = 7 ) 
        |> Seq.map (fun values -> 
            System.DateTime.Parse(values.[0]),              
            float values.[6] )
    return prices
    }                                                       // added async to let, add a return, look at places for release control, add bang and call Async versions
    
type StockAnalyzerParallel (lprices, days) =
    let prices =
        lprices
        |> Seq.map snd
        |> Seq.take days                                    // chop sequence to days
    static member GetAnalyzers (tickers, days) =
        tickers
        |> Seq.map loadPricesAsync
        |> Async.Parallel                                   // now that loadPrices returns async, parallelize and run; then map
        |> Async.Run                                        // now run in parallel
        |> Seq.map (fun prices -> new StockAnalyzerParallel (prices, days))
    member s.Return =
        let lastPrice = prices |> Seq.nth 0
        let startPrice = prices |> Seq.nth ( days - 1 )
        lastPrice / startPrice - 1.0
    member s.StdDev =
        let logRets =
            prices
            |> Seq.pairwise
            |> Seq.map ( fun (x, y) -> log (x / y))
        let mean = logRets |> Seq.average
        let sqr x = x * x
        let var = logRets |> Seq.average_by (fun r -> sqr (r - mean))
        sqrt var
        
        
        
let internal loadPrices ticker =
    let url = "https://ichart.finance.yahoo.com/table.csv?s=" + ticker + "&d=10&e=3&f=2008&g=d&a=2&b=13&c=1986&ignore=.csv"

    // code is identical to C# version as we are using underlying .Net libraries
    let req = WebRequest.Create(url)
    let resp = req.GetResponse()
    let stream = resp.GetResponseStream()
    let reader = new StreamReader(stream)
    let csv = reader.ReadToEnd()

    let prices =                                        // returns a tuple based on comma
        csv.Split([|'\n'|])                             //note [| syntax passes in .Net array to the string.Split method
        |> Seq.skip 1
        |> Seq.map ( fun line -> line.Split([|','|]) )    // fun define anonymous function/lambda expression
        |> Seq.filter (fun values -> values |> Seq.length = 7 )  // filter out where less than 7 values
        |> Seq.map (fun values -> 
            System.DateTime.Parse(values.[0]),              // get the 0th and 6th column
            float values.[6] )
    prices
    
type StockAnalyzer (lprices, days) =
    let prices =
        lprices
        |> Seq.map snd
        |> Seq.take days // chop sequence to days
    static member GetAnalyzers (tickers, days) =
        tickers
        |> Seq.map loadPrices
        |> Seq.map (fun prices -> new StockAnalyzer (prices, days))
    member s.Return =
        let lastPrice = prices |> Seq.nth 0
        let startPrice = prices |> Seq.nth ( days - 1 )
        lastPrice / startPrice - 1.0
    member s.StdDev =
        let logRets =
            prices
            |> Seq.pairwise // sequence of things, first + second tuple; second + third etc
            |> Seq.map ( fun (x, y) -> log (x / y))
        let mean = logRets |> Seq.average
        let sqr x = x * x
        let var = logRets |> Seq.average_by (fun r -> sqr (r - mean))
        sqrt var