Přenesení do tabulky Delta Lake pomocí sloučení
Pomocí operace SQL můžete přenést data ze zdrojové tabulky, zobrazení nebo datového rámce do cílové tabulky MERGE
Delta. Delta Lake podporuje vkládání, aktualizace a odstraňování MERGE
a podporuje rozšířenou syntaxi nad rámec standardů SQL pro usnadnění pokročilých případů použití.
Předpokládejme, že máte zdrojovou tabulku s názvem people10mupdates
nebo zdrojovou cestou /tmp/delta/people-10m-updates
, která obsahuje nová data pro cílovou tabulku s názvem people10m
nebo cílovou cestou na /tmp/delta/people-10m
adrese . Některé z těchto nových záznamů už můžou být v cílových datech. Chcete-li sloučit nová data, chcete aktualizovat řádky, ve kterých je osoba id
již přítomna, a vložit nové řádky, ve kterých není žádná shoda id
přítomna. Můžete spustit následující dotaz:
SQL
MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
UPDATE SET
id = people10mupdates.id,
firstName = people10mupdates.firstName,
middleName = people10mupdates.middleName,
lastName = people10mupdates.lastName,
gender = people10mupdates.gender,
birthDate = people10mupdates.birthDate,
ssn = people10mupdates.ssn,
salary = people10mupdates.salary
WHEN NOT MATCHED
THEN INSERT (
id,
firstName,
middleName,
lastName,
gender,
birthDate,
ssn,
salary
)
VALUES (
people10mupdates.id,
people10mupdates.firstName,
people10mupdates.middleName,
people10mupdates.lastName,
people10mupdates.gender,
people10mupdates.birthDate,
people10mupdates.ssn,
people10mupdates.salary
)
Python
from delta.tables import *
deltaTablePeople = DeltaTable.forName(spark, "people10m")
deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople.alias('people') \
.merge(
dfUpdates.alias('updates'),
'people.id = updates.id'
) \
.whenMatchedUpdate(set =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.whenNotMatchedInsert(values =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.execute()
Scala
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTablePeople = DeltaTable.forName(spark, "people10m")
val deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
val dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople
.as("people")
.merge(
dfUpdates.as("updates"),
"people.id = updates.id")
.whenMatched
.updateExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.whenNotMatched
.insertExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.execute()
Důležité
Pouze jeden řádek ze zdrojové tabulky se může shodovat s daným řádkem v cílové tabulce. V Databricks Runtime 16.0 a novějších MERGE
vyhodnocuje podmínky zadané v WHEN MATCHED
klauzulích a ON
určí duplicitní shody. V Databricks Runtime 15.4 LTS a níže MERGE
operace berou v úvahu pouze podmínky uvedené v ON
klauzuli.
Podrobnosti o syntaxi jazyka Scala a Python najdete v dokumentaci k rozhraní Delta Lake API. Podrobnosti o syntaxi SQL najdete v tématu MERGE INTO
Úprava všech neseřazených řádků pomocí sloučení
V Databricks SQL a Databricks Runtime 12.2 LTS a vyšší můžete klauzuli použít WHEN NOT MATCHED BY SOURCE
k UPDATE
nebo DELETE
záznamům v cílové tabulce, které nemají odpovídající záznamy ve zdrojové tabulce. Databricks doporučuje přidat volitelnou podmíněnou klauzuli, aby se zabránilo úplnému přepisování cílové tabulky.
Následující příklad kódu ukazuje základní syntaxi použití pro odstranění, přepsání cílové tabulky obsahem zdrojové tabulky a odstraněním chybějících záznamů v cílové tabulce. Škálovatelný vzor pro tabulky, ve kterých jsou aktualizace a odstranění zdroje vázané na čas, najdete v tématu Přírůstková synchronizace tabulky Delta se zdrojem.
Python
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)
Scala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.whenNotMatchedBySource()
.delete()
.execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE
Následující příklad přidá podmínky do WHEN NOT MATCHED BY SOURCE
klauzule a určuje hodnoty, které se mají aktualizovat v nezařazených cílových řádcích.
Python
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdate(
set = {"target.lastSeen": "source.timestamp"}
)
.whenNotMatchedInsert(
values = {
"target.key": "source.key",
"target.lastSeen": "source.timestamp",
"target.status": "'active'"
}
)
.whenNotMatchedBySourceUpdate(
condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
set = {"target.status": "'inactive'"}
)
.execute()
)
Scala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateExpr(Map("target.lastSeen" -> "source.timestamp"))
.whenNotMatched()
.insertExpr(Map(
"target.key" -> "source.key",
"target.lastSeen" -> "source.timestamp",
"target.status" -> "'active'",
)
)
.whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
.updateExpr(Map("target.status" -> "'inactive'"))
.execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
INSERT (key, lastSeen, status) VALUES (source.key, source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
UPDATE SET target.status = 'inactive'
Sémantika operace sloučení
Následuje podrobný popis merge
sémantiky programových operací.
Může existovat libovolný počet
whenMatched
klauzulí awhenNotMatched
klauzulí.whenMatched
Klauzule se provádějí, když zdrojový řádek odpovídá cílovému řádku tabulky na základě podmínky shody. Tyto klauzule mají následující sémantiku.whenMatched
klauzule mohou mít maximálně jednuupdate
a jednudelete
akci. Akceupdate
pouzemerge
aktualizuje zadané sloupce (podobně jakoupdate
operace) odpovídajícího cílového řádku. Akcedelete
odstraní odpovídající řádek.Každá
whenMatched
klauzule může mít volitelnou podmínku. Pokud tato podmínka klauzule existuje, provede se akcedelete
pro všechny odpovídající dvojice řádků zdrojového cíle pouze v případě,update
že podmínka klauzule je pravdivá.Pokud existuje více
whenMatched
klauzulí, vyhodnocují se v uvedeném pořadí. VšechnywhenMatched
klauzule, s výjimkou poslední, musí mít podmínky.Pokud se žádná z
whenMatched
podmínek nevyhodnotí jako true pro dvojici zdrojového a cílového řádku, která odpovídá podmínce sloučení, zůstane cílový řádek beze změny.Chcete-li aktualizovat všechny sloupce cílové tabulky Delta odpovídajícími sloupci zdrojové datové sady, použijte
whenMatched(...).updateAll()
. To odpovídá:whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
pro všechny sloupce cílové tabulky Delta. Proto tato akce předpokládá, že zdrojová tabulka má stejné sloupce jako v cílové tabulce, jinak dotaz vyvolá chybu analýzy.
Poznámka:
Toto chování se změní při povolení automatického vývoje schématu. Podrobnosti najdete v automatickém vývoji schématu.
whenNotMatched
Klauzule se spouští, když zdrojový řádek neodpovídá žádnému cílovému řádku na základě podmínky shody. Tyto klauzule mají následující sémantiku.whenNotMatched
klauzule mohou mít pouzeinsert
akci. Nový řádek se vygeneruje na základě zadaného sloupce a odpovídajících výrazů. Nemusíte zadávat všechny sloupce v cílové tabulce. Pro nezadané cílové sloupceNULL
se vloží.Každá
whenNotMatched
klauzule může mít volitelnou podmínku. Pokud podmínka klauzule existuje, vloží se zdrojový řádek pouze v případě, že je podmínka pro tento řádek pravdivá. Jinak se zdrojový sloupec ignoruje.Pokud existuje více
whenNotMatched
klauzulí, vyhodnocují se v uvedeném pořadí. VšechnywhenNotMatched
klauzule, s výjimkou poslední, musí mít podmínky.Chcete-li vložit všechny sloupce cílové tabulky Delta s odpovídajícími sloupci zdrojové datové sady, použijte
whenNotMatched(...).insertAll()
. To odpovídá:whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
pro všechny sloupce cílové tabulky Delta. Proto tato akce předpokládá, že zdrojová tabulka má stejné sloupce jako v cílové tabulce, jinak dotaz vyvolá chybu analýzy.
Poznámka:
Toto chování se změní při povolení automatického vývoje schématu. Podrobnosti najdete v automatickém vývoji schématu.
whenNotMatchedBySource
Klauzule se spustí, když cílový řádek neodpovídá žádnému zdrojovému řádku na základě podmínky sloučení. Tyto klauzule mají následující sémantiku.whenNotMatchedBySource
klauzule mohou určovatdelete
aupdate
akce.- Každá
whenNotMatchedBySource
klauzule může mít volitelnou podmínku. Pokud je podmínka klauzule přítomná, cílový řádek se upraví jenom v případě, že je tato podmínka pro tento řádek pravdivá. V opačném případě zůstane cílový řádek beze změny. - Pokud existuje více
whenNotMatchedBySource
klauzulí, vyhodnocují se v uvedeném pořadí. VšechnywhenNotMatchedBySource
klauzule, s výjimkou poslední, musí mít podmínky. - Podle definice
whenNotMatchedBySource
klauzule nemají zdrojový řádek pro vyžádání hodnot sloupců, a proto na zdrojové sloupce nelze odkazovat. Pro každý sloupec, který chcete upravit, můžete buď zadat literál, nebo provést akci s cílovým sloupcem, napříkladSET target.deleted_count = target.deleted_count + 1
.
Důležité
merge
Operace může selhat, pokud se shoduje více řádků zdrojové datové sady a pokus o sloučení se pokusí aktualizovat stejné řádky cílové tabulky Delta. Podle sémantiky sloučení SQL je taková operace aktualizace nejednoznačná, protože není jasné, který zdrojový řádek by se měl použít k aktualizaci odpovídajícího cílového řádku. Zdrojová tabulka můžete předem zpracovat, abyste vyloučili možnost více shod.- Operaci SQL
MERGE
můžete použít v ZOBRAZENÍ SQL pouze v případě, že zobrazení bylo definováno jakoCREATE VIEW viewName AS SELECT * FROM deltaTable
.
Odstranění duplicitních dat při zápisu do tabulek Delta
Běžným případem použití ETL je shromáždit protokoly do tabulky Delta tak, že je připojíte k tabulce. Často ale zdroje můžou generovat duplicitní záznamy protokolu a kroky odstranění duplicitních dat jsou potřeba k jejich péči. Díky merge
tomu se můžete vyhnout vkládání duplicitních záznamů.
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId") \
.whenNotMatchedInsertAll() \
.execute()
Scala
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute()
Java
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute();
Poznámka:
Datová sada obsahující nové protokoly musí být odstraněna duplicitními daty uvnitř sebe. Sémantika sloučení SQL odpovídá novým datům v tabulce a odstranění duplicit s existujícími daty v tabulce, ale pokud v nové datové sadě existují duplicitní data, vloží se. Proto před sloučením do tabulky deduplikujte nová data.
Pokud víte, že můžete získat duplicitní záznamy jenom na několik dní, můžete dotaz dále optimalizovat rozdělením tabulky podle data a zadáním rozsahu kalendářních dat cílové tabulky, podle kterého se má shodovat.
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
.whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
.execute()
Scala
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute()
Java
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute();
To je efektivnější než předchozí příkaz, protože hledá duplicity pouze za posledních 7 dnů protokolů, ne celou tabulku. Kromě toho můžete použít toto sloučení jen pro vložení se strukturovaným streamováním k průběžnému odstraňování duplicitních dat protokolů.
- V dotazu streamování můžete pomocí operace
foreachBatch
sloučení průběžně zapisovat všechna streamovaná data do tabulky Delta s odstraněním duplicitních dat. Další informace oforeachBatch
. - V jiném streamovacím dotazu můžete průběžně číst odstraněná data z této tabulky Delta. Je to možné, protože sloučení pouze vložení připojí nová data do tabulky Delta.
Pomalu se měnící data (SCD) a zachytávání dat (CDC) pomocí Delta Lake
Delta Live Tables má nativní podporu pro sledování a použití SCD Type 1 a Type 2. Pomocí APPLY CHANGES INTO
rozdílových živých tabulek se ujistěte, že se záznamy mimo pořadí zpracovávají správně při zpracování informačních kanálů CDC. Viz rozhraní API APPLY CHANGES: Zjednodušení zachytávání dat změn pomocí rozdílových živých tabulek.
Přírůstková synchronizace tabulky Delta se zdrojem
V Databricks SQL a Databricks Runtime 12.2 LTS a novějších verzích můžete vytvořit WHEN NOT MATCHED BY SOURCE
libovolné podmínky pro atomické odstranění a nahrazení části tabulky. To může být užitečné zejména v případě, že máte zdrojovou tabulku, kde se záznamy můžou po dobu několika dnů po počátečním zadání dat změnit nebo odstranit, ale nakonec se usadit do konečného stavu.
Následující dotaz ukazuje použití tohoto vzoru k výběru 5 dnů záznamů ze zdroje, aktualizaci odpovídajících záznamů v cíli, vložení nových záznamů ze zdroje do cíle a odstranění všech chybějících záznamů z posledních 5 dnů v cíli.
MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE
Poskytnutím stejného logického filtru zdrojových a cílových tabulek můžete dynamicky šířit změny ze zdroje do cílových tabulek, včetně odstranění.
Poznámka:
I když se tento model dá použít bez podmíněných klauzulí, může to vést k úplnému přepsání cílové tabulky, která může být nákladná.