Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
В этой статье описывается оптимизация конфигурации кластера Apache Spark для повышения производительности в Azure HDInsight.
Обзор
Если у вас есть медленные задания на присоединении или перетасовке, причина, вероятно, перекос данных. Перекос данных означает асимметрию в данных задания. Например, задание карты может занять 20 секунд. Но выполнение задания, в котором данные объединяются или перемешиваются, занимает часы. Для устранения неравномерного распределения данных необходимо добавить соль ко всему ключу или использовать переменную соль только для некоторого подмножества ключей. При использовании изолированной соли необходимо далее использовать фильтрацию для изоляции подмножества ссолённых ключей в объединениях карт. Другой вариант — ввести столбец сегмента и сначала предварительно агрегировать данные в сегментах.
На скорость выполнения операции соединения также влияет тип этой операции. По умолчанию Spark использует тип соединения SortMerge. Этот тип соединения лучше всего подходит для больших наборов данных. Но это требует значительных вычислительных ресурсов, поскольку сначала необходимо отсортировать левую и правую стороны данных перед их объединением.
Соединение Broadcast лучше всего подходит для небольших наборов данных, или в случаях, когда одна сторона соединения значительно меньше другой. Этот тип соединения рассылает данные всем исполнителям, и поэтому в общем случае требует больше памяти для операций передачи.
Тип соединения в конфигурации можно изменить, задав spark.sql.autoBroadcastJoinThreshold, или можно задать подсказку по соединению с помощью API-интерфейсов DataFrame (dataframe.join(broadcast(df2))).
// Option 1
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1*1024*1024*1024)
// Option 2
val df1 = spark.table("FactTableA")
val df2 = spark.table("dimMP")
df1.join(broadcast(df2), Seq("PK")).
createOrReplaceTempView("V_JOIN")
sql("SELECT col1, col2 FROM V_JOIN")
Если вы используете сгруппированные таблицы, у вас есть третий тип соединения — соединение Merge. В соединении SortMerge правильно предварительно секционированный и отсортированный набор данных пропустит дорогостоящий этап сортировки.
Порядок соединений имеет значение, особенно в более сложных запросах. Начните с наиболее часто используемых соединений. Кроме того, по возможности перемещайте соединения, увеличивающие количество строк после статистической обработки.
Чтобы управлять параллелизмом для декартовых соединений, можно добавить вложенные структуры, управление окнами и, вероятно, пропустить один или несколько шагов в задании Spark.
Оптимизация выполнения задания
- При необходимости выполните кэширование, например при повторном использовании данных.
- Передайте переменные во все исполнители. Переменные сериализуются только один раз, за счет чего поиск ускоряется.
- Используйте пул потока в драйвере, что ускорит выполнение нескольких задач.
Регулярно отслеживайте ваши процессы на предмет проблем с производительностью. Если вам потребуется более подробное представление о некоторых проблемах, рассмотрите один из следующих средств профилирования производительности:
- средство Intel PAL отслеживает использование ЦП, хранилища и пропускной способности сети.
- Mission Control Oracle Java 8 профилирует код для Spark и исполнителя.
Ключевым элементом производительности запросов в Spark 2.x является механизм Tungsten, который зависит от генерации кода на уровне всего этапа. В некоторых случаях комплексное создание кода можно отключить. Например, если в статистическом выражении используется неизменяемый тип (string), вместо SortAggregate появится HashAggregate. Например, для повышения производительности запустите команду ниже, а затем повторно включите создание кода:
MAX(AMOUNT) -> MAX(cast(AMOUNT as DOUBLE))