Partager via


Remplacer sélectivement les données avec Delta Lake

Delta Lake propose les options distinctes suivantes pour les remplacements sélectifs :

  • L’option replaceWhere remplace atomiquement tous les enregistrements qui correspondent à un prédicat donné.
  • Vous pouvez remplacer des répertoires de données en fonction de la façon dont les tables sont partitionnées à l’aide de remplacements de partition dynamiques.

Pour la plupart des opérations, Databricks recommande d’utiliser replaceWhere pour spécifier les données à remplacer.

Important

Si des données ont été remplacées accidentellement, vous pouvez utiliser la restauration pour annuler la modification.

Remplacement sélectif arbitraire avec replaceWhere

Vous pouvez remplacer de manière sélective uniquement les données qui correspondent à une expression arbitraire.

Note

SQL nécessite Databricks Runtime 12.2 LTS ou une version ultérieure.

Par exemple, remplacez atomiquement les événements en janvier dans la table cible, qui est partitionnée par start_date, par les données dans replace_data.

Python

(replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .saveAsTable("events")
)

Scala

replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .saveAsTable("events")

SQL

INSERT INTO TABLE events REPLACE WHERE start_date >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data

Cet exemple de code écrit les données dans replace_data, valide que toutes les lignes correspondent au prédicat et effectue un remplacement atomique à l’aide de la sémantique overwrite. Si des valeurs de l’opération se trouvent hors de la contrainte, cette opération échoue par défaut avec une erreur.

Vous pouvez modifier ce comportement pour overwrite les valeurs dans la plage de prédicats et insert les enregistrements qui se trouvent hors de la plage spécifiée. Pour ce faire, désactivez la vérification des contraintes en définissant la valeur spark.databricks.delta.replaceWhere.constraintCheck.enabled sur false à l’aide de l’un des paramètres suivants :

Python

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false

Note

REPLACE WHERE accepte une boolean_expression avec certaines restrictions. Consultez INSERT la référence de langage SQL.

Comportement hérité

Si vous utilisez le comportement hérité de replaceWhere, les requêtes correspondent uniquement aux prédicats sur les colonnes de partition et ignorent toutes les données en dehors de la portée des prédicats. La commande suivante remplace atomiquement le mois de janvier dans la table cible, partitionnée par date, par les données dans df:

Python

(df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .saveAsTable("people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .saveAsTable("people10m")

Pour utiliser le comportement hérité, définissez l’indicateur spark.databricks.delta.replaceWhere.dataColumns.enabled sur false:

Python

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false

Remplacements de partition dynamique

La partition dynamique remplace uniquement les partitions où l’écriture valide les nouvelles données et laisse d’autres partitions inchangées.

Azure Databricks recommande REPLACE USING pour déclencher des remplacements de partitions dynamiques. Ce mode fonctionne sur tous les types de calcul, notamment les entrepôts Databricks SQL, le calcul serverless et le calcul classique, et ne vous oblige pas à définir une configuration de session Spark. Consultez les remplacements de partition dynamique avec REPLACE USING.

partitionOverwriteMode est un mode hérité pour les remplacements de partition dynamique qui vous obligent à utiliser le calcul classique et à définir une configuration de session Spark. Il n’est pas pris en charge sur Databricks SQL ou sur le calcul sans serveur. Consultez les remplacements de partition dynamique avec partitionOverwriteMode (hérité) .

Les sections ci-dessous montrent comment utiliser chaque mode.

Remplacements de partition dynamique avec REPLACE USING

Databricks Runtime 16.3 et versions ultérieures prennent en charge les remplacements dynamiques de partitions pour les tables à l’aide de REPLACE USING. Vous pouvez remplacer de manière sélective les données de tous les types de calcul, sans avoir à définir une configuration de session Spark. REPLACE USING permet un comportement de remplacement atomique indépendant de l’infrastructure de calcul, qui fonctionne sur les entrepôts SQL Databricks, le calcul serverless et le calcul classique.

REPLACE USING remplace uniquement les partitions ciblées par les données entrantes. Toutes les autres partitions restent inchangées.

REPLACE USING est pris en charge uniquement dans SQL. Pour plus d’informations, consultez la référence du langage SQL à la section INSERT.

Les exemples suivants utilisent REPLACE USING:

INSERT INTO TABLE events
  REPLACE USING (event_id, start_date)
  SELECT * FROM source_data

Gardez à l’esprit les contraintes et les comportements suivants pour la réécriture de partitions dynamiques :

  • Dans Databricks Runtime 17.2 et versions ultérieures, les tables partitionnée, les tables nonpartitées et les tables avec clustering liquide sont prises en charge.
  • Dans Databricks Runtime 16.3 à 17.1, seules les tables partitionnée sont prises en charge et vous devez spécifier l’ensemble complet des colonnes de partition de la table dans la USING clause.
  • Vérifiez toujours que les données écrites touchent uniquement les partitions attendues. Une seule ligne dans la partition incorrecte peut remplacer involontairement la partition entière.

Si vous avez besoin d’une logique de correspondance plus personnalisable que ce qui REPLACE USING prend en charge, comme le traitement des NULL valeurs comme égales, utilisez plutôt l’élément complémentaire REPLACE ON . Pour plus d’informations, consultez INSERT.

Remplacements de partition dynamiques avec partitionOverwriteMode (hérité)

Important

Cette fonctionnalité est disponible en préversion publique.

Databricks Runtime 11.3 LTS et les versions ultérieures prennent en charge les remplacements dynamiques de partitions pour les tables partitionnées en utilisant le mode d’écrasement : soit INSERT OVERWRITE dans SQL, soit une écriture de DataFrame avec df.write.mode("overwrite"). Ce type de remplacement n’est disponible que pour le calcul classique, et non pour les entrepôts SQL Databricks ou le calcul serverless.

Avertissement

Lorsque cela est possible, utilisez INSERT OVERWRITE REPLACE USING plutôt que de remplacer la partition avec INSERT OVERWRITE PARTITION et spark.sql.sources.partitionOverwriteMode=dynamic. Lors des modifications de partitionnement, le remplacement de partition peut utiliser des données obsolètes.

Pour utiliser le mode de remplacement de partition dynamique, définissez la configuration de session Spark sur spark.sql.sources.partitionOverwriteModedynamic. Vous pouvez également définir l’option DataFrameWriterpartitionOverwriteMode à dynamic. Si elle est présente, l’option spécifique à la requête remplace le mode défini dans la configuration de session. La valeur par défaut pour spark.sql.sources.partitionOverwriteMode est static.

L'exemple suivant utilise partitionOverwriteMode :

SQL

SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;

Python

(df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")

Gardez à l’esprit les contraintes et comportements suivants pour partitionOverwriteMode:

  • Vous ne pouvez pas attribuer overwriteSchema à true.
  • Vous ne pouvez pas spécifier les deux partitionOverwriteMode et replaceWhere dans la même DataFrameWriter opération.
  • Si vous spécifiez une condition replaceWhere avec une option DataFrameWriter, Delta Lake applique cette condition pour contrôler les données remplacées. Cette option est prioritaire sur la configuration au niveau de la partitionOverwriteMode session.
  • Vérifiez toujours que les données écrites touchent uniquement les partitions attendues. Une seule ligne dans la partition incorrecte peut remplacer involontairement la partition entière.