Megosztás a következőn keresztül:


Az APPLY CHANGES API-k: A változásadatok rögzítésének egyszerűsítése Delta Live-táblákkal

A Delta Live Tables leegyszerűsíti a módosítási adatrögzítést (CDC) az APPLY CHANGES API-kkal.APPLY CHANGES FROM SNAPSHOT A használt felület a változási adatok forrásától függ:

  • Változásadatcsatorna (CDF) módosításainak feldolgozására használható APPLY CHANGES .
  • A (nyilvános előzetes verzió) használatával APPLY CHANGES FROM SNAPSHOT feldolgozhatja az adatbázis-pillanatképek változásait.

Korábban az MERGE INTO utasítást gyakran használták CDC-rekordok feldolgozására az Azure Databricksben. MERGE INTO A nem sorozatos rekordok miatt azonban helytelen eredményeket hozhat, vagy összetett logikát igényel a rekordok újrarendezéséhez.

Az APPLY CHANGES API támogatott a Delta Live Tables SQL- és Python-felületeiben. Az APPLY CHANGES FROM SNAPSHOT API támogatott a Delta Live Tables Python felületén.

Támogatja APPLY CHANGES a APPLY CHANGES FROM SNAPSHOT táblák frissítését az 1. és a 2. típusú SCD használatával:

  • Az 1. SCD-típussal közvetlenül frissítheti a rekordokat. A rendszer nem őrzi meg a frissített rekordok előzményeit.
  • Az SCD 2-es típusával megőrizheti a rekordok előzményeit, akár az összes frissítésen, akár egy adott oszlopkészlet frissítésén.

Szintaxissal és egyéb hivatkozásokkal kapcsolatban lásd:

Feljegyzés

Ez a cikk azt ismerteti, hogyan frissíthet táblákat a Delta Live Tables-folyamatban a forrásadatok változásai alapján. A Delta-táblák sorszintű változásadatainak rögzítéséről és lekérdezéséről a Delta Lake változásadatcsatorna használata az Azure Databricksben című témakörben olvashat.

Követelmények

A CDC API-k használatához a folyamatot úgy kell konfigurálni, hogy kiszolgáló nélküli DLT-folyamatokat vagy delta élő táblákat Pro vagy Advanced kiadásokat használjon.

Hogyan implementáljuk a CDC-t az APPLY CHANGES API-val?

A folyamaton kívüli rekordok automatikus kezelésével a Delta Live Tables API-ja biztosítja a APPLY CHANGES CDC-rekordok megfelelő feldolgozását, és nem szükséges összetett logikát kidolgozni a sorrenden kívüli rekordok kezeléséhez. Meg kell adnia egy oszlopot a forrásadatokban, amelyeken rekordokat kell sorrendbe állítani, amelyet a Delta Live Tables a forrásadatok megfelelő sorrendjének monoton módon növekvő ábrázolásaként értelmez. A Delta Live Tables automatikusan kezeli a rendelésből érkező adatokat. A 2. típusú SCD-módosítások esetén a Delta Live Tables propagálja a megfelelő szekvenálási értékeket a céltábla __START_AT és __END_AT az oszlopok számára. Kulcsonként egy külön frissítésnek kell lennie az egyes szekvenálási értékeknél, és a NULL szekvenálási értékek nem támogatottak.

A CDC-feldolgozás APPLY CHANGESvégrehajtásához először létre kell hoznia egy streamelési táblát, majd az APPLY CHANGES INTO SQL-ben vagy a apply_changes() Python-függvényben található utasítással megadhatja a változáscsatorna forrását, kulcsait és szekvenálását. A célstreamelési tábla létrehozásához használja az CREATE OR REFRESH STREAMING TABLE SQL-ben vagy a create_streaming_table() Pythonban található függvényt. Lásd az SCD 1. és 2. típusú feldolgozási példáit.

A szintaxis részleteiért tekintse meg a Delta Live Tables SQL-referencia- vagy Python-referenciát.

Hogyan implementáljuk a CDC-t az APPLY CHANGES FROM SNAPSHOT API-val?

Fontos

Az APPLY CHANGES FROM SNAPSHOT API nyilvános előzetes verzióban érhető el.

APPLY CHANGES FROM SNAPSHOT egy deklaratív API, amely hatékonyan határozza meg a forrásadatok változásait a sorrendben lévő pillanatképek sorozatának összehasonlításával, majd futtatja a pillanatképek rekordjainak CDC-feldolgozásához szükséges feldolgozást. APPLY CHANGES FROM SNAPSHOT csak a Delta Live Tables Python-felülete támogatja.

APPLY CHANGES FROM SNAPSHOT A pillanatképek több forrástípusból való betöltését támogatja:

  • A pillanatképek egy meglévő táblából vagy nézetből való betöltéséhez használjon rendszeres pillanatkép-betöltést. APPLY CHANGES FROM SNAPSHOT egyszerű, egyszerűsített felülettel rendelkezik, amely támogatja a pillanatképek rendszeres betöltését egy meglévő adatbázis-objektumból. Minden folyamatfrissítés új pillanatképet használ, és a betöltési időt használja a rendszer pillanatkép-verzióként. Ha egy folyamat folyamatos módban fut, a rendszer több pillanatképet is betölt az egyes folyamatfrissítésekkel egy olyan időszakon belül, amelyet a PILLANATKÉP-feldolgozás módosításait tartalmazó folyamat eseményindító időközi beállítása határoz meg.
  • Az előzmény-pillanatkép-betöltéssel feldolgozhatja az adatbázis-pillanatképeket tartalmazó fájlokat, például egy Oracle- vagy MySQL-adatbázisból vagy adattárházból létrehozott pillanatképeket.

Ha bármilyen forrástípusból szeretne CDC-feldolgozást APPLY CHANGES FROM SNAPSHOTvégezni, először létre kell hoznia egy streamelési táblát, majd a apply_changes_from_snapshot() Python függvényével megadhatja a feldolgozás megvalósításához szükséges pillanatképeket, kulcsokat és egyéb argumentumokat. Tekintse meg az időszakos pillanatkép-betöltési és a korábbi pillanatkép-betöltési példákat.

Az API-nak átadott pillanatképeknek verzió szerint növekvő sorrendben kell lenniük. Ha a Delta Live Tables rendelésen kívüli pillanatképet észlel, hibaüzenet jelenik meg.

A szintaxis részleteiért tekintse meg a Delta Live Tables Python-referenciát.

Korlátozások

Egy vagy APPLY CHANGES FROM SNAPSHOT több lekérdezés célja APPLY CHANGES nem használható streamelési tábla forrásaként. Egy vagy lekérdezés céljából APPLY CHANGES APPLY CHANGES FROM SNAPSHOT beolvasott tábláknak materializált nézetnek kell lenniük.

Példa: SCD 1. és 2. típusú SCD-feldolgozás CDF-forrásadatokkal

Az alábbi szakaszok példákat mutatnak be a Delta Live Tables SCD 1- és 2-es típusú lekérdezéseire, amelyek a céltáblákat egy változásadatcsatorna forráseseményei alapján frissítik:

  1. Új felhasználói rekordokat hoz létre.
  2. Töröl egy felhasználói rekordot.
  3. Frissíti a felhasználói rekordokat. Az 1. típusú SCD-példában az utolsó UPDATE műveletek késve érkeznek, és a céltáblából kerülnek ki, ami a rendelésen kívüli események kezelését mutatja.

Az alábbi példák feltételezik, hogy ismerik a Delta Live Tables-folyamatok konfigurálását és frissítését. Lásd az oktatóanyagot: Futtassa az első Delta Live Tables-folyamatot.

A példák futtatásához először létre kell hoznia egy mintaadatkészletet. Lásd: Tesztadatok létrehozása.

Ezekhez a példákhoz a következő bemeneti rekordok tartoznak:

Felhasználói azonosító név Város művelet sequenceNum
124 Raul Oaxaca INSERT 0
123 Isabel Monterrey INSERT 0
125 Mercedes Tijuana INSERT 2
126 Liliom Cancun INSERT 2
123 null null Törlés... 6
125 Mercedes Guadalajara UPDATE 6
125 Mercedes Mexicali UPDATE 5
123 Isabel Chihuahua UPDATE 5

Ha a példaadatok utolsó sorát törli, az a következő rekordot szúrja be, amely meghatározza, hogy hol kell csonkolja a rekordokat:

Felhasználói azonosító név Város művelet sequenceNum
null null null MEGCSONKÍT 3

Feljegyzés

Az alábbi példák a műveletek és TRUNCATE a műveletek DELETE megadására szolgáló lehetőségeket is tartalmazzák, de ezek nem kötelezőek.

1. típusú SCD-frissítések feldolgozása

Az alábbi példa az 1. típusú SCD-frissítések feldolgozását mutatja be:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;

Az 1. SCD-típus futtatása után a céltábla a következő rekordokat tartalmazza:

Felhasználói azonosító név Város
124 Raul Oaxaca
125 Mercedes Guadalajara
126 Liliom Cancun

Az SCD 1. típusának a további TRUNCATE rekorddal való futtatása után a rekordok 124 és 126 a csonkolt adatok a TRUNCATE következő sequenceNum=3művelet miatt csonkulnak, és a céltábla a következő rekordot tartalmazza:

Felhasználói azonosító név Város
125 Mercedes Guadalajara

2. típusú SCD-frissítések feldolgozása

Az alábbi példa az SCD 2. típusú frissítéseinek feldolgozását mutatja be:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;

A 2. SCD-típus futtatása után a céltábla a következő rekordokat tartalmazza:

Felhasználói azonosító név Város __START_AT __END_AT
123 Isabel Monterrey 0 5
123 Isabel Chihuahua 5 6
124 Raul Oaxaca 0 null
125 Mercedes Tijuana 2 5
125 Mercedes Mexicali 5 6
125 Mercedes Guadalajara 6 null
126 Liliom Cancun 2 null

A 2. típusú SCD-lekérdezések a céltáblában nyomon követendő kimeneti oszlopok egy részhalmazát is megadhatják. A többi oszlop módosításai ahelyett, hogy új előzményrekordokat hoznak létre, frissítve lesznek. Az alábbi példa az oszlop nyomon követéséből való kizárását city mutatja be:

Az alábbi példa a 2. scd típusú nyomkövetési előzmények használatát mutatja be:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2",
  track_history_except_column_list = ["city"]
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT
  (city)

A példa további TRUNCATE rekord nélküli futtatása után a céltábla a következő rekordokat tartalmazza:

Felhasználói azonosító név Város __START_AT __END_AT
123 Isabel Chihuahua 0 6
124 Raul Oaxaca 0 null
125 Mercedes Guadalajara 2 null
126 Liliom Cancun 2 null

Tesztadatok létrehozása

Az alábbi kód egy példaadatkészlet létrehozásához használható az oktatóanyagban található példa-lekérdezésekben. Feltételezve, hogy rendelkezik a megfelelő hitelesítő adatokkal egy új séma létrehozásához és egy új tábla létrehozásához, ezeket az utasításokat jegyzetfüzet vagy Databricks SQL használatával futtathatja. A következő kód nem a Delta Live Tables-folyamat részeként futtatható:

CREATE SCHEMA IF NOT EXISTS cdc_data;

CREATE TABLE
  cdc_data.users
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  -- New users.
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  (126, "Lily",     "Cancun",      "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moved to Guadalajara.
  (123, null,       null,          "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  -- This batch of updates arrived out of order. The above batch at sequenceNum 5 will be the final state.
  (125, "Mercedes", "Mexicali",    "UPDATE", 5),
  (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
  -- Uncomment to test TRUNCATE.
  -- ,(null, null,      null,          "TRUNCATE", 3)
);

Példa: Időszakos pillanatkép-feldolgozás

Az alábbi példa a 2. típusú SCD-feldolgozást mutatja be, amely a következő helyen mycatalog.myschema.mytabletárolt tábla pillanatképeit dolgozza fel. A feldolgozás eredménye egy .target

mycatalog.myschema.mytable rekordok az időbélyegen 2024.01.01. 00:00:00

Kulcs Érték
0 a1
2 a2

mycatalog.myschema.mytable records at the timestamp 2024-01-01 12:00:00

Kulcs Érték
2 b2
3 a3
import dlt

@dlt.view(name="source")
def source():
 return spark.read.table("mycatalog.myschema.mytable")

dlt.create_streaming_table("target")

dlt.apply_changes_from_snapshot(
 target="target",
 source="source",
 keys=["key"],
 stored_as_scd_type=2
)

A pillanatképek feldolgozása után a céltábla a következő rekordokat tartalmazza:

Kulcs Érték __START_AT __END_AT
0 a1 2024-01-01 00:00:00 2024-01-01 12:00:00
2 a2 2024-01-01 00:00:00 2024-01-01 12:00:00
2 b2 2024-01-01 12:00:00 null
3 a3 2024-01-01 12:00:00 null

Példa: Korábbi pillanatképek feldolgozása

Az alábbi példa a 2. típusú SCD-feldolgozást mutatja be, amely egy felhőalapú tárolórendszerben tárolt két pillanatkép forráseseményei alapján frissíti a céltáblát:

Pillanatkép a következő helyen timestamp: /<PATH>/filename1.csv

Kulcs TrackingColumn NonTrackingColumn
0 a1 b1
2 a2 b2
4 a4 b4

Pillanatkép a következő helyen timestamp + 5: /<PATH>/filename2.csv

Kulcs TrackingColumn NonTrackingColumn
2 a2_new b2
3 a3 b3
4 a4 b4_new

Az alábbi példakód az SCD 2. típusú frissítéseinek feldolgozását mutatja be az alábbi pillanatképekkel:

import dlt

def exist(file_name):
  # Storage system-dependent function that returns true if file_name exists, false otherwise

# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
  latest_snapshot_version = latest_snapshot_version or 0
  next_version = latest_snapshot_version + 1
  file_name = "dir_path/filename_" + next_version + ".csv"
  if (exist(file_name)):
    return (spark.read.load(file_name), next_version)
   else:
     # No snapshot available
     return None

dlt.create_streaming_live_table("target")

dlt.apply_changes_from_snapshot(
  target = "target",
  source = next_snapshot_and_version,
  keys = ["Key"],
  stored_as_scd_type = 2,
  track_history_column_list = ["TrackingCol"]
)

A pillanatképek feldolgozása után a céltábla a következő rekordokat tartalmazza:

Kulcs TrackingColumn NonTrackingColumn __START_AT __END_AT
0 a1 b1 0 2
2 a2 b2 0 2
2 a2_new b2 2 null
3 a3 b3 2 null
4 a4 b4_new 0 null

Adatok hozzáadása, módosítása vagy törlése egy célstreamelési táblában

Ha a folyamat táblákat tesz közzé a Unity Catalogban, az utasítások által APPLY CHANGES INTO létrehozott célstreamelési táblák módosításához használhat adatmanipulációs nyelvi (DML) utasításokat, például beszúrási, frissítési, törlési és egyesítési utasításokat.

Feljegyzés

  • A streamelési tábla táblázatsémát módosító DML-utasítások nem támogatottak. Győződjön meg arról, hogy a DML-utasítások nem próbálják továbbfejleszteni a táblázatsémát.
  • A streamelési táblát frissítő DML-utasítások csak megosztott Unity Catalog-fürtön vagy SQL-raktárban futtathatók a Databricks Runtime 13.3 LTS és újabb verziók használatával.
  • Mivel a streameléshez csak hozzáfűző adatforrások szükségesek, ha a feldolgozáshoz forrásstreamelési táblából kell streamelni a módosításokat (például DML-utasítások alapján), állítsa be a skipChangeCommits jelölőt a forrásstreamelési tábla olvasása során. Ha skipChangeCommits be van állítva, a rendszer figyelmen kívül hagyja a forrástábla rekordjait törlő vagy módosító tranzakciókat. Ha a feldolgozáshoz nincs szükség streamelési táblára, céltáblaként használhat materializált nézetet (amely nem rendelkezik csak hozzáfűzési korlátozással).

Mivel a Delta Live Tables egy megadott SEQUENCE BY oszlopot használ, és a céltábla és __END_AT a céltábla oszlopai számára propagálja a megfelelő szekvenálási értékeket __START_AT (2. SCD-típus esetén), gondoskodnia kell arról, hogy a DML-utasítások érvényes értékeket használjanak ezekhez az oszlopokhoz a rekordok megfelelő sorrendjének fenntartásához. Lásd : Hogyan implementálják a CDC-t az APPLY CHANGES API-val?.

A DML-utasítások streamelési táblákkal való használatáról további információt a streamelőtáblák adatainak hozzáadása, módosítása vagy törlése című témakörben talál.

Az alábbi példa egy aktív rekordot szúr be 5-ös kezdősorozattal:

INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);

Delta Live Tables CDC-lekérdezés által feldolgozott rekordok adatainak lekérése

Feljegyzés

A következő metrikákat csak APPLY CHANGES a lekérdezések rögzítik, a lekérdezések nem APPLY CHANGES FROM SNAPSHOT .

A lekérdezések a következő metrikákat rögzítik APPLY CHANGES :

  • num_upserted_rows: A frissítés során az adathalmazba beszúrt kimeneti sorok száma.
  • num_deleted_rows: A frissítés során az adathalmazból törölt meglévő kimeneti sorok száma.

A num_output_rows nem CDC-folyamatok kimenetének számító metrikát a rendszer nem rögzíti a lekérdezésekhez apply changes .

Milyen adatobjektumokat használ a Delta Live Tables CDC-feldolgozásához?

Megjegyzés: A következő adatstruktúrák csak feldolgozásra APPLY CHANGES vonatkoznak, feldolgozásra nem APPLY CHANGES FROM SNAPSHOT .

Amikor deklarálja a céltáblát a Hive metaadattárban, a rendszer két adatstruktúrát hoz létre:

  • A céltáblához rendelt névvel rendelkező nézet.
  • A Delta Live Tables által a CDC-feldolgozás kezelésére használt belső háttértábla. Ezt a táblát a céltábla nevére való előtagolással __apply_changes_storage_ nevezik el.

Ha például deklarál egy névvel ellátott dlt_cdc_targetcéltáblát, megjelenik egy elnevezett dlt_cdc_target nézet és egy tábla neve __apply_changes_storage_dlt_cdc_target a metaadattárban. A nézet létrehozása lehetővé teszi, hogy a Delta Live Tables kiszűrje a rendelésen kívüli adatok kezeléséhez szükséges további információkat (például a sírköveket és a verziókat). A feldolgozott adatok megtekintéséhez kérdezze le a célnézetet. Mivel a tábla sémája változhat a __apply_changes_storage_ jövőbeli funkciók vagy fejlesztések támogatásához, nem szabad éles használatra lekérdezni a táblát. Ha manuálisan ad hozzá adatokat a táblához, a rendszer feltételezi, hogy a rekordok más módosítások elé kerülnek, mert hiányoznak a verzióoszlopok.

Ha egy folyamat közzétesz a Unity Catalogban, a belső háttértáblák nem érhetők el a felhasználók számára.