Hadoop XML Streaming and F# MapReduce

So, to round out the Hadoop Streaming samples I thought I would put together an XML Streaming sample. As always the code can be found here:

https://code.msdn.microsoft.com/Hadoop-Streaming-and-F-f2e76850

XML Streaming Reader

So how does one stream in XML? If you read the Hadoop Streaming documentation you will notice the following FAQ:

You can use the record reader StreamXmlRecordReader to process XML documents.

 hadoop jar hadoop-streaming.jar -inputreader "StreamXmlRecord,begin=BEGIN_STRING,end=END_STRING" ..... (rest of the command)

Anything found between BEGIN_STRING and END_STRING would be treated as one record for map tasks.

This should allow one to define a start and end tag using commands such as:

-inputreader "StreamXmlRecord,begin=<Row>,end=</Row>"

However, if one tries to run this from the command console one gets an error due to the “<” character, as this is used to redirect standard input. It seems that using the usual caret escape character ^ is not enough.

In this example, to parse the XML one could just specify the XML element name, “Row” in this case, and the start and end tags could easily be derived. To assist in XML processing I have therefore provided an XML reader that does exactly this. I was going to modify the base Hadoop stream reader, but there seems to be traction behind the usage of a Mahout class called “org.apache.mahout.classifier.bayes.XmlInputFormat”.

I have changed this base class to support a configuration value, “xmlinput.element”, that allows one to define just the XML element to be processed. An XML Streaming job can then be run with the following configuration:

"-D xmlinput.element=Store" -inputformat org.apache.mahout.classifier.bayes.XmlElementStreamingInputFormat

As a note one has to remember to place the job configuration setting in quotes.

The changes to support this are quite minimal. Firstly the Input Format is defined as:

public class XmlElementStreamingInputFormat
    extends FileInputFormat<NullWritable, Text> {

    public static final String XML_ELEMENT_KEY = "xmlinput.element";

    @Override
    public RecordReader<NullWritable, Text> getRecordReader(
        InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException {

        return new XmlElementStreamingRecordReader((FileSplit) inputSplit, jobConf);
    }
}

Secondly the Record Reader is modified to define the start and end tags based on the new configuration value:

String elementTagName = jobConf.get(XML_ELEMENT_KEY);

startTag = ("<" + elementTagName + ">").getBytes("UTF8");
endTag = ("</" + elementTagName + ">").getBytes("UTF8");

In using this code one has the restriction that the element being searched for is well-formed and that its opening tag has no attributes, as the start tag is matched as literal text. If this is not the case one can easily create a new XML Input Format class specific to one's needs.
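To make the restriction concrete, here is a minimal F# sketch of the same literal tag matching. The helper name tagsFor and the sample strings are hypothetical and not part of the download; the point is only that a start tag built as the exact text "<Store>" never matches an element written with attributes.

```fsharp
// Hypothetical helper mirroring how the record reader derives its tags
let tagsFor (elementName: string) =
    ("<" + elementName + ">", "</" + elementName + ">")

let startTag, endTag = tagsFor "Store"

// A plain element contains the derived start tag...
let plain = "<Store><Name>A</Name></Store>"
// ...but one carrying an attribute does not, so it would be skipped
let withAttribute = "<Store id=\"1\"><Name>A</Name></Store>"

let plainMatches = plain.Contains(startTag)
let attributeMatches = withAttribute.Contains(startTag)
printfn "plain: %b, with attribute: %b" plainMatches attributeMatches
```

Supporting attributes would mean searching for the prefix "<Store" and then checking the character that follows, which is the sort of change a custom Input Format class would make.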

The complete original source can be found in various places; including github. My modifications are in the source code download.

With the new XML Input Format class in place we are good to go with our F# code.

Map and Reduce Classes

As always, let's start with the Map and Reduce functions. The goal of the sample code is to support a Map and Reduce with the following prototypes:

 Map : XElement -> seq<(string * string) * obj>
 Reduce : string * string -> seq<string> -> obj option

The idea is that the Mapper takes in an XElement and projects this into a sequence of keys and values. The Reducer, as in the text streaming case, takes in a key and the sequence of values for that key, and returns an optional reduced value.

The main difference that you will notice in this sample is that the key is a tuple, which in turn is mapped into multiple map keys. This is supported in a Streaming application through the configuration value “stream.num.map.output.key.fields”. The Map in the previous samples returned a single string key.
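As a rough illustration of what this setting means, the sketch below splits a tab-separated mapper output line into its key and value parts, treating the first N fields as the key. The function splitKeyValue is a hypothetical helper written for this illustration; it is not Hadoop's implementation.

```fsharp
// Hypothetical helper: treat the first keyFields tab-separated fields as the key
let splitKeyValue (keyFields: int) (line: string) =
    let fields = line.Split('\t')
    let key = fields |> Array.take keyFields |> String.concat "\t"
    let value = fields |> Array.skip keyFields |> String.concat "\t"
    (key, value)

// With two key fields the business type and bank together form the key
let key, value = splitKeyValue 2 "BM\tInternational Bank\t800000"
printfn "key = %s, value = %s" key value

// With a single key field the bank name would be treated as part of the value
let key1, value1 = splitKeyValue 1 "BM\tInternational Bank\t800000"
printfn "key = %s, value = %s" key1 value1
```

This is why the setting has to agree with the number of elements in the key tuple written by the Mapper.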

For the sample I process a series of XML nodes with the following structure (created by running a T-SQL SELECT against the Adventure Works Store table):

<Store>
  <BusinessEntityID>294</BusinessEntityID>
  <Name>Professional Sales and Service</Name>
  <SalesPersonID>276</SalesPersonID>
  <Demographics>
    <StoreSurvey xmlns="http://schemas.microsoft.com/sqlserver/2004/07/adventure-works/StoreSurvey">
      <AnnualSales>800000</AnnualSales>
      <AnnualRevenue>80000</AnnualRevenue>
      <BankName>International Bank</BankName>
      <BusinessType>BM</BusinessType>
      <YearOpened>1991</YearOpened>
      <Specialty>Touring</Specialty>
      <SquareFeet>18000</SquareFeet>
      <Brands>4+</Brands>
      <Internet>T1</Internet>
      <NumberEmployees>14</NumberEmployees>
    </StoreSurvey>
  </Demographics>
  <ModifiedDate>2008-10-13T11:15:07.497</ModifiedDate>
</Store>

The Map code is going to extract the Sales Amount for each Business Type and Bank:

let Map (element:XElement) =
    // Namespace of the StoreSurvey element and its children
    let aw = "http://schemas.microsoft.com/sqlserver/2004/07/adventure-works/StoreSurvey"
    let demographics = element.Element(XName.Get("Demographics")).Element(XName.Get("StoreSurvey", aw))
    if not(demographics = null) then
        let business = demographics.Element(XName.Get("BusinessType", aw)).Value
        let bank = demographics.Element(XName.Get("BankName", aw)).Value
        let key = (business, bank)
        let sales = Decimal.Parse(demographics.Element(XName.Get("AnnualSales", aw)).Value)
        Seq.singleton (key, sales)
    else
        Seq.empty

The Reduce then sums the Sales across each of the Business Types and Banks:

let Reduce (key:(string*string)) (values:seq<string>) =
    let totalRevenue =
        values |>
        Seq.fold (fun revenue value -> revenue + Int32.Parse(value)) 0
    Some(totalRevenue)

The Map returns a sequence rather than a single value because the Map calling code allows a Map function to return multiple values per XML node.
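As a sketch of why a sequence is useful, the following hypothetical variant of a Map function emits two entries per element, one for sales and one for revenue. The name mapSalesAndRevenue, the key layout, and the simplified, namespace-free sample element are all assumptions made for brevity; they are not part of the download.

```fsharp
open System
open System.Xml.Linq

// Hypothetical variant of Map: emits one entry for sales and one for revenue.
// Assumes a simplified element without the StoreSurvey namespace.
let mapSalesAndRevenue (element: XElement) : seq<(string * string) * decimal> =
    let valueOf (name: string) = element.Element(XName.Get(name)).Value
    let business = valueOf "BusinessType"
    seq {
        yield ((business, "AnnualSales"), Decimal.Parse(valueOf "AnnualSales"))
        yield ((business, "AnnualRevenue"), Decimal.Parse(valueOf "AnnualRevenue"))
    }

let sampleXml =
    "<Store><BusinessType>BM</BusinessType><AnnualSales>800000</AnnualSales><AnnualRevenue>80000</AnnualRevenue></Store>"

let results = mapSalesAndRevenue (XElement.Parse(sampleXml)) |> Seq.toList

// Write each entry in the same tab-separated form used by the Mapper executable
results |> List.iter (fun ((k1, k2), v) -> printfn "%s\t%s\t%O" k1 k2 v)
```

A Map that has nothing to emit for a node simply returns an empty sequence, as the sample Map does when the StoreSurvey element is missing.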

All simple processing. So how are the Map and Reduce functions called?

Mapper and Reducer Executable

The complexity in the Mapper executable, which calls the Map function, comes in processing the XML input stream. In text streaming one gets a line per record to be processed; with XML Streaming one gets a stream of XML in which a record will more than likely span multiple lines. As such, the Mapper executable needs to parse the input, in my case line by line, and extract the required nodes for the construction of the XElement type. The requirement here is not that different from that of the Java XML Input Format class.

In the sample code the construction of the XElement sequence is achieved through the xmlElements sequence expression. The full code listing is as follows:

module Controller =

    let Run (args:string array) =

        // Define what arguments are expected
        let defs = [
            {ArgInfo.Command="input"; Description="Input XML File"; Required=false };
            {ArgInfo.Command="output"; Description="Output File"; Required=false } ]

        // Parse Arguments into a Dictionary
        let parsedArgs = Arguments.ParseArgs args defs

        // Ensure Standard Input/Output and allow for debug configuration
        use reader =
            if parsedArgs.ContainsKey("input") then
                new StreamReader(Path.GetFullPath(parsedArgs.["input"]))
            else
                new StreamReader(Console.OpenStandardInput())

        use writer =
            if parsedArgs.ContainsKey("output") then
                new StreamWriter(Path.GetFullPath(parsedArgs.["output"]))
            else
                new StreamWriter(Console.OpenStandardOutput(), AutoFlush = true)

        // Combine the composite key and value into a tab-separated string
        let outputCollector ((outputKey1, outputKey2) , outputValue) =
            let output = sprintf "%s\t%s\t%O" outputKey1 outputKey2 outputValue
            writer.WriteLine(output)

        // Write a counter entry
        let counterReporter docType =
            stderr.WriteLine (sprintf "reporter:counter:Elements Processed,%s,1" docType)

        // Derive the start and end tags from the node name
        let nodename = StoreXmlElementMapper.MapNode
        let nodestart = "<" + nodename + ">"
        let nodeend = "</" + nodename + ">"

        // Read the next line of the input stream
        let readLine() =
            reader.ReadLine()

        // Parse the input stream into a sequence of XElement types
        let elementBuilder = new StringBuilder(1024)
        let rec xmlElements inContent (someContent:string option) = seq {
            let line =
                match someContent with
                | Some(content) -> content
                | None -> readLine()
            if not (box line = null) then
                if (inContent) then
                    // Try to find the end element and yield accordingly
                    let offset = line.IndexOf(nodeend, 0, StringComparison.InvariantCultureIgnoreCase)
                    if (offset > -1) then
                        // Found the endnode so append and start new XElement
                        let content = line.Substring(0, offset + nodeend.Length)
                        elementBuilder.Append(content) |> ignore
                        let nextContent =
                            if (offset + nodeend.Length = line.Length) then
                                None
                            else
                                Some(line.Substring(offset + nodeend.Length))
                        yield XElement.Parse(elementBuilder.ToString())
                        elementBuilder.Clear() |> ignore
                        yield! xmlElements false nextContent
                    else
                        // Just a content line so append
                        elementBuilder.AppendLine(line) |> ignore
                        yield! xmlElements true None
                else
                    // Find the start node element and start building
                    let offset = line.IndexOf(nodestart, 0, StringComparison.InvariantCultureIgnoreCase)
                    if (offset > -1) then
                        yield! xmlElements true (Some(line.Substring(offset)))
                    else
                        yield! xmlElements false None
        }

        // Process the XElement sequence and report on successes
        let elementProcessed value =
            outputCollector value
            counterReporter "Successfully Processed"

        try
            xmlElements false None
            |> Seq.map StoreXmlElementMapper.Map
            |> Seq.iter (Seq.iter elementProcessed)
        with
        | :? System.Xml.XmlException ->
            // Report the invalid element via a counter; note that the exception
            // ends the enumeration, so any remaining input is not processed
            counterReporter "Invalid Elements"

        // Close the streams
        reader.Close()
        writer.Close()

The premise behind creating the sequence of XElement types is that a StringBuilder is used to build up the text from which each XElement is created. The opening tag is located, after which any content is appended to the StringBuilder. Once the end tag is located and appended to the StringBuilder, its contents are yielded as an XElement and the process is repeated. One has to remember that after finding the end tag the remainder of the line needs to be fed back into the search for the next opening tag.
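The scanning logic can be tried in isolation with a simplified sketch such as the one below. It is an assumption-laden variant rather than the sample's code: it is written as a loop instead of a recursive sequence expression, works over an in-memory sequence of lines, returns the element text rather than XElement values, and skips empty lines. The name extractElements is hypothetical.

```fsharp
open System
open System.Text

// Simplified sketch: extract complete <name>...</name> blocks from a sequence of lines
let extractElements (name: string) (lines: seq<string>) : string list =
    let startTag = "<" + name + ">"
    let endTag = "</" + name + ">"
    let builder = StringBuilder()
    let results = ResizeArray<string>()
    let mutable inContent = false
    for line in lines do
        // Text on the current line that still has to be scanned
        let mutable rest = line
        while rest.Length > 0 do
            if inContent then
                let offset = rest.IndexOf(endTag, StringComparison.InvariantCultureIgnoreCase)
                if offset > -1 then
                    // End tag found: complete the element, then scan the remainder of the line
                    let cut = offset + endTag.Length
                    builder.Append(rest.Substring(0, cut)) |> ignore
                    results.Add(builder.ToString())
                    builder.Clear() |> ignore
                    inContent <- false
                    rest <- rest.Substring(cut)
                else
                    // Just a content line, so append it whole
                    builder.AppendLine(rest) |> ignore
                    rest <- ""
            else
                let offset = rest.IndexOf(startTag, StringComparison.InvariantCultureIgnoreCase)
                if offset > -1 then
                    // Start tag found: begin building from the tag onwards
                    inContent <- true
                    rest <- rest.Substring(offset)
                else
                    rest <- ""
    List.ofSeq results

// Two Store elements spread unevenly over three lines
let elements =
    extractElements "Store"
        (seq [ "<Stores><Store><Name>A</Name>"
               "</Store><Store>"
               "<Name>B</Name></Store></Stores>" ])

printfn "Found %d elements" (List.length elements)
```

The second input line is the interesting case: it closes one element and opens the next, so the text after the end tag has to be scanned again for a start tag.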

As this processing is dependent on knowing the element name to process, I decided to have the module containing the Map function return the node name. Other methods could be used, but this seemed to be the cleanest as the Mapper needs this knowledge to process the XML.

In addition to handling the XML processing, this sample code also demonstrates writing out the composite key from the tuple and generating counters showing the number of elements processed, both successes and failures.
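For reference, a Streaming counter update is a line of the form reporter:counter:group,counter,amount written to standard error. The minimal sketch below builds such a line; the helper names counterLine and reportCounter are hypothetical, while the group and counter names are the ones used by the Mapper above.

```fsharp
// Format a Hadoop Streaming counter update; group and counter names are chosen by the job
let counterLine (group: string) (counter: string) (amount: int) =
    sprintf "reporter:counter:%s,%s,%d" group counter amount

// Streaming picks these lines up from standard error, not standard output
let reportCounter (group: string) (counter: string) (amount: int) =
    eprintfn "%s" (counterLine group counter amount)

reportCounter "Elements Processed" "Successfully Processed" 1

let invalidLine = counterLine "Elements Processed" "Invalid Elements" 1
```

Keeping the formatting separate from the write makes the line easy to check when running the executable outside of Hadoop.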

With the XML previously mentioned, the output from the Mapper executable will be as follows (in the actual stream the two key fields and the value are separated by tabs):

BM Guardian Bank 1100000
BM International Bank 3000000
BM International Security 2700000
BM Primary Bank & Reserve 2200000
BM Primary International 1100000
BM Reserve Security 1100000
BM United Security 2900000
BS International Security 1500000
BS Primary Bank & Reserve 1500000
BS Reserve Security 800000
OS Guardian Bank 6000000
OS International Bank 4500000
OS International Security 4500000
OS Primary Bank & Reserve 10500000
OS Primary International 6000000
OS Reserve Security 6000000
OS United Security 4500000

So onto the Reducer. The Reducer, as in the previous samples, just has to read in this text stream and call the Reduce function. The full code listing is as follows:

module Controller =

    let Run (args:string array) =

        // Define what arguments are expected
        let defs = [
            {ArgInfo.Command="input"; Description="Input File"; Required=false };
            {ArgInfo.Command="output"; Description="Output File"; Required=false } ]

        // Parse Arguments into a Dictionary
        let parsedArgs = Arguments.ParseArgs args defs

        // Ensure Standard Input/Output and allow for debug configuration
        use reader =
            if parsedArgs.ContainsKey("input") then
                new StreamReader(Path.GetFullPath(parsedArgs.["input"]))
            else
                new StreamReader(Console.OpenStandardInput())

        use writer =
            if parsedArgs.ContainsKey("output") then
                new StreamWriter(Path.GetFullPath(parsedArgs.["output"]))
            else
                new StreamWriter(Console.OpenStandardOutput(), AutoFlush = true)

        // Combine the composite key and value into a tab-separated string
        let outputCollector (outputKey1, outputKey2) outputValue =
            let output = sprintf "%s\t%s\t%O" outputKey1 outputKey2 outputValue
            writer.WriteLine(output)

        // Read the next line of the input stream
        let readLine() =
            reader.ReadLine()

        // Parse the input into the required key/value pair
        let parseLine (input:string) =
            let keyValue = input.Split('\t')
            ((keyValue.[0].Trim(), keyValue.[1].Trim()), keyValue.[2].Trim())

        // Converts an input line into an option
        let getInput() =
            let input = readLine()
            if not(String.IsNullOrWhiteSpace(input)) then
                Some(parseLine input)
            else
                None

        // Creates a sequence of the input based on the provided key
        let lastInput = ref None
        let continueDo = ref false
        let inputsByKey key firstValue = seq {
            // Yield any value from previous read
            yield firstValue
            continueDo := true
            while !continueDo do
                match getInput() with
                | Some(input) when (fst input) = key ->
                    // Yield found value and remainder of sequence
                    yield (snd input)
                | Some(input) ->
                    // Have a value but different key
                    lastInput := Some(input)
                    continueDo := false
                | None ->
                    // Have no more entries
                    lastInput := None
                    continueDo := false
        }

        // Controls the calling of the reducer
        let rec processInput (input:((string * string) * string) option) =
            if input.IsSome then
                let (key, value) = input.Value
                let reduced = StoreXmlElementReducer.Reduce key (inputsByKey key value)
                if reduced.IsSome then
                    outputCollector key reduced.Value
                processInput lastInput.contents

        processInput (getInput())

The main difference in this code is the support for a tuple as a key. As in previous samples, the Reducer has to create a sequence of sequences of the input data based on the key values, in effect a lazily evaluated groupBy. In this instance, to find a match one needs to ensure both key elements are the same. This is achieved in F# by the fact that tuple types support structural equality (check out Equality and Comparison Constraints in F# by Don Syme for more details).

Of course one could generalize this code to support N key elements, but maybe more on this at another time.
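The role structural equality plays can be seen in a small sketch. The helper groupAdjacent below is hypothetical and, unlike the Reducer, groups eagerly over an in-memory list; it only illustrates that tuple keys can be compared with the ordinary equality operator when grouping adjacent entries of key-sorted input.

```fsharp
// Tuples compare element by element, so composite keys need no custom comparer
let sameKey = ("BM", "Guardian Bank") = ("BM", "Guardian Bank")
let differentKey = ("BM", "Guardian Bank") = ("BM", "Reserve Security")

// Hypothetical helper: group adjacent entries that share a key (input assumed sorted by key)
let groupAdjacent entries =
    let step groups (key, value) =
        match groups with
        | (lastKey, values) :: rest when lastKey = key -> (lastKey, value :: values) :: rest
        | _ -> (key, [ value ]) :: groups
    entries
    |> Seq.fold step []
    |> List.map (fun (key, values) -> (key, List.rev values))
    |> List.rev

let sampleEntries =
    [ (("BM", "Guardian Bank"), "800000")
      (("BM", "Guardian Bank"), "300000")
      (("BS", "Reserve Security"), "800000") ]

let grouped = groupAdjacent sampleEntries
printfn "%A" grouped
```

The Reducer executable avoids holding a whole group in memory by yielding values as they are read, which is why it tracks the last input read rather than building lists like this.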

Finally onto running the job. The command to run this MapReduce job is as follows:

hadoop.cmd jar lib/hadoop-streaming-ms.jar
"-D xmlinput.element=Store" "-D stream.num.map.output.key.fields=2"
-input "/stores/demographics" -output "/stores/banking/release"
-mapper "..\..\jars\FSharp.Hadoop.MapperXml.exe"
-combiner "..\..\jars\FSharp.Hadoop.ReducerXml.exe"
-reducer "..\..\jars\FSharp.Hadoop.ReducerXml.exe"
-file "C:\bin\Release\FSharp.Hadoop.MapperXml.exe"
-file "C:\bin\Release\FSharp.Hadoop.ReducerXml.exe"
-file "C:\bin\Release\FSharp.Hadoop.MapReduce.dll"
-file "C:\bin\Release\FSharp.Hadoop.Utilities.dll"
-inputformat org.apache.mahout.classifier.bayes.XmlElementStreamingInputFormat

The configuration values one has to set are the number of output key fields, “stream.num.map.output.key.fields” with a value of 2, and the XML element name to be processed, “xmlinput.element” with a value of “Store”.

As in the Binary Document Streaming sample, I created a new Streaming JAR called “hadoop-streaming-ms.jar” which contains the new Java classes on top of the base Hadoop streaming classes. The downloadable code has the full listing of the Java classes along with a command file to compile the source code.

Tester Application

As always I have put together a tester application for calling the Mapper and Reducer executables:

module MapReduceConsole =

    let Run args =

        // Define what arguments are expected
        let defs = [
            {ArgInfo.Command="input"; Description="Input File"; Required=true };
            {ArgInfo.Command="output"; Description="Output File"; Required=true };
            {ArgInfo.Command="tempPath"; Description="Temp File Path"; Required=true };
            {ArgInfo.Command="mapper"; Description="Mapper EXE"; Required=true };
            {ArgInfo.Command="reducer"; Description="Reducer EXE"; Required=true };
            {ArgInfo.Command="nodename"; Description="XML Node"; Required=true }; ]

        // Parse Arguments into a Dictionary
        let parsedArgs = Arguments.ParseArgs args defs
        Arguments.DisplayArgs parsedArgs

        // Define the executables and the XML node to process
        let mapperExe = Path.GetFullPath(parsedArgs.["mapper"])
        let reducerExe = Path.GetFullPath(parsedArgs.["reducer"])
        let nodename = parsedArgs.["nodename"]
        let nodestart = "<" + nodename + ">"
        let nodeend = "</" + nodename + ">"

        Console.WriteLine()
        Console.WriteLine (sprintf "The Mapper file is:\t%O" mapperExe)
        Console.WriteLine (sprintf "The Reducer file is:\t%O" reducerExe)
        Console.WriteLine (sprintf "Processing Nodename:\t%O" nodename)

        // Get the file names
        let inputfile = Path.GetFullPath(parsedArgs.["input"])
        let outputfile = Path.GetFullPath(parsedArgs.["output"])

        let tempPath = Path.GetFullPath(parsedArgs.["tempPath"])
        let tempFile = Path.Combine(tempPath, Path.GetFileName(outputfile))

        let mappedfile = Path.ChangeExtension(tempFile, "mapped")
        let reducefile = Path.ChangeExtension(tempFile, "reduced")

        Console.WriteLine()
        Console.WriteLine (sprintf "The input file is:\t\t%O" inputfile)
        Console.WriteLine (sprintf "The mapped temp file is:\t%O" mappedfile)
        Console.WriteLine (sprintf "The reduced temp file is:\t%O" reducefile)
        Console.WriteLine (sprintf "The output file is:\t\t%O" outputfile)

        // Give the user an option to continue
        Console.WriteLine()
        Console.WriteLine("Hit ENTER to continue...")
        Console.ReadLine() |> ignore

        // Call the mapper process, streaming the parsed XML elements into it
        let mapperProcess() =
            use mapper = new Process()
            mapper.StartInfo.FileName <- mapperExe
            mapper.StartInfo.UseShellExecute <- false
            mapper.StartInfo.RedirectStandardInput <- true
            mapper.StartInfo.RedirectStandardOutput <- true
            mapper.Start() |> ignore

            use mapperInput = mapper.StandardInput
            use mapperOutput = mapper.StandardOutput

            // Read the mapper output on a background task so processing can happen in parallel
            Console.WriteLine "Mapper Processing Starting..."
            let taskMapperFunc() =
                use mapperWriter = File.CreateText(mappedfile)
                while not mapperOutput.EndOfStream do
                    mapperWriter.WriteLine(mapperOutput.ReadLine())
            let taskMapperWriting = Task.Factory.StartNew(Action(taskMapperFunc))

            // Pass the file into the mapper process and close input stream when done
            use mapperReader = new StreamReader(File.OpenRead(inputfile))

            // Parse the input file into a sequence of XML element strings
            let elementBuilder = new StringBuilder(1024)
            let rec xmlElements inContent (someContent:string option) = seq {
                let line =
                    match someContent with
                    | Some(content) -> content
                    | None -> mapperReader.ReadLine()
                if not (box line = null) then
                    if (inContent) then
                        // Try to find the end element and yield accordingly
                        let offset = line.IndexOf(nodeend, 0, StringComparison.InvariantCultureIgnoreCase)
                        if (offset > -1) then
                            // Found the endnode so always add a new line
                            let content = line.Substring(0, offset + nodeend.Length)
                            elementBuilder.AppendLine(content) |> ignore
                            let nextContent =
                                if (offset + nodeend.Length = line.Length) then
                                    None
                                else
                                    Some(line.Substring(offset + nodeend.Length))
                            yield elementBuilder.ToString()
                            elementBuilder.Clear() |> ignore
                            yield! xmlElements false nextContent
                        else
                            // Just a content line so append
                            elementBuilder.AppendLine(line) |> ignore
                            yield! xmlElements true None
                    else
                        // Find the start node element and start building
                        let offset = line.IndexOf(nodestart, 0, StringComparison.InvariantCultureIgnoreCase)
                        if (offset > -1) then
                            yield! xmlElements true (Some(line.Substring(offset)))
                        else
                            yield! xmlElements false None
            }

            xmlElements false None
            |> Seq.iter mapperInput.WriteLine

            mapperInput.Close()
            taskMapperWriting.Wait()
            mapperOutput.Close()

            mapper.WaitForExit()
            let result = match mapper.ExitCode with | 0 -> true | _ -> false
            mapper.Close()
            result

        // Sort the mapped file by the two key fields - mimic the role of Hadoop
        let hadoopProcess() =
            Console.WriteLine "Hadoop Processing Starting..."
            let unsortedValues = seq {
                use reader = new StreamReader(File.OpenRead(mappedfile))
                while not reader.EndOfStream do
                    let input = reader.ReadLine()
                    let keyValue = input.Split('\t')
                    yield (keyValue.[0].Trim(), keyValue.[1].Trim(), keyValue.[2].Trim())
                reader.Close()
                }

            use writer = File.CreateText(reducefile)
            unsortedValues
            |> Seq.sortBy (fun (key1, key2, value) -> (key1, key2))
            |> Seq.iter (fun (key1, key2, value) -> writer.WriteLine (sprintf "%O\t%O\t%O" key1 key2 value))
            writer.Close()

        // Finally call the reducer process
        let reducerProcess() =
            use reducer = new Process()
            reducer.StartInfo.FileName <- reducerExe
            reducer.StartInfo.UseShellExecute <- false
            reducer.StartInfo.RedirectStandardInput <- true
            reducer.StartInfo.RedirectStandardOutput <- true
            reducer.Start() |> ignore

            use reducerInput = reducer.StandardInput
            use reducerOutput = reducer.StandardOutput

            // Read the reducer output on a background task so processing can happen in parallel
            Console.WriteLine "Reducer Processing Starting..."
            let taskReducerFunc() =
                use reducerWriter = File.CreateText(outputfile)
                while not reducerOutput.EndOfStream do
                    reducerWriter.WriteLine(reducerOutput.ReadLine())
            let taskReducerWriting = Task.Factory.StartNew(Action(taskReducerFunc))

            // Pass the file into the reducer process and close input stream when done
            use reducerReader = new StreamReader(File.OpenRead(reducefile))
            while not reducerReader.EndOfStream do
                reducerInput.WriteLine(reducerReader.ReadLine())

            reducerInput.Close()
            taskReducerWriting.Wait()
            reducerOutput.Close()

            reducer.WaitForExit()
            let result = match reducer.ExitCode with | 0 -> true | _ -> false
            reducer.Close()
            result

        // Finish test
        if mapperProcess() then
            Console.WriteLine "Mapper Processing Complete..."
            hadoopProcess()
            Console.WriteLine "Hadoop Processing Complete..."
            if reducerProcess() then
                Console.WriteLine "Reducer Processing Complete..."
                Console.WriteLine "Processing Complete..."

        Console.ReadLine() |> ignore

Again, the challenge in this code is the processing of the XML. This sample code processes a single XML input file, parsing the XML into the required elements and then streaming these into the Mapper executable. If you review the code you will see a strong similarity between the Mapper executable's XElement sequence generation and how the XML is parsed and streamed into the Mapper.

Finally, a required argument into the tester application is the “nodename” value. This is analogous to the job configuration parameter when running the job within a Hadoop cluster.

Conclusion

So this wraps up the Streaming examples for a while. I have enjoyed putting it all together, and I hope it proves to be useful code.