Partager via


Mettre à jour ou insérer dans une table Delta Lake en utilisant la fusion

Vous pouvez insérer ou mettre à jour les données d’une table source, d’une vue ou d’un DataFrame dans une table Delta cible à l’aide de l’opération MERGE SQL. Delta Lake prend en charge les insertions, les mises à jour et les suppressions dans MERGE, et prend en charge une syntaxe étendue par rapport aux standards SQL pour faciliter les cas d’usage avancés.

Supposons que vous disposez d’une table source nommée people10mupdates ou d’un chemin d’accès source qui /tmp/delta/people-10m-updates contient de nouvelles données pour une table cible nommée people10m ou un chemin cible à l’adresse /tmp/delta/people-10m. Certains de ces nouveaux enregistrements sont peut-être déjà présents dans les données cibles. Pour fusionner les nouvelles données, vous souhaitez mettre à jour les lignes où le id de la personne est déjà présent et insérer les nouvelles lignes où aucune correspondance id n’est présente. Vous pouvez exécuter la requête suivante :

SQL

MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
  UPDATE SET
    id = people10mupdates.id,
    firstName = people10mupdates.firstName,
    middleName = people10mupdates.middleName,
    lastName = people10mupdates.lastName,
    gender = people10mupdates.gender,
    birthDate = people10mupdates.birthDate,
    ssn = people10mupdates.ssn,
    salary = people10mupdates.salary
WHEN NOT MATCHED
  THEN INSERT (
    id,
    firstName,
    middleName,
    lastName,
    gender,
    birthDate,
    ssn,
    salary
  )
  VALUES (
    people10mupdates.id,
    people10mupdates.firstName,
    people10mupdates.middleName,
    people10mupdates.lastName,
    people10mupdates.gender,
    people10mupdates.birthDate,
    people10mupdates.ssn,
    people10mupdates.salary
  )

Python

from delta.tables import *

deltaTablePeople = DeltaTable.forName(spark, "people10m")
deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")

dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople.alias('people') \
  .merge(
    dfUpdates.alias('updates'),
    'people.id = updates.id'
  ) \
  .whenMatchedUpdate(set =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .execute()

Langage de programmation Scala

import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTablePeople = DeltaTable.forName(spark, "people10m")
val deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
val dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople
  .as("people")
  .merge(
    dfUpdates.as("updates"),
    "people.id = updates.id")
  .whenMatched
  .updateExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .whenNotMatched
  .insertExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .execute()

Important

Une seule ligne de la table source peut correspondre à une ligne donnée dans la table cible. Dans Databricks Runtime 16.0 et versions ultérieures, MERGE évalue les conditions spécifiées dans les clauses et WHEN MATCHED les ON conditions pour déterminer les correspondances dupliquées. Dans Databricks Runtime 15.4 LTS et ci-dessous, MERGE les opérations considèrent uniquement les conditions spécifiées dans la ON clause.

Pour plus d’informations sur les syntaxes Scala et Python, consultez Documentation sur l’API Delta Lake. Pour plus de détails sur la syntaxe SQL, consultez MERGE INTO

Modifier toutes les lignes sans correspondance à l’aide de la fusion

Dans Databricks SQL et Databricks Runtime 12.2 LTS et versions ultérieures, vous pouvez utiliser la WHEN NOT MATCHED BY SOURCE clause pour UPDATE ou DELETE les enregistrements dans la table cible qui n’ont pas d’enregistrements correspondants dans la table source. Databricks recommande d’ajouter une clause conditionnelle facultative pour éviter la réécriture complète de la table cible.

L’exemple de code suivant montre la syntaxe de base de l’utilisation de cette méthode pour les suppressions, en remplaçant la table cible par le contenu de la table source et en supprimant les enregistrements sans correspondance dans la table cible. Pour obtenir un modèle plus évolutif pour les tables où les mises à jour et les suppressions sources sont limitées dans le temps, consultez La synchronisation incrémentielle de la table Delta avec la source.

Python

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .whenNotMatchedBySourceDelete()
  .execute()
)

Langage de programmation Scala

targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .whenNotMatchedBySource()
  .delete()
  .execute()

SQL

MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
WHEN NOT MATCHED BY SOURCE THEN
  DELETE

L’exemple suivant ajoute des conditions à la WHEN NOT MATCHED BY SOURCE clause et spécifie des valeurs à mettre à jour dans des lignes cibles sans correspondance.

Python

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdate(
    set = {"target.lastSeen": "source.timestamp"}
  )
  .whenNotMatchedInsert(
    values = {
      "target.key": "source.key",
      "target.lastSeen": "source.timestamp",
      "target.status": "'active'"
    }
  )
  .whenNotMatchedBySourceUpdate(
    condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
    set = {"target.status": "'inactive'"}
  )
  .execute()
)

Langage de programmation Scala

targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateExpr(Map("target.lastSeen" -> "source.timestamp"))
  .whenNotMatched()
  .insertExpr(Map(
    "target.key" -> "source.key",
    "target.lastSeen" -> "source.timestamp",
    "target.status" -> "'active'",
    )
  )
  .whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
  .updateExpr(Map("target.status" -> "'inactive'"))
  .execute()

SQL

MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
  INSERT (key, lastSeen, status) VALUES (source.key,  source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
  UPDATE SET target.status = 'inactive'

Sémantique de l’opération de fusion

Vous trouverez ci-dessous une description détaillée de la sémantique de l’opération merge programmatique.

  • Il peut y avoir un nombre quelconque de clauses whenMatched et whenNotMatched.

  • whenMatched les clauses sont exécutées lorsqu’une ligne source correspond à une ligne de table cible en fonction de la condition de correspondance. Ces clauses ont la sémantique suivante.

    • Les clauses whenMatched ne peuvent avoir qu’une action update et une delete. L’action update ne merge met à jour que les colonnes spécifiées (similaires à update) de la ligne cible correspondante. L’action delete supprime la ligne correspondante.

    • Chaque clause whenMatched peut avoir une condition facultative. Si cette condition de clause existe, l’action update ou delete est exécutée pour toute paire de lignes source-cible correspondante uniquement lorsque la condition de clause a la valeur true.

    • S’il y a plusieurs clauses whenMatched, elles sont évaluées dans l’ordre dans lequel elles sont spécifiées. Toutes les clauses whenMatched, à l’exception de la dernière, doivent avoir des conditions.

    • Si aucune des conditions whenMatched ne prend la valeur true pour une paire de lignes source et cible qui correspond à la condition de fusion, la ligne cible reste inchangée.

    • Pour mettre à jour toutes les colonnes de la table Delta cible avec les colonnes correspondantes du jeu de données source, utilisez whenMatched(...).updateAll(). Ceci équivaut à :

      whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      pour toutes les colonnes de la table cible Delta. Par conséquent, cette action suppose que la table source a les mêmes colonnes que celles de la table cible, sinon la requête lève une erreur d’analyse.

      Remarque

      Ce comportement change lorsque l’évolution de schéma automatique est activée. Pour plus d’informations, consultez l’évolution automatique du schéma .

  • Les clauses whenNotMatched sont exécutées lorsqu’une ligne source ne correspond à aucune ligne cible basée sur la condition de correspondance. Ces clauses ont la sémantique suivante.

    • Les clauses whenNotMatched ne peuvent avoir que l’action insert. La nouvelle ligne est générée en fonction de la colonne spécifiée et des expressions correspondantes. Vous n’avez pas besoin de spécifier toutes les colonnes de la table cible. Pour les colonnes cibles non spécifiées, NULL est insérée.

    • Chaque clause whenNotMatched peut avoir une condition facultative. Si la condition de clause est présente, une ligne source est insérée uniquement si cette condition est vraie pour cette ligne. Sinon, la colonne source est ignorée.

    • S’il y a plusieurs clauses whenNotMatched, elles sont évaluées dans l’ordre dans lequel elles sont spécifiées. Toutes les clauses whenNotMatched, à l’exception de la dernière, doivent avoir des conditions.

    • Pour insérer toutes les colonnes de la table Delta cible avec les colonnes correspondantes du jeu de données source, utilisez whenNotMatched(...).insertAll(). Ceci équivaut à :

      whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      pour toutes les colonnes de la table cible Delta. Par conséquent, cette action suppose que la table source a les mêmes colonnes que celles de la table cible, sinon la requête lève une erreur d’analyse.

      Remarque

      Ce comportement change lorsque l’évolution de schéma automatique est activée. Pour plus d’informations, consultez l’évolution automatique du schéma .

  • Les clauses whenNotMatchedBySource sont exécutées quand une ligne cible ne correspond à aucune ligne source sur la base de la condition de fusion. Ces clauses ont la sémantique suivante.

    • Les clauses whenNotMatchedBySource peuvent spécifier les actions delete et update.
    • Chaque clause whenNotMatchedBySource peut avoir une condition facultative. Si la condition de clause est présente, une ligne cible est modifiée uniquement si cette condition est vraie pour cette ligne. Sinon, la ligne cible reste inchangée.
    • S’il y a plusieurs clauses whenNotMatchedBySource, elles sont évaluées dans l’ordre dans lequel elles sont spécifiées. Toutes les clauses whenNotMatchedBySource, à l’exception de la dernière, doivent avoir des conditions.
    • Par définition, whenNotMatchedBySource les clauses n’ont pas de ligne source à partir de laquelle extraire les valeurs des colonnes, et les colonnes sources ne peuvent donc pas être référencées. Pour que chaque colonne soit modifiée, vous pouvez spécifier un littéral ou effectuer une action sur la colonne cible, comme SET target.deleted_count = target.deleted_count + 1, par exemple.

Important

  • Une merge opération peut échouer si plusieurs lignes du jeu de données source correspondent et que la fusion tente de mettre à jour les mêmes lignes de la table Delta cible. Selon la sémantique SQL de fusion, une telle opération de mise à jour est ambiguë, car il n’est pas clair quelle ligne source doit être utilisée pour mettre à jour la ligne cible correspondante. Vous pouvez prétraiter la table source pour éliminer le risque de correspondances multiples.
  • Vous pouvez appliquer une opération SQL MERGE sur un affichage SQL VIEW uniquement si l’affichage a été défini en tant que CREATE VIEW viewName AS SELECT * FROM deltaTable.

Déduplication des données lors de l’écriture dans des tables Delta

Un cas d'usage courant d'ETL consiste à collecter des journaux en les ajoutant à une table Delta. Toutefois, souvent, les sources peuvent générer des enregistrements de journal en double et des étapes de déduplication en aval sont nécessaires pour les prendre en charge. La commande merge vous permet d’éviter d’insérer les enregistrements en double.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId") \
  .whenNotMatchedInsertAll() \
  .execute()

Langage de programmation Scala

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute()

Java

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute();

Remarque

Le jeu de données contenant les nouveaux journaux doit être dédupliqué dans son propre sein. Par la sémantique SQL de fusion, elle correspond et déduplique les nouvelles données avec les données existantes de la table, mais s’il existe des données en double dans le nouveau jeu de données, elle est insérée. Par conséquent, dédupliquez les nouvelles données avant de les fusionner dans la table.

Si vous savez qu'il se peut que vous obteniez des enregistrements dupliqués pour quelques jours seulement, vous pouvez optimiser davantage votre requête en partitionnant la table par date, puis en spécifiant la plage de dates de la table cible correspondant.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
  .whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
  .execute()

Langage de programmation Scala

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute()

Java

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute();

Cela est plus efficace que la commande précédente, car elle recherche des doublons uniquement dans les 7 derniers jours de journaux, et non dans la table entière. En outre, vous pouvez utiliser cette fusion par insertion uniquement avec Structured Streaming afin d'effectuer une déduplication continue des journaux.

  • Dans une requête de diffusion en continu, vous pouvez utiliser l’opération de fusion pour foreachBatch écrire en continu toutes les données de diffusion en continu dans une table Delta avec déduplication. Pour plus d’informations sur , consultez l'foreachBatch suivant.
  • Dans une autre requête de diffusion en continu, vous pouvez lire en continu des données dédupliquées à partir de cette table Delta. Cela est possible car une fusion par insertion uniquement ajoute de nouvelles données à la table Delta.

Données à variation lente (SCD) et capture de données modifiées (CDC) avec Delta Lake

Les pipelines déclaratifs de Lakeflow prennent en charge de manière native le suivi et l'application des types SCD 1 et 2. Utilisez AUTO CDC ... INTO avec les pipelines déclaratifs Lakeflow pour vous assurer que les enregistrements désordonnés sont gérés correctement lors du traitement des flux CDC. Consultez Les API CDC AUTO : simplifiez la capture de données modifiées grâce aux pipelines déclaratifs Lakeflow.

Synchroniser de façon incrémentielle la table Delta avec la source

Dans Databricks SQL et Databricks Runtime 12.2 LTS et versions ultérieures, vous pouvez utiliser WHEN NOT MATCHED BY SOURCE pour créer des conditions arbitraires pour supprimer et remplacer atomiquement une partie d’une table. Cela peut être particulièrement utile lorsque vous disposez d’une table source où les enregistrements peuvent changer ou être supprimés pendant plusieurs jours après l’entrée de données initiale, mais finalement s’installer dans un état final.

La requête suivante montre l’utilisation de ce modèle pour sélectionner 5 jours d’enregistrements à partir de la source, mettre à jour les enregistrements correspondants dans la cible, insérer de nouveaux enregistrements de la source vers la cible et supprimer tous les enregistrements sans correspondance des 5 derniers jours dans la cible.

MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE

En fournissant le même filtre booléen sur les tables source et cible, vous pouvez propager dynamiquement les modifications de votre source vers les tables cibles, y compris les suppressions.

Remarque

Bien que ce modèle puisse être utilisé sans clauses conditionnelles, cela entraînerait une réécriture complète de la table cible qui peut être coûteuse.