.Net Hadoop MapReduce Job Framework - Revisited (Archived)

An updated version of this post can be found at:

https://blogs.msdn.com/b/carlnol/archive/2012/04/29/generic-based-framework-for-net-hadoop-mapreduce-job-submission.aspx

If you have been using the Framework for Composing and Submitting .Net Hadoop MapReduce Jobs, you may want to download an updated version of the code:

https://code.msdn.microsoft.com/Framework-for-Composing-af656ef7

The biggest change in the latest code is to the serialization mechanism. Formerly, data was written out of the mapper and combiner as a string; it is now written using a binary formatter. As a result, the values passed into the combiners and reducers are no longer strings but objects, which can be cast directly to the expected type. Here are the new Combiner and Reducer base classes:

F# Combiner

    [<AbstractClass>]
    type CombinerBase() =
        inherit MapReduceBase()

        abstract member Combine: key:string -> values:IEnumerable<obj> -> IEnumerable<string * obj>

F# Reducer

    [<AbstractClass>]
    type ReducerBase() =
        inherit MapReduceBase()

        abstract member Reduce: key:string -> values:IEnumerable<obj> -> IEnumerable<string * obj>

C# Combiner

    [AbstractClass]
    public abstract class CombinerBase : MapReduceBase
    {
        protected CombinerBase();

        public abstract override IEnumerable<Tuple<string, object>> Combine(string key, IEnumerable<object> values);
    }

C# Reducer

    [AbstractClass]
    public abstract class ReducerBase : MapReduceBase
    {
        protected ReducerBase();

        public abstract override IEnumerable<Tuple<string, object>> Reduce(string key, IEnumerable<object> values);
    }
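The following small C# sketch runs a combiner and a reducer in memory to show how these signatures fit together. It is not part of the framework. `GroupAndApply` is a hypothetical stand-in for Hadoop's shuffle, and `MinTime` is a hypothetical function with the same shape as `Combine` and `Reduce`. The values arrive as `object` and are simply cast back to `TimeSpan`. In a real job the combiner runs on each map task's partial output. Running it once over all the data here only shows that the combiner's (key, object) output is exactly what the reducer then consumes.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the shuffle: group (key, value) pairs by key and
// hand each group to a function with the Combine/Reduce shape.
IEnumerable<Tuple<string, object>> GroupAndApply(
    IEnumerable<Tuple<string, object>> pairs,
    Func<string, IEnumerable<object>, IEnumerable<Tuple<string, object>>> apply) =>
    pairs.GroupBy(p => p.Item1, p => p.Item2)
         .OrderBy(g => g.Key, StringComparer.Ordinal)
         .SelectMany(g => apply(g.Key, g));

// Same signature as CombinerBase.Combine and ReducerBase.Reduce:
// the boxed values are cast back to TimeSpan and the minimum is emitted.
IEnumerable<Tuple<string, object>> MinTime(string key, IEnumerable<object> values)
{
    yield return Tuple.Create(key, (object)values.Cast<TimeSpan>().Min());
}

// Made-up mapper output, as (key, boxed value) pairs.
var mapped = new List<Tuple<string, object>>
{
    Tuple.Create("Android", (object)TimeSpan.FromSeconds(7)),
    Tuple.Create("iPhone", (object)TimeSpan.FromSeconds(4)),
    Tuple.Create("Android", (object)TimeSpan.FromSeconds(3)),
    Tuple.Create("iPhone", (object)TimeSpan.FromSeconds(6)),
};

// Combine first, then reduce the combined output.
var combined = GroupAndApply(mapped, MinTime);
var reduced = GroupAndApply(combined, MinTime).ToList();

foreach (var r in reduced)
    Console.WriteLine($"{r.Item1}\t{r.Item2}");
// Expected: "Android  00:00:03", then "iPhone  00:00:04" (tab-separated)
```

Because the combiner's output feeds the reducer, the combiner must emit values of the type the reducer casts to. Taking the minimum is safe to apply in both stages. An average is not, since an average of partial averages is generally wrong.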

Here are sample implementations of Map and Reduce types:

F#

    // Extracts the QueryTime for each Platform Device
    type MobilePhoneQueryMapper() =
        inherit MapperBaseText()

        // Performs the split into key/value; malformed lines yield None
        let splitInput (value:string) =
            try
                let splits = value.Split('\t')
                let devicePlatform = splits.[3]
                let queryTime = TimeSpan.Parse(splits.[1])
                Some(devicePlatform, box queryTime)
            with
            | :? ArgumentException | :? FormatException | :? OverflowException | :? IndexOutOfRangeException -> None

        // Map the data from input name/value to output name/value
        override self.Map (value:string) =
            seq {
                match splitInput value with
                | Some result -> yield result
                | None -> ()
            }

    // Calculates the (Min, Avg, Max) of the input stream query time (based on Platform Device)
    type MobilePhoneQueryReducer() =
        inherit ReducerBase()

        override self.Reduce (key:string) (values:seq<obj>) =
            // Accumulate ticks rather than whole seconds so the average keeps sub-second precision
            let initState = (TimeSpan.MaxValue, TimeSpan.MinValue, 0L, 0L)
            let (minValue, maxValue, totalTicks, totalCount) =
                values
                |> Seq.map (fun value -> value :?> TimeSpan)
                |> Seq.fold (fun (minValue, maxValue, totalTicks, totalCount) value ->
                    (min minValue value, max maxValue value, totalTicks + value.Ticks, totalCount + 1L)) initState

            Seq.singleton (key, box (minValue, TimeSpan.FromTicks(totalTicks / totalCount), maxValue))

C#

    using System;
    using System.Collections.Generic;
    using System.Linq;

    using MSDN.Hadoop.MapReduceBase;

    namespace MSDN.Hadoop.MapReduceCSharp
    {
        // Extracts the QueryTime for each Platform Device
        public class MobilePhoneMinMapper : MapperBaseText
        {
            // Returns (devicePlatform, queryTime), or null when the line cannot be parsed
            private Tuple<string, object> GetLineValue(string value)
            {
                try
                {
                    string[] splits = value.Split('\t');
                    string devicePlatform = splits[3];
                    TimeSpan queryTime = TimeSpan.Parse(splits[1]);
                    return new Tuple<string, object>(devicePlatform, queryTime);
                }
                catch (Exception)
                {
                    return null;
                }
            }

            public override IEnumerable<Tuple<string, object>> Map(string value)
            {
                var returnVal = GetLineValue(value);
                if (returnVal != null) yield return returnVal;
            }
        }

        // Emits the minimum query time within each partial batch of values
        public class MobilePhoneMinCombiner : CombinerBase
        {
            public override IEnumerable<Tuple<string, object>> Combine(string key, IEnumerable<object> values)
            {
                yield return new Tuple<string, object>(key, values.Select(timespan => (TimeSpan)timespan).Min());
            }
        }

        // Calculates the overall minimum query time for each Platform Device
        public class MobilePhoneMinReducer : ReducerBase
        {
            public override IEnumerable<Tuple<string, object>> Reduce(string key, IEnumerable<object> values)
            {
                yield return new Tuple<string, object>(key, values.Select(timespan => (TimeSpan)timespan).Min());
            }
        }
    }
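The C# sample only computes the minimum. For comparison, here is a C# sketch of the (Min, Avg, Max) fold performed by the F# MobilePhoneQueryReducer. It is written as a standalone hypothetical function, `MinAvgMax`, so that it can run without the framework. It accumulates ticks so that sub-second precision survives the average. That is a choice made for this sketch, not something the framework requires.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical C# equivalent of the F# reducer's fold: one pass over the boxed
// values, tracking min, max, total ticks and count.
Tuple<TimeSpan, TimeSpan, TimeSpan> MinAvgMax(IEnumerable<object> values)
{
    var min = TimeSpan.MaxValue;
    var max = TimeSpan.MinValue;
    long totalTicks = 0;
    long count = 0;

    foreach (var time in values.Cast<TimeSpan>())
    {
        if (time < min) min = time;
        if (time > max) max = time;
        totalTicks += time.Ticks;   // ticks keep sub-second precision in the average
        count++;
    }

    // Reduce is only invoked for keys with at least one value, so count > 0 here.
    return Tuple.Create(min, TimeSpan.FromTicks(totalTicks / count), max);
}

// Made-up query times for a single Platform Device key.
var times = new object[]
{
    TimeSpan.FromSeconds(2), TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(3.5)
};

Console.WriteLine(MinAvgMax(times)); // (00:00:02, 00:00:03.5000000, 00:00:05)
```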

The changes are subtle, but they simplify the processing in the combiner and reducer by removing the need for any string processing. To keep the data coming out of the reducer readable, it is still written out in a string format.
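The short sketch below makes the difference concrete. It contrasts a reducer that must re-parse string values with one that receives boxed objects and only casts them. Both local functions, `MinFromStrings` and `MinFromObjects`, are hypothetical and not taken from the framework. No real serialization happens here; the boxed `TimeSpan` values stand in for what the binary formatter would hand back after deserialization.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Old style (hypothetical): every value arrives as text and must be parsed again.
TimeSpan MinFromStrings(IEnumerable<string> values) =>
    values.Select(s => TimeSpan.Parse(s)).Min();

// New style (hypothetical): values arrive as objects and are simply unboxed.
TimeSpan MinFromObjects(IEnumerable<object> values) =>
    values.Cast<TimeSpan>().Min();

// The same three made-up query times, once as strings and once as boxed TimeSpans.
var asText = new[] { "00:00:05", "00:00:02", "00:00:09" };
var asObjects = asText.Select(s => (object)TimeSpan.Parse(s)).ToList();

Console.WriteLine(MinFromStrings(asText));    // 00:00:02
Console.WriteLine(MinFromObjects(asObjects)); // 00:00:02
```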

The other change is support for outputting multiple keys from the mapper. Let's start with a sample showing how this is achieved:

    // Extracts the AnnualSales for each (BusinessType, BankName) pair
    type StoreXmlElementMapper() =
        inherit MapperBaseXml()

        override self.Map (element:XElement) =

            // XML namespace URIs must match exactly, so this stays http://
            let aw = "http://schemas.microsoft.com/sqlserver/2004/07/adventure-works/StoreSurvey"
            let demographics = element.Element(XName.Get("Demographics")).Element(XName.Get("StoreSurvey", aw))

            seq {
                if not (demographics = null) then
                    let business = demographics.Element(XName.Get("BusinessType", aw)).Value
                    let bank = demographics.Element(XName.Get("BankName", aw)).Value
                    let key = Utilities.FormatKeys(business, bank)
                    let sales = Decimal.Parse(demographics.Element(XName.Get("AnnualSales", aw)).Value) |> box
                    yield (key, sales)
            }

    // Calculates the Total Revenue of the store demographics
    type StoreXmlElementReducer() =
        inherit ReducerBase()

        override self.Reduce (key:string) (values:seq<obj>) =
            let totalRevenue =
                values
                |> Seq.fold (fun revenue value -> revenue + (value :?> decimal)) 0M

            Seq.singleton (key, box totalRevenue)

Using multiple keys from the Mapper is a two-step process. First, the Mapper has to output a string key in the correct format. This is done by passing the set of string key values to the Utilities.FormatKeys() function, which concatenates them using the required tab character. Second, the job has to be submitted specifying the expected number of keys:
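The following sketch illustrates the tab-separated key format. Its `FormatKeys` is a hypothetical equivalent of Utilities.FormatKeys that assumes the function simply joins the keys with a tab. Its `SplitLine` is a hypothetical illustration of how an output line splits back into key and value when the first N tab-separated fields form the key. Hadoop Streaming controls that split with its `stream.num.map.output.key.fields` property. Whether -numberKeys is passed through to that property is an assumption, not something confirmed here. The key and value strings are made up.

```csharp
using System;
using System.Linq;

// Hypothetical equivalent of Utilities.FormatKeys: join the key parts with a tab.
string FormatKeys(params string[] keys) => string.Join("\t", keys);

// Hypothetical split of an output line where the first numberKeys
// tab-separated fields form the key and the rest is the value.
(string Key, string Value) SplitLine(string line, int numberKeys)
{
    var fields = line.Split('\t');
    var keyPart = string.Join("\t", fields.Take(numberKeys));
    var valuePart = string.Join("\t", fields.Skip(numberKeys));
    return (keyPart, valuePart);
}

// Made-up BusinessType and BankName values, followed by a sales figure.
var compositeKey = FormatKeys("Retail", "Bank A");
var outputLine = compositeKey + "\t" + "800000";

var (k, v) = SplitLine(outputLine, 2);
Console.WriteLine(k.Replace("\t", " | ")); // Retail | Bank A
Console.WriteLine(v);                      // 800000
```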

MSDN.Hadoop.Submission.Console.exe -input "stores/demographics" -output "stores/banking" ^
-mapper "MSDN.Hadoop.MapReduceFSharp.StoreXmlElementMapper, MSDN.Hadoop.MapReduceFSharp" ^
-reducer "MSDN.Hadoop.MapReduceFSharp.StoreXmlElementReducer, MSDN.Hadoop.MapReduceFSharp" ^
-file "%HOMEPATH%\Projects\MSDN.Hadoop.MapReduce\Release\MSDN.Hadoop.MapReduceFSharp.dll" ^
-nodename Store -format Xml -numberKeys 2

One final note: in the Document Classes folder there are two versions of the Streaming jar, one for running in Azure and one for running locally. The difference is that they have been compiled with different versions of Java. Just remember to use the appropriate version (dropping the -local or -azure qualifier from the file name) when copying it to your Hadoop lib folder.

Hopefully you will find these changes useful.