Writing a Map/Reduce Job for Hadoop using Windows PowerShell

I had a little bit of time on my hands and wanted to whip up a quick sample of a Hadoop job written in PowerShell.

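This uses Hadoop Streaming, which allows mappers and reducers to be written as arbitrary executables that read records from standard input and write results to standard output. By default, each output line is split at the first tab character: everything before it is the key and everything after it is the value. The framework sorts the mapper output by key before handing it to the reducers, so each reducer reads its keys in sorted order.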
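
To make the key/value rule and the sort step concrete, here is a small Python sketch (Python purely for illustration; the function names are made up for this example) of how a mapper's output line is split and how the shuffle brings equal keys together. It approximates the framework's behavior for plain ASCII keys and ignores partitioning across multiple reducers.

```python
def split_key_value(line: str) -> tuple[str, str]:
    """Split one line of mapper output the way Hadoop Streaming does by default:
    the key is everything before the first tab, the value is everything after it.
    A line with no tab is all key (Hadoop treats the value as null; "" here)."""
    key, _sep, value = line.partition("\t")
    return key, value


def shuffle(mapped_lines: list[str]) -> list[str]:
    """Approximate the sort between map and reduce: order lines by key so that
    all lines sharing a key are adjacent when the reducer reads them."""
    return sorted(mapped_lines, key=lambda l: split_key_value(l)[0])
```

For example, shuffle(["b\t1", "a\t2", "b\t3"]) puts the "a" line first and leaves the two "b" lines next to each other, which is the property the reducer below depends on.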

The .ps1 scripts are pretty simple. They operate over a set of airline data that looks like this:

    0,EV,18,0,MIA,2011-12-18T00:00:00,12,TPA,1227859,2011
    0,EV,17,0,MIA,2011-12-17T00:00:00,12,TPA,1227860,2011
    6,EV,17,4,CLE,2011-12-17T00:00:00,12,ATL,1227861,2011

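Each row is a comma-separated record of a single US flight and its delays. The second field is the airline code, and the scripts below treat the first and fourth fields as the arrival and departure delays.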
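
As a hypothetical helper for poking at the data locally, the sketch below parses one row using the same field positions the scripts rely on: field 1 is the airline, and fields 0 and 3 are what the reducer reads as the arrival and departure delays. The Flight type and parse_flight name are invented for this example, and the field meanings are inferred from the scripts rather than from a published schema.

```python
from typing import NamedTuple


class Flight(NamedTuple):
    airline: str    # field 1, used as the mapper's key
    arr_delay: int  # field 0, as read by the reducer
    dep_delay: int  # field 3, as read by the reducer


def parse_flight(row: str) -> Flight:
    """Parse one comma-separated flight row by field position."""
    fields = row.strip().split(",")
    return Flight(airline=fields[1], arr_delay=int(fields[0]), dep_delay=int(fields[3]))
```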

The goal of the job is to pull out each airline, its number of flights, and some very basic (min and max) statistics on its arrival and departure delays. The minimums ignore flights with a delay of zero.

My mapper (AirlineMapper.ps1):

    function Map-Airline
    {
        # Counter: how many mapper processes were started.
        [Console]::Error.WriteLine("reporter:counter:powershell,invocations,1")
        $line = [Console]::ReadLine()
        while ($null -ne $line)
        {
            # Skip blank lines so the reducer never receives an empty record.
            if ($line.Trim().Length -gt 0)
            {
                # Key = airline code (second field), value = the whole record.
                [Console]::WriteLine($line.Split(",")[1] + "`t" + $line)
                [Console]::Error.WriteLine("reporter:counter:powershell,record,1")
            }
            $line = [Console]::ReadLine()
        }
    }

    Map-Airline
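
If you want to check the mapper's logic without a PowerShell host, a rough Python equivalent looks like this. map_airline is a hypothetical name for this sketch; it also skips blank lines, which a stray trailing newline in the input would otherwise turn into an empty record.

```python
from typing import Iterable, Iterator


def map_airline(lines: Iterable[str]) -> Iterator[str]:
    """Key each record by its airline code (second comma-separated field)
    and pass the whole record through as the value."""
    for raw in lines:
        line = raw.rstrip("\r\n")
        if not line.strip():
            continue  # blank line: nothing to emit
        yield line.split(",")[1] + "\t" + line
```

Used as an actual streaming mapper, you would print every line yielded by map_airline(sys.stdin).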

My reducer (AirlineReducer.ps1):

    function Reduce-Airlines
    {
        # Input arrives as "airline<TAB>record" lines sorted by airline, so all
        # records for one airline are contiguous. $oldLine holds the key of the
        # group currently being accumulated.
        $initial = "<initial invalid row value>"
        $oldLine = $initial
        $count = 0
        $minArrDelay = 10000
        $maxArrDelay = 0
        $minDepDelay = 10000
        $maxDepDelay = 0

        $line = [Console]::ReadLine()
        while ($null -ne $line)
        {
            $key = $line.Split("`t")[0]
            $flightRecord = $line.Split("`t")[1].Split(',')
            # Parse once, so Min/Max always compare integers rather than strings.
            $arrDelay = [Int32]::Parse($flightRecord[0])
            $depDelay = [Int32]::Parse($flightRecord[3])

            if (($oldLine -ne $key) -and ($oldLine -ne $initial))
            {
                # The key changed: emit the finished group and reset the running stats.
                [Console]::WriteLine($oldLine + "`t" + $count + "," + $minArrDelay + "," + $maxArrDelay + "," + $minDepDelay + "," + $maxDepDelay)
                $count = 0
                $minArrDelay = 10000
                $maxArrDelay = 0
                $minDepDelay = 10000
                $maxDepDelay = 0
            }

            # Fold the current record into its group, including the first record
            # of a new group. Zero delays are skipped for the minimums.
            if ($arrDelay -ne 0) { $minArrDelay = [Math]::Min($minArrDelay, $arrDelay) }
            if ($depDelay -ne 0) { $minDepDelay = [Math]::Min($minDepDelay, $depDelay) }
            $maxArrDelay = [Math]::Max($maxArrDelay, $arrDelay)
            $maxDepDelay = [Math]::Max($maxDepDelay, $depDelay)
            $count = $count + 1
            [Console]::Error.WriteLine("reporter:counter:powershell," + $key + ",1")

            $oldLine = $key
            $line = [Console]::ReadLine()
        }

        # Emit the last group, unless the input was empty.
        if ($oldLine -ne $initial)
        {
            [Console]::WriteLine($oldLine + "`t" + $count + "," + $minArrDelay + "," + $maxArrDelay + "," + $minDepDelay + "," + $maxDepDelay)
        }
    }

    Reduce-Airlines
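
The same reducer logic, ported to Python as an illustrative sketch (reduce_airlines is a hypothetical name), makes the group-boundary handling easy to test on a laptop. Note the order of operations: when the key changes, it first flushes the finished group and resets the running values, and only then folds in the current record, so the first flight of each new airline is counted in that airline's statistics.

```python
from typing import Iterable, Iterator, Optional

START_MIN = 10000  # same starting value for the minimums as the PowerShell reducer


def reduce_airlines(lines: Iterable[str]) -> Iterator[str]:
    """Input: "airline<TAB>record" lines sorted by airline.
    Output: "airline<TAB>count,minArr,maxArr,minDep,maxDep" per airline.
    Zero delays are ignored for the minimums, as in the PowerShell script."""
    current: Optional[str] = None
    count = 0
    min_arr, max_arr, min_dep, max_dep = START_MIN, 0, START_MIN, 0

    def summary() -> str:
        # Reads the enclosing variables at call time.
        return f"{current}\t{count},{min_arr},{max_arr},{min_dep},{max_dep}"

    for raw in lines:
        key, _, value = raw.rstrip("\r\n").partition("\t")
        fields = value.split(",")
        arr, dep = int(fields[0]), int(fields[3])
        if current is not None and key != current:
            yield summary()  # key changed: flush the finished group
            count = 0
            min_arr, max_arr, min_dep, max_dep = START_MIN, 0, START_MIN, 0
        current = key
        # Fold in the current record, including the first record of a new group.
        if arr != 0:
            min_arr = min(min_arr, arr)
        if dep != 0:
            min_dep = min(min_dep, dep)
        max_arr = max(max_arr, arr)
        max_dep = max(max_dep, dep)
        count += 1
    if current is not None:
        yield summary()  # flush the last group
```

An airline whose flights all have zero delay keeps 10000 as its minimums, exactly as the PowerShell script does.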

One thing to note about the reducer is that it uses the $oldLine variable to detect when the input moves from one group (airline) to the next. When using Java, your reduce function is invoked once per group, so you can reset the state at the start of each call. With streaming, a group is never split across reducers, but your executable is started only once per reducer (which, in the sample here, is one) and reads every group for that reducer from STDIN, sorted by key, so the script itself has to spot the group boundaries. You can also see that I'm writing to STDERR to record a few counters: Hadoop Streaming treats STDERR lines of the form reporter:counter:&lt;group&gt;,&lt;counter&gt;,&lt;amount&gt; as counter updates.
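
For comparison, Python's itertools.groupby can recover the Java-style "once per group" shape on top of streaming's single sorted stream, because equal keys are adjacent. The sketch below (names hypothetical; it only counts flights per airline) also shows the reporter:counter line format. The report callback defaults to writing to STDERR, which is where Hadoop Streaming looks for these lines.

```python
import sys
from itertools import groupby
from typing import Callable, Iterable, Iterator


def counter_line(group: str, counter: str, amount: int = 1) -> str:
    """Format a Hadoop Streaming counter update for STDERR."""
    return f"reporter:counter:{group},{counter},{amount}"


def to_stderr(message: str) -> None:
    print(message, file=sys.stderr)


def count_per_airline(lines: Iterable[str],
                      report: Callable[[str], None] = to_stderr) -> Iterator[str]:
    """groupby yields one (key, iterator) pair per run of equal keys, which is
    one pair per airline because the input is sorted, so no previous-key
    bookkeeping is needed. Emits airline<TAB>flight count."""
    pairs = (line.rstrip("\r\n").partition("\t") for line in lines)
    for airline, group in groupby(pairs, key=lambda kv: kv[0]):
        flights = sum(1 for _ in group)
        report(counter_line("powershell", airline, flights))
        yield f"{airline}\t{flights}"
```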

The next trick is getting these to execute. On Windows, the task process spawned by the streaming job does not know how to run .ps1 files; it is essentially just cmd.exe. To get around that, we create a small driver .cmd file for each script and ship the .ps1 files to the tasks with the -file option on the command line.

AirlineMapperPs.cmd:

    @call c:\windows\system32\WindowsPowerShell\v1.0\powershell -file ..\..\jars\AirlineMapper.ps1

AirlineReducerPs.cmd:

    @call c:\windows\system32\WindowsPowerShell\v1.0\powershell -file ..\..\jars\AirlineReducer.ps1

The ..\..\jars directory is where the -file option places the shipped files when the tasks execute. The path is relative, so it is resolved against each task's working directory.
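
Since the two drivers differ only in the script name, they are easy to generate, which is one way to approach the submission wrapper mentioned at the end of this post. This is a hypothetical Python helper; the PowerShell path, the ..\..\jars location, and the XxxPs.cmd naming are copied from the files above and are assumptions about your layout.

```python
from pathlib import Path, PurePath

# Copied from the driver files above; change these if your layout differs.
POWERSHELL_EXE = r"c:\windows\system32\WindowsPowerShell\v1.0\powershell"
JARS_DIR = r"..\..\jars"


def wrapper_name(ps1_name: str) -> str:
    """Naming convention from this post: AirlineMapper.ps1 -> AirlineMapperPs.cmd."""
    return f"{PurePath(ps1_name).stem}Ps.cmd"


def wrapper_text(ps1_name: str) -> str:
    """The one-line driver that launches a .ps1 shipped to the task with -file."""
    return f"@call {POWERSHELL_EXE} -file {JARS_DIR}\\{ps1_name}"


def write_wrapper(ps1_name: str, out_dir: Path) -> Path:
    """Write the driver with a CRLF line ending. Writing bytes avoids any
    newline translation by the platform."""
    target = out_dir / wrapper_name(ps1_name)
    target.write_bytes((wrapper_text(ps1_name) + "\r\n").encode("ascii"))
    return target
```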

And now we execute:

    hadoop jar %HADOOP_HOME%\lib\hadoop-streaming.jar ^
        -mapper d:\dev\_full_path_\AirlineMapperPs.cmd ^
        -reducer d:\dev\_full_path_\AirlineReducerPs.cmd ^
        -input fixed_flights ^
        -output psMapReduce3 ^
        -file d:\dev\_full_path_\AirlineMapper.ps1 ^
        -file d:\dev\_full_path_\AirlineReducer.ps1
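
If you would rather drive submission from a script, the same invocation can be assembled as an argument list. This is a minimal Python sketch (streaming_args is a hypothetical name) that only builds the list and runs nothing. Handing it to subprocess.run would additionally depend on how the hadoop launcher is found on your machine, so treat that step as an assumption. Also note that %HADOOP_HOME% is not expanded when no shell is involved, so resolve the jar path yourself, for example from os.environ.

```python
from typing import Sequence


def streaming_args(streaming_jar: str, mapper: str, reducer: str,
                   input_path: str, output_path: str,
                   files: Sequence[str]) -> list[str]:
    """Build the hadoop streaming command line shown above as a list of arguments."""
    args = ["hadoop", "jar", streaming_jar,
            "-mapper", mapper,
            "-reducer", reducer,
            "-input", input_path,
            "-output", output_path]
    for path in files:
        args += ["-file", path]  # one -file option per shipped script
    return args
```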

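And we get our results in the psMapReduce3 output directory: one line per airline, containing the airline code, a tab, then the flight count followed by the minimum and maximum arrival delay and the minimum and maximum departure delay.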
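
To load those results for further processing, each line can be split back into its fields. The AirlineStats type and parse_result function below are hypothetical names for this sketch, and the field order matches the reducer's WriteLine call.

```python
from typing import NamedTuple


class AirlineStats(NamedTuple):
    airline: str
    flights: int
    min_arr_delay: int
    max_arr_delay: int
    min_dep_delay: int
    max_dep_delay: int


def parse_result(line: str) -> AirlineStats:
    """Parse airline<TAB>count,minArr,maxArr,minDep,maxDep into named fields."""
    airline, _, stats = line.rstrip("\r\n").partition("\t")
    return AirlineStats(airline, *(int(x) for x in stats.split(",")))
```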

There is still some work to be done here. I'd like to make these jobs a little easier to run, possibly by wrapping job submission in a script that takes care of generating the .cmd wrappers for me. Also, on Azure we either need to sign the scripts or log into each of the machines and allow script execution. As I get that wrapped up, I'll drop it alongside our other samples. We'll also work to make this easier to get up and running on Azure if that's interesting for folks.