Upsert w tabeli usługi Delta Lake przy użyciu scalania

Dane można upsert z tabeli źródłowej, widoku lub ramki danych do docelowej tabeli delty przy użyciu MERGE operacji SQL. Usługa Delta Lake obsługuje wstawiania, aktualizacji i usuwania w MERGEsystemie oraz obsługuje rozszerzoną składnię wykraczaną poza standardy SQL w celu ułatwienia zaawansowanych przypadków użycia.

Załóżmy, że masz tabelę źródłową o nazwie people10mupdates lub ścieżkę źródłową zawierającą /tmp/delta/people-10m-updates nowe dane dla tabeli docelowej o nazwie people10m lub ścieżkę docelową pod /tmp/delta/people-10madresem . Niektóre z tych nowych rekordów mogą już znajdować się w danych docelowych. Aby scalić nowe dane, chcesz zaktualizować wiersze, w których dana osoba id jest już obecna, i wstawić nowe wiersze, w których nie ma pasujących id elementów. Możesz uruchomić następujące zapytanie:

SQL

MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
  UPDATE SET
    id = people10mupdates.id,
    firstName = people10mupdates.firstName,
    middleName = people10mupdates.middleName,
    lastName = people10mupdates.lastName,
    gender = people10mupdates.gender,
    birthDate = people10mupdates.birthDate,
    ssn = people10mupdates.ssn,
    salary = people10mupdates.salary
WHEN NOT MATCHED
  THEN INSERT (
    id,
    firstName,
    middleName,
    lastName,
    gender,
    birthDate,
    ssn,
    salary
  )
  VALUES (
    people10mupdates.id,
    people10mupdates.firstName,
    people10mupdates.middleName,
    people10mupdates.lastName,
    people10mupdates.gender,
    people10mupdates.birthDate,
    people10mupdates.ssn,
    people10mupdates.salary
  )

Python

from delta.tables import *

deltaTablePeople = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
deltaTablePeopleUpdates = DeltaTable.forPath(spark, '/tmp/delta/people-10m-updates')

dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople.alias('people') \
  .merge(
    dfUpdates.alias('updates'),
    'people.id = updates.id'
  ) \
  .whenMatchedUpdate(set =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .execute()

Scala

import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTablePeople = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
val deltaTablePeopleUpdates = DeltaTable.forPath(spark, "tmp/delta/people-10m-updates")
val dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople
  .as("people")
  .merge(
    dfUpdates.as("updates"),
    "people.id = updates.id")
  .whenMatched
  .updateExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .whenNotMatched
  .insertExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .execute()

Zapoznaj się z dokumentacją interfejsu API usługi Delta Lake, aby uzyskać szczegółowe informacje o składni języka Scala i Python. Aby uzyskać szczegółowe informacje o składni SQL, zobacz MERGE INTO

Modyfikowanie wszystkich niedopasowanych wierszy przy użyciu scalania

W usługach Databricks SQL i Databricks Runtime 12.2 LTS i nowszych można użyć WHEN NOT MATCHED BY SOURCE klauzuli do UPDATE lub DELETE rekordów w tabeli docelowej, które nie mają odpowiednich rekordów w tabeli źródłowej. Usługa Databricks zaleca dodanie opcjonalnej klauzuli warunkowej, aby uniknąć pełnego ponownego zapisywania tabeli docelowej.

Poniższy przykład kodu przedstawia podstawową składnię używania tej metody do usuwania, zastępowanie tabeli docelowej zawartością tabeli źródłowej i usuwanie niezgodnych rekordów w tabeli docelowej. Aby uzyskać bardziej skalowalny wzorzec dla tabel, w których aktualizacje źródłowe i usunięcia są powiązane czasowo, zobacz Incrementally sync Delta table with source (Przyrostowa synchronizacja tabeli różnicowej ze źródłem).

Python

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .whenNotMatchedBySourceDelete()
  .execute()
)

Scala

targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .whenNotMatchedBySource()
  .delete()
  .execute()

SQL

MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
WHEN NOT MATCHED BY SOURCE THEN
  DELETE

Poniższy przykład dodaje warunki do klauzuli WHEN NOT MATCHED BY SOURCE i określa wartości, które mają być aktualizowane w niedopasowanych wierszach docelowych.

Python

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdate(
    set = {"target.lastSeen": "source.timestamp"}
  )
  .whenNotMatchedInsert(
    values = {
      "target.key": "source.key",
      "target.lastSeen": "source.timestamp",
      "target.status": "'active'"
    }
  )
  .whenNotMatchedBySourceUpdate(
    condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
    set = {"target.status": "'inactive'"}
  )
  .execute()
)

Scala

targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateExpr(Map("target.lastSeen" -> "source.timestamp"))
  .whenNotMatched()
  .insertExpr(Map(
    "target.key" -> "source.key",
    "target.lastSeen" -> "source.timestamp",
    "target.status" -> "'active'",
    )
  )
  .whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
  .updateExpr(Map("target.status" -> "'inactive'"))
  .execute()

SQL

MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
  INSERT (key, lastSeen, status) VALUES (source.key,  source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
  UPDATE SET target.status = 'inactive'

Semantyka operacji scalania

Poniżej przedstawiono szczegółowy opis merge semantyki operacji programowych.

  • Może istnieć dowolna whenMatched liczba klauzul i whenNotMatched .

  • whenMatched Klauzule są wykonywane, gdy wiersz źródłowy pasuje do wiersza tabeli docelowej na podstawie warunku dopasowania. Te klauzule mają następującą semantyka.

    • whenMatched Klauzule mogą zawierać co najwyżej jedną update akcję i jedną delete . Akcja update w programie merge aktualizuje tylko określone kolumny (podobne do updateoperacji) dopasowanego wiersza docelowego. Akcja delete usuwa dopasowany wiersz.

    • Każda klauzula whenMatched może mieć opcjonalny warunek. Jeśli ten warunek klauzuli istnieje, akcja update lub delete jest wykonywana dla dowolnej pasującej pary wierszy źródłowych docelowych tylko wtedy, gdy warunek klauzuli jest spełniony.

    • Jeśli istnieje wiele whenMatched klauzul, są one oceniane w kolejności, w której są określone. Wszystkie whenMatched klauzule, z wyjątkiem ostatniego, muszą mieć warunki.

    • Jeśli żadna z whenMatched warunków nie zwróci wartości true dla pary wierszy źródłowych i docelowych pasujących do warunku scalania, wiersz docelowy pozostanie niezmieniony.

    • Aby zaktualizować wszystkie kolumny docelowej tabeli delty z odpowiednimi kolumnami źródłowego zestawu danych, użyj polecenia whenMatched(...).updateAll(). Jest to odpowiednik:

      whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      dla wszystkich kolumn docelowej tabeli delty. W związku z tym ta akcja zakłada, że tabela źródłowa ma te same kolumny co w tabeli docelowej. W przeciwnym razie zapytanie zgłasza błąd analizy.

      Uwaga

      To zachowanie zmienia się po włączeniu automatycznej migracji schematu. Aby uzyskać szczegółowe informacje, zobacz automatyczną ewolucję schematu.

  • whenNotMatched Klauzule są wykonywane, gdy wiersz źródłowy nie pasuje do żadnego wiersza docelowego na podstawie warunku dopasowania. Te klauzule mają następującą semantyka.

    • whenNotMatched Klauzule mogą mieć tylko insert akcję. Nowy wiersz jest generowany na podstawie określonej kolumny i odpowiednich wyrażeń. Nie trzeba określać wszystkich kolumn w tabeli docelowej. W przypadku nieokreślonych kolumn NULL docelowych jest wstawiony.

    • Każda klauzula whenNotMatched może mieć opcjonalny warunek. Jeśli warunek klauzuli jest obecny, wiersz źródłowy jest wstawiany tylko wtedy, gdy ten warunek jest spełniony dla tego wiersza. W przeciwnym razie kolumna źródłowa jest ignorowana.

    • Jeśli istnieje wiele whenNotMatched klauzul, są one oceniane w kolejności, w której są określone. Wszystkie whenNotMatched klauzule, z wyjątkiem ostatniego, muszą mieć warunki.

    • Aby wstawić wszystkie kolumny docelowej tabeli delty z odpowiednimi kolumnami źródłowego zestawu danych, użyj polecenia whenNotMatched(...).insertAll(). Jest to odpowiednik:

      whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      dla wszystkich kolumn docelowej tabeli delty. W związku z tym ta akcja zakłada, że tabela źródłowa ma te same kolumny co w tabeli docelowej. W przeciwnym razie zapytanie zgłasza błąd analizy.

      Uwaga

      To zachowanie zmienia się po włączeniu automatycznej migracji schematu. Aby uzyskać szczegółowe informacje, zobacz automatyczną ewolucję schematu.

  • whenNotMatchedBySource Klauzule są wykonywane, gdy wiersz docelowy nie pasuje do żadnego wiersza źródłowego na podstawie warunku scalania. Te klauzule mają następującą semantyka.

    • whenNotMatchedBySource klauzule mogą określać delete i update akcje.
    • Każda klauzula whenNotMatchedBySource może mieć opcjonalny warunek. Jeśli warunek klauzuli jest obecny, wiersz docelowy jest modyfikowany tylko wtedy, gdy ten warunek jest spełniony dla tego wiersza. W przeciwnym razie wiersz docelowy pozostanie niezmieniony.
    • Jeśli istnieje wiele whenNotMatchedBySource klauzul, są one oceniane w kolejności, w której są określone. Wszystkie whenNotMatchedBySource klauzule, z wyjątkiem ostatniego, muszą mieć warunki.
    • Z definicji whenNotMatchedBySource klauzule nie mają wiersza źródłowego do ściągania wartości kolumn z, więc nie można odwoływać się do kolumn źródłowych. Dla każdej kolumny do zmodyfikowania można określić literał lub wykonać akcję w kolumnie docelowej, na przykład SET target.deleted_count = target.deleted_count + 1.

Ważne

  • Operacja może zakończyć się niepowodzeniem merge , jeśli pasuje wiele wierszy źródłowego zestawu danych i scalanie próbuje zaktualizować te same wiersze docelowej tabeli delty. Według semantyki SQL scalania taka operacja aktualizacji jest niejednoznaczna, ponieważ nie jest jasne, który wiersz źródłowy powinien być używany do aktualizowania dopasowanego wiersza docelowego. Możesz wstępnie przetworzyć tabelę źródłową, aby wyeliminować możliwość wielu dopasowań.
  • Operację SQL można zastosować w widoku SQL MERGE tylko wtedy, gdy widok został zdefiniowany jako CREATE VIEW viewName AS SELECT * FROM deltaTable.

Deduplikacja danych podczas zapisywania w tabelach delty

Typowy przypadek użycia ETL polega na zbieraniu dzienników do tabeli delty przez dołączenie ich do tabeli. Jednak często źródła mogą generować zduplikowane rekordy dziennika, a kroki deduplikacji podrzędnej są potrzebne do ich obsługi. W programie mergemożna uniknąć wstawiania zduplikowanych rekordów.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId") \
  .whenNotMatchedInsertAll() \
  .execute()

Scala

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute()

Java

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute();

Uwaga

Zestaw danych zawierający nowe dzienniki musi być deduplikowany w obrębie samego siebie. Semantyka sql scalania dopasowuje i deduplikuje nowe dane z istniejącymi danymi w tabeli, ale jeśli w nowym zestawie danych są zduplikowane dane, zostanie wstawiona. W związku z tym deduplikuj nowe dane przed scaleniem z tabelą.

Jeśli wiesz, że możesz uzyskać zduplikowane rekordy tylko przez kilka dni, możesz zoptymalizować zapytanie dalej, partycjonując tabelę według daty, a następnie określając zakres dat tabeli docelowej, który ma być zgodny.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
  .whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
  .execute()

Scala

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute()

Java

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute();

Jest to bardziej wydajne niż poprzednie polecenie, ponieważ wyszukuje duplikaty tylko w ciągu ostatnich 7 dni dzienników, a nie w całej tabeli. Ponadto można użyć tego scalania tylko wstawiania ze strukturą przesyłania strumieniowego do wykonywania ciągłej deduplikacji dzienników.

  • W zapytaniu przesyłanym strumieniowo można użyć operacji scalania w programie , foreachBatch aby stale zapisywać dowolne dane przesyłane strumieniowo do tabeli delty z deduplikacją. Aby uzyskać więcej informacji na temat usługi foreachBatch, zobacz poniższy przykład przesyłania strumieniowego.
  • W innym zapytaniu przesyłanym strumieniowo można stale odczytywać deduplikowane dane z tej tabeli delty. Jest to możliwe, ponieważ scalanie tylko wstawiania dołącza nowe dane do tabeli delty.

Powolne zmienianie danych (SCD) i przechwytywanie danych (CDC) za pomocą usługi Delta Lake

Funkcja Delta Live Tables ma natywną obsługę śledzenia i stosowania typu SCD 1 i typu 2. Użyj funkcji APPLY CHANGES INTO Delta Live Tables, aby upewnić się, że rekordy poza kolejnością są prawidłowo obsługiwane podczas przetwarzania kanałów informacyjnych CDC. Zobacz Stosowanie zmian interfejsu API: upraszczanie przechwytywania danych zmian w tabelach delta live.

Przyrostowa synchronizacja tabeli delty ze źródłem

W usługach Databricks SQL i Databricks Runtime 12.2 LTS i nowszych można utworzyć WHEN NOT MATCHED BY SOURCE dowolne warunki w celu niepodzielnego usunięcia i zastąpienia części tabeli. Może to być szczególnie przydatne, gdy masz tabelę źródłową, w której rekordy mogą ulec zmianie lub zostaną usunięte przez kilka dni po początkowym wpisie danych, ale ostatecznie osiedlą się w stanie końcowym.

Poniższe zapytanie pokazuje użycie tego wzorca do wybrania 5 dni rekordów ze źródła, zaktualizowanie pasujących rekordów w obiekcie docelowym, wstawienie nowych rekordów ze źródła do miejsca docelowego i usunięcie wszystkich niezgodnych rekordów z ostatnich 5 dni w obiekcie docelowym.

MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE

Udostępniając ten sam filtr logiczny w tabelach źródłowych i docelowych, można dynamicznie propagować zmiany ze źródła do tabel docelowych, w tym usuwania.

Uwaga

Chociaż ten wzorzec może być używany bez żadnych klauzul warunkowych, może to prowadzić do pełnego ponownego zapisania tabeli docelowej, która może być kosztowna.