Hadoop Binary Streaming and F# MapReduce
As mentioned in my previous post, Hadoop Streaming supports not only text streaming but also Binary Streaming. As such, I wanted to put together a sample that supports processing Office documents; more on support for PDF in a later post. As always, the code can be downloaded from:
https://code.msdn.microsoft.com/Hadoop-Streaming-and-F-f2e76850
Putting together this sample involved a bit more work than the text streaming case, as one needs to implement the Java classes that support binary streaming; namely a FileInputFormat and a RecordReader. The purpose of these implementations is to stream each document as binary data so that it is not split on the usual line boundaries. More on the Java code later.
The implemented Java classes use a key/value type pairing of <Text, BytesWritable>, and write out the data for the mapper in the following format:
- Filename
- Tab character
- UTF8 Encoded Document
- Linefeed character
The Mapper code will get called with this format for each document in the specified input directories.
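For illustration, a single record for a hypothetical document named report.docx would therefore arrive on the mapper's StdIn as:
report.docx<Tab><raw bytes of report.docx><Linefeed>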
Map and Reduce Classes
The goal of the sample code is to support a Map and Reduce with the following prototypes:
Map : WordprocessingDocument -> seq<string * obj>
Reduce : string -> seq<string> -> obj option
The idea is that the Mapper takes in a Word document and projects it into a sequence of key/value pairs. The Reducer, as in the text streaming case, takes in a key and a sequence of values and returns an optional reduced value.
The use of the obj type is, as in the text streaming case, to support serializing the output values.
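To make this concrete, the output collectors shown later render each pair with sprintf, so any value boxed as obj is written using its ToString() implementation:
// A minimal sketch: a boxed value is rendered with %O (i.e. ToString())
let line = sprintf "%s\t%O" "Carl Nolan" (box 44)   // "Carl Nolan\t44"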
As an example, I have put together a MapReduce that processes Word documents, mapping each document to the number of pages per author; when a document has multiple authors, each author is credited with the document's full page count.
The sample Mapper code is as follows:
module OfficeWordPageMapper =
    let dc = XNamespace.Get("http://purl.org/dc/elements/1.1/")
    let cp = XNamespace.Get("http://schemas.openxmlformats.org/package/2006/metadata/core-properties")
    let unknownAuthor = "unknown author"

    let getAuthors (document:WordprocessingDocument) =
        let coreFilePropertiesXDoc = XElement.Load(document.CoreFilePropertiesPart.GetStream())
        // Take the first dc:creator element and split based on a ";"
        let creators = coreFilePropertiesXDoc.Elements(dc + "creator")
        if Seq.isEmpty creators then
            [| unknownAuthor |]
        else
            let creator = (Seq.head creators).Value
            if String.IsNullOrWhiteSpace(creator) then
                [| unknownAuthor |]
            else
                creator.Split(';')

    let getPages (document:WordprocessingDocument) =
        // Return the page count from the extended file properties
        Int32.Parse(document.ExtendedFilePropertiesPart.Properties.Pages.Text)

    // Map the data from input name/value to output name/value
    let Map (document:WordprocessingDocument) =
        let pages = getPages document
        (getAuthors document)
        |> Seq.map (fun author -> (author, pages))
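As a worked example (with hypothetical data), a document whose dc:creator property is "Brad Sarsfield;Carl Nolan" and whose page count is 44 would project to:
seq [ ("Brad Sarsfield", 44); ("Carl Nolan", 44) ]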
As you can see, the majority of the code is needed to pull out the document properties. If one wanted to process the words within the document, one would use the following code:
document.MainDocumentPart.Document.Body.InnerText
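If, for example, one wanted a word-count style job, a minimal sketch of an alternative Map built on this property might look as follows (the whitespace tokenization here is an illustrative assumption, not part of the sample):
// A hypothetical word-count variant of the Map function
let MapWords (document:WordprocessingDocument) =
    document.MainDocumentPart.Document.Body.InnerText.Split([| ' '; '\t' |], StringSplitOptions.RemoveEmptyEntries)
    |> Seq.map (fun word -> (word, 1))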
To run this code, note that there is a dependency on installing the Open XML SDK 2.0 for Microsoft Office.
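In practice this means referencing the DocumentFormat.OpenXml assembly, along with WindowsBase for the System.IO.Packaging support it relies on.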
Finally, the Reducer code is as follows:
module OfficePageReducer =
    let Reduce (key:string) (values:seq<string>) =
        let totalPages =
            values
            |> Seq.fold (fun pages value -> pages + Int32.Parse(value)) 0
        Some(totalPages)
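To illustrate with hypothetical values, the page counts for a single author arrive as strings and are summed:
OfficePageReducer.Reduce "Carl Nolan" (seq ["1"; "9"])   // returns Some 10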
Again, as in the text streaming application, both the mapper and the reducer are executables that read input from StdIn and emit output to StdOut. In the case of the mapper, the console application needs to make multiple Console.ReadByte() calls to read in the binary data, and then Console.WriteLine() calls to emit the output. The reducer does a Console.ReadLine() to get the data and a Console.WriteLine() to emit the output.
The previous post covers the specifics of creating console applications in F#, so I will not cover this again but will assume the same program structure.
Mapper Executable
As previously mentioned, the purpose of the Mapper code is to perform Input Format Parsing, Projection, and Filtering. In the Mapper for a Word document, the incoming bytes are used to create a WordprocessingDocument (invalid documents are ignored), which is then projected into a key/value sequence using the Map function:
module Controller =
    let Run (args:string array) =

        // Define what arguments are expected
        let defs = [
            {ArgInfo.Command="input"; Description="Input File"; Required=false };
            {ArgInfo.Command="output"; Description="Output File"; Required=false } ]

        // Parse Arguments into a Dictionary
        let parsedArgs = Arguments.ParseArgs args defs

        // Ensure Standard Input/Output and allow for debug configuration
        let builder = new StringBuilder()
        let encoder = Text.UTF8Encoding()

        use reader =
            if parsedArgs.ContainsKey("input") then
                builder.Append(Path.GetFileName(parsedArgs.["input"])) |> ignore
                File.Open(Path.GetFullPath(parsedArgs.["input"]), FileMode.Open) :> Stream
            else
                let stream = new MemoryStream()
                // Append bytes to the filename until the tab delimiter is hit
                // (guarding against end of stream)
                let rec readTab() =
                    let inByte = Console.OpenStandardInput().ReadByte()
                    if inByte <> 0x09 && inByte <> -1 then
                        builder.Append(encoder.GetString([| (byte)inByte |])) |> ignore
                        readTab()
                readTab()
                // Copy the rest of the stream and truncate the last linefeed char
                Console.OpenStandardInput().CopyTo(stream)
                if (stream.Length > 0L) then
                    stream.SetLength(stream.Length - 1L)
                stream.Position <- 0L
                stream :> Stream

        use writer =
            if parsedArgs.ContainsKey("output") then
                new StreamWriter(Path.GetFullPath(parsedArgs.["output"]))
            else
                new StreamWriter(Console.OpenStandardOutput(), AutoFlush = true)

        let filename = builder.ToString()

        // Combine the name/value output into a string
        let outputCollector (outputKey, outputValue) =
            let output = sprintf "%s\t%O" outputKey outputValue
            writer.WriteLine(output)

        // Check we do not have a null document
        if (reader.Length > 0L) then
            try
                // Get access to the word processing document from the input stream
                use document = WordprocessingDocument.Open(reader, false)
                // Process the word document with the mapper
                OfficeWordPageMapper.Map document
                |> Seq.iter (fun value -> outputCollector value)
                // Close the document
                document.Close()
            with
            | :? System.IO.FileFormatException ->
                // Ignore invalid file formats
                ()

        // Close the streams
        reader.Close()
        writer.Close()
There are a few things of note in the code.
When pulling out the filename of the document being processed, it is assumed that UTF8 encoding has been used and that a Tab character delimits the filename from the document's bytes.
In building the Stream that is used to create the WordprocessingDocument, a MemoryStream is used. There are two reasons for this: firstly, one needs to remove the trailing Linefeed character from the Stream; secondly, creating a WordprocessingDocument requires a Stream that supports Seek operations, which is unfortunately not the case for StdIn.
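A quick way to see the Seek limitation (a sketch, assuming the input arrives via a pipe as it does under Hadoop Streaming):
// CanSeek reports false for console/pipe input, so WordprocessingDocument.Open
// cannot work directly against StdIn
let stdin = Console.OpenStandardInput()
printfn "CanSeek: %b" stdin.CanSeek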
At the moment the code does not use the filename; however, in future posts I will extend the code to also support processing PDF documents.
Reducer Executable
After running the Mapper, the data passed into the Reducer will be Tab-delimited key/value pairs, sorted by key; Hadoop sorts the mapper output before the reduce phase, which is what allows the Reducer to consume each key's values as a contiguous group. Using the above Map, a sample input dataset for the Reducer would be:
Brad Sarsfield 44
Carl Nolan 1
Carl Nolan 9
Marie West 1
Thus in this case, the code for the Reducer will be the same as in the text streaming case:
module Controller =
    let Run (args:string array) =

        // Define what arguments are expected
        let defs = [
            {ArgInfo.Command="input"; Description="Input File"; Required=false };
            {ArgInfo.Command="output"; Description="Output File"; Required=false } ]

        // Parse Arguments into a Dictionary
        let parsedArgs = Arguments.ParseArgs args defs

        // Ensure Standard Input/Output and allow for debug configuration
        use reader =
            if parsedArgs.ContainsKey("input") then
                new StreamReader(Path.GetFullPath(parsedArgs.["input"]))
            else
                new StreamReader(Console.OpenStandardInput())

        use writer =
            if parsedArgs.ContainsKey("output") then
                new StreamWriter(Path.GetFullPath(parsedArgs.["output"]))
            else
                new StreamWriter(Console.OpenStandardOutput(), AutoFlush = true)

        // Combine the name/value output into a string
        let outputCollector outputKey outputValue =
            let output = sprintf "%s\t%O" outputKey outputValue
            writer.WriteLine(output)

        // Read the next line of the input stream
        let readLine() =
            reader.ReadLine()

        // Parse the input into the required name/value pair
        let parseLine (input:string) =
            let keyValue = input.Split('\t')
            (keyValue.[0].Trim(), keyValue.[1].Trim())

        // Converts an input line into an option
        let getInput() =
            let input = readLine()
            if not(String.IsNullOrWhiteSpace(input)) then
                Some(parseLine input)
            else
                None

        // Creates a sequence of the input based on the provided key
        let lastInput = ref None
        let continueDo = ref false
        let inputsByKey key firstValue = seq {
            // Yield any value from previous read
            yield firstValue
            continueDo := true
            while !continueDo do
                match getInput() with
                | Some(input) when (fst input) = key ->
                    // Yield found value and remainder of sequence
                    yield (snd input)
                | Some(input) ->
                    // Have a value but different key
                    lastInput := Some(fst input, snd input)
                    continueDo := false
                | None ->
                    // Have no more entries
                    lastInput := None
                    continueDo := false
            }

        // Controls the calling of the reducer
        let rec processInput (input:(string*string) option) =
            if input.IsSome then
                let key = fst input.Value
                let value = OfficePageReducer.Reduce key (inputsByKey key (snd input.Value))
                if value.IsSome then
                    outputCollector key value.Value
                if lastInput.contents.IsSome then
                    processInput lastInput.contents

        processInput (getInput())
Once run, the reduced output would be:
Brad Sarsfield 44
Carl Nolan 10
Marie West 1
Once again, the only complexity in the code is the fact that the Seq.groupBy function cannot be used; the grouping has to happen in a single forward pass over the sorted input. Also, as in the text streaming case, there is a fair amount of code controlling the input and output streams for calling the Map and Reduce functions; this can be reused for all Hadoop Binary Streaming jobs.
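If the whole input could be buffered in memory the grouping would be trivial; a sketch of the in-memory equivalent (for illustration only, since Seq.groupBy materializes its input and so defeats streaming):
// In-memory equivalent of the single-pass grouping performed above
let reduceAll (pairs : seq<string * string>) =
    pairs
    |> Seq.groupBy fst
    |> Seq.map (fun (key, values) -> (key, values |> Seq.sumBy (snd >> Int32.Parse)))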
Running the Executables
In the case of Binary Streaming the command parameters are a little different to the text streaming case:
C:\Apps\dist\hadoop.cmd jar lib/hadoop-streaming-ms.jar
    -input "/office/documents"
    -output "/office/authors"
    -mapper "..\..\jars\FSharp.Hadoop.MapperOffice.exe"
    -combiner "..\..\jars\FSharp.Hadoop.ReducerOffice.exe"
    -reducer "..\..\jars\FSharp.Hadoop.ReducerOffice.exe"
    -file "C:\bin\Release\FSharp.Hadoop.MapperOffice.exe"
    -file "C:\bin\Release\FSharp.Hadoop.ReducerOffice.exe"
    -file "C:\bin\Release\FSharp.Hadoop.MapReduce.dll"
    -file "C:\bin\Release\FSharp.Hadoop.Utilities.dll"
    -inputformat com.microsoft.hadoop.mapreduce.lib.input.BinaryDocumentWithNameInputFormat
The first difference is the use of hadoop-streaming-ms.jar. This file contains the InputFormat class specified by the -inputformat parameter, which is needed to support Binary Streaming; the Java Classes section later in the post discusses how this file is created.
One other difference from my previous text streaming case is the use of the Reducer executable as a Combiner. This is possible because the output types of the mapper are the same as those of the reducer.
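For example, if a single map task emits (Carl Nolan, 1) and (Carl Nolan, 9), the combiner collapses these locally into (Carl Nolan, 10), reducing the volume of data shuffled across the network before the final reduce.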
Tester Application
For completeness I have included the base code for the tester application, though the full code is included in the download. The code is very similar to the tester application mentioned in my previous post, with the exception of how the mapper executable is called:
module MapReduceConsole =
    let Run args =

        // Define what arguments are expected
        let defs = [
            {ArgInfo.Command="input"; Description="Input File"; Required=true };
            {ArgInfo.Command="output"; Description="Output File"; Required=true };
            {ArgInfo.Command="tempPath"; Description="Temp File Path"; Required=true };
            {ArgInfo.Command="mapper"; Description="Mapper EXE"; Required=true };
            {ArgInfo.Command="reducer"; Description="Reducer EXE"; Required=true }; ]

        // Parse Arguments into a Dictionary
        let parsedArgs = Arguments.ParseArgs args defs
        Arguments.DisplayArgs parsedArgs

        // Define the executables
        let mapperExe = Path.GetFullPath(parsedArgs.["mapper"])
        let reducerExe = Path.GetFullPath(parsedArgs.["reducer"])

        Console.WriteLine()
        Console.WriteLine (sprintf "The Mapper file is:\t%O" mapperExe)
        Console.WriteLine (sprintf "The Reducer file is:\t%O" reducerExe)

        // Get the file names
        let inputpath = Path.GetFullPath(parsedArgs.["input"])
        let outputfile = Path.GetFullPath(parsedArgs.["output"])
        let tempPath = Path.GetFullPath(parsedArgs.["tempPath"])
        let tempFile = Path.Combine(tempPath, Path.GetFileName(outputfile))
        let mappedfile = Path.ChangeExtension(tempFile, "mapped")
        let reducefile = Path.ChangeExtension(tempFile, "reduced")

        Console.WriteLine()
        Console.WriteLine (sprintf "The input path is:\t\t%O" inputpath)
        Console.WriteLine (sprintf "The mapped temp file is:\t%O" mappedfile)
        Console.WriteLine (sprintf "The reduced temp file is:\t%O" reducefile)
        Console.WriteLine (sprintf "The output file is:\t\t%O" outputfile)

        // Give the user an option to continue
        Console.WriteLine()
        Console.WriteLine("Hit ENTER to continue...")
        Console.ReadLine() |> ignore

        let CHUNK = 1024
        let buffer:byte array = Array.zeroCreate CHUNK

        // Call the mapper with the input file
        let mapperProcessLoop inputfile =
            use mapper = new Process()
            mapper.StartInfo.FileName <- mapperExe
            mapper.StartInfo.UseShellExecute <- false
            mapper.StartInfo.RedirectStandardInput <- true
            mapper.StartInfo.RedirectStandardOutput <- true
            mapper.Start() |> ignore

            use mapperInput = mapper.StandardInput.BaseStream
            use mapperOutput = mapper.StandardOutput

            // Map the reader to a background thread so processing can happen in parallel
            let taskMapperFunc() =
                use mapperWriter = File.AppendText(mappedfile)
                while not mapperOutput.EndOfStream do
                    mapperWriter.WriteLine(mapperOutput.ReadLine())
            let taskMapperWriting = Task.Factory.StartNew(Action(taskMapperFunc))

            // Pass the file into the mapper process and close input stream when done
            use mapperReader = new BinaryReader(File.OpenRead(inputfile))
            let rec readLoop() =
                let bytesread = mapperReader.Read(buffer, 0, CHUNK)
                if bytesread > 0 then
                    mapperInput.Write(buffer, 0, bytesread)
                    readLoop()

            // Write out a Filename/Tab/Document/Linefeed
            let filename = Path.GetFileName(inputfile)
            let encoding = Text.UTF8Encoding()
            mapperInput.Write(encoding.GetBytes(filename), 0, encoding.GetByteCount(filename))
            mapperInput.Write([| 0x09uy |], 0, 1)
            readLoop()
            mapperInput.Write([| 0x0Auy |], 0, 1)
            mapperInput.Close()

            taskMapperWriting.Wait()
            mapperOutput.Close()

            mapper.WaitForExit()
            let result = match mapper.ExitCode with | 0 -> true | _ -> false
            mapper.Close()
            result

        let mapperProcess() =
            Console.WriteLine "Mapper Processing Starting..."
            // Create the output file
            if File.Exists(mappedfile) then File.Delete(mappedfile)
            use mapperWriter = File.CreateText(mappedfile)
            mapperWriter.Close()

            // Function to determine if the file has a valid document extension
            let isValidFile inputfile =
                String.Equals(Path.GetExtension(inputfile), ".docx", StringComparison.InvariantCultureIgnoreCase) ||
                String.Equals(Path.GetExtension(inputfile), ".pdf", StringComparison.InvariantCultureIgnoreCase)

            // Process the files in the directory
            Directory.GetFiles(inputpath)
            |> Array.filter isValidFile
            |> Array.fold (fun result inputfile -> result && (mapperProcessLoop inputfile)) true

        // Sort the mapped file by the first field - mimic the role of Hadoop
        let hadoopProcess() =
            Console.WriteLine "Hadoop Processing Starting..."
            let unsortedValues = seq {
                use reader = new StreamReader(File.OpenRead(mappedfile))
                while not reader.EndOfStream do
                    let input = reader.ReadLine()
                    let keyValue = input.Split('\t')
                    yield (keyValue.[0].Trim(), keyValue.[1].Trim())
                reader.Close()
                }
            use writer = File.CreateText(reducefile)
            unsortedValues
            |> Seq.sortBy fst
            |> Seq.iter (fun (key, value) -> writer.WriteLine (sprintf "%O\t%O" key value))
            writer.Close()

        // Finally call the reducer process
        let reducerProcess() =
            use reducer = new Process()
            reducer.StartInfo.FileName <- reducerExe
            reducer.StartInfo.UseShellExecute <- false
            reducer.StartInfo.RedirectStandardInput <- true
            reducer.StartInfo.RedirectStandardOutput <- true
            reducer.Start() |> ignore

            use reducerInput = reducer.StandardInput
            use reducerOutput = reducer.StandardOutput

            // Map the reader to a background thread so processing can happen in parallel
            Console.WriteLine "Reducer Processing Starting..."
            let taskReducerFunc() =
                use reducerWriter = File.CreateText(outputfile)
                while not reducerOutput.EndOfStream do
                    reducerWriter.WriteLine(reducerOutput.ReadLine())
            let taskReducerWriting = Task.Factory.StartNew(Action(taskReducerFunc))

            // Pass the file into the reducer process and close input stream when done
            use reducerReader = new StreamReader(File.OpenRead(reducefile))
            while not reducerReader.EndOfStream do
                reducerInput.WriteLine(reducerReader.ReadLine())
            reducerInput.Close()

            taskReducerWriting.Wait()
            reducerOutput.Close()

            reducer.WaitForExit()
            let result = match reducer.ExitCode with | 0 -> true | _ -> false
            reducer.Close()
            result

        // Finish test
        if mapperProcess() then
            Console.WriteLine "Mapper Processing Complete..."
            hadoopProcess()
            Console.WriteLine "Hadoop Processing Complete..."
            if reducerProcess() then
                Console.WriteLine "Reducer Processing Complete..."

        Console.WriteLine "Processing Complete..."
        Console.ReadLine() |> ignore
When calling the mapper it is no longer the case that the mapper executable is called once, with each line being redirected into the executable's StdIn. In the binary case the mapper executable is called once for each document, and each document's data is then redirected into the executable's StdIn. As mentioned, the format is the filename, delimited with a Tab, followed by the document's bytes, and finally a Linefeed character:
// Pass the file into the mapper process and close input stream when done
use mapperReader = new BinaryReader(File.OpenRead(inputfile))
let rec readLoop() =
    let bytesread = mapperReader.Read(buffer, 0, CHUNK)
    if bytesread > 0 then
        mapperInput.Write(buffer, 0, bytesread)
        readLoop()

// Write out a Filename/Tab/Document/Linefeed
let filename = Path.GetFileName(inputfile)
let encoding = Text.UTF8Encoding()
mapperInput.Write(encoding.GetBytes(filename), 0, encoding.GetByteCount(filename))
mapperInput.Write([| 0x09uy |], 0, 1)
readLoop()
mapperInput.Write([| 0x0Auy |], 0, 1)
This code runs once for each document found in the directory specified in the input argument.
The previous post discussed the threading and stream processing needed for testing in a little more detail.
Java Classes
To complete the post here is the full listing for the Java class implementations:
package com.microsoft.hadoop.mapreduce.lib.input;

import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.JobContext;

/**
 * Reads complete documents in Binary format.
 */
public class BinaryDocumentWithNameInputFormat
        extends FileInputFormat<Text, BytesWritable> {

    public BinaryDocumentWithNameInputFormat() {
        super();
    }

    protected boolean isSplitable(FileSystem fs, Path filename) {
        return false;
    }

    @Override
    public RecordReader<Text, BytesWritable> getRecordReader(
            InputSplit split, JobConf job, Reporter reporter) throws IOException {
        return new BinaryDocumentWithNameRecordReader((FileSplit) split, job);
    }

    /**
     * BinaryDocumentWithNameRecordReader class to read through a given binary document.
     * Outputs the filename along with the complete document.
     */
    public class BinaryDocumentWithNameRecordReader
            implements RecordReader<Text, BytesWritable> {

        private FileSplit fileSplit;
        private Configuration conf;
        private boolean processed = false;

        public BinaryDocumentWithNameRecordReader(FileSplit fileSplit, Configuration conf)
                throws IOException {
            this.fileSplit = fileSplit;
            this.conf = conf;
        }

        @Override
        public Text createKey() {
            return new Text();
        }

        @Override
        public BytesWritable createValue() {
            return new BytesWritable();
        }

        @Override
        public long getPos() throws IOException {
            return this.processed ? this.fileSplit.getLength() : 0;
        }

        @Override
        public float getProgress() throws IOException {
            return this.processed ? 1.0f : 0.0f;
        }

        @Override
        public boolean next(Text key, BytesWritable value) throws IOException {
            if (!this.processed) {
                byte[] contents = new byte[(int) this.fileSplit.getLength()];
                Path file = this.fileSplit.getPath();
                FileSystem fs = file.getFileSystem(this.conf);
                FSDataInputStream in = null;
                try {
                    in = fs.open(file);
                    in.readFully(contents, 0, contents.length);
                    key.set(file.getName());
                    value.set(contents, 0, contents.length);
                }
                finally {
                    in.close();
                }
                this.processed = true;
                return true;
            }
            else {
                return false;
            }
        }

        @Override
        public void close() throws IOException {
        }
    }
}
Once the Java classes have been compiled, they need to be merged into a copy of the hadoop-streaming.jar file. In my case I have created a file called hadoop-streaming-ms.jar. This file is a copy of the base JAR into which I have copied the compiled classes; since a JAR file is just a ZIP file this can be done directly, although one can also use the JAR tool. One just has to remember to preserve the package path:
com\microsoft\hadoop\mapreduce\lib\input
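As a sketch, assuming the compiled class files sit under this package path relative to the current directory, the merge can be done with the JAR tool along the lines of:
jar uf hadoop-streaming-ms.jar ^
    com/microsoft/hadoop/mapreduce/lib/input/BinaryDocumentWithNameInputFormat.class ^
    com/microsoft/hadoop/mapreduce/lib/input/BinaryDocumentWithNameInputFormat$BinaryDocumentWithNameRecordReader.class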
To use this new streaming package the file needs to be copied to the Hadoop install lib directory; in my case:
C:\Apps\dist\lib
If you download the code, there is also an implementation of these classes that supports a key/value type pairing of <NullWritable, BytesWritable>, rather than the <Text, BytesWritable> pairing used to pass in the document's filename. This can be used if the document's filename is not needed.
Conclusion
Hopefully the code is once again useful if you intend to write Binary Streaming applications that process documents. Although the code currently only demonstrates processing Microsoft Word documents, in a future post I will extend this to support processing PDF documents.
Finally, don't forget that one can graph the results using the JS Console:
file = fs.read("/office/authors")
data = parse(file.data, "author, pages:long")
options = {title:"Pages Per Author", orientation:15, x:"author", y:"pages"}
graph.bar(data, options)
The result is a bar chart of pages per author.
Enjoy!