Upsert till en Delta Lake-tabell med sammanslagning

Du kan överföra data från en källtabell, vy eller DataFrame till en Delta-måltabell med hjälp MERGE av SQL-åtgärden. Delta Lake stöder infogningar, uppdateringar och borttagningar i MERGEoch stöder utökad syntax utöver SQL-standarderna för att underlätta avancerade användningsfall.

Anta att du har en källtabell med namnet people10mupdates eller en källsökväg som /tmp/delta/people-10m-updates innehåller nya data för en måltabell med namnet people10m eller en målsökväg på /tmp/delta/people-10m. Vissa av dessa nya poster kanske redan finns i måldata. Om du vill sammanfoga nya data vill du uppdatera rader där personens id redan finns och infoga de nya raderna där ingen matchning id finns. Du kan köra följande fråga:

SQL

MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
  UPDATE SET
    id = people10mupdates.id,
    firstName = people10mupdates.firstName,
    middleName = people10mupdates.middleName,
    lastName = people10mupdates.lastName,
    gender = people10mupdates.gender,
    birthDate = people10mupdates.birthDate,
    ssn = people10mupdates.ssn,
    salary = people10mupdates.salary
WHEN NOT MATCHED
  THEN INSERT (
    id,
    firstName,
    middleName,
    lastName,
    gender,
    birthDate,
    ssn,
    salary
  )
  VALUES (
    people10mupdates.id,
    people10mupdates.firstName,
    people10mupdates.middleName,
    people10mupdates.lastName,
    people10mupdates.gender,
    people10mupdates.birthDate,
    people10mupdates.ssn,
    people10mupdates.salary
  )

Python

from delta.tables import *

deltaTablePeople = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
deltaTablePeopleUpdates = DeltaTable.forPath(spark, '/tmp/delta/people-10m-updates')

dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople.alias('people') \
  .merge(
    dfUpdates.alias('updates'),
    'people.id = updates.id'
  ) \
  .whenMatchedUpdate(set =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .execute()

Scala

import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTablePeople = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
val deltaTablePeopleUpdates = DeltaTable.forPath(spark, "tmp/delta/people-10m-updates")
val dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople
  .as("people")
  .merge(
    dfUpdates.as("updates"),
    "people.id = updates.id")
  .whenMatched
  .updateExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .whenNotMatched
  .insertExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .execute()

Se Delta Lake API-dokumentationen för Scala- och Python-syntaxinformation. Information om SQL-syntax finns i MERGE INTO

Ändra alla omatchade rader med hjälp av sammanslagning

I Databricks SQL och Databricks Runtime 12.1 och senare kan du använda WHEN NOT MATCHED BY SOURCE satsen till UPDATE eller DELETE poster i måltabellen som inte har motsvarande poster i källtabellen. Databricks rekommenderar att du lägger till en valfri villkorssats för att undvika att skriva om måltabellen helt.

I följande kodexempel visas den grundläggande syntaxen för att använda detta för borttagningar, skriva över måltabellen med innehållet i källtabellen och ta bort omatchade poster i måltabellen. Ett mer skalbart mönster för tabeller där källuppdateringar och borttagningar är tidsbundna finns i Synkronisera Delta-tabell stegvis med källan.

Python

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .whenNotMatchedBySourceDelete()
  .execute()
)

Scala

targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .whenNotMatchedBySource()
  .delete()
  .execute()

SQL

MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
WHEN NOT MATCHED BY SOURCE THEN
  DELETE

I följande exempel läggs villkor till i WHEN NOT MATCHED BY SOURCE -satsen och värden som ska uppdateras i omatchade målrader.

Python

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdate(
    set = {"target.lastSeen": "source.timestamp"}
  )
  .whenNotMatchedInsert(
    values = {
      "target.key": "source.key",
      "target.lastSeen": "source.timestamp",
      "target.status": "'active'"
    }
  )
  .whenNotMatchedBySourceUpdate(
    condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
    set = {"target.status": "'inactive'"}
  )
  .execute()
)

Scala

targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateExpr(Map("target.lastSeen" -> "source.timestamp"))
  .whenNotMatched()
  .insertExpr(Map(
    "target.key" -> "source.key",
    "target.lastSeen" -> "source.timestamp",
    "target.status" -> "'active'",
    )
  )
  .whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
  .updateExpr(Map("target.status" -> "'inactive'"))
  .execute()

SQL

MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
  INSERT (key, lastSeen, status) VALUES (source.key,  source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
  UPDATE SET target.status = 'inactive'

Kopplingsåtgärdssemantik

Följande är en detaljerad beskrivning av semantiken för merge programmatisk åtgärd.

  • Det kan finnas valfritt antal whenMatched och whenNotMatched satser.

  • whenMatched -satser körs när en källrad matchar en måltabellrad baserat på matchningsvillkoret. Dessa satser har följande semantik.

    • whenMatched satser kan ha högst en update och en delete åtgärd. Åtgärden update i merge uppdaterar endast de angivna kolumnerna (liknar updateåtgärden) för den matchade målraden. Åtgärden delete tar bort den matchade raden.

    • Varje whenMatched sats kan ha ett valfritt villkor. Om det här villkoret finns update körs eller-åtgärden delete endast för matchande källmålradpar när villkoret för satsen är sant.

    • Om det finns flera whenMatched satser utvärderas de i den ordning de anges. Alla whenMatched satser, utom den sista, måste ha villkor.

    • Om inget av whenMatched villkoren utvärderas till sant för ett käll- och målradpar som matchar kopplingsvillkoret lämnas målraden oförändrad.

    • Om du vill uppdatera alla kolumner i måldeltabeln med motsvarande kolumner i källdatauppsättningen använder du whenMatched(...).updateAll(). Detta motsvarar:

      whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      för alla kolumner i deltatabellen för mål. Därför förutsätter den här åtgärden att källtabellen har samma kolumner som de i måltabellen, annars utlöser frågan ett analysfel.

      Kommentar

      Det här beteendet ändras när automatisk schemamigrering är aktiverad. Mer information finns i automatisk schemautveckling .

  • whenNotMatched -satser körs när en källrad inte matchar någon målrad baserat på matchningsvillkoret. Dessa satser har följande semantik.

    • whenNotMatched -satser kan bara ha åtgärden insert . Den nya raden genereras baserat på den angivna kolumnen och motsvarande uttryck. Du behöver inte ange alla kolumner i måltabellen. För ospecificerade målkolumner NULL infogas.

    • Varje whenNotMatched sats kan ha ett valfritt villkor. Om villkoret finns infogas endast en källrad om villkoret är sant för den raden. Annars ignoreras källkolumnen.

    • Om det finns flera whenNotMatched satser utvärderas de i den ordning de anges. Alla whenNotMatched satser, utom den sista, måste ha villkor.

    • Om du vill infoga alla kolumner i måldeltabellen med motsvarande kolumner i källdatauppsättningen använder du whenNotMatched(...).insertAll(). Detta motsvarar:

      whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      för alla kolumner i deltatabellen för mål. Därför förutsätter den här åtgärden att källtabellen har samma kolumner som de i måltabellen, annars utlöser frågan ett analysfel.

      Kommentar

      Det här beteendet ändras när automatisk schemamigrering är aktiverad. Mer information finns i automatisk schemautveckling .

  • whenNotMatchedBySource -satser körs när en målrad inte matchar någon källrad baserat på kopplingsvillkoret. Dessa satser har följande semantik.

    • whenNotMatchedBySource satser kan ange delete och update åtgärder.
    • Varje whenNotMatchedBySource sats kan ha ett valfritt villkor. Om villkoret för satsen finns ändras endast en målrad om villkoret är sant för den raden. Annars lämnas målraden oförändrad.
    • Om det finns flera whenNotMatchedBySource satser utvärderas de i den ordning de anges. Alla whenNotMatchedBySource satser, utom den sista, måste ha villkor.
    • Satser har per definition whenNotMatchedBySource ingen källrad att hämta kolumnvärden från, så källkolumner kan inte refereras till. För varje kolumn som ska ändras kan du antingen ange en literal eller utföra en åtgärd på målkolumnen, till exempel SET target.deleted_count = target.deleted_count + 1.

Viktigt!

  • En merge åtgärd kan misslyckas om flera rader i källdatauppsättningen matchar och sammanfogningen försöker uppdatera samma rader i måldeltabeln. Enligt SQL-semantiken för sammanslagning är en sådan uppdateringsåtgärd tvetydig eftersom det är oklart vilken källrad som ska användas för att uppdatera den matchade målraden. Du kan förbearbeta källtabellen för att eliminera risken för flera matchningar.
  • Du kan endast tillämpa en SQL-åtgärd MERGE på en SQL VIEW om vyn har definierats som CREATE VIEW viewName AS SELECT * FROM deltaTable.

Datadeduplicering när du skriver till Delta-tabeller

Ett vanligt ETL-användningsfall är att samla in loggar i Delta-tabellen genom att lägga till dem i en tabell. Men ofta kan källorna generera dubbletter av loggposter och underordnade dedupliceringssteg krävs för att ta hand om dem. Med mergekan du undvika att infoga de duplicerade posterna.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId") \
  .whenNotMatchedInsertAll() \
  .execute()

Scala

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute()

Java

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute();

Kommentar

Datamängden som innehåller de nya loggarna måste dedupliceras inom sig själv. Med SQL-semantiken för sammanslagning matchar och deduplicerar den nya data med befintliga data i tabellen, men om det finns duplicerade data i den nya datauppsättningen infogas de. Därför deduplicerar du de nya data innan de sammanfogas i tabellen.

Om du vet att du bara kan hämta dubbletter av poster i några dagar kan du optimera frågan ytterligare genom att partitionera tabellen efter datum och sedan ange datumintervallet för måltabellen som ska matchas.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
  .whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
  .execute()

Scala

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute()

Java

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute();

Det här är effektivare än föregående kommando eftersom det bara söker efter dubbletter under de senaste 7 dagarnas loggar, inte hela tabellen. Dessutom kan du använda den här infogade sammanfogningen med Structured Streaming för att utföra kontinuerlig deduplicering av loggarna.

  • I en direktuppspelningsfråga kan du använda sammanslagningsåtgärden i foreachBatch för att kontinuerligt skriva strömmande data till en Delta-tabell med deduplicering. Mer information om finns i följande strömningsexempelforeachBatch.
  • I en annan direktuppspelningsfråga kan du kontinuerligt läsa deduplicerade data från den här Delta-tabellen. Detta är möjligt eftersom en sammanfogning endast för infogning lägger till nya data i Delta-tabellen.

Ändra data (SCD) långsamt och ändra datainsamling (CDC) med Delta Lake

Delta Live Tables har inbyggt stöd för spårning och tillämpning av SCD Typ 1 och Typ 2. Använd APPLY CHANGES INTO med Delta Live Tables för att säkerställa att poster i fel ordning hanteras korrekt vid bearbetning av CDC-feeds. Se Förenklad insamling av ändringsdata med API:et APPLY CHANGES i Delta Live Tables.

Synkronisera Delta-tabellen stegvis med källan

I Databricks SQL och Databricks Runtime 12.1 och senare kan du använda WHEN NOT MATCHED BY SOURCE för att skapa godtyckliga villkor för att atomiskt ta bort och ersätta en del av en tabell. Detta kan vara särskilt användbart när du har en källtabell där poster kan ändras eller tas bort i flera dagar efter den första datainmatningen, men slutligen regleras till ett slutligt tillstånd.

Följande fråga visar hur du använder det här mönstret för att välja 5 dagars poster från källan, uppdatera matchande poster i målet, infoga nya poster från källan till målet och ta bort alla omatchade poster från de senaste 5 dagarna i målet.

MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE

Genom att tillhandahålla samma booleska filter på käll- och måltabellerna kan du dynamiskt sprida ändringar från källan till måltabeller, inklusive borttagningar.

Kommentar

Även om det här mönstret kan användas utan några villkorssatser, skulle detta leda till att måltabellen skrivs om helt och hållet, vilket kan vara dyrt.