Пример программ MapReduce

Завершено

В качестве простого примера MapReduce можно написать задание или программу, которая считает, сколько раз каждое слово встречается в файле или наборе файлов. Например, если у нас два файла, A и B, с содержимым "This is a cloud computing course" (Это курс по облачным вычислениям) и "This is learning path 4 in the cloud computing course" (Это схема обучения 4 в курсе по облачным вычислениям), мы получим результат, как в следующей таблице. Такая программа обычно называется WordCount. Программа WordCount включена в дистрибутив Hadoop и доступна для выполнения и тестирования. Она представляет собой большой спектр алгоритмов обработки текста в приложениях с большими данными. Таким образом, многие считают ее эталоном для оценки производительности Hadoop. В исходной документации по MapReduce1 WordCount использовался в качестве рабочей нагрузки для тестирования.

В следующей таблице показана частота каждого слова в двух файлах с содержимым "This is a cloud computing course" (Это курс по облачным вычислениям) и "This is learning path 4 in the cloud computing course" (Это схема обучения 4 в курсе по облачным вычислениям) в программе тестирования производительности WordCount:

Word Count    Word Count    Word Count
This 2   computing 2   4 1
- 2   course 2   in 1
a 1   Обучение 1   Компонент 1
cloud 2   path 1      

Программы MapReduce могут быть написаны на языке программирования Java. Чтобы написать программу WordCount (или программы MapReduce в целом), мы обычно начинаем с определения входных и выходных форматов для функций map и reduce. Поскольку мы работаем с парами "ключ-значение", нам нужно указать только фактические ключи и значения, которые будут содержать входные и выходные файлы и их типы (например, строка или число). Входные данные задания (входной набор данных), которые представляют собой один или миллион файлов, всегда должны иметь одинаковый формат. Аналогичным образом выходные файлы задания всегда должны иметь тот же формат, который может отличаться от входных форматов.

Входные и выходные форматы могут быть произвольными журналами на основе строк, например изображения, видео, многострочные записи или что-то другое. Hadoop может обрабатывать файлы любого формата, от простого до двоичного текста и структурированных баз данных. Для этого пользователи могут переопределить классы InputFormat Hadoop (по умолчанию TextInputFormat) и OutputFormat (по умолчанию TextOutputFormat). Первый из них определяет способ считывания входных файлов в виде пар "ключ-значение" и создает соответствующие входные фрагменты для задач map. Механизм Hadoop создает одну задачу map для каждого такого фрагмента. Класс OutputFormat аналогичным образом определяет, как пары "ключ-значение" записываются задачами reduce в качестве выходных файлов в HDFS. В следующем видео показано, как классы InputFormat и OutputFormat используются в Hadoop MapReduce.

Подклассы ввода-вывода по умолчанию подходят для обработки текста. В частности, TextInputFormat позволяет читать текстовые файлы, при этом смещение строки в байтах является ключом, а фактическое содержимое строки — значением. TextOutputFormat позволяет записывать файлы в виде текстовых пар "ключ-значение"3. Другие классы форматирования, такие как SequenceFileInputFormat и SequenceFileOutputFormat, поставляются с Hadoop и позволяют считывать и записывать двоичные файлы. Кроме того, пользователи Hadoop всегда могут реализовывать пользовательские классы входных и выходных форматов, предназначенные для входных наборов данных. Дополнительные сведения о том, как это сделать, можно найти в "Hadoop: Окончательное руководство".3

В нашем примере WordCount используются входные текстовые файлы. Мы можем напрямую использовать класс TextInputFormat с ключом, выражающим смещение строки в файле в байтах, и значением, равным содержимому строки. Кроме того, можно напрямую использовать класс TextOutputFormat с ключом, который является словом, обнаруженным во входном наборе данных, и значением, которое представляет частоту слова. Для типа ключа можно задать значение Java Long (LongWritable в Hadoop) и тип значения Java String (Text в Hadoop). Функция reduce должна получать слова из задач map в качестве ключей и число 1 на каждое слово в качестве значений4, поэтому типом ключа будет тип слов (Text), а типом значения — тип цифры (Java Integer, IntWritable в Hadoop). Остается только логика функций map и reduce. Для функции map входные фрагменты должны быть проанализированы, и к каждому слову должно быть добавлено количество 1. В функции reduce каждое полученное слово можно просто выводить без изменений вместе с частотой, вычисленной путем статистической обработки всех единиц, полученных с этим словом5.

В следующем фрагменте приведен полный код примера WordCount для нового API MapReduce для Java, выпущенным в Hadoop 0.20.0.

import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount {

	public static class WCMap extends Mapper<LongWritable, Text, Text,
	IntWritable> {
		private final static IntWritable one = new
				IntWritable(1);
		private Text word = new Text();

		public void map(LongWritable key, Text value, Context context) throws
		IOException, InterruptedException {
			String line = value.toString();
			StringTokenizer tokenizer = new StringTokenizer(line);

			while (tokenizer.hasMoreTokens()) {
				word.set(tokenizer.nextToken());
				context.write(word, one);
			}
		}
	}

	public static class WCReduce extends Reducer<Text, IntWritable, Text,
	IntWritable> {

		public void reduce(Text key, Iterable<IntWritable> values, Context
		context)
				throws IOException, InterruptedException {
			int sum = 0;
			for (IntWritable val : values) {
				sum += val.get();
			}
			context.write(key, new IntWritable(sum));
		}
	}

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();

		Job job = new Job(conf, "wordcount");

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		job.setMapperClass(WCMap.class);
		job.setReducerClass(WCReduce.class);

		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);

		FileInputFormat.addInputPath(job, new Path(args[1]));
		FileOutputFormat.setOutputPath(job, new Path(args[2]));

		job.waitForCompletion(true);
	}

}

Фрагмент кода показывает, что программистам необходимо создать только две последовательных функции, map и reduce, инкапсулированные в двух (внутренних) классах: в данном случае — WCMap и WCReduce. Внутренний класс WCMap расширяет класс Mapper и переопределяет его функцию map(). Класс Mapper сопоставляет указанные типы входных пар "ключ-значение" (LongWritable и Text) с набором типов выходных пар "ключ-значение" (Text и IntWritable). Типы выходных пар "ключ-значение", определенные в классе Mapper, должны всегда соответствовать типам входных пар "ключ-значение" в классе Reducer. Внутренний класс WCReduce расширяет класс Reducer и переопределяет его функцию reduce(). Помимо определения входных типов пар "ключ-значение", класс Reducer определяет типы выходных пар "ключ-значение" (Text и IntWritable), которые будут использоваться в задачах reduce для формирования окончательных результатов.

Функции map() и reduce() во внутренних классах WCMap и WCReduce включают в себя фактическую логику программы WordCount. Параметр Context в обеих функциях выполняет запись ввода-вывода в локальные диски и HDFS. Функция main() настраивает задание для выполнения программы WordCount с набором входных файлов с помощью функции addInputPath(). Она также указывает, куда выходные файлы помещаются в HDFS с помощью функции setOutputPath(). В функции main()setOutputKeyClass() и setOutputValueClass() указывают типы пар "ключ-значение", создаваемые задачами reduce, и по умолчанию предполагают, что эти типы соответствуют типам выходных пар "ключ-значение" задачи map. Если это не так, то функция main() должна также вызывать setMapOutputKeyClass() и setMapOutputValueClass(), чтобы указать типы выходных пар "ключ-значение" для задачи map. Чтобы задать форматы ввода и вывода, вызываются функции setInputFormatClass() и setOutputFormatClass(). Наконец, функции setMapperClass() и setReducerClass() используются для указания внутренних классов map и reduce в задании, WCMap и WCReduce. В видео обсуждается программа Sort, которая является еще одним классическим примером MapReduce.

В следующем видео представлена программа Sobel, которая является примером обработки изображений и обнаружения края.



3 Обратите внимание, что ключ и значение разделены табуляцией, а не одним пробелом.

4 Цифра 1 для каждого слова, W, — это выходные данные, указывающие, что W появилось во входном фрагменте. Это позволяет задаче reduce, которая получает W, просто увеличить счетчик для W.

5 Каждая задача reduce может получить несколько слов из нескольких задач map, но каждое слово появляется только в одной задаче reduce.


Ссылки

  1. J. Dean and S. Ghemawat (Dec. 2004). MapReduce: Simplified Data Processing on Large Clusters OSDI
  2. M. Zaharia, A. Konwinski, A. Joseph, R. Katz, and I. Stoica (2008). Improving MapReduce Performance in Heterogeneous Environments OSDI
  3. T. White (2011). Hadoop: The Definitive Guide 2nd Edition O'Reilly

Проверьте свои знания

1.

Какое средство объединения можно использовать для получения правильных выходных данных в примере WordCount?