Partager via


Remplacer sélectivement les données avec Delta Lake

Azure Databricks tire parti de la fonctionnalité Delta Lake pour prendre en charge deux options distinctes pour les remplacements sélectifs :

  • L’option replaceWhere remplace atomiquement tous les enregistrements qui correspondent à un prédicat donné.
  • Vous pouvez remplacer des répertoires de données en fonction de la façon dont les tables sont partitionnées à l’aide de remplacements de partition dynamiques.

Pour la plupart des opérations, Databricks recommande d’utiliser replaceWhere pour spécifier les données à remplacer.

Important

Si les données ont été accidentellement remplacées, vous pouvez utiliser la restauration pour annuler la modification.

Remplacement sélectif arbitraire avec replaceWhere

Vous pouvez remplacer de manière sélective uniquement les données qui correspondent à une expression arbitraire.

Remarque

SQL requiert Databricks Runtime 12.2 LTS ou version ultérieure.

La commande suivante remplace atomiquement les événements en janvier dans la table cible, qui est partitionnée par start_date, avec les données dans replace_data :

Python

(replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .save("/tmp/delta/events")
)

Scala

replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .save("/tmp/delta/events")

SQL

INSERT INTO TABLE events REPLACE WHERE start_date >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data

Cet exemple de code écrit les données dans replace_data, valide que toutes les lignes correspondent au prédicat et effectue un remplacement atomique au moyen de la sémantique overwrite. Si des valeurs de l’opération se trouvent hors de la contrainte, cette opération échoue par défaut avec une erreur.

Vous pouvez modifier ce comportement en valeurs overwrite dans la plage de prédicats et les enregistrements insert qui se trouvent hors de la plage spécifiée. Pour ce faire, désactivez la vérification des contraintes en définissant la valeur spark.databricks.delta.replaceWhere.constraintCheck.enabled sur false à l’aide de l’un des paramètres suivants :

Python

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false

Comportement hérité

Avec le comportement par défaut hérité, replaceWhere remplaçait les données correspondant à un prédicat sur les colonnes de partition uniquement. Avec ce modèle hérité, la commande suivante remplace atomiquement le mois de janvier dans la table cible, partitionnée par date, par les données dans df :

Python

(df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .save("/tmp/delta/people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .save("/tmp/delta/people10m")

Si vous souhaitez revenir à l’ancien comportement, vous pouvez désactiver l’indicateur spark.databricks.delta.replaceWhere.dataColumns.enabled :

Python

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false

Remplacements de partition dynamique

Important

Cette fonctionnalité est disponible en préversion publique.

Databricks Runtime 11.3 LTS et versions ultérieures prend en charge le mode de remplacement de partition dynamique pour les tables partitionnées. Pour les tables avec plusieurs partitions, Databricks Runtime 11.3 LTS et versions antérieures prend uniquement en charge les remplacements de partition dynamiques si toutes les colonnes de partition sont du même type de données.

En mode de remplacement de partition dynamique, les opérations remplacent toutes les données existantes dans chaque partition logique pour laquelle l’écriture valide de nouvelles données. Toutes les partitions logiques existantes pour lesquelles l’écriture ne contient pas de données restent inchangées. Ce mode s’applique uniquement lorsque les données sont écrites en mode de remplacement : soit INSERT OVERWRITE en SQL, ou une écriture DataFrame avec df.write.mode("overwrite").

Configurez le mode de remplacement de partition dynamique en définissant la configuration de session Spark spark.sql.sources.partitionOverwriteMode sur dynamic. Vous pouvez également l’activer en définissant l’option DataFrameWriterpartitionOverwriteMode sur dynamic. Si elle est présente, l’option spécifique à la requête remplace le mode défini dans la configuration de session. La valeur par défaut de partitionOverwriteMode est de static.

Important

Vérifiez que les données écrites avec une partition dynamique remplacent uniquement les partitions attendues. Une seule ligne dans la partition incorrecte peut entraîner une remplacement involontaire d’une partition entière.

L’exemple suivant illustre l’utilisation de remplacements de partition dynamique :

SQL

SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;

Python

(df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")

Notes

  • La remplacement dynamique de la partition est en conflit avec l’option replaceWhere pour les tables partitionnées.
    • Si le remplacement de partition dynamique est activé dans la configuration de session Spark et replaceWhere est fourni en tant qu’option DataFrameWriter, Delta Lake remplace les données en fonction de l’expression replaceWhere (les options spécifiques aux requêtes remplacent les configurations de session).
    • Vous recevez une erreur si les options DataFrameWriter disposent à la fois d’un remplacement de partition dynamique et replaceWhere activé.
  • Vous ne pouvez pas spécifier overwriteSchema comme true lors de l’utilisation du remplacement de partition dynamique.