Partager via


Remplacer sélectivement les données avec Delta Lake

Azure Databricks tire parti de la fonctionnalité Delta Lake pour prendre en charge deux options distinctes pour les remplacements sélectifs :

  • L’option replaceWhere remplace atomiquement tous les enregistrements qui correspondent à un prédicat donné.
  • Vous pouvez remplacer des répertoires de données en fonction de la façon dont les tables sont partitionnées à l’aide de remplacements de partition dynamiques.

Pour la plupart des opérations, Databricks recommande d’utiliser replaceWhere pour spécifier les données à remplacer.

Important

Si des données ont été remplacées accidentellement, vous pouvez utiliser la restauration pour annuler la modification.

Remplacement sélectif arbitraire avec replaceWhere

Vous pouvez remplacer de manière sélective uniquement les données qui correspondent à une expression arbitraire.

Note

SQL nécessite Databricks Runtime 12.2 LTS ou une version ultérieure.

La commande suivante remplace atomiquement les événements en janvier dans la table cible, qui est partitionnée par start_date, avec les données dans replace_data :

Python

(replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .table("events")
)

Scala

replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .table("events")

SQL

INSERT INTO TABLE events REPLACE WHERE start_date >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data

Cet exemple de code écrit les données dans replace_data, valide que toutes les lignes correspondent au prédicat et effectue un remplacement atomique à l’aide de la sémantique overwrite. Si des valeurs de l’opération se trouvent hors de la contrainte, cette opération échoue par défaut avec une erreur.

Vous pouvez modifier ce comportement pour overwrite les valeurs dans la plage de prédicats et insert les enregistrements qui se trouvent hors de la plage spécifiée. Pour ce faire, désactivez la vérification des contraintes en définissant la valeur spark.databricks.delta.replaceWhere.constraintCheck.enabled sur false à l’aide de l’un des paramètres suivants :

Python

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false

Comportement hérité

Avec le comportement par défaut hérité, replaceWhere remplace les données correspondant à un prédicat sur les colonnes de partition uniquement. Avec ce modèle hérité, la commande suivante remplace atomiquement le mois de janvier dans la table cible, partitionnée par date, par les données dans df :

Python

(df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .table("people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .table("people10m")

Si vous souhaitez revenir à l’ancien comportement, vous pouvez désactiver l’indicateur spark.databricks.delta.replaceWhere.dataColumns.enabled :

Python

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false

Remplacements de partition dynamique

L’écriture dynamique de partitions remplace uniquement les partitions pour lesquelles de nouvelles données sont insérées. Il remplace toutes les données existantes dans ces partitions et laisse les autres inchangés.

Azure Databricks prend en charge deux approches :

  • REPLACE USING (recommandé) : fonctionne sur tous les types de calcul, notamment les entrepôts Databricks SQL, le calcul serverless et le calcul classique. Ne nécessite pas la définition d’une configuration de session Spark.
  • partitionOverwriteMode (hérité) : nécessite un calcul classique et la définition d’une configuration de session Spark. Non pris en charge sur Databricks SQL ou le calcul sans serveur.

Les sections ci-dessous montrent comment utiliser chaque approche.

Remplacements de partition dynamique avec REPLACE USING

Databricks Runtime 16.3 et versions ultérieures prend en charge le remplacement dynamique des partitions pour les tables partitionnées à l’aide de REPLACE USING. Cette méthode vous permet de remplacer de manière sélective les données de tous les types de calcul, sans avoir à définir une configuration de session Spark. REPLACE USING permet un comportement de remplacement atomique indépendant de l’infrastructure de calcul, qui fonctionne sur les entrepôts SQL Databricks, le calcul serverless et le calcul classique.

REPLACE USING remplace uniquement les partitions ciblées par les données entrantes. Toutes les autres partitions restent inchangées.

L’exemple suivant montre comment utiliser la réécriture de partition dynamique avec REPLACE USING. Actuellement, vous pouvez uniquement utiliser SQL, pas Python ou Scala. Pour plus d’informations, consultez la référence du langage SQL à la section INSERT.

INSERT INTO TABLE events
  REPLACE USING (event_id, start_date)
  SELECT * FROM source_data

Gardez à l’esprit les contraintes et les comportements suivants pour la réécriture de partitions dynamiques :

  • Vous devez spécifier l'ensemble complet des colonnes de partition de la table dans le champ USING.
  • Vérifiez toujours que les données écrites touchent uniquement les partitions attendues. Une seule ligne dans la partition incorrecte peut remplacer involontairement la partition entière.

Si vous avez besoin d’une logique de correspondance plus personnalisable que ce qui REPLACE USING prend en charge, comme le traitement des NULL valeurs comme égales, utilisez plutôt l’élément complémentaire REPLACE ON . Pour plus d’informations, consultez INSERT.

Remplacements de partition dynamiques avec partitionOverwriteMode (hérité)

Important

Cette fonctionnalité est disponible en préversion publique.

Databricks Runtime 11.3 LTS et les versions ultérieures prennent en charge les remplacements dynamiques de partitions pour les tables partitionnées en utilisant le mode d’écrasement : soit INSERT OVERWRITE dans SQL, soit une écriture de DataFrame avec df.write.mode("overwrite"). Ce type de remplacement n’est disponible que pour le calcul classique, et non pour les entrepôts SQL Databricks ou le calcul serverless.

Configurez le mode de remplacement de partition dynamique en définissant la configuration de session Spark spark.sql.sources.partitionOverwriteMode sur dynamic. Vous pouvez également définir l’option DataFrameWriterpartitionOverwriteMode à dynamic. Si elle est présente, l’option spécifique à la requête remplace le mode défini dans la configuration de session. La valeur par défaut de spark.sql.sources.partitionOverwriteMode est static.

L’exemple suivant montre comment utiliser partitionOverwriteMode :

SQL

SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;

Python

(df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")

Gardez à l’esprit les contraintes et comportements suivants pour partitionOverwriteMode:

  • Vous ne pouvez pas attribuer overwriteSchema à true.
  • Vous ne pouvez pas spécifier les deux partitionOverwriteMode et replaceWhere dans la même DataFrameWriter opération.
  • Si vous spécifiez une condition replaceWhere avec une option DataFrameWriter, Delta Lake applique cette condition pour contrôler les données remplacées. Cette option est prioritaire sur la configuration au niveau de la partitionOverwriteMode session.
  • Vérifiez toujours que les données écrites touchent uniquement les partitions attendues. Une seule ligne dans la partition incorrecte peut remplacer involontairement la partition entière.