Ausführen eines Upsert-Vorgangs zum Platzieren von Daten in einer Delta Lake-Tabelle mithilfe von „merge“
Sie können für Daten aus einer Quelltabelle, einer Sicht oder einem Datenrahmen (DataFrame) mithilfe des MERGE
SQL-Vorgangs ein Upsert in eine Delta-Zieltabelle ausführen. Delta Lake unterstützt Einfügungen, Updates und Löschungen in MERGE
sowie eine erweiterte Syntax, die über die SQL-Standards hinausgeht, um komplexere Anwendungsfälle zu vereinfachen.
Angenommen, Sie haben eine Quelltabelle mit dem Namen people10mupdates
oder einen Quellpfad unter /tmp/delta/people-10m-updates
, die bzw. der neue Daten für eine Zieltabelle mit dem Namen people10m
oder einen Zielpfad unter /tmp/delta/people-10m
enthält. Einige dieser neuen Datensätze sind in den Zieldaten möglicherweise bereits enthalten. Zum Zusammenführen der neuen Daten möchten Sie Zeilen aktualisieren, in denen die id
der Person bereits enthalten ist, und die neuen Zeilen einfügen, in denen es keine übereinstimmende id
gibt. Sie können folgende Abfrage ausführen:
SQL
MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
UPDATE SET
id = people10mupdates.id,
firstName = people10mupdates.firstName,
middleName = people10mupdates.middleName,
lastName = people10mupdates.lastName,
gender = people10mupdates.gender,
birthDate = people10mupdates.birthDate,
ssn = people10mupdates.ssn,
salary = people10mupdates.salary
WHEN NOT MATCHED
THEN INSERT (
id,
firstName,
middleName,
lastName,
gender,
birthDate,
ssn,
salary
)
VALUES (
people10mupdates.id,
people10mupdates.firstName,
people10mupdates.middleName,
people10mupdates.lastName,
people10mupdates.gender,
people10mupdates.birthDate,
people10mupdates.ssn,
people10mupdates.salary
)
Python
from delta.tables import *
deltaTablePeople = DeltaTable.forName(spark, "people10m")
deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople.alias('people') \
.merge(
dfUpdates.alias('updates'),
'people.id = updates.id'
) \
.whenMatchedUpdate(set =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.whenNotMatchedInsert(values =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.execute()
Scala
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTablePeople = DeltaTable.forName(spark, "people10m")
val deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
val dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople
.as("people")
.merge(
dfUpdates.as("updates"),
"people.id = updates.id")
.whenMatched
.updateExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.whenNotMatched
.insertExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.execute()
Wichtig
Nur eine einzelne Zeile aus der Quelltabelle kann mit einer bestimmten Zeile in der Zieltabelle übereinstimmen. In Databricks Runtime 16.0 und höher werden bedingungen ausgewertet, die in den WHEN MATCHED
und ON
Klauseln angegeben sind, MERGE
um doppelte Übereinstimmungen zu bestimmen. In Databricks Runtime 15.4 LTS und unten MERGE
berücksichtigen Vorgänge nur Bedingungen, die in der ON
Klausel angegeben sind.
Details zur Syntax von Scala und Python finden Sie in der Dokumentation zur Delta Lake-API. Details zur SQL-Syntax finden Sie unter MERGE INTO.
Ändern aller nicht übereinstimmender Zeilen mithilfe von „merge“
In Databricks SQL und Databricks Runtime 12.2 LTS und höher können Sie die WHEN NOT MATCHED BY SOURCE
-Klausel verwenden, um UPDATE
- oder DELETE
-Vorgänge für Datensätze in der Zieltabelle auszuführen, für die keine entsprechenden Datensätze in der Quelltabelle vorhanden sind. Databricks empfiehlt das Hinzufügen einer optionalen bedingten Klausel, um zu vermeiden, dass die Zieltabelle vollständig neu geschrieben wird.
Das folgende Codebeispiel zeigt die grundlegende Syntax der Verwendung für Löschvorgänge. Dabei wird die Zieltabelle mit dem Inhalt der Quelltabelle überschrieben, und nicht übereinstimmende Datensätze werden in der Zieltabelle gelöscht. Ein besser skalierbares Muster für Tabellen mit zeitgebundenen Quellaktualisierungen und -löschungen finden Sie unter Inkrementelles Synchronisieren der Delta-Tabelle mit der Quelle.
Python
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)
Scala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.whenNotMatchedBySource()
.delete()
.execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE
Im folgenden Beispiel werden der Klausel WHEN NOT MATCHED BY SOURCE
Bedingungen hinzugefügt und Werte angegeben, die in nicht übereinstimmenden Zielzeilen aktualisiert werden sollen:
Python
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdate(
set = {"target.lastSeen": "source.timestamp"}
)
.whenNotMatchedInsert(
values = {
"target.key": "source.key",
"target.lastSeen": "source.timestamp",
"target.status": "'active'"
}
)
.whenNotMatchedBySourceUpdate(
condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
set = {"target.status": "'inactive'"}
)
.execute()
)
Scala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateExpr(Map("target.lastSeen" -> "source.timestamp"))
.whenNotMatched()
.insertExpr(Map(
"target.key" -> "source.key",
"target.lastSeen" -> "source.timestamp",
"target.status" -> "'active'",
)
)
.whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
.updateExpr(Map("target.status" -> "'inactive'"))
.execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
INSERT (key, lastSeen, status) VALUES (source.key, source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
UPDATE SET target.status = 'inactive'
Semantik des Zusammenführungsvorgangs
Hier finden Sie eine ausführliche Beschreibung der Semantik des programmgesteuerten Vorgangs vom Typ merge
.
Es kann eine beliebige Anzahl von
whenMatched
- undwhenNotMatched
-Klauseln geben.whenMatched
-Klauseln werden ausgeführt, wenn eine Quellzeile basierend auf der Übereinstimmungsbedingung einer Zieltabellenzeile entspricht. Diese Klauseln haben die folgende Semantik.whenMatched
-Klauseln können höchstens eineupdate
-Aktion und einedelete
-Aktion enthalten. Dieupdate
-Aktion inmerge
aktualisiert nur die angegebenen Spalten (ähnlich wie derupdate
-Vorgang) der übereinstimmenden Zielzeile. Diedelete
-Aktion löscht die übereinstimmende Zeile.Bei jeder
whenMatched
-Klausel kann es eine optionale Bedingung geben. Ist diese Klauselbedingung vorhanden, wird dieupdate
- oderdelete
-Aktion nur dann für alle übereinstimmenden Quelle/Ziel-Zeilenpaare ausgeführt, wenn die Klauselbedingung „true“ ist.Wenn es mehrere
whenMatched
-Klauseln gibt, werden sie in der Reihenfolge ausgewertet, in der sie angegeben wurden. AllewhenMatched
-Klauseln, mit Ausnahme der letzten, müssen Bedingungen aufweisen.Wenn keine der
whenMatched
-Bedingungen für ein Quelle/Ziel-Zeilenpaar, das mit der Zusammenführungsbedingung übereinstimmt, als „true“ ausgewertet wird, bleibt die Zielzeile unverändert.Verwenden Sie
whenMatched(...).updateAll()
, um alle Spalten der Delta-Zieltabelle mit den entsprechenden Spalten des Quelldatasets zu aktualisieren. Dieser entspricht:whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
bei allen Spalten der Delta-Zieltabelle. Deshalb wird bei dieser Aktion davon ausgegangen, dass die Quelltabelle dieselben Spalten wie die Zieltabelle enthält. Andernfalls löst die Abfrage einen Analysefehler aus.
Hinweis
Dieses Verhalten ändert sich, wenn die automatische Schemaentwicklung aktiviert wird. Ausführliche Informationen finden Sie unter Automatische Schemaentwicklung für Delta Lake-Merge.
whenNotMatched
-Klauseln werden ausgeführt, wenn eine Quellzeile basierend auf der Übereinstimmungsbedingung keiner Zielzeile entspricht. Diese Klauseln haben die folgende Semantik.whenNotMatched
-Klauseln können nur dieinsert
-Aktion enthalten. Die neue Zeile wird basierend auf der angegebenen Spalte und den entsprechenden Ausdrücken generiert. Sie müssen nicht alle Spalten in der Zieltabelle angeben. Für nicht angegebene Zielspalten wirdNULL
eingefügt.Jede
whenNotMatched
-Klausel kann eine optionale Bedingung aufweisen. Wenn die Klauselbedingung vorhanden ist, wird eine Quellzeile nur dann eingefügt, wenn diese Bedingung für die entsprechende Zeile TRUE ist. Andernfalls wird die Quellspalte ignoriert.Wenn es mehrere
whenNotMatched
-Klauseln gibt, werden sie in der Reihenfolge ausgewertet, in der sie angegeben wurden. AllewhenNotMatched
-Klauseln, mit Ausnahme der letzten, müssen Bedingungen aufweisen.Verwenden Sie
whenNotMatched(...).insertAll()
, um alle Spalten der Delta-Zieltabelle mit den entsprechenden Spalten des Quelldatasets hinzuzufügen. Dieser entspricht:whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
bei allen Spalten der Delta-Zieltabelle. Deshalb wird bei dieser Aktion davon ausgegangen, dass die Quelltabelle dieselben Spalten wie die Zieltabelle enthält. Andernfalls löst die Abfrage einen Analysefehler aus.
Hinweis
Dieses Verhalten ändert sich, wenn die automatische Schemaentwicklung aktiviert wird. Ausführliche Informationen finden Sie unter Automatische Schemaentwicklung für Delta Lake-Merge.
whenNotMatchedBySource
-Klauseln werden ausgeführt, wenn eine Zielzeile basierend auf der Mergebedingung keiner Quellzeile entspricht. Diese Klauseln haben die folgende Semantik.whenNotMatchedBySource
-Klauseln können Aktionen vom Typdelete
oderupdate
angeben.- Bei jeder
whenNotMatchedBySource
-Klausel kann es eine optionale Bedingung geben. Wenn die Klauselbedingung vorhanden ist, wird nur dann eine Zielzeile geändert, wenn diese Bedingung für die entsprechende Zeile erfüllt ist. Andernfalls bleibt die Zielzeile unverändert. - Wenn es mehrere
whenNotMatchedBySource
-Klauseln gibt, werden sie in der Reihenfolge ausgewertet, in der sie angegeben wurden. Bei allenwhenNotMatchedBySource
-Klauseln, mit Ausnahme der letzten, muss es Bedingungen geben. whenNotMatchedBySource
-Klauseln verfügen per Definition nicht über eine Quellzeile zum Abrufen von Spaltenwerten. Daher kann nicht auf Quellspalten verwiesen werden. Für die zu ändernden Spalten können Sie entweder ein Literal angeben oder eine Aktion für die Zielspalte ausführen (beispielsweiseSET target.deleted_count = target.deleted_count + 1
).
Wichtig
- Ein Zusammenführungsvorgang (
merge
) kann fehlschlagen, wenn mehrere Zeilen des Quelldatasets übereinstimmen und der Zusammenführungsvorgang versucht, dieselben Zeilen der Delta-Zieltabelle zu aktualisieren. Entsprechend der SQL-Semantik von „merge“ ist ein solcher Aktualisierungsvorgang mehrdeutig, da unklar ist, welche Quellzeile zum Aktualisieren der übereinstimmenden Zielzeile verwendet werden soll. Sie können die Quelltabelle vorverarbeiten, um die Möglichkeit mehrerer Übereinstimmungen auszuschließen. - Sie können einen SQL
MERGE
-Vorgang nur dann auf eine SQL VIEW anwenden, wenn die Ansicht alsCREATE VIEW viewName AS SELECT * FROM deltaTable
definiert wurde.
Datendeduplizierung beim Schreiben in Delta-Tabellen
Ein gängiger ETL-Anwendungsfall ist das Sammeln von Protokollen in der Delta-Tabelle, indem sie an eine Tabelle angefügt werden. Oft können die Quellen jedoch doppelte Protokolldatensätze generieren, und es sind nachfolgende Deduplizierungsschritte erforderlich, um sie zu übernehmen. Mit merge
können Sie das Einfügen der doppelten Datensätze vermeiden.
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId") \
.whenNotMatchedInsertAll() \
.execute()
Scala
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute()
Java
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute();
Hinweis
Das Dataset, das die neuen Protokolle enthält, muss in sich selbst dedupliziert werden. Durch die SQL-Semantik der Zusammenführung gleicht es die neuen Daten mit den vorhandenen Daten in der Tabelle ab und dedupliziert sie. Wenn das neue Dataset aber doppelte Daten enthält, werden diese eingefügt. Deduplizieren Sie deshalb die neuen Daten, bevor Sie sie in der Tabelle zusammenführen.
Wenn Sie wissen, dass Sie doppelte Datensätze möglicherweise nur für wenige Tage erhalten, können Sie Ihre Abfrage weiter optimieren, indem Sie die Tabelle nach Datum partitionieren und dann den Datumsbereich der Zieltabelle angeben, nach dem eine Übereinstimmung erstellt werden soll.
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
.whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
.execute()
Scala
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute()
Java
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute();
Dies ist effizienter als der vorhergehende Befehl, da er nur in den Protokollen der letzten 7 Tage nach Duplikaten sucht und nicht in der gesamten Tabelle. Darüber hinaus können Sie diese „insert-only“-Zusammenführung bei strukturiertem Streaming verwenden, um eine kontinuierliche Deduplizierung der Protokolle durchzuführen.
- In einer Streamingabfrage können Sie mithilfe des Zusammenführungsvorgangs in
foreachBatch
Streamingdaten mit Deduplizierung fortlaufend in eine Delta-Tabelle schreiben. Weitere Informationen zuforeachBatch
finden Sie im folgenden Streamingbeispiel. - In einer anderen Streamingabfrage können Sie deduplizierte Daten kontinuierlich aus dieser Delta-Tabelle lesen. Dies ist möglich, weil bei einer „insert-only“-Zusammenführung nur neue Daten an die Delta-Tabelle angefügt werden.
Sich langsam ändernde Daten (SCD) und Change Data Capture (CDC) mit Delta Lake
Delta Live Tables bietet native Unterstützung für die Nachverfolgung und Anwendung von SCD-Typ 1 und 2. Verwenden Sie APPLY CHANGES INTO
mit Delta Live Tables, um sicherzustellen, dass nicht ordnungsgemäße Datensätze bei der Verarbeitung von CDC-Feeds korrekt verarbeitet werden. Weitere Informationen finden Sie unter Die APPLY CHANGES-APIs: Vereinfachen der Änderungsdatenerfassung mit Delta Live Tables.
Inkrementelles Synchronisieren der Delta-Tabelle mit der Quelle
In Databricks SQL und Databricks Runtime 12.2 LTS und höher können Sie mithilfe von WHEN NOT MATCHED BY SOURCE
beliebige Bedingungen erstellen, um einen Teil einer Tabelle atomisch zu löschen und zu ersetzen. Dies kann insbesondere hilfreich sein, wenn Sie über eine Quelltabelle verfügen, in der Datensätze nach der ursprünglichen Dateneingabe noch mehrere Tage lang geändert oder gelöscht werden können, aber letztendlich in einen abschließenden Zustand erreichen.
Die folgende Abfrage zeigt, wie dieses Muster verwendet werden kann, um Datensätze der letzten fünf Tage aus der Quelle auszuwählen, entsprechende Datensätze im Ziel zu aktualisieren, neue Datensätze aus der Quelle in das Ziel einzufügen und alle nicht entsprechenden Datensätze der letzten fünf Tage im Ziel zu löschen:
MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE
Durch Angeben des gleichen booleschen Filters für die Quell- und Zieltabellen können Sie Änderungen dynamisch aus Ihrer Quelltabelle an Zieltabellen weitergeben (einschließlich Löschvorgängen).
Hinweis
Dieses Muster kann zwar auch ohne bedingte Klauseln verwendet werden, dies würde jedoch dazu führen, dass die Zieltabelle vollständig neu geschrieben wird, was teuer sein kann.