Hadoop Binary Streaming and F# MapReduce

As mentioned in my previous post, Hadoop Streaming supports not only text streaming but also Binary Streaming. As such, I wanted to put together a sample that supports processing Office documents; more on support for PDF in a later post. As always, the code can be downloaded from:

https://code.msdn.microsoft.com/Hadoop-Streaming-and-F-f2e76850

Putting together this sample involved a bit more work than the text streaming case, as one needs to put together Java implementations of FileInputFormat and RecordReader to support binary streaming. The purpose of these implementations is to stream each document as a whole, such that it is not split on the usual line boundaries. More on the Java code later.

The Java classes are written with a key/value type pairing of <Text, BytesWritable>, which writes out the data for the mapper in the following format:

  • Filename
  • Tab character
  • UTF8 Encoded Document
  • Linefeed character

The Mapper code will get called with this format for each document in the specified input directories.
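To make the record layout concrete, here is a minimal Java sketch of how a single document could be framed as Filename / Tab / Document / Linefeed. The class name DocumentRecord and its frame method are hypothetical and are not part of the download; the sketch assumes only the layout listed above.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the sample download: frames one document
// as Filename / Tab / Document bytes / Linefeed.
final class DocumentRecord {
    static final int TAB = 0x09;
    static final int LINEFEED = 0x0A;

    static byte[] frame(String filename, byte[] document) {
        byte[] name = filename.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream out =
                new ByteArrayOutputStream(name.length + document.length + 2);
        out.write(name, 0, name.length);         // filename as UTF-8 bytes
        out.write(TAB);                          // delimiter
        out.write(document, 0, document.length); // raw document bytes, untouched
        out.write(LINEFEED);                     // record terminator
        return out.toByteArray();
    }

    public static void main(String[] args) {
        // The four document bytes here are arbitrary sample data.
        byte[] framed = frame("report.docx", new byte[] {0x50, 0x4B, 0x03, 0x04});
        System.out.println("Record length in bytes: " + framed.length);
    }
}
```

Note that the document bytes may themselves contain Tab or Linefeed values, so a consumer should treat only the first Tab as the delimiter and only the final byte as the terminator.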

Map and Reduce Classes

The goal of the sample code is to support a Map and Reduce with the following prototypes:

 Map : WordprocessingDocument -> seq<string * obj>
 Reduce : string -> seq<string> -> obj option

The idea is that the Mapper takes in a Word document and projects this into a sequence of keys and values. The Reducer, as in the text streaming case, takes in a key together with the sequence of values for that key, and returns an optional reduced value.

The use of the obj type is, as in the text streaming case, to support serializing the output values.

As an example, I have put together a MapReduce to process Word documents, where each Word document is mapped into the number of pages per author; where a document has multiple authors, each author is credited with the same number of pages.

The sample Mapper code is as follows:

module OfficeWordPageMapper =

    let dc = XNamespace.Get("http://purl.org/dc/elements/1.1/")
    let cp = XNamespace.Get("http://schemas.openxmlformats.org/package/2006/metadata/core-properties")
    let unknownAuthor = "unknown author"

    let getAuthors (document:WordprocessingDocument) =
        let coreFilePropertiesXDoc = XElement.Load(document.CoreFilePropertiesPart.GetStream());
          
        // Take the first dc:creator element and split based on a ";"
        let creators = coreFilePropertiesXDoc.Elements(dc + "creator")
        if Seq.isEmpty creators then
            [| unknownAuthor |]
        else
            let creator = (Seq.head creators).Value
            if String.IsNullOrWhiteSpace(creator) then
                [| unknownAuthor |]
            else
                creator.Split(';')

    let getPages (document:WordprocessingDocument) =
        // return page count
        Int32.Parse(document.ExtendedFilePropertiesPart.Properties.Pages.Text)

    // Map the data from input name/value to output name/value
    let Map (document:WordprocessingDocument) =
        let pages = getPages document
        (getAuthors document)
        |> Seq.map (fun author -> (author, pages))

As you can see, the majority of the code is needed to pull out the document properties. If one wanted to process the words within the document one would use the following code:

 document.MainDocumentPart.Document.Body.InnerText

To run this code it is worth noting there is a dependency on installing the Open XML SDK 2.0 for Microsoft Office.
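The projection performed by Map can be sketched independently of the Open XML SDK. The following Java sketch takes the raw dc:creator text and a page count (both assumed to have been read from the document already) and emits one author/pages pair per credited author. AuthorPageMapper is a hypothetical name used only for illustration.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of the Map projection: creator text -> (author, pages) pairs.
final class AuthorPageMapper {
    static final String UNKNOWN_AUTHOR = "unknown author";

    static List<Map.Entry<String, Integer>> map(String creator, int pages) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        if (creator == null || creator.trim().isEmpty()) {
            // No usable creator: credit the pages to the placeholder author.
            pairs.add(new SimpleEntry<String, Integer>(UNKNOWN_AUTHOR, pages));
            return pairs;
        }
        // A limit of -1 keeps trailing empty strings, as .NET's Split(';') does.
        for (String author : creator.split(";", -1)) {
            pairs.add(new SimpleEntry<String, Integer>(author, pages));
        }
        return pairs;
    }

    public static void main(String[] args) {
        for (Map.Entry<String, Integer> pair : map("Carl Nolan;Brad Sarsfield", 9)) {
            System.out.println(pair.getKey() + "\t" + pair.getValue());
        }
    }
}
```

As in the F# listing, the author names are not trimmed, so a creator value written as "A; B" would produce " B" (with a leading space) as a separate key.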

Finally, the Reducer code is as follows:

module OfficePageReducer =

    let Reduce (key:string) (values:seq<string>) =
        let totalPages =
            values |>
            Seq.fold (fun pages value -> pages + Int32.Parse(value)) 0

        Some(totalPages)

Again, as in the text streaming application, both the mapper and the reducer are executables that read the input from StdIn and emit the output to StdOut. In the case of the mapper the console application will need to read raw bytes from the standard input stream (multiple ReadByte() calls on the stream returned by Console.OpenStandardInput()), and then perform a WriteLine() to emit the output. The reducer will do a ReadLine() to get the data, and perform a WriteLine() to emit the output.

The previous post covers the mechanics of creating console applications in F#, so I will not cover this again but will assume the same program structure.

Mapper Executable

As previously mentioned, the purpose of the Mapper code is to perform Input Format Parsing, Projection, and Filtering. In the Mapper for a Word document the bytes are used to create a WordprocessingDocument, with invalid documents ignored; valid documents are then projected into a key/value sequence using the Map function:

module Controller =

    let Run (args:string array) =    
    
        // Define what arguments are expected
        let defs = [
            {ArgInfo.Command="input"; Description="Input File"; Required=false };
            {ArgInfo.Command="output"; Description="Output File"; Required=false } ]

        // Parse Arguments into a Dictionary
        let parsedArgs = Arguments.ParseArgs args defs        
        
        // Ensure Standard Input/Output and allow for debug configuration
        let builder = new StringBuilder()
        let encoder = Text.UTF8Encoding()

        use reader =
            if parsedArgs.ContainsKey("input") then
                builder.Append(Path.GetFileName(parsedArgs.["input"])) |> ignore
                File.Open(Path.GetFullPath(parsedArgs.["input"]), FileMode.Open) :> Stream
            else
                let stream = new MemoryStream()

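                // Read the filename bytes until one hits a tab (or the stream ends)
                let stdIn = Console.OpenStandardInput()
                let nameBytes = new ResizeArray<byte>()
                let rec readTab() =
                    let inByte = stdIn.ReadByte()
                    if inByte <> 0x09 && inByte <> -1 then
                        nameBytes.Add(byte inByte)
                        readTab()
                readTab()

                // Decode the name in one call so multi-byte UTF8 characters survive
                builder.Append(encoder.GetString(nameBytes.ToArray())) |> ignore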

                // Copy the rest of the stream and truncate the last linefeed char
                Console.OpenStandardInput().CopyTo(stream)
                if (stream.Length > 0L) then
                    stream.SetLength(stream.Length - 1L)
                stream.Position <- 0L

                stream :> Stream

        use writer =
            if parsedArgs.ContainsKey("output") then
                new StreamWriter(Path.GetFullPath(parsedArgs.["output"]))
            else
                new StreamWriter(Console.OpenStandardOutput(), AutoFlush = true)
        
        let filename = builder.ToString()

        // Combine the name/value output into a string
        let outputCollector (outputKey, outputValue) =
            let output = sprintf "%s\t%O" outputKey outputValue
            writer.WriteLine(output)
        
        // Check we do not have a null document
        if (reader.Length > 0L) then
            try
                // Get access to the word processing document from the input stream
                use document = WordprocessingDocument.Open(reader, false)

                // Process the word document with the mapper
                OfficeWordPageMapper.Map document
                |> Seq.iter (fun value -> outputCollector value)
        
                // close document
                document.Close()
            with
            | :? System.IO.FileFormatException ->
                // Ignore invalid file formats
                ()

        // Close the streams
        reader.Close()
        writer.Close()

There are a few things of note in the code.

When pulling out the filename of the document that is being processed, it is assumed that UTF8 encoding has been used and that a Tab character is used as a delimiter between the filename and the document's bytes.

In building the Stream that is to be used for creating the WordprocessingDocument a MemoryStream is used. There are several reasons for this. Firstly one needs to remove the last Newline character from the Stream, and secondly when creating a WordprocessingDocument a Stream is needed that supports Seek operations; unfortunately this is not the case for StdIn.
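The buffering step described above might look as follows in Java; this is a sketch of the same idea rather than a port of the F# Controller. It reads the filename up to the first Tab, copies the remaining bytes into memory, and drops the trailing Linefeed, leaving a byte array that can be wrapped in a seekable stream. BufferedRecord is a hypothetical name.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper: buffers one Filename/Tab/Document/Linefeed record in memory.
final class BufferedRecord {
    final String filename;
    final byte[] document;

    private BufferedRecord(String filename, byte[] document) {
        this.filename = filename;
        this.document = document;
    }

    static BufferedRecord parse(InputStream in) {
        try {
            // Collect the filename bytes up to the first Tab (or end of stream).
            ByteArrayOutputStream name = new ByteArrayOutputStream();
            int b;
            while ((b = in.read()) != -1 && b != 0x09) {
                name.write(b);
            }

            // Copy everything that remains; the source stream cannot seek.
            ByteArrayOutputStream body = new ByteArrayOutputStream();
            byte[] chunk = new byte[1024];
            int read;
            while ((read = in.read(chunk)) != -1) {
                body.write(chunk, 0, read);
            }

            // Drop the record's trailing Linefeed, if any bytes arrived at all.
            byte[] bytes = body.toByteArray();
            byte[] document = bytes.length > 0
                    ? Arrays.copyOf(bytes, bytes.length - 1)
                    : bytes;

            // Decode the name once so multi-byte UTF-8 characters stay intact.
            String filename = new String(name.toByteArray(), StandardCharsets.UTF_8);
            return new BufferedRecord(filename, document);
        } catch (IOException e) {
            // Wrapped as unchecked purely to keep the sketch short.
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] framed = {'a', '.', 'd', 'o', 'c', 'x', 0x09, 0x50, 0x4B, 0x0A};
        BufferedRecord parsed = parse(new ByteArrayInputStream(framed));
        System.out.println(parsed.filename + " " + parsed.document.length);
    }
}
```

Only the first Tab is treated as a delimiter, so Tab or Linefeed bytes inside the document itself pass through unchanged.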

At the moment the code does not use the filename. However, in future posts I will extend the code to also support processing PDF documents.

Reducer Executable

After running the Mapper, the data being passed into the Reducer will be key/value pairs delimited with a Tab, sorted by key by Hadoop. Using the above Map, a sample input dataset for the Reducer would be:

Brad Sarsfield 44
Carl Nolan 1
Carl Nolan 9
Marie West 1

Thus in this case, the code for the Reducer will be the same as in the text streaming case:

module Controller =

    let Run (args:string array) =

        // Define what arguments are expected
        let defs = [
            {ArgInfo.Command="input"; Description="Input File"; Required=false };
            {ArgInfo.Command="output"; Description="Output File"; Required=false } ]

        // Parse Arguments into a Dictionary
        let parsedArgs = Arguments.ParseArgs args defs

        // Ensure Standard Input/Output and allow for debug configuration
        use reader =
            if parsedArgs.ContainsKey("input") then
                new StreamReader(Path.GetFullPath(parsedArgs.["input"]))
            else
                new StreamReader(Console.OpenStandardInput())

        use writer =
            if parsedArgs.ContainsKey("output") then
                new StreamWriter(Path.GetFullPath(parsedArgs.["output"]))
            else
                new StreamWriter(Console.OpenStandardOutput(), AutoFlush = true)
        
        // Combine the name/value output into a string
        let outputCollector outputKey outputValue =            
            let output = sprintf "%s\t%O" outputKey outputValue
            writer.WriteLine(output)

        // Read the next line of the input stream
        let readLine() =
            reader.ReadLine()

        // Parse the input into the required name/value pair
        let parseLine (input:string) =
            let keyValue = input.Split('\t')
            (keyValue.[0].Trim(), keyValue.[1].Trim())

        // Converts an input line into an option
        let getInput() =
            let input = readLine()
            if not(String.IsNullOrWhiteSpace(input)) then
                Some(parseLine input)
            else
                None

        // Creates a sequence of the input based on the provided key
        let lastInput = ref None
        let continueDo = ref false
        let inputsByKey key firstValue = seq {
            // Yield any value from previous read
            yield firstValue

            continueDo := true
            while !continueDo do
                match getInput() with
                | Some(input) when (fst input) = key ->
                    // Yield found value and remainder of sequence
                    yield (snd input)                    
                | Some(input) ->
                    // Have a value but different key
                    lastInput := Some(fst input, snd input)
                    continueDo := false
                | None ->
                    // Have no more entries
                    lastInput := None
                    continueDo := false
        }

        // Controls the calling of the reducer
        let rec processInput (input:(string*string) option) =
            if input.IsSome then
                let key = fst input.Value
                let value = OfficePageReducer.Reduce key (inputsByKey key (snd input.Value))
                if value.IsSome then
                    outputCollector key value.Value
                if lastInput.contents.IsSome then
                    processInput lastInput.contents

        processInput (getInput())

Once run, the reduced output would be:

Brad Sarsfield 44
Carl Nolan 10
Marie West 1

Once again the only complexity in the code is the fact that the Seq.groupBy function cannot be used; instead the values for a key are read lazily from the sorted input until the key changes. Also, as in the text streaming case, there is a fair amount of code controlling the input and output streams for calling the Map and Reduce functions, which can be reused for all Hadoop Binary Streaming jobs.
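The grouping technique used by the Reducer executable, which relies on the input arriving sorted by key, can be sketched in Java as a single pass that emits a total whenever the key changes. This is an illustration only, with the page summing folded in directly; SortedStreamReducer is a hypothetical name, and like the F# code it stops at the first blank line.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration: reduce key-sorted "key<TAB>value" lines in one pass.
final class SortedStreamReducer {

    static List<String> reduce(BufferedReader reader) {
        List<String> output = new ArrayList<>();
        String currentKey = null;
        long total = 0;
        try {
            String line;
            while ((line = reader.readLine()) != null && !line.trim().isEmpty()) {
                String[] keyValue = line.split("\t", 2);
                String key = keyValue[0].trim();
                long value = Long.parseLong(keyValue[1].trim());

                if (!key.equals(currentKey)) {
                    // Key changed: the previous group is complete, so emit it.
                    if (currentKey != null) {
                        output.add(currentKey + "\t" + total);
                    }
                    currentKey = key;
                    total = 0;
                }
                total += value;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Emit the final group, if any input was seen.
        if (currentKey != null) {
            output.add(currentKey + "\t" + total);
        }
        return output;
    }

    public static void main(String[] args) {
        String sorted = "Brad Sarsfield\t44\nCarl Nolan\t1\nCarl Nolan\t9\nMarie West\t1\n";
        for (String line : reduce(new BufferedReader(new StringReader(sorted)))) {
            System.out.println(line);
        }
    }
}
```

Only one key's running total is held in memory at a time, which is the property that makes the approach suitable for streamed input. If the input were not sorted, the same key would be emitted more than once.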

Running the Executables

In the case of Binary Streaming the command parameters are a little different to the text streaming case:

C:\Apps\dist\hadoop.cmd jar lib/hadoop-streaming-ms.jar
-input "/office/documents"
-output "/office/authors"
-mapper "..\..\jars\FSharp.Hadoop.MapperOffice.exe"
-combiner "..\..\jars\FSharp.Hadoop.ReducerOffice.exe"
-reducer "..\..\jars\FSharp.Hadoop.ReducerOffice.exe"
-file "C:\bin\Release\FSharp.Hadoop.MapperOffice.exe"
-file "C:\bin\Release\FSharp.Hadoop.ReducerOffice.exe"
-file "C:\bin\Release\FSharp.Hadoop.MapReduce.dll"
-file "C:\bin\Release\FSharp.Hadoop.Utilities.dll"
-inputformat com.microsoft.hadoop.mapreduce.lib.input.BinaryDocumentWithNameInputFormat

The first difference is the use of the hadoop-streaming-ms.jar. This file contains the InputFormat class specified by the inputformat parameter; the later Java Classes section discusses how this is created. This is needed to support Binary Streaming.

One other difference from my previous text streaming case is the use of the Reducer class as a Combiner. This is possible because the output types from the mapper are the same as those from the reducer.
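As a small illustration of why the page-summing reducer is safe to reuse as a combiner, the Java sketch below applies the same summing function once directly and once in two stages. Beyond the matching types, it relies on summation giving the same total however the values are partitioned. CombinerCheck and the sample values are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration: the same summing function used as combiner and reducer.
final class CombinerCheck {

    // Mirrors the shape of OfficePageReducer.Reduce: fold the values into a total.
    static int reduce(List<Integer> values) {
        int total = 0;
        for (int value : values) {
            total += value;
        }
        return total;
    }

    public static void main(String[] args) {
        // Without a combiner the reducer sees every mapped value for the key.
        int direct = reduce(Arrays.asList(1, 9, 4));

        // With a combiner each map task pre-sums its own values first.
        int partialA = reduce(Arrays.asList(1, 9));
        int partialB = reduce(Arrays.asList(4));
        int combined = reduce(Arrays.asList(partialA, partialB));

        System.out.println(direct + " " + combined); // prints "14 14"
    }
}
```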

Tester Application

For completeness I have included the base code for the tester application; the full code is included in the download. The code is very similar to the tester application mentioned in my previous post, with the exception of how the mapper executable is called:

module MapReduceConsole =
        
    let Run args =

        // Define what arguments are expected
        let defs = [            
            {ArgInfo.Command="input"; Description="Input File"; Required=true };
            {ArgInfo.Command="output"; Description="Output File"; Required=true };
            {ArgInfo.Command="tempPath"; Description="Temp File Path"; Required=true };
            {ArgInfo.Command="mapper"; Description="Mapper EXE"; Required=true };
            {ArgInfo.Command="reducer"; Description="Reducer EXE"; Required=true }; ]

        // Parse Arguments into a Dictionary
        let parsedArgs = Arguments.ParseArgs args defs
        Arguments.DisplayArgs parsedArgs

        // define the executables
        let mapperExe = Path.GetFullPath(parsedArgs.["mapper"])
        let reducerExe = Path.GetFullPath(parsedArgs.["reducer"])

        Console.WriteLine()
        Console.WriteLine (sprintf "The Mapper file is:\t%O" mapperExe)
        Console.WriteLine (sprintf "The Reducer file is:\t%O" reducerExe)

        // Get the file names
        let inputpath = Path.GetFullPath(parsedArgs.["input"])
        let outputfile = Path.GetFullPath(parsedArgs.["output"])

        let tempPath = Path.GetFullPath(parsedArgs.["tempPath"])
        let tempFile = Path.Combine(tempPath, Path.GetFileName(outputfile))

        let mappedfile = Path.ChangeExtension(tempFile, "mapped")
        let reducefile = Path.ChangeExtension(tempFile, "reduced")

        Console.WriteLine()
        Console.WriteLine (sprintf "The input path is:\t\t%O" inputpath)
        Console.WriteLine (sprintf "The mapped temp file is:\t%O" mappedfile)
        Console.WriteLine (sprintf "The reduced temp file is:\t%O" reducefile)
        Console.WriteLine (sprintf "The output file is:\t\t%O" outputfile)

        // Give the user an option to continue
        Console.WriteLine()
        Console.WriteLine("Hit ENTER to continue...")
        Console.ReadLine() |> ignore

        let CHUNK = 1024
        let buffer:byte array = Array.zeroCreate CHUNK

        // Call the mapper with the input file
        let mapperProcessLoop inputfile =
            use mapper = new Process()
            mapper.StartInfo.FileName <- mapperExe
            mapper.StartInfo.UseShellExecute <- false
            mapper.StartInfo.RedirectStandardInput <- true
            mapper.StartInfo.RedirectStandardOutput <- true
            mapper.Start() |> ignore

            use mapperInput = mapper.StandardInput.BaseStream
            use mapperOutput = mapper.StandardOutput
        
            // Map the reader to a background thread so processing can happen in parallel
            let taskMapperFunc() =
                use mapperWriter = File.AppendText(mappedfile)
                while not mapperOutput.EndOfStream do
                    mapperWriter.WriteLine(mapperOutput.ReadLine())
            let taskMapperWriting = Task.Factory.StartNew(Action(taskMapperFunc))

            // Pass the file into the mapper process and close input stream when done          
            use mapperReader = new BinaryReader(File.OpenRead(inputfile))
            let rec readLoop() =
                let bytesread = mapperReader.Read(buffer, 0, CHUNK)
                if bytesread > 0 then
                    mapperInput.Write(buffer, 0, bytesread)
                    readLoop()

            // Write out a Filename/Tab/Document/LineFeed
            let filename = Path.GetFileName(inputfile)
            let encoding = Text.UTF8Encoding();
            mapperInput.Write(encoding.GetBytes(filename), 0, encoding.GetByteCount(filename))
            mapperInput.Write([| 0x09uy |], 0, 1)
            readLoop()
            mapperInput.Write([| 0x0Auy |], 0, 1)

            mapperInput.Close()
            taskMapperWriting.Wait()
            mapperOutput.Close()

            mapper.WaitForExit()
            let result = match mapper.ExitCode with | 0 -> true | _ -> false

            mapper.Close()
            result
      
        let mapperProcess() =
            Console.WriteLine "Mapper Processing Starting..."  

            // Create the output file
            if File.Exists(mappedfile) then File.Delete(mappedfile)
            use mapperWriter = File.CreateText(mappedfile)
            mapperWriter.Close()

            // function to determine if valid document extension
            let isValidFile inputfile =
                if String.Equals(Path.GetExtension(inputfile), ".docx", StringComparison.InvariantCultureIgnoreCase) ||
                   String.Equals(Path.GetExtension(inputfile), ".pdf", StringComparison.InvariantCultureIgnoreCase) then
                    true
                else
                    false

            // Process the files in the directory
            Directory.GetFiles(inputpath)
            |> Array.filter isValidFile
            |> Array.fold (fun result inputfile -> result && (mapperProcessLoop inputfile)) true

        // Sort the mapped file by the first field - mimic the role of Hadoop
        let hadoopProcess() =
            Console.WriteLine "Hadoop Processing Starting..."

            let unsortedValues = seq {
                use reader = new StreamReader(File.OpenRead(mappedfile))
                while not reader.EndOfStream do
                    let input = reader.ReadLine()
                    let keyValue = input.Split('\t')
                    yield (keyValue.[0].Trim(), keyValue.[1].Trim())
                reader.Close()
                }

            use writer = File.CreateText(reducefile)
            unsortedValues
            |> Seq.sortBy fst
            |> Seq.iter (fun (key, value) -> writer.WriteLine (sprintf "%O\t%O" key value))
            writer.Close()

        
        // Finally call the reducer process
        let reducerProcess() =
            use reducer = new Process()
            reducer.StartInfo.FileName <- reducerExe
            reducer.StartInfo.UseShellExecute <- false
            reducer.StartInfo.RedirectStandardInput <- true
            reducer.StartInfo.RedirectStandardOutput <- true
            reducer.Start() |> ignore

            use reducerInput = reducer.StandardInput
            use reducerOutput = reducer.StandardOutput
        
            // Map the reader to a background thread so processing can happen in parallel
            Console.WriteLine "Reducer Processing Starting..."

            let taskReducerFunc() =
                use reducerWriter = File.CreateText(outputfile)
                while not reducerOutput.EndOfStream do
                    reducerWriter.WriteLine(reducerOutput.ReadLine())
            let taskReducerWriting = Task.Factory.StartNew(Action(taskReducerFunc))

            // Pass the file into the reducer process and close input stream when done
            use reducerReader = new StreamReader(File.OpenRead(reducefile))
            while not reducerReader.EndOfStream do
                reducerInput.WriteLine(reducerReader.ReadLine())

            reducerInput.Close()
            taskReducerWriting.Wait()
            reducerOutput.Close()

            reducer.WaitForExit()
            let result = match reducer.ExitCode with | 0 -> true | _ -> false

            reducer.Close()
            result

        // Finish test
        if mapperProcess() then
            Console.WriteLine "Mapper Processing Complete..."  

            hadoopProcess()
            Console.WriteLine "Hadoop Processing Complete..."

            if reducerProcess() then
                Console.WriteLine "Reducer Processing Complete..."

                Console.WriteLine "Processing Complete..."     
                   
        Console.ReadLine() |> ignore

When calling the mapper it is no longer the case that the mapper executable is called once, with each line being redirected into the executable's StdIn. In the binary case the mapper executable is called once for each document, where for each document the data is then redirected into the executable's StdIn. As mentioned, the format defined is the filename, delimited with a Tab, followed by the document's bytes, and finally the Newline character:

// Pass the file into the mapper process and close input stream when done          
use mapperReader = new BinaryReader(File.OpenRead(inputfile))
let rec readLoop() =
    let bytesread = mapperReader.Read(buffer, 0, CHUNK)
    if bytesread > 0 then
        mapperInput.Write(buffer, 0, bytesread)
        readLoop()

// Write out a Filename/Tab/Document/LineFeed
let filename = Path.GetFileName(inputfile)
let encoding = Text.UTF8Encoding();
mapperInput.Write(encoding.GetBytes(filename), 0, encoding.GetByteCount(filename))
mapperInput.Write([| 0x09uy |], 0, 1)
readLoop()
mapperInput.Write([| 0x0Auy |], 0, 1)

This code is called for each document found in the directory specified in the input argument.

The previous post discussed the threading and stream processing needed for testing in a little more detail.
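The middle step of the tester, where the mapped output is sorted by its first field to mimic the role of Hadoop, can be sketched in Java as below. ShuffleSort is a hypothetical name; the sketch assumes each line has the key before the first Tab, as the mapper emits it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical illustration: order mapper output by key before it reaches the reducer.
final class ShuffleSort {

    static List<String> sortByKey(List<String> mappedLines) {
        // Copy first so the caller's list is left untouched.
        List<String> sorted = new ArrayList<>(mappedLines);
        // List.sort is stable, so values for the same key keep their original order.
        sorted.sort(Comparator.comparing((String line) -> line.split("\t", 2)[0].trim()));
        return sorted;
    }

    public static void main(String[] args) {
        List<String> mapped = Arrays.asList(
                "Carl Nolan\t1", "Brad Sarsfield\t44", "Marie West\t1", "Carl Nolan\t9");
        for (String line : sortByKey(mapped)) {
            System.out.println(line);
        }
    }
}
```

The sort is what guarantees that all values for a key arrive at the reducer consecutively.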

Java Classes

To complete the post here is the full listing for the Java class implementations:

package com.microsoft.hadoop.mapreduce.lib.input;

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.JobContext;

/**
* Reads complete documents in Binary format.
*/
public class BinaryDocumentWithNameInputFormat
    extends FileInputFormat<Text, BytesWritable> {

    public BinaryDocumentWithNameInputFormat() {
        super();
    }
    
    protected boolean isSplitable(FileSystem fs, Path filename) {
        return false;
    }

    @Override
    public RecordReader<Text, BytesWritable> getRecordReader(
            InputSplit split, JobConf job, Reporter reporter) throws IOException {
        
        return new BinaryDocumentWithNameRecordReader((FileSplit) split, job);        
    }
    
    /**
    * BinaryDocumentWithNameRecordReader class to read through a given binary document
    * Outputs the filename along with the complete document
    */    
    public class BinaryDocumentWithNameRecordReader
        implements RecordReader<Text, BytesWritable> {
        
        private FileSplit fileSplit;
        private Configuration conf;
        private boolean processed = false;
        
        public BinaryDocumentWithNameRecordReader(FileSplit fileSplit, Configuration conf)
            throws IOException {
            this.fileSplit = fileSplit;
            this.conf = conf;
        }

        @Override
        public Text createKey() {
            return new Text();
        }

        @Override
        public BytesWritable createValue() {
            return new BytesWritable();
        }
          
        @Override
        public long getPos() throws IOException {
            return this.processed ? this.fileSplit.getLength() : 0;
        }
        
        @Override
        public float getProgress() throws IOException {
            return this.processed ? 1.0f : 0.0f;
        }
        
        @Override
        public boolean next(Text key, BytesWritable value) throws IOException {
            if (!this.processed) {
                byte[] contents = new byte[(int) this.fileSplit.getLength()];
                Path file = this.fileSplit.getPath();
                FileSystem fs = file.getFileSystem(this.conf);
                FSDataInputStream in = null;
                
                try {
                    in = fs.open(file);
                    in.readFully(contents, 0, contents.length);
                    
                    key.set(file.getName());
                    value.set(contents, 0, contents.length);
                }
                finally {
                    // fs.open may have failed, leaving the stream unopened
                    if (in != null) {
                        in.close();
                    }
                }
                
                this.processed = true;
                return true;
            }
            else {
                return false;
            }
        }

        @Override
        public void close() throws IOException {
        }
    }    
}

Once the Java classes have been compiled they need to be merged into a copy of the hadoop-streaming.jar file. In my case I have created a file called hadoop-streaming-ms.jar. This file is a copy of the base file into which I have copied the compiled classes; as a JAR file is just a ZIP file this can be done with any ZIP utility, although one can also use the JAR tool. One just has to remember to use the package path:

com\microsoft\hadoop\mapreduce\lib\input
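Because a JAR is a ZIP archive, the merge step could also be scripted. The Java sketch below is one possible approach, not the method used for the download: it opens a copy of the JAR through the JDK's ZIP file system and copies a compiled class into the package path. JarMerge and its parameter names are hypothetical, and entry names inside the archive use forward slashes.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

// Hypothetical helper: copy one compiled class file into an existing JAR.
final class JarMerge {

    static void addClass(Path jar, Path classFile, String entryName) {
        // The cast selects the (Path, ClassLoader) overload on every JDK version.
        try (FileSystem zip = FileSystems.newFileSystem(jar, (ClassLoader) null)) {
            Path target = zip.getPath(entryName);
            if (target.getParent() != null) {
                // Create the package directories inside the archive if missing.
                Files.createDirectories(target.getParent());
            }
            Files.copy(classFile, target, StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        if (args.length != 3) {
            System.err.println("usage: JarMerge <jar> <classFile> <entryName>");
            return;
        }
        addClass(Paths.get(args[0]), Paths.get(args[1]), args[2]);
    }
}
```

The entry name would be of the form com/microsoft/hadoop/mapreduce/lib/input/BinaryDocumentWithNameInputFormat.class. The nested record reader compiles to its own class file (BinaryDocumentWithNameInputFormat$BinaryDocumentWithNameRecordReader.class), which needs to be copied as well.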

To use this new streaming package the file needs to be copied to the Hadoop install lib directory; in my case:

C:\Apps\dist\lib

If you download the code there is also an implementation of these classes that supports a key/value type pairing of <NullWritable, BytesWritable>, rather than the <Text, BytesWritable> pairing that is used to pass in the document's filename. This can be used if the document's filename is not needed.

Conclusion

Hopefully the code is once again useful if you intend to write any Binary Streaming applications for documents. Although the code currently only demonstrates processing of a Microsoft Word document, in a future post I will extend this to support processing PDF documents.

Finally don't forget one can graph the results using the JS Console:

file = fs.read("/office/authors")
data = parse(file.data, "author, pages:long")
options = {title:"Pages Per Author", orientation:15, x:"author", y:"pages"}
graph.bar(data, options)

Resulting in:

[Image: bar chart titled "Pages Per Author", with author on the x axis and pages on the y axis]

Enjoy!