Praca z historią tabel usługi Delta Lake
Każda operacja modyfikującą tabelę usługi Delta Lake tworzy nową wersję tabeli. Informacje o historii umożliwiają przeprowadzanie inspekcji operacji, wycofywanie tabeli lub wykonywanie zapytań względem tabeli w określonym punkcie w czasie przy użyciu podróży czasowej.
Uwaga
Rozwiązanie Databricks nie zaleca używania historii tabel Delta Lake jako długoterminowego rozwiązania do tworzenia kopii zapasowych w celu archiwizacji danych. Platforma Databricks zaleca używanie tylko ostatnich 7 dni dla operacji przechodzenia w czasie, chyba że konfiguracje przechowywania danych i dzienników ustawiono na większą wartość.
Pobieranie historii tabeli usługi Delta
Informacje, takie jak operacje, użytkownik i sygnatura czasowa dla każdego zapisu w tabeli delty, można pobrać, uruchamiając history
polecenie . Operacje są zwracane w odwrotnej kolejności chronologicznej.
Przechowywanie historii tabel jest określane przez ustawienie delta.logRetentionDuration
tabeli , czyli domyślnie 30 dni.
Uwaga
Przechodzenie w czasie i historia tabeli są kontrolowane przez różne progi przechowywania. Zobacz Co to jest przechodzenia w czasie na platformie Delta Lake?.
DESCRIBE HISTORY table_name -- get the full history of the table
DESCRIBE HISTORY table_name LIMIT 1 -- get the last operation only
Aby uzyskać szczegółowe informacje o składni spark SQL, zobacz OPIS HISTORII.
Zobacz dokumentację interfejsu API usługi Delta Lake, aby uzyskać szczegółowe informacje o składni języka Scala/Java/Python.
Eksplorator wykazu udostępnia wizualny widok tych szczegółowych informacji o tabeli i historii tabel różnicowych. Oprócz schematu tabeli i przykładowych danych możesz kliknąć kartę Historia , aby wyświetlić historię tabel, która jest wyświetlana za pomocą DESCRIBE HISTORY
polecenia .
Schemat historii
Dane wyjściowe history
operacji zawierają następujące kolumny.
Kolumna | Type | Opis |
---|---|---|
version | długi | Wersja tabeli wygenerowana przez operację. |
timestamp | timestamp | Gdy ta wersja została zatwierdzona. |
Identyfikator użytkownika | string | Identyfikator użytkownika, który uruchomił operację. |
userName | string | Nazwa użytkownika, który uruchomił operację. |
rozdzielnicy | string | Nazwa operacji. |
operationParameters | map | Parametry operacji (na przykład predykaty). |
zadanie | struktura | Szczegóły zadania, w ramach którego uruchomiono operację. |
notes | struktura | Szczegóły notesu, z którego została uruchomiona operacja. |
clusterId | string | Identyfikator klastra, na którym uruchomiono operację. |
readVersion | długi | Wersja tabeli odczytanej w celu wykonania operacji zapisu. |
isolationLevel | string | Poziom izolacji używany dla tej operacji. |
isBlindAppend | boolean | Czy ta operacja dołączała dane. |
operationMetrics | map | Metryki operacji (na przykład liczba wierszy i plików zmodyfikowanych). |
userMetadata | string | Metadane zatwierdzenia zdefiniowane przez użytkownika, jeśli zostały określone |
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion| isolationLevel|isBlindAppend| operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
| 5|2019-07-29 14:07:47| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 4|WriteSerializable| false|[numTotalRows -> ...|
| 4|2019-07-29 14:07:41| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 3|WriteSerializable| false|[numTotalRows -> ...|
| 3|2019-07-29 14:07:29| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 2|WriteSerializable| false|[numTotalRows -> ...|
| 2|2019-07-29 14:06:56| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 1|WriteSerializable| false|[numTotalRows -> ...|
| 1|2019-07-29 14:04:31| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 0|WriteSerializable| false|[numTotalRows -> ...|
| 0|2019-07-29 14:01:40| ###| ###| WRITE|[mode -> ErrorIfE...|null| ###| ###| null|WriteSerializable| true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
Uwaga
- Niektóre inne kolumny nie są dostępne, jeśli zapisujesz w tabeli delty przy użyciu następujących metod:
- Kolumny dodane w przyszłości będą zawsze dodawane po ostatniej kolumnie.
Klucze metryk operacji
Operacja history
zwraca kolekcję metryk operacji na mapie operationMetrics
kolumn.
W poniższych tabelach wymieniono definicje kluczy mapy według operacji.
Operacja | Nazwa metryki | opis |
---|---|---|
WRITE, CREATE TABLE AS SELECT, REPLACE TABLE AS SELECT, COPY INTO | ||
numFiles | Liczba zapisanych plików. | |
numOutputBytes | Rozmiar w bajtach zapisanej zawartości. | |
numOutputRows | Liczba zapisanych wierszy. | |
AKTUALIZACJA PRZESYŁANIA STRUMIENIOWEGO | ||
numAddedFiles | Liczba dodanych plików. | |
numRemovedFiles | Liczba usuniętych plików. | |
numOutputRows | Liczba zapisanych wierszy. | |
numOutputBytes | Rozmiar zapisu w bajtach. | |
DELETE | ||
numAddedFiles | Liczba dodanych plików. Nie podano, gdy partycje tabeli są usuwane. | |
numRemovedFiles | Liczba usuniętych plików. | |
numDeletedRows | Liczba usuniętych wierszy. Nie podano, gdy partycje tabeli są usuwane. | |
numCopiedRows | Liczba wierszy skopiowanych w procesie usuwania plików. | |
executionTimeMs | Czas potrzebny na wykonanie całej operacji. | |
scanTimeMs | Czas potrzebny na skanowanie plików pod kątem dopasowań. | |
rewriteTimeMs | Czas potrzebny na ponowne zapisywanie dopasowanych plików. | |
OBCIĄĆ | ||
numRemovedFiles | Liczba usuniętych plików. | |
executionTimeMs | Czas potrzebny na wykonanie całej operacji. | |
POŁĄCZYĆ | ||
numSourceRows | Liczba wierszy w źródłowej ramce danych. | |
numTargetRowsInserted | Liczba wierszy wstawionych do tabeli docelowej. | |
numTargetRowsUpdated | Liczba wierszy zaktualizowanych w tabeli docelowej. | |
numTargetRowsDeleted | Liczba wierszy usuniętych w tabeli docelowej. | |
numTargetRows Skopiowane | Liczba skopiowanych wierszy docelowych. | |
numOutputRows | Całkowita liczba wypisana wierszy. | |
numTargetFilesAdded | Liczba plików dodanych do ujścia(elementu docelowego). | |
numTargetFilesRemoved | Liczba plików usuniętych z ujścia(elementu docelowego). | |
executionTimeMs | Czas potrzebny na wykonanie całej operacji. | |
scanTimeMs | Czas potrzebny na skanowanie plików pod kątem dopasowań. | |
rewriteTimeMs | Czas potrzebny na ponowne zapisywanie dopasowanych plików. | |
UPDATE | ||
numAddedFiles | Liczba dodanych plików. | |
numRemovedFiles | Liczba usuniętych plików. | |
numUpdatedRows | Liczba zaktualizowanych wierszy. | |
numCopiedRows | Liczba wierszy właśnie skopiowanych w procesie aktualizowania plików. | |
executionTimeMs | Czas potrzebny na wykonanie całej operacji. | |
scanTimeMs | Czas potrzebny na skanowanie plików pod kątem dopasowań. | |
rewriteTimeMs | Czas potrzebny na ponowne zapisywanie dopasowanych plików. | |
FSCK | numRemovedFiles | Liczba usuniętych plików. |
NAWRÓCIĆ | numConvertedFiles | Liczba przekonwertowanych plików Parquet. |
OPTIMIZE | ||
numAddedFiles | Liczba dodanych plików. | |
numRemovedFiles | Liczba zoptymalizowanych plików. | |
numAddedBytes | Liczba bajtów dodanych po zoptymalizowaniu tabeli. | |
numRemovedBytes | Liczba usuniętych bajtów. | |
minFileSize | Rozmiar najmniejszego pliku po zoptymalizowaniu tabeli. | |
p25FileSize | Rozmiar 25. pliku percentyla po zoptymalizowaniu tabeli. | |
p50FileSize | Mediana rozmiaru pliku po zoptymalizowaniu tabeli. | |
p75FileSize | Rozmiar pliku 75. percentyla po zoptymalizowaniu tabeli. | |
maxFileSize | Rozmiar największego pliku po zoptymalizowaniu tabeli. | |
CLONE | ||
sourceTableSize | Rozmiar w bajtach tabeli źródłowej w sklonowanej wersji. | |
sourceNumOfFiles | Liczba plików w tabeli źródłowej w sklonowanej wersji. | |
numRemovedFiles | Liczba plików usuniętych z tabeli docelowej, jeśli poprzednia tabela delty została zamieniona. | |
removedFilesSize | Całkowity rozmiar w bajtach plików usuniętych z tabeli docelowej, jeśli poprzednia tabela delty została zamieniona. | |
numCopiedFiles | Liczba plików skopiowanych do nowej lokalizacji. 0 dla płytkich klonów. | |
copiedFilesSize | Całkowity rozmiar w bajtach plików, które zostały skopiowane do nowej lokalizacji. 0 dla płytkich klonów. | |
PRZYWRÓCIĆ | ||
tableSizeAfterRestore | Rozmiar tabeli w bajtach po przywróceniu. | |
numOfFilesAfterRestore | Liczba plików w tabeli po przywróceniu. | |
numRemovedFiles | Liczba plików usuniętych przez operację przywracania. | |
numRestoredFiles | Liczba plików, które zostały dodane w wyniku przywracania. | |
removedFilesSize | Rozmiar w bajtach plików usuniętych przez przywracanie. | |
restoredFilesSize | Rozmiar w bajtach plików dodanych przez przywracanie. | |
VACUUM | ||
numDeletedFiles | Liczba usuniętych plików. | |
numVacuumedDirectories | Liczba katalogów opróżnionych. | |
numFilesToDelete | Liczba plików do usunięcia. |
Co to jest podróż w czasie usługi Delta Lake?
Podróż czasowa usługi Delta Lake obsługuje wykonywanie zapytań dotyczących poprzednich wersji tabeli na podstawie sygnatury czasowej lub wersji tabeli (zarejestrowanej w dzienniku transakcji). Możesz użyć podróży w czasie dla aplikacji, takich jak:
- Ponowne tworzenie analiz, raportów lub danych wyjściowych (na przykład danych wyjściowych modelu uczenia maszynowego). Może to być przydatne w przypadku debugowania lub inspekcji, zwłaszcza w branżach regulowanych.
- Pisanie złożonych zapytań czasowych.
- Naprawianie błędów w danych.
- Zapewnienie izolacji migawek dla zestawu zapytań dotyczących szybko zmieniających się tabel.
Ważne
Wersje tabel dostępne z podróżą czasową są określane przez kombinację progu przechowywania dla plików dziennika transakcji oraz częstotliwości i określonego przechowywania dla VACUUM
operacji. Jeśli uruchamiasz VACUUM
codziennie z wartościami domyślnymi, 7 dni danych jest dostępnych na potrzeby podróży w czasie.
Składnia przechodzenia w czasie na poziomie Delta
Kwerenda tabeli delty z podróżą czasową przez dodanie klauzuli po specyfikacji nazwy tabeli.
timestamp_expression
może być jednym z:'2018-10-18T22:15:12.013Z'
, czyli ciąg, który można rzutować na znacznik czasucast('2018-10-18 13:36:32 CEST' as timestamp)
'2018-10-18'
, czyli ciąg datycurrent_timestamp() - interval 12 hours
date_sub(current_date(), 1)
- Dowolne inne wyrażenie, które jest lub można rzutować na znacznik czasu
version
to długa wartość, którą można uzyskać z danych wyjściowych elementuDESCRIBE HISTORY table_spec
.
Ani nie timestamp_expression
version
może być podzapytaniem.
Akceptowane są tylko ciągi daty lub znacznika czasu. Przykład: "2019-01-01"
i "2019-01-01T00:00:00.000Z"
. Zobacz następujący kod, aby zapoznać się z przykładowym kodem:
SQL
SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z';
SELECT * FROM people10m VERSION AS OF 123;
Python
df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).table("people10m")
Można również użyć @
składni, aby określić znacznik czasu lub wersję w ramach nazwy tabeli. Znacznik czasu musi być w yyyyMMddHHmmssSSS
formacie. Możesz określić wersję po @
, poprzedzając v
element do wersji. Zobacz następujący kod, aby zapoznać się z przykładowym kodem:
SQL
SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123
Python
spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")
Co to są punkty kontrolne dziennika transakcji?
Usługa Delta Lake rejestruje wersje tabel jako pliki JSON w _delta_log
katalogu, które są przechowywane wraz z danymi tabeli. Aby zoptymalizować wykonywanie zapytań dotyczących punktów kontrolnych, usługa Delta Lake agreguje wersje tabel do plików punktów kontrolnych Parquet, co uniemożliwia odczytywanie wszystkich wersji historii tabel w formacie JSON. Usługa Azure Databricks optymalizuje częstotliwość tworzenia punktów kontrolnych pod kątem rozmiaru i obciążenia danych. Użytkownicy nie powinni bezpośrednio korzystać z punktów kontrolnych. Częstotliwość punktów kontrolnych może ulec zmianie bez powiadomienia.
Konfigurowanie przechowywania danych dla zapytań dotyczących podróży w czasie
Aby wykonać zapytanie dotyczące poprzedniej wersji tabeli, należy zachować zarówno dziennik, jak i pliki danych dla tej wersji.
Pliki danych są usuwane podczas VACUUM
uruchamiania względem tabeli. Usługa Delta Lake automatycznie zarządza usuwaniem plików dziennika po wersjach tabeli punktów kontrolnych.
Ponieważ większość tabel delty jest VACUUM
regularnie uruchamiana, zapytania do punktu w czasie powinny uwzględniać próg przechowywania dla VACUUM
wartości , która domyślnie wynosi 7 dni.
Aby zwiększyć próg przechowywania danych dla tabel delty, należy skonfigurować następujące właściwości tabeli:
delta.logRetentionDuration = "interval <interval>"
: określa, jak długo jest przechowywana historia tabeli. Wartość domyślna tointerval 30 days
.delta.deletedFileRetentionDuration = "interval <interval>"
: określa wartość progowąVACUUM
używaną do usuwania plików danych, do których nie odwołuje się już bieżąca wersja tabeli. Wartość domyślna tointerval 7 days
.
Właściwości delty można określić podczas tworzenia tabeli lub ustawić za pomocą instrukcji ALTER TABLE
. Zobacz Informacje o właściwościach tabeli delty.
Uwaga
Obie te właściwości należy ustawić, aby zapewnić, że historia tabeli jest przechowywana przez dłuższy czas dla tabel z częstymi VACUUM
operacjami. Na przykład aby uzyskać dostęp do 30 dni danych historycznych, ustaw wartość delta.deletedFileRetentionDuration = "interval 30 days"
(która jest zgodna z ustawieniem domyślnym dla elementu delta.logRetentionDuration
).
Zwiększenie progu przechowywania danych może spowodować zwiększenie kosztów magazynowania, ponieważ utrzymuje się więcej plików danych.
Przywracanie tabeli delty do wcześniejszego stanu
Tabelę delty można przywrócić do wcześniejszego RESTORE
stanu za pomocą polecenia . Tabela usługi Delta wewnętrznie przechowuje historyczne wersje tabeli, co umożliwia przywrócenie jej do wcześniejszego stanu.
Wersja odpowiadająca wcześniejszemu stanowi lub znacznikowi czasu wskazującemu moment utworzenia wcześniejszego stanu są obsługiwane jako opcje przy użyciu polecenia RESTORE
.
Ważne
- Możesz przywrócić już przywróconą tabelę.
- Możesz przywrócić sklonowaną tabelę.
- Musisz mieć
MODIFY
uprawnienia do przywracanej tabeli. - Nie można przywrócić tabeli do starszej wersji, w której pliki danych zostały usunięte ręcznie lub przez
vacuum
program . Przywracanie do tej wersji częściowo jest nadal możliwe, jeślispark.sql.files.ignoreMissingFiles
jest ustawiona wartośćtrue
. - Format znacznika czasu przywracania do wcześniejszego stanu to
yyyy-MM-dd HH:mm:ss
. Udostępnianie tylko ciągu date(yyyy-MM-dd
) jest również obsługiwane.
RESTORE TABLE target_table TO VERSION AS OF <version>;
RESTORE TABLE target_table TO TIMESTAMP AS OF <timestamp>;
Aby uzyskać szczegółowe informacje o składni, zobacz PRZYWRACANIE.
Ważne
Przywracanie jest uznawane za operację zmiany danych. Wpisy dziennika usługi Delta Lake dodane przez RESTORE
polecenie zawierają wartość dataChange ustawioną na true. Jeśli istnieje aplikacja podrzędna, taka jak zadanie przesyłania strumieniowego ze strukturą, które przetwarza aktualizacje tabeli usługi Delta Lake, wpisy dziennika zmian danych dodane przez operację przywracania są traktowane jako nowe aktualizacje danych i przetwarzanie może spowodować zduplikowanie danych.
Na przykład:
Wersja tabeli | Operacja | Aktualizacje dziennika różnicowego | Rekordy w aktualizacjach dziennika zmian danych |
---|---|---|---|
0 | INSERT | AddFile(/path/to/file-1, dataChange = true) | (name = Viktor, age = 29, (name = George, age = 55) |
1 | INSERT | AddFile(/path/to/file-2, dataChange = true) | (name = George, age = 39) |
2 | OPTIMIZE | AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) | (Brak rekordów, ponieważ optymalizacja kompaktowania nie zmienia danych w tabeli) |
3 | RESTORE(wersja=1) | RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) | (name = Viktor, age = 29), (name = George, age = 55), (name = George, age = 39) |
W poprzednim przykładzie polecenie powoduje wyświetlenie aktualizacji, RESTORE
które były już widoczne podczas odczytywania tabeli delty w wersji 0 i 1. Jeśli zapytanie przesyłane strumieniowo odczytuje tę tabelę, te pliki będą traktowane jako nowo dodane dane i będą przetwarzane ponownie.
Przywracanie metryk
RESTORE
raportuje następujące metryki jako ramkę danych z pojedynczym wierszem po zakończeniu operacji:
table_size_after_restore
: rozmiar tabeli po przywróceniu.num_of_files_after_restore
: liczba plików w tabeli po przywróceniu.num_removed_files
: liczba plików usuniętych (logicznie usuniętych) z tabeli.num_restored_files
: liczba przywróconych plików z powodu wycofywania.removed_files_size
: całkowity rozmiar w bajtach plików usuniętych z tabeli.restored_files_size
: całkowity rozmiar w bajtach przywróconych plików.
Przykłady korzystania z przechodzenia w czasie usługi Delta Lake
Napraw przypadkowe usunięcie tabeli dla użytkownika
111
:INSERT INTO my_table SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1) WHERE userId = 111
Napraw przypadkowe nieprawidłowe aktualizacje tabeli:
MERGE INTO my_table target USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source ON source.userId = target.userId WHEN MATCHED THEN UPDATE SET *
Wykonaj zapytanie dotyczące liczby nowych klientów dodanych w ciągu ostatniego tygodnia.
SELECT count(distinct userId) FROM my_table - ( SELECT count(distinct userId) FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
Jak mogę znaleźć wersję ostatniego zatwierdzenia w sesji platformy Spark?
Aby uzyskać numer wersji ostatniego zatwierdzenia zapisanego przez bieżący SparkSession
we wszystkich wątkach i wszystkich tabelach, wykonaj zapytanie dotyczące konfiguracji spark.databricks.delta.lastCommitVersionInSession
SQL .
SQL
SET spark.databricks.delta.lastCommitVersionInSession
Python
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
Scala
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
Jeśli żadne zatwierdzenia nie zostały wykonane przez SparkSession
element , zapytanie dotyczące klucza zwraca pustą wartość.
Uwaga
Jeśli współużytkujesz to samo SparkSession
w wielu wątkach, jest ona podobna do udostępniania zmiennej w wielu wątkach. Możesz trafić w warunki wyścigu, ponieważ wartość konfiguracji jest aktualizowana współbieżnie.