Generics based Framework for .Net Hadoop MapReduce Job Submission

Over the past month I have been working on a framework to allow composition and submission of MapReduce jobs using .Net. I have put together two previous blog posts on this, so rather than write a third on the latest change I thought I would create a final composite post. To understand why, let's run through a quick version history of the code:

  1. Initial release, where values were treated as strings and serialization was handled through Object.ToString()
  2. Minor modifications to the submission APIs
  3. Modified the Reducer and Combiner types to allow In-Reducer optimizations, through the ability to yield a Tuple of the key and value in the Cleanup
  4. Modified the Combiner and Reducer base classes so that data out of the mapper, in and out of the combiner, and in to the reducer uses a binary formatter. This changed the base classes from strings to objects, meaning the classes can now cast to the expected type rather than performing string parsing
  5. Added support for multiple mapper keys, with supporting utilities

The latest change takes advantage of the fact that the objects are serialized in binary format. This has allowed the abstract base classes to move from object based APIs to ones based on Generics, which should greatly simplify the creation of .Net MapReduce jobs.

As always to submit MapReduce jobs one can use the following command line syntax:

MSDN.Hadoop.Submission.Console.exe -input "mobile/data/debug/sampledata.txt" -output "mobile/querytimes/debug"
-mapper "MSDN.Hadoop.MapReduceFSharp.MobilePhoneQueryMapper, MSDN.Hadoop.MapReduceFSharp"
-reducer "MSDN.Hadoop.MapReduceFSharp.MobilePhoneQueryReducer, MSDN.Hadoop.MapReduceFSharp"
-file "%HOMEPATH%\MSDN.Hadoop.MapReduce\Release\MSDN.Hadoop.MapReduceFSharp.dll"

The input, output, and file options are analogous to the standard Hadoop Streaming submissions. The mapper and reducer options (more on a combiner option later) each name a .Net type derived from the appropriate abstract base class shown below. Under the covers standard Hadoop Streaming is used: controlling executables handle the StdIn and StdOut operations and activate the required .Net types. The “file” parameter is required to specify the DLL containing the .Net types to be loaded at runtime, in addition to any other required files.
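The mapper and reducer values use the "TypeName, AssemblyName" form that .Net accepts as an assembly-qualified type name. As a rough sketch of how a controlling executable could activate such a type at runtime, the snippet below resolves a name with Type.GetType and creates an instance. The framework's actual loading code is not shown in this post, so the Activate helper is hypothetical, and StringBuilder merely stands in for a mapper type so the example runs without the framework DLLs. It is written as a top-level program (C# 9 or later).

```csharp
using System;
using System.Text;

// Hypothetical helper: resolves "Namespace.Type, AssemblyName" and creates an
// instance through the parameterless constructor. Throws if the type is not found.
static object Activate(string assemblyQualifiedName)
{
    var type = Type.GetType(assemblyQualifiedName, throwOnError: true);
    return Activator.CreateInstance(type);
}

// StringBuilder is only a stand-in for a mapper or reducer type.
var typeName = typeof(StringBuilder).AssemblyQualifiedName;
var instance = Activate(typeName);
Console.WriteLine(instance.GetType().FullName);
```

In a real submission the name would come from the -mapper or -reducer argument, and the assembly would be one of the files shipped with -file.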

As always the source can be downloaded from:

https://code.msdn.microsoft.com/Framework-for-Composing-af656ef7

Mapper and Reducer Base Classes

The following definitions outline the abstract base classes from which one needs to derive. Let's start with the C# definitions:

C# Abstract Classes

using System;
using System.Collections.Generic;
using System.IO;
using System.Xml.Linq;

// Signature view only; method bodies are omitted.
namespace MSDN.Hadoop.MapReduceBase
{
    [Serializable]
    [AbstractClass]
    public abstract class MapReduceBase<V2>
    {
        protected MapReduceBase();

        public virtual IEnumerable<Tuple<string, V2>> Cleanup();
        public virtual void Setup();
    }

    [Serializable]
    [AbstractClass]
    public abstract class MapperBaseText<V2> : MapReduceBase<V2>
    {
        protected MapperBaseText();

        public abstract IEnumerable<Tuple<string, V2>> Map(string value);
    }

    [Serializable]
    [AbstractClass]
    public abstract class MapperBaseXml<V2> : MapReduceBase<V2>
    {
        protected MapperBaseXml();

        public abstract IEnumerable<Tuple<string, V2>> Map(XElement element);
    }

    [Serializable]
    [AbstractClass]
    public abstract class MapperBaseBinary<V2> : MapReduceBase<V2>
    {
        protected MapperBaseBinary();

        public abstract IEnumerable<Tuple<string, V2>> Map(string filename, Stream document);
    }

    [Serializable]
    [AbstractClass]
    public abstract class CombinerBase<V2> : MapReduceBase<V2>
    {
        protected CombinerBase();

        public abstract IEnumerable<Tuple<string, V2>> Combine(string key, IEnumerable<V2> values);
    }

    [Serializable]
    [AbstractClass]
    public abstract class ReducerBase<V2, V3> : MapReduceBase<V2>
    {
        protected ReducerBase();

        public abstract IEnumerable<Tuple<string, V3>> Reduce(string key, IEnumerable<V2> values);
    }
}

The equivalent F# definitions are:

F# Abstract Classes

namespace MSDN.Hadoop.MapReduceBase

open System.Collections.Generic
open System.IO
open System.Xml.Linq

[<AbstractClass>]
type MapReduceBase<'V2>() =

    abstract member Setup: unit -> unit
    default this.Setup() = ()

    abstract member Cleanup: unit -> IEnumerable<string * 'V2>
    default this.Cleanup() = Seq.empty

[<AbstractClass>]
type MapperBaseText<'V2>() =
    inherit MapReduceBase<'V2>()

    abstract member Map: value:string -> IEnumerable<string * 'V2>

[<AbstractClass>]
type MapperBaseXml<'V2>() =
    inherit MapReduceBase<'V2>()

    abstract member Map: element:XElement -> IEnumerable<string * 'V2>

[<AbstractClass>]
type MapperBaseBinary<'V2>() =
    inherit MapReduceBase<'V2>()

    abstract member Map: filename:string -> document:Stream -> IEnumerable<string * 'V2>

[<AbstractClass>]
type CombinerBase<'V2>() =
    inherit MapReduceBase<'V2>()

    abstract member Combine: key:string -> values:IEnumerable<'V2> -> IEnumerable<string * 'V2>

[<AbstractClass>]
type ReducerBase<'V2, 'V3>() =
    inherit MapReduceBase<'V2>()

    abstract member Reduce: key:string -> values:IEnumerable<'V2> -> IEnumerable<string * 'V3>

The objective in defining these base classes was not only to support creating .Net Mappers and Reducers, but also to provide Setup and Cleanup operations that support In-Place Mapper/Combiner/Reducer optimizations, to use IEnumerable and sequences for publishing data from all classes, and finally to provide a simple submission mechanism analogous to submitting Java based jobs.

The generic type names V2 and V3 match the names used in the Java definitions. The input into the Mapper is currently always a string (this normally being V1). This is needed because the mapper, in Streaming jobs, performs the projection from the textual input.

For each class a Setup function is provided to allow one to perform tasks related to the instantiation of the class. The Mapper’s Map and Cleanup functions return an IEnumerable consisting of tuples with a Key/Value pair. It is these tuples that represent the mapper’s output. The returned types are written to file using binary serialization.

The Combiner and Reducer take in an IEnumerable of values for each key, and reduce this into a key/value enumerable. Once again the Cleanup allows for return values, to allow for In-Reducer optimizations.
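To make the data flow concrete, here is a minimal in-memory sketch of the map, group by key, and reduce sequence described above. It does not use the framework or Hadoop: the Map and Reduce local functions are hypothetical stand-ins that only mirror the shapes of MapperBaseText.Map and ReducerBase.Reduce, and RunJob is an illustrative helper, not how Hadoop Streaming actually shuffles data.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Stand-in for a mapper's Map: input is "key<TAB>number"; malformed lines are skipped.
static IEnumerable<Tuple<string, int>> Map(string line)
{
    var parts = line.Split('\t');
    if (parts.Length == 2 && int.TryParse(parts[1], out var number))
        yield return Tuple.Create(parts[0], number);
}

// Stand-in for a reducer's Reduce: receives all values for one key and emits their sum.
static IEnumerable<Tuple<string, int>> Reduce(string key, IEnumerable<int> values)
{
    yield return Tuple.Create(key, values.Sum());
}

// Illustrative helper: map every line, group the pairs by key, then reduce each group.
static List<Tuple<string, int>> RunJob(IEnumerable<string> lines)
{
    return lines
        .SelectMany(line => Map(line))
        .GroupBy(pair => pair.Item1, pair => pair.Item2)
        .OrderBy(group => group.Key, StringComparer.Ordinal)
        .SelectMany(group => Reduce(group.Key, group))
        .ToList();
}

var output = RunJob(new[] { "ios\t3", "android\t5", "bad line", "ios\t4" });
foreach (var pair in output)
    Console.WriteLine($"{pair.Item1}\t{pair.Item2}");
```

A combiner would slot in between the map and group steps with the same signature as Reduce, except that its output value type must equal its input value type.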

Binary and XML Processing and Multiple Keys

As one can see from the abstract class definitions, the framework also supports jobs that use Binary and XML based Mappers. To use Mappers derived from these types a “format” submission parameter is required. The supported values are Text, Binary, and XML, with “Text” being the default.

To submit a binary streaming job one just has to use a Mapper derived from the MapperBaseBinary abstract class and use the binary format specification:

-format Binary

In this case the input into the Mapper will be a Stream object that represents a complete binary document instance.
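As a small illustration of working with a whole document as a Stream, the sketch below uses a local function with the same parameters as MapperBaseBinary.Map. It is a hypothetical stand-in rather than a class derived from the framework, and it simply emits the file extension as the key and the document size in bytes as the value. A MemoryStream plays the role of the document that the framework would supply.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Stand-in with the same parameters as MapperBaseBinary<V2>.Map:
// reads the whole stream and emits (lower-cased extension, size in bytes).
static IEnumerable<Tuple<string, long>> Map(string filename, Stream document)
{
    long size = 0;
    var buffer = new byte[4096];
    int read;
    while ((read = document.Read(buffer, 0, buffer.Length)) > 0)
        size += read;

    yield return Tuple.Create(Path.GetExtension(filename).ToLowerInvariant(), size);
}

// A five byte in-memory "document" with a made-up file name.
using var stream = new MemoryStream(new byte[] { 1, 2, 3, 4, 5 });
var results = new List<Tuple<string, long>>(Map("Report.DOCX", stream));
foreach (var pair in results)
    Console.WriteLine($"{pair.Item1}\t{pair.Item2}");
```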

To submit an XML streaming job one just has to use a Mapper derived from the MapperBaseXml abstract class and use the XML format specification, along with a node to be processed within the XML documents:

-format XML -nodename Node

In this case the input into the Mapper will be an XElement node derived from the XML document based on the nodename parameter.
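The sketch below shows the idea with LINQ to XML: each element matching the node name is handed to a Map function as an XElement. How the framework actually locates the nodes (for example at which depth) is not described here, so the use of Descendants is an assumption, and the Store/AnnualSales sample document is made up for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Xml.Linq;

// Stand-in with the same parameter as MapperBaseXml<V2>.Map:
// emits (store name, annual sales) when both are present on the element.
static IEnumerable<Tuple<string, decimal>> Map(XElement element)
{
    var name = (string)element.Attribute("Name");
    var sales = (decimal?)element.Element("AnnualSales");
    if (name != null && sales.HasValue)
        yield return Tuple.Create(name, sales.Value);
}

var xml = "<Stores><Store Name=\"A\"><AnnualSales>1500.50</AnnualSales></Store><Store Name=\"B\"/></Stores>";
var results = XDocument.Parse(xml)
    .Descendants("Store")              // plays the role of "-nodename Store"
    .SelectMany(node => Map(node))
    .ToList();

foreach (var pair in results)
    Console.WriteLine($"{pair.Item1}\t{pair.Item2}");
```

Store "B" has no AnnualSales element, so only store "A" produces a pair.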

Using multiple keys from the Mapper is a two-step process. Firstly the Mapper needs to be modified to output a string based key in the correct format. This is done by passing the set of string key values into the Utilities.FormatKeys() function. This concatenates the keys using the necessary tab character. Secondly, the job has to be submitted specifying the expected number of keys:

MSDN.Hadoop.Submission.Console.exe -input "stores/demographics" -output "stores/banking"
-mapper "MSDN.Hadoop.MapReduceFSharp.StoreXmlElementMapper, MSDN.Hadoop.MapReduceFSharp"
-reducer "MSDN.Hadoop.MapReduceFSharp.StoreXmlElementReducer, MSDN.Hadoop.MapReduceFSharp"
-file "%HOMEPATH%\Projects\MSDN.Hadoop.MapReduce\Release\MSDN.Hadoop.MapReduceFSharp.dll"
-nodename Store -format Xml -numberKeys 2

The numberKeys parameter is translated into the corresponding Hadoop job configuration parameter.
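A composite key is just the individual key values joined by tabs. The sketch below uses a hypothetical stand-in for Utilities.FormatKeys (the real implementation ships with the framework and may differ) together with an illustrative SplitKeys helper that a reducer could use to recover the parts. The key values are made up. In standard Hadoop Streaming the number of key fields in the map output is governed by stream.num.map.output.key.fields; that -numberKeys maps to exactly this setting is my assumption.

```csharp
using System;

// Hypothetical stand-in for Utilities.FormatKeys: joins the key parts with a tab.
static string FormatKeys(string[] keys) => string.Join("\t", keys);

// Illustrative helper: splits a composite key back into its parts.
static string[] SplitKeys(string compositeKey) => compositeKey.Split('\t');

var key = FormatKeys(new[] { "Bike Store", "Primary Bank" });
var parts = SplitKeys(key);
Console.WriteLine($"{parts.Length} key parts: {string.Join(" | ", parts)}");
```

Because the tab is the separator, individual key values must not themselves contain a tab character.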

Samples

To demonstrate the submission framework, here are some sample Mappers and Reducers with the corresponding command line submissions:

C# Mobile Phone Range (with In-Mapper optimization)

Calculates the mobile phone query time range for a device with an In-Mapper optimization yielding just the Min and Max values:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

using MSDN.Hadoop.MapReduceBase;

namespace MSDN.Hadoop.MapReduceCSharp
{
    public class MobilePhoneRangeMapper : MapperBaseText<TimeSpan>
    {
        // Per-device (Min, Max) query times accumulated across all Map calls
        private Dictionary<string, Tuple<TimeSpan, TimeSpan>> ranges;

        // Parses a tab-delimited line into (device platform, query time); returns null if malformed
        private Tuple<string, TimeSpan> GetLineValue(string value)
        {
            try
            {
                string[] splits = value.Split('\t');
                string devicePlatform = splits[3];
                TimeSpan queryTime = TimeSpan.Parse(splits[1]);
                return new Tuple<string, TimeSpan>(devicePlatform, queryTime);
            }
            catch (Exception)
            {
                return null;
            }
        }

        public override void Setup()
        {
            this.ranges = new Dictionary<string, Tuple<TimeSpan, TimeSpan>>();
        }

        public override IEnumerable<Tuple<string, TimeSpan>> Map(string value)
        {
            var range = GetLineValue(value);
            if (range != null)
            {
                if (ranges.ContainsKey(range.Item1))
                {
                    var original = ranges[range.Item1];
                    if (range.Item2 < original.Item1)
                    {
                        // Update Min amount
                        ranges[range.Item1] = new Tuple<TimeSpan, TimeSpan>(range.Item2, original.Item2);
                    }
                    if (range.Item2 > original.Item2)
                    {
                        // Update Max amount
                        ranges[range.Item1] = new Tuple<TimeSpan, TimeSpan>(original.Item1, range.Item2);
                    }
                }
                else
                {
                    ranges.Add(range.Item1, new Tuple<TimeSpan, TimeSpan>(range.Item2, range.Item2));
                }
            }

            // Nothing is emitted per line; the results are yielded in Cleanup
            return Enumerable.Empty<Tuple<string, TimeSpan>>();
        }

        public override IEnumerable<Tuple<string, TimeSpan>> Cleanup()
        {
            // Emit just the Min and Max values for each device
            foreach (var range in ranges)
            {
                yield return new Tuple<string, TimeSpan>(range.Key, range.Value.Item1);
                yield return new Tuple<string, TimeSpan>(range.Key, range.Value.Item2);
            }
        }
    }

    public class MobilePhoneRangeReducer : ReducerBase<TimeSpan, Tuple<TimeSpan, TimeSpan>>
    {
        public override IEnumerable<Tuple<string, Tuple<TimeSpan, TimeSpan>>> Reduce(string key, IEnumerable<TimeSpan> value)
        {
            // Fold all values for the key into a single (Min, Max) pair
            var baseRange = new Tuple<TimeSpan, TimeSpan>(TimeSpan.MaxValue, TimeSpan.MinValue);
            var rangeValue = value.Aggregate(baseRange, (accSpan, timespan) =>
                new Tuple<TimeSpan, TimeSpan>((timespan < accSpan.Item1) ? timespan : accSpan.Item1, (timespan > accSpan.Item2) ? timespan : accSpan.Item2));

            yield return new Tuple<string, Tuple<TimeSpan, TimeSpan>>(key, rangeValue);
        }
    }
}

MSDN.Hadoop.Submission.Console.exe -input "mobilecsharp/data" -output "mobilecsharp/querytimes"
-mapper "MSDN.Hadoop.MapReduceCSharp.MobilePhoneRangeMapper, MSDN.Hadoop.MapReduceCSharp"
-reducer "MSDN.Hadoop.MapReduceCSharp.MobilePhoneRangeReducer, MSDN.Hadoop.MapReduceCSharp"
-file "%HOMEPATH%\MSDN.Hadoop.MapReduceCSharp\Release\MSDN.Hadoop.MapReduceCSharp.dll"

C# Mobile Min (with Mapper, Combiner, Reducer)

Calculates the mobile phone minimum time for a device with a combiner yielding just the Min value:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

using MSDN.Hadoop.MapReduceBase;

namespace MSDN.Hadoop.MapReduceCSharp
{
    public class MobilePhoneMinMapper : MapperBaseText<TimeSpan>
    {
        // Parses a tab-delimited line into (device platform, query time); returns null if malformed
        private Tuple<string, TimeSpan> GetLineValue(string value)
        {
            try
            {
                string[] splits = value.Split('\t');
                string devicePlatform = splits[3];
                TimeSpan queryTime = TimeSpan.Parse(splits[1]);
                return new Tuple<string, TimeSpan>(devicePlatform, queryTime);
            }
            catch (Exception)
            {
                return null;
            }
        }

        public override IEnumerable<Tuple<string, TimeSpan>> Map(string value)
        {
            var returnVal = GetLineValue(value);
            if (returnVal != null) yield return returnVal;
        }
    }

    public class MobilePhoneMinCombiner : CombinerBase<TimeSpan>
    {
        public override IEnumerable<Tuple<string, TimeSpan>> Combine(string key, IEnumerable<TimeSpan> value)
        {
            yield return new Tuple<string, TimeSpan>(key, value.Min());
        }
    }

    public class MobilePhoneMinReducer : ReducerBase<TimeSpan, TimeSpan>
    {
        public override IEnumerable<Tuple<string, TimeSpan>> Reduce(string key, IEnumerable<TimeSpan> value)
        {
            yield return new Tuple<string, TimeSpan>(key, value.Min());
        }
    }
}

MSDN.Hadoop.Submission.Console.exe -input "mobilecsharp/data" -output "mobilecsharp/querytimes"
-mapper "MSDN.Hadoop.MapReduceCSharp.MobilePhoneMinMapper, MSDN.Hadoop.MapReduceCSharp"
-reducer "MSDN.Hadoop.MapReduceCSharp.MobilePhoneMinReducer, MSDN.Hadoop.MapReduceCSharp"
-combiner "MSDN.Hadoop.MapReduceCSharp.MobilePhoneMinCombiner, MSDN.Hadoop.MapReduceCSharp"
-file "%HOMEPATH%\MSDN.Hadoop.MapReduceCSharp\Release\MSDN.Hadoop.MapReduceCSharp.dll"

F# Mobile Phone Query

Calculates the mobile phone range and average time for a device:

namespace MSDN.Hadoop.MapReduceFSharp

open System
open MSDN.Hadoop.MapReduceBase

type MobilePhoneQueryMapper() =
    inherit MapperBaseText<TimeSpan>()

    // Performs the split into key/value
    let splitInput (value:string) =
        try
            let splits = value.Split('\t')
            let devicePlatform = splits.[3]
            let queryTime = TimeSpan.Parse(splits.[1])
            Some(devicePlatform, queryTime)
        with
        // Skip malformed lines: too few fields or an unparsable time
        | :? ArgumentException
        | :? FormatException
        | :? OverflowException
        | :? IndexOutOfRangeException -> None

    // Map the data from input name/value to output name/value
    override self.Map (value:string) =
        seq {
            let result = splitInput value
            if result.IsSome then
                yield result.Value
        }

type MobilePhoneQueryReducer() =
    inherit ReducerBase<TimeSpan, (TimeSpan*TimeSpan*TimeSpan)>()

    // Yields (min, average, max) for the key; the average uses whole seconds
    override self.Reduce (key:string) (values:seq<TimeSpan>) =
        let initState = (TimeSpan.MaxValue, TimeSpan.MinValue, 0L, 0L)
        let (minValue, maxValue, totalValue, totalCount) =
            values |>
            Seq.fold (fun (minValue, maxValue, totalValue, totalCount) value ->
                (min minValue value, max maxValue value, totalValue + (int64)(value.TotalSeconds), totalCount + 1L) ) initState

        Seq.singleton (key, (minValue, TimeSpan.FromSeconds((float)(totalValue/totalCount)), maxValue))

MSDN.Hadoop.Submission.Console.exe -input "mobile/data" -output "mobile/querytimes"
-mapper "MSDN.Hadoop.MapReduceFSharp.MobilePhoneQueryMapper, MSDN.Hadoop.MapReduceFSharp"
-reducer "MSDN.Hadoop.MapReduceFSharp.MobilePhoneQueryReducer, MSDN.Hadoop.MapReduceFSharp"
-file "%HOMEPATH%\MSDN.Hadoop.MapReduceFSharp\Release\MSDN.Hadoop.MapReduceFSharp.dll"

F# Store XML (XML in Samples)

Calculates the total revenue, within the store XML, based on demographic data; also demonstrating multiple keys:

namespace MSDN.Hadoop.MapReduceFSharp

open System
open System.Collections.Generic
open System.Linq
open System.IO
open System.Text
open System.Xml
open System.Xml.Linq

open MSDN.Hadoop.MapReduceBase

type StoreXmlElementMapper() =
    inherit MapperBaseXml<decimal>()

    override self.Map (element:XElement) =

        // XML namespace identifier; it must match the documents exactly (http, not https)
        let aw = "http://schemas.microsoft.com/sqlserver/2004/07/adventure-works/StoreSurvey"
        let demographics = element.Element(XName.Get("Demographics")).Element(XName.Get("StoreSurvey", aw))

        seq {
            if not(demographics = null) then
                let business = demographics.Element(XName.Get("BusinessType", aw)).Value
                let bank = demographics.Element(XName.Get("BankName", aw)).Value
                // Combine the two key values into a single tab-delimited key
                let key = Utilities.FormatKeys(business, bank)
                let sales = Decimal.Parse(demographics.Element(XName.Get("AnnualSales", aw)).Value)
                yield (key, sales)
        }

type StoreXmlElementReducer() =
    inherit ReducerBase<decimal, int>()

    override self.Reduce (key:string) (values:seq<decimal>) =
        let totalRevenue =
            values |> Seq.sum

        Seq.singleton (key, int totalRevenue)

MSDN.Hadoop.Submission.Console.exe -input "stores/demographics" -output "stores/banking"
-mapper "MSDN.Hadoop.MapReduceFSharp.StoreXmlElementMapper, MSDN.Hadoop.MapReduceFSharp"
-reducer "MSDN.Hadoop.MapReduceFSharp.StoreXmlElementReducer, MSDN.Hadoop.MapReduceFSharp"
-file "%HOMEPATH%\MSDN.Hadoop.MapReduceFSharp\bin\Release\MSDN.Hadoop.MapReduceFSharp.dll"
-nodename Store -format Xml -numberKeys 2

F# Binary Document (Word and PDF Documents)

Calculates the pages per author for a combination of Office Word and PDF documents:

namespace MSDN.Hadoop.MapReduceFSharp

open System
open System.Collections.Generic
open System.Linq
open System.IO
open System.Text
open System.Xml
open System.Xml.Linq

open DocumentFormat.OpenXml
open DocumentFormat.OpenXml.Packaging
open DocumentFormat.OpenXml.Wordprocessing

open iTextSharp.text
open iTextSharp.text.pdf

open MSDN.Hadoop.MapReduceBase

type OfficePageMapper() =
    inherit MapperBaseBinary<int>()

    // Classifies a file extension as Word, PDF, or unsupported
    let (|WordDocument|PdfDocument|UnsupportedDocument|) extension =
        if String.Equals(extension, ".docx", StringComparison.InvariantCultureIgnoreCase) then
            WordDocument
        else if String.Equals(extension, ".pdf", StringComparison.InvariantCultureIgnoreCase) then
            PdfDocument
        else
            UnsupportedDocument

    // XML namespace identifiers; they must match the documents exactly (http, not https)
    let dc = XNamespace.Get("http://purl.org/dc/elements/1.1/")
    let cp = XNamespace.Get("http://schemas.openxmlformats.org/package/2006/metadata/core-properties")
    let unknownAuthor = "unknown author"
    let authorKey = "Author"

    let getAuthorsWord (document:WordprocessingDocument) =
        let coreFilePropertiesXDoc = XElement.Load(document.CoreFilePropertiesPart.GetStream())

        // Take the first dc:creator element and split based on a ";"
        let creators = coreFilePropertiesXDoc.Elements(dc + "creator")
        if Seq.isEmpty creators then
            [| unknownAuthor |]
        else
            let creator = (Seq.head creators).Value
            if String.IsNullOrWhiteSpace(creator) then
                [| unknownAuthor |]
            else
                creator.Split(';')

    let getPagesWord (document:WordprocessingDocument) =
        // return page count
        Int32.Parse(document.ExtendedFilePropertiesPart.Properties.Pages.Text)

    let getAuthorsPdf (document:PdfReader) =
        // For PDF documents perform the split on a ","
        if document.Info.ContainsKey(authorKey) then
            let creators = document.Info.[authorKey]
            if String.IsNullOrWhiteSpace(creators) then
                [| unknownAuthor |]
            else
                creators.Split(',')
        else
            [| unknownAuthor |]

    let getPagesPdf (document:PdfReader) =
        // return page count
        document.NumberOfPages

    // Map the data from input name/value to output name/value
    override self.Map (filename:string) (document:Stream) =

        let result =
            match Path.GetExtension(filename) with
            | WordDocument ->
                // Get access to the word processing document from the input stream
                use document = WordprocessingDocument.Open(document, false)
                // Process the word document with the mapper
                let pages = getPagesWord document
                let authors = (getAuthorsWord document)
                // close document
                document.Close()
                Some(pages, authors)
            | PdfDocument ->
                // Get access to the pdf processing document from the input stream
                let document = new PdfReader(document)
                // Process the pdf document with the mapper
                let pages = getPagesPdf document
                let authors = (getAuthorsPdf document)
                // close document
                document.Close()
                Some(pages, authors)
            | UnsupportedDocument ->
                None

        // Emit one (author, page count) pair per author
        if result.IsSome then
            snd result.Value
            |> Seq.map (fun author -> (author, fst result.Value))
        else
            Seq.empty

type OfficePageReducer() =
    inherit ReducerBase<int, int>()

    override self.Reduce (key:string) (values:seq<int>) =
        let totalPages =
            values |> Seq.sum

        Seq.singleton (key, totalPages)

      

MSDN.Hadoop.Submission.Console.exe -input "office/documents" -output "office/authors"
-mapper "MSDN.Hadoop.MapReduceFSharp.OfficePageMapper, MSDN.Hadoop.MapReduceFSharp"
-reducer "MSDN.Hadoop.MapReduceFSharp.OfficePageReducer, MSDN.Hadoop.MapReduceFSharp"
-combiner "MSDN.Hadoop.MapReduceFSharp.OfficePageReducer, MSDN.Hadoop.MapReduceFSharp"
-file "%HOMEPATH%\MSDN.Hadoop.MapReduceFSharp\bin\Release\MSDN.Hadoop.MapReduceFSharp.dll"
-file "C:\Reference Assemblies\itextsharp.dll" -format Binary

Optional Parameters

To support some additional Hadoop Streaming options a few optional parameters are supported.

-numberReducers X

As expected, this specifies the maximum number of reducers to use.

-debug

This option turns on verbose mode and specifies a job configuration that keeps failed task outputs.

To view the supported options one can use a help parameter, which displays:

Command Arguments:
-input (Required=true) : Input Directory or Files
-output (Required=true) : Output Directory
-mapper (Required=true) : Mapper Class
-reducer (Required=true) : Reducer Class
-combiner (Required=false) : Combiner Class (Optional)
-format (Required=false) : Input Format |Text(Default)|Binary|Xml|
-numberReducers (Required=false) : Number of Reduce Tasks (Optional)
-numberKeys (Required=false) : Number of MapReduce Keys (Optional)
-file (Required=true) : Processing Files (Must include Map and Reduce Class files)
-nodename (Required=false) : XML Processing Nodename (Optional)
-debug (Required=false) : Turns on Debugging Options

UI Submission

The provided submission framework works from the command line. However, there is nothing to stop one submitting the job using a UI, albeit a command console is opened. To this end I have put together a simple UI that supports submitting Hadoop jobs.

This simple UI supports all the necessary options for submitting jobs.

Code Download

As mentioned, the executables and source code can be downloaded from:

https://code.msdn.microsoft.com/Framework-for-Composing-af656ef7

The source includes not only the .Net submission framework but also all the Java classes needed to support the Binary and XML job submissions. These rely on a custom Streaming JAR, which should be copied to the Hadoop lib directory. There are two versions of the Streaming JAR: one for running in Azure and one for running locally. The difference is that they have been compiled with different versions of the Java compiler. Just remember to use the appropriate version (dropping the -local and -azure prefixes) when copying to your Hadoop lib folder.

To use the code one just needs to reference the EXEs in the Release directory. This folder also contains the MSDN.Hadoop.MapReduceBase.dll that contains the abstract base class definitions.

Moving Forward

In a separate post I will cover what is actually happening under the covers.

As always if you find the code useful and/or use this for your MapReduce jobs, or just have some comments, please do let me know.

Comments

  • Anonymous
    May 03, 2012
    Why use this or even write hadoop jobs with java when there is pig. More than %80 of hadoop jobs are being run with pig scripts in large corps.
  • Anonymous
    May 03, 2012
    I do like Pig, but not everything that can be done with MapReduce can be achieved with Pig.
  • Anonymous
    May 23, 2012
    I wanted to use the dll for passing a job to hadoop. But how will I pass the credentials (login and password) for hadoop? Like how will the job in my account??
  • Anonymous
    May 23, 2012
    For e.g. before I used to login to hadooponazure and create a job using C# mapper and reducer. But now I can pass the mapper and reducer class but how will I pass the credential for my hadooponazure account?
  • Anonymous
    May 24, 2012
    Currently I have not done anything around hooking up a RunAs for the command options, but this would be simple enough. Currently it will run under the account the submission job runs under.
  • Anonymous
    May 29, 2012
    Hi Carl, along with multiple mapper, can we expect support of multiple combiner in the first release?
  • Anonymous
    May 31, 2012
    At the moment one can only specify the number of reducers.
  • Anonymous
    June 04, 2012
    Thanks Carl for the info.