Megosztás a következőn keresztül:


A Delta Lake táblaelőzményeinek működése

Minden olyan művelet, amely módosít egy Delta Lake-táblát, létrehoz egy új táblaverziót. Az előzményadatok segítségével naplózhatja a műveleteket, visszaállíthat egy táblát, vagy lekérdezhet egy táblát egy adott időpontban időutazással.

Feljegyzés

A Databricks nem javasolja a Delta Lake-táblaelőzmények hosszú távú biztonsági mentési megoldásként való használatát az adatarchiváláshoz. A Databricks azt javasolja, hogy csak az elmúlt 7 napot használja az időutazási műveletekhez, kivéve, ha az adatok és a naplómegőrzési konfigurációkat nagyobb értékre állította be.

Delta-tábla előzményeinek lekérése

A parancs futtatásával lekérheti az egyes írások műveleteit, felhasználóit és időbélyegét history egy Delta-táblába. A rendszer fordított időrendi sorrendben adja vissza a műveleteket.

A táblaelőzmények megőrzését a táblabeállítás delta.logRetentionDurationhatározza meg, amely alapértelmezés szerint 30 nap.

Feljegyzés

Az időutazást és a táblázatelőzményeket különböző megőrzési küszöbértékek szabályozzák. Lásd : Mi a Delta Lake időutazás?.

DESCRIBE HISTORY table_name       -- get the full history of the table

DESCRIBE HISTORY table_name LIMIT 1  -- get the last operation only

A Spark SQL szintaxisának részleteiért tekintse meg az ELŐZMÉNYEK LEÍRÁSA című témakört.

A Scala/Java/Python szintaxisának részleteiért tekintse meg a Delta Lake API dokumentációját .

A Catalog Explorer vizuális nézetet biztosít a Delta-táblák részletes táblázatadatairól és előzményeiről. A táblázatséma és a mintaadatok mellett az Előzmények fülre kattintva megtekintheti a táblázat előzményeit is DESCRIBE HISTORY.

Előzmények séma

A művelet kimenete az history alábbi oszlopokkal rendelkezik.

Oszlop Típus Leírás
Verzió hosszú A művelet által létrehozott táblaverzió.
időbélyeg időbélyeg A verzió véglegesítésekor.
Felhasználói azonosító húr A műveletet futtató felhasználó azonosítója.
Felhasználónév húr A műveletet futtató felhasználó neve.
művelet húr A művelet neve.
operationParameters térkép A művelet paraméterei (például predikátumok).)
feladat Struct A műveletet futtató feladat részletei.
jegyzetfüzet Struct Annak a jegyzetfüzetnek a részletei, amelyből a műveletet futtatták.
clusterId húr Annak a fürtnek az azonosítója, amelyen a művelet futott.
readVersion hosszú Az írási művelet végrehajtásához beolvasott tábla verziója.
isolationLevel húr A művelethez használt elkülönítési szint.
isBlindAppend Logikai Hogy ez a művelet hozzáfűzött-e adatokat.
operationMetrics térkép A művelet metrikái (például a módosított sorok és fájlok száma).)
userMetadata húr Felhasználó által definiált véglegesítési metaadatok megadása esetén
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|      5|2019-07-29 14:07:47|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          4|WriteSerializable|        false|[numTotalRows -> ...|
|      4|2019-07-29 14:07:41|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          3|WriteSerializable|        false|[numTotalRows -> ...|
|      3|2019-07-29 14:07:29|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          2|WriteSerializable|        false|[numTotalRows -> ...|
|      2|2019-07-29 14:06:56|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          1|WriteSerializable|        false|[numTotalRows -> ...|
|      1|2019-07-29 14:04:31|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          0|WriteSerializable|        false|[numTotalRows -> ...|
|      0|2019-07-29 14:01:40|   ###|     ###|    WRITE|[mode -> ErrorIfE...|null|     ###|      ###|       null|WriteSerializable|         true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+

Feljegyzés

Műveleti metrikák kulcsai

A history művelet a műveleti metrikák gyűjteményét adja vissza az operationMetrics oszloptérképen.

Az alábbi táblázatok a térképkulcs-definíciókat művelet szerint sorolják fel.

Művelet Metrika neve Leírás
ÍRÁS, TÁBLA LÉTREHOZÁSA KIJELÖLÉSKÉNT, TÁBLA LECSERÉLÉSE KIJELÖLÉSKÉNT, MÁSOLÁS
numFiles Megírt fájlok száma.
numOutputBytes Méret bájtban az írott tartalomban.
numOutputRows Megírt sorok száma.
STREAMELÉSI FRISSÍTÉS
numAddedFiles A hozzáadott fájlok száma.
numRemovedFiles Az eltávolított fájlok száma.
numOutputRows Megírt sorok száma.
numOutputBytes Az írás mérete bájtban.
Törlés...
numAddedFiles A hozzáadott fájlok száma. Nincs megadva a tábla partícióinak törlésekor.
numRemovedFiles Az eltávolított fájlok száma.
numDeletedRows Az eltávolított sorok száma. Nincs megadva a tábla partícióinak törlésekor.
numCopiedRows A fájlok törlése során másolt sorok száma.
executionTimeMs A teljes művelet végrehajtásához szükséges idő.
scanTimeMs A fájlok egyezések kereséséhez szükséges idő.
rewriteTimeMs A egyeztetett fájlok újraírásához szükséges idő.
MEGCSONKÍT
numRemovedFiles Az eltávolított fájlok száma.
executionTimeMs A teljes művelet végrehajtásához szükséges idő.
EGYESÜL
numSourceRows A forrásadatkeret sorainak száma.
numTargetRowsInserted A céltáblába beszúrt sorok száma.
numTargetRowsUpdated A céltáblában frissített sorok száma.
numTargetRowsDeleted A céltáblában törölt sorok száma.
numTargetRowsCopied A másolt célsorok száma.
numOutputRows Kiírt sorok teljes száma.
numTargetFilesAdded A fogadóhoz (célhoz) hozzáadott fájlok száma.
numTargetFilesRemoved A fogadóból (célból) eltávolított fájlok száma.
executionTimeMs A teljes művelet végrehajtásához szükséges idő.
scanTimeMs A fájlok egyezések kereséséhez szükséges idő.
rewriteTimeMs A egyeztetett fájlok újraírásához szükséges idő.
UPDATE
numAddedFiles A hozzáadott fájlok száma.
numRemovedFiles Az eltávolított fájlok száma.
numUpdatedRows A frissített sorok száma.
numCopiedRows A fájlok frissítése során átmásolt sorok száma.
executionTimeMs A teljes művelet végrehajtásához szükséges idő.
scanTimeMs A fájlok egyezések kereséséhez szükséges idő.
rewriteTimeMs A egyeztetett fájlok újraírásához szükséges idő.
FSCK numRemovedFiles Az eltávolított fájlok száma.
MEGTÉRÍT numConvertedFiles Konvertált parquet-fájlok száma.
OPTIMIZE
numAddedFiles A hozzáadott fájlok száma.
numRemovedFiles Optimalizált fájlok száma.
numAddedBytes A tábla optimalizálása után hozzáadott bájtok száma.
numRemovedBytes Eltávolított bájtok száma.
minFileSize A legkisebb fájl mérete a táblázat optimalizálása után.
p25FileSize A tábla optimalizálása után a 25. percentilis fájl mérete.
p50FileSize A tábla optimalizálása után medián fájlméret.
p75FileSize A táblázat optimalizálása után a 75. percentilisfájl mérete.
maxFileSize A legnagyobb fájl mérete a tábla optimalizálása után.
CLONE
sourceTableSize A forrástábla mérete bájtban a klónozott verzióban.
sourceNumOfFiles A forrástáblában lévő fájlok száma a klónozott verzióban.
numRemovedFiles A céltáblából eltávolított fájlok száma egy korábbi Delta-tábla lecserélése esetén.
removedFilesSize A céltáblából eltávolított fájlok teljes mérete bájtban, ha egy korábbi Delta-táblázatot cseréltek le.
numCopiedFiles Az új helyre másolt fájlok száma. 0 a sekély klónok esetében.
copiedFilesSize Az új helyre másolt fájlok teljes mérete bájtban. 0 a sekély klónok esetében.
VISSZAAD
tableSizeAfterRestore A táblázat mérete bájtban a visszaállítás után.
numOfFilesAfterRestore A táblában lévő fájlok száma a visszaállítás után.
numRemovedFiles A visszaállítási művelet által eltávolított fájlok száma.
numRestoredFiles A visszaállítás eredményeként hozzáadott fájlok száma.
removedFilesSize A visszaállítás által eltávolított fájlok bájtban megadott mérete.
restoredFilesSize Méret bájtban a visszaállítás által hozzáadott fájlokban.
VACUUM
numDeletedFiles Törölt fájlok száma.
numVacuumedDirectories A porszívózott könyvtárak száma.
numFilesToDelete Törölni kívánt fájlok száma.

Mi az a Delta Lake időutazás?

A Delta Lake időutazása támogatja a korábbi táblaverziók lekérdezését időbélyeg vagy táblaverzió alapján (a tranzakciónaplóban rögzítettek szerint). Az időutazást az alábbi alkalmazásokhoz használhatja:

  • Elemzések, jelentések vagy kimenetek újbóli létrehozása (például egy gépi tanulási modell kimenete). Ez hasznos lehet a hibakereséshez vagy a naplózáshoz, különösen a szabályozott iparágakban.
  • Összetett időbeli lekérdezések írása.
  • Az adatok hibáinak kijavítása.
  • Pillanatkép-elkülönítés biztosítása lekérdezések készletéhez a gyorsan változó táblákhoz.

Fontos

Az időutazással elérhető táblázatverziókat a tranzakciónapló-fájlok megőrzési küszöbértékének, valamint a műveletek gyakoriságának és meghatározott adatmegőrzésének VACUUM kombinációja határozza meg. Ha naponta fut VACUUM az alapértelmezett értékekkel, 7 napnyi adat áll rendelkezésre az időutazáshoz.

Delta időutazás szintaxisa

Az időutazással rendelkező Delta-táblát úgy kérdezheti le, hogy hozzáad egy záradékot a táblanév specifikációja után.

  • timestamp_expression az alábbiak bármelyike lehet:
    • '2018-10-18T22:15:12.013Z', azaz olyan sztring, amely időbélyegre vethető
    • cast('2018-10-18 13:36:32 CEST' as timestamp)
    • '2018-10-18', azaz dátumsztring
    • current_timestamp() - interval 12 hours
    • date_sub(current_date(), 1)
    • Bármely más kifejezés, amely időbélyegbe van öntött vagy adható
  • version egy hosszú érték, amely a következő kimenetből DESCRIBE HISTORY table_speckérhető le: .

Sem nem timestamp_expression version lehet alqueries.

A rendszer csak dátum- vagy időbélyeg-sztringeket fogad el. Például: "2019-01-01" és "2019-01-01T00:00:00.000Z". Lásd például a következő kódot:

SQL

SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z';
SELECT * FROM people10m VERSION AS OF 123;

Python

df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).table("people10m")

A szintaxis használatával @ megadhatja az időbélyeget vagy a verziót a táblanév részeként. Az időbélyegnek formátumban yyyyMMddHHmmssSSS kell lennie. A verziót a @ verzióra való előerősítéssel v adhatja meg. Lásd például a következő kódot:

SQL

SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123

Python

spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")

Mik azok a tranzakciónapló-ellenőrzőpontok?

A Delta Lake JSON-fájlként rögzíti a táblaverziókat a könyvtárban, amelyet a _delta_log táblaadatok mellett tárol. Az ellenőrzőpont-lekérdezés optimalizálása érdekében a Delta Lake a táblaverziókat parquet ellenőrzőpontfájlokra összesíti, így nem szükséges elolvasni a táblaelőzmények összes JSON-verzióját. Az Azure Databricks optimalizálja az ellenőrzőpontok gyakoriságát az adatmérethez és a számítási feladathoz. A felhasználóknak nem szabad közvetlenül használniuk az ellenőrzőpontokat. Az ellenőrzőpont gyakorisága értesítés nélkül változhat.

Adatmegőrzés konfigurálása időutazási lekérdezésekhez

Az előző táblaverzió lekérdezéséhez a naplót és az adott verzió adatfájljait is meg kell őriznie.

A rendszer törli az adatfájlokat, amikor VACUUM egy táblán fut. A Delta Lake automatikusan kezeli a naplófájlok eltávolítását a táblaverziók ellenőrzése után.

Mivel a legtöbb Delta-tábla VACUUM rendszeresen fut ellenük, az időponthoz kötött lekérdezéseknek meg kell tartaniuk az alapértelmezett megőrzési küszöbértéket VACUUM, ami alapértelmezés szerint 7 nap.

A Delta-táblák adatmegőrzési küszöbértékének növeléséhez konfigurálnia kell a következő táblázattulajdonságokat:

  • delta.logRetentionDuration = "interval <interval>": azt határozza meg, hogy a tábla előzményei mennyi ideig legyenek megtartva. Az alapértelmezett érték interval 30 days.
  • delta.deletedFileRetentionDuration = "interval <interval>": meghatározza az aktuális táblaverzióban már nem hivatkozott adatfájlok eltávolítására használt küszöbértéket VACUUM . Az alapértelmezett érték interval 7 days.

A deltatulajdonságokat a tábla létrehozásakor vagy utasítással ALTER TABLE is megadhatja. Lásd a Delta-tábla tulajdonságaira vonatkozó referenciát.

Feljegyzés

Mindkét tulajdonságot be kell állítania, hogy a táblaelőzmények hosszabb ideig megmaradjanak a gyakori VACUUM műveleteket tartalmazó táblák esetében. Például a 30 napos előzményadatok eléréséhez állítsa be delta.deletedFileRetentionDuration = "interval 30 days" (amely megfelel az alapértelmezett beállításnak delta.logRetentionDuration).

Az adatmegőrzési küszöbérték növelése a tárolási költségek emelkedését okozhatja, mivel a rendszer több adatfájlt tart fenn.

Delta-tábla visszaállítása korábbi állapotba

A parancs használatával RESTORE visszaállíthatja a Delta-táblák korábbi állapotát. A Delta-táblák belsőleg megőrzik a tábla korábbi verzióit, amelyek lehetővé teszik a korábbi állapot visszaállítását. A parancs a korábbi állapotnak megfelelő verziót vagy a korábbi állapot létrehozásának időbélyegét támogatja.RESTORE

Fontos

  • Visszaállíthat egy már visszaállított táblát.
  • A klónozott táblák visszaállíthatók.
  • Engedéllyel kell rendelkeznie MODIFY a visszaállítandó táblára.
  • A táblák nem állíthatók vissza olyan régebbi verzióra, ahol az adatfájlokat manuálisan vagy a program törölte vacuum. Ennek a verziónak a részleges visszaállítása akkor is lehetséges, ha spark.sql.files.ignoreMissingFiles be van állítva.true
  • A korábbi állapotba való visszaállítás időbélyegének formátuma a következő yyyy-MM-dd HH:mm:ss: . A csak dátum(yyyy-MM-dd) sztring megadása is támogatott.
RESTORE TABLE target_table TO VERSION AS OF <version>;
RESTORE TABLE target_table TO TIMESTAMP AS OF <timestamp>;

A szintaxis részleteiért lásd: RESTORE.

Fontos

A visszaállítás adatmódosítási műveletnek minősül. A parancs által hozzáadott Delta Lake-naplóbejegyzések a RESTORE dataChange igaz értékre vannak állítva. Ha van egy alárendelt alkalmazás, például egy Strukturált streamelési feladat, amely egy Delta Lake-tábla frissítéseit dolgozza fel, a visszaállítási művelet által hozzáadott adatváltozási naplóbejegyzések új adatfrissítéseknek minősülnek, és a feldolgozásuk duplikált adatokat eredményezhet.

Példa:

Táblaverzió Művelet Változásnapló-frissítések Adatváltozási naplófrissítések rekordjai
0 INSERT AddFile(/path/to/file-1, dataChange = true) (név = Viktor, kor = 29, (név = György, kor = 55)
0 INSERT AddFile(/path/to/file-2, dataChange = true) (név = György, kor = 39)
2 OPTIMIZE AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) (Nincs olyan rekord, amely az Optimalizálási tömörítés funkcióval nem módosítja a táblában lévő adatokat)
3 RESTORE(version=1) RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) (név = Viktor, kor = 29), (név = György, kor = 55), (név = György, kor = 39)

Az előző példában a parancs olyan RESTORE frissítéseket eredményez, amelyek a Delta tábla 0- és 1-es verziójának olvasása során már megjelentek. Ha egy streamelési lekérdezés olvasta ezt a táblát, akkor ezeket a fájlokat újonnan hozzáadott adatoknak tekintjük, és újra feldolgozzuk őket.

Metrikák visszaállítása

RESTORE a művelet befejezése után a következő metrikákat egyetlen sorú DataFrame-adatkeretként jelenti:

  • table_size_after_restore: A tábla mérete a visszaállítás után.

  • num_of_files_after_restore: A táblában lévő fájlok száma a visszaállítás után.

  • num_removed_files: A táblából eltávolított (logikailag törölt) fájlok száma.

  • num_restored_files: A visszaállítás miatt visszaállított fájlok száma.

  • removed_files_size: A táblából eltávolított fájlok teljes mérete bájtban.

  • restored_files_size: A visszaállított fájlok teljes mérete bájtban.

    Példa a visszaállítási metrikákra

Példák a Delta Lake időutazás használatára

  • A véletlen törlések kijavítása a felhasználó 111tábláiban:

    INSERT INTO my_table
      SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)
      WHERE userId = 111
    
  • Egy tábla véletlen helytelen frissítésének javítása:

    MERGE INTO my_table target
      USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source
      ON source.userId = target.userId
      WHEN MATCHED THEN UPDATE SET *
    
  • Az elmúlt héten hozzáadott új ügyfelek számának lekérdezése.

    SELECT count(distinct userId)
    FROM my_table  - (
      SELECT count(distinct userId)
      FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
    

Hogyan megkeresni az utolsó véglegesítés verzióját a Spark-munkamenetben?

Ha le szeretné kérdezni az aktuális SparkSession véglegesítés verziószámát az összes szálon és táblán, kérje le az SQL-konfigurációt spark.databricks.delta.lastCommitVersionInSession.

SQL

SET spark.databricks.delta.lastCommitVersionInSession

Python

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

Scala

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

Ha a kulcs nem véglegesítést hajtott végre SparkSession, a kulcs lekérdezése üres értéket ad vissza.

Feljegyzés

Ha ugyanazt SparkSession több szálon is megosztja, az hasonlít egy változó több szálon való megosztásához; a konfigurációs érték egyidejű frissítésével versenyhelyzetbe ütközhet.