Praca z historią tabel usługi Delta Lake
Każda operacja modyfikującą tabelę usługi Delta Lake tworzy nową wersję tabeli. Informacje o historii umożliwiają przeprowadzanie inspekcji operacji lub wykonywanie zapytań względem tabeli w określonym punkcie w czasie.
Pobieranie historii tabeli usługi Delta
Możesz pobrać informacje o operacjach, użytkowniku, znaczniku czasu itd. dla każdego zapisu w tabeli delty, uruchamiając history
polecenie . Operacje są zwracane w odwrotnej kolejności chronologicznej. Domyślnie historia tabeli jest zachowywana przez 30 dni.
DESCRIBE HISTORY '/data/events/' -- get the full history of the table
DESCRIBE HISTORY delta.`/data/events/`
DESCRIBE HISTORY '/data/events/' LIMIT 1 -- get the last operation only
DESCRIBE HISTORY eventsTable
Aby uzyskać szczegółowe informacje o składni języka Spark SQL, zobacz OPIS HISTORII.
Zapoznaj się z dokumentacją interfejsu API usługi Delta Lake , aby uzyskać szczegółowe informacje o składni języka Scala/Java/Python.
Data Explorer udostępnia wizualny widok tych szczegółowych informacji o tabeli i historii tabel różnicowych. Oprócz schematu tabeli i przykładowych danych możesz kliknąć kartę Historia , aby wyświetlić historię tabel, która jest wyświetlana za pomocą polecenia DESCRIBE HISTORY
.
Schemat historii
Dane wyjściowe history
operacji zawierają następujące kolumny.
Kolumna | Typ | Opis |
---|---|---|
Wersja | długi | Wersja tabeli wygenerowana przez operację. |
sygnatura czasowa | sygnatura czasowa | Po zatwierdzeniu tej wersji. |
userId | ciąg | Identyfikator użytkownika, który uruchomił operację. |
userName | ciąg | Nazwa użytkownika, który uruchomił operację. |
operation | ciąg | Nazwa operacji. |
operationParameters | map (mapa) | Parametry operacji (na przykład predykaty). |
zadanie | struktura | Szczegóły zadania, które uruchomiły operację. |
notes | struktura | Szczegóły notesu, z którego została uruchomiona operacja. |
clusterId | ciąg | Identyfikator klastra, na którym uruchomiono operację. |
readVersion | długi | Wersja tabeli odczytanej w celu wykonania operacji zapisu. |
Isolationlevel | ciąg | Poziom izolacji używany dla tej operacji. |
isBlindAppend | boolean | Czy ta operacja dołączała dane. |
operationMetrics | map (mapa) | Metryki operacji (na przykład liczba wierszy i zmodyfikowanych plików). |
userMetadata | ciąg | Metadane zatwierdzenia zdefiniowane przez użytkownika, jeśli zostały określone |
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion| isolationLevel|isBlindAppend| operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
| 5|2019-07-29 14:07:47| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 4|WriteSerializable| false|[numTotalRows -> ...|
| 4|2019-07-29 14:07:41| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 3|WriteSerializable| false|[numTotalRows -> ...|
| 3|2019-07-29 14:07:29| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 2|WriteSerializable| false|[numTotalRows -> ...|
| 2|2019-07-29 14:06:56| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 1|WriteSerializable| false|[numTotalRows -> ...|
| 1|2019-07-29 14:04:31| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 0|WriteSerializable| false|[numTotalRows -> ...|
| 0|2019-07-29 14:01:40| ###| ###| WRITE|[mode -> ErrorIfE...|null| ###| ###| null|WriteSerializable| true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
Uwaga
- Niektóre z pozostałych kolumn nie są dostępne, jeśli zapisujesz w tabeli delty przy użyciu następujących metod:
- Kolumny dodane w przyszłości będą zawsze dodawane po ostatniej kolumnie.
Klucze metryk operacji
Operacja history
zwraca kolekcję metryk operacji na mapie operationMetrics
kolumn.
W poniższych tabelach wymieniono definicje klucza mapy według operacji.
Operacja | Nazwa metryki | Opis |
---|---|---|
WRITE, CREATE TABLE AS SELECT, REPLACE TABLE AS SELECT, COPY INTO | ||
numFiles | Liczba zapisanych plików. | |
numOutputBytes | Rozmiar w bajtach zapisanej zawartości. | |
numOutputRows | Liczba zapisanych wierszy. | |
AKTUALIZACJA PRZESYŁANIA STRUMIENIOWEGO | ||
numAddedFiles | Liczba dodanych plików. | |
numRemovedFiles | Liczba usuniętych plików. | |
numOutputRows | Liczba zapisanych wierszy. | |
numOutputBytes | Rozmiar zapisu w bajtach. | |
DELETE | ||
numAddedFiles | Liczba dodanych plików. Nie podano, gdy partycje tabeli zostaną usunięte. | |
numRemovedFiles | Liczba usuniętych plików. | |
numDeletedRows | Liczba usuniętych wierszy. Nie podano, gdy partycje tabeli zostaną usunięte. | |
numCopiedRows | Liczba wierszy skopiowanych w procesie usuwania plików. | |
executionTimeMs | Czas potrzebny na wykonanie całej operacji. | |
scanTimeMs | Czas potrzebny na skanowanie plików pod kątem dopasowań. | |
rewriteTimeMs | Czas potrzebny na ponowne zapisywanie pasowanych plików. | |
OBCIĄĆ | ||
numRemovedFiles | Liczba usuniętych plików. | |
executionTimeMs | Czas potrzebny na wykonanie całej operacji. | |
SCALANIA | ||
numSourceRows | Liczba wierszy w źródłowej ramce danych. | |
numTargetRowsInserted | Liczba wierszy wstawionych do tabeli docelowej. | |
numTargetRowsUpdated | Liczba wierszy zaktualizowanych w tabeli docelowej. | |
numTargetRowsDeleted | Liczba wierszy usuniętych w tabeli docelowej. | |
numTargetRows Skopiowane | Liczba skopiowanych wierszy docelowych. | |
numOutputRows | Całkowita liczba zapisanych wierszy. | |
numTargetFilesAdded | Liczba plików dodanych do ujścia (elementu docelowego). | |
numTargetFilesRemoved | Liczba plików usuniętych z ujścia(elementu docelowego). | |
executionTimeMs | Czas potrzebny na wykonanie całej operacji. | |
scanTimeMs | Czas potrzebny na skanowanie plików pod kątem dopasowań. | |
rewriteTimeMs | Czas potrzebny na ponowne zapisywanie pasowanych plików. | |
UPDATE | ||
numAddedFiles | Liczba dodanych plików. | |
numRemovedFiles | Liczba usuniętych plików. | |
numUpdatedRows | Liczba zaktualizowanych wierszy. | |
numCopiedRows | Liczba wierszy skopiowanych właśnie w procesie aktualizowania plików. | |
executionTimeMs | Czas potrzebny na wykonanie całej operacji. | |
scanTimeMs | Czas potrzebny na skanowanie plików pod kątem dopasowań. | |
rewriteTimeMs | Czas potrzebny na ponowne zapisywanie pasowanych plików. | |
FSCK | numRemovedFiles | Liczba usuniętych plików. |
CONVERT | numConvertedFiles | Liczba przekonwertowanych plików Parquet. |
OPTIMIZE | ||
numAddedFiles | Liczba dodanych plików. | |
numRemovedFiles | Liczba zoptymalizowanych plików. | |
numAddedBytes | Liczba bajtów dodanych po zoptymalizowaniu tabeli. | |
numRemovedBytes | Liczba usuniętych bajtów. | |
minFileSize | Rozmiar najmniejszego pliku po zoptymalizowaniu tabeli. | |
p25FileSize | Rozmiar pliku 25 percentyla po zoptymalizowaniu tabeli. | |
p50FileSize | Mediana rozmiaru pliku po zoptymalizowaniu tabeli. | |
p75FileSize | Rozmiar pliku 75 percentyla po zoptymalizowaniu tabeli. | |
Maxfilesize | Rozmiar największego pliku po zoptymalizowaniu tabeli. | |
KLON | ||
sourceTableSize | Rozmiar w bajtach tabeli źródłowej w sklonowanej wersji. | |
sourceNumOfFiles | Liczba plików w tabeli źródłowej w sklonowanej wersji. | |
numRemovedFiles | Liczba plików usuniętych z tabeli docelowej, jeśli poprzednia tabela delta została zamieniona. | |
removedFilesSize | Całkowity rozmiar w bajtach plików usuniętych z tabeli docelowej, jeśli poprzednia tabela delta została zamieniona. | |
numCopiedFiles | Liczba plików skopiowanych do nowej lokalizacji. 0 dla płytkich klonów. | |
copiedFilesSize | Łączny rozmiar w bajtach plików skopiowanych do nowej lokalizacji. 0 dla płytkich klonów. | |
PRZYWRÓCIĆ | ||
tableSizeAfterRestore | Rozmiar tabeli w bajtach po przywróceniu. | |
numOfFilesAfterRestore | Liczba plików w tabeli po przywróceniu. | |
numRemovedFiles | Liczba plików usuniętych przez operację przywracania. | |
numRestoredFiles | Liczba plików, które zostały dodane w wyniku przywracania. | |
removedFilesSize | Rozmiar w bajtach plików usuniętych przez przywracanie. | |
restoredFilesSize | Rozmiar w bajtach plików dodanych przez przywracanie. | |
VACUUM | ||
numDeletedFiles | Liczba usuniętych plików. | |
numVacuumedDirectories | Liczba katalogów opróżnionych. | |
numFilesToDelete | Liczba plików do usunięcia. |
Wykonywanie zapytań względem starszej migawki tabeli (podróż czasowa)
Podróż czasowa usługi Delta Lake umożliwia wykonywanie zapytań względem starszej migawki tabeli delty. Podróże czasowe mają wiele przypadków użycia, w tym:
- Ponowne tworzenie analiz, raportów lub danych wyjściowych (na przykład danych wyjściowych modelu uczenia maszynowego). Może to być przydatne w przypadku debugowania lub inspekcji, zwłaszcza w branżach regulowanych.
- Pisanie złożonych zapytań czasowych.
- Naprawianie błędów w danych.
- Zapewnianie izolacji migawek dla zestawu zapytań dotyczących szybko zmieniających się tabel.
Składnia podróży czasowej usługi Delta Lake
Usługa Delta Lake obsługuje wykonywanie zapytań dotyczących poprzednich wersji tabeli na podstawie sygnatury czasowej lub wersji tabeli (zgodnie z zapisem w dzienniku transakcji).
timestamp_expression
może być dowolną z:'2018-10-18T22:15:12.013Z'
, czyli ciąg, który można rzutować do znacznika czasucast('2018-10-18 13:36:32 CEST' as timestamp)
'2018-10-18'
, czyli ciąg datycurrent_timestamp() - interval 12 hours
date_sub(current_date(), 1)
- Dowolne inne wyrażenie, które jest lub można rzutować do znacznika czasu
version
to długa wartość, którą można uzyskać z danych wyjściowych elementuDESCRIBE HISTORY table_spec
.
Ani timestamp_expression
version
nie może być podquery.
Akceptowane są tylko ciągi daty lub znacznika czasu. Na przykład "2019-01-01"
i "2019-01-01T00:00:00.000Z"
. Zobacz następujący kod, aby uzyskać przykład składnię:
SQL
SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z'
SELECT * FROM delta.`/tmp/delta/people10m` VERSION AS OF 123
Python
df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).load("/tmp/delta/people10m")
Składnia służy również @
do określania sygnatury czasowej lub wersji jako części nazwy tabeli. Sygnatura czasowa musi być w yyyyMMddHHmmssSSS
formacie. Możesz określić wersję po @
, poprzedzając v
element do wersji. Zobacz następujący kod, aby uzyskać przykład składnię:
SQL
SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123
Python
spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")
spark.read.load("/tmp/delta/people10m@20190101000000000")
spark.read.load("/tmp/delta/people10m@v123")
Co to są punkty kontrolne dziennika transakcji?
Usługa Delta Lake rejestruje wersje tabel jako pliki JSON w _delta_log
katalogu, które są przechowywane obok danych tabeli. Aby zoptymalizować wykonywanie zapytań dotyczących punktów kontrolnych, usługa Delta Lake agreguje wersje tabel do plików punktów kontrolnych Parquet, co uniemożliwia odczytywanie wszystkich wersji historii tabel w formacie JSON. Usługa Azure Databricks optymalizuje częstotliwość punktów kontrolnych dla rozmiaru i obciążenia danych. Użytkownicy nie powinni bezpośrednio korzystać z punktów kontrolnych. Częstotliwość punktów kontrolnych może ulec zmianie bez powiadomienia.
Konfigurowanie przechowywania danych na potrzeby podróży czasowych
Aby można było przeprowadzić podróż do poprzedniej wersji, należy zachować zarówno dziennik, jak i pliki danych dla tej wersji.
Pliki danych kopii zapasowej tabeli delty nigdy nie są usuwane automatycznie; pliki danych są usuwane tylko wtedy, gdy uruchamiasz program VACUUM. VACUUM
nie usuwa plików dziennika delty; pliki dziennika są automatycznie czyszczone po zapisaniu punktów kontrolnych.
Domyślnie możesz przejść do tabeli delta do 30 dni, chyba że masz:
- Uruchom
VACUUM
polecenie w tabeli delta. - Zmieniono okresy przechowywania danych lub plików dziennika przy użyciu następujących właściwości tabeli:
delta.logRetentionDuration = "interval <interval>"
: określa, jak długo jest przechowywana historia tabeli. Wartość domyślna tointerval 30 days
.Za każdym razem, gdy punkt kontrolny jest zapisywany, usługa Azure Databricks automatycznie czyści wpisy dziennika starsze niż interwał przechowywania. Jeśli ustawisz tę konfigurację na wystarczająco dużą wartość, wiele wpisów dziennika zostanie zachowanych. Nie powinno to wpływać na wydajność, ponieważ operacje względem dziennika są stałe. Operacje na historii są równoległe, ale staną się droższe w miarę wzrostu rozmiaru dziennika.
delta.deletedFileRetentionDuration = "interval <interval>"
: określa, jak dawno temu plik musiał zostać usunięty przed kandydatem.VACUUM
Wartość domyślna tointerval 7 days
.Aby uzyskać dostęp do 30 dni danych historycznych, nawet jeśli uruchomisz polecenie
VACUUM
w tabeli delta, ustaw wartośćdelta.deletedFileRetentionDuration = "interval 30 days"
. To ustawienie może spowodować, że koszty magazynowania będą się prowadzić do góry.
Przywracanie tabeli delty do wcześniejszego stanu
Uwaga
Dostępne w środowisku Databricks Runtime 7.4 lub nowszym.
Tabelę delty można przywrócić do wcześniejszego RESTORE
stanu przy użyciu polecenia . Tabela różnicowa wewnętrznie przechowuje historyczne wersje tabeli, które umożliwiają przywrócenie jej do wcześniejszego stanu.
Wersja odpowiadająca wcześniejszemu stanowi lub znacznikowi czasu utworzenia wcześniejszego stanu jest obsługiwana jako opcje za pomocą RESTORE
polecenia .
Ważne
- Możesz przywrócić już przywróconą tabelę.
- Sklonowaną tabelę można przywrócić.
- Musisz mieć
MODIFY
uprawnienia do przywracanej tabeli. - Przywracanie tabeli do starszej wersji, w której pliki danych zostały usunięte ręcznie lub przez
vacuum
program zakończy się niepowodzeniem. Przywracanie do tej wersji częściowo jest możliwe, jeślispark.sql.files.ignoreMissingFiles
ustawiono wartośćtrue
. - Format znacznika czasu przywracania do wcześniejszego stanu to
yyyy-MM-dd HH:mm:ss
. Udostępnianie tylko ciągu date(yyyy-MM-dd
) jest również obsługiwane.
RESTORE TABLE db.target_table TO VERSION AS OF <version>
RESTORE TABLE delta.`/data/target/` TO TIMESTAMP AS OF <timestamp>
Aby uzyskać szczegółowe informacje o składni, zobacz PRZYWRACANIE.
Ważne
Przywracanie jest uznawane za operację zmiany danych. Wpisy dziennika usługi Delta Lake dodane przez RESTORE
polecenie zawierają wartość dataChange ustawioną na wartość true. Jeśli istnieje aplikacja podrzędna, taka jak zadanie przesyłania strumieniowego ze strukturą , które przetwarza aktualizacje tabeli usługi Delta Lake, wpisy dziennika zmian danych dodane przez operację przywracania są traktowane jako nowe aktualizacje danych i przetwarzanie może spowodować zduplikowanie danych.
Na przykład:
Wersja tabeli | Operacja | Aktualizacje dziennika różnicowego | Rekordy w aktualizacjach dziennika zmian danych |
---|---|---|---|
0 | INSERT | AddFile(/path/to/file-1, dataChange = true) | (name = Viktor, age = 29, (name = George, age = 55) |
1 | INSERT | AddFile(/path/to/file-2, dataChange = true) | (name = George, age = 39) |
2 | OPTIMIZE | AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) | (Brak rekordów, ponieważ optymalizowanie kompaktacji nie zmienia danych w tabeli) |
3 | RESTORE(version=1) | RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) | (name = Viktor, age = 29), (name = George, age = 55), (name = George, age = 39) |
W poprzednim przykładzie RESTORE
polecenie powoduje wyświetlenie aktualizacji, które były już widoczne podczas odczytywania tabeli delta w wersji 0 i 1. Jeśli zapytanie przesyłane strumieniowo odczytuje tę tabelę, te pliki będą traktowane jako nowo dodane dane i będą przetwarzane ponownie.
Przywracanie metryk
Uwaga
Dostępne w środowisku Databricks Runtime 8.2 lub nowszym.
RESTORE
raportuje następujące metryki jako ramkę danych z jednym wierszem po zakończeniu operacji:
table_size_after_restore
: rozmiar tabeli po przywróceniu.num_of_files_after_restore
: liczba plików w tabeli po przywróceniu.num_removed_files
: liczba plików usuniętych (logicznie usuniętych) z tabeli.num_restored_files
: liczba przywróconych plików z powodu wycofywania.removed_files_size
: całkowity rozmiar w bajtach plików, które są usuwane z tabeli.restored_files_size
: całkowity rozmiar w bajtach przywróconych plików.
Przykłady korzystania z podróży czasowej usługi Delta Lake
Napraw przypadkowe usunięcie tabeli dla użytkownika
111
:INSERT INTO my_table SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1) WHERE userId = 111
Napraw przypadkowe nieprawidłowe aktualizacje tabeli:
MERGE INTO my_table target USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source ON source.userId = target.userId WHEN MATCHED THEN UPDATE SET *
Wykonaj zapytanie dotyczące liczby nowych klientów dodanych w ciągu ostatniego tygodnia.
SELECT count(distinct userId) - ( SELECT count(distinct userId) FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
Jak mogę znaleźć wersję ostatniego zatwierdzenia w sesji platformy Spark?
Aby uzyskać numer wersji ostatniego zatwierdzenia zapisanego przez bieżące SparkSession
we wszystkich wątkach i wszystkich tabelach, wykonaj zapytanie dotyczące konfiguracji spark.databricks.delta.lastCommitVersionInSession
SQL .
SQL
SET spark.databricks.delta.lastCommitVersionInSession
Python
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
Scala
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
Jeśli żadne zatwierdzenia nie zostały dokonane przez SparkSession
element , zapytanie o klucz zwraca pustą wartość.
Uwaga
Jeśli współużytkujesz to samo SparkSession
w wielu wątkach, jest ona podobna do udostępniania zmiennej w wielu wątkach; warunki wyścigu mogą być osiągane, ponieważ wartość konfiguracji jest aktualizowana współbieżnie.