Поделиться через


Оптимизация заданий Apache Spark в Azure Synapse Analytics

Узнайте, как оптимизировать конфигурацию кластера Apache Spark для определенной рабочей нагрузки. Наиболее распространенной проблемой является нехватка памяти. Причинами этой проблемы можно назвать неправильные конфигурации (в частности неправильные размеры исполнителей), длительные операции и задачи, которые приводят к декартовым операциям. Ускорить выполнение заданий можно с помощью соответствующего кэширования и разрешения неравномерного распределения данных. Для повышения производительности мы рекомендуем отслеживать и просматривать выполнения длительных и ресурсоемких заданий Spark.

В разделах ниже описаны общие рекомендации и распространенные методы оптимизации задания Spark.

Выбор абстракции данных

В более ранних версиях Spark используются наборы RDD для абстракции данных. В Spark версии 1.3 и 1.6 были представлены DataFrames и DataSets соответственно. Рассмотрим следующие относительные характеристики:

  • Кадры данных
    • Оптимальный вариант в большинстве случаев.
    • Предоставляет оптимизацию запросов через Catalyst.
    • Комплексное создание кода.
    • Прямой доступ к памяти.
    • Низкие накладные расходы при сборке мусора.
    • Не настолько удобны для разработчиков, как наборы данных, так как отсутствуют проверки со временем компиляции или программирование на основе объекта домена.
  • Наборы данных
    • Подходят для использования в сложных конвейерах ETL, где допустимо влияние производительности.
    • Не подходят для использования в статистических функциях, где весомо влияние производительности.
    • Предоставляет оптимизацию запросов через Catalyst.
    • Удобны для разработчиков, так как обеспечивают программирование на основе объекта домена и проверки со временем компиляции.
    • Увеличивают нагрузку при десериализации и сериализации.
    • Высокие накладные расходы при сборке мусора.
    • Разбивают комплексное создание кода на этапы.
  • Устойчивые распределенные наборы данных (RDD)
    • Вам необязательно использовать наборы RDD, если только вам не нужно создать пользовательский RDD.
    • Отсутствует оптимизация запросов через Catalyst.
    • Отсутствует комплексное создание кода.
    • Высокие накладные расходы при сборке мусора.
    • Необходимо использовать устаревшие API-интерфейсы Spark 1.x.

Использование оптимального формата данных

Spark поддерживает многие форматы, такие как CSV, JSON, XML, PARQUET, ORC и AVRO. С помощью внешних источников данных его можно расширить для поддержки большего количества форматов. Дополнительные сведения см. на странице пакетов Apache Spark.

Лучший формат для повышения производительности — PARQUET со сжатием Snappy, который является стандартным форматом в кластере Spark 2.x. В формате PARQUET данные хранятся в столбцах. Этот формат высоко оптимизирован в Spark. Кроме того, хотя мгновенное сжатие может привести к увеличению размера файлов, чем при сжатии gzip, эти файлы будут распакованы быстрее из-за особенностей их разделяемости.

Использование кэша

Spark обеспечивает собственные механизмы кэширования, которые можно использовать с помощью различных методов, например .persist(), .cache() и CACHE TABLE. Такое встроенное кэширование эффективно при работе с небольшими наборами данных, а также в конвейерах ETL, где требуется кэшировать промежуточные результаты. Однако встроенное кэширование Spark в настоящее время не подходит для работы с секционированием, так как в кэшированнной таблице не хранятся секционированные данные.

Эффективное использование памяти

Кластер Spark работает путем размещения данных в памяти, поэтому управление ресурсами памяти является ключевым аспектом оптимизации выполнения заданий Spark. Есть несколько методов, которые можно применить для эффективного использования памяти кластера.

  • В рамках стратегии секционирования рекомендуется выбирать небольшие секции данных и учитывать размер данных, типы и распределение.

  • В Synapse Spark (среда выполнения 3.1 или более поздней версии) сериализация данных Kryo включена по умолчанию сериализация данных Kryo.

  • Размер буфера криптосериализатора можно настроить с помощью конфигурации Spark на основе требований рабочей нагрузки:

    // Set the desired property
    spark.conf.set("spark.kryoserializer.buffer.max", "256m")
    
    
  • Отслеживайте и настраивайте параметры конфигурации Spark.

Для справки структура памяти Spark и некоторые основные параметры памяти исполнителя показаны на рисунке ниже.

Рекомендации по использованию памяти Spark

Apache Spark в Azure Synapse использует YARN Apache Hadoop YARN, эта платформа управляет максимальным объемом памяти, используемой всеми контейнерами на всех узлах Spark. На схеме ниже показаны ключевые объекты и их связи.

Управление памятью Spark в YARN

При получении сообщений о нехватке памяти сделайте следующее:

  • Просмотрите операции перемешивания при управлении группами обеспечения доступности баз данных. Ограничьте их путем снижения на стороне сопоставления, выполните предварительное секционирование (или разбиение на группы) исходных данных, увеличьте объем операций перемешивания для отдельных процессов и сократите объем отправляемых данных.
  • Выберите ReduceByKey с фиксированным объемом памяти, а не GroupByKey, который обеспечивает статистические функции, управление окнами и другие возможности, но включает неограниченный объем памяти.
  • Выберите TreeReduce, который в основном обрабатывает исполнителей или секции, а не Reduce, который в основном обрабатывает драйвер.
  • Используйте кадры данных, а не объекты устойчивого распределенного набора данных более низкого уровня.
  • Создайте типы ComplexTypes, инкапсулирующие действия, такие как "Первые N", различные статистические функции или операции управления окнами.

Оптимизация сериализации данных

Так как задания кластера Spark можно распределить, соответствующая сериализация данных представляет собой важный шаг для повышения производительности. Есть два варианта сериализации данных Spark:

  • Сериализация Java, используемая по умолчанию.
  • Сериализация Kryo (новый формат), которая может ускорить сериализацию и сделать ее компактнее по сравнению с сериализацией Java. При использовании сериализации Kryo необходимо зарегистрировать классы в программе. Пока что она поддерживает не все сериализуемые типы.

Использование группирования

Группирование аналогично секционированию данных, но каждый контейнер может содержать не одно значение столбца, а набор. Группирование подходит для секционирования большого количества значений (миллионов и более), например идентификаторов продукта. Контейнер определяется хэшированием ключа контейнера строки. Таблицы в контейнерах предлагают уникальную оптимизацию, так как в них хранятся метаданные о способах группирования и сортировки.

Ниже приведены некоторые расширенные функции группирования.

  • Оптимизация запросов на основе группирования метасведений.
  • Оптимизированные статистические функции.
  • Оптимизированные соединения.

Вы можете одновременно использовать секционирование и группирование.

Оптимизация операций соединения и перемешивания

При медленном выполнении заданий в операциях соединения или перемешивания причиной скорее всего является неравномерное распределение данных, представляющее собой асимметрию в данных задания. Например, выполнение задания сопоставления может занять 20 секунд, но выполнение задания с объединением или перемешиванием данных может занять несколько часов. Для устранения неравномерного распределения данных необходимо использовать строку случайных данных для целого ключа или изолированную строку случайных данных только для некоторого подмножества ключей. При использовании изолированной строки случайных данных необходимо далее использовать фильтрацию для изоляции подмножества соответствующих ключей в соединениях сопоставлений. Другой вариант — создать столбец группы и выполнить предварительную статистическую обработку сначала в группах.

На скорость выполнения операции соединения также влияет тип этой операции. По умолчанию кластер Spark использует тип соединения SortMerge. Этот тип соединения лучше всего подходит для больших наборов данных, но с точки зрения объема расчетов он достаточно дорогостоящий, так как используется сначала для сортировки левой и правой частей данных, а затем для их объединения.

Соединение Broadcast лучше всего подходит для небольших наборов данных, или в случаях, когда одна сторона соединения значительно меньше другой. Этот тип соединения оповещает все исполнители, поэтому в целом требует большего объема памяти для такой операции передачи.

Тип соединения в конфигурации можно изменить, задав spark.sql.autoBroadcastJoinThreshold, или можно задать подсказку по соединению с помощью API-интерфейсов DataFrame (dataframe.join(broadcast(df2))).

// Option 1
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1*1024*1024*1024)

// Option 2
val df1 = spark.table("FactTableA")
val df2 = spark.table("dimMP")
df1.join(broadcast(df2), Seq("PK")).
    createOrReplaceTempView("V_JOIN")

sql("SELECT col1, col2 FROM V_JOIN")

Если вы используете таблицы в группах, вам доступен третий тип соединения — соединение Merge. В соединении SortMerge правильно предварительно секционированный и отсортированный набор данных пропустит дорогостоящий этап сортировки.

Порядок соединений имеет значение, особенно в более сложных запросах. Начните с наиболее часто используемых соединений. Кроме того, по возможности перемещайте соединения, увеличивающие количество строк после статистической обработки.

Чтобы управлять параллелизмом для декартовых соединений, можно добавить вложенные структуры, управление окнами и, вероятно, пропустить один или несколько шагов в задании Spark.

Выбор правильного размера исполнителя

При выборе конфигурации исполнителя предусмотрите лимит переполнения памяти при сборке мусора Java.

  • Факторы, которые стоит учесть, чтобы выбрать исполнитель меньшего размера:

    • Уменьшите размер кучи менее чем до 32 ГБ, чтобы издержки на сборку мусора не превысили 10 %.
    • Сократите число ядер, чтобы издержки на сборку мусора не превысили 10 %.
  • Факторы, которые стоит учесть, чтобы выбрать исполнитель большего размера:

    • Уменьшите лимит переполнения памяти при обмене данными между исполнителями.
    • Сократите число открытых соединений между исполнителями (N2) в более крупных кластерах (> 100 исполнителей).
    • Увеличьте размер кучи для обработки задач с интенсивным потреблением ресурсов памяти.
    • (Необязательно.) Уменьшите лимит переполнения памяти каждого исполнителя.
    • (Необязательно.) Увеличьте использование и параллелизм, увеличив число назначенных ЦП.

Как показывает опыт при выборе размера исполнителя стоит руководствоваться следующим:

  • Начните с 30 ГБ на каждый исполнитель и распределите доступные ядра компьютера.
  • Увеличьте число ядер исполнителя для более крупных кластеров (> 100 исполнителей).
  • Измените размеры в зависимости от пробных запусков и предшествующих факторов, например лимита переполнения памяти при сборке мусора.

При выполнении параллельных запросов учтите следующее:

  • Начните с 30 ГБ на каждый исполнитель и для всех ядер компьютера.
  • Создайте несколько параллельных приложений Spark, увеличив число назначенных ЦП (уменьшение задержки приблизительно на 30 %).
  • Распределите запросы между параллельными приложениями.
  • Измените размеры в зависимости от пробных запусков и предшествующих факторов, например лимита переполнения памяти при сборке мусора.

Отслеживайте производительность запросов для выбросов или других проблем с производительностью за счет поиска в представлении временной шкалы, SQL Graph, статистике задания и т. д. Иногда один или несколько исполнителей работают медленнее по сравнению с другими, и выполнение задач занимает гораздо больше времени. Это часто происходит в больших кластерах (более 30 узлов). В этом случае разделите работу на большое число задач, чтобы планировщик мог компенсировать их медленное выполнение.

Например, создайте как минимум в два раза больше задач по сравнению с количеством ядер исполнителя в приложении. Можно также включить упреждающее выполнение задач с помощью conf: spark.speculation = true.

Оптимизация выполнения задания

  • При необходимости выполните кэширование, например при повторном использовании данных.
  • Передайте переменные во все исполнители. Переменные сериализуются только один раз, за счет чего поиск ускоряется.
  • Используйте пул потока в драйвере, что ускорит выполнение нескольких задач.

Ключевым аспектом производительности запроса Spark 2.x является механизм Tungsten, который зависит от комплексного создания кода. В некоторых случаях комплексное создание кода можно отключить.

Например, если в статистическом выражении используется неизменяемый тип (string), вместо HashAggregate появится SortAggregate. Например, для повышения производительности запустите команду ниже, а затем повторно включите создание кода:

MAX(AMOUNT) -> MAX(cast(AMOUNT as DOUBLE))

Следующие шаги