MapReduce Tester: A Quick Word
In my previous post I talked a little about testing the Hadoop Streaming F# MapReduce code, but it is worth saying a few words about the tester application itself.
The complete code for this blog post and the F# MapReduce code can be found at:
https://code.msdn.microsoft.com/Hadoop-Streaming-and-F-f2e76850
As mentioned, unit testing the individual Map and Reduce functions is relatively straightforward. However, testing against sample input data is a little trickier. As such I put together a tester application that performs the following functions:
- Defines and executes a Process for the Mapper executable in which:
  - StdIn is redirected to read from the file specified in the “–input” command line argument
  - StdOut is redirected to a temp file with a “mapped” extension
- When the Mapper has completed, sorts the Mapper output file by key into a temp file with a “reduced” extension
- When the sort is complete, defines and executes a Process for the Reducer executable in which:
  - StdIn is redirected to read from the sorted “reduced” file
  - StdOut is redirected to the file specified in the “–output” command line argument
Running the Tester application allows one to check inputs and outputs, in a flow similar to running within Hadoop. The code listing for the tester application is:
namespace FSharp.HadoopTester

open System
open System.IO
open System.Collections.Generic
open System.Diagnostics
open System.Threading
open System.Threading.Tasks

open FSharp.Hadoop.Utilities
open FSharp.Hadoop.Utilities.Arguments

module MapReduceConsole =

    let Run args =

        // Define what arguments are expected
        let defs = [
            {ArgInfo.Command="input"; Description="Input File"; Required=true };
            {ArgInfo.Command="output"; Description="Output File"; Required=true };
            {ArgInfo.Command="tempPath"; Description="Temp File Path"; Required=true };
            {ArgInfo.Command="mapper"; Description="Mapper EXE"; Required=true };
            {ArgInfo.Command="reducer"; Description="Reducer EXE"; Required=true }; ]

        // Parse Arguments into a Dictionary
        let parsedArgs = Arguments.ParseArgs args defs
        Arguments.DisplayArgs parsedArgs

        // define the executables
        let mapperExe = Path.GetFullPath(parsedArgs.["mapper"])
        let reducerExe = Path.GetFullPath(parsedArgs.["reducer"])

        Console.WriteLine()
        Console.WriteLine (sprintf "The Mapper file is:\t%O" mapperExe)
        Console.WriteLine (sprintf "The Reducer file is:\t%O" reducerExe)

        // Get the file names
        let inputfile = Path.GetFullPath(parsedArgs.["input"])
        let outputfile = Path.GetFullPath(parsedArgs.["output"])

        let tempPath = Path.GetFullPath(parsedArgs.["tempPath"])
        let tempFile = Path.Combine(tempPath, Path.GetFileName(outputfile))

        let mappedfile = Path.ChangeExtension(tempFile, "mapped")
        let reducefile = Path.ChangeExtension(tempFile, "reduced")

        Console.WriteLine()
        Console.WriteLine (sprintf "The input file is:\t\t%O" inputfile)
        Console.WriteLine (sprintf "The mapped temp file is:\t%O" mappedfile)
        Console.WriteLine (sprintf "The reduced temp file is:\t%O" reducefile)
        Console.WriteLine (sprintf "The output file is:\t\t%O" outputfile)

        // Give the user an option to continue
        Console.WriteLine()
        Console.WriteLine("Hit ENTER to continue...")
        Console.ReadLine() |> ignore

        // Call the mapper with the input file
        let mapperProcess() =
            use mapper = new Process()
            mapper.StartInfo.FileName <- mapperExe
            mapper.StartInfo.UseShellExecute <- false
            mapper.StartInfo.RedirectStandardInput <- true
            mapper.StartInfo.RedirectStandardOutput <- true
            mapper.Start() |> ignore

            use mapperInput = mapper.StandardInput
            use mapperOutput = mapper.StandardOutput

            // Map the reader to a background thread so processing can happen in parallel
            Console.WriteLine "Mapper Processing Starting..."
            let taskMapperFunc() =
                use mapperWriter = File.CreateText(mappedfile)
                while not mapperOutput.EndOfStream do
                    mapperWriter.WriteLine(mapperOutput.ReadLine())
            let taskMapperWriting = Task.Factory.StartNew(Action(taskMapperFunc))

            // Pass the file into the mapper process and close input stream when done
            use mapperReader = new StreamReader(File.OpenRead(inputfile))
            while not mapperReader.EndOfStream do
                mapperInput.WriteLine(mapperReader.ReadLine())

            mapperInput.Close()
            taskMapperWriting.Wait()
            mapperOutput.Close()

            mapper.WaitForExit()
            let result = match mapper.ExitCode with | 0 -> true | _ -> false
            mapper.Close()
            result

        // Sort the mapped file by the first field - mimic the role of Hadoop
        let hadoopProcess() =
            Console.WriteLine "Hadoop Processing Starting..."
            let unsortedValues = seq {
                use reader = new StreamReader(File.OpenRead(mappedfile))
                while not reader.EndOfStream do
                    let input = reader.ReadLine()
                    let keyValue = input.Split('\t')
                    yield (keyValue.[0].Trim(), keyValue.[1].Trim())
                reader.Close()
                }

            use writer = File.CreateText(reducefile)
            unsortedValues
            |> Seq.sortBy fst
            |> Seq.iter (fun (key, value) -> writer.WriteLine (sprintf "%O\t%O" key value))
            writer.Close()

        // Finally call the reducer process
        let reducerProcess() =
            use reducer = new Process()
            reducer.StartInfo.FileName <- reducerExe
            reducer.StartInfo.UseShellExecute <- false
            reducer.StartInfo.RedirectStandardInput <- true
            reducer.StartInfo.RedirectStandardOutput <- true
            reducer.Start() |> ignore

            use reducerInput = reducer.StandardInput
            use reducerOutput = reducer.StandardOutput

            // Map the reader to a background thread so processing can happen in parallel
            Console.WriteLine "Reducer Processing Starting..."
            let taskReducerFunc() =
                use reducerWriter = File.CreateText(outputfile)
                while not reducerOutput.EndOfStream do
                    reducerWriter.WriteLine(reducerOutput.ReadLine())
            let taskReducerWriting = Task.Factory.StartNew(Action(taskReducerFunc))

            // Pass the sorted file into the reducer process and close input stream when done
            use reducerReader = new StreamReader(File.OpenRead(reducefile))
            while not reducerReader.EndOfStream do
                reducerInput.WriteLine(reducerReader.ReadLine())

            reducerInput.Close()
            taskReducerWriting.Wait()
            reducerOutput.Close()

            reducer.WaitForExit()
            let result = match reducer.ExitCode with | 0 -> true | _ -> false
            reducer.Close()
            result

        // Finish test
        if mapperProcess() then
            Console.WriteLine "Mapper Processing Complete..."
            hadoopProcess()
            Console.WriteLine "Hadoop Processing Complete..."
            if reducerProcess() then
                Console.WriteLine "Reducer Processing Complete..."

        Console.WriteLine "Processing Complete..."
        Console.ReadLine() |> ignore
The input options for the console tester application are:
- input – Used to specify the input file for processing
- output – The file to be used to save the results
- tempPath – A directory used to save intermediate results and for sorting
- mapper – The mapper executable to run
- reducer – The reducer executable to run
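To make the tempPath option a little more concrete, here is a minimal sketch of how the intermediate file names are derived from the tempPath and output arguments. It uses the same Path calls as the listing above; the helper name intermediateFiles and the sample paths are hypothetical and only there for illustration.

```fsharp
open System.IO

// Hypothetical helper (not part of the tester): derives the intermediate
// file names from the tempPath and output arguments, as the listing does.
let intermediateFiles (tempPath: string) (outputFile: string) =
    // Reuse the output file name, but place it in the temp directory
    let tempFile = Path.Combine(tempPath, Path.GetFileName(outputFile))
    // ChangeExtension adds the leading period when it is missing
    let mapped = Path.ChangeExtension(tempFile, "mapped")
    let reduced = Path.ChangeExtension(tempFile, "reduced")
    mapped, reduced

// Sample (made-up) paths
let mappedFile, reducedFile = intermediateFiles "temp" "results/wordcount.txt"
printfn "%s" (Path.GetFileName mappedFile)   // wordcount.mapped
printfn "%s" (Path.GetFileName reducedFile)  // wordcount.reduced
```

So the intermediate files always share the output file's name and differ only in directory and extension.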
To run the mapper and the reducer, a Process is defined for each. The ProcessStartInfo is configured such that both the Standard Input and the Standard Output are redirected.
To feed data into the mapper, one just has to open the input file and write each line into the mapper's StandardInput:
use mapperInput = mapper.StandardInput
use mapperReader = new StreamReader(File.OpenRead(inputfile))
while not mapperReader.EndOfStream do
    mapperInput.WriteLine(mapperReader.ReadLine())
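The important thing to remember is that one needs to process the StandardOutput from the mapper on a separate thread. If the output is not being read while the input is being written, the mapper can block once its output pipe fills up, and the tester then blocks in turn while writing to the mapper's input. Reading on a separate thread is achieved using a Task: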
use mapperOutput = mapper.StandardOutput
let taskMapperFunc() =
    use mapperWriter = File.CreateText(mappedfile)
    while not mapperOutput.EndOfStream do
        mapperWriter.WriteLine(mapperOutput.ReadLine())
let taskMapperWriting = Task.Factory.StartNew(Action(taskMapperFunc))
Before waiting for the mapper executable to exit, one needs to Close() the mapper input stream (this signals the end of the input to the mapper), ensure that the task processing the StandardOutput has completed, and finally Close() the mapper output stream:
mapperInput.Close()
taskMapperWriting.Wait()
mapperOutput.Close()
mapper.WaitForExit()
let result = match mapper.ExitCode with | 0 -> true | _ -> false
mapper.Close()
The input arguments include a temp directory, which is where the output of the mapper is saved. This mapped file is then sorted by key, the first tab-delimited field on each line:
let unsortedValues = seq {
    use reader = new StreamReader(File.OpenRead(mappedfile))
    while not reader.EndOfStream do
        let input = reader.ReadLine()
        let keyValue = input.Split('\t')
        yield (keyValue.[0].Trim(), keyValue.[1].Trim())
    reader.Close()
    }

use writer = File.CreateText(reducefile)
unsortedValues
|> Seq.sortBy fst
|> Seq.iter (fun (key, value) -> writer.WriteLine (sprintf "%O\t%O" key value))
writer.Close()
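One thing to watch with the sort above is that it assumes every mapper line contains a tab; a line without one will throw when keyValue.[1] is accessed, and anything after a second tab is dropped. As a rough sketch of a more forgiving variation (not the code used in the tester), the following splits at the first tab only and treats a line with no tab as a key with an empty value, which is my understanding of the default Hadoop Streaming behaviour. The names splitKeyValue and sortByKey are hypothetical.

```fsharp
open System

// Hypothetical helper: split a mapper output line into (key, value) at the
// first tab; a line without a tab becomes a key with an empty value.
let splitKeyValue (line: string) =
    match line.IndexOf('\t') with
    | -1 -> line.Trim(), ""
    | i -> line.Substring(0, i).Trim(), line.Substring(i + 1).Trim()

// Hypothetical helper: drop blank lines, sort by key using an ordinal
// comparison, and re-join each pair with a tab ready for the reducer.
let sortByKey (lines: seq<string>) =
    lines
    |> Seq.filter (fun line -> not (String.IsNullOrWhiteSpace line))
    |> Seq.map splitKeyValue
    |> List.ofSeq
    |> List.sortWith (fun (k1, _) (k2, _) -> String.CompareOrdinal(k1, k2))
    |> List.map (fun (key, value) -> sprintf "%s\t%s" key value)

// Made-up sample data
sortByKey [ "b\t2"; "a\t1\textra"; "keyonly"; "" ]
|> List.iter (printfn "%s")
```

In the tester one could feed this from File.ReadLines(mappedfile) and write the results to the reduced file. Like the original, it holds all the values in memory, which is fine for sample data.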
Finally, the output from the sort process is passed into the reducer. The process for calling the reducer executable is very similar to that of calling the mapper executable, including ensuring that the StandardOutput is processed on a separate thread.
In writing the tester application, the key factors in getting the processing working are to remember to process the StandardOutput on a separate thread, and to ensure the streams are closed in the correct order so one can determine the outcome of the mapper and reducer executable calls. Other than this, a tester for any MapReduce code should be easy to write.
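Since the mapper and reducer calls differ only in the executable and the two files involved, the shared logic could be pulled into a single function. The following is a minimal sketch of that idea, assuming the same redirect-and-drain approach as the listing; the name runRedirected is hypothetical and this is not the code from the download.

```fsharp
open System
open System.Diagnostics
open System.IO
open System.Threading.Tasks

// Hypothetical helper: runs exe with inputFile written to its StdIn and its
// StdOut saved to outputFile; returns true when the exit code is 0.
let runRedirected (exe: string) (inputFile: string) (outputFile: string) =
    use proc = new Process()
    proc.StartInfo.FileName <- exe
    proc.StartInfo.UseShellExecute <- false
    proc.StartInfo.RedirectStandardInput <- true
    proc.StartInfo.RedirectStandardOutput <- true
    proc.Start() |> ignore

    // Drain StdOut on a background task so the child process does not
    // block on a full output pipe while we are still writing its input
    let drainOutput () =
        use writer = File.CreateText(outputFile)
        let mutable line = proc.StandardOutput.ReadLine()
        while not (isNull line) do
            writer.WriteLine(line)
            line <- proc.StandardOutput.ReadLine()
    let draining = Task.Factory.StartNew(Action(drainOutput))

    // Write the input file to StdIn, then close it to signal end of input
    for line in File.ReadLines(inputFile) do
        proc.StandardInput.WriteLine(line)
    proc.StandardInput.Close()

    // Wait for the output to be fully read before checking the exit code
    draining.Wait()
    proc.WaitForExit()
    proc.ExitCode = 0
```

With this in place the mapper step would become runRedirected mapperExe inputfile mappedfile, and the reducer step runRedirected reducerExe reducefile outputfile.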