A Delta Lake táblaelőzményeinek működése
Minden olyan művelet, amely módosít egy Delta Lake-táblát, létrehoz egy új táblaverziót. Az előzményadatok segítségével naplózhatja a műveleteket, visszaállíthat egy táblát, vagy lekérdezhet egy táblát egy adott időpontban időutazással.
Feljegyzés
A Databricks nem javasolja a Delta Lake-táblaelőzmények hosszú távú biztonsági mentési megoldásként való használatát az adatarchiváláshoz. A Databricks azt javasolja, hogy csak az elmúlt 7 napot használja az időutazási műveletekhez, kivéve, ha az adatok és a naplómegőrzési konfigurációkat nagyobb értékre állította be.
Delta-tábla előzményeinek lekérése
A parancs futtatásával lekérheti az egyes írások műveleteit, felhasználóit és időbélyegét history
egy Delta-táblába. A rendszer fordított időrendi sorrendben adja vissza a műveleteket.
A táblaelőzmények megőrzését a táblabeállítás delta.logRetentionDuration
határozza meg, amely alapértelmezés szerint 30 nap.
Feljegyzés
Az időutazást és a táblázatelőzményeket különböző megőrzési küszöbértékek szabályozzák. Lásd : Mi a Delta Lake időutazás?.
DESCRIBE HISTORY table_name -- get the full history of the table
DESCRIBE HISTORY table_name LIMIT 1 -- get the last operation only
A Spark SQL szintaxisának részleteiért tekintse meg az ELŐZMÉNYEK LEÍRÁSA című témakört.
A Scala/Java/Python szintaxisának részleteiért tekintse meg a Delta Lake API dokumentációját .
A Catalog Explorer vizuális nézetet biztosít a Delta-táblák részletes táblázatadatairól és előzményeiről. A táblázatséma és a mintaadatok mellett az Előzmények fülre kattintva megtekintheti a táblázat előzményeit is DESCRIBE HISTORY
.
Előzmények séma
A művelet kimenete az history
alábbi oszlopokkal rendelkezik.
Oszlop | Típus | Leírás |
---|---|---|
Verzió | hosszú | A művelet által létrehozott táblaverzió. |
időbélyeg | időbélyeg | A verzió véglegesítésekor. |
Felhasználói azonosító | húr | A műveletet futtató felhasználó azonosítója. |
Felhasználónév | húr | A műveletet futtató felhasználó neve. |
művelet | húr | A művelet neve. |
operationParameters | térkép | A művelet paraméterei (például predikátumok).) |
feladat | Struct | A műveletet futtató feladat részletei. |
jegyzetfüzet | Struct | Annak a jegyzetfüzetnek a részletei, amelyből a műveletet futtatták. |
clusterId | húr | Annak a fürtnek az azonosítója, amelyen a művelet futott. |
readVersion | hosszú | Az írási művelet végrehajtásához beolvasott tábla verziója. |
isolationLevel | húr | A művelethez használt elkülönítési szint. |
isBlindAppend | Logikai | Hogy ez a művelet hozzáfűzött-e adatokat. |
operationMetrics | térkép | A művelet metrikái (például a módosított sorok és fájlok száma).) |
userMetadata | húr | Felhasználó által definiált véglegesítési metaadatok megadása esetén |
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion| isolationLevel|isBlindAppend| operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
| 5|2019-07-29 14:07:47| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 4|WriteSerializable| false|[numTotalRows -> ...|
| 4|2019-07-29 14:07:41| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 3|WriteSerializable| false|[numTotalRows -> ...|
| 3|2019-07-29 14:07:29| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 2|WriteSerializable| false|[numTotalRows -> ...|
| 2|2019-07-29 14:06:56| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 1|WriteSerializable| false|[numTotalRows -> ...|
| 1|2019-07-29 14:04:31| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 0|WriteSerializable| false|[numTotalRows -> ...|
| 0|2019-07-29 14:01:40| ###| ###| WRITE|[mode -> ErrorIfE...|null| ###| ###| null|WriteSerializable| true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
Feljegyzés
- A többi oszlop közül néhány nem érhető el, ha deltatáblába ír az alábbi módszerekkel:
- A jövőben hozzáadott oszlopok mindig az utolsó oszlop után lesznek hozzáadva.
Műveleti metrikák kulcsai
A history
művelet a műveleti metrikák gyűjteményét adja vissza az operationMetrics
oszloptérképen.
Az alábbi táblázatok a térképkulcs-definíciókat művelet szerint sorolják fel.
Művelet | Metrika neve | Leírás |
---|---|---|
ÍRÁS, TÁBLA LÉTREHOZÁSA KIJELÖLÉSKÉNT, TÁBLA LECSERÉLÉSE KIJELÖLÉSKÉNT, MÁSOLÁS | ||
numFiles | Megírt fájlok száma. | |
numOutputBytes | Méret bájtban az írott tartalomban. | |
numOutputRows | Megírt sorok száma. | |
STREAMELÉSI FRISSÍTÉS | ||
numAddedFiles | A hozzáadott fájlok száma. | |
numRemovedFiles | Az eltávolított fájlok száma. | |
numOutputRows | Megírt sorok száma. | |
numOutputBytes | Az írás mérete bájtban. | |
Törlés... | ||
numAddedFiles | A hozzáadott fájlok száma. Nincs megadva a tábla partícióinak törlésekor. | |
numRemovedFiles | Az eltávolított fájlok száma. | |
numDeletedRows | Az eltávolított sorok száma. Nincs megadva a tábla partícióinak törlésekor. | |
numCopiedRows | A fájlok törlése során másolt sorok száma. | |
executionTimeMs | A teljes művelet végrehajtásához szükséges idő. | |
scanTimeMs | A fájlok egyezések kereséséhez szükséges idő. | |
rewriteTimeMs | A egyeztetett fájlok újraírásához szükséges idő. | |
MEGCSONKÍT | ||
numRemovedFiles | Az eltávolított fájlok száma. | |
executionTimeMs | A teljes művelet végrehajtásához szükséges idő. | |
EGYESÜL | ||
numSourceRows | A forrásadatkeret sorainak száma. | |
numTargetRowsInserted | A céltáblába beszúrt sorok száma. | |
numTargetRowsUpdated | A céltáblában frissített sorok száma. | |
numTargetRowsDeleted | A céltáblában törölt sorok száma. | |
numTargetRowsCopied | A másolt célsorok száma. | |
numOutputRows | Kiírt sorok teljes száma. | |
numTargetFilesAdded | A fogadóhoz (célhoz) hozzáadott fájlok száma. | |
numTargetFilesRemoved | A fogadóból (célból) eltávolított fájlok száma. | |
executionTimeMs | A teljes művelet végrehajtásához szükséges idő. | |
scanTimeMs | A fájlok egyezések kereséséhez szükséges idő. | |
rewriteTimeMs | A egyeztetett fájlok újraírásához szükséges idő. | |
UPDATE | ||
numAddedFiles | A hozzáadott fájlok száma. | |
numRemovedFiles | Az eltávolított fájlok száma. | |
numUpdatedRows | A frissített sorok száma. | |
numCopiedRows | A fájlok frissítése során átmásolt sorok száma. | |
executionTimeMs | A teljes művelet végrehajtásához szükséges idő. | |
scanTimeMs | A fájlok egyezések kereséséhez szükséges idő. | |
rewriteTimeMs | A egyeztetett fájlok újraírásához szükséges idő. | |
FSCK | numRemovedFiles | Az eltávolított fájlok száma. |
MEGTÉRÍT | numConvertedFiles | Konvertált parquet-fájlok száma. |
OPTIMIZE | ||
numAddedFiles | A hozzáadott fájlok száma. | |
numRemovedFiles | Optimalizált fájlok száma. | |
numAddedBytes | A tábla optimalizálása után hozzáadott bájtok száma. | |
numRemovedBytes | Eltávolított bájtok száma. | |
minFileSize | A legkisebb fájl mérete a táblázat optimalizálása után. | |
p25FileSize | A tábla optimalizálása után a 25. percentilis fájl mérete. | |
p50FileSize | A tábla optimalizálása után medián fájlméret. | |
p75FileSize | A táblázat optimalizálása után a 75. percentilisfájl mérete. | |
maxFileSize | A legnagyobb fájl mérete a tábla optimalizálása után. | |
CLONE | ||
sourceTableSize | A forrástábla mérete bájtban a klónozott verzióban. | |
sourceNumOfFiles | A forrástáblában lévő fájlok száma a klónozott verzióban. | |
numRemovedFiles | A céltáblából eltávolított fájlok száma egy korábbi Delta-tábla lecserélése esetén. | |
removedFilesSize | A céltáblából eltávolított fájlok teljes mérete bájtban, ha egy korábbi Delta-táblázatot cseréltek le. | |
numCopiedFiles | Az új helyre másolt fájlok száma. 0 a sekély klónok esetében. | |
copiedFilesSize | Az új helyre másolt fájlok teljes mérete bájtban. 0 a sekély klónok esetében. | |
VISSZAAD | ||
tableSizeAfterRestore | A táblázat mérete bájtban a visszaállítás után. | |
numOfFilesAfterRestore | A táblában lévő fájlok száma a visszaállítás után. | |
numRemovedFiles | A visszaállítási művelet által eltávolított fájlok száma. | |
numRestoredFiles | A visszaállítás eredményeként hozzáadott fájlok száma. | |
removedFilesSize | A visszaállítás által eltávolított fájlok bájtban megadott mérete. | |
restoredFilesSize | Méret bájtban a visszaállítás által hozzáadott fájlokban. | |
VACUUM | ||
numDeletedFiles | Törölt fájlok száma. | |
numVacuumedDirectories | A porszívózott könyvtárak száma. | |
numFilesToDelete | Törölni kívánt fájlok száma. |
Mi az a Delta Lake időutazás?
A Delta Lake időutazása támogatja a korábbi táblaverziók lekérdezését időbélyeg vagy táblaverzió alapján (a tranzakciónaplóban rögzítettek szerint). Az időutazást az alábbi alkalmazásokhoz használhatja:
- Elemzések, jelentések vagy kimenetek újbóli létrehozása (például egy gépi tanulási modell kimenete). Ez hasznos lehet a hibakereséshez vagy a naplózáshoz, különösen a szabályozott iparágakban.
- Összetett időbeli lekérdezések írása.
- Az adatok hibáinak kijavítása.
- Pillanatkép-elkülönítés biztosítása lekérdezések készletéhez a gyorsan változó táblákhoz.
Fontos
Az időutazással elérhető táblázatverziókat a tranzakciónapló-fájlok megőrzési küszöbértékének, valamint a műveletek gyakoriságának és meghatározott adatmegőrzésének VACUUM
kombinációja határozza meg. Ha naponta fut VACUUM
az alapértelmezett értékekkel, 7 napnyi adat áll rendelkezésre az időutazáshoz.
Delta időutazás szintaxisa
Az időutazással rendelkező Delta-táblát úgy kérdezheti le, hogy hozzáad egy záradékot a táblanév specifikációja után.
timestamp_expression
az alábbiak bármelyike lehet:'2018-10-18T22:15:12.013Z'
, azaz olyan sztring, amely időbélyegre vethetőcast('2018-10-18 13:36:32 CEST' as timestamp)
'2018-10-18'
, azaz dátumsztringcurrent_timestamp() - interval 12 hours
date_sub(current_date(), 1)
- Bármely más kifejezés, amely időbélyegbe van öntött vagy adható
version
egy hosszú érték, amely a következő kimenetbőlDESCRIBE HISTORY table_spec
kérhető le: .
Sem nem timestamp_expression
version
lehet alqueries.
A rendszer csak dátum- vagy időbélyeg-sztringeket fogad el. Például: "2019-01-01"
és "2019-01-01T00:00:00.000Z"
. Lásd például a következő kódot:
SQL
SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z';
SELECT * FROM people10m VERSION AS OF 123;
Python
df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).table("people10m")
A szintaxis használatával @
megadhatja az időbélyeget vagy a verziót a táblanév részeként. Az időbélyegnek formátumban yyyyMMddHHmmssSSS
kell lennie. A verziót a @
verzióra való előerősítéssel v
adhatja meg. Lásd például a következő kódot:
SQL
SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123
Python
spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")
Mik azok a tranzakciónapló-ellenőrzőpontok?
A Delta Lake JSON-fájlként rögzíti a táblaverziókat a könyvtárban, amelyet a _delta_log
táblaadatok mellett tárol. Az ellenőrzőpont-lekérdezés optimalizálása érdekében a Delta Lake a táblaverziókat parquet ellenőrzőpontfájlokra összesíti, így nem szükséges elolvasni a táblaelőzmények összes JSON-verzióját. Az Azure Databricks optimalizálja az ellenőrzőpontok gyakoriságát az adatmérethez és a számítási feladathoz. A felhasználóknak nem szabad közvetlenül használniuk az ellenőrzőpontokat. Az ellenőrzőpont gyakorisága értesítés nélkül változhat.
Adatmegőrzés konfigurálása időutazási lekérdezésekhez
Az előző táblaverzió lekérdezéséhez a naplót és az adott verzió adatfájljait is meg kell őriznie.
A rendszer törli az adatfájlokat, amikor VACUUM
egy táblán fut. A Delta Lake automatikusan kezeli a naplófájlok eltávolítását a táblaverziók ellenőrzése után.
Mivel a legtöbb Delta-tábla VACUUM
rendszeresen fut ellenük, az időponthoz kötött lekérdezéseknek meg kell tartaniuk az alapértelmezett megőrzési küszöbértéket VACUUM
, ami alapértelmezés szerint 7 nap.
A Delta-táblák adatmegőrzési küszöbértékének növeléséhez konfigurálnia kell a következő táblázattulajdonságokat:
delta.logRetentionDuration = "interval <interval>"
: azt határozza meg, hogy a tábla előzményei mennyi ideig legyenek megtartva. Az alapértelmezett értékinterval 30 days
.delta.deletedFileRetentionDuration = "interval <interval>"
: meghatározza az aktuális táblaverzióban már nem hivatkozott adatfájlok eltávolítására használt küszöbértéketVACUUM
. Az alapértelmezett értékinterval 7 days
.
A deltatulajdonságokat a tábla létrehozásakor vagy utasítással ALTER TABLE
is megadhatja. Lásd a Delta-tábla tulajdonságaira vonatkozó referenciát.
Feljegyzés
Mindkét tulajdonságot be kell állítania, hogy a táblaelőzmények hosszabb ideig megmaradjanak a gyakori VACUUM
műveleteket tartalmazó táblák esetében. Például a 30 napos előzményadatok eléréséhez állítsa be delta.deletedFileRetentionDuration = "interval 30 days"
(amely megfelel az alapértelmezett beállításnak delta.logRetentionDuration
).
Az adatmegőrzési küszöbérték növelése a tárolási költségek emelkedését okozhatja, mivel a rendszer több adatfájlt tart fenn.
Delta-tábla visszaállítása korábbi állapotba
A parancs használatával RESTORE
visszaállíthatja a Delta-táblák korábbi állapotát. A Delta-táblák belsőleg megőrzik a tábla korábbi verzióit, amelyek lehetővé teszik a korábbi állapot visszaállítását.
A parancs a korábbi állapotnak megfelelő verziót vagy a korábbi állapot létrehozásának időbélyegét támogatja.RESTORE
Fontos
- Visszaállíthat egy már visszaállított táblát.
- A klónozott táblák visszaállíthatók.
- Engedéllyel kell rendelkeznie
MODIFY
a visszaállítandó táblára. - A táblák nem állíthatók vissza olyan régebbi verzióra, ahol az adatfájlokat manuálisan vagy a program törölte
vacuum
. Ennek a verziónak a részleges visszaállítása akkor is lehetséges, haspark.sql.files.ignoreMissingFiles
be van állítva.true
- A korábbi állapotba való visszaállítás időbélyegének formátuma a következő
yyyy-MM-dd HH:mm:ss
: . A csak dátum(yyyy-MM-dd
) sztring megadása is támogatott.
RESTORE TABLE target_table TO VERSION AS OF <version>;
RESTORE TABLE target_table TO TIMESTAMP AS OF <timestamp>;
A szintaxis részleteiért lásd: RESTORE.
Fontos
A visszaállítás adatmódosítási műveletnek minősül. A parancs által hozzáadott Delta Lake-naplóbejegyzések a RESTORE
dataChange igaz értékre vannak állítva. Ha van egy alárendelt alkalmazás, például egy Strukturált streamelési feladat, amely egy Delta Lake-tábla frissítéseit dolgozza fel, a visszaállítási művelet által hozzáadott adatváltozási naplóbejegyzések új adatfrissítéseknek minősülnek, és a feldolgozásuk duplikált adatokat eredményezhet.
Példa:
Táblaverzió | Művelet | Változásnapló-frissítések | Adatváltozási naplófrissítések rekordjai |
---|---|---|---|
0 | INSERT | AddFile(/path/to/file-1, dataChange = true) | (név = Viktor, kor = 29, (név = György, kor = 55) |
0 | INSERT | AddFile(/path/to/file-2, dataChange = true) | (név = György, kor = 39) |
2 | OPTIMIZE | AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) | (Nincs olyan rekord, amely az Optimalizálási tömörítés funkcióval nem módosítja a táblában lévő adatokat) |
3 | RESTORE(version=1) | RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) | (név = Viktor, kor = 29), (név = György, kor = 55), (név = György, kor = 39) |
Az előző példában a parancs olyan RESTORE
frissítéseket eredményez, amelyek a Delta tábla 0- és 1-es verziójának olvasása során már megjelentek. Ha egy streamelési lekérdezés olvasta ezt a táblát, akkor ezeket a fájlokat újonnan hozzáadott adatoknak tekintjük, és újra feldolgozzuk őket.
Metrikák visszaállítása
RESTORE
a művelet befejezése után a következő metrikákat egyetlen sorú DataFrame-adatkeretként jelenti:
table_size_after_restore
: A tábla mérete a visszaállítás után.num_of_files_after_restore
: A táblában lévő fájlok száma a visszaállítás után.num_removed_files
: A táblából eltávolított (logikailag törölt) fájlok száma.num_restored_files
: A visszaállítás miatt visszaállított fájlok száma.removed_files_size
: A táblából eltávolított fájlok teljes mérete bájtban.restored_files_size
: A visszaállított fájlok teljes mérete bájtban.
Példák a Delta Lake időutazás használatára
A véletlen törlések kijavítása a felhasználó
111
tábláiban:INSERT INTO my_table SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1) WHERE userId = 111
Egy tábla véletlen helytelen frissítésének javítása:
MERGE INTO my_table target USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source ON source.userId = target.userId WHEN MATCHED THEN UPDATE SET *
Az elmúlt héten hozzáadott új ügyfelek számának lekérdezése.
SELECT count(distinct userId) FROM my_table - ( SELECT count(distinct userId) FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
Hogyan megkeresni az utolsó véglegesítés verzióját a Spark-munkamenetben?
Ha le szeretné kérdezni az aktuális SparkSession
véglegesítés verziószámát az összes szálon és táblán, kérje le az SQL-konfigurációt spark.databricks.delta.lastCommitVersionInSession
.
SQL
SET spark.databricks.delta.lastCommitVersionInSession
Python
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
Scala
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
Ha a kulcs nem véglegesítést hajtott végre SparkSession
, a kulcs lekérdezése üres értéket ad vissza.
Feljegyzés
Ha ugyanazt SparkSession
több szálon is megosztja, az hasonlít egy változó több szálon való megosztásához; a konfigurációs érték egyidejű frissítésével versenyhelyzetbe ütközhet.