Hadoop Streaming and Reporting

If, like me, you are a .NET developer and have written some Streaming jobs, it is not immediately obvious how to do any reporting. However, if you dig through the Streaming documentation, you will come across this in the FAQ:

  • How do I update counters in streaming applications? A streaming process can use the stderr to emit counter information. reporter:counter:<group>,<counter>,<amount> should be sent to stderr to update the counter.

  • How do I update status in streaming applications? A streaming process can use the stderr to emit status information. To set a status, reporter:status:<message> should be sent to stderr.

So writing to stderr gives us an easy mechanism for getting feedback from a running streaming job.

Take my last binary streaming post: when running that code, one has no idea how many Microsoft Word, PDF, or Unknown documents have been processed.

Thus using the counter output format, one can define a simple counterReporter function:

let counterReporter docType =
    stderr.WriteLine (sprintf "reporter:counter:Documents Processed,%s,1" docType)
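The FAQ format also carries an amount, which counterReporter fixes at 1. As a minimal sketch, assuming you sometimes want to report several documents in one line, the string building can be split out from the write. The names formatCounter and counterReporterBy are my own for illustration and are not part of the original code:

```fsharp
// Hypothetical helper: builds the counter line without writing it,
// following the FAQ format reporter:counter:<group>,<counter>,<amount>.
// Commas separate the fields, so group and counter names should avoid them.
let formatCounter (group: string) (counter: string) (amount: int64) =
    sprintf "reporter:counter:%s,%s,%d" group counter amount

// Hypothetical variant of counterReporter that takes an explicit amount.
let counterReporterBy (amount: int64) (docType: string) =
    stderr.WriteLine (formatCounter "Documents Processed" docType amount)

// Example: report five Word documents with a single line on stderr.
counterReporterBy 5L "Word Document"
```

Keeping the formatting separate from the stderr write also makes the string easy to check in a unit test.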

One can then easily report on documents processed using the following slight code modification:

match Path.GetExtension(filename) with
| WordDocument ->
    // Get access to the word processing document from the input stream
    use document = WordprocessingDocument.Open(reader, false)
    // Process the word document with the mapper
    OfficeWordPageMapper.Map document
    |> Seq.iter (fun value -> outputCollector value)        
    // close document
    document.Close()
    counterReporter "Word Document"
| PdfDocument ->
    // Get access to the PDF document from the input stream
    let document = new PdfReader(reader)
    // Process the PDF document with the mapper
    OfficePdfPageMapper.Map document
    |> Seq.iter (fun value -> outputCollector value)        
    // close document
    document.Close()
    counterReporter "PDF Document"
| UnsupportedDocument ->
    counterReporter "Unknown Document Type"
    ()

Thus we update the “Documents Processed” group, using the document type as the counter name, each time we process a document. The Hadoop job log now shows these counters.

All nice and easy.

If you want to do some error reporting, the process is the same, just with a different string format.
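As a rough sketch of what that could look like, the status format from the FAQ (reporter:status:<message>) can be wrapped the same way as the counter. The "Errors" counter group, the reportErrors wrapper, and the choice to swallow the exception so the task carries on are all my assumptions for illustration, not something the Streaming documentation prescribes:

```fsharp
// Status line format from the Streaming FAQ: reporter:status:<message>
let formatStatus (message: string) =
    sprintf "reporter:status:%s" message

// Keep the message on one line, since each reporter line is read separately.
let oneLine (s: string) =
    s.Replace("\r", " ").Replace("\n", " ")

let statusReporter (message: string) =
    stderr.WriteLine (formatStatus (oneLine message))

// Hypothetical wrapper: runs the processing for one file and, on failure,
// increments a counter in an assumed "Errors" group (named after the
// exception type) and sets the task status to a short description.
// The exception is swallowed here so the remaining files are still processed.
let reportErrors (filename: string) (action: unit -> unit) =
    try
        action ()
    with ex ->
        stderr.WriteLine (sprintf "reporter:counter:Errors,%s,1" (ex.GetType().Name))
        statusReporter (sprintf "Failed on %s: %s" filename ex.Message)

// Example: a failing action is reported rather than stopping the task.
reportErrors "sample.docx" (fun () -> failwith "could not open document")
```

If a failure should stop the task instead, the with branch could rethrow after reporting.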