C# MapReduce Based Co-occurrence Item Based Recommender

As promised, to conclude the Co-occurrence Approach to an Item Based Recommender posts I wanted to port the MapReduce code to C#; just for kicks and to prove the code is also easy to write in C#. For an explanation of the MapReduce post review the previous article:

https://blogs.msdn.com/b/carlnol/archive/2012/07/07/mapreduce-based-co-occurrence-approach-to-an-item-based-recommender.aspx

The latest version of the code now includes the MSDN.Recommender.MapReduceCs project, that is a full C# implementation of the corresponding F# project; MSDN.Recommender.MapReduce. These code variants are intentionally similar in structure.

The code for the Order Mapper is as follows:

Order Mapper

  1. public class OrderVectorMapper : MapperBaseText<Types.ProductQuantityList>
  2. {
  3.     // Configuration values
  4.     static double qtyMaximum = 5.0;                      // Maximum rating contribution for an item
  5.     static double recentFactor = 2.0;                    // Quantity increase factor for recent items
  6.     
  7.     static DateTime baseDate = DateTime.Today.AddMonths(-3);    // Date for a recent item
  8.  
  9.     Types.ProductQuantityList products = new Types.ProductQuantityList();
  10.     int? currentOrder = null;
  11.  
  12.     // Converts an order Id to a string key
  13.     private static string getOrderKey(int orderId)
  14.     {
  15.         return String.Format("{0:D}", orderId);
  16.     }
  17.  
  18.     // Adds a quantity factor based on recent orders
  19.     private static double orderFactor(Types.OrderDetail order)
  20.     {
  21.         if (DateTime.Compare(order.OrderDate, OrderVectorMapper.baseDate) > 0)
  22.         {
  23.             return OrderVectorMapper.recentFactor;
  24.         }
  25.         else
  26.         {
  27.             return 1.0;
  28.         }
  29.     }
  30.  
  31.     /// Map the data from input name/value to output name/value
  32.     public override IEnumerable<Tuple<string, Types.ProductQuantityList>> Map(string value)
  33.     {
  34.         // Ensure order can be derived
  35.         Types.OrderDetail order = null;
  36.         try
  37.         {
  38.             order = Types.Helpers.ParseInputData(value);
  39.         }
  40.         catch (ArgumentException)
  41.         {
  42.             order = null;
  43.         }
  44.  
  45.         // Process the order
  46.         if (order != null)
  47.         {
  48.             double quantity = Math.Min(((double)order.OrderQty), OrderVectorMapper.qtyMaximum) * OrderVectorMapper.orderFactor(order);
  49.             Types.ProductQuantity product = new Types.ProductQuantity(order.ProductId, quantity);
  50.  
  51.             if (this.currentOrder.HasValue && (order.OrderId != this.currentOrder.Value))
  52.             {
  53.                 yield return Tuple.Create(OrderVectorMapper.getOrderKey(currentOrder.Value), this.products);
  54.                 this.products.Clear();
  55.             }
  56.             this.currentOrder = order.OrderId;
  57.             this.products.Add(product);
  58.         }
  59.         else
  60.         {
  61.             Context.IncrementCounter("ORDERS", "Skipped Lines");
  62.         }
  63.     }
  64.  
  65.     /// Output remaining Map items
  66.     public override IEnumerable<Tuple<string, Types.ProductQuantityList>> Cleanup()
  67.     {
  68.         if (this.currentOrder.HasValue)
  69.         {
  70.             yield return Tuple.Create(OrderVectorMapper.getOrderKey(currentOrder.Value), this.products);
  71.         }
  72.     }
  73. }

And that of the Order Reducer is:

Order Reducer

  1. class OrderVectorReducer : ReducerBase<Types.ProductQuantityList, Types.ProductQuantityList>
  2. {
  3.     /// Reduce the order data into a product list
  4.     public override IEnumerable<Tuple<string, Types.ProductQuantityList>> Reduce(string key, IEnumerable<Types.ProductQuantityList> values)
  5.     {
  6.         var products = new Types.ProductQuantityList();
  7.  
  8.         foreach (var prodList in values)
  9.         {
  10.             products.AddRange(prodList);
  11.         }
  12.  
  13.         yield return Tuple.Create(key, products);
  14.     }
  15. }

Your friend when writing the C# code is the yield return command for emitting data, and the Tuple.Create() command for creating the key/value pairs. The two commands enable one to return a key/value sequence; namely a .Net IEnumerable.

For completeness the code for the Product Mapper is:

Product Mapper

  1. class ProductVectorMapper : MapperBaseText<Types.ProductQuantity>
  2. {
  3.     static int maxSize = 1024 * 1024;
  4.     Dictionary<Tuple<int, int>, double> prodPairs = new Dictionary<Tuple<int, int>, double>(ProductVectorMapper.maxSize);
  5.  
  6.     // Converts an order Id to a string key
  7.     static private string getProductKey(int productId)
  8.     {
  9.         return String.Format("{0:D}", productId);
  10.     }
  11.  
  12.     // Parses an input line of format List<ProductQuantity>
  13.     static private Types.ProductQuantityList deserialize(string input)
  14.     {
  15.         string[] keyValue = input.Split('\t');
  16.         return Types.Helpers.ParseProductQuantityList(keyValue[1].Trim());
  17.     }
  18.  
  19.     // calculates the pairs for an order
  20.     static private IEnumerable<Tuple<T, T>> pairs<T>(List<T> items)
  21.     {
  22.         int count = items.Count;
  23.         if (count == 2)
  24.         {
  25.             yield return Tuple.Create(items[0], items[1]);
  26.         }
  27.         else if (count > 2)
  28.         {
  29.             for (int idxOut = 0; idxOut <= (count - 2); idxOut++)
  30.             {
  31.                 for (int idxIn = (idxOut + 1); idxIn <= (count - 1); idxIn++)
  32.                 {
  33.                     yield return Tuple.Create(items[idxOut], items[idxIn]);
  34.                 }
  35.             }
  36.         }
  37.     }
  38.  
  39.     // Adds to the table
  40.     private void addRow(Tuple<int, int> idx, double qty)
  41.     {
  42.         if (this.prodPairs.ContainsKey(idx))
  43.         {
  44.             this.prodPairs[idx] = this.prodPairs[idx] + qty;
  45.         }
  46.         else
  47.         {
  48.             this.prodPairs[idx] = qty;
  49.         }
  50.     }
  51.  
  52.     // Defines a sequence of the current pairs
  53.     private IEnumerable<Tuple<string, Types.ProductQuantity>> currents()
  54.     {
  55.         foreach (var item in prodPairs)
  56.         {
  57.             int product1 = item.Key.Item1;
  58.             int product2 = item.Key.Item2;
  59.             yield return Tuple.Create(ProductVectorMapper.getProductKey(product1), new Types.ProductQuantity(product2, item.Value));                
  60.         }
  61.         prodPairs.Clear();
  62.     }
  63.  
  64.     /// Map the data from input name/value to output name/value
  65.     public override IEnumerable<Tuple<string, Types.ProductQuantity>> Map(string value)
  66.     {
  67.         var products = ProductVectorMapper.pairs<Types.ProductQuantity>(ProductVectorMapper.deserialize(value));
  68.         foreach (var product in products)
  69.         {
  70.             Types.ProductQuantity product1 = product.Item1;
  71.             Types.ProductQuantity product2 = product.Item2;
  72.             double qty = Math.Max(product1.Quantity, product2.Quantity);
  73.             this.addRow(Tuple.Create(product1.ProductId, product2.ProductId), qty);
  74.             this.addRow(Tuple.Create(product2.ProductId, product1.ProductId), qty);
  75.         }
  76.  
  77.         if (prodPairs.Count > ProductVectorMapper.maxSize)
  78.         {
  79.             return currents();
  80.         }
  81.         else
  82.         {
  83.             return Enumerable.Empty<Tuple<string, Types.ProductQuantity>>();
  84.         }
  85.     }
  86.  
  87.     /// Output remaining Map items
  88.     public override IEnumerable<Tuple<string, Types.ProductQuantity>> Cleanup()
  89.     {
  90.         return currents();
  91.     }
  92. }

That of the Product Combiner is:

Product Combiner

  1. class ProductVectorCombiner : CombinerBase<Types.ProductQuantity>
  2. {
  3.     /// Combine the data from input name/value to output name/value
  4.     public override IEnumerable<Tuple<string, Types.ProductQuantity>> Combine(string key, IEnumerable<Types.ProductQuantity> values)
  5.     {
  6.         int maxSize = 10000;               // Expected number of product correlations
  7.         var products = new Dictionary<int, double>(maxSize);
  8.  
  9.         // Process the combiner input adding to the table
  10.         foreach (var product in values)
  11.         {
  12.             if (products.ContainsKey(product.ProductId))
  13.             {
  14.                 products[product.ProductId] = products[product.ProductId] + product.Quantity;
  15.             }
  16.             else
  17.             {
  18.                 products[product.ProductId] = product.Quantity;
  19.             }
  20.         }
  21.  
  22.         // Return the combiner input
  23.         return products.Select(item => Tuple.Create(key, new Types.ProductQuantity(item.Key, item.Value)));
  24.     }
  25. }

And finally, that of the Product Reducer is:

Product Reducer

  1. class ProductVectorReducer : ReducerBase<Types.ProductQuantity, Types.ProductRecommendations>
  2. {
  3.     /// Reduce the data from input name/value to output name/value
  4.     public override IEnumerable<Tuple<string, Types.ProductRecommendations>> Reduce(string key, IEnumerable<Types.ProductQuantity> values)
  5.     {
  6.         // Configuration values
  7.         double entryThreshold = 20.0;                     // Minimum correlations for matrix inclusion
  8.         int matrixSize = 10000;                           // Expected Correlations for Hash Table init
  9.  
  10.         int minItem = Int32.MaxValue;
  11.         int maxItem = 0;
  12.         var rcTable = new Dictionary<int, double>(matrixSize);
  13.  
  14.         // Process the combiner input adding to the table
  15.         foreach (var product in values)
  16.         {
  17.             int idx = product.ProductId;
  18.             minItem = Math.Min(idx, minItem);
  19.             maxItem = Math.Max(idx, maxItem);
  20.  
  21.             if (rcTable.ContainsKey(product.ProductId))
  22.             {
  23.                 rcTable[product.ProductId] = rcTable[product.ProductId] + product.Quantity;
  24.             }
  25.             else
  26.             {
  27.                 rcTable[product.ProductId] = product.Quantity;
  28.             }
  29.         }
  30.  
  31.         int offset = minItem;
  32.         int size = maxItem + 1 - minItem;
  33.  
  34.         // Build the sparse vector
  35.         var vector = new SparseVector(size);
  36.         foreach (var item in rcTable)
  37.         {
  38.             if (item.Value > entryThreshold)
  39.             {
  40.                 vector[item.Key - offset] = item.Value;
  41.             }
  42.         }
  43.  
  44.         var recommendations = new Types.ProductRecommendations(Int32.Parse(key), offset, vector);
  45.         Context.IncrementCounter("PRODUCTS", "Recommendations Written");
  46.  
  47.         yield return Tuple.Create(key, recommendations);
  48.     }
  49. }

A lot of the processing is based upon processing IEnumerable sequences. For this reason foreach and Linq expressions are used a lot. I could have defined a ForEach IEnumerable extension, to use Linq exclusively, but have preferred the C# syntax.

One difference in the final Reducer is how the Sparse Matrix is defined. In the F# code an F# extension allows one to build a SparseMatrix type by providing a sequence of index/value tuples. In the C# code one has to define an empty SparseMatrix and then interactively set the element values.

Note: There may be some confusion in the usage of the float type for .Net programmers, when switching between C# and F#, due to F#’s naming for floating-point number types. F# float type corresponds to System.Double in .NET (which is called double in C#), and F# float32 type corresponds to System.Single in .NET (which is called float in C#). For this reason I have switched to using the type double in F#, as it is another type alias for the System.Double type.

Of course the recommendations produced by the C# code are the same as the F# code, and clearly demonstrate that the Framework cleanly support writing MapReduce jobs in C#. I personally find the F# code more succinct and easier to read, but this is my personal preference.