C# MapReduce Based Co-occurrence Item Based Recommender
As promised, to conclude the Co-occurrence Approach to an Item Based Recommender posts, I wanted to port the MapReduce code to C#, just for kicks and to show that the code is just as easy to write in C#. For an explanation of the MapReduce code, review the previous article:
The latest version of the code now includes the MSDN.Recommender.MapReduceCs project, a full C# implementation of the corresponding F# project, MSDN.Recommender.MapReduce. The two code variants are intentionally similar in structure.
The code for the Order Mapper is as follows:
Order Mapper
- public class OrderVectorMapper : MapperBaseText<Types.ProductQuantityList>
- {
- // Configuration values
- static double qtyMaximum = 5.0; // Maximum rating contribution for an item
- static double recentFactor = 2.0; // Quantity increase factor for recent items
- static DateTime baseDate = DateTime.Today.AddMonths(-3); // Date for a recent item
- Types.ProductQuantityList products = new Types.ProductQuantityList();
- int? currentOrder = null;
- // Converts an order Id to a string key
- private static string getOrderKey(int orderId)
- {
- return String.Format("{0:D}", orderId);
- }
- // Adds a quantity factor based on recent orders
- private static double orderFactor(Types.OrderDetail order)
- {
- if (DateTime.Compare(order.OrderDate, OrderVectorMapper.baseDate) > 0)
- {
- return OrderVectorMapper.recentFactor;
- }
- else
- {
- return 1.0;
- }
- }
- /// Map the data from input name/value to output name/value
- public override IEnumerable<Tuple<string, Types.ProductQuantityList>> Map(string value)
- {
- // Ensure order can be derived
- Types.OrderDetail order = null;
- try
- {
- order = Types.Helpers.ParseInputData(value);
- }
- catch (ArgumentException)
- {
- order = null;
- }
- // Process the order
- if (order != null)
- {
- double quantity = Math.Min(((double)order.OrderQty), OrderVectorMapper.qtyMaximum) * OrderVectorMapper.orderFactor(order);
- Types.ProductQuantity product = new Types.ProductQuantity(order.ProductId, quantity);
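- if (this.currentOrder.HasValue && (order.OrderId != this.currentOrder.Value))
- {
- yield return Tuple.Create(OrderVectorMapper.getOrderKey(this.currentOrder.Value), this.products);
- // Start a new list rather than clearing the one just emitted, in case the output is buffered
- this.products = new Types.ProductQuantityList();
- }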
- this.currentOrder = order.OrderId;
- this.products.Add(product);
- }
- else
- {
- Context.IncrementCounter("ORDERS", "Skipped Lines");
- }
- }
- /// Output remaining Map items
- public override IEnumerable<Tuple<string, Types.ProductQuantityList>> Cleanup()
- {
- if (this.currentOrder.HasValue)
- {
- yield return Tuple.Create(OrderVectorMapper.getOrderKey(currentOrder.Value), this.products);
- }
- }
- }
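The mapper depends on all input lines for one order arriving consecutively: each time the order ID changes, the accumulated product list is emitted under the previous order's key, and Cleanup() flushes the final order. The following is a minimal, framework-free sketch of that pattern, useful for testing the logic outside Hadoop. The `OrderLine` record and `OrderGrouping` class are hypothetical stand-ins, and a product is represented as a `Tuple<int, double>` of (product ID, rating) rather than `Types.ProductQuantity`. The constants mirror the mapper's configuration values, and a fresh list is allocated after each emit so that a consumer that buffers the output still sees each order intact.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified stand-in for Types.OrderDetail
public record OrderLine(int OrderId, int ProductId, int OrderQty, DateTime OrderDate);

public static class OrderGrouping
{
    const double QtyMaximum = 5.0;   // maximum rating contribution for an item
    const double RecentFactor = 2.0; // quantity increase factor for recent items

    // Rating for one line: quantity capped at QtyMaximum, boosted when the order is recent
    public static double Rating(OrderLine line, DateTime baseDate) =>
        Math.Min(line.OrderQty, QtyMaximum) * (line.OrderDate > baseDate ? RecentFactor : 1.0);

    // Emits (orderKey, products) whenever the order ID changes, then flushes the last order
    public static IEnumerable<Tuple<string, List<Tuple<int, double>>>> GroupByOrder(
        IEnumerable<OrderLine> lines, DateTime baseDate)
    {
        int? currentOrder = null;
        var products = new List<Tuple<int, double>>();
        foreach (var line in lines)
        {
            if (currentOrder.HasValue && line.OrderId != currentOrder.Value)
            {
                yield return Tuple.Create(currentOrder.Value.ToString("D"), products);
                products = new List<Tuple<int, double>>(); // fresh list for the next order
            }
            currentOrder = line.OrderId;
            products.Add(Tuple.Create(line.ProductId, Rating(line, baseDate)));
        }
        if (currentOrder.HasValue)
        {
            yield return Tuple.Create(currentOrder.Value.ToString("D"), products);
        }
    }
}
```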
And that of the Order Reducer is:
Order Reducer
- class OrderVectorReducer : ReducerBase<Types.ProductQuantityList, Types.ProductQuantityList>
- {
- /// Reduce the order data into a product list
- public override IEnumerable<Tuple<string, Types.ProductQuantityList>> Reduce(string key, IEnumerable<Types.ProductQuantityList> values)
- {
- var products = new Types.ProductQuantityList();
- foreach (var prodList in values)
- {
- products.AddRange(prodList);
- }
- yield return Tuple.Create(key, products);
- }
- }
Your friends when writing the C# code are the yield return statement for emitting data and the Tuple.Create() method for creating the key/value pairs. Together they enable one to return a sequence of key/value pairs, namely a .NET IEnumerable<Tuple<string, T>>.
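As a minimal illustration, independent of the Framework's base classes, the following hypothetical word-count style mapper uses both constructs: `yield return` turns the method into an iterator, so pairs are produced lazily as the caller enumerates, and `Tuple.Create()` builds each key/value pair with its type arguments inferred. The `WordCountMapper` class is illustrative only.

```csharp
using System;
using System.Collections.Generic;

public static class WordCountMapper
{
    // Hypothetical mapper: emits a (word, 1) key/value pair for each word in the line.
    // Because of yield return, nothing runs until the caller starts enumerating.
    public static IEnumerable<Tuple<string, int>> Map(string line)
    {
        foreach (var word in line.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries))
        {
            yield return Tuple.Create(word, 1);
        }
    }
}
```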
For completeness the code for the Product Mapper is:
Product Mapper
- class ProductVectorMapper : MapperBaseText<Types.ProductQuantity>
- {
- static int maxSize = 1024 * 1024;
- Dictionary<Tuple<int, int>, double> prodPairs = new Dictionary<Tuple<int, int>, double>(ProductVectorMapper.maxSize);
- // Converts a product Id to a string key
- static private string getProductKey(int productId)
- {
- return String.Format("{0:D}", productId);
- }
- // Parses the ProductQuantityList value from a tab-separated key/value input line
- static private Types.ProductQuantityList deserialize(string input)
- {
- string[] keyValue = input.Split('\t');
- return Types.Helpers.ParseProductQuantityList(keyValue[1].Trim());
- }
- // Calculates all unordered pairs of products in an order
- static private IEnumerable<Tuple<T, T>> pairs<T>(List<T> items)
- {
- int count = items.Count;
- if (count == 2)
- {
- yield return Tuple.Create(items[0], items[1]);
- }
- else if (count > 2)
- {
- for (int idxOut = 0; idxOut <= (count - 2); idxOut++)
- {
- for (int idxIn = (idxOut + 1); idxIn <= (count - 1); idxIn++)
- {
- yield return Tuple.Create(items[idxOut], items[idxIn]);
- }
- }
- }
- }
- // Adds a quantity to the running total for a product pair
- private void addRow(Tuple<int, int> idx, double qty)
- {
- if (this.prodPairs.ContainsKey(idx))
- {
- this.prodPairs[idx] = this.prodPairs[idx] + qty;
- }
- else
- {
- this.prodPairs[idx] = qty;
- }
- }
- // Emits the accumulated pairs keyed by the first product, then clears the table
- private IEnumerable<Tuple<string, Types.ProductQuantity>> currents()
- {
- foreach (var item in prodPairs)
- {
- int product1 = item.Key.Item1;
- int product2 = item.Key.Item2;
- yield return Tuple.Create(ProductVectorMapper.getProductKey(product1), new Types.ProductQuantity(product2, item.Value));
- }
- prodPairs.Clear();
- }
- /// Map the data from input name/value to output name/value
- public override IEnumerable<Tuple<string, Types.ProductQuantity>> Map(string value)
- {
- var products = ProductVectorMapper.pairs<Types.ProductQuantity>(ProductVectorMapper.deserialize(value));
- foreach (var product in products)
- {
- Types.ProductQuantity product1 = product.Item1;
- Types.ProductQuantity product2 = product.Item2;
- double qty = Math.Max(product1.Quantity, product2.Quantity);
- this.addRow(Tuple.Create(product1.ProductId, product2.ProductId), qty);
- this.addRow(Tuple.Create(product2.ProductId, product1.ProductId), qty);
- }
- if (prodPairs.Count > ProductVectorMapper.maxSize)
- {
- return currents();
- }
- else
- {
- return Enumerable.Empty<Tuple<string, Types.ProductQuantity>>();
- }
- }
- /// Output remaining Map items
- public override IEnumerable<Tuple<string, Types.ProductQuantity>> Cleanup()
- {
- return currents();
- }
- }
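The heart of the Product Mapper is generating every unordered pair of products in an order and accumulating each pair in both directions, weighted by the larger of the two ratings. A framework-free sketch of that step follows; `CoOccurrence` is a hypothetical class, and orders are represented as lists of `Tuple<int, double>` (product ID, rating) rather than `Types.ProductQuantityList`.

```csharp
using System;
using System.Collections.Generic;

public static class CoOccurrence
{
    // All unordered pairs (i < j) of a list; n items yield n * (n - 1) / 2 pairs
    public static IEnumerable<Tuple<T, T>> Pairs<T>(IList<T> items)
    {
        for (int i = 0; i < items.Count - 1; i++)
            for (int j = i + 1; j < items.Count; j++)
                yield return Tuple.Create(items[i], items[j]);
    }

    // Accumulates each pair in both directions, using the larger rating as the weight
    public static Dictionary<Tuple<int, int>, double> Count(IEnumerable<IList<Tuple<int, double>>> orders)
    {
        var table = new Dictionary<Tuple<int, int>, double>();
        foreach (var order in orders)
        {
            foreach (var pair in Pairs(order))
            {
                double qty = Math.Max(pair.Item1.Item2, pair.Item2.Item2);
                Add(table, Tuple.Create(pair.Item1.Item1, pair.Item2.Item1), qty);
                Add(table, Tuple.Create(pair.Item2.Item1, pair.Item1.Item1), qty);
            }
        }
        return table;
    }

    // Tuple<int, int> has structural equality, so it works as a dictionary key
    static void Add(Dictionary<Tuple<int, int>, double> table, Tuple<int, int> key, double qty)
    {
        table.TryGetValue(key, out double existing); // existing is 0.0 when the key is absent
        table[key] = existing + qty;
    }
}
```

The general nested loop already covers the two-item case, so no special case is needed for it.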
That of the Product Combiner is:
Product Combiner
- class ProductVectorCombiner : CombinerBase<Types.ProductQuantity>
- {
- /// Combine the data from input name/value to output name/value
- public override IEnumerable<Tuple<string, Types.ProductQuantity>> Combine(string key, IEnumerable<Types.ProductQuantity> values)
- {
- int maxSize = 10000; // Expected number of product correlations
- var products = new Dictionary<int, double>(maxSize);
- // Process the combiner input adding to the table
- foreach (var product in values)
- {
- if (products.ContainsKey(product.ProductId))
- {
- products[product.ProductId] = products[product.ProductId] + product.Quantity;
- }
- else
- {
- products[product.ProductId] = product.Quantity;
- }
- }
- // Return the combined product quantities for this key
- return products.Select(item => Tuple.Create(key, new Types.ProductQuantity(item.Key, item.Value)));
- }
- }
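The accumulate-into-a-Dictionary loop in the combiner can also be expressed with LINQ's `GroupBy` and `Sum`. A sketch follows, with a hypothetical `CombinerSketch` class and a plain `Tuple<int, double>` standing in for `Types.ProductQuantity`. `GroupBy` buffers its whole input, so it is not necessarily cheaper than the explicit, presized dictionary; it is simply more declarative.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class CombinerSketch
{
    // Sums quantities per product ID for one key; groups appear in order of first occurrence
    public static IEnumerable<Tuple<string, Tuple<int, double>>> Combine(
        string key, IEnumerable<Tuple<int, double>> values)
    {
        return values
            .GroupBy(v => v.Item1)
            .Select(g => Tuple.Create(key, Tuple.Create(g.Key, g.Sum(v => v.Item2))));
    }
}
```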
And finally, that of the Product Reducer is:
Product Reducer
- class ProductVectorReducer : ReducerBase<Types.ProductQuantity, Types.ProductRecommendations>
- {
- /// Reduce the data from input name/value to output name/value
- public override IEnumerable<Tuple<string, Types.ProductRecommendations>> Reduce(string key, IEnumerable<Types.ProductQuantity> values)
- {
- // Configuration values
- double entryThreshold = 20.0; // Minimum correlations for matrix inclusion
- int matrixSize = 10000; // Expected Correlations for Hash Table init
- int minItem = Int32.MaxValue;
- int maxItem = 0;
- var rcTable = new Dictionary<int, double>(matrixSize);
- // Sum the combined quantities per product, tracking the range of product Ids
- foreach (var product in values)
- {
- int idx = product.ProductId;
- minItem = Math.Min(idx, minItem);
- maxItem = Math.Max(idx, maxItem);
- if (rcTable.ContainsKey(product.ProductId))
- {
- rcTable[product.ProductId] = rcTable[product.ProductId] + product.Quantity;
- }
- else
- {
- rcTable[product.ProductId] = product.Quantity;
- }
- }
- int offset = minItem;
- int size = maxItem + 1 - minItem;
- // Build the sparse vector
- var vector = new SparseVector(size);
- foreach (var item in rcTable)
- {
- if (item.Value > entryThreshold)
- {
- vector[item.Key - offset] = item.Value;
- }
- }
- var recommendations = new Types.ProductRecommendations(Int32.Parse(key), offset, vector);
- Context.IncrementCounter("PRODUCTS", "Recommendations Written");
- yield return Tuple.Create(key, recommendations);
- }
- }
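The reducer uses the smallest product ID seen for a key as an offset, so the vector only needs to span the range of IDs actually present, and it keeps only totals above the entry threshold. A framework-free sketch of that computation follows, using a hypothetical `ReducerSketch` class over a plain `IDictionary<int, double>` of totals. The empty-input guard is an addition: Hadoop does not call a reducer with no values, but `Min()` would otherwise throw.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ReducerSketch
{
    // Maps product IDs into [0, size) via offset = min ID and size = max - min + 1,
    // keeping only totals strictly above the threshold, ordered by index
    public static (int Offset, int Size, List<Tuple<int, double>> Entries) Compact(
        IDictionary<int, double> totals, double threshold)
    {
        if (totals.Count == 0) throw new ArgumentException("No values to reduce", nameof(totals));
        int offset = totals.Keys.Min();
        int size = totals.Keys.Max() + 1 - offset;
        var entries = totals
            .Where(kv => kv.Value > threshold)
            .OrderBy(kv => kv.Key)
            .Select(kv => Tuple.Create(kv.Key - offset, kv.Value))
            .ToList();
        return (offset, size, entries);
    }
}
```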
Much of the processing operates on IEnumerable sequences, so foreach loops and LINQ expressions are used extensively. I could have defined a ForEach extension method for IEnumerable, so as to use LINQ exclusively, but I preferred the native C# syntax.
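For reference, such an extension is only a few lines. LINQ does not define `ForEach` for `IEnumerable<T>` (the existing `List<T>.ForEach` applies only to lists), so the sketch below, in a hypothetical `EnumerableExtensions` class, is one way it might look.

```csharp
using System;
using System.Collections.Generic;

public static class EnumerableExtensions
{
    // Hypothetical ForEach extension: runs an action for each element of any sequence
    public static void ForEach<T>(this IEnumerable<T> source, Action<T> action)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (action == null) throw new ArgumentNullException(nameof(action));
        foreach (var item in source)
        {
            action(item);
        }
    }
}
```

Whether `values.ForEach(p => ...)` reads better than a plain foreach loop is largely a matter of taste; the loop also makes side effects more obvious.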
One difference in the final Reducer is how the sparse vector is defined. In the F# code, an F# extension allows one to build a SparseVector by providing a sequence of index/value tuples. In the C# code, one has to define an empty SparseVector and then iteratively set the element values.
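The F#-style construction can be mimicked in C# with a static factory that consumes index/value tuples. The sketch below uses a hypothetical, minimal Dictionary-backed `SimpleSparseVector`, not the Math.NET type, purely to contrast the two construction styles; unset elements read as zero.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, minimal sparse vector; not the Math.NET Numerics SparseVector
public sealed class SimpleSparseVector
{
    private readonly Dictionary<int, double> entries = new Dictionary<int, double>();
    public int Count { get; }

    public SimpleSparseVector(int count) { Count = count; }

    public int NonZeroCount => entries.Count;

    // Unset elements read as zero; setting zero removes the stored entry
    public double this[int index]
    {
        get
        {
            CheckIndex(index);
            return entries.TryGetValue(index, out double v) ? v : 0.0;
        }
        set
        {
            CheckIndex(index);
            if (value == 0.0) entries.Remove(index); else entries[index] = value;
        }
    }

    // F#-style construction from a sequence of index/value tuples
    public static SimpleSparseVector OfIndexed(int count, IEnumerable<Tuple<int, double>> items)
    {
        var vector = new SimpleSparseVector(count);
        foreach (var item in items) vector[item.Item1] = item.Item2;
        return vector;
    }

    private void CheckIndex(int index)
    {
        if (index < 0 || index >= Count) throw new ArgumentOutOfRangeException(nameof(index));
    }
}
```

Later releases of Math.NET Numerics appear to offer a similar factory (for example `SparseVector.OfIndexedEnumerable`); check the version in use before relying on it.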
Note: The float type can confuse .NET programmers switching between C# and F#, because of how F# names its floating-point types. The F# float type corresponds to System.Double in .NET (double in C#), and the F# float32 type corresponds to System.Single (float in C#). For this reason I have switched to using the type double in F#, as it is another alias for the System.Double type.
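On the C# side the mapping can be checked directly. The hypothetical `FloatingPointNames` class below confirms the aliases and sizes, and shows why the distinction matters: a value stored as a 32-bit float does not round-trip to the same 64-bit double.

```csharp
using System;

public static class FloatingPointNames
{
    // C# double is System.Double (64-bit); C# float is System.Single (32-bit)
    public static bool DoubleIsSystemDouble => typeof(double) == typeof(System.Double);
    public static bool FloatIsSystemSingle => typeof(float) == typeof(System.Single);
    public static int DoubleBytes => sizeof(double);
    public static int FloatBytes => sizeof(float);

    // 0.1f widened to double is not the same value as the double literal 0.1
    public static bool SinglePrecisionDiffers => (double)0.1f != 0.1;
}
```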
Of course, the recommendations produced by the C# code are the same as those produced by the F# code, which demonstrates that the Framework cleanly supports writing MapReduce jobs in C#. I personally find the F# code more succinct and easier to read, but that is a personal preference.