Compartir a través de


Ejecución de consultas adaptables

La ejecución de consultas adaptables (AQE) es la reoptimización de consultas que se produce durante el procesamiento de consultas.

La motivación para la optimización en tiempo de ejecución es que Azure Databricks tiene las estadísticas más precisas de fecha up-toal final de un intercambio aleatorio y de difusión (denominado fase de consulta en AQE). Como resultado, Azure Databricks puede optar por una mejor estrategia física, elegir un tamaño y un número óptimos de partición posteriores a la mezcla, o realizar optimizaciones que solían requerir sugerencias, por ejemplo, la gestión de uniones sesgadas.

Esto puede ser muy útil cuando la recopilación de estadísticas no está activada o cuando las estadísticas están obsoletas. También es útil en lugares donde las estadísticas derivadas estáticamente son inexactas, como en medio de una consulta complicada, o después de la aparición de la asimetría de datos.

Capacidades

AQE está habilitado de forma predeterminada. Tiene cuatro características principales:

  • Cambia dinámicamente la combinación de combinación de ordenación en combinación hash de difusión.
  • Combina dinámicamente las particiones (combinan particiones pequeñas en particiones de tamaño razonable) después del intercambio aleatorio. Las tareas muy pequeñas tienen un rendimiento de E/S peor y tienden a sufrir más de la sobrecarga de programación y la sobrecarga de configuración de tareas. La combinación de tareas pequeñas ahorra recursos y mejora el rendimiento del clúster.
  • Gestiona dinámicamente el sesgo en las uniones de combinación de ordenación y hash aleatorio dividiendo (y replicando si es necesario) las tareas sesgadas en tareas de tamaño aproximadamente uniforme.
  • Detecta y propaga dinámicamente relaciones vacías.

Aplicación

AQE se aplica a todas las consultas que son:

  • Sin transmisión
  • Contiene al menos un intercambio (normalmente cuando hay una combinación, un agregado o una ventana), una subconsulta, o ambos.

No todas las consultas aplicadas por AQE se vuelven a optimizar necesariamente. La reoptimización puede dar lugar a un plan de consulta diferente al compilado estáticamente. Para determinar si AQE ha cambiado el plan de una consulta, consulte la sección siguiente, Planes de consulta.

Planes de consulta

En esta sección se describe cómo puede examinar los planes de consulta de diferentes maneras.

En esta sección:

Interfaz de usuario de Spark

AdaptiveSparkPlan nodo

Las consultas aplicadas a AQE contienen uno o varios AdaptiveSparkPlan nodos, normalmente como nodo raíz de cada consulta principal o subconsulta. Antes de que se ejecute la consulta o cuando se esté ejecutando, la isFinalPlan marca del nodo correspondiente AdaptiveSparkPlan se muestra como false; una vez completada la ejecución de la consulta, la isFinalPlan marca cambia a . true.

Plan en evolución

El diagrama del plan de consulta evoluciona a medida que avanza la ejecución y refleja el plan más actual que se está ejecutando. Los nodos que ya se han ejecutado (en los que las métricas están disponibles) no cambiarán, pero aquellos que no lo han hecho pueden cambiar con el tiempo al ser reoptimizados.

A continuación se muestra un ejemplo de diagrama de plan de consulta:

Diagrama del plan de consulta

DataFrame.explain()

AdaptiveSparkPlan nodo

Las consultas aplicadas a AQE contienen uno o varios AdaptiveSparkPlan nodos, normalmente como nodo raíz de cada consulta principal o subconsulta. Antes de que se ejecute la consulta o cuando se ejecute, la isFinalPlan marca del nodo correspondiente AdaptiveSparkPlan se muestra como false; una vez completada la ejecución de la consulta, la isFinalPlan marca cambia a true.

Plan actual e inicial

En cada AdaptiveSparkPlan nodo habrá el plan inicial (el plan antes de aplicar las optimizaciones de AQE) y el plan actual o final, en función de si la ejecución se ha completado. El plan actual evolucionará a medida que avanza la ejecución.

Estadísticas de tiempo de ejecución

Cada fase de orden aleatorio y de difusión contiene estadísticas de datos.

Antes de que se ejecute la etapa o durante su ejecución, las estadísticas son estimaciones en tiempo de compilación, y el indicador isRuntime es false, por ejemplo: Statistics(sizeInBytes=1024.0 KiB, rowCount=4, isRuntime=false);

Una vez completada la ejecución de la fase, las estadísticas son las recopiladas en tiempo de ejecución y la marca isRuntime se convertirá en true, por ejemplo: Statistics(sizeInBytes=658.1 KiB, rowCount=2.81E+4, isRuntime=true)

A continuación se muestra un DataFrame.explain ejemplo:

  • Antes de la ejecución

    Antes de la ejecución

  • Durante la ejecución

    Durante la ejecución

  • Después de la ejecución

    Después de la ejecución

SQL EXPLAIN

AdaptiveSparkPlan nodo

Las consultas aplicadas a AQE contienen uno o varios nodos AdaptiveSparkPlan, normalmente como nodo raíz de cada consulta principal o subconsulta.

Ningún plan actual

Como SQL EXPLAIN no ejecuta la consulta, el plan actual es siempre el mismo que el plan inicial y no refleja lo que finalmente se ejecutaría con AQE.

A continuación se muestra un ejemplo de explicación de SQL:

Explicación de SQL

Efectividad

El plan de consulta cambiará si una o varias optimizaciones de AQE surten efecto. El efecto de estas optimizaciones de AQE se demuestra por la diferencia entre los planes actuales y finales y el plan inicial y los nodos de plan específicos en los planes actuales y finales.

  • Cambiar dinámicamente el tipo de combinación de ordenación a combinación hash de difusión: diferentes nodos de combinación física entre el plan actual y el plan inicial.

    Cadena de estrategia de unión

  • Particiones de fusión dinámica: nodo CustomShuffleReader con propiedad Coalesced

    Lector de mezcla personalizado

    Cadena de lector aleatorio personalizada

  • Controlar dinámicamente la combinación de asimetría: nodo SortMergeJoin con el campo isSkew como true.

    Plan de combinación sesgada

    Cadena de combinación sesgada

  • Detectar y propagar dinámicamente relaciones vacías: parte (o la totalidad) del plan es reemplazada por el nodo LocalTableScan con el campo de relación vacío.

    Escaneo de tabla local

    Cadena de escaneo de tabla local

Configuración

En esta sección:

Habilitación y deshabilitación de la ejecución de consultas adaptables

Propiedad
spark.databricks.optimizer.adaptive.enabled
Tipo: Boolean
Si se va a habilitar o deshabilitar la ejecución de consultas adaptables.
Valor predeterminado: true

Habilitar mezcla aleatoria autooptimizada

Propiedad
spark.sql.shuffle.partitions
Tipo: Integer
Número predeterminado de particiones que se van a usar al ordenar aleatoriamente datos para combinaciones o agregaciones. Si se establece el valor auto , se habilita el orden aleatorio optimizado automáticamente, que determina automáticamente este número en función del plan de consulta y del tamaño de los datos de entrada de la consulta.
Nota: Para Structured Streaming, esta configuración no se puede cambiar entre reinicios de consulta desde la misma ubicación de punto de control.
Valor predeterminado: 200

Cambiar dinámicamente la combinación de combinación de ordenación en combinación hash de difusión

Propiedad
spark.databricks.adaptive.autoBroadcastJoinThreshold
Tipo: Byte String
Umbral para desencadenar la conmutación a la unión de difusión en tiempo de ejecución.
Valor predeterminado: 30MB

Fusión dinámica de particiones

Propiedad
spark.sql.adaptive.coalescePartitions.enabled
Tipo: Boolean
Si se debe habilitar o deshabilitar la fusión de particiones.
Valor predeterminado: true
spark.sql.adaptive.advisoryPartitionSizeInBytes
Tipo: Byte String
Tamaño de destino después de la coalescencia. Los tamaños de las particiones fusionados estarán cerca, pero no más grandes que este tamaño objetivo.
Valor predeterminado: 64MB
spark.sql.adaptive.coalescePartitions.minPartitionSize
Tipo: Byte String
Tamaño mínimo de las particiones después de la fusión. Los tamaños de partición fusionados no serán menores que este tamaño.
Valor predeterminado: 1MB
spark.sql.adaptive.coalescePartitions.minPartitionNum
Tipo: Integer
Número mínimo de particiones después de la fusión. No se recomienda, ya que la configuración anula explícitamente
spark.sql.adaptive.coalescePartitions.minPartitionSize.
Valor predeterminado: 2x no. de núcleos de clúster

Control dinámico de la combinación de asimetría

Propiedad
spark.sql.adaptive.skewJoin.enabled
Tipo: Boolean
Si se va a habilitar o deshabilitar el manejo de combinaciones sesgadas.
Valor predeterminado: true
spark.sql.adaptive.skewJoin.skewedPartitionFactor
Tipo: Integer
Factor que cuando se multiplica por el tamaño de partición mediana contribuye a determinar si una partición está sesgada.
Valor predeterminado: 5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes
Tipo: Byte String
Umbral que contribuye a determinar si una partición está sesgada.
Valor predeterminado: 256MB

Una partición se considera sesgada cuando ambos (partition size > skewedPartitionFactor * median partition size) y (partition size > skewedPartitionThresholdInBytes) son true.

Detección y propagación dinámica de relaciones vacías

Propiedad
spark.databricks.adaptive.emptyRelationPropagation.enabled
Tipo: Boolean
Si se va a habilitar o deshabilitar la propagación dinámica de relaciones vacías.
Valor predeterminado: true

Preguntas más frecuentes (FAQ)

En esta sección:

¿Por qué AQE no transmitió una pequeña tabla de unión?

Si el tamaño de la relación que se espera que se difunda está por debajo de este umbral, pero sigue sin difundirse:

  • Compruebe el tipo de combinación. La difusión no se admite para ciertos tipos de combinación; por ejemplo, la relación izquierda de un LEFT OUTER JOIN no se puede transmitir.
  • También puede ocurrir que la relación contenga una gran cantidad de particiones vacías, en cuyo caso la mayoría de las tareas puede finalizar rápidamente con una combinación de ordenación o puede optimizarse potencialmente con un manejo de combinaciones sesgadas. AQE evita cambiar estas combinaciones de combinación de ordenación para difundir combinaciones hash si el porcentaje de particiones no vacías es inferior a spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin.

¿Debo seguir usando una sugerencia de estrategia de combinación de difusión con AQE habilitado?

Sí. Una unión de difusión planeada estáticamente suele ser más eficaz que una planeada dinámicamente por AQE, ya que AQE podría no cambiar a la unión de difusión hasta después de realizar la reorganización para ambos lados de la unión, momento en el cual se obtienen los tamaños de relación reales. Por lo tanto, el uso de una sugerencia de difusión puede ser una buena opción si conoce bien la consulta. AQE respetará las sugerencias de consulta de la misma manera que la optimización estática, pero todavía puede aplicar optimizaciones dinámicas que no se ven afectadas por las sugerencias.

¿Cuál es la diferencia entre la sugerencia para combinación asimétrica y la optimización para combinación asimétrica de AQE? ¿Cuál debo usar?

Se recomienda confiar en el control de combinaciones de asimetría de AQE en lugar de usar la sugerencia de combinación de asimetría, ya que la combinación de asimetría de AQE es completamente automática y, en general, funciona mejor que el homólogo de sugerencia.

¿Por qué AQE no ajustó automáticamente mi orden de unión?

La reordenación de combinaciones dinámicas no forma parte de AQE.

¿Por qué AQE no detectó mi asimetría de datos?

Hay dos condiciones de tamaño que deben cumplirse para que AQE detecte una partición como una partición sesgada:

  • El tamaño de la partición es mayor que ( spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes valor predeterminado de 256 MB)
  • El tamaño de la partición es mayor que el tamaño medio de todas las particiones multiplicado por el factor de partición sesgada spark.sql.adaptive.skewJoin.skewedPartitionFactor (valor predeterminado: 5)

Además, la compatibilidad con el control de sesgos está limitada para determinados tipos de combinación, por ejemplo, en LEFT OUTER JOIN, solo se puede optimizar la asimetría en el lado izquierdo.

Legado

El término "Ejecución adaptable" ha existido desde Spark 1.6, pero el nuevo AQE en Spark 3.0 es fundamentalmente diferente. En términos de funcionalidad, Spark 1.6 solo realiza la función de "agrupar dinámicamente particiones". En términos de arquitectura técnica, el nuevo AQE es un marco de planeamiento dinámico y replanado de consultas basadas en estadísticas en tiempo de ejecución, que admite una variedad de optimizaciones como las que hemos descrito en este artículo y se puede ampliar para habilitar más optimizaciones potenciales.