Partager via


Optimisation de la fusion faible et aléatoire sur les tables Delta

La commande MERGE de Delta Lake permet aux utilisateurs de mettre à jour une table delta avec des conditions avancées. Cela permet de mettre à jour les données d’une table source, d’un affichage ou d’une DataFrame dans une table cible à l’aide de la commande MERGE. Toutefois, l’algorithme actuel n’est pas entièrement optimisé pour la gestion des lignes non modifiées. Avec l’optimisation de la fusion faible et aléatoire, les lignes non modifiées sont exclues d’une opération traitement coûteuse nécessaire à la mise à jour des lignes correspondantes.

Pourquoi nous avons besoin d’une fusion faible et aléatoire

Actuellement, l’opération MERGE est effectuée par deux exécutions de jointure. La première jointure utilise l’ensemble de la table cible et des données sources pour rechercher une liste de fichiers touchés de la table cible, y compris les lignes correspondantes. Après cela, il effectue la deuxième lecture de jointure en lisant uniquement les fichiers touchés et les données sources, pour effectuer la mise à jour effective de la table. Même si la première jointure consiste à réduire la quantité de données pour la deuxième jointure, il peut encore y avoir un grand nombre de lignes non modifiées dans les fichiers touchés . La première requête de jointure est plus légère, car elle lit uniquement les colonnes dans la condition de correspondance donnée. La deuxième pour la mise à jour de table doit charger toutes les colonnes, ce qui entraîne un processus de traitement coûteux.

Avec l’optimisation de la fusion faible et aléatoire, Delta conserve temporairement le résultat de ligne correspondant de la première jointure et l’utilise pour la deuxième jointure. En fonction du résultat, il exclut les lignes non modifiées du processus de traitement lourd. Il y aurait deux tâches d’écriture distinctes pour les lignes correspondantes et les lignes non modifiées, ce qui pourrait se traduire par un nombre de fichiers de sortie deux fois plus élevé que dans le cas du comportement précédent. Toutefois, le gain de performances attendu l’emporte sur le problème possible de petits fichiers.

Disponibilité

Notes

  • La fusion faible et aléatoire est disponible en tant que fonctionnalité d’évaluation.

Elle est disponible sur les pools Synapse pour les versions 3.2 et 3.3 d’Apache Spark.

Version Disponibilité Default
Delta 0.6/Spark 2.4 Non -
Delta 1.2/Spark 3.2 Oui false
Delta 2.2/Spark 3.3 Oui true

Avantages de la fusion faible et aléatoire

  • Les lignes non modifiées dans les fichiers touchés sont traitées séparément et ne passent pas par l’opération MERGE réelle. Cela permet d’économiser le temps d’exécution de MERGE global et les ressources de calcul. Le gain serait plus important lorsque de nombreuses lignes sont copiées et que seules quelques lignes sont mises à jour.
  • L’ordre des lignes est conservé pour les lignes non modifiées. Par conséquent, les fichiers de sortie contenant des lignes non modifiées pourraient être encore efficaces pour le saut de données si le fichier était trié ou Z-ORDERED.
  • Même dans le pire des cas, lorsque la condition de fusion correspond à toutes les lignes des fichiers touchés, les frais engendrés resteraient minimes.

Comment activer et désactiver la fusion faible et aléatoire

Une fois la configuration définie pour le pool ou la session, tous les modèles d’écriture Spark utilisent les fonctionnalités.

Pour utiliser l’optimisation de fusion faible et aléatoire, activez-la à l’aide de la configuration suivante :

  1. Scala et PySpark
spark.conf.set("spark.microsoft.delta.merge.lowShuffle.enabled", "true")
  1. Spark SQL
SET `spark.microsoft.delta.merge.lowShuffle.enabled` = true

Pour vérifier la valeur de configuration actuelle, utilisez la commande comme indiqué ci-dessous :

  1. Scala et PySpark
spark.conf.get("spark.microsoft.delta.merge.lowShuffle.enabled")
  1. Spark SQL
SET `spark.microsoft.delta.merge.lowShuffle.enabled`

Pour désactiver la fonctionnalité, modifiez la configuration suivante, comme indiqué ci-dessous :

  1. Scala et PySpark
spark.conf.set("spark.microsoft.delta.merge.lowShuffle.enabled", "false")
  1. Spark SQL
SET `spark.microsoft.delta.merge.lowShuffle.enabled` = false