Ejecución de consultas adaptables

La ejecución de consultas adaptables (AQE) es la re-optimización de consultas que se produce durante la ejecución de la consulta.

La motivación para la re-optimización en tiempo de ejecución es que Azure Databricks tiene las estadísticas más precisas actualizadas al final de un intercambio aleatorio y de difusión (denominado fase de consulta en AQE). Como resultado, Azure Databricks puede optar por una mejor estrategia física, elegir un número y un tamaño de partición óptimos posteriores al orden aleatorio, o realizar optimizaciones para las que antes era necesario sugerencias, por ejemplo, para el control de la combinación de sesgo.

Esto puede ser muy útil cuando la recopilación de estadísticas no está activada o cuando las estadísticas están obsoletas. También es útil en lugares donde las estadísticas derivadas de forma estática son inexactas, como en medio de una consulta complicada, o después de la aparición de asimetría de datos.

Funcionalidades

AQE está habilitado de forma predeterminada. Tiene cuatro características principales:

  • Cambia dinámicamente la fusión mediante combinación de ordenación a combinación de hash de difusión.
  • Fusiona las particiones de manera dinámica (las particiones pequeñas en otras de tamaño razonable) después del intercambio aleatorio. Las tareas muy pequeñas tienen un rendimiento de E/S peor y tienden a sufrir más por la sobrecarga de programación y de configuración de tareas. La combinación de tareas pequeñas ahorra recursos y mejora el rendimiento del clúster.
  • Controla dinámicamente el sesgo en la fusión mediante combinación de ordenación y la combinación de hash aleatoria mediante la división (y replicación si es necesario) de las tareas sesgadas en tareas de tamaño uniforme aproximadamente.
  • Detecta y propaga dinámicamente relaciones vacías.

Application

AQE se aplica a todas las consultas que:

  • No son de streaming
  • Contienen al menos un intercambio (normalmente cuando hay una combinación, un agregado o una ventana), una subconsulta o las dos cosas.

No todas las consultas con AQE aplicado se vuelven a optimizar necesariamente. Es posible que la re-optimización genere un plan de consulta diferente al compilado de manera estática. Para determinar si AQE ha cambiado el plan de una consulta, vea la sección siguiente, Planes de consulta.

Planes de consulta

En esta sección se describe cómo puede examinar los planes de consulta de distintas maneras.

En esta sección:

Interfaz de usuario de Spark

Nodo AdaptiveSparkPlan

Las consultas aplicadas a AQE contienen uno o varios nodos AdaptiveSparkPlan, normalmente como el nodo raíz de cada consulta principal o subconsulta. Antes de que se ejecute la consulta o cuando esté en ejecución, la marca isFinalPlan del nodo AdaptiveSparkPlan correspondiente se muestra como false; una vez que se completa la ejecución de la consulta, la marca isFinalPlan cambia a true..

Plan en evolución

El diagrama del plan de consulta evoluciona a medida que avanza la ejecución y refleja el plan más actual que se ejecuta. Los nodos que ya se han ejecutado (en los que hay métricas disponibles) no cambiarán, pero los que no lo han hecho pueden cambiar en el tiempo como resultado de las optimizaciones.

A continuación se muestra un ejemplo de diagrama de un plan de consulta:

Query plan diagram

DataFrame.explain()

Nodo AdaptiveSparkPlan

Las consultas aplicadas a AQE contienen uno o varios nodos AdaptiveSparkPlan, normalmente como el nodo raíz de cada consulta principal o subconsulta. Antes de que se ejecute la consulta o cuando esté en ejecución, la marca isFinalPlan del nodo AdaptiveSparkPlan correspondiente se muestra como false; una vez que se completa la ejecución de la consulta, la marca isFinalPlan cambia a true.

Plan actual e inicial

En cada nodo AdaptiveSparkPlan se mostrará tanto el plan inicial (el plan antes de aplicar las optimizaciones de AQE) como el plan actual o final, en función de si la ejecución se ha completado. El plan actual evolucionará a medida que avance la ejecución.

Estadísticas en tiempo de ejecución

Cada fase de orden aleatorio y difusión contiene estadísticas de datos.

Antes de que se ejecute la fase o cuando está en ejecución, las estadísticas son estimaciones en tiempo de compilación y la marca isRuntime es false, por ejemplo: Statistics(sizeInBytes=1024.0 KiB, rowCount=4, isRuntime=false);.

Una vez que se completa la ejecución de la fase, las estadísticas son las recopiladas en tiempo de ejecución y la marca isRuntime se convertirá en true, por ejemplo: Statistics(sizeInBytes=658.1 KiB, rowCount=2.81E+4, isRuntime=true).

Este es un ejemplo de DataFrame.explain:

  • Antes de la ejecución

    Before execution

  • Durante la ejecución

    During execution

  • Después de la ejecución

    After execution

SQL EXPLAIN

Nodo AdaptiveSparkPlan

Las consultas con AQE aplicado contienen uno o varios nodos AdaptiveSparkPlan, normalmente como el nodo raíz de cada consulta principal o subconsulta.

No hay ningún plan actual

Como SQL EXPLAIN no ejecuta la consulta, el plan actual siempre es el mismo que el plan inicial y no refleja lo que AQE ejecutaría finalmente.

A continuación se muestra un ejemplo de explain en SQL:

SQL explain

Eficacia

El plan de consulta cambiará si una o varias optimizaciones de AQE tienen efecto. El efecto de estas optimizaciones de AQE se muestra en la diferencia entre los planes actual y final, y el plan inicial y los nodos de plan específicos de los planes actuales y finales.

  • Cambio dinámico de la fusión mediante combinación de ordenación en combinación de hash de difusión: distintos nodos de combinación física entre el plan actual o final, y el plan inicial

    Join strategy string

  • Fusión dinámica de particiones: nodo CustomShuffleReader con la propiedad Coalesced

    Custom shuffle reader

    Custom shuffle reader string

  • Control dinámico de la combinación de sesgo: nodo SortMergeJoin con el campo isSkew como true.

    Skew join plan

    Skew join string

  • Detección y propagación dinámica de relaciones vacías: el nodo LocalTableScan reemplaza parte del plan (o todo) con el campo de relación como vacío.

    Local table scan

    Local table scan string

Configuración

En esta sección:

Habilitación y deshabilitación de la ejecución de consultas adaptables

Propiedad
spark.databricks.optimizer.adaptive.enabled

Escriba: Boolean

Indica si se habilita o deshabilita la ejecución de consultas adaptables.

Valor predeterminado: true

Habilitación del orden aleatorio de optimización automática

Propiedad
spark.sql.shuffle.partitions

Escriba: Integer

Número predeterminado de particiones que se van a usar al ordenar aleatoriamente datos para combinaciones o agregaciones. Al establecer el valor auto, se habilitará el orden aleatorio de optimización automática, que determina automáticamente este número en función del plan de consulta y del tamaño de los datos de entrada de la consulta.

Nota: para flujos estructurados, esta configuración no se puede cambiar entre reinicios de consulta desde la misma ubicación del punto de control.

Valor predeterminado: 200

Cambio dinámico de la fusión mediante combinación de ordenación a combinación de hash de difusión

Propiedad
spark.databricks.adaptive.autoBroadcastJoinThreshold

Escriba: Byte String

Umbral para desencadenar el cambio a la combinación de difusión en tiempo de ejecución.

Valor predeterminado: 30MB

Fusión dinámica de particiones

Propiedad
spark.sql.adaptive.coalescePartitions.enabled

Escriba: Boolean

Si se habilita o deshabilita la fusión de particiones.

Valor predeterminado: true
spark.sql.adaptive.advisoryPartitionSizeInBytes

Escriba: Byte String

Tamaño de destino después de la fusión. Los tamaños de partición fusionados tendrá un valor cercano, pero no más grandes que este tamaño de destino.

Valor predeterminado: 64MB
spark.sql.adaptive.coalescePartitions.minPartitionSize

Escriba: Byte String

Tamaño mínimo de las particiones después de la fusión. Los tamaños de partición fusionados no serán menores que este tamaño.

Valor predeterminado: 1MB
spark.sql.adaptive.coalescePartitions.minPartitionNum

Escriba: Integer

Número mínimo de particiones después de la fusión. No se recomienda, porque el valor lo invalida explícitamente
spark.sql.adaptive.coalescePartitions.minPartitionSize.

Valor predeterminado: el doble del número de núcleos de clúster

Control dinámico de la combinación de sesgo

Propiedad
spark.sql.adaptive.skewJoin.enabled

Escriba: Boolean

Si se habilita o deshabilita el control de la combinación de sesgo.

Valor predeterminado: true
spark.sql.adaptive.skewJoin.skewedPartitionFactor

Escriba: Integer

Factor que, cuando se multiplica por el tamaño medio de la partición, contribuye a determinar si una partición está sesgada.

Valor predeterminado: 5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes

Escriba: Byte String

Umbral que contribuye a determinar si una partición está sesgada.

Valor predeterminado: 256MB

Una partición se considera sesgada cuando (partition size > skewedPartitionFactor * median partition size) y (partition size > skewedPartitionThresholdInBytes) son true.

Detección y propagación dinámica de relaciones vacías

Propiedad
spark.databricks.adaptive.emptyRelationPropagation.enabled

Escriba: Boolean

Si se habilita o deshabilita la propagación dinámica de relaciones vacías.

Valor predeterminado: true

Preguntas más frecuentes

En esta sección:

¿Por qué AQE no ha difundido una tabla de combinación pequeña?

Si el tamaño de la relación que se espera que se difunda se encuentra por debajo de este umbral, pero todavía no se difunde:

  • Compruebe el tipo de combinación. La difusión no se admite para determinados tipos de combinación, por ejemplo, la relación izquierda de una operación LEFT OUTER JOIN no se puede difundir.
  • También es posible que la relación contenga una gran cantidad de particiones vacías, en cuyo caso la mayoría de las tareas pueden finalizar rápidamente con la fusión mediante combinación de ordenación o se puede optimizar con el control de la combinación sesgada. AQE evita cambiar estas fusiones mediante combinación de ordenación por combinaciones de hash de difusión si el porcentaje de particiones no vacías es inferior a spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin.

¿Se debe seguir usando una sugerencia de estrategia de combinación de difusión con AQE habilitado?

Sí. Una combinación de difusión planeada estáticamente suele ser más efectiva que una planeada de forma dinámica por parte de AQE, ya que es posible que AQE no cambie a la combinación de difusión hasta después de realizar el orden aleatorio para ambos lados de la combinación (momento en el que se obtienen los tamaños de relación reales). Por tanto, el uso de una sugerencia de difusión puede seguir siendo una buena opción si conoce bien la consulta. AQE respetará las sugerencias de consulta igual que lo hace la optimización estática, pero todavía puede aplicar optimizaciones dinámicas que no se ven afectadas por las sugerencias.

¿Cuál es la diferencia entre la sugerencia de combinación de sesgo y la optimización de combinación de sesgo de AQE? ¿Cuál debo usar?

Se recomienda confiar en el control de la combinación de sesgo de AQE en lugar de usar la sugerencia de combinación de sesgo, ya que la combinación de sesgo de AQE es completamente automática y, en general, funciona mejor que la de la sugerencia.

¿Por qué AQE no ha ajustado el orden de combinación automáticamente?

La reordenación de combinaciones dinámicas no forma parte de AQE.

¿Por qué AQE no ha detectado la asimetría de datos?

Hay dos condiciones de tamaño que se deben cumplir para que AQE detecte una partición como una partición sesgada:

  • El tamaño de la partición es mayor que spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (el valor predeterminado es de 256 MB).
  • El tamaño de la partición es mayor que el tamaño medio de todas las particiones multiplicado por el factor de partición sesgada spark.sql.adaptive.skewJoin.skewedPartitionFactor (el valor predeterminado es 5)

Además, la compatibilidad con el control del sesgo está limitada para determinados tipos de combinación, por ejemplo, en LEFT OUTER JOIN solo se puede optimizar el sesgo del lado izquierdo.

Heredado

El término "Ejecución adaptable" existe desde Spark 1.6, pero el nuevo AQE en Spark 3.0 es fundamentalmente diferente. En términos de funcionalidad, Spark 1.6 solo realiza la parte de "fusión dinámica de particiones". En términos de arquitectura técnica, el nuevo AQE es un marco de planificación y replanificación dinámicas de consultas basado en estadísticas en tiempo de ejecución, que admite una variedad de optimizaciones como las que se han descrito en este artículo y que se puede ampliar para habilitar más optimizaciones.