La nécessité d’optimiser l’écriture sur Apache Spark

Les charges de travail analytiques sur les moteurs de traitement Big Data tels qu’Apache Spark s’effectuent plus efficacement lors de l’utilisation de tailles de fichiers plus volumineuses standardisées. La relation entre la taille du fichier, le nombre de fichiers, le nombre de travailleurs Spark et ses configurations jouent un rôle essentiel sur les performances. Les charges de travail d’ingestion dans des tables data lake peuvent avoir la caractéristique héritée d’écrire constamment de nombreux petits fichiers ; ce scénario est communément appelé « problème de petit fichier ».

Optimiser l’écriture est une fonctionnalité Delta Lake sur Synapse qui réduit le nombre de fichiers écrits et vise à augmenter la taille de fichier individuelle des données écrites. Il optimise dynamiquement les partitions tout en générant des fichiers avec une taille par défaut de 128 Mo. La taille de fichier cible peut être modifiée par spécification de charge de travail à l’aide de configurations.

Cette fonction permet d'atteindre la taille du fichier en utilisant une phase supplémentaire de brassage des données sur les partitions, ce qui entraîne un coût de traitement supplémentaire lors de l'écriture des données. La petite pénalité d’écriture doit être compensée par l’efficacité de lecture sur les tables.

Notes

  • Il est disponible sur les pools Synapse pour les versions Apache Spark supérieures à 3.1.

Avantages de l’optimisation des écritures

  • Il est disponible sur les tables Delta Lake pour les modèles d’écriture Batch et Streaming.
  • Il n’est pas nécessaire de modifier le modèle de commande spark.write. La fonctionnalité est activée par un paramètre de configuration ou une propriété de table.
  • Il réduit le nombre de transactions d’écriture par rapport à la commande OPTIMISER.
  • Les opérations OPTIMISER seront plus rapides car elles fonctionnent sur moins de fichiers.
  • La commande VACUUM pour la suppression d’anciens fichiers non référencés fonctionne également plus rapidement.
  • Les requêtes analysent moins de fichiers avec des tailles de fichiers plus optimales, ce qui améliore les performances de lecture ou l’utilisation des ressources.

Optimiser les scénarios d’utilisation d’écriture

Quand utiliser cette fonctionnalité ?

  • Les tables partitionnés Delta lake sont soumises à des modèles d’écriture qui génèrent des tailles de fichiers sous-optimales (inférieures à 128 Mo) ou non normalisées (fichiers dont les tailles sont différentes).
  • Trames de données réparties qui seront écrites sur le disque avec une taille de fichiers sous-optimale.
  • Tables partitionnée delta lake ciblées par de petites commandes SQL de lot telles que UPDATE, DELETE, MERGE, CREATE TABLE AS SELECT, INSERT INTO, etc.
  • Scénarios d’ingestion de streaming avec des modèles de données d’ajout à des tables partitionnée Delta lake où la latence d’écriture supplémentaire est tolérable.

Quand l'éviter

  • Tables non partitionnées.
  • Cas d’usage où la latence d’écriture supplémentaire n’est pas acceptable.
  • Tables volumineuses avec des planifications d’optimisation bien définies et des modèles de lecture.

Comment activer et désactiver la fonctionnalité d’écriture d’optimisation

La fonctionnalité d’écriture d’optimisation est désactivée par défaut. Dans le pool Spark 3.3, c’est activé par défaut pour les tables partitionnées.

Une fois la configuration définie pour le pool ou la session, tous les modèles d’écriture Spark utilisent les fonctionnalités.

Pour utiliser la fonctionnalité d’écriture d’optimisation, activez-la à l’aide de la configuration suivante :

  1. Scala et PySpark
spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", "true")
  1. Spark SQL
SET `spark.microsoft.delta.optimizeWrite.enabled` = true

Pour vérifier la valeur de configuration actuelle, utilisez la commande comme indiqué ci-dessous :

  1. Scala et PySpark
spark.conf.get("spark.microsoft.delta.optimizeWrite.enabled")
  1. Spark SQL
SET `spark.microsoft.delta.optimizeWrite.enabled`

Pour désactiver la fonctionnalité d’écriture d’optimisation, modifiez la configuration suivante, comme indiqué ci-dessous :

  1. Scala et PySpark
spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", "false")
  1. Spark SQL
SET `spark.microsoft.delta.optimizeWrite.enabled` = false

Contrôle de l’optimisation de l’écriture à l’aide des propriétés de table

Sur les nouvelles tables

  1. SQL
CREATE TABLE <table_name> TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true)
  1. Scala

Utilisation de l'API DeltaTableBuilder :

val table = DeltaTable.create()
  .tableName("<table_name>")
  .addColumnn("<colName>", <dataType>)
  .location("<table_location>")
  .property("delta.autoOptimize.optimizeWrite", "true") 
  .execute()

Sur les tables existantes

  1. SQL
ALTER TABLE <table_name> SET TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true)
  1. Scala

Utilisation de l'API DeltaTableBuilder :

val table = DeltaTable.replace()
  .tableName("<table_name>")
  .location("<table_location>")
  .property("delta.autoOptimize.optimizeWrite", "true") 
  .execute()

Comment obtenir et modifier la configuration de taille de fichier maximale actuelle pour Optimiser l’écriture

Pour obtenir la valeur de la configuration actuelle, utilisez les commandes suivantes. La valeur par défaut est 128 Mo.

  1. Scala et PySpark
spark.conf.get("spark.microsoft.delta.optimizeWrite.binSize")
  1. SQL
SET `spark.microsoft.delta.optimizeWrite.binSize`
  • Pour modifier la valeur de configuration
  1. Scala et PySpark
spark.conf.set("spark.microsoft.delta.optimizeWrite.binSize", "134217728")
  1. SQL
SET `spark.microsoft.delta.optimizeWrite.binSize` = 134217728

Étapes suivantes