Compartilhar via


Execução de consulta adaptável

A AQE (execução de consulta adaptável) é a reotimização da consulta que ocorre durante a execução da consulta.

A motivação para a reotimização do tempo de execução é que o Azure Databricks tem as estatísticas precisas mais atualizadas no final de um intercâmbio de ordem aleatória e de difusão (chamado de estágio de consulta na AQE). Como resultado, o Azure Databricks pode optar por uma estratégia física melhor, escolher o tamanho e o número da partição de ordem aleatória ideal, ou as otimizações usadas para exigir dicas, por exemplo, manipulação de junção de inclinação.

Isso poderá ser muito útil quando a coleta de estatísticas não estiver ativada ou quando as estatísticas estiverem obsoletas. Também será útil em locais onde estatísticas derivadas estaticamente são imprecisas, como no meio de uma consulta complicada ou após a ocorrência de distorção de dados.

Funcionalidades

O AQE está habilitado por padrão. Ela tem quatro recursos principais:

  • Altera dinamicamente a junção de mesclagem de classificação na junção hash de difusão.
  • Une partições dinamicamente (combine partições pequenas em partições de tamanho razoável) após a troca em ordem aleatória. As tarefas muito pequenas têm pior taxa de transferência de E/S e tendem a sofrer mais do que o agendamento da sobrecarga e da sobrecarga da configuração da tarefa. A combinação de tarefas pequenas salva recursos e melhora a taxa de transferência do cluster.
  • Manipula dinamicamente a distorção em classificar junção de mesclagem e junção hash aleatória dividindo (e replicando se necessário) tarefas distorcidas em tarefas de tamanho quase uniforme.
  • Detecta e propaga dinamicamente as relações vazias.

Aplicativo

A AQE aplica-se a todas as consultas:

  • Não streaming
  • Que contêm pelo menos uma troca (geralmente, quando há uma junção, agregação ou janela), uma subconsulta ou ambas.

Nem todas as consultas aplicadas à AQE são necessariamente reotimizadas. A nova otimização pode ou não surgir com um plano de consulta diferente daquele compilado estaticamente. Para determinar se o plano de uma consulta foi alterado por AQE, confira a seção a seguir, Planos de consulta.

Planos de consultas

Esta seção discute como você pode examinar os planos de consulta de maneiras diferentes.

Nesta seção:

Interface do usuário do Spark

AdaptiveSparkPlan

As consultas aplicadas à AQE contêm um ou mais nós AdaptiveSparkPlan, geralmente como o nó raiz de cada consulta principal ou subconsulta. Antes que a consulta seja executada ou quando estiver em execução, o sinalizador isFinalPlan do nó AdaptiveSparkPlan correspondente será mostrado como false; depois que a execução da consulta for concluída, o sinalizador isFinalPlan será alterado para true.

Plano em evolução

O diagrama do plano de consulta evolui à medida que a execução progride e reflete o plano mais atual que está sendo executado. Os nós que já foram executados (nos quais há métricas disponíveis) não serão alterados, mas aqueles que não podem mudar ao longo do tempo como resultado de reotimizações.

Este é um exemplo de diagrama de plano de consulta:

Query plan diagram

DataFrame.explain()

AdaptiveSparkPlan

As consultas aplicadas à AQE contêm um ou mais nós AdaptiveSparkPlan, geralmente como o nó raiz de cada consulta principal ou subconsulta. Antes que a consulta seja executada ou quando estiver em execução, o sinalizador isFinalPlan do nó AdaptiveSparkPlan correspondente será mostrado como false; depois que a execução da consulta for concluída, o sinalizador isFinalPlan será alterado para true.

Plano atual e inicial

Em cada nó AdaptiveSparkPlan, haverá o plano inicial (o plano antes de aplicar qualquer otimização da AQE) e o plano atual ou final, dependendo se a execução foi concluída. O plano atual evoluirá à medida que a execução for progredida.

Estatísticas de tempo de execução

Cada estágio de difusão e ordem aleatória contém estatísticas de dados.

Antes que o estágio seja executado ou quando o estágio estiver em execução, as estatísticas serão estimativas de tempo de compilação e o sinalizador isRuntime será false, por exemplo: Statistics(sizeInBytes=1024.0 KiB, rowCount=4, isRuntime=false);

Depois que a execução do estágio for concluída, as estatísticas serão aquelas coletadas em tempo de execução e o sinalizador isRuntime se tornará true, por exemplo: Statistics(sizeInBytes=658.1 KiB, rowCount=2.81E+4, isRuntime=true)

A seguir, é mostrado um exemplo de DataFrame.explain:

  • Antes da execução

    Before execution

  • Durante a execução

    During execution

  • Após a execução

    After execution

SQL EXPLAIN

AdaptiveSparkPlan

As consultas aplicadas à AQE contêm um ou mais nós AdaptiveSparkPlan, geralmente como o nó raiz de cada consulta principal ou subconsulta.

Sem plano atual

Como SQL EXPLAIN não executa a consulta, o plano atual é sempre o mesmo que o plano inicial e não reflete o que eventualmente seria executado pela AQE.

A seguir, um exemplo explicado por SQL:

SQL explain

Eficácia

O plano de consulta será alterado se uma ou mais otimizações da AQE entrarem em vigor. O efeito dessas otimizações da AQE é demonstrado pela diferença entre os planos atual e final e os nós de plano inicial e plano específico nos planos atual e final.

  • Alterar dinamicamente a junção de mesclagem de classificação em junção hash de difusão: nós de junção físicas diferentes entre o plano atual/final e o plano inicial

    Join strategy string

  • Unindo partições dinamicamente: nó CustomShuffleReader com a propriedade Coalesced

    Custom shuffle reader

    Custom shuffle reader string

  • Manipule dinamicamente a junção de inclinação: nó SortMergeJoin com campo isSkew como verdadeiro.

    Skew join plan

    Skew join string

  • Detectar e propagar dinamicamente relações vazias: parte de (ou todo) o plano é substituído pelo nó LocalTableScan com o campo de relação como vazio.

    Local table scan

    Local table scan string

Configuração

Nesta seção:

Habilitar e desabilitar a execução de consulta adaptável

Propriedade
spark.databricks.optimizer.adaptive.enabled

Digite: Boolean

Se a execução da consulta adaptável deve ser habilitada ou desabilitada.

Valor padrão: true

Habilitar o embaralhamento otimizado automaticamente

Propriedade
spark.sql.shuffle.partitions

Digite: Integer

O número padrão de partições a serem usadas ao embaralhar dados para junções ou agregações. A configuração do valor auto habilita a ordem aleatória com otimização automática, que determina automaticamente esse número com base no plano de consulta e no tamanho dos dados de entrada da consulta.

Observação: para o Fluxo Estruturado, essa configuração não pode ser alterada entre as reinicializações de consulta do mesmo local de ponto de verificação.

Valor padrão: 200

Alterar dinamicamente a junção de mesclagem de classificação em junção hash de difusão

Propriedade
spark.databricks.adaptive.autoBroadcastJoinThreshold

Digite: Byte String

O limite para disparar a alternância para a junção de difusão no tempo de execução.

Valor padrão: 30MB

Unir partições dinamicamente

Propriedade
spark.sql.adaptive.coalescePartitions.enabled

Digite: Boolean

Se a união de partição deve ser habilitada ou desabilitada.

Valor padrão: true
spark.sql.adaptive.advisoryPartitionSizeInBytes

Digite: Byte String

O tamanho do destino após a união. Os tamanhos de partição unidas serão próximos, mas não maiores que esse tamanho de destino.

Valor padrão: 64MB
spark.sql.adaptive.coalescePartitions.minPartitionSize

Digite: Byte String

O tamanho mínimo de partições após a união. Os tamanhos de partição unidas não serão menores do que esse tamanho.

Valor padrão: 1MB
spark.sql.adaptive.coalescePartitions.minPartitionNum

Digite: Integer

O tamanho mínimo de partições após a união. Não recomendado, porque a configuração substitui explicitamente
spark.sql.adaptive.coalescePartitions.minPartitionSize.

Valor padrão: 2x o número de núcleos do cluster

Manipular dinamicamente a junção de distorção

Propriedade
spark.sql.adaptive.skewJoin.enabled

Digite: Boolean

Se a manipulação de junção de distorção deve ser habilitada ou desabilitada.

Valor padrão: true
spark.sql.adaptive.skewJoin.skewedPartitionFactor

Digite: Integer

Um fator que, quando multiplicado pelo tamanho da partição mediana, contribui para determinar se uma partição está distorcida.

Valor padrão: 5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes

Digite: Byte String

Um limite que contribui para determinar se uma partição é distorcida.

Valor padrão: 256MB

Uma partição é considerada distorcida quando (partition size > skewedPartitionFactor * median partition size) e (partition size > skewedPartitionThresholdInBytes) são true.

Detectar e propagar relações vazias dinamicamente

Propriedade
spark.databricks.adaptive.emptyRelationPropagation.enabled

Digite: Boolean

Se a propagação de relação vazia dinâmica deve ser habilitada ou desabilitada.

Valor padrão: true

Perguntas frequentes (FAQ)

Nesta seção:

Por que a AQE não transmitiu uma pequena tabela de junção?

Se o tamanho da relação esperada para difusão estiver abaixo desse limite, mas ainda não for transmitido:

  • Verifique o tipo de junção. Não há suporte para a transmissão para determinados tipos de junção, por exemplo, a relação à esquerda de um LEFT OUTER JOIN não pode ser transmitida.
  • Também pode ser que a relação contenha muitas partições vazias; nesse caso, a maioria das tarefas pode ser concluída rapidamente com a junção de mesclagem de classificação ou potencialmente pode ser otimizada com manipulação de junção de distorção. A AQE evitará alterar essas junções de mesclagem de classificação para junções de hash de difusão se o percentual de partições não vazias for menor que spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin.

Ainda devo usar uma dica de estratégia de junção de difusão com a AQE habilitada?

Sim. Uma junção de difusão planejada estaticamente geralmente é mais bem-desempenho do que uma planejada dinamicamente pelo AQE, pois o AQE pode não mudar para a junção de difusão até depois de executar o embaralhamento para ambos os lados da junção (quando os tamanhos reais da relação são obtidos). Portanto, usar uma dica de difusão ainda poderá ser uma boa opção se você conhecer bem a consulta. A AQE respeitará as dicas de consulta da mesma maneira que a otimização estática, mas ainda poderá aplicar otimizações dinâmicas que não são afetadas pelas dicas.

Qual é a diferença entre a dica de junção de distorção e a otimização de junção de distorção do AQE? Qual deles devo usar?

É recomendável contar com o tratamento de junção de distorção da AQE em vez de usar a dica de junção de distorção, pois a junção de distorção da AQE é completamente automática e, em geral, tem um desempenho melhor do que a contraparte de dica.

Por que a AQE não ajusta minha ordenação de junção automaticamente?

A reordenação dinâmica de junções não faz parte do AQE.

Por que a AQE não detectou minha distorção de dados?

Há duas condições de tamanho que devem ser atendidas para que a AQE detecte uma partição como uma partição distorcida:

  • O tamanho da partição é superior a spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (o padrão é 256 MB)
  • O tamanho da partição é maior que o tamanho mediano de todas as partições vezes o fator de partição distorcida spark.sql.adaptive.skewJoin.skewedPartitionFactor (o padrão é 5)

Além disso, o suporte ao tratamento de distorção é limitado para determinados tipos de junção, por exemplo, no LEFT OUTER JOIN, somente distorção no lado esquerdo pode ser otimizada.

Herdada

O termo "Execução Adaptável" existe desde o Spark 1.6, mas a nova AQE no Spark 3.0 é fundamentalmente diferente. Em termos de funcionalidade, o Spark 1.6 faz apenas a parte "união dinâmica de partições". Em termos de arquitetura técnica, a nova AQE é uma estrutura de planejamento dinâmico e replanejamento de consultas com base em estatísticas de runtime, que dá suporte a uma variedade de otimizações, como aquelas descritas neste artigo e pode ser estendida para habilitar otimizações mais potenciais.