Оптимизация обработки данных для Apache Spark

В этой статье описано, как оптимизировать конфигурацию кластера Apache Spark, чтобы обеспечить наилучшую производительность в Azure HDInsight.

Обзор

Если ваши задания медленно выполняют операции соединения (Join) или перемешивания (Shuffle), скорее всего причиной является неравномерное распределение данных. Неравномерное распределение данных обозначает асимметрию данных, предоставленных для задания. Например, задание сопоставления может занять 20 секунд, тогда как на задание с объединением или перемешиванием уходит несколько часов. Для устранения неравномерного распределения данных необходимо использовать строку случайных данных для целого ключа или изолированную строку случайных данных только для некоторого подмножества ключей. При использовании изолированной строки случайных данных необходимо далее использовать фильтрацию для изоляции подмножества соответствующих ключей в соединениях сопоставлений. Другой вариант — создать столбец группы и выполнить предварительную статистическую обработку сначала в группах.

На скорость выполнения операции соединения также влияет тип этой операции. По умолчанию кластер Spark использует тип соединения SortMerge. Этот тип соединения лучше всего подходит для больших наборов данных. Но с точки зрения объема вычислений он достаточно ресурсоемкий, так как перед объединением он сначала сортирует отдельно левую и правую части данных.

Соединение Broadcast лучше всего подходит для небольших наборов данных, или в случаях, когда одна сторона соединения значительно меньше другой. Этот тип соединения оповещает все исполнители, поэтому в целом требует большего объема памяти для такой операции передачи.

Тип соединения в конфигурации можно изменить, задав spark.sql.autoBroadcastJoinThreshold, или можно задать подсказку по соединению с помощью API-интерфейсов DataFrame (dataframe.join(broadcast(df2))).

// Option 1
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1*1024*1024*1024)

// Option 2
val df1 = spark.table("FactTableA")
val df2 = spark.table("dimMP")
df1.join(broadcast(df2), Seq("PK")).
    createOrReplaceTempView("V_JOIN")

sql("SELECT col1, col2 FROM V_JOIN")

Если вы используете таблицы в группах, вам доступен третий тип соединения — соединение Merge. В соединении SortMerge правильно предварительно секционированный и отсортированный набор данных пропустит дорогостоящий этап сортировки.

Порядок соединений имеет значение, особенно в более сложных запросах. Начните с наиболее часто используемых соединений. Кроме того, по возможности перемещайте соединения, увеличивающие количество строк после статистической обработки.

Чтобы управлять параллелизмом для декартовых соединений, можно добавить вложенные структуры, управление окнами и, вероятно, пропустить один или несколько шагов в задании Spark.

Оптимизация выполнения задания

  • При необходимости выполните кэширование, например при повторном использовании данных.
  • Передайте переменные во все исполнители. Переменные сериализуются только один раз, за счет чего поиск ускоряется.
  • Используйте пул потока в драйвере, что ускорит выполнение нескольких задач.

Регулярно отслеживайте выполняющиеся задания для обнаружения проблем с производительностью. Если необходимо получить дополнительные сведения об определенных проблемах, ознакомьтесь с одним из следующих инструментов профилирования.

Ключевым аспектом производительности запроса Spark 2.x является механизм Tungsten, который зависит от комплексного создания кода. В некоторых случаях комплексное создание кода можно отключить. Например, если в статистическом выражении используется неизменяемый тип (string), вместо HashAggregate появится SortAggregate. Например, для повышения производительности запустите команду ниже, а затем повторно включите создание кода:

MAX(AMOUNT) -> MAX(cast(AMOUNT as DOUBLE))

Дальнейшие действия