Partilhar via


Otimizar trabalhos do Apache Spark no Azure Synapse Analytics

Saiba como otimizar uma configuração de cluster do Apache Spark para sua carga de trabalho específica. O problema mais comum é a pressão da memória, devido a configurações inadequadas (em especial, executores com tamanho incorreto), operações de execução longa e tarefas que resultam em Operações cartesianas. Você pode acelerar trabalhos com cache apropriado e permitindo a distorção de dados. Para obter o melhor desempenho, monitore e analise as execuções de tarefas do Spark de longa duração e que consomem recursos.

As seções a seguir descrevem otimizações e recomendações comuns de trabalho do Spark.

Escolha a abstração de dados

As versões anteriores do Spark usam RDDs para abstrair dados, o Spark 1.3 e 1.6 introduziram DataFrames e DataSets, respectivamente. Considere os seguintes méritos relativos:

  • DataFrames
    • Melhor escolha na maioria das situações.
    • Fornece otimização de consultas através do Catalyst.
    • Geração de código de estágio inteiro.
    • Acesso direto à memória.
    • Baixa sobrecarga de coleta de lixo (GC).
    • Não é tão amigável para desenvolvedores quanto DataSets, pois não há verificações em tempo de compilação ou programação de objetos de domínio.
  • Conjuntos de dados
    • Bom em pipelines ETL complexos onde o impacto no desempenho é aceitável.
    • Não é bom em agregações onde o impacto no desempenho pode ser considerável.
    • Fornece otimização de consultas através do Catalyst.
    • Amigável para desenvolvedores, fornecendo programação de objetos de domínio e verificações em tempo de compilação.
    • Adiciona sobrecarga de serialização/desserialização.
    • Alta sobrecarga de GC.
    • Interrompe a geração de código de estágio inteiro.
  • RDDs
    • Você não precisa usar RDDs, a menos que precise criar um novo RDD personalizado.
    • Nenhuma otimização de consulta através do Catalyst.
    • Sem geração de código de estágio inteiro.
    • Alta sobrecarga de GC.
    • Deve usar APIs herdadas do Spark 1.x.

Utilizar o formato de dados ideal

O Spark suporta muitos formatos, como csv, json, xml, parquet, orc e avro. O Spark pode ser estendido para suportar muitos mais formatos com fontes de dados externas - para obter mais informações, consulte Pacotes Apache Spark.

O melhor formato para desempenho é o parquet com compressão rápida, que é o padrão no Spark 2.x. O Parquet armazena dados em formato colunar e é altamente otimizado no Spark. Além disso, enquanto a compressão rápida pode resultar em arquivos maiores do que dizer compressão gzip. Devido à natureza divisória desses arquivos, eles serão descompactados mais rapidamente.

Utilizar a cache

O Spark fornece seus próprios mecanismos de cache nativos, que podem ser usados por meio de diferentes métodos, como .persist(), .cache()e CACHE TABLE. Esse cache nativo é eficaz com pequenos conjuntos de dados, bem como em pipelines de ETL onde você precisa armazenar em cache resultados intermediários. No entanto, o cache nativo do Spark atualmente não funciona bem com particionamento, uma vez que uma tabela em cache não mantém os dados de particionamento.

Utilizar a memória de forma eficiente

O Spark opera colocando dados na memória, portanto, gerenciar recursos de memória é um aspeto fundamental para otimizar a execução de trabalhos do Spark. Há várias técnicas que você pode aplicar para usar a memória do cluster de forma eficiente.

  • Prefira partições de dados menores e leve em conta o tamanho, os tipos e a distribuição dos dados em sua estratégia de particionamento.

  • No Synapse Spark (Runtime 3.1 ou superior), a serialização de dados do Kryo é habilitada por padrão pela serialização de dados do Kryo.

  • Você pode personalizar o tamanho do buffer do kryoserializer usando a configuração do Spark com base nos seus requisitos de carga de trabalho:

    // Set the desired property
    spark.conf.set("spark.kryoserializer.buffer.max", "256m")
    
    
  • Monitore e ajuste as definições de configuração do Spark.

Para sua referência, a estrutura de memória do Spark e alguns parâmetros de memória do executor chave são mostrados na próxima imagem.

Considerações sobre memória de faísca

Apache Spark no Azure Synapse usa YARN Apache Hadoop YARN, YARN controla a soma máxima de memória usada por todos os contêineres em cada nó do Spark. O diagrama a seguir mostra os objetos principais e suas relações.

Gerenciamento de memória YARN Spark

Para endereçar mensagens de "falta de memória", tente:

  • Analise os embaralhamentos de gerenciamento do DAG. Reduza reduzindo do lado do mapa, pré-particione (ou bucketize) os dados de origem, maximize os embaralhamentos únicos e reduza a quantidade de dados enviados.
  • Prefira ReduceByKey com seu limite de memória fixa para GroupByKey, que fornece agregações, janelas e outras funções, mas tem um limite de memória ilimitada.
  • Prefira TreeReduce, que faz mais trabalho nos executores ou partições, para Reduce, que faz todo o trabalho no driver.
  • Aproveite DataFrames em vez dos objetos RDD de nível inferior.
  • Crie ComplexTypes que encapsulam ações, como "Top N", várias agregações ou operações de janela.

Otimizar a serialização de dados

Os trabalhos do Spark são distribuídos, portanto, a serialização de dados apropriada é importante para o melhor desempenho. Há duas opções de serialização para o Spark:

  • Serialização Java
  • A serialização do Kryo é o padrão. É um formato mais recente e pode resultar em serialização mais rápida e compacta do que o Java. O Kryo requer que você registre as classes em seu programa e ainda não suporta todos os tipos serializáveis.

Utilizar os registos

O bucketing é semelhante ao particionamento de dados, mas cada bucket pode conter um conjunto de valores de coluna em vez de apenas um. O bucketing funciona bem para particionamento em números grandes (em milhões ou mais) de valores, como identificadores de produto. Um bucket é determinado pelo hash da chave bucket da linha. As tabelas em bucket oferecem otimizações exclusivas porque armazenam metadados sobre como foram bucketed e classificados.

Alguns recursos avançados de bucketing são:

  • Otimização de consultas com base em meta-informações de bucketing.
  • Agregações otimizadas.
  • Junções otimizadas.

Você pode usar particionamento e bucketing ao mesmo tempo.

Otimizar as associações e misturas

Se você tiver trabalhos lentos em um Join ou Shuffle, a causa provavelmente é a distorção de dados, que é a assimetria em seus dados de trabalho. Por exemplo, um trabalho de mapa pode levar 20 segundos, mas a execução de um trabalho em que os dados são unidos ou embaralhados leva horas. Para corrigir a distorção de dados, você deve salgar a chave inteira ou usar um sal isolado para apenas alguns subconjuntos de chaves. Se você estiver usando um sal isolado, você deve filtrar ainda mais para isolar seu subconjunto de chaves salgadas em junções de mapa. Outra opção é introduzir uma coluna de balde e pré-agregar em buckets primeiro.

Outro fator que causa junções lentas pode ser o tipo de junção. Por padrão, o Spark usa o tipo de SortMerge junção. Esse tipo de junção é mais adequado para grandes conjuntos de dados, mas é computacionalmente caro porque deve primeiro classificar os lados esquerdo e direito dos dados antes de mesclá-los.

Uma Broadcast junção é mais adequada para conjuntos de dados menores, ou onde um lado da junção é muito menor do que o outro lado. Este tipo de junção transmite um lado para todos os executores, e por isso requer mais memória para transmissões em geral.

Você pode alterar o tipo de associação em sua configuração definindo spark.sql.autoBroadcastJoinThreshold, ou pode definir uma dica de associação usando as APIs do DataFrame (dataframe.join(broadcast(df2))).

// Option 1
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1*1024*1024*1024)

// Option 2
val df1 = spark.table("FactTableA")
val df2 = spark.table("dimMP")
df1.join(broadcast(df2), Seq("PK")).
    createOrReplaceTempView("V_JOIN")

sql("SELECT col1, col2 FROM V_JOIN")

Se você estiver usando tabelas em bucket, terá um terceiro tipo de junção, a Merge junção. Um conjunto de dados pré-particionado e pré-classificado corretamente ignorará a dispendiosa fase de classificação de uma SortMerge associação.

A ordem das junções é importante, especialmente em consultas mais complexas. Comece com as junções mais seletivas. Além disso, mova junções que aumentam o número de linhas após agregações quando possível.

Para gerenciar o paralelismo para junções cartesianas, você pode adicionar estruturas aninhadas, janelas e talvez pular uma ou mais etapas em seu trabalho do Spark.

Selecione o tamanho correto do executor

Ao decidir a configuração do executor, considere a sobrecarga de coleta de lixo (GC) Java.

  • Fatores para reduzir o tamanho do executor:

    • Reduza o tamanho da pilha abaixo de 32 GB para manter a sobrecarga < do GC em 10%.
    • Reduza o número de núcleos para manter a sobrecarga < do GC em 10%.
  • Fatores para aumentar o tamanho do executor:

    • Reduza a sobrecarga de comunicação entre os executores.
    • Reduza o número de conexões abertas entre executores (N2) em clusters maiores (>100 executores).
    • Aumente o tamanho da pilha para acomodar tarefas que consomem muita memória.
    • Opcional: reduza a sobrecarga de memória por executor.
    • Opcional: Aumente a utilização e a simultaneidade sobreinscrevendo a CPU.

Como regra geral ao selecionar o tamanho do executor:

  • Comece com 30 GB por executor e distribua os núcleos de máquina disponíveis.
  • Aumente o número de núcleos executores para clusters maiores (> 100 executores).
  • Modifique o tamanho com base nas execuções de avaliação e nos fatores anteriores, como a sobrecarga de GC.

Ao executar consultas simultâneas, considere o seguinte:

  • Comece com 30 GB por executor e todos os núcleos da máquina.
  • Crie vários aplicativos paralelos do Spark substituindo a CPU (cerca de 30% de melhoria de latência).
  • Distribua consultas entre aplicativos paralelos.
  • Modifique o tamanho com base nas execuções de avaliação e nos fatores anteriores, como a sobrecarga de GC.

Monitore o desempenho da consulta em busca de discrepâncias ou outros problemas de desempenho, observando a exibição de linha do tempo, gráfico SQL, estatísticas de trabalho e assim por diante. Às vezes, um ou alguns dos executores são mais lentos do que os outros, e as tarefas levam muito mais tempo para serem executadas. Isto acontece frequentemente em clusters maiores (> 30 nós). Neste caso, divida o trabalho em um número maior de tarefas para que o agendador possa compensar tarefas lentas.

Por exemplo, tenha pelo menos o dobro de tarefas do que o número de núcleos de executor no aplicativo. Você também pode habilitar a execução especulativa de tarefas com conf: spark.speculation = trueo .

Otimizar a execução de tarefas

  • Armazene em cache conforme necessário, por exemplo, se você usar os dados duas vezes, em seguida, armazene-os em cache.
  • Transmitir variáveis para todos os executores. As variáveis são serializadas apenas uma vez, resultando em pesquisas mais rápidas.
  • Use o pool de threads no driver, o que resulta em uma operação mais rápida para muitas tarefas.

A chave para o desempenho da consulta do Spark 2.x é o mecanismo Tungsten, que depende da geração de código de estágio inteiro. Em alguns casos, a geração de código de estágio inteiro pode ser desabilitada.

Por exemplo, se você usar um tipo não mutável (string) na expressão de agregação, SortAggregate aparecerá em vez de HashAggregate. Por exemplo, para um melhor desempenho, tente o seguinte e, em seguida, reative a geração de código:

MAX(AMOUNT) -> MAX(cast(AMOUNT as DOUBLE))

Próximos passos