Адаптивное выполнение запроса
Адаптивное выполнение запросов (AQE) — это переоптимизация запросов, которая применяется во время выполнения запроса.
Цель повторной оптимизации среды выполнения заключается в том, чтобы платформа Azure Databricks имела самую актуальную и точную статистику по завершении обмена операциями смешивания и трансляций (называется "этапом запроса" в AQE). В результате Azure Databricks может применять более оптимальную физическую стратегию, подбирать оптимальный размер и число секций после смешивания или выполнять оптимизации, которые ранее требовали указаний, например обработку объединения с неравномерным распределением.
Это может быть очень полезно, если сбор статистики не включен или если статистика устарела. Это также полезно, если статически полученная статистика неточна, например в середине сложного запроса, или после обнаружения неравномерного распределения данных.
Возможности
AQE включен по умолчанию. Они состоят из 4 основных функций:
- Динамическое изменение объединения слиянием сортированных списков на объединение с трансляцией и хешированием.
- Динамическое объединение секций (небольших секций в секции подходящего размера) после обмена смешиваниями. Задачи очень маленького размера имеют плохую пропускную способность ввода-вывода и обычно требуют дополнительных ресурсов для планирования и настройки задач. Объединение небольших задач позволяет сэкономить ресурсы и улучшить пропускную способность кластера.
- Динамическая обработка неравномерного распределения данных в объединении слиянием сортированных списков и объединении со смешиванием и хэшированием путем разделения (и репликации при необходимости) задач с неравномерным распределением в задачи приблизительно равного размера.
- Динамическое обнаружение и распространение пустых отношений.
Приложение
AQE применяется ко всем запросам, которые:
- Не используют потоковую передачу.
- Включают по меньшей мере одну операцию обмена (обычно при наличии объединения, агрегирования или окна), один вложенный запрос или и то, и другое.
Не все запросы с AQE оптимизируются повторно. Повторная оптимизация может (или не может) потребоваться с другим планом запроса, отличающимся от скомпилированного статически. Чтобы определить, был ли изменен план запроса AQE, см. следующий раздел Планы запросов.
Планы запросов
В этом разделе описывается, как вы можете различными способами изучить планы запросов.
В этом разделе рассматриваются следующие вопросы.
Пользовательский интерфейс Spark
Узел AdaptiveSparkPlan
Запросы с применением AQE содержат один или несколько узлов AdaptiveSparkPlan
, обычно в виде корневого узла каждого основного запроса или вложенного запроса.
Перед выполнением запроса или во время его выполнения флаг isFinalPlan
соответствующего узла AdaptiveSparkPlan
отображается как false
; после выполнения запроса флаг isFinalPlan
изменяется на true.
.
Изменяющийся план
Схема плана запроса изменяется по мере выполнения и отражает актуальный выполняемый план. Узлы, которые уже выполнены (с доступными метриками) не изменяются. Но те, которые еще не выполнены, могут измениться с течением времени в результате повторных оптимизаций.
Ниже приведен пример схемы плана запроса.
DataFrame.explain()
Узел AdaptiveSparkPlan
Запросы с применением AQE содержат один или несколько узлов AdaptiveSparkPlan
, обычно в виде корневого узла каждого основного запроса или вложенного запроса. Перед выполнением запроса или во время его выполнения флаг isFinalPlan
соответствующего узла AdaptiveSparkPlan
отображается как false
; после выполнения запроса флаг isFinalPlan
изменяется на true
.
Текущий и первоначальный планы
В каждом узле AdaptiveSparkPlan
будет присутствовать первоначальный план (план перед применением оптимизаций AQE) и текущий или окончательный план (в зависимости от того, завершено ли выполнение). Текущий план будет изменяться по ходу выполнения.
Статистика времени выполнения
Каждый этап смешения и трансляции включает статистику по данным.
Перед выполнением этапа или во время его выполнения статистика предоставляется приблизительно на время компиляции, а флаг isRuntime
имеет значение false
, например: Statistics(sizeInBytes=1024.0 KiB, rowCount=4, isRuntime=false);
.
После завершения этапа статистика включает значения, собранные во время выполнения, а флаг isRuntime
изменится на true
, например: Statistics(sizeInBytes=658.1 KiB, rowCount=2.81E+4, isRuntime=true)
.
Ниже приведен пример DataFrame.explain
.
Перед выполнением
Во время выполнения
После выполнения
SQL EXPLAIN
Узел AdaptiveSparkPlan
Запросы с применением AQE содержат один или несколько узлов AdaptiveSparkPlan, обычно в виде корневого узла каждого основного запроса или вложенного запроса.
Без текущего плана
Так как SQL EXPLAIN
не выполняет запрос, текущий план не отличается от первоначального плана и не отражает запрос, который в итоге будет выполнен AQE.
Ниже приведен пример для SQL.
Эффективность
План запроса изменится, если применены одна или несколько оптимизаций AQE. Эффект этих оптимизаций AQE можно хорошо понять по разнице между текущим и окончательным планами и первоначальным планом, а также между определенными узлами плана в текущем и окончательном планах.
Динамическое изменение объединения слиянием сортированных списков в объединение с трансляцией и хешированием: разные физические узлы объединения в текущем/окончательном плане и первоначальном плане.
Динамическое объединение секций: узел
CustomShuffleReader
со свойствомCoalesced
.Динамическая обработка объединения с неравномерным распределением: узел
SortMergeJoin
с полемisSkew
со значением true.Динамическое обнаружение и распространение пустых отношений: часть плана (или план целиком) заменяется узлом LocalTableScan с пустым полем отношения.
Настройка
В этом разделе рассматриваются следующие вопросы.
- Включение и отключение адаптивного выполнения запросов
- Включение автооптимизируемого перетасовки
- Динамическое изменение объединения слиянием сортированных списков на объединение с трансляцией и хешированием
- Динамическое объединение секций
- Динамическая обработка объединения с неравномерным распределением
- Динамическое обнаружение и распространение пустых отношений
Включение и отключение адаптивного выполнения запросов
Свойство |
---|
spark.databricks.optimizer.adaptive.enabled Тип: Boolean Следует ли включить или выключить адаптивное выполнение запросов. Значение по умолчанию: true |
Включение автооптимизируемого перетасовки
Свойство |
---|
spark.sql.shuffle.partitions Тип: Integer Число секций по умолчанию, используемых при перетасовки данных для соединений или агрегатов. Установка значения auto позволяет автоматически оптимизировать перетасовку, которая автоматически определяет это число на основе плана запроса и размера входных данных запроса.Примечание. Для структурированной потоковой передачи эту конфигурацию нельзя изменить между перезапусками запросов из одного расположения проверка point. Значение по умолчанию: 200 |
Динамическое изменение объединения слиянием сортированных списков на объединение с трансляцией и хешированием
Свойство |
---|
spark.databricks.adaptive.autoBroadcastJoinThreshold Тип: Byte String Пороговое значение для запуска перехода на объединение с трансляцией во время выполнения. Значение по умолчанию: 30MB |
Динамическое объединение секций
Свойство |
---|
spark.sql.adaptive.coalescePartitions.enabled Тип: Boolean Следует ли включить или отключить объединение секций. Значение по умолчанию: true |
spark.sql.adaptive.advisoryPartitionSizeInBytes Тип: Byte String Целевой размер после объединения. Размеры объединенных секций не могут превышать этот размер. Значение по умолчанию: 64MB |
spark.sql.adaptive.coalescePartitions.minPartitionSize Тип: Byte String Минимальный размер секций после объединения. Размеры объединенных секций не могут быть меньше этого размера. Значение по умолчанию: 1MB |
spark.sql.adaptive.coalescePartitions.minPartitionNum Тип: Integer Минимальное число секций после объединения. Не рекомендуется, так как этот параметр явно переопределяет spark.sql.adaptive.coalescePartitions.minPartitionSize .Значение по умолчанию: 2 x число ядер кластеров |
Динамическая обработка объединения с неравномерным распределением
Свойство |
---|
spark.sql.adaptive.skewJoin.enabled Тип: Boolean Следует ли включить или отключить обработку объединения с неравномерным распределением. Значение по умолчанию: true |
spark.sql.adaptive.skewJoin.skewedPartitionFactor Тип: Integer Коэффициент, который при умножении на медиану размера секции позволяет определить, присутствует ли в секции неравномерное распределение. Значение по умолчанию: 5 |
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes Тип: Byte String Порог, помогающий определить, равномерно ли распределены данные в секции. Значение по умолчанию: 256MB |
Секция имеет неравномерное распределение, если (partition size > skewedPartitionFactor * median partition size)
и (partition size > skewedPartitionThresholdInBytes)
имеют значение true
.
Динамическое обнаружение и распространение пустых отношений
Свойство |
---|
spark.databricks.adaptive.emptyRelationPropagation.enabled Тип: Boolean Позволяет включить или отключить динамическое распространение пустых отношений. Значение по умолчанию: true |
Часто задаваемые вопросы
В этом разделе рассматриваются следующие вопросы.
- Почему AQE не транслирует таблицу с небольшим объединением?
- Следует ли мне использовать указание стратегии объединения с трансляцией с включенным AQE?
- В чем разница между указанием объединения с неравномерным распределением и оптимизацией объединения с неравномерным распределением AQE? Что же следует использовать?
- Почему AQE не выполняет автоматическое упорядочивание объединений?
- Почему AQE не обнаруживает неравномерное распределение данных?
Почему AQE не транслирует таблицу с небольшим объединением?
Если размер отношения для трансляции не превышает этот порог, но все равно не транслируется, сделайте следующее:
- Проверьте тип объединения. Трансляция не поддерживается для некоторых типов объединений, например левое отношение
LEFT OUTER JOIN
не транслируется. - Также может быть, что отношение содержит большое число пустых секций. В этом случае большинство задач могут быстро завершаться с объединением слиянием сортированных списков или потенциально могут быть оптимизированы с помощью обработки объединений с неравномерным распределением. AQE избегает изменения таких объединений слиянием сортированных списков на объединения с трансляцией и хешированием, если доля непустых секций меньше
spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin
.
Следует ли мне использовать указание стратегии объединения с трансляцией с включенным AQE?
Да. Статически запланированное объединение с трансляцией обычно демонстрирует более высокую производительность, чем динамически запланированное AQE, так как AQE может не переключаться на объединение с трансляцией, пока не будет выполнено смешение для обеих сторон объединения (к этому времени будут получены фактические размеры отношения). Поэтому использование указания трансляции может все равно быть оптимальным, если вы хорошо знаете запрос. AQE будет учитывать указания запросов так же, как и статическая оптимизация, но все равно может применять динамические оптимизации, к которым не применяются указания.
В чем разница между указанием объединения с неравномерным распределением и оптимизацией объединения с неравномерным распределением AQE? Что же следует использовать?
Рекомендуется использовать обработку объединений с неравномерным распределением в AQE, а не указания для объединений с неравномерным распределением, так как объединение с неравномерным распределением в AQE выполняется автоматически и в целом более эффективно, чем использование указаний.
Почему AQE не выполняет автоматическое упорядочивание объединений?
Переупорядочение динамического соединения не является частью AQE.
Почему AQE не обнаруживает неравномерное распределение данных?
Существует два условия, которые должны выполняться, чтобы AQE обнаруживало секцию как секцию с неравномерным распределением:
- Размер секции превышает
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes
(256 МБ по умолчанию). - Размер секции больше, чем произведение медианы всех секций и коэффициента секции с неравномерным распределением
spark.sql.adaptive.skewJoin.skewedPartitionFactor
(5 по умолчанию).
Кроме того, поддержка обработки неравномерного распределения ограничения для определенных типов объединений. Например, в LEFT OUTER JOIN
можно оптимизировать только неравномерное распределение слева.
Устарело
Термин "адаптивное выполнение" используется с версии Spark 1.6, но новая AQE в Spark 3.0 отличается кардинально. В аспекте функциональности Spark 1.6 выполняет только частью "динамического объединения секций". В аспекте технической архитектуры новое AQE является платформой динамического планирования (в том числе повторного) запросов с учетом статистики во время выполнения, которая поддерживает различные оптимизации (например, описанные в этой статье) и может быть расширена для поддержки еще более эффективных оптимизаций.