And now for something completely different.
As you may know Microsoft has recently announced plans for a Hadoop adoption for both Windows Server and Windows Azure. You can find out more about Hadoop and Windows Azure at Apache Hadoop-based Services for Windows Azure and Availability of Community Technology Preview (CTP) of Hadoop based Service on Windows Azure.
If you are not familiar with MapReduce then there are some useful resources for F# and MapReduce that would also be worth reading Exploring MapReduce with F# and Parsing Log Files with F#, MapReduce and Windows Azure.
As Hadoop is written in Java the main integration point for MapReduce for .Net developers is through Hadoop Streaming. Hadoop streaming being a utility that allows you to create and run MapReduce jobs with any executable or script as the mapper and/or the reducer. You can find out everything you want to know about Hadoop Streaming at: http://hadoop.apache.org/common/docs/current/streaming.html
Before starting it is worth noting that the complete code for this blog post can be found at:
When writing a Hadoop Streaming MapReduce job it will quickly become apparent that there is a codebase that one can reuse. As such, I thought it would be useful to put together this codebase to enable F# developers to write MapReduce libraries through a simple API. The idea was to provide reusable code such that one only needed to be concerned with implementing the MapReduce code with the following function prototype’s:
Map : string –> (string * obj) option
Reduce : string -> seq<string> –> obj option
The idea is that the Hadoop text input is processed and each input line is passed into the Map function which parses and filters the key/value pair for the data. The values are then sorted and merged, by Hadoop. The processed mapped data is then passed into the Reduce function, as a key and corresponding sequence of strings, which then defines the optional output value.
So why the use of the obj type? Hadoop Streaming is based on text data, albeit a binary interface is also available (more on this in a later post). Thus the inputs into the MapReduce are strings. However, when performing the MapReduce operations strings are not always suitable, but they do need to be able to be represented as strings. This is normally handled through serialization but in this case I have used sprintf with a “%O” pattern. Thus any type needs to have a meaningful obj.ToString() implementation such that the data can once again be parsed back into a workable type.
As a sample consider the following data:
This represents some mobile phone data with the 2nd column representing the query time and the 4th column representing the platform device. A simple MapReduce could be:
Pull the Device Platform and Query Time from the data:
Calculate the Min, Average, and Max Query Times.
The box function is used to ensure the types returned from the MapReduce calls is of the obj type; albeit this is not necessary as sprintf() call with the “%O” pattern will box the object before calling ToString().
So onto the executable code.
As this is Hadoop Streaming applications, both the mapper and the reducer are executables that read the input from StdIn (line by line) and emit the output to StdOut. Thus one just needs a console application that does a Console.ReadLine() to get the data, and perform a Console.WriteLine() to emit the output.
As with all F# Console applications the MainEntry point is defined as:
Each executable then contains a Controller module that is run to process the data.
The purpose of the Mapper code is to perform Input Format Parsing, Projection, and Filtering. The code to perform this is as follows:
A majority of this code provides a means for specifying input and output files to better aid testing (more on this in a bit).
The code boils down to the last three lines which performs the main functions:
This parses the input, maps the data into a key value pairs, filters out unwanted rows, and outputs the data using the outputCollector function.
The outputCollector function takes the processed key/value pair and outputs them to the correct stream. In the Java API the types used for output are based on a Writable interface; rather than Java Serialization. In this implementation the outputCollector takes the obj.ToString(); through the use of sprintf(). Thus if the provided types does not have a useful ToString() you will have to create the string representation before calling the outputCollector. The outputCollector performs the formatting of the key/value pair into a Tab delimited string; as required for Hadoop Streaming.
After running the Mapper, the data being parsed into the Reducer will be a key/value pair delimited with a Tab. Using the aforementioned sample data and Map, a sample, but selective, input dataset for a Reducer would be:
RIM OS 17:19:36
RIM OS 17:17:18
RIM OS 00:44:41
Windows Phone 12:54:25
Windows Phone 08:50:46
Windows Phone 11:13:28
iPhone OS 19:07:56
iPhone OS 03:34:59
proprietary development 14:29:20
proprietary development 14:30:17
The processing of the mapped data within the Reducer is a little more complex than the Mapper. The idea is that the data is grouped by the input key and the resulting sequences are passed into the Reduce function; a function call for each key along with the corresponding sequence.
Whereas the Seq.groupBy can perform this operation, the groupBy function makes no assumption on the ordering of the original sequence. As a consequence the resulting sequence is not lazily evaluated, and is thus not suitable for large sequences; an absolute must for Hadoop MapReduce jobs. The code thus has to create a lazily evaluated sequence for each input key. This can be achieved as one knows the input data is sorted.
To achieve this the processing has to be state dependant to handle the transition from one key value to the next. The input data is processed in such a fashion that any change in key value causes a transition to the next sequence. The state persistence is needed to track the key change, and to ensure the first yield of the new sequence is the value that caused this transition; transition values are not lost.
The code for the Reducer is as follows:
The code also contains the same testing structure to allow one to pass in an input and output file; once again to aid in debugging.
The code uses a string.Split() to derive the tuple of key/value pair; as this is how the Mapper writes out the data. Once the data has been Reduced, the outputCollector once again performs the formatting of the key/value data into a Tab delimited string, sending the text to the StdOut stream. The value in this case is the reduced data corresponding to the key.
The output for the sample data and Reduce sample code, listed above, would be:
Android (00:00:06, 12:54:39, 23:59:54)
RIM OS (00:01:07, 13:52:56, 23:59:58)
Unknown (00:00:36, 10:29:27, 23:52:36)
Windows Phone (00:00:32, 12:38:31, 23:55:17)
iPhone OS (00:00:01, 11:51:53, 23:59:50)
proprietary development (14:29:20, 14:29:44, 14:30:17)
The value corresponding to the key is the string representing of a tuple of type (TimeSpan*TimeSpan*TimeSpan).
As you can see, there is a fair amount of code controlling the input and output streams for calling the Map and reduce functions; that can be reused for all Hadoop Streaming jobs.
Testing the Code
So now the code has been put together how can it be tested? Debugging a job once it has been deployed to a Hadoop cluster is not an easy task. As such one is much better off if testing can be performed without Hadoop in the picture. Whereas this will not cover all test cases, as some issues will only be found when deployed to a cluster, it does provide a means to cover most test scenarios.
Unit Testing the individual map and Reduce functions is relatively straight forward. However performing testing on sample input data is a little trickier.
To assist in testing with data files I have put together a Tester application. This application:
- Defines and executes a Process for the Mapper executable in which
- The StdIn is modified to be the file specified in the “–input” command line argument
- the StdOut is modified to be a file with a “mapper” extension
- When the Mapper has completed, Sorts the output file from the Mapper into a file with a “reducer” extension
- When the Sort is complete, defines and executes a Process for the Reducer executable in which
- The StdIn is modified to be the sorted “reducer” file
- The StdOut is modified to be the file specified in the “–output” command line argument
Running the Tester application allows one to check inputs and outputs, in a flow similar to running within Hadoop. The full listing for the Tester application can be found in the download code; but more on this in a later post.
Whereas this allows for simple data testing there is an easier option when one needs to attach a debugger. Bother the Mapper and Reducer executables accept a “–input” and “–output” command line arguments. These options define the files to be used, rather than the Standard Input/Output streams, for data processing. Using these arguments one can easily debug the executable with a set of test data and view the output. To create a Reducer input, for testing, one can run the Tester application with the required input data, and use the output file with a “reducer” extension as the input.
One final thing that has been useful for testing is the concept of a Null controller. This is merely an executable that just takes the input and passes it to the output stream:
This Null controller is useful when deploying your application to a Hadoop cluster to ensure data is being input and output correctly.
Hadoop and F#
I am not going to say much about running the Hadoop job other than to show the command to run the Streaming job:
The FSharp.Hadoop.Utilities.dll file contains utilities for processing command line arguments and the Null Controller.
The sample download code includes all the necessary command line code to run the job, and also includes a sample data file and some commands to copy this data to the cluster.
There is however a point to be made about using F#, namely the dependency on the F# Runtime. Importantly the F# Runtime contains many useful functions and types, including APIs for collections such as lists, arrays, maps, sets and sequences. One should install the F# Runtime on all the nodes in the cluster. The installer can be found at:
If one is running the job on a server on which the FSharp Runtime has not been deployed there is another option. One can copy the appropriate runtime file to the temp execution directory. To achieve this one needs to add the following file option to the execution command:
-file C:\Program Files (x86)\Reference Assemblies\Microsoft\FSharp\2.0\Runtime\v4.0\FSharp.Core.dll
In terms of reusability of the code one could create a Project Template for the base code. Another options would be to Activator.CreateInstance() to create a Type specified in a configuration file. In this case the Mapper and Reducer would have to inherit from a common Interface.
To conclude, don’t forget to check out Channel9 for more information:
I will endeavour to cover other topics, such as Binary Streaming, in future posts.
Hope this is useful!