Implementing a MapReduce Join with Hadoop and the .Net Framework

I have often been asked how one implements a Join when writing MapReduce code, so I thought it would be useful to add a further sample demonstrating how this is achieved. There are multiple mechanisms one can employ to perform a Join operation; the one discussed here is a Reduce Side 1-to-many join.

As always, this sample, amongst others, can be found in the “Generics based Framework for .Net Hadoop MapReduce Job Submission” code download, within the Samples folder.

Join Semantics

Say one wants to join two sets of data, A and B, via a common set attribute. Set A could be defined as a collection of tuples of the form (ki, ai, Ai), where k represents the key value on which we want to do the join, a the set item unique identifier, and A the other attributes. For this set each k value would correspond to a unique value of a.

Set B would be similarly defined as a collection of tuples of the form (ki, bx, Bx). In this case each value of k would equate to multiple values of b.

As an example, set A could be thus represented by an OrderHeader type and B the corresponding OrderDetail types. The k and a values in this case would represent the sales order identifier. The b value would represent the sales order detail identifier.

The basic concept is that the MapReduce job will create a new set C, which would be defined by the tuple collection (ki, ax, by, Ax, By). For each key identifier k, there would be a single value of a, and multiple values for b.

(k64, a52, b106, A52, B106)
(k64, a52, b121, A52, B121)
(k64, a52, b234, A52, B234)

Continuing the example, this new set C could be represented by a new type that aggregates the OrderHeader and OrderDetail types; say OrderLine. The SalesOrderID would be the common key attribute.

To perform the Reduce Side join, the Mapper would read in the data representing both sets. When processing set A data the emitted value would be (ki, ax, Ax, Ø, Ø), and for set B the value (ki, Ø, Ø, by, By). In both cases the emitted key would be the common key attribute value.

The Reducer would receive the set values for each shared key attribute; namely a single (ki, ax, Ax, Ø, Ø) value, and multiple (ki, Ø, Ø, by, By) values. It would then emit the full set of (ki, ax, by, Ax, By) values for each key attribute.

As an example, when processing orders the input data would be parsed into either an OrderHeader or OrderDetail object. In both cases the Mapper would emit an OrderLine item, with a null value for the missing data. The Reducer would then read these values for a specific SalesOrderID key value and emit the set of complete OrderLine values.
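The semantics described above can be sketched in memory with LINQ, before any Hadoop plumbing is involved. This is only an illustration of the map, group and reduce steps; the `JoinSemanticsSketch` class and the string payloads are made up for the example and are not part of the sample download.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class JoinSemanticsSketch
{
    // In-memory imitation of a reduce-side 1-to-many join.
    // Set A holds (k, a) pairs and set B holds (k, b) pairs; the names are illustrative only.
    public static IEnumerable<(int Key, string A, string B)> Join(
        IEnumerable<(int Key, string A)> setA,
        IEnumerable<(int Key, string B)> setB)
    {
        // "Map": tag each record with its key and leave the other side empty (null).
        var mapped = setA.Select(a => (a.Key, A: a.A, B: (string)null))
            .Concat(setB.Select(b => (b.Key, A: (string)null, B: b.B)));

        // "Shuffle": group by key. "Reduce": pair the single A value with every B value.
        foreach (var grp in mapped.GroupBy(m => m.Key))
        {
            string header = grp.Select(m => m.A).FirstOrDefault(a => a != null);
            if (header == null) continue; // no A record for this key, so nothing is emitted

            foreach (var item in grp.Where(m => m.B != null))
                yield return (grp.Key, header, item.B);
        }
    }

    public static void Main()
    {
        var setA = new[] { (64, "a52") };
        var setB = new[] { (64, "b106"), (64, "b121"), (64, "b234") };

        foreach (var c in Join(setA, setB))
            Console.WriteLine($"(k{c.Key}, {c.A}, {c.B})");
    }
}
```

Keys that have B values but no A value produce no output here, which matches the inner-join behaviour of the Reducers shown later.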

So as an example, let's do a join between order header and detail information from the SQL Server AdventureWorks sample database. The sample download also includes BCP scripts to extract the sales data from the sample database. For completeness I have written the sample in both C# (with heavy use of LINQ) and F#.

Defining the Types

To start we need to define types that represent the sales header and detail information. These type definitions, listed below, are OrderHeader and OrderDetail. As mentioned we also need an aggregate type, called OrderLine, that aggregates both the header and detail types.

C# Classes
/// Order Header Information
public class OrderHeader
{
    public int SalesOrderID { get; set; }
    public DateTime OrderDate { get; set; }
    public DateTime ShipDate { get; set; }
    public int CustomerID { get; set; }
    public string SalesOrderNumber { get; set; }
    public string PurchaseOrderNumber { get; set; }
    public string AccountNumber { get; set; }
}

/// Order Detail Information
public class OrderDetail
{
    public int SalesOrderID { get; set; }
    public int SalesOrderDetailID { get; set; }
    public int ProductID { get; set; }
    public decimal OrderQty { get; set; }
    public decimal UnitPrice { get; set; }
}

/// Order Line Information
public class OrderLine
{
    public OrderHeader OrderHeader { get; set; }
    public OrderDetail OrderDetail { get; set; }
}
F# Records
/// Order Header Information
type OrderHeader = { SalesOrderID:int; OrderDate:DateTime; ShipDate:DateTime; CustomerID:int; SalesOrderNumber:string; PurchaseOrderNumber:string; AccountNumber:string }

/// Order Detail Information
type OrderDetail = { SalesOrderID:int; SalesOrderDetailID:int; ProductID:int; OrderQty:decimal; UnitPrice:decimal }

/// Order Line Information
type OrderLine = { OrderHeader:OrderHeader; OrderDetail:OrderDetail }

The OrderLine aggregated type is used as the output from the Mapper and the Reducer.

Mapper Processing

The purpose of the mapper is to read in both the header and detail data. For each data record the mapper outputs a value of an OrderLine instance, along with a key value of the SalesOrderID value. When processing a sales header data item an OrderLine is output with a null value for the OrderDetail; and of course vice-versa when processing a detail data item.

The code for parsing the data items is as below. In this instance I have made the determination of the data item type by inspecting the second data value. In this case I am looking for an OrderDate; however one can adopt many different approaches based on the input data.

C# Mapper
// Processes the Order Header and Detail files (In Memory)
class OrderJoinMemoryMapper : MapperBaseText<OrderLine>
{
    public override IEnumerable<Tuple<string, OrderLine>> Map(string value)
    {
        var splits = value.Split('\t');
        int salesOrderID = Int32.Parse(splits[0]);

        // A header line carries the OrderDate (which contains '-') in its second field
        if (splits[1].Contains("-"))
        {
            DateTime orderDate = DateTime.Parse(splits[1]);
            DateTime shipDate = DateTime.Parse(splits[2]);
            int customerID = Int32.Parse(splits[3]);
            string salesOrderNumber = splits[4];
            string purchaseOrderNumber = splits[5];
            string accountNumber = splits[6];
            var header = new OrderHeader()
            {
                SalesOrderID = salesOrderID,
                OrderDate = orderDate,
                ShipDate = shipDate,
                CustomerID = customerID,
                SalesOrderNumber = salesOrderNumber,
                PurchaseOrderNumber = purchaseOrderNumber,
                AccountNumber = accountNumber
            };
            yield return Tuple.Create(splits[0], new OrderLine() { OrderHeader = header, OrderDetail = null });
        }
        else
        {
            int salesOrderDetailID = Int32.Parse(splits[1]);
            int productID = Int32.Parse(splits[2]);
            decimal orderQty = Decimal.Parse(splits[3]);
            decimal unitPrice = Decimal.Parse(splits[4]);
            var detail = new OrderDetail()
            {
                SalesOrderID = salesOrderID,
                SalesOrderDetailID = salesOrderDetailID,
                ProductID = productID,
                OrderQty = orderQty,
                UnitPrice = unitPrice
            };
            yield return Tuple.Create(splits[0], new OrderLine() { OrderHeader = null, OrderDetail = detail });
        }
    }
}
F# Mapper
// Processes the Order Header and Detail files (In Memory)
type OrderJoinMemoryMapper() =
    inherit MapperBaseText<OrderLine>()

    // Performs the split into the correct type
    let (|Header|Detail|Unknown|) (value:string) =
        try
            let splits = value.Split('\t')
            let salesOrderID = Int32.Parse(splits.[0])
            if splits.[1].Contains("-") then
                // Processing a Header record
                let orderDate = DateTime.Parse(splits.[1])
                let shipDate = DateTime.Parse(splits.[2])
                let customerID = Int32.Parse(splits.[3])
                let salesOrderNumber = splits.[4]
                let purchaseOrderNumber = splits.[5]
                let accountNumber = splits.[6]
                Header (splits.[0], { OrderHeader.SalesOrderID = salesOrderID; OrderDate = orderDate; ShipDate = shipDate; CustomerID = customerID;
                                    SalesOrderNumber = salesOrderNumber; PurchaseOrderNumber = purchaseOrderNumber; AccountNumber = accountNumber })
            else
                // Processing a detail record
                let salesOrderDetailID = Int32.Parse(splits.[1])
                let productID = Int32.Parse(splits.[2])
                let orderQty = Decimal.Parse(splits.[3])
                let unitPrice = Decimal.Parse(splits.[4])
                Detail (splits.[0], {OrderDetail.SalesOrderID = salesOrderID; SalesOrderDetailID = salesOrderDetailID; ProductID = productID;
                                    OrderQty = orderQty; UnitPrice = unitPrice })
        with
        // Parse failures raise FormatException/OverflowException and short lines raise
        // IndexOutOfRangeException; none of these derive from ArgumentException
        | :? System.FormatException
        | :? System.OverflowException
        | :? System.IndexOutOfRangeException
        | :? System.ArgumentException -> Unknown

    // Map the data from input name/value to output name/value
    override self.Map (value:string) =
        seq {
            match value with
            | Header (key, header) -> yield (key, { OrderLine.OrderHeader = header; OrderDetail = Unchecked.defaultof<OrderDetail> })
            | Detail (key, detail) -> yield (key, { OrderLine.OrderHeader = Unchecked.defaultof<OrderHeader>; OrderDetail = detail })
            | Unknown -> ()
        }

Literally that is it for the Mapper. It determines the type of input line and emits an OrderLine item.

As you can see the key is the SalesOrderID value. More on this later, when talking about a performance optimization.
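The Mappers above decide the record type by looking for a hyphen in the second field. As one of the other possible approaches, the sketch below classifies a line by what its fields actually parse as, and reports anything else as unknown instead of throwing. The `RecordClassifier` helper and `RecordKind` enum are hypothetical names introduced for illustration, the field counts (7 for a header, 5 for a detail) are taken from the Mapper code, and the date text in the demo is an assumed BCP-style format.

```csharp
using System;
using System.Globalization;

public enum RecordKind { Header, Detail, Unknown }

// Hypothetical helper, not part of the framework or the sample download.
public static class RecordClassifier
{
    public static RecordKind Classify(string line)
    {
        if (string.IsNullOrEmpty(line)) return RecordKind.Unknown;

        var fields = line.Split('\t');
        var culture = CultureInfo.InvariantCulture;

        // Every record must start with an integer SalesOrderID.
        if (fields.Length < 2 || !int.TryParse(fields[0], NumberStyles.Integer, culture, out _))
            return RecordKind.Unknown;

        // Detail: the second field is the integer SalesOrderDetailID.
        if (fields.Length >= 5 && int.TryParse(fields[1], NumberStyles.Integer, culture, out _))
            return RecordKind.Detail;

        // Header: the second field is the OrderDate.
        if (fields.Length >= 7 && DateTime.TryParse(fields[1], culture, DateTimeStyles.None, out _))
            return RecordKind.Header;

        return RecordKind.Unknown;
    }

    public static void Main()
    {
        // Illustrative input lines only; the date format is an assumption.
        Console.WriteLine(Classify("43667\t2005-07-01 00:00:00.000\t2005-07-08 00:00:00.000\t29974\tSO43667\tPO15428132599\t10-4020-000646"));
        Console.WriteLine(Classify("43667\t77\t710\t3\t5.7000"));
        Console.WriteLine(Classify("not a record"));
    }
}
```

Parsing with the invariant culture keeps the result independent of the regional settings of the machine running the task.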

Reducer Processing

The purpose of the Reducer is to locate the OrderHeader value for each SalesOrderID key value, and for each corresponding OrderDetail value output a complete OrderLine; consisting of the located header value and the detail values.

In this version of the code, again shown below, use is made of a List of OrderDetail items. Once all the values have been read, the sequence of OrderLine values is returned.

C# Reducer
// Performs the combined data 1-many join (In Memory)
class OrderJoinMemoryReducer : ReducerBase<OrderLine, OrderLine>
{
    public override IEnumerable<Tuple<string, OrderLine>> Reduce(string key, IEnumerable<OrderLine> values)
    {
        OrderHeader orderHeader = null;
        var orderDetails = new List<OrderDetail>();

        foreach (var line in values)
        {
            if (line.OrderDetail != null)
            {
                orderDetails.Add(line.OrderDetail);
            }
            else
            {
                orderHeader = line.OrderHeader;
            }
        }

        if (orderHeader != null)
        {
            return orderDetails.Select(detail => Tuple.Create(key, new OrderLine() { OrderHeader = orderHeader, OrderDetail = detail }));
        }
        else
        {
            return Enumerable.Empty<Tuple<string, OrderLine>>();
        }
    }
}
F# Reducer
// Performs the combined data 1-many join (In Memory)
type OrderJoinMemoryReducer() =
    inherit ReducerBase<OrderLine, OrderLine>()

    override self.Reduce (key:string) (values:seq<OrderLine>) =
        let orderHeader = ref Unchecked.defaultof<OrderHeader>
        let hasValue value = not (obj.ReferenceEquals (value, Unchecked.defaultof<_>))

        // Seq.toList forces the pass over the values here, so the header
        // has been captured before it is tested below
        let orderDetails =
            values
            |> Seq.choose (fun item ->
                if (hasValue item.OrderDetail) then
                    Some(item.OrderDetail)
                else
                    orderHeader := item.OrderHeader
                    None)
            |> Seq.toList

        // Test the captured header rather than the ref cell, which is never null
        if hasValue (!orderHeader) then
            orderDetails
            |> Seq.map (fun item ->
                (key, { OrderLine.OrderHeader = !orderHeader; OrderDetail = item }))
        else
            Seq.empty

As you can see this code has a memory limitation. For each SalesOrderID value all the corresponding detail values need to be cached in memory, because there is no way to know when the header value will arrive. This may not be an issue with a small number of details associated with a header, but what about data where this number can be extremely large?

This limitation however is easily overcome through the use of a secondary sort. Basically two key values are used to ensure the header value always arrives first within the Reducer.

Secondary Sort Optimization

When processing the order data for a particular key within the reducer, if one knows that the first value will always be the header data then this can be saved for detail data processing. Thus when reading the subsequent detail lines the aggregated OrderLine instance values can be emitted directly without the intermediate List processing step. This is where a secondary sort comes into play.

If one uses two keys rather than one, it is possible to sort the data such that the first value, for each SalesOrderID, will be the header data. This can be achieved in our case by using a secondary sort key of the SalesOrderDetailID. For the header one can then just use a value of zero to ensure it is the first value, as all detail items have a positive value. Of course, different approaches can be taken depending on the data domain.

Although the data is sorted on two separate keys, the partitioning must be such that the data for each SalesOrderID is sent to a single Reducer; spanning multiple SalesOrderDetailID values.
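To make the interplay between sorting and partitioning concrete, here is a small in-memory sketch. It is not how Hadoop or the framework implements either step: `Partition` is a simple modulo stand-in for the real partitioner, `Sort` compares the keys numerically, and the `SecondarySortSketch` name and the record data are invented for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class SecondarySortSketch
{
    // Stand-in for the partitioner: only the first key (SalesOrderID) picks the reducer,
    // so a header and all of its details land on the same one.
    public static int Partition(int salesOrderID, int reducerCount) =>
        salesOrderID % reducerCount;

    // Stand-in for the sort: order by SalesOrderID, then by SalesOrderDetailID.
    // Headers carry a detail ID of 0, so they sort ahead of their details.
    public static List<(int SalesOrderID, int SalesOrderDetailID, string Kind)> Sort(
        IEnumerable<(int SalesOrderID, int SalesOrderDetailID, string Kind)> records) =>
        records.OrderBy(r => r.SalesOrderID).ThenBy(r => r.SalesOrderDetailID).ToList();

    public static void Main()
    {
        var records = new[]
        {
            (43667, 78, "detail"),
            (43668, 81, "detail"),
            (43667, 0, "header"),
            (43667, 77, "detail"),
            (43668, 0, "header"),
        };

        foreach (var r in Sort(records))
            Console.WriteLine($"reducer {Partition(r.SalesOrderID, 4)}: {r.SalesOrderID}\t{r.SalesOrderDetailID}\t{r.Kind}");
    }
}
```

If the keys are compared as text rather than as numbers (the post does not say which), the details may not arrive in numeric order, but a key of "0" would still sort ahead of any positive identifier written without leading zeros, which is all the join relies on.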

Using this approach here is the code for the modified Mapper logic:

C# Mapper
// Processes the Order Header and Detail files
class OrderJoinMapper : MapperBaseText<OrderLine>
{
    public override IEnumerable<Tuple<string, OrderLine>> Map(string value)
    {
        var splits = value.Split('\t');
        int salesOrderID = Int32.Parse(splits[0]);

        // A header line carries the OrderDate (which contains '-') in its second field
        if (splits[1].Contains("-"))
        {
            DateTime orderDate = DateTime.Parse(splits[1]);
            DateTime shipDate = DateTime.Parse(splits[2]);
            int customerID = Int32.Parse(splits[3]);
            string salesOrderNumber = splits[4];
            string purchaseOrderNumber = splits[5];
            string accountNumber = splits[6];
            // Secondary key of "0" so the header sorts ahead of its details
            string key = Context.FormatKeys(splits[0], "0");
            var header = new OrderHeader()
            {
                SalesOrderID = salesOrderID,
                OrderDate = orderDate,
                ShipDate = shipDate,
                CustomerID = customerID,
                SalesOrderNumber = salesOrderNumber,
                PurchaseOrderNumber = purchaseOrderNumber,
                AccountNumber = accountNumber
            };
            yield return Tuple.Create(key, new OrderLine() { OrderHeader = header, OrderDetail = null });
        }
        else
        {
            int salesOrderDetailID = Int32.Parse(splits[1]);
            int productID = Int32.Parse(splits[2]);
            decimal orderQty = Decimal.Parse(splits[3]);
            decimal unitPrice = Decimal.Parse(splits[4]);
            // Secondary key is the SalesOrderDetailID
            string key = Context.FormatKeys(splits[0], splits[1]);
            var detail = new OrderDetail()
            {
                SalesOrderID = salesOrderID,
                SalesOrderDetailID = salesOrderDetailID,
                ProductID = productID,
                OrderQty = orderQty,
                UnitPrice = unitPrice
            };
            yield return Tuple.Create(key, new OrderLine() { OrderHeader = null, OrderDetail = detail });
        }
    }
}
F# Mapper
// Processes the Order Header and Detail files
type OrderJoinMapper() =
    inherit MapperBaseText<OrderLine>()

    // Performs the split into the correct type
    let (|Header|Detail|Unknown|) (value:string) =
        try
            let splits = value.Split('\t')
            let salesOrderID = Int32.Parse(splits.[0])
            if splits.[1].Contains("-") then
                // Processing a Header record
                let orderDate = DateTime.Parse(splits.[1])
                let shipDate = DateTime.Parse(splits.[2])
                let customerID = Int32.Parse(splits.[3])
                let salesOrderNumber = splits.[4]
                let purchaseOrderNumber = splits.[5]
                let accountNumber = splits.[6]
                let key = Context.FormatKeys(splits.[0], "0")
                Header (key, { OrderHeader.SalesOrderID = salesOrderID; OrderDate = orderDate; ShipDate = shipDate; CustomerID = customerID;
                                    SalesOrderNumber = salesOrderNumber; PurchaseOrderNumber = purchaseOrderNumber; AccountNumber = accountNumber })
            else
                // Processing a detail record
                let salesOrderDetailID = Int32.Parse(splits.[1])
                let productID = Int32.Parse(splits.[2])
                let orderQty = Decimal.Parse(splits.[3])
                let unitPrice = Decimal.Parse(splits.[4])
                let key = Context.FormatKeys(splits.[0], splits.[1])
                Detail (key, {OrderDetail.SalesOrderID = salesOrderID; SalesOrderDetailID = salesOrderDetailID; ProductID = productID;
                                    OrderQty = orderQty; UnitPrice = unitPrice })
        with
        // Parse failures raise FormatException/OverflowException and short lines raise
        // IndexOutOfRangeException; none of these derive from ArgumentException
        | :? System.FormatException
        | :? System.OverflowException
        | :? System.IndexOutOfRangeException
        | :? System.ArgumentException -> Unknown

    // Map the data from input name/value to output name/value
    override self.Map (value:string) =
        seq {
            match value with
            | Header (key, header) -> yield (key, { OrderLine.OrderHeader = header; OrderDetail = Unchecked.defaultof<OrderDetail> })
            | Detail (key, detail) -> yield (key, { OrderLine.OrderHeader = Unchecked.defaultof<OrderHeader>; OrderDetail = detail })
            | Unknown -> ()
        }

The multiple key values are delimited with a Tab character.
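As a rough picture of what a tab-delimited composite key looks like, the sketch below joins and splits key parts in the way the Mapper and Reducer use `Context.FormatKeys` and `Context.GetKeys`. These two methods are hypothetical stand-ins written for this example; the framework's own implementation is not shown in this post and may differ.

```csharp
using System;

// Hypothetical stand-ins for Context.FormatKeys and Context.GetKeys.
public static class CompositeKeySketch
{
    private const char KeySeparator = '\t';

    // Joins the key parts into a single tab-delimited key string.
    public static string FormatKeys(params string[] keys) =>
        string.Join(KeySeparator.ToString(), keys);

    // Splits a tab-delimited key string back into its parts.
    public static string[] GetKeys(string key) =>
        key.Split(KeySeparator);

    public static void Main()
    {
        string headerKey = FormatKeys("43667", "0");   // header: secondary key of zero
        string detailKey = FormatKeys("43667", "77");  // detail: SalesOrderDetailID

        Console.WriteLine(GetKeys(headerKey)[0]);
        Console.WriteLine(GetKeys(detailKey)[1]);
    }
}
```

The first part of the key is what the Reducer recovers with `GetKeys(key)[0]` to emit the plain SalesOrderID.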

The Reducer code then becomes a lot simpler as no List caching is involved, and LINQ enables succinct C# code.

C# Reducer
// Performs the combined data 1-many join
class OrderJoinReducer : ReducerBase<OrderLine, OrderLine>
{
    public override IEnumerable<Tuple<string, OrderLine>> Reduce(string key, IEnumerable<OrderLine> values)
    {
        string salesOrderID = Context.GetKeys(key)[0];
        OrderHeader orderHeader = values.ElementAt(0).OrderHeader;

        return values.Skip(1).Select(detail => Tuple.Create(salesOrderID, new OrderLine() { OrderHeader = orderHeader, OrderDetail = detail.OrderDetail }));
    }
}
F# Reducer
// Performs the combined data 1-many join
type OrderJoinReducer() =
    inherit ReducerBase<OrderLine, OrderLine>()

    override self.Reduce (key:string) (values:seq<OrderLine>) =

        let salesOrderID = Context.GetKeys(key).[0]
        let orderHeader = Seq.nth 0 values

        values
        |> Seq.skip 1
        |> Seq.map (fun item ->
            (salesOrderID, { OrderLine.OrderHeader = orderHeader.OrderHeader; OrderDetail = item.OrderDetail }))

As you can see the code is probably simpler, especially when it comes to the Reducer, and is also more efficient than the In-Memory approach.
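One thing to be aware of is that the C# Reducer above starts enumerating `values` twice, once for `ElementAt(0)` and once for `Skip(1)`. Whether that matters depends on how the framework supplies the sequence, which this post does not cover. Should the values only be readable once, a single-pass variant along the following lines could be used. It is a sketch with trimmed copies of the types, and the `SinglePassJoin` class is a hypothetical name rather than part of the sample download.

```csharp
using System;
using System.Collections.Generic;

// Trimmed copies of the post's types, kept short for the sketch.
public class OrderHeader { public int SalesOrderID { get; set; } }
public class OrderDetail { public int SalesOrderID { get; set; } public int SalesOrderDetailID { get; set; } }
public class OrderLine { public OrderHeader OrderHeader { get; set; } public OrderDetail OrderDetail { get; set; } }

public static class SinglePassJoin
{
    // Reads the header from the first value, then streams out one OrderLine per detail,
    // touching each input value exactly once.
    public static IEnumerable<Tuple<string, OrderLine>> Join(string salesOrderID, IEnumerable<OrderLine> values)
    {
        using (var enumerator = values.GetEnumerator())
        {
            if (!enumerator.MoveNext()) yield break;       // no values at all

            OrderHeader header = enumerator.Current.OrderHeader;
            if (header == null) yield break;               // first value is not a header: nothing to join

            while (enumerator.MoveNext())
            {
                yield return Tuple.Create(salesOrderID,
                    new OrderLine { OrderHeader = header, OrderDetail = enumerator.Current.OrderDetail });
            }
        }
    }

    public static void Main()
    {
        var values = new List<OrderLine>
        {
            new OrderLine { OrderHeader = new OrderHeader { SalesOrderID = 43667 } },
            new OrderLine { OrderDetail = new OrderDetail { SalesOrderID = 43667, SalesOrderDetailID = 77 } },
            new OrderLine { OrderDetail = new OrderDetail { SalesOrderID = 43667, SalesOrderDetailID = 78 } },
        };

        foreach (var line in Join("43667", values))
            Console.WriteLine($"{line.Item1}\t{line.Item2.OrderDetail.SalesOrderDetailID}");
    }
}
```

Unlike the Reducer above, this variant emits nothing when the first value is not a header, which mirrors what the In-Memory version does for orders with no header.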

Submitting the Jobs

For the In-Memory code the job submission is quite simple:

%BASEPATH%\MSDN.Hadoop.MapReduce\Release\MSDN.Hadoop.Submission.Console.exe
-input "join/data"
-output "join/order"
-mapper "MSDN.Hadoop.MapReduceFSharp.OrderJoinMemoryMapper, MSDN.Hadoop.MapReduceFSharp"
-reducer "MSDN.Hadoop.MapReduceFSharp.OrderJoinMemoryReducer, MSDN.Hadoop.MapReduceFSharp"
-file "%BASEPATH%\MSDN.Hadoop.MapReduceFSharp\Release\MSDN.Hadoop.MapReduceFSharp.dll"

However, for the optimized version one needs to accommodate the fact that the sorting takes place on two keys, and the data partitioning only on one. This is achieved by setting these exact options:

%BASEPATH%\MSDN.Hadoop.MapReduce\Release\MSDN.Hadoop.Submission.Console.exe
-input "join/data"
-output "join/order"
-mapper "MSDN.Hadoop.MapReduceFSharp.OrderJoinMapper, MSDN.Hadoop.MapReduceFSharp"
-reducer "MSDN.Hadoop.MapReduceFSharp.OrderJoinReducer, MSDN.Hadoop.MapReduceFSharp"
-file "%BASEPATH%\MSDN.Hadoop.MapReduceFSharp\Release\MSDN.Hadoop.MapReduceFSharp.dll"
-numberKeys 2 -numberPartitionKeys 1

The reason these options were added to version 1.0.0 of the code was to accommodate this exact type of processing.

Conclusion

To conclude, here is some sample output from the join.

43667    {"OrderDetail":{"OrderQty":3,"ProductID":710,"SalesOrderDetailID":77,"SalesOrderID":43667,"UnitPrice":5.7000},"OrderHeader":{"AccountNumber":"10-4020-000646","CustomerID":29974,"OrderDate":"\/Date(1120172400000+0100)\/","PurchaseOrderNumber":"PO15428132599","SalesOrderID":43667,"SalesOrderNumber":"SO43667","ShipDate":"\/Date(1120777200000+0100)\/"}}
43667    {"OrderDetail":{"OrderQty":1,"ProductID":773,"SalesOrderDetailID":78,"SalesOrderID":43667,"UnitPrice":2039.9940},"OrderHeader":{"AccountNumber":"10-4020-000646","CustomerID":29974,"OrderDate":"\/Date(1120172400000+0100)\/","PurchaseOrderNumber":"PO15428132599","SalesOrderID":43667,"SalesOrderNumber":"SO43667","ShipDate":"\/Date(1120777200000+0100)\/"}}
43667    {"OrderDetail":{"OrderQty":1,"ProductID":778,"SalesOrderDetailID":79,"SalesOrderID":43667,"UnitPrice":2024.9940},"OrderHeader":{"AccountNumber":"10-4020-000646","CustomerID":29974,"OrderDate":"\/Date(1120172400000+0100)\/","PurchaseOrderNumber":"PO15428132599","SalesOrderID":43667,"SalesOrderNumber":"SO43667","ShipDate":"\/Date(1120777200000+0100)\/"}}
43667    {"OrderDetail":{"OrderQty":1,"ProductID":775,"SalesOrderDetailID":80,"SalesOrderID":43667,"UnitPrice":2024.9940},"OrderHeader":{"AccountNumber":"10-4020-000646","CustomerID":29974,"OrderDate":"\/Date(1120172400000+0100)\/","PurchaseOrderNumber":"PO15428132599","SalesOrderID":43667,"SalesOrderNumber":"SO43667","ShipDate":"\/Date(1120777200000+0100)\/"}}

Utilizing the output is merely a case of deserializing the data back into the OrderLine type.
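As a sketch of that step, the code below reads one output line back into an OrderLine. It assumes the key and the JSON value are separated by a tab, and that the JSON is in the format read by `DataContractJsonSerializer` (the `\/Date(...)\/` values in the output suggest this, but the post does not state which serializer the framework uses). The `OutputReader` class is a hypothetical helper for the example.

```csharp
using System;
using System.IO;
using System.Runtime.Serialization.Json;
using System.Text;

public class OrderHeader
{
    public int SalesOrderID { get; set; }
    public DateTime OrderDate { get; set; }
    public DateTime ShipDate { get; set; }
    public int CustomerID { get; set; }
    public string SalesOrderNumber { get; set; }
    public string PurchaseOrderNumber { get; set; }
    public string AccountNumber { get; set; }
}

public class OrderDetail
{
    public int SalesOrderID { get; set; }
    public int SalesOrderDetailID { get; set; }
    public int ProductID { get; set; }
    public decimal OrderQty { get; set; }
    public decimal UnitPrice { get; set; }
}

public class OrderLine
{
    public OrderHeader OrderHeader { get; set; }
    public OrderDetail OrderDetail { get; set; }
}

// Hypothetical helper, not part of the sample download.
public static class OutputReader
{
    // Splits "key<TAB>json" and deserializes the JSON part into an OrderLine.
    public static Tuple<string, OrderLine> ParseLine(string line)
    {
        int tab = line.IndexOf('\t');
        string key = line.Substring(0, tab);
        string json = line.Substring(tab + 1).Trim();

        var serializer = new DataContractJsonSerializer(typeof(OrderLine));
        using (var stream = new MemoryStream(Encoding.UTF8.GetBytes(json)))
        {
            return Tuple.Create(key, (OrderLine)serializer.ReadObject(stream));
        }
    }

    public static void Main()
    {
        // First line of the sample output, with the key and value separated by a tab.
        string line = "43667\t" + @"{""OrderDetail"":{""OrderQty"":3,""ProductID"":710,""SalesOrderDetailID"":77,""SalesOrderID"":43667,""UnitPrice"":5.7000},""OrderHeader"":{""AccountNumber"":""10-4020-000646"",""CustomerID"":29974,""OrderDate"":""\/Date(1120172400000+0100)\/"",""PurchaseOrderNumber"":""PO15428132599"",""SalesOrderID"":43667,""SalesOrderNumber"":""SO43667"",""ShipDate"":""\/Date(1120777200000+0100)\/""}}";

        var parsed = ParseLine(line);
        Console.WriteLine($"{parsed.Item1}: product {parsed.Item2.OrderDetail.ProductID} on order {parsed.Item2.OrderHeader.SalesOrderNumber}");
    }
}
```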

Hopefully I have demonstrated that MapReduce joins are not that difficult to perform. With a little thought around the types, the code is not much more than data parsing.

A word of caution: compared to standard database joins, MapReduce joins are slow. If you can join the data before running MapReduce processing then this should be your preferred approach.