Ausführen eines Upsert-Vorgangs zum Platzieren von Daten in einer Delta Lake-Tabelle mithilfe von „merge“

Sie können für Daten aus einer Quelltabelle, einer Sicht oder einem Datenrahmen (DataFrame) mithilfe des MERGE SQL-Vorgangs ein Upsert in eine Delta-Zieltabelle ausführen. Delta Lake unterstützt Einfügungen, Updates und Löschungen in MERGE sowie eine erweiterte Syntax, die über die SQL-Standards hinausgeht, um komplexere Anwendungsfälle zu vereinfachen.

Angenommen, Sie haben eine Quelltabelle mit dem Namen people10mupdates oder einen Quellpfad unter /tmp/delta/people-10m-updates, die bzw. der neue Daten für eine Zieltabelle mit dem Namen people10m oder einen Zielpfad unter /tmp/delta/people-10m enthält. Einige dieser neuen Datensätze sind in den Zieldaten möglicherweise bereits enthalten. Zum Zusammenführen der neuen Daten möchten Sie Zeilen aktualisieren, in denen die id der Person bereits enthalten ist, und die neuen Zeilen einfügen, in denen es keine übereinstimmende id gibt. Sie können folgende Abfrage ausführen:

SQL

MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
  UPDATE SET
    id = people10mupdates.id,
    firstName = people10mupdates.firstName,
    middleName = people10mupdates.middleName,
    lastName = people10mupdates.lastName,
    gender = people10mupdates.gender,
    birthDate = people10mupdates.birthDate,
    ssn = people10mupdates.ssn,
    salary = people10mupdates.salary
WHEN NOT MATCHED
  THEN INSERT (
    id,
    firstName,
    middleName,
    lastName,
    gender,
    birthDate,
    ssn,
    salary
  )
  VALUES (
    people10mupdates.id,
    people10mupdates.firstName,
    people10mupdates.middleName,
    people10mupdates.lastName,
    people10mupdates.gender,
    people10mupdates.birthDate,
    people10mupdates.ssn,
    people10mupdates.salary
  )

Python

from delta.tables import *

deltaTablePeople = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
deltaTablePeopleUpdates = DeltaTable.forPath(spark, '/tmp/delta/people-10m-updates')

dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople.alias('people') \
  .merge(
    dfUpdates.alias('updates'),
    'people.id = updates.id'
  ) \
  .whenMatchedUpdate(set =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .execute()

Scala

import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTablePeople = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
val deltaTablePeopleUpdates = DeltaTable.forPath(spark, "tmp/delta/people-10m-updates")
val dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople
  .as("people")
  .merge(
    dfUpdates.as("updates"),
    "people.id = updates.id")
  .whenMatched
  .updateExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .whenNotMatched
  .insertExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .execute()

Details zur Syntax von Scala und Python finden Sie in der Dokumentation zur Delta Lake-API. Details zur SQL-Syntax finden Sie unter MERGE INTO.

Ändern aller nicht übereinstimmender Zeilen mithilfe von „merge“

In Databricks SQL und Databricks Runtime 12.2 LTS und höher können Sie die WHEN NOT MATCHED BY SOURCE-Klausel verwenden, um UPDATE- oder DELETE-Vorgänge für Datensätze in der Zieltabelle auszuführen, für die keine entsprechenden Datensätze in der Quelltabelle vorhanden sind. Databricks empfiehlt das Hinzufügen einer optionalen bedingten Klausel, um zu vermeiden, dass die Zieltabelle vollständig neu geschrieben wird.

Das folgende Codebeispiel zeigt die grundlegende Syntax der Verwendung für Löschvorgänge. Dabei wird die Zieltabelle mit dem Inhalt der Quelltabelle überschrieben, und nicht übereinstimmende Datensätze werden in der Zieltabelle gelöscht. Ein besser skalierbares Muster für Tabellen mit zeitgebundenen Quellaktualisierungen und -löschungen finden Sie unter Inkrementelles Synchronisieren der Delta-Tabelle mit der Quelle.

Python

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .whenNotMatchedBySourceDelete()
  .execute()
)

Scala

targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .whenNotMatchedBySource()
  .delete()
  .execute()

SQL

MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
WHEN NOT MATCHED BY SOURCE THEN
  DELETE

Im folgenden Beispiel werden der Klausel WHEN NOT MATCHED BY SOURCE Bedingungen hinzugefügt und Werte angegeben, die in nicht übereinstimmenden Zielzeilen aktualisiert werden sollen:

Python

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdate(
    set = {"target.lastSeen": "source.timestamp"}
  )
  .whenNotMatchedInsert(
    values = {
      "target.key": "source.key",
      "target.lastSeen": "source.timestamp",
      "target.status": "'active'"
    }
  )
  .whenNotMatchedBySourceUpdate(
    condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
    set = {"target.status": "'inactive'"}
  )
  .execute()
)

Scala

targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateExpr(Map("target.lastSeen" -> "source.timestamp"))
  .whenNotMatched()
  .insertExpr(Map(
    "target.key" -> "source.key",
    "target.lastSeen" -> "source.timestamp",
    "target.status" -> "'active'",
    )
  )
  .whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
  .updateExpr(Map("target.status" -> "'inactive'"))
  .execute()

SQL

MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
  INSERT (key, lastSeen, status) VALUES (source.key,  source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
  UPDATE SET target.status = 'inactive'

Semantik des Zusammenführungsvorgangs

Hier finden Sie eine ausführliche Beschreibung der Semantik des programmgesteuerten Vorgangs vom Typ merge.

  • Es kann eine beliebige Anzahl von whenMatched- und whenNotMatched-Klauseln geben.

  • whenMatched-Klauseln werden ausgeführt, wenn eine Quellzeile basierend auf der Übereinstimmungsbedingung einer Zieltabellenzeile entspricht. Diese Klauseln haben die folgende Semantik.

    • whenMatched-Klauseln können höchstens eine update-Aktion und eine delete-Aktion enthalten. Die update-Aktion in merge aktualisiert nur die angegebenen Spalten (ähnlich wie der update-Vorgang) der übereinstimmenden Zielzeile. Die delete-Aktion löscht die übereinstimmende Zeile.

    • Bei jeder whenMatched-Klausel kann es eine optionale Bedingung geben. Ist diese Klauselbedingung vorhanden, wird die update- oder delete-Aktion nur dann für alle übereinstimmenden Quelle/Ziel-Zeilenpaare ausgeführt, wenn die Klauselbedingung „true“ ist.

    • Wenn es mehrere whenMatched-Klauseln gibt, werden sie in der Reihenfolge ausgewertet, in der sie angegeben wurden. Alle whenMatched-Klauseln, mit Ausnahme der letzten, müssen Bedingungen aufweisen.

    • Wenn keine der whenMatched-Bedingungen für ein Quelle/Ziel-Zeilenpaar, das mit der Zusammenführungsbedingung übereinstimmt, als „true“ ausgewertet wird, bleibt die Zielzeile unverändert.

    • Verwenden Sie whenMatched(...).updateAll(), um alle Spalten der Delta-Zieltabelle mit den entsprechenden Spalten des Quelldatasets zu aktualisieren. Dieser entspricht:

      whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      bei allen Spalten der Delta-Zieltabelle. Deshalb wird bei dieser Aktion davon ausgegangen, dass die Quelltabelle dieselben Spalten wie die Zieltabelle enthält. Andernfalls löst die Abfrage einen Analysefehler aus.

      Hinweis

      Dieses Verhalten ändert sich, wenn die automatische Schemamigration aktiviert wird. Ausführliche Informationen finden Sie unter Automatische Schemaentwicklung für Delta Lake-Merge.

  • whenNotMatched-Klauseln werden ausgeführt, wenn eine Quellzeile basierend auf der Übereinstimmungsbedingung keiner Zielzeile entspricht. Diese Klauseln haben die folgende Semantik.

    • whenNotMatched-Klauseln können nur die insert-Aktion enthalten. Die neue Zeile wird basierend auf der angegebenen Spalte und den entsprechenden Ausdrücken generiert. Sie müssen nicht alle Spalten in der Zieltabelle angeben. Für nicht angegebene Zielspalten wird NULL eingefügt.

    • Jede whenNotMatched-Klausel kann eine optionale Bedingung aufweisen. Wenn die Klauselbedingung vorhanden ist, wird eine Quellzeile nur dann eingefügt, wenn diese Bedingung für die entsprechende Zeile TRUE ist. Andernfalls wird die Quellspalte ignoriert.

    • Wenn es mehrere whenNotMatched-Klauseln gibt, werden sie in der Reihenfolge ausgewertet, in der sie angegeben wurden. Alle whenNotMatched-Klauseln, mit Ausnahme der letzten, müssen Bedingungen aufweisen.

    • Verwenden Sie whenNotMatched(...).insertAll(), um alle Spalten der Delta-Zieltabelle mit den entsprechenden Spalten des Quelldatasets hinzuzufügen. Dieser entspricht:

      whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      bei allen Spalten der Delta-Zieltabelle. Deshalb wird bei dieser Aktion davon ausgegangen, dass die Quelltabelle dieselben Spalten wie die Zieltabelle enthält. Andernfalls löst die Abfrage einen Analysefehler aus.

      Hinweis

      Dieses Verhalten ändert sich, wenn die automatische Schemamigration aktiviert wird. Ausführliche Informationen finden Sie unter Automatische Schemaentwicklung für Delta Lake-Merge.

  • whenNotMatchedBySource-Klauseln werden ausgeführt, wenn eine Zielzeile basierend auf der Mergebedingung keiner Quellzeile entspricht. Diese Klauseln haben die folgende Semantik.

    • whenNotMatchedBySource-Klauseln können Aktionen vom Typ delete oder update angeben.
    • Bei jeder whenNotMatchedBySource-Klausel kann es eine optionale Bedingung geben. Wenn die Klauselbedingung vorhanden ist, wird nur dann eine Zielzeile geändert, wenn diese Bedingung für die entsprechende Zeile erfüllt ist. Andernfalls bleibt die Zielzeile unverändert.
    • Wenn es mehrere whenNotMatchedBySource-Klauseln gibt, werden sie in der Reihenfolge ausgewertet, in der sie angegeben wurden. Bei allen whenNotMatchedBySource-Klauseln, mit Ausnahme der letzten, muss es Bedingungen geben.
    • whenNotMatchedBySource-Klauseln verfügen per Definition nicht über eine Quellzeile zum Abrufen von Spaltenwerten. Daher kann nicht auf Quellspalten verwiesen werden. Für die zu ändernden Spalten können Sie entweder ein Literal angeben oder eine Aktion für die Zielspalte ausführen (beispielsweise SET target.deleted_count = target.deleted_count + 1).

Wichtig

  • Ein Zusammenführungsvorgang (merge) kann fehlschlagen, wenn mehrere Zeilen des Quelldatasets übereinstimmen und der Zusammenführungsvorgang versucht, dieselben Zeilen der Delta-Zieltabelle zu aktualisieren. Entsprechend der SQL-Semantik von „merge“ ist ein solcher Aktualisierungsvorgang mehrdeutig, da unklar ist, welche Quellzeile zum Aktualisieren der übereinstimmenden Zielzeile verwendet werden soll. Sie können die Quelltabelle vorverarbeiten, um die Möglichkeit mehrerer Übereinstimmungen auszuschließen.
  • Sie können einen SQL MERGE-Vorgang nur dann auf eine SQL VIEW anwenden, wenn die Ansicht als CREATE VIEW viewName AS SELECT * FROM deltaTable definiert wurde.

Datendeduplizierung beim Schreiben in Delta-Tabellen

Ein gängiger ETL-Anwendungsfall ist das Sammeln von Protokollen in der Delta-Tabelle, indem sie an eine Tabelle angefügt werden. Oft können die Quellen jedoch doppelte Protokolldatensätze generieren, und es sind nachfolgende Deduplizierungsschritte erforderlich, um sie zu übernehmen. Mit merge können Sie das Einfügen der doppelten Datensätze vermeiden.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId") \
  .whenNotMatchedInsertAll() \
  .execute()

Scala

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute()

Java

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute();

Hinweis

Das Dataset, das die neuen Protokolle enthält, muss in sich selbst dedupliziert werden. Durch die SQL-Semantik der Zusammenführung gleicht es die neuen Daten mit den vorhandenen Daten in der Tabelle ab und dedupliziert sie. Wenn das neue Dataset aber doppelte Daten enthält, werden diese eingefügt. Deduplizieren Sie deshalb die neuen Daten, bevor Sie sie in der Tabelle zusammenführen.

Wenn Sie wissen, dass Sie doppelte Datensätze möglicherweise nur für wenige Tage erhalten, können Sie Ihre Abfrage weiter optimieren, indem Sie die Tabelle nach Datum partitionieren und dann den Datumsbereich der Zieltabelle angeben, nach dem eine Übereinstimmung erstellt werden soll.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
  .whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
  .execute()

Scala

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute()

Java

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute();

Dies ist effizienter als der vorhergehende Befehl, da er nur in den Protokollen der letzten 7 Tage nach Duplikaten sucht und nicht in der gesamten Tabelle. Darüber hinaus können Sie diese „insert-only“-Zusammenführung bei strukturiertem Streaming verwenden, um eine kontinuierliche Deduplizierung der Protokolle durchzuführen.

  • In einer Streamingabfrage können Sie mithilfe des Zusammenführungsvorgangs in foreachBatch Streamingdaten mit Deduplizierung fortlaufend in eine Delta-Tabelle schreiben. Weitere Informationen zu foreachBatch finden Sie im folgenden Streamingbeispiel.
  • In einer anderen Streamingabfrage können Sie deduplizierte Daten kontinuierlich aus dieser Delta-Tabelle lesen. Dies ist möglich, weil bei einer „insert-only“-Zusammenführung nur neue Daten an die Delta-Tabelle angefügt werden.

Sich langsam ändernde Daten (SCD) und Change Data Capture (CDC) mit Delta Lake

Delta Live Tables bietet native Unterstützung für die Nachverfolgung und Anwendung von SCD-Typ 1 und 2. Verwenden Sie APPLY CHANGES INTO mit Delta Live Tables, um sicherzustellen, dass nicht ordnungsgemäße Datensätze bei der Verarbeitung von CDC-Feeds korrekt verarbeitet werden. Siehe Vereinfachte Änderungsdatenerfassung mit der ÄNDERUNGEN ÜBERNEHMEN API in Delta Live Tables.

Inkrementelles Synchronisieren der Delta-Tabelle mit der Quelle

In Databricks SQL und Databricks Runtime 12.2 LTS und höher können Sie mithilfe von WHEN NOT MATCHED BY SOURCE beliebige Bedingungen erstellen, um einen Teil einer Tabelle atomisch zu löschen und zu ersetzen. Dies kann insbesondere hilfreich sein, wenn Sie über eine Quelltabelle verfügen, in der Datensätze nach der ursprünglichen Dateneingabe noch mehrere Tage lang geändert oder gelöscht werden können, aber letztendlich in einen abschließenden Zustand erreichen.

Die folgende Abfrage zeigt, wie dieses Muster verwendet werden kann, um Datensätze der letzten fünf Tage aus der Quelle auszuwählen, entsprechende Datensätze im Ziel zu aktualisieren, neue Datensätze aus der Quelle in das Ziel einzufügen und alle nicht entsprechenden Datensätze der letzten fünf Tage im Ziel zu löschen:

MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE

Durch Angeben des gleichen booleschen Filters für die Quell- und Zieltabellen können Sie Änderungen dynamisch aus Ihrer Quelltabelle an Zieltabellen weitergeben (einschließlich Löschvorgängen).

Hinweis

Dieses Muster kann zwar auch ohne bedingte Klauseln verwendet werden, dies würde jedoch dazu führen, dass die Zieltabelle vollständig neu geschrieben wird, was teuer sein kann.