Megosztás a következőn keresztül:


Upsert to a Delta Lake table using merge

Az SQL-művelettel MERGE adatokat állíthat be egy forrástáblából, nézetből vagy DataFrame-ből egy cél Delta-táblába. A Delta Lake támogatja a beszúrásokat, frissítéseket és törléseket MERGE, és támogatja az SQL-szabványokon túli kiterjesztett szintaxist a speciális használati esetek megkönnyítése érdekében.

Tegyük fel, hogy rendelkezik egy forrástáblával vagy people10mupdates egy forrásútvonallal /tmp/delta/people-10m-updates , amely új adatokat tartalmaz egy elnevezett people10m céltáblához vagy egy célútvonalhoz a következő helyen /tmp/delta/people-10m: . Előfordulhat, hogy az új rekordok némelyike már szerepel a céladatokban. Az új adatok egyesítéséhez frissítenie kell azokat a sorokat, ahol a személy id már jelen van, és beszúrja azokat az új sorokat, ahol nincs egyezés id . A következő lekérdezést futtathatja:

SQL

MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
  UPDATE SET
    id = people10mupdates.id,
    firstName = people10mupdates.firstName,
    middleName = people10mupdates.middleName,
    lastName = people10mupdates.lastName,
    gender = people10mupdates.gender,
    birthDate = people10mupdates.birthDate,
    ssn = people10mupdates.ssn,
    salary = people10mupdates.salary
WHEN NOT MATCHED
  THEN INSERT (
    id,
    firstName,
    middleName,
    lastName,
    gender,
    birthDate,
    ssn,
    salary
  )
  VALUES (
    people10mupdates.id,
    people10mupdates.firstName,
    people10mupdates.middleName,
    people10mupdates.lastName,
    people10mupdates.gender,
    people10mupdates.birthDate,
    people10mupdates.ssn,
    people10mupdates.salary
  )

Python

from delta.tables import *

deltaTablePeople = DeltaTable.forName(spark, "people10m")
deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")

dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople.alias('people') \
  .merge(
    dfUpdates.alias('updates'),
    'people.id = updates.id'
  ) \
  .whenMatchedUpdate(set =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .execute()

Scala

import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTablePeople = DeltaTable.forName(spark, "people10m")
val deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
val dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople
  .as("people")
  .merge(
    dfUpdates.as("updates"),
    "people.id = updates.id")
  .whenMatched
  .updateExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .whenNotMatched
  .insertExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .execute()

Fontos

A céltábla egy adott sorához csak a forrástábla egyetlen sora felelhet meg. A Databricks Runtime 16.0-s és újabb MERGE verziókban kiértékeli a benne megadott feltételeket és ON záradékokat az WHEN MATCHED ismétlődő egyezések meghatározásához. A Databricks Runtime 15.4 LTS és újabb MERGE verziókban a műveletek csak a ON záradékban megadott feltételeket veszik figyelembe.

A Scala és a Python szintaxisának részleteiért tekintse meg a Delta Lake API dokumentációját . Az SQL szintaxisának részleteiért lásd: MERGE INTO

Az összes nem egyező sor módosítása egyesítéssel

A Databricks SQL-ben és a Databricks Runtime 12.2 LTS-ben és újabb verziókban használhatja a WHEN NOT MATCHED BY SOURCE záradékot a céltábla azon rekordjaihoz vagy DELETE rekordjaihozUPDATE, amelyek nem rendelkeznek megfelelő rekordokkal a forrástáblában. A Databricks egy opcionális feltételes záradék hozzáadását javasolja a céltábla teljes újraírásának elkerülése érdekében.

Az alábbi példakód azt mutatja be, hogy milyen alapszintaxissal használható ez a törléshez, felülírja a céltáblát a forrástábla tartalmával, és törli a céltáblában a nem egyező rekordokat. A forrásfrissítéseket és törléseket időkorláttal rendelkező táblák méretezhetőbb mintáját lásd : Delta-tábla növekményes szinkronizálása a forrással.

Python

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .whenNotMatchedBySourceDelete()
  .execute()
)

Scala

targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .whenNotMatchedBySource()
  .delete()
  .execute()

SQL

MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
WHEN NOT MATCHED BY SOURCE THEN
  DELETE

Az alábbi példa feltételeket ad a WHEN NOT MATCHED BY SOURCE záradékhoz, és a nem egyező célsorokban frissíteni kívánt értékeket adja meg.

Python

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdate(
    set = {"target.lastSeen": "source.timestamp"}
  )
  .whenNotMatchedInsert(
    values = {
      "target.key": "source.key",
      "target.lastSeen": "source.timestamp",
      "target.status": "'active'"
    }
  )
  .whenNotMatchedBySourceUpdate(
    condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
    set = {"target.status": "'inactive'"}
  )
  .execute()
)

Scala

targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateExpr(Map("target.lastSeen" -> "source.timestamp"))
  .whenNotMatched()
  .insertExpr(Map(
    "target.key" -> "source.key",
    "target.lastSeen" -> "source.timestamp",
    "target.status" -> "'active'",
    )
  )
  .whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
  .updateExpr(Map("target.status" -> "'inactive'"))
  .execute()

SQL

MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
  INSERT (key, lastSeen, status) VALUES (source.key,  source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
  UPDATE SET target.status = 'inactive'

Egyesítési művelet szemantikája

A programozott művelet szemantikájának részletes leírása az merge alábbiakban található.

  • A záradékok száma és whenNotMatched száma whenMatched tetszőleges lehet.

  • whenMatched záradékok akkor lesznek végrehajtva, ha egy forrássor megfelel egy céltáblasornak az egyeztetési feltétel alapján. Ezek a záradékok a következő szemantikával rendelkeznek.

    • whenMatched a záradékok legfeljebb egy update és egy delete műveletet tartalmazhatnak. A update művelet merge csak a megfeleltetett célsor megadott oszlopait frissíti (a update művelethez hasonlóan). A delete művelet törli a egyeztetett sort.

    • Minden whenMatched záradék rendelkezhet opcionális feltétellel. Ha ez a záradékfeltétel létezik, a program csak akkor hajtja végre a update delete műveletet az egyező forrás-cél sorpár esetében, ha a záradékfeltétel igaz.

    • Ha több whenMatched záradék is létezik, a rendszer a megadott sorrendben értékeli ki őket. Az utolsó kivételével minden whenMatched záradéknak feltételekkel kell rendelkeznie.

    • Ha az whenMatched egyesítési feltételnek megfelelő forrás- és célsorpár egyik feltétele sem lesz igaz, a célsor változatlan marad.

    • Ha frissíteni szeretné a cél Delta-tábla összes oszlopát a forrásadatkészlet megfelelő oszlopaival, használja a következőt whenMatched(...).updateAll(): . Ez egyenértékű a következő értékeket:

      whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      a cél Delta-tábla összes oszlopához. Ezért ez a művelet feltételezi, hogy a forrástábla oszlopai megegyeznek a céltáblában szereplő oszlopokkal, ellenkező esetben a lekérdezés elemzési hibát jelez.

      Feljegyzés

      Ez a viselkedés megváltozik, ha engedélyezve van az automatikus sémafejlődés. Részletekért tekintse meg az automatikus sémafejlődést .

  • whenNotMatched záradékok akkor lesznek végrehajtva, ha egy forrássor nem egyezik meg egyetlen célsorsal sem az egyeztetési feltétel alapján. Ezek a záradékok a következő szemantikával rendelkeznek.

    • whenNotMatched záradékok csak a insert műveletet tartalmazhatják. Az új sor a megadott oszlop és a hozzájuk tartozó kifejezések alapján jön létre. Nem kell megadnia a céltábla összes oszlopát. A meg nem határozott céloszlopok NULL be lesznek szúrva.

    • Minden whenNotMatched záradék rendelkezhet opcionális feltétellel. Ha a záradékfeltétel jelen van, a forrássor csak akkor lesz beszúrva, ha ez a feltétel igaz az adott sorra. Ellenkező esetben a forrásoszlop figyelmen kívül lesz hagyva.

    • Ha több whenNotMatched záradék is létezik, a rendszer a megadott sorrendben értékeli ki őket. Az utolsó kivételével minden whenNotMatched záradéknak feltételekkel kell rendelkeznie.

    • Ha a cél Delta-tábla összes oszlopát a forrásadatkészlet megfelelő oszlopaival szeretné beszúrni, használja whenNotMatched(...).insertAll()a következőt: . Ez egyenértékű a következő értékeket:

      whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      a cél Delta-tábla összes oszlopához. Ezért ez a művelet feltételezi, hogy a forrástábla oszlopai megegyeznek a céltáblában szereplő oszlopokkal, ellenkező esetben a lekérdezés elemzési hibát jelez.

      Feljegyzés

      Ez a viselkedés megváltozik, ha engedélyezve van az automatikus sémafejlődés. Részletekért tekintse meg az automatikus sémafejlődést .

  • whenNotMatchedBySource záradékok akkor lesznek végrehajtva, ha a célsor nem egyezik meg egyetlen forrássorsal sem az egyesítési feltétel alapján. Ezek a záradékok a következő szemantikával rendelkeznek.

    • whenNotMatchedBySourcezáradékok megadhatók és update műveleteket hajthatnak végredelete.
    • Minden whenNotMatchedBySource záradék rendelkezhet opcionális feltétellel. Ha a záradékfeltétel jelen van, a célsor csak akkor módosul, ha ez a feltétel igaz az adott sorra. Ellenkező esetben a célsor változatlan marad.
    • Ha több whenNotMatchedBySource záradék is létezik, a rendszer a megadott sorrendben értékeli ki őket. Az utolsó kivételével minden whenNotMatchedBySource záradéknak feltételekkel kell rendelkeznie.
    • A záradékok definíció szerint whenNotMatchedBySource nem rendelkeznek forrássorsal az oszlopértékek lekéréséhez, ezért a forrásoszlopokra nem lehet hivatkozni. Az egyes oszlopok módosításához megadhatja a konstanst, vagy végrehajthat egy műveletet a céloszlopon, például SET target.deleted_count = target.deleted_count + 1.

Fontos

  • A merge művelet meghiúsulhat, ha a forrásadatkészlet több sora egyezik, és az egyesítés megkísérli frissíteni a cél deltatábla ugyanazon sorait. Az egyesítés SQL-szemantikája szerint az ilyen frissítési művelet nem egyértelmű, mivel nem világos, hogy melyik forrássort kell használni a megfeleltetett célsor frissítéséhez. A forrástábla előfeldolgozásával kiküszöbölheti a több egyezés lehetőségét.
  • SQL-műveletet MERGE csak akkor alkalmazhat sql view-ra, ha a nézet definiálva CREATE VIEW viewName AS SELECT * FROM deltaTablevan.

Adatdeduplikáció Delta-táblákba való íráskor

Az ETL gyakori használati esete, hogy naplókat gyűjt a Delta táblába úgy, hogy hozzáfűzi őket egy táblához. A források azonban gyakran duplikált naplórekordokat hozhatnak létre, és az alsóbb rétegbeli deduplikációs lépésekre van szükség a kezelésükhöz. Ezzel mergeelkerülheti az ismétlődő rekordok beszúrását.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId") \
  .whenNotMatchedInsertAll() \
  .execute()

Scala

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute()

Java

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute();

Feljegyzés

Az új naplókat tartalmazó adatkészletet önmagában kell deduplikálni. Az egyesítés SQL-szemantikai adatai megegyeznek és deduplikálják az új adatokat a táblában lévő meglévő adatokkal, de ha az új adatkészletben duplikált adatok találhatók, akkor be lesz szúrva. Ezért deduplikálja az új adatokat a táblába való egyesítés előtt.

Ha tudja, hogy csak néhány napig kaphat duplikált rekordokat, a lekérdezést tovább optimalizálhatja a tábla dátum szerinti particionálásával, majd a céltábla dátumtartományának megadásával.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
  .whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
  .execute()

Scala

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute()

Java

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute();

Ez hatékonyabb, mint az előző parancs, mivel csak a naplók utolsó 7 napjában keresi az ismétlődéseket, nem pedig a teljes táblát. Emellett ezt a csak beszúrásos egyesítést a strukturált streameléssel is használhatja a naplók folyamatos deduplikációjának végrehajtásához.

  • Streamelési lekérdezésekben foreachBatch az egyesítési művelettel folyamatosan írhat streamelési adatokat egy Delta-táblába deduplikációval. További információt a következő streamelési példában foreachBatchtalál.
  • Egy másik streamelési lekérdezésben folyamatosan olvashat deduplikált adatokat ebből a Delta-táblából. Ez azért lehetséges, mert egy csak beszúrásos egyesítés csak új adatokat fűz a Delta táblához.

Adatok lassú módosítása (SCD) és adatrögzítés (CDC) módosítása a Delta Lake használatával

A Delta Live Tables natív támogatást nyújt az SCD 1. és 2. típusának nyomon követéséhez és alkalmazásához. A APPLY CHANGES INTO Delta Live Tables használatával biztosíthatja, hogy a cdc-hírcsatornák feldolgozásakor a rendszer megfelelően kezelje a nem megfelelő rekordokat. Lásd : AZ APPLY CHANGES API-k: A változásadatok rögzítésének egyszerűsítése Delta Live-táblákkal.

Delta-tábla növekményes szinkronizálása a forrással

A Databricks SQL és a Databricks Runtime 12.2 LTS és újabb verzióiban tetszőleges feltételeket hozhat WHEN NOT MATCHED BY SOURCE létre a tábla egy részének atomi törléséhez és cseréjéhez. Ez különösen akkor lehet hasznos, ha olyan forrástáblával rendelkezik, amelyben a rekordok a kezdeti adatbevitel után néhány napig változhatnak vagy törölhetők, de végül végleges állapotba kerülnek.

Az alábbi lekérdezés azt mutatja be, hogy ezzel a mintával 5 napnyi rekordot választhat ki a forrásból, frissítheti a cél egyező rekordjait, új rekordokat szúrhat be a forrásból a célhelyre, és törölheti az összes nem egyező rekordot a célban az elmúlt 5 napból.

MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE

Ha ugyanazt a logikai szűrőt adja meg a forrás- és céltáblákon, dinamikusan propagálja a módosításokat a forrásból a céltáblákba, beleértve a törléseket is.

Feljegyzés

Bár ez a minta feltételes záradékok nélkül is használható, ez a céltábla teljes újraírásához vezet, ami költséges lehet.