Partager via


Optimisation du traitement de données pour Apache Spark

Cet article explique comment optimiser la configuration de votre cluster Apache Spark pour obtenir de meilleures performances sur Azure HDInsight.

Vue d’ensemble

Si vous avez des travaux lents sur une jointure ou une lecture aléatoire, la cause est probablement l’asymétrie des données. L’asymétrie des données désigne une asymétrie dans vos données de travail. Par exemple, un travail de mappage peut prendre 20 secondes, mais l’exécution d’un travail où les données sont jointes ou assignées de manière aléatoire prend des heures. Pour corriger l’asymétrie des données, vous devez convertir l’intégralité de la clé en valeur salt, ou utiliser une valeur salt isolée pour uniquement un sous-ensemble de clés. Si vous utilisez une valeur salt isolée, vous devez effectuer un filtrage supplémentaire afin d’isoler votre sous-ensemble de clés salt dans les jointures de mappage. Une autre option consiste à introduire une colonne de compartiment et à pré-agréger d’abord dans les compartiments.

Un autre facteur pouvant ralentir les jointures est le type de jointure. Par défaut, Spark utilise le type de jointure SortMerge. Ce type de jointure convient parfaitement aux grands jeux de données, mais il est coûteux en terme de calcul, car il doit d’abord trier les côtés gauche et droit des données avant de les fusionner.

Une jointure Broadcast est particulièrement adaptée aux petits jeux de données, ou quand un côté de la jointure est beaucoup plus petit que l’autre. Ce type de jointure diffuse un côté à tous les exécuteurs. Il exige donc davantage de mémoire pour les diffusions en général.

Vous pouvez changer le type de jointure dans votre configuration en définissant spark.sql.autoBroadcastJoinThreshold, ou vous pouvez définir un indicateur de jointure à l’aide des API DataFrame (dataframe.join(broadcast(df2))).

// Option 1
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1*1024*1024*1024)

// Option 2
val df1 = spark.table("FactTableA")
val df2 = spark.table("dimMP")
df1.join(broadcast(df2), Seq("PK")).
    createOrReplaceTempView("V_JOIN")

sql("SELECT col1, col2 FROM V_JOIN")

Si vous utilisez des tables compartimentées, vous disposez d’un troisième type de jointure : Merge. Un jeu de données correctement prépartitionné et prétrié ignore la phase de tri coûteuse d’une jointure SortMerge.

L’ordre des jointures est important, en particulier dans les requêtes plus complexes. Commencez par les jointures les plus sélectives. Dans la mesure du possible, déplacez également les jointures qui augmentent le nombre de lignes après les agrégations.

Pour gérer le parallélisme en cas de jointures cartésiennes, vous pouvez ajouter des structures imbriquées, le fenêtrage, et éventuellement ignorer une ou plusieurs étapes dans votre travail Spark.

Optimiser l’exécution des travaux

  • Effectuez une mise en cache si nécessaire. Par exemple, si vous utilisez les données à deux reprises, mettez-les en cache.
  • Diffusez les variables vers tous les exécuteurs. Les variables ne sont sérialisées qu’une seule fois, ce qui accélère les recherches.
  • Utilisez le pool de threads sur le pilote. Cela permet d’accélérer de nombreuses tâches.

Surveillez régulièrement vos travaux en cours d’exécution afin de détecter tout problème de performances. Si vous avez besoin de plus de détails pour résoudre certains problèmes, utilisez l’un des outils de profilage de performances suivants :

Le moteur Tungsten joue un rôle essentiel pour les performances des requêtes Spark 2.x. Il dépend de la génération de code à l’échelle globale. Dans certains cas, la génération de code à l’échelle globale peut être désactivée. Par exemple, si vous utilisez un type non mutable (string) dans l’expression d’agrégation, SortAggregate s’affiche à la place de HashAggregate. Par exemple, pour obtenir de meilleures performances, essayez ce qui suit puis réactivez la génération de code :

MAX(AMOUNT) -> MAX(cast(AMOUNT as DOUBLE))

Étapes suivantes