Upsert w tabeli usługi Delta Lake przy użyciu scalania
Dane można upsert z tabeli źródłowej, widoku lub ramki danych do docelowej tabeli delty przy użyciu MERGE
operacji SQL. Usługa Delta Lake obsługuje wstawiania, aktualizacji i usuwania w MERGE
systemie oraz obsługuje rozszerzoną składnię wykraczaną poza standardy SQL w celu ułatwienia zaawansowanych przypadków użycia.
Załóżmy, że masz tabelę źródłową o nazwie people10mupdates
lub ścieżkę źródłową zawierającą /tmp/delta/people-10m-updates
nowe dane dla tabeli docelowej o nazwie people10m
lub ścieżkę docelową pod /tmp/delta/people-10m
adresem . Niektóre z tych nowych rekordów mogą już znajdować się w danych docelowych. Aby scalić nowe dane, chcesz zaktualizować wiersze, w których dana osoba id
jest już obecna, i wstawić nowe wiersze, w których nie ma pasujących id
elementów. Możesz uruchomić następujące zapytanie:
SQL
MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
UPDATE SET
id = people10mupdates.id,
firstName = people10mupdates.firstName,
middleName = people10mupdates.middleName,
lastName = people10mupdates.lastName,
gender = people10mupdates.gender,
birthDate = people10mupdates.birthDate,
ssn = people10mupdates.ssn,
salary = people10mupdates.salary
WHEN NOT MATCHED
THEN INSERT (
id,
firstName,
middleName,
lastName,
gender,
birthDate,
ssn,
salary
)
VALUES (
people10mupdates.id,
people10mupdates.firstName,
people10mupdates.middleName,
people10mupdates.lastName,
people10mupdates.gender,
people10mupdates.birthDate,
people10mupdates.ssn,
people10mupdates.salary
)
Python
from delta.tables import *
deltaTablePeople = DeltaTable.forName(spark, "people10m")
deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople.alias('people') \
.merge(
dfUpdates.alias('updates'),
'people.id = updates.id'
) \
.whenMatchedUpdate(set =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.whenNotMatchedInsert(values =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.execute()
Scala
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTablePeople = DeltaTable.forName(spark, "people10m")
val deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
val dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople
.as("people")
.merge(
dfUpdates.as("updates"),
"people.id = updates.id")
.whenMatched
.updateExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.whenNotMatched
.insertExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.execute()
Ważne
Tylko jeden wiersz z tabeli źródłowej może być zgodny z danym wierszem w tabeli docelowej. W środowisku Databricks Runtime 16.0 lub nowszym ocenia warunki określone w WHEN MATCHED
klauzulach i ON
w MERGE
celu określenia zduplikowanych dopasowań. W środowisku Databricks Runtime 15.4 LTS i poniżej MERGE
operacje uwzględniają tylko warunki określone w klauzuli ON
.
Zapoznaj się z dokumentacją interfejsu API usługi Delta Lake, aby uzyskać szczegółowe informacje o składni języka Scala i Python. Aby uzyskać szczegółowe informacje o składni SQL, zobacz MERGE INTO
Modyfikowanie wszystkich niedopasowanych wierszy przy użyciu scalania
W usługach Databricks SQL i Databricks Runtime 12.2 LTS i nowszych można użyć WHEN NOT MATCHED BY SOURCE
klauzuli do UPDATE
lub DELETE
rekordów w tabeli docelowej, które nie mają odpowiednich rekordów w tabeli źródłowej. Usługa Databricks zaleca dodanie opcjonalnej klauzuli warunkowej, aby uniknąć pełnego ponownego zapisywania tabeli docelowej.
Poniższy przykład kodu przedstawia podstawową składnię używania tej metody do usuwania, zastępowanie tabeli docelowej zawartością tabeli źródłowej i usuwanie niezgodnych rekordów w tabeli docelowej. Aby uzyskać bardziej skalowalny wzorzec dla tabel, w których aktualizacje źródłowe i usunięcia są powiązane czasowo, zobacz Incrementally sync Delta table with source (Przyrostowa synchronizacja tabeli różnicowej ze źródłem).
Python
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)
Scala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.whenNotMatchedBySource()
.delete()
.execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE
Poniższy przykład dodaje warunki do klauzuli WHEN NOT MATCHED BY SOURCE
i określa wartości, które mają być aktualizowane w niedopasowanych wierszach docelowych.
Python
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdate(
set = {"target.lastSeen": "source.timestamp"}
)
.whenNotMatchedInsert(
values = {
"target.key": "source.key",
"target.lastSeen": "source.timestamp",
"target.status": "'active'"
}
)
.whenNotMatchedBySourceUpdate(
condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
set = {"target.status": "'inactive'"}
)
.execute()
)
Scala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateExpr(Map("target.lastSeen" -> "source.timestamp"))
.whenNotMatched()
.insertExpr(Map(
"target.key" -> "source.key",
"target.lastSeen" -> "source.timestamp",
"target.status" -> "'active'",
)
)
.whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
.updateExpr(Map("target.status" -> "'inactive'"))
.execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
INSERT (key, lastSeen, status) VALUES (source.key, source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
UPDATE SET target.status = 'inactive'
Semantyka operacji scalania
Poniżej przedstawiono szczegółowy opis merge
semantyki operacji programowych.
Może istnieć dowolna
whenMatched
liczba klauzul iwhenNotMatched
.whenMatched
Klauzule są wykonywane, gdy wiersz źródłowy pasuje do wiersza tabeli docelowej na podstawie warunku dopasowania. Te klauzule mają następującą semantyka.whenMatched
Klauzule mogą zawierać co najwyżej jednąupdate
akcję i jednądelete
. Akcjaupdate
w programiemerge
aktualizuje tylko określone kolumny (podobne doupdate
operacji) dopasowanego wiersza docelowego. Akcjadelete
usuwa dopasowany wiersz.Każda klauzula
whenMatched
może mieć opcjonalny warunek. Jeśli ten warunek klauzuli istnieje, akcjaupdate
lubdelete
jest wykonywana dla dowolnej pasującej pary wierszy źródłowych docelowych tylko wtedy, gdy warunek klauzuli jest spełniony.Jeśli istnieje wiele
whenMatched
klauzul, są one oceniane w kolejności, w której są określone. WszystkiewhenMatched
klauzule, z wyjątkiem ostatniego, muszą mieć warunki.Jeśli żadna z
whenMatched
warunków nie zwróci wartości true dla pary wierszy źródłowych i docelowych pasujących do warunku scalania, wiersz docelowy pozostanie niezmieniony.Aby zaktualizować wszystkie kolumny docelowej tabeli delty z odpowiednimi kolumnami źródłowego zestawu danych, użyj polecenia
whenMatched(...).updateAll()
. Jest to odpowiednik:whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
dla wszystkich kolumn docelowej tabeli delty. W związku z tym ta akcja zakłada, że tabela źródłowa ma te same kolumny co w tabeli docelowej. W przeciwnym razie zapytanie zgłasza błąd analizy.
Uwaga
To zachowanie zmienia się, gdy włączono automatyczną ewolucję schematu. Aby uzyskać szczegółowe informacje, zobacz automatyczną ewolucję schematu.
whenNotMatched
Klauzule są wykonywane, gdy wiersz źródłowy nie pasuje do żadnego wiersza docelowego na podstawie warunku dopasowania. Te klauzule mają następującą semantyka.whenNotMatched
Klauzule mogą mieć tylkoinsert
akcję. Nowy wiersz jest generowany na podstawie określonej kolumny i odpowiednich wyrażeń. Nie trzeba określać wszystkich kolumn w tabeli docelowej. W przypadku nieokreślonych kolumnNULL
docelowych jest wstawiony.Każda klauzula
whenNotMatched
może mieć opcjonalny warunek. Jeśli warunek klauzuli jest obecny, wiersz źródłowy jest wstawiany tylko wtedy, gdy ten warunek jest spełniony dla tego wiersza. W przeciwnym razie kolumna źródłowa jest ignorowana.Jeśli istnieje wiele
whenNotMatched
klauzul, są one oceniane w kolejności, w której są określone. WszystkiewhenNotMatched
klauzule, z wyjątkiem ostatniego, muszą mieć warunki.Aby wstawić wszystkie kolumny docelowej tabeli delty z odpowiednimi kolumnami źródłowego zestawu danych, użyj polecenia
whenNotMatched(...).insertAll()
. Jest to odpowiednik:whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
dla wszystkich kolumn docelowej tabeli delty. W związku z tym ta akcja zakłada, że tabela źródłowa ma te same kolumny co w tabeli docelowej. W przeciwnym razie zapytanie zgłasza błąd analizy.
Uwaga
To zachowanie zmienia się, gdy włączono automatyczną ewolucję schematu. Aby uzyskać szczegółowe informacje, zobacz automatyczną ewolucję schematu.
whenNotMatchedBySource
Klauzule są wykonywane, gdy wiersz docelowy nie pasuje do żadnego wiersza źródłowego na podstawie warunku scalania. Te klauzule mają następującą semantyka.whenNotMatchedBySource
klauzule mogą określaćdelete
iupdate
akcje.- Każda klauzula
whenNotMatchedBySource
może mieć opcjonalny warunek. Jeśli warunek klauzuli jest obecny, wiersz docelowy jest modyfikowany tylko wtedy, gdy ten warunek jest spełniony dla tego wiersza. W przeciwnym razie wiersz docelowy pozostanie niezmieniony. - Jeśli istnieje wiele
whenNotMatchedBySource
klauzul, są one oceniane w kolejności, w której są określone. WszystkiewhenNotMatchedBySource
klauzule, z wyjątkiem ostatniego, muszą mieć warunki. - Z definicji
whenNotMatchedBySource
klauzule nie mają wiersza źródłowego do ściągania wartości kolumn z, więc nie można odwoływać się do kolumn źródłowych. Dla każdej kolumny do zmodyfikowania można określić literał lub wykonać akcję w kolumnie docelowej, na przykładSET target.deleted_count = target.deleted_count + 1
.
Ważne
- Operacja może zakończyć się niepowodzeniem
merge
, jeśli pasuje wiele wierszy źródłowego zestawu danych i scalanie próbuje zaktualizować te same wiersze docelowej tabeli delty. Według semantyki SQL scalania taka operacja aktualizacji jest niejednoznaczna, ponieważ nie jest jasne, który wiersz źródłowy powinien być używany do aktualizowania dopasowanego wiersza docelowego. Możesz wstępnie przetworzyć tabelę źródłową, aby wyeliminować możliwość wielu dopasowań. - Operację SQL można zastosować w widoku SQL
MERGE
tylko wtedy, gdy widok został zdefiniowany jakoCREATE VIEW viewName AS SELECT * FROM deltaTable
.
Deduplikacja danych podczas zapisywania w tabelach delty
Typowy przypadek użycia ETL polega na zbieraniu dzienników do tabeli delty przez dołączenie ich do tabeli. Jednak często źródła mogą generować zduplikowane rekordy dziennika, a kroki deduplikacji podrzędnej są potrzebne do ich obsługi. W programie merge
można uniknąć wstawiania zduplikowanych rekordów.
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId") \
.whenNotMatchedInsertAll() \
.execute()
Scala
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute()
Java
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute();
Uwaga
Zestaw danych zawierający nowe dzienniki musi być deduplikowany w obrębie samego siebie. Semantyka sql scalania dopasowuje i deduplikuje nowe dane z istniejącymi danymi w tabeli, ale jeśli w nowym zestawie danych są zduplikowane dane, zostanie wstawiona. W związku z tym deduplikuj nowe dane przed scaleniem z tabelą.
Jeśli wiesz, że możesz uzyskać zduplikowane rekordy tylko przez kilka dni, możesz zoptymalizować zapytanie dalej, partycjonując tabelę według daty, a następnie określając zakres dat tabeli docelowej, który ma być zgodny.
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
.whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
.execute()
Scala
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute()
Java
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute();
Jest to bardziej wydajne niż poprzednie polecenie, ponieważ wyszukuje duplikaty tylko w ciągu ostatnich 7 dni dzienników, a nie w całej tabeli. Ponadto można użyć tego scalania tylko wstawiania ze strukturą przesyłania strumieniowego do wykonywania ciągłej deduplikacji dzienników.
- W zapytaniu przesyłanym strumieniowo można użyć operacji scalania w programie ,
foreachBatch
aby stale zapisywać dowolne dane przesyłane strumieniowo do tabeli delty z deduplikacją. Aby uzyskać więcej informacji na temat usługiforeachBatch
, zobacz poniższy przykład przesyłania strumieniowego. - W innym zapytaniu przesyłanym strumieniowo można stale odczytywać deduplikowane dane z tej tabeli delty. Jest to możliwe, ponieważ scalanie tylko wstawiania dołącza nowe dane do tabeli delty.
Powolne zmienianie danych (SCD) i przechwytywanie danych (CDC) za pomocą usługi Delta Lake
Funkcja Delta Live Tables ma natywną obsługę śledzenia i stosowania typu SCD 1 i typu 2. Użyj funkcji APPLY CHANGES INTO
Delta Live Tables, aby upewnić się, że rekordy poza kolejnością są prawidłowo obsługiwane podczas przetwarzania kanałów informacyjnych CDC. Zobacz Interfejsy API ZASTOSUJ ZMIANY: upraszczanie przechwytywania danych zmian za pomocą tabel różnicowych na żywo.
Przyrostowa synchronizacja tabeli delty ze źródłem
W usługach Databricks SQL i Databricks Runtime 12.2 LTS i nowszych można utworzyć WHEN NOT MATCHED BY SOURCE
dowolne warunki w celu niepodzielnego usunięcia i zastąpienia części tabeli. Może to być szczególnie przydatne, gdy masz tabelę źródłową, w której rekordy mogą ulec zmianie lub zostaną usunięte przez kilka dni po początkowym wpisie danych, ale ostatecznie osiedlą się w stanie końcowym.
Poniższe zapytanie pokazuje użycie tego wzorca do wybrania 5 dni rekordów ze źródła, zaktualizowanie pasujących rekordów w obiekcie docelowym, wstawienie nowych rekordów ze źródła do miejsca docelowego i usunięcie wszystkich niezgodnych rekordów z ostatnich 5 dni w obiekcie docelowym.
MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE
Udostępniając ten sam filtr logiczny w tabelach źródłowych i docelowych, można dynamicznie propagować zmiany ze źródła do tabel docelowych, w tym usuwania.
Uwaga
Chociaż ten wzorzec może być używany bez żadnych klauzul warunkowych, może to prowadzić do pełnego ponownego zapisania tabeli docelowej, która może być kosztowna.