Программирование в Spark

Завершено

Теперь, когда мы познакомились с архитектурой и потоком вычислений в Spark, имеет смысл рассмотреть какую-нибудь простую программу Spark. Но перед этим нам все же нужно разобраться в некоторых основных примитивных операциях RDD.

Начнем с некоторых основных преобразований с одним RDD (RDD1 = {1,2,3,3}):

Имя функции Назначение Пример Результат
map() Применить функцию к каждому элементу в RDD и вернуть RDD результата. rdd.map(x => x + 1) {2, 3, 4, 4}
flatMap() Применить функцию к каждому элементу в RDD и вернуть RDD содержимого возвращаемых итераторов. Часто используется для извлечения слов. rdd.flatMap(x => x.to(3)) {1, 2, 3, 2, 3, 3, 3}
filter() Возвращает RDD, состоящий только из тех элементов, которые передают условие, переданное в filter(). rdd.filter(x => x != 1) {2, 3, 3}
distinct() Удаляет дубликаты. rdd.distinct() {1, 2, 3}

Преобразования также могут использовать в качестве входных данных сразу несколько RDD. Здесь мы рассмотрим преобразования с двумя RDD в качестве входных данных (RDD1 = {1,2,3}; RDD2 = {3,4,5}):

Имя функции Назначение Пример Результат
union() Создает RDD, содержащий элементы из обоих RDD. rdd.union(other) {1, 2, 3, 3, 4, 5}
intersection() RDD, содержащий только элементы, обнаруженные в обоих RDD. rdd.intersection(other) {3}
subtract() Удаляет содержимое одного RDD (например, обучающие данные). rdd.subtract(other) {1, 2}
cartesian() Декартово произведение с другим RDD. rdd.cartesian(other) {(1, 3), (1, 4), … (3,5)}

Наконец, мы рассмотрим некоторые действия, на этот раз снова на одном RDD = {1,2,3,3}:

Имя функции Назначение Пример Результат
collect() Возвращает все элементы из RDD. rdd.collect() {1, 2, 3, 3}
count() Количество элементов в RDD. rdd.count() 4
countByValue() Сколько раз каждый элемент встречается в RDD. rdd.countByValue() {(1, 1), (2, 1), (3, 2)}
take(num) Возвращает числовые элементы из RDD. rdd.take(2) {1, 2}
top(num) Возвращает верхние числовые элементы из RDD. rdd.top(2) {3, 3}
takeOrdered(num)(ordering) Возвращает числовые элементы, исходя из указанного порядка. rdd.takeOrdered(2)(myOrdering) {3, 3}
reduce(func) Объединяет элементы RDD в параллельное вычисление (например, суммирует). rdd.reduce((x, y) => x + y) 9
fold(zero)(func) То же, что reduce(), но с указанным нулевым значением. rdd.fold(0)((x, y) => x + y) 9
aggregate(zeroValue)(seqOp, combOp) Похоже на reduce(), но используется для возврата другого типа. rdd.aggregate((0, 0)) ((x, y) =>(x._1 + y, x._2 + 1), (x, y) =>(x._1 + y._1, x._2 + y._2)) (9, 4)

Теперь, когда мы изучили некоторые примитивы, давайте посмотрим, как все эти операции работают вместе при реализации простой программы. Рассмотрим реализацию итеративного алгоритма PageRank в Spark.

Алгоритм PageRank итеративно обновляет рейтинг для каждого URL-адреса, добавляя вклады из ссылающихся на него URL-адресов. PageRank предполагает, что пользователь, начав со случайной страницы, с 85-процентной вероятностью выберет случайную ссылку на текущей странице и с 15-процентной вероятностью перейдет на случайную страницу любого другого сайта.

При каждой итерации каждый URL-адрес добавляет к своим соседям r/n, где r — его собственный ранг, а n — количество соседей этого узла. Затем он обновляет его ранг до $\frac{a}{N} + (1 - a) \sum c_{i}$, где $\sum c_{i}$ — это сумма всех полученных вкладов, а $N $ — общее количество документов. $a$ показывает вероятность того, что случайный пользователь, начавший движение с определенной веб-страницы, перестанет переходить по ссылкам. Это называется демпинг-фактором (как уже говорилось выше, исследования показали, что его вероятность составляет около 0,85).

Рассмотрим следующую программу PageRank Scala:

val links = spark.textFile(...).map(...).persist()
var ranks = // RDD of (URL, rank) pairs
for (i <- 1 to ITERATIONS) 
{
	// Build an RDD of (targetURL, float) pairs
	// with the contributions sent by each page
	val contribs = links.join(ranks).flatMap 
	{
		(url, (links, rank)) =>
		links.map(dest => (dest, rank/links.size))
	}

	// Sum contributions by URL and get new ranks
	ranks = contribs.reduceByKey((x,y) => x+y).mapValues(sum => a/N + (1-a)*sum)
}

В этой реализации PageRank в Spark набор входных данных состоит из текстового файла в формате (URL, rank). Для каждой итерации операция присоединения с атрибутами links и ranks используется для статистической обработки вклада по каждому URL-адресу. RDD contribs представляет вклад, отправленный каждым URL-адресом. Вклады суммируются по каждому ключу (с использованием редукции), после чего полученное значение обновляется по формуле PageRank $\frac{a}{N} + (1 - a) \sum c_{i}$. После обновления RDD ranks процесс повторяется еще раз для указанного количества итераций.

Lineage graph for the Spark PageRank example.

Рис. 6. График происхождения для примера Spark PageRank

Как уже говорилось на предыдущей странице, операцию присоединения можно оптимизировать за счет аналогичного секционирования ссылок и рангов (например, с помощью хэш-разделителя для секционирования URL-адресов между узлами). Если каждый раздел ссылок и соответствующий раздел рангов будут находиться на одном узле, можно будет полностью исключить обмен данными между узлами из операции присоединения.

Обратите внимание на то, что на предыдущем рисунке журнал преобразований рангов RDD в каждой итерации продолжает пополняться. В связи с этим, возможно, потребуется скорректировать стратегию так, чтобы некоторые версии рангов сохранялись для более эффективного восстановления после отказов.

Решения о секционировании, хранении и выборе оптимального набора операций для определения вычислений затрудняют разработку оптимальных программ Spark. Однако при правильной реализации задания Spark могут выполняться намного быстрее, чем традиционные задания MapReduce.