Upsert to a Delta Lake table using merge
Az SQL-művelettel MERGE
adatokat állíthat be egy forrástáblából, nézetből vagy DataFrame-ből egy cél Delta-táblába. A Delta Lake támogatja a beszúrásokat, frissítéseket és törléseket MERGE
, és támogatja az SQL-szabványokon túli kiterjesztett szintaxist a speciális használati esetek megkönnyítése érdekében.
Tegyük fel, hogy rendelkezik egy forrástáblával vagy people10mupdates
egy forrásútvonallal /tmp/delta/people-10m-updates
, amely új adatokat tartalmaz egy elnevezett people10m
céltáblához vagy egy célútvonalhoz a következő helyen /tmp/delta/people-10m
: . Előfordulhat, hogy az új rekordok némelyike már szerepel a céladatokban. Az új adatok egyesítéséhez frissítenie kell azokat a sorokat, ahol a személy id
már jelen van, és beszúrja azokat az új sorokat, ahol nincs egyezés id
. A következő lekérdezést futtathatja:
SQL
MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
UPDATE SET
id = people10mupdates.id,
firstName = people10mupdates.firstName,
middleName = people10mupdates.middleName,
lastName = people10mupdates.lastName,
gender = people10mupdates.gender,
birthDate = people10mupdates.birthDate,
ssn = people10mupdates.ssn,
salary = people10mupdates.salary
WHEN NOT MATCHED
THEN INSERT (
id,
firstName,
middleName,
lastName,
gender,
birthDate,
ssn,
salary
)
VALUES (
people10mupdates.id,
people10mupdates.firstName,
people10mupdates.middleName,
people10mupdates.lastName,
people10mupdates.gender,
people10mupdates.birthDate,
people10mupdates.ssn,
people10mupdates.salary
)
Python
from delta.tables import *
deltaTablePeople = DeltaTable.forName(spark, "people10m")
deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople.alias('people') \
.merge(
dfUpdates.alias('updates'),
'people.id = updates.id'
) \
.whenMatchedUpdate(set =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.whenNotMatchedInsert(values =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.execute()
Scala
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTablePeople = DeltaTable.forName(spark, "people10m")
val deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
val dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople
.as("people")
.merge(
dfUpdates.as("updates"),
"people.id = updates.id")
.whenMatched
.updateExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.whenNotMatched
.insertExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.execute()
Fontos
A céltábla egy adott sorához csak a forrástábla egyetlen sora felelhet meg. A Databricks Runtime 16.0-s és újabb MERGE
verziókban kiértékeli a benne megadott feltételeket és ON
záradékokat az WHEN MATCHED
ismétlődő egyezések meghatározásához. A Databricks Runtime 15.4 LTS és újabb MERGE
verziókban a műveletek csak a ON
záradékban megadott feltételeket veszik figyelembe.
A Scala és a Python szintaxisának részleteiért tekintse meg a Delta Lake API dokumentációját . Az SQL szintaxisának részleteiért lásd: MERGE INTO
Az összes nem egyező sor módosítása egyesítéssel
A Databricks SQL-ben és a Databricks Runtime 12.2 LTS-ben és újabb verziókban használhatja a WHEN NOT MATCHED BY SOURCE
záradékot a céltábla azon rekordjaihoz vagy DELETE
rekordjaihozUPDATE
, amelyek nem rendelkeznek megfelelő rekordokkal a forrástáblában. A Databricks egy opcionális feltételes záradék hozzáadását javasolja a céltábla teljes újraírásának elkerülése érdekében.
Az alábbi példakód azt mutatja be, hogy milyen alapszintaxissal használható ez a törléshez, felülírja a céltáblát a forrástábla tartalmával, és törli a céltáblában a nem egyező rekordokat. A forrásfrissítéseket és törléseket időkorláttal rendelkező táblák méretezhetőbb mintáját lásd : Delta-tábla növekményes szinkronizálása a forrással.
Python
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)
Scala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.whenNotMatchedBySource()
.delete()
.execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE
Az alábbi példa feltételeket ad a WHEN NOT MATCHED BY SOURCE
záradékhoz, és a nem egyező célsorokban frissíteni kívánt értékeket adja meg.
Python
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdate(
set = {"target.lastSeen": "source.timestamp"}
)
.whenNotMatchedInsert(
values = {
"target.key": "source.key",
"target.lastSeen": "source.timestamp",
"target.status": "'active'"
}
)
.whenNotMatchedBySourceUpdate(
condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
set = {"target.status": "'inactive'"}
)
.execute()
)
Scala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateExpr(Map("target.lastSeen" -> "source.timestamp"))
.whenNotMatched()
.insertExpr(Map(
"target.key" -> "source.key",
"target.lastSeen" -> "source.timestamp",
"target.status" -> "'active'",
)
)
.whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
.updateExpr(Map("target.status" -> "'inactive'"))
.execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
INSERT (key, lastSeen, status) VALUES (source.key, source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
UPDATE SET target.status = 'inactive'
Egyesítési művelet szemantikája
A programozott művelet szemantikájának részletes leírása az merge
alábbiakban található.
A záradékok száma és
whenNotMatched
számawhenMatched
tetszőleges lehet.whenMatched
záradékok akkor lesznek végrehajtva, ha egy forrássor megfelel egy céltáblasornak az egyeztetési feltétel alapján. Ezek a záradékok a következő szemantikával rendelkeznek.whenMatched
a záradékok legfeljebb egyupdate
és egydelete
műveletet tartalmazhatnak. Aupdate
műveletmerge
csak a megfeleltetett célsor megadott oszlopait frissíti (aupdate
művelethez hasonlóan). Adelete
művelet törli a egyeztetett sort.Minden
whenMatched
záradék rendelkezhet opcionális feltétellel. Ha ez a záradékfeltétel létezik, a program csak akkor hajtja végre aupdate
delete
műveletet az egyező forrás-cél sorpár esetében, ha a záradékfeltétel igaz.Ha több
whenMatched
záradék is létezik, a rendszer a megadott sorrendben értékeli ki őket. Az utolsó kivételével mindenwhenMatched
záradéknak feltételekkel kell rendelkeznie.Ha az
whenMatched
egyesítési feltételnek megfelelő forrás- és célsorpár egyik feltétele sem lesz igaz, a célsor változatlan marad.Ha frissíteni szeretné a cél Delta-tábla összes oszlopát a forrásadatkészlet megfelelő oszlopaival, használja a következőt
whenMatched(...).updateAll()
: . Ez egyenértékű a következő értékeket:whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
a cél Delta-tábla összes oszlopához. Ezért ez a művelet feltételezi, hogy a forrástábla oszlopai megegyeznek a céltáblában szereplő oszlopokkal, ellenkező esetben a lekérdezés elemzési hibát jelez.
Feljegyzés
Ez a viselkedés megváltozik, ha engedélyezve van az automatikus sémafejlődés. Részletekért tekintse meg az automatikus sémafejlődést .
whenNotMatched
záradékok akkor lesznek végrehajtva, ha egy forrássor nem egyezik meg egyetlen célsorsal sem az egyeztetési feltétel alapján. Ezek a záradékok a következő szemantikával rendelkeznek.whenNotMatched
záradékok csak ainsert
műveletet tartalmazhatják. Az új sor a megadott oszlop és a hozzájuk tartozó kifejezések alapján jön létre. Nem kell megadnia a céltábla összes oszlopát. A meg nem határozott céloszlopokNULL
be lesznek szúrva.Minden
whenNotMatched
záradék rendelkezhet opcionális feltétellel. Ha a záradékfeltétel jelen van, a forrássor csak akkor lesz beszúrva, ha ez a feltétel igaz az adott sorra. Ellenkező esetben a forrásoszlop figyelmen kívül lesz hagyva.Ha több
whenNotMatched
záradék is létezik, a rendszer a megadott sorrendben értékeli ki őket. Az utolsó kivételével mindenwhenNotMatched
záradéknak feltételekkel kell rendelkeznie.Ha a cél Delta-tábla összes oszlopát a forrásadatkészlet megfelelő oszlopaival szeretné beszúrni, használja
whenNotMatched(...).insertAll()
a következőt: . Ez egyenértékű a következő értékeket:whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
a cél Delta-tábla összes oszlopához. Ezért ez a művelet feltételezi, hogy a forrástábla oszlopai megegyeznek a céltáblában szereplő oszlopokkal, ellenkező esetben a lekérdezés elemzési hibát jelez.
Feljegyzés
Ez a viselkedés megváltozik, ha engedélyezve van az automatikus sémafejlődés. Részletekért tekintse meg az automatikus sémafejlődést .
whenNotMatchedBySource
záradékok akkor lesznek végrehajtva, ha a célsor nem egyezik meg egyetlen forrássorsal sem az egyesítési feltétel alapján. Ezek a záradékok a következő szemantikával rendelkeznek.whenNotMatchedBySource
záradékok megadhatók ésupdate
műveleteket hajthatnak végredelete
.- Minden
whenNotMatchedBySource
záradék rendelkezhet opcionális feltétellel. Ha a záradékfeltétel jelen van, a célsor csak akkor módosul, ha ez a feltétel igaz az adott sorra. Ellenkező esetben a célsor változatlan marad. - Ha több
whenNotMatchedBySource
záradék is létezik, a rendszer a megadott sorrendben értékeli ki őket. Az utolsó kivételével mindenwhenNotMatchedBySource
záradéknak feltételekkel kell rendelkeznie. - A záradékok definíció szerint
whenNotMatchedBySource
nem rendelkeznek forrássorsal az oszlopértékek lekéréséhez, ezért a forrásoszlopokra nem lehet hivatkozni. Az egyes oszlopok módosításához megadhatja a konstanst, vagy végrehajthat egy műveletet a céloszlopon, példáulSET target.deleted_count = target.deleted_count + 1
.
Fontos
- A
merge
művelet meghiúsulhat, ha a forrásadatkészlet több sora egyezik, és az egyesítés megkísérli frissíteni a cél deltatábla ugyanazon sorait. Az egyesítés SQL-szemantikája szerint az ilyen frissítési művelet nem egyértelmű, mivel nem világos, hogy melyik forrássort kell használni a megfeleltetett célsor frissítéséhez. A forrástábla előfeldolgozásával kiküszöbölheti a több egyezés lehetőségét. - SQL-műveletet
MERGE
csak akkor alkalmazhat sql view-ra, ha a nézet definiálvaCREATE VIEW viewName AS SELECT * FROM deltaTable
van.
Adatdeduplikáció Delta-táblákba való íráskor
Az ETL gyakori használati esete, hogy naplókat gyűjt a Delta táblába úgy, hogy hozzáfűzi őket egy táblához. A források azonban gyakran duplikált naplórekordokat hozhatnak létre, és az alsóbb rétegbeli deduplikációs lépésekre van szükség a kezelésükhöz. Ezzel merge
elkerülheti az ismétlődő rekordok beszúrását.
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId") \
.whenNotMatchedInsertAll() \
.execute()
Scala
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute()
Java
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute();
Feljegyzés
Az új naplókat tartalmazó adatkészletet önmagában kell deduplikálni. Az egyesítés SQL-szemantikai adatai megegyeznek és deduplikálják az új adatokat a táblában lévő meglévő adatokkal, de ha az új adatkészletben duplikált adatok találhatók, akkor be lesz szúrva. Ezért deduplikálja az új adatokat a táblába való egyesítés előtt.
Ha tudja, hogy csak néhány napig kaphat duplikált rekordokat, a lekérdezést tovább optimalizálhatja a tábla dátum szerinti particionálásával, majd a céltábla dátumtartományának megadásával.
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
.whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
.execute()
Scala
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute()
Java
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute();
Ez hatékonyabb, mint az előző parancs, mivel csak a naplók utolsó 7 napjában keresi az ismétlődéseket, nem pedig a teljes táblát. Emellett ezt a csak beszúrásos egyesítést a strukturált streameléssel is használhatja a naplók folyamatos deduplikációjának végrehajtásához.
- Streamelési lekérdezésekben
foreachBatch
az egyesítési művelettel folyamatosan írhat streamelési adatokat egy Delta-táblába deduplikációval. További információt a következő streamelési példábanforeachBatch
talál. - Egy másik streamelési lekérdezésben folyamatosan olvashat deduplikált adatokat ebből a Delta-táblából. Ez azért lehetséges, mert egy csak beszúrásos egyesítés csak új adatokat fűz a Delta táblához.
Adatok lassú módosítása (SCD) és adatrögzítés (CDC) módosítása a Delta Lake használatával
A Delta Live Tables natív támogatást nyújt az SCD 1. és 2. típusának nyomon követéséhez és alkalmazásához. A APPLY CHANGES INTO
Delta Live Tables használatával biztosíthatja, hogy a cdc-hírcsatornák feldolgozásakor a rendszer megfelelően kezelje a nem megfelelő rekordokat. Lásd : AZ APPLY CHANGES API-k: A változásadatok rögzítésének egyszerűsítése Delta Live-táblákkal.
Delta-tábla növekményes szinkronizálása a forrással
A Databricks SQL és a Databricks Runtime 12.2 LTS és újabb verzióiban tetszőleges feltételeket hozhat WHEN NOT MATCHED BY SOURCE
létre a tábla egy részének atomi törléséhez és cseréjéhez. Ez különösen akkor lehet hasznos, ha olyan forrástáblával rendelkezik, amelyben a rekordok a kezdeti adatbevitel után néhány napig változhatnak vagy törölhetők, de végül végleges állapotba kerülnek.
Az alábbi lekérdezés azt mutatja be, hogy ezzel a mintával 5 napnyi rekordot választhat ki a forrásból, frissítheti a cél egyező rekordjait, új rekordokat szúrhat be a forrásból a célhelyre, és törölheti az összes nem egyező rekordot a célban az elmúlt 5 napból.
MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE
Ha ugyanazt a logikai szűrőt adja meg a forrás- és céltáblákon, dinamikusan propagálja a módosításokat a forrásból a céltáblákba, beleértve a törléseket is.
Feljegyzés
Bár ez a minta feltételes záradékok nélkül is használható, ez a céltábla teljes újraírásához vezet, ami költséges lehet.