Example MapReduce programs

As a simple MapReduce example, we can write a job/program that counts the number of times each word appears in a file or a set of files. For instance, if we assume two files, A and B, with contents "This is a cloud computing course" and "This is learning path 4 in the cloud computing course," we would expect an output similar to what is shown in the following table. Such a program is commonly called WordCount. The WordCount program is included in the Hadoop distribution and is readily available for running and testing. It is representative of a large class of text-processing algorithms in big-data applications and, as such, is widely used as a benchmark workload for evaluating Hadoop's efficiency. The original MapReduce paper1 used WordCount as a benchmark workload.

The following table shows the frequency of each word in the two files, with contents "This is a cloud computing course" and "This is learning path 4 in the cloud computing course," as generated by the WordCount program:

Word       Count      Word         Count      Word      Count
This       2          computing    2          4         1
is         2          course       2          in        1
a          1          learning     1          the       1
cloud      2          path         1

MapReduce programs can be written in the Java programming language. To write a WordCount program (or MapReduce programs in general), we usually start by defining the input and output formats for the map and reduce functions. Since we are dealing with key-value pairs, we need only specify the actual keys and values that the input and output files will contain, along with their types (e.g., string or numerical). A job's input (the input dataset), whether it consists of one file or a million, must always have the same format. Similarly, a job's output files must all share the same format, which might differ from the input format.

Input and output formats can be arbitrary: line-based logs, for example, or images, videos, multiline records, or something totally different. Hadoop can process any file format, from flat text, through binary, to structured databases. To accommodate this, users can override Hadoop's InputFormat (default TextInputFormat) and OutputFormat (default TextOutputFormat) classes. The former defines how input files are read as key-value pairs and constructs the corresponding input splits for map tasks; the Hadoop engine spawns one map task per input split thus generated. The OutputFormat class similarly defines how reduce tasks write key-value pairs to output files on HDFS. The following video shows a detailed illustration of how the InputFormat and OutputFormat classes are used in Hadoop MapReduce.

The default I/O subclasses are suitable for text processing. Specifically, TextInputFormat enables reading text files, with the byte offset of a line being a key and the actual line content being a value. TextOutputFormat allows writing files as key-value3 text pairs. Other formatting classes, such as the SequenceFileInputFormat and SequenceFileOutputFormat classes, are shipped with Hadoop and allow reading and writing binary files. In addition, Hadoop users can always implement custom input and output format classes tailored for their input datasets. More information on how to do that can be found in White's "Hadoop: The Definitive Guide."3
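
To give a rough idea of what such a custom class might involve, the sketch below (the class name WholeFileTextInputFormat is our own hypothetical example and is not part of Hadoop) extends FileInputFormat so that input files are never split, while reusing Hadoop's standard LineRecordReader to produce the usual byte-offset/line-content key-value pairs:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

// A hypothetical input format: each file becomes exactly one input split
// (and hence one map task), but records are still read line by line.
public class WholeFileTextInputFormat extends FileInputFormat<LongWritable, Text> {

	@Override
	protected boolean isSplitable(JobContext context, Path file) {
		// Never split a file, regardless of its size.
		return false;
	}

	@Override
	public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
			TaskAttemptContext context) {
		// Reuse the standard line reader: keys are byte offsets, values are lines.
		return new LineRecordReader();
	}
}

A job would select this class with job.setInputFormatClass(WholeFileTextInputFormat.class); the SequenceFileInputFormat and SequenceFileOutputFormat classes shipped with Hadoop are plugged in the same way.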

Our WordCount example assumes text file inputs. Thus, we can directly use the TextInputFormat class, with a key being the byte offset of a line in a file and a value being the line content itself. Likewise, we can directly use the TextOutputFormat class, with a key being a word encountered in the input dataset and a value being the frequency of that word. For the map input, the key type can be set to Java Long (LongWritable in Hadoop) and the value type to Java String (Text in Hadoop). The reduce function receives words from the map tasks as keys and the digit 1 per word as values,4 so its key type will be that of words (Text) and its value type that of the count (Java Integer, IntWritable in Hadoop). All that remains is the logic of the map and reduce functions. In the map function, each input split is parsed and every word is emitted with a count of 1. In the reduce function, each received word is simply output along with its frequency, computed by summing all the 1s received with that word.5
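
To make this concrete, consider file A from the table above. The map task assigned to A's input split receives its single line with byte offset 0 as the key and emits one (word, 1) pair per token:

map(0, "This is a cloud computing course") emits:
    (This, 1), (is, 1), (a, 1), (cloud, 1), (computing, 1), (course, 1)

The reduce task responsible for the word "cloud" then receives the 1s emitted for that word by all map tasks (one from file A and one from file B) and sums them:

reduce(cloud, [1, 1]) emits:
    (cloud, 2)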

The following code snippet shows our complete WordCount example code for the new Java MapReduce API released in Hadoop 0.20.0.

import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount {

	public static class WCMap extends Mapper<LongWritable, Text, Text, IntWritable> {

		// Reusable Writable objects, so new ones are not allocated per emitted pair.
		private final static IntWritable one = new IntWritable(1);
		private Text word = new Text();

		@Override
		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			// The key is the byte offset of the line; the value is the line itself.
			String line = value.toString();
			StringTokenizer tokenizer = new StringTokenizer(line);

			// Emit a (word, 1) pair for every token in the line.
			while (tokenizer.hasMoreTokens()) {
				word.set(tokenizer.nextToken());
				context.write(word, one);
			}
		}
	}

	public static class WCReduce extends Reducer<Text, IntWritable, Text, IntWritable> {

		@Override
		public void reduce(Text key, Iterable<IntWritable> values, Context context)
				throws IOException, InterruptedException {
			// Sum the 1s emitted by the map tasks for this word.
			int sum = 0;
			for (IntWritable val : values) {
				sum += val.get();
			}
			// Emit the word along with its total frequency.
			context.write(key, new IntWritable(sum));
		}
	}

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();

		Job job = new Job(conf, "wordcount");
		// Point Hadoop at the jar that contains the job's classes.
		job.setJarByClass(WordCount.class);

		// Key-value types emitted by the reduce tasks (and, since no map output
		// types are set explicitly, also by the map tasks).
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		job.setMapperClass(WCMap.class);
		job.setReducerClass(WCReduce.class);

		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);

		// Input and output paths on HDFS, taken from the command line.
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		// Submit the job and wait for it to finish.
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}

}

The code snippet shows that programmers need to create only two sequential functions, map and reduce, that are encapsulated in two (inner) classes: in this case, the WCMap and WCReduce inner classes. The WCMap inner class extends the Mapper class and overrides its map() function. The Mapper class maps given input key-value pair types (LongWritable and Text) to a set of output key-value pair types (Text and IntWritable). Output key-value pair types defined in the Mapper class should always match the input key-value pair types in the Reducer class. The WCReduce inner class extends the Reducer class and overrides its reduce() function. In addition to defining the input key-value pair types, the Reducer class defines the output key-value pair types (Text and IntWritable) that will be used by reduce tasks to generate the final results.

The map() and reduce() functions in the WCMap and WCReduce inner classes incorporate the actual logic of the WordCount program. The Context parameter in both functions is used to emit output key-value pairs: map outputs are written to local disks, while reduce outputs are written to HDFS. The main() function sets up a job to execute the WordCount program on a set of input files using the addInputPath() function. It also specifies where the output files are placed on HDFS using the setOutputPath() function. In the main() function, setOutputKeyClass() and setOutputValueClass() specify the key-value pair types emitted by reduce tasks; by default, Hadoop assumes that these types also match the map task output key-value types. If this is not the case, the main() function should also call setMapOutputKeyClass() and setMapOutputValueClass() to specify the map task output types. To set the input and output formats, the setInputFormatClass() and setOutputFormatClass() functions are called. Finally, the setMapperClass() and setReducerClass() functions are used to set the job's constituent inner map and reduce classes, WCMap and WCReduce. The following video discusses Sort, which is another classical MapReduce example.
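
As an illustration of the latter case, suppose (hypothetically) that a job's map tasks emitted (Text, IntWritable) pairs while its reduce tasks emitted (Text, Text) pairs. MyMap and MyReduce below are placeholder class names, and the fragment is only a sketch of the extra driver calls that would be needed:

// Driver fragment (sketch): map output types differ from reduce output types,
// so the map output types must be declared explicitly.
Job job = new Job(conf, "mixed-types");

job.setMapperClass(MyMap.class);       // emits <Text, IntWritable> pairs
job.setReducerClass(MyReduce.class);   // emits <Text, Text> pairs

// Types of the intermediate key-value pairs emitted by map tasks.
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

// Types of the final key-value pairs emitted by reduce tasks and written to HDFS.
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);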

This next video presents Sobel, which is an image-processing, edge-detection example.



3 Note that the key and the value are separated by a tab character (not a single space).

4 The digit 1 per word, W, is output to indicate that W has appeared in an input split. This allows the reduce task, which receives W, to simply increment its counter for W.

5 Each reduce task can receive multiple words from multiple map tasks, but each word appears at only one reduce task.


References

  1. J. Dean and S. Ghemawat (Dec. 2004). "MapReduce: Simplified Data Processing on Large Clusters." OSDI.
  2. M. Zaharia, A. Konwinski, A. Joseph, R. Katz, and I. Stoica (2008). "Improving MapReduce Performance in Heterogeneous Environments." OSDI.
  3. T. White (2011). Hadoop: The Definitive Guide, 2nd Edition. O'Reilly.

Check your knowledge

1. For the WordCount example, which combiner can be used to get the correct output?