Поделиться через


Адаптивное выполнение запроса

Адаптивное выполнение запросов (AQE) — это переоптимизация запросов, которая применяется во время выполнения запроса.

Цель повторной оптимизации среды выполнения заключается в том, чтобы платформа Azure Databricks имела самую актуальную и точную статистику по завершении обмена операциями смешивания и трансляций (называется "этапом запроса" в AQE). В результате Azure Databricks может применять более оптимальную физическую стратегию, подбирать оптимальный размер и число секций после смешивания или выполнять оптимизации, которые ранее требовали указаний, например обработку объединения с неравномерным распределением.

Это может быть очень полезно, если сбор статистики не включен или если статистика устарела. Это также полезно, если статически полученная статистика неточна, например в середине сложного запроса, или после обнаружения неравномерного распределения данных.

Возможности

AQE включен по умолчанию. Они состоят из 4 основных функций:

  • Динамическое изменение объединения слиянием сортированных списков на объединение с трансляцией и хешированием.
  • Динамическое объединение секций (небольших секций в секции подходящего размера) после обмена смешиваниями. Задачи очень маленького размера имеют плохую пропускную способность ввода-вывода и обычно требуют дополнительных ресурсов для планирования и настройки задач. Объединение небольших задач позволяет сэкономить ресурсы и улучшить пропускную способность кластера.
  • Динамическая обработка неравномерного распределения данных в объединении слиянием сортированных списков и объединении со смешиванием и хэшированием путем разделения (и репликации при необходимости) задач с неравномерным распределением в задачи приблизительно равного размера.
  • Динамическое обнаружение и распространение пустых отношений.

Приложение

AQE применяется ко всем запросам, которые:

  • Не используют потоковую передачу.
  • Включают по меньшей мере одну операцию обмена (обычно при наличии объединения, агрегирования или окна), один вложенный запрос или и то, и другое.

Не все запросы с AQE оптимизируются повторно. Повторная оптимизация может (или не может) потребоваться с другим планом запроса, отличающимся от скомпилированного статически. Чтобы определить, был ли изменен план запроса AQE, см. следующий раздел Планы запросов.

Планы запросов

В этом разделе описывается, как вы можете различными способами изучить планы запросов.

В этом разделе рассматриваются следующие вопросы.

Пользовательский интерфейс Spark

Узел AdaptiveSparkPlan

Запросы с применением AQE содержат один или несколько узлов AdaptiveSparkPlan, обычно в виде корневого узла каждого основного запроса или вложенного запроса. Перед выполнением запроса или во время его выполнения флаг isFinalPlan соответствующего узла AdaptiveSparkPlan отображается как false; после выполнения запроса флаг isFinalPlan изменяется на true..

Изменяющийся план

Схема плана запроса изменяется по мере выполнения и отражает актуальный выполняемый план. Узлы, которые уже выполнены (с доступными метриками) не изменяются. Но те, которые еще не выполнены, могут измениться с течением времени в результате повторных оптимизаций.

Ниже приведен пример схемы плана запроса.

Query plan diagram

DataFrame.explain()

Узел AdaptiveSparkPlan

Запросы с применением AQE содержат один или несколько узлов AdaptiveSparkPlan, обычно в виде корневого узла каждого основного запроса или вложенного запроса. Перед выполнением запроса или во время его выполнения флаг isFinalPlan соответствующего узла AdaptiveSparkPlan отображается как false; после выполнения запроса флаг isFinalPlan изменяется на true.

Текущий и первоначальный планы

В каждом узле AdaptiveSparkPlan будет присутствовать первоначальный план (план перед применением оптимизаций AQE) и текущий или окончательный план (в зависимости от того, завершено ли выполнение). Текущий план будет изменяться по ходу выполнения.

Статистика времени выполнения

Каждый этап смешения и трансляции включает статистику по данным.

Перед выполнением этапа или во время его выполнения статистика предоставляется приблизительно на время компиляции, а флаг isRuntime имеет значение false, например: Statistics(sizeInBytes=1024.0 KiB, rowCount=4, isRuntime=false);.

После завершения этапа статистика включает значения, собранные во время выполнения, а флаг isRuntime изменится на true, например: Statistics(sizeInBytes=658.1 KiB, rowCount=2.81E+4, isRuntime=true).

Ниже приведен пример DataFrame.explain.

  • Перед выполнением

    Before execution

  • Во время выполнения

    During execution

  • После выполнения

    After execution

SQL EXPLAIN

Узел AdaptiveSparkPlan

Запросы с применением AQE содержат один или несколько узлов AdaptiveSparkPlan, обычно в виде корневого узла каждого основного запроса или вложенного запроса.

Без текущего плана

Так как SQL EXPLAIN не выполняет запрос, текущий план не отличается от первоначального плана и не отражает запрос, который в итоге будет выполнен AQE.

Ниже приведен пример для SQL.

SQL explain

Эффективность

План запроса изменится, если применены одна или несколько оптимизаций AQE. Эффект этих оптимизаций AQE можно хорошо понять по разнице между текущим и окончательным планами и первоначальным планом, а также между определенными узлами плана в текущем и окончательном планах.

  • Динамическое изменение объединения слиянием сортированных списков в объединение с трансляцией и хешированием: разные физические узлы объединения в текущем/окончательном плане и первоначальном плане.

    Join strategy string

  • Динамическое объединение секций: узел CustomShuffleReader со свойством Coalesced.

    Custom shuffle reader

    Custom shuffle reader string

  • Динамическая обработка объединения с неравномерным распределением: узел SortMergeJoin с полем isSkew со значением true.

    Skew join plan

    Skew join string

  • Динамическое обнаружение и распространение пустых отношений: часть плана (или план целиком) заменяется узлом LocalTableScan с пустым полем отношения.

    Local table scan

    Local table scan string

Настройка

В этом разделе рассматриваются следующие вопросы.

Включение и отключение адаптивного выполнения запросов

Свойство
spark.databricks.optimizer.adaptive.enabled

Тип: Boolean

Следует ли включить или выключить адаптивное выполнение запросов.

Значение по умолчанию: true

Включение автооптимизируемого перетасовки

Свойство
spark.sql.shuffle.partitions

Тип: Integer

Число секций по умолчанию, используемых при перетасовки данных для соединений или агрегатов. Установка значения auto позволяет автоматически оптимизировать перетасовку, которая автоматически определяет это число на основе плана запроса и размера входных данных запроса.

Примечание. Для структурированной потоковой передачи эту конфигурацию нельзя изменить между перезапусками запросов из одного расположения проверка point.

Значение по умолчанию: 200

Динамическое изменение объединения слиянием сортированных списков на объединение с трансляцией и хешированием

Свойство
spark.databricks.adaptive.autoBroadcastJoinThreshold

Тип: Byte String

Пороговое значение для запуска перехода на объединение с трансляцией во время выполнения.

Значение по умолчанию: 30MB

Динамическое объединение секций

Свойство
spark.sql.adaptive.coalescePartitions.enabled

Тип: Boolean

Следует ли включить или отключить объединение секций.

Значение по умолчанию: true
spark.sql.adaptive.advisoryPartitionSizeInBytes

Тип: Byte String

Целевой размер после объединения. Размеры объединенных секций не могут превышать этот размер.

Значение по умолчанию: 64MB
spark.sql.adaptive.coalescePartitions.minPartitionSize

Тип: Byte String

Минимальный размер секций после объединения. Размеры объединенных секций не могут быть меньше этого размера.

Значение по умолчанию: 1MB
spark.sql.adaptive.coalescePartitions.minPartitionNum

Тип: Integer

Минимальное число секций после объединения. Не рекомендуется, так как этот параметр явно переопределяет
spark.sql.adaptive.coalescePartitions.minPartitionSize.

Значение по умолчанию: 2 x число ядер кластеров

Динамическая обработка объединения с неравномерным распределением

Свойство
spark.sql.adaptive.skewJoin.enabled

Тип: Boolean

Следует ли включить или отключить обработку объединения с неравномерным распределением.

Значение по умолчанию: true
spark.sql.adaptive.skewJoin.skewedPartitionFactor

Тип: Integer

Коэффициент, который при умножении на медиану размера секции позволяет определить, присутствует ли в секции неравномерное распределение.

Значение по умолчанию: 5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes

Тип: Byte String

Порог, помогающий определить, равномерно ли распределены данные в секции.

Значение по умолчанию: 256MB

Секция имеет неравномерное распределение, если (partition size > skewedPartitionFactor * median partition size) и (partition size > skewedPartitionThresholdInBytes) имеют значение true.

Динамическое обнаружение и распространение пустых отношений

Свойство
spark.databricks.adaptive.emptyRelationPropagation.enabled

Тип: Boolean

Позволяет включить или отключить динамическое распространение пустых отношений.

Значение по умолчанию: true

Часто задаваемые вопросы

В этом разделе рассматриваются следующие вопросы.

Почему AQE не транслирует таблицу с небольшим объединением?

Если размер отношения для трансляции не превышает этот порог, но все равно не транслируется, сделайте следующее:

  • Проверьте тип объединения. Трансляция не поддерживается для некоторых типов объединений, например левое отношение LEFT OUTER JOIN не транслируется.
  • Также может быть, что отношение содержит большое число пустых секций. В этом случае большинство задач могут быстро завершаться с объединением слиянием сортированных списков или потенциально могут быть оптимизированы с помощью обработки объединений с неравномерным распределением. AQE избегает изменения таких объединений слиянием сортированных списков на объединения с трансляцией и хешированием, если доля непустых секций меньше spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin.

Следует ли мне использовать указание стратегии объединения с трансляцией с включенным AQE?

Да. Статически запланированное объединение с трансляцией обычно демонстрирует более высокую производительность, чем динамически запланированное AQE, так как AQE может не переключаться на объединение с трансляцией, пока не будет выполнено смешение для обеих сторон объединения (к этому времени будут получены фактические размеры отношения). Поэтому использование указания трансляции может все равно быть оптимальным, если вы хорошо знаете запрос. AQE будет учитывать указания запросов так же, как и статическая оптимизация, но все равно может применять динамические оптимизации, к которым не применяются указания.

В чем разница между указанием объединения с неравномерным распределением и оптимизацией объединения с неравномерным распределением AQE? Что же следует использовать?

Рекомендуется использовать обработку объединений с неравномерным распределением в AQE, а не указания для объединений с неравномерным распределением, так как объединение с неравномерным распределением в AQE выполняется автоматически и в целом более эффективно, чем использование указаний.

Почему AQE не выполняет автоматическое упорядочивание объединений?

Переупорядочение динамического соединения не является частью AQE.

Почему AQE не обнаруживает неравномерное распределение данных?

Существует два условия, которые должны выполняться, чтобы AQE обнаруживало секцию как секцию с неравномерным распределением:

  • Размер секции превышает spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (256 МБ по умолчанию).
  • Размер секции больше, чем произведение медианы всех секций и коэффициента секции с неравномерным распределением spark.sql.adaptive.skewJoin.skewedPartitionFactor (5 по умолчанию).

Кроме того, поддержка обработки неравномерного распределения ограничения для определенных типов объединений. Например, в LEFT OUTER JOIN можно оптимизировать только неравномерное распределение слева.

Устарело

Термин "адаптивное выполнение" используется с версии Spark 1.6, но новая AQE в Spark 3.0 отличается кардинально. В аспекте функциональности Spark 1.6 выполняет только частью "динамического объединения секций". В аспекте технической архитектуры новое AQE является платформой динамического планирования (в том числе повторного) запросов с учетом статистики во время выполнения, которая поддерживает различные оптимизации (например, описанные в этой статье) и может быть расширена для поддержки еще более эффективных оптимизаций.