Upsert dans une table Delta Lake à l’aide de la fusion
Vous pouvez effectuer un upsert de données à partir d’une table, d’un affichage ou d’une tramedonnées sources dans une table Delta cible à l’aide de l’opération SQL MERGE
. Delta Lake prend en charge les insertions, les mises à jour et les suppressions dans MERGE
, et prend en charge une syntaxe étendue par rapport aux standards SQL pour faciliter les cas d’usage avancés.
Supposons que vous disposez d’une table source nommée people10mupdates
ou d’un chemin d’accès source /tmp/delta/people-10m-updates
contenant de nouvelles données pour une table cible nommée people10m
ou un chemin d’accès cible /tmp/delta/people-10m
. Certains de ces nouveaux enregistrements sont peut-être déjà présents dans les données cibles. Pour fusionner les nouvelles données, vous devez mettre à jour les lignes où l’id
de la personne est déjà présent, et insérer les nouvelles lignes là où aucun id
correspondant n’est présent. Vous pouvez exécuter la requête suivante :
SQL
MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
UPDATE SET
id = people10mupdates.id,
firstName = people10mupdates.firstName,
middleName = people10mupdates.middleName,
lastName = people10mupdates.lastName,
gender = people10mupdates.gender,
birthDate = people10mupdates.birthDate,
ssn = people10mupdates.ssn,
salary = people10mupdates.salary
WHEN NOT MATCHED
THEN INSERT (
id,
firstName,
middleName,
lastName,
gender,
birthDate,
ssn,
salary
)
VALUES (
people10mupdates.id,
people10mupdates.firstName,
people10mupdates.middleName,
people10mupdates.lastName,
people10mupdates.gender,
people10mupdates.birthDate,
people10mupdates.ssn,
people10mupdates.salary
)
Python
from delta.tables import *
deltaTablePeople = DeltaTable.forName(spark, "people10m")
deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople.alias('people') \
.merge(
dfUpdates.alias('updates'),
'people.id = updates.id'
) \
.whenMatchedUpdate(set =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.whenNotMatchedInsert(values =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.execute()
Scala
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTablePeople = DeltaTable.forName(spark, "people10m")
val deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
val dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople
.as("people")
.merge(
dfUpdates.as("updates"),
"people.id = updates.id")
.whenMatched
.updateExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.whenNotMatched
.insertExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.execute()
Important
Une seule ligne de la table source peut correspondre à une ligne donnée dans la table cible. Dans Databricks Runtime 16.0 et versions ultérieures, MERGE
évalue les conditions spécifiées dans les clauses et ON
les WHEN MATCHED
conditions pour déterminer les correspondances dupliquées. Dans Databricks Runtime 15.4 LTS et ci-dessous, MERGE
les opérations considèrent uniquement les conditions spécifiées dans la ON
clause.
Pour plus d’informations sur les syntaxes Scala et Python, consultez Documentation sur l’API Delta Lake. Pour plus d’informations sur la syntaxe SQL, consultez MERGE INTO
Modifier toutes les lignes sans correspondance à l’aide de la fusion
Dans Databricks SQL et Databricks Runtime 12.2 LTS et versions ultérieures, vous pouvez utiliser la clause WHEN NOT MATCHED BY SOURCE
pour les enregistrements UPDATE
ou DELETE
dans la table cible qui n’a aucun enregistrement correspondant dans la table source. Databricks recommande d’ajouter une clause conditionnelle facultative pour éviter de réécrire entièrement la table cible.
L’exemple de code suivant indique la syntaxe de base à utiliser pour les suppressions, en remplaçant la table cible par le contenu de la table source et en supprimant les enregistrements sans correspondance dans la table cible. Pour utiliser un modèle davantage scalable pour les tables où les mises à jour et les suppressions de la source sont limitées dans le temps, consultez Synchroniser de manière incrémentielle la table Delta avec la source.
Python
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)
Scala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.whenNotMatchedBySource()
.delete()
.execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE
L’exemple suivant ajoute des conditions à la clause WHEN NOT MATCHED BY SOURCE
et spécifie les valeurs à mettre à jour dans les lignes cibles sans correspondance.
Python
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdate(
set = {"target.lastSeen": "source.timestamp"}
)
.whenNotMatchedInsert(
values = {
"target.key": "source.key",
"target.lastSeen": "source.timestamp",
"target.status": "'active'"
}
)
.whenNotMatchedBySourceUpdate(
condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
set = {"target.status": "'inactive'"}
)
.execute()
)
Scala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateExpr(Map("target.lastSeen" -> "source.timestamp"))
.whenNotMatched()
.insertExpr(Map(
"target.key" -> "source.key",
"target.lastSeen" -> "source.timestamp",
"target.status" -> "'active'",
)
)
.whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
.updateExpr(Map("target.status" -> "'inactive'"))
.execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
INSERT (key, lastSeen, status) VALUES (source.key, source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
UPDATE SET target.status = 'inactive'
Sémantique de l’opération de fusion
Vous trouverez ci-dessous une description détaillée de la sémantique de l’opération merge
programmatique.
Il peut y avoir un nombre quelconque de clauses
whenMatched
etwhenNotMatched
.Les clauses
whenMatched
sont exécutées lorsqu’une ligne source correspond à une ligne de table cible basée sur la condition de correspondance. Ces clauses ont la sémantique suivante.Les clauses
whenMatched
ne peuvent avoir qu’une actionupdate
et unedelete
. L’actionupdate
dansmerge
met à jour uniquement les colonnes spécifiées (comme l’opérationupdate
) de la ligne cible correspondante. L’actiondelete
supprime la ligne correspondante.Chaque clause
whenMatched
peut avoir une condition facultative. Si cette condition de clause existe, l’actionupdate
oudelete
est exécutée pour toute paire de lignes source-cible correspondante uniquement lorsque la condition de clause a la valeur true.S’il y a plusieurs clauses
whenMatched
, elles sont évaluées dans l’ordre dans lequel elles sont spécifiées. Toutes les clauseswhenMatched
, à l’exception de la dernière, doivent avoir des conditions.Si aucune des conditions
whenMatched
ne prend la valeur true pour une paire de lignes source et cible qui correspond à la condition de fusion, la ligne cible reste inchangée.Pour mettre à jour toutes les colonnes de la table Delta cible avec les colonnes correspondantes du jeu de données source, utilisez
whenMatched(...).updateAll()
. Ceci équivaut à :whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
pour toutes les colonnes de la table Delta cible. Par conséquent, cette action suppose que la table source possède les mêmes colonnes que celles de la table cible, sinon la requête génère une erreur d’analyse.
Remarque
Ce comportement change lorsque l’évolution de schéma automatique est activée. Pour plus d’informations, consultez Évolution automatique du schéma.
Les clauses
whenNotMatched
sont exécutées lorsqu’une ligne source ne correspond à aucune ligne cible basée sur la condition de correspondance. Ces clauses ont la sémantique suivante.Les clauses
whenNotMatched
ne peuvent avoir que l’actioninsert
. La nouvelle ligne est générée en fonction de la colonne spécifiée et des expressions correspondantes. Il n’est pas nécessaire de spécifier toutes les colonnes de la table cible. Pour les colonnes cibles non spécifiées,NULL
est inséré.Chaque clause
whenNotMatched
peut avoir une condition facultative. Si la condition de clause est présente, une ligne source est insérée uniquement si cette condition est vraie pour cette ligne. Dans le cas contraire, la colonne source est ignorée.S’il y a plusieurs clauses
whenNotMatched
, elles sont évaluées dans l’ordre dans lequel elles sont spécifiées. Toutes les clauseswhenNotMatched
, à l’exception de la dernière, doivent avoir des conditions.Pour insérer toutes les colonnes de la table Delta cible avec les colonnes correspondantes du jeu de données source, utilisez
whenNotMatched(...).insertAll()
. Ceci équivaut à :whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
pour toutes les colonnes de la table Delta cible. Par conséquent, cette action suppose que la table source possède les mêmes colonnes que celles de la table cible, sinon la requête génère une erreur d’analyse.
Remarque
Ce comportement change lorsque l’évolution de schéma automatique est activée. Pour plus d’informations, consultez Évolution automatique du schéma.
Les clauses
whenNotMatchedBySource
sont exécutées quand une ligne cible ne correspond à aucune ligne source sur la base de la condition de fusion. Ces clauses ont la sémantique suivante.- Les clauses
whenNotMatchedBySource
peuvent spécifier les actionsdelete
etupdate
. - Chaque clause
whenNotMatchedBySource
peut avoir une condition facultative. Si la condition de clause est présente, une ligne cible est modifiée uniquement si cette condition est vraie pour cette ligne. Sinon, la ligne cible reste inchangée. - S’il y a plusieurs clauses
whenNotMatchedBySource
, elles sont évaluées dans l’ordre dans lequel elles sont spécifiées. Toutes les clauseswhenNotMatchedBySource
, à l’exception de la dernière, doivent avoir des conditions. - Par définition, les clauses
whenNotMatchedBySource
n’ont pas de ligne source d’où peuvent être extraites des valeurs de colonne. Cela explique pourquoi il n’est pas possible de référencer les colonnes sources. Pour chaque colonne à modifier, vous pouvez spécifier un littéral ou effectuer une action sur la colonne cible, par exempleSET target.deleted_count = target.deleted_count + 1
.
- Les clauses
Important
- Une opération
merge
peut échouer si plusieurs lignes du jeu de données source correspondent et si l’opération tente de mettre à jour les mêmes lignes de la table Delta cible. Selon la sémantique SQL de la fusion, une telle opération de mise à jour est ambiguë car on ne sait pas quelle ligne source doit être utilisée pour mettre à jour la ligne cible correspondante. Vous pouvez prétraiter la table source pour éliminer le risque de correspondances multiples. - Vous pouvez appliquer une opération SQL
MERGE
sur un affichage SQL VIEW uniquement si l’affichage a été défini en tant queCREATE VIEW viewName AS SELECT * FROM deltaTable
.
Déduplication des données lors de l’écriture dans des tables Delta
Un cas d’usage ETL courant consiste à collecter des journaux dans une table Delta en les ajoutant à une table. Toutefois, les sources peuvent souvent générer des enregistrements de journal en double et des étapes de déduplication en aval sont nécessaires pour s’en occuper. La commande merge
vous permet d’éviter d’insérer les enregistrements en double.
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId") \
.whenNotMatchedInsertAll() \
.execute()
Scala
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute()
Java
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute();
Notes
Le jeu de données contenant les nouveaux journaux doit être dédupliqué dans son propre sein. En vertu de la sémantique SQL de fusion, il met en correspondance et déduplique les nouvelles données avec les données existantes dans la table, mais s’il existe des données en double dans le nouveau jeu de données, elles sont insérées. Par conséquent, dédupliquez les nouvelles données avant de les fusionner dans la table.
Si vous savez que vous risquez d’obtenir des enregistrements en double pendant quelques jours seulement, vous pouvez optimiser votre requête en partitionnant la table par date, puis en spécifiant la plage de dates de la table cible avec laquelle établir la correspondance.
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
.whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
.execute()
Scala
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute()
Java
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute();
Cette opération est plus efficace que la commande précédente, car elle recherche des doublons uniquement dans les 7 derniers jours des journaux, et non dans la table entière. En outre, vous pouvez utiliser cette fusion par insertion uniquement avec une diffusion en continu structurée afin d’effectuer une déduplication continue des journaux.
- Dans une requête de diffusion en continu, vous pouvez utiliser une opération de fusion dans
foreachBatch
afin d’écrire en permanence toutes les données de diffusion en continu dans une table Delta avec déduplication. Pour plus d’informations surforeachBatch
, consultez l'exemple de diffusion en continu suivant. - Dans une autre requête de diffusion en continu, vous pouvez lire en permanence des données dédupliquées à partir de cette table Delta. Cela est possible parce qu’une fusion par insertion uniquement ajoute uniquement de nouvelles données à la table Delta.
Données à variation lente (SCD) et capture de données modifiées (CDC) avec Delta Lake
Delta Live Tables inclut une prise en charge native du suivi et de l’application de SCD Type 1 et Type 2. Utilisez APPLY CHANGES INTO
avec Delta Live Tables pour vous assurer que les enregistrements en désordre sont gérés correctement lors du traitement des flux CDC. Voir API APPLY CHANGES : Simplifier la capture des changements de données dans Delta Live Tables.
Synchroniser de manière incrémentielle la table Delta avec la source
Dans Databricks SQL et Databricks Runtime 12.2 LTS et versions ultérieures, vous pouvez utiliser WHEN NOT MATCHED BY SOURCE
pour créer des conditions arbitraires afin de supprimer et de remplacer atomiquement une partie d’une table. Cela peut être très utile si vous avez une table source où les enregistrements peuvent continuer à être modifiés ou supprimés pendant plusieurs jours après l’entrée initiale des données, avant de passer à un état final.
La requête suivante montre comment utiliser ce modèle pour sélectionner cinq jours d’enregistrements à partir de la source, mettre à jour les enregistrements correspondants dans la cible, insérer de nouveaux enregistrements de la source dans la cible et supprimer tous les enregistrements sans correspondance des cinq derniers jours dans la cible.
MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE
En fournissant le même filtre booléen sur les tables source et cible, vous pouvez propager dynamiquement les modifications de la table source vers la table cible, y compris les suppressions.
Notes
Ce modèle peut être utilisé sans clause conditionnelle, mais cela entraînerait alors une réécriture complète de la table cible, une opération potentiellement coûteuse.