Praca z historią tabel usługi Delta Lake

Każda operacja modyfikującą tabelę usługi Delta Lake tworzy nową wersję tabeli. Informacje o historii umożliwiają przeprowadzanie inspekcji operacji lub wykonywanie zapytań względem tabeli w określonym punkcie w czasie.

Pobieranie historii tabeli usługi Delta

Możesz pobrać informacje o operacjach, użytkowniku, znaczniku czasu itd. dla każdego zapisu w tabeli delty, uruchamiając history polecenie . Operacje są zwracane w odwrotnej kolejności chronologicznej. Domyślnie historia tabeli jest zachowywana przez 30 dni.

DESCRIBE HISTORY '/data/events/'          -- get the full history of the table

DESCRIBE HISTORY delta.`/data/events/`

DESCRIBE HISTORY '/data/events/' LIMIT 1  -- get the last operation only

DESCRIBE HISTORY eventsTable

Aby uzyskać szczegółowe informacje o składni języka Spark SQL, zobacz OPIS HISTORII.

Zapoznaj się z dokumentacją interfejsu API usługi Delta Lake , aby uzyskać szczegółowe informacje o składni języka Scala/Java/Python.

Data Explorer udostępnia wizualny widok tych szczegółowych informacji o tabeli i historii tabel różnicowych. Oprócz schematu tabeli i przykładowych danych możesz kliknąć kartę Historia , aby wyświetlić historię tabel, która jest wyświetlana za pomocą polecenia DESCRIBE HISTORY.

Schemat historii

Dane wyjściowe history operacji zawierają następujące kolumny.

Kolumna Typ Opis
Wersja długi Wersja tabeli wygenerowana przez operację.
sygnatura czasowa sygnatura czasowa Po zatwierdzeniu tej wersji.
userId ciąg Identyfikator użytkownika, który uruchomił operację.
userName ciąg Nazwa użytkownika, który uruchomił operację.
operation ciąg Nazwa operacji.
operationParameters map (mapa) Parametry operacji (na przykład predykaty).
zadanie struktura Szczegóły zadania, które uruchomiły operację.
notes struktura Szczegóły notesu, z którego została uruchomiona operacja.
clusterId ciąg Identyfikator klastra, na którym uruchomiono operację.
readVersion długi Wersja tabeli odczytanej w celu wykonania operacji zapisu.
Isolationlevel ciąg Poziom izolacji używany dla tej operacji.
isBlindAppend boolean Czy ta operacja dołączała dane.
operationMetrics map (mapa) Metryki operacji (na przykład liczba wierszy i zmodyfikowanych plików).
userMetadata ciąg Metadane zatwierdzenia zdefiniowane przez użytkownika, jeśli zostały określone
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|      5|2019-07-29 14:07:47|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          4|WriteSerializable|        false|[numTotalRows -> ...|
|      4|2019-07-29 14:07:41|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          3|WriteSerializable|        false|[numTotalRows -> ...|
|      3|2019-07-29 14:07:29|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          2|WriteSerializable|        false|[numTotalRows -> ...|
|      2|2019-07-29 14:06:56|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          1|WriteSerializable|        false|[numTotalRows -> ...|
|      1|2019-07-29 14:04:31|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          0|WriteSerializable|        false|[numTotalRows -> ...|
|      0|2019-07-29 14:01:40|   ###|     ###|    WRITE|[mode -> ErrorIfE...|null|     ###|      ###|       null|WriteSerializable|         true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+

Uwaga

Klucze metryk operacji

Operacja history zwraca kolekcję metryk operacji na mapie operationMetrics kolumn.

W poniższych tabelach wymieniono definicje klucza mapy według operacji.

Operacja Nazwa metryki Opis
WRITE, CREATE TABLE AS SELECT, REPLACE TABLE AS SELECT, COPY INTO
numFiles Liczba zapisanych plików.
numOutputBytes Rozmiar w bajtach zapisanej zawartości.
numOutputRows Liczba zapisanych wierszy.
AKTUALIZACJA PRZESYŁANIA STRUMIENIOWEGO
numAddedFiles Liczba dodanych plików.
numRemovedFiles Liczba usuniętych plików.
numOutputRows Liczba zapisanych wierszy.
numOutputBytes Rozmiar zapisu w bajtach.
DELETE
numAddedFiles Liczba dodanych plików. Nie podano, gdy partycje tabeli zostaną usunięte.
numRemovedFiles Liczba usuniętych plików.
numDeletedRows Liczba usuniętych wierszy. Nie podano, gdy partycje tabeli zostaną usunięte.
numCopiedRows Liczba wierszy skopiowanych w procesie usuwania plików.
executionTimeMs Czas potrzebny na wykonanie całej operacji.
scanTimeMs Czas potrzebny na skanowanie plików pod kątem dopasowań.
rewriteTimeMs Czas potrzebny na ponowne zapisywanie pasowanych plików.
OBCIĄĆ
numRemovedFiles Liczba usuniętych plików.
executionTimeMs Czas potrzebny na wykonanie całej operacji.
SCALANIA
numSourceRows Liczba wierszy w źródłowej ramce danych.
numTargetRowsInserted Liczba wierszy wstawionych do tabeli docelowej.
numTargetRowsUpdated Liczba wierszy zaktualizowanych w tabeli docelowej.
numTargetRowsDeleted Liczba wierszy usuniętych w tabeli docelowej.
numTargetRows Skopiowane Liczba skopiowanych wierszy docelowych.
numOutputRows Całkowita liczba zapisanych wierszy.
numTargetFilesAdded Liczba plików dodanych do ujścia (elementu docelowego).
numTargetFilesRemoved Liczba plików usuniętych z ujścia(elementu docelowego).
executionTimeMs Czas potrzebny na wykonanie całej operacji.
scanTimeMs Czas potrzebny na skanowanie plików pod kątem dopasowań.
rewriteTimeMs Czas potrzebny na ponowne zapisywanie pasowanych plików.
UPDATE
numAddedFiles Liczba dodanych plików.
numRemovedFiles Liczba usuniętych plików.
numUpdatedRows Liczba zaktualizowanych wierszy.
numCopiedRows Liczba wierszy skopiowanych właśnie w procesie aktualizowania plików.
executionTimeMs Czas potrzebny na wykonanie całej operacji.
scanTimeMs Czas potrzebny na skanowanie plików pod kątem dopasowań.
rewriteTimeMs Czas potrzebny na ponowne zapisywanie pasowanych plików.
FSCK numRemovedFiles Liczba usuniętych plików.
CONVERT numConvertedFiles Liczba przekonwertowanych plików Parquet.
OPTIMIZE
numAddedFiles Liczba dodanych plików.
numRemovedFiles Liczba zoptymalizowanych plików.
numAddedBytes Liczba bajtów dodanych po zoptymalizowaniu tabeli.
numRemovedBytes Liczba usuniętych bajtów.
minFileSize Rozmiar najmniejszego pliku po zoptymalizowaniu tabeli.
p25FileSize Rozmiar pliku 25 percentyla po zoptymalizowaniu tabeli.
p50FileSize Mediana rozmiaru pliku po zoptymalizowaniu tabeli.
p75FileSize Rozmiar pliku 75 percentyla po zoptymalizowaniu tabeli.
Maxfilesize Rozmiar największego pliku po zoptymalizowaniu tabeli.
KLON
sourceTableSize Rozmiar w bajtach tabeli źródłowej w sklonowanej wersji.
sourceNumOfFiles Liczba plików w tabeli źródłowej w sklonowanej wersji.
numRemovedFiles Liczba plików usuniętych z tabeli docelowej, jeśli poprzednia tabela delta została zamieniona.
removedFilesSize Całkowity rozmiar w bajtach plików usuniętych z tabeli docelowej, jeśli poprzednia tabela delta została zamieniona.
numCopiedFiles Liczba plików skopiowanych do nowej lokalizacji. 0 dla płytkich klonów.
copiedFilesSize Łączny rozmiar w bajtach plików skopiowanych do nowej lokalizacji. 0 dla płytkich klonów.
PRZYWRÓCIĆ
tableSizeAfterRestore Rozmiar tabeli w bajtach po przywróceniu.
numOfFilesAfterRestore Liczba plików w tabeli po przywróceniu.
numRemovedFiles Liczba plików usuniętych przez operację przywracania.
numRestoredFiles Liczba plików, które zostały dodane w wyniku przywracania.
removedFilesSize Rozmiar w bajtach plików usuniętych przez przywracanie.
restoredFilesSize Rozmiar w bajtach plików dodanych przez przywracanie.
VACUUM
numDeletedFiles Liczba usuniętych plików.
numVacuumedDirectories Liczba katalogów opróżnionych.
numFilesToDelete Liczba plików do usunięcia.

Wykonywanie zapytań względem starszej migawki tabeli (podróż czasowa)

Podróż czasowa usługi Delta Lake umożliwia wykonywanie zapytań względem starszej migawki tabeli delty. Podróże czasowe mają wiele przypadków użycia, w tym:

  • Ponowne tworzenie analiz, raportów lub danych wyjściowych (na przykład danych wyjściowych modelu uczenia maszynowego). Może to być przydatne w przypadku debugowania lub inspekcji, zwłaszcza w branżach regulowanych.
  • Pisanie złożonych zapytań czasowych.
  • Naprawianie błędów w danych.
  • Zapewnianie izolacji migawek dla zestawu zapytań dotyczących szybko zmieniających się tabel.

Składnia podróży czasowej usługi Delta Lake

Usługa Delta Lake obsługuje wykonywanie zapytań dotyczących poprzednich wersji tabeli na podstawie sygnatury czasowej lub wersji tabeli (zgodnie z zapisem w dzienniku transakcji).

  • timestamp_expression może być dowolną z:
    • '2018-10-18T22:15:12.013Z', czyli ciąg, który można rzutować do znacznika czasu
    • cast('2018-10-18 13:36:32 CEST' as timestamp)
    • '2018-10-18', czyli ciąg daty
    • current_timestamp() - interval 12 hours
    • date_sub(current_date(), 1)
    • Dowolne inne wyrażenie, które jest lub można rzutować do znacznika czasu
  • version to długa wartość, którą można uzyskać z danych wyjściowych elementu DESCRIBE HISTORY table_spec.

Ani timestamp_expressionversion nie może być podquery.

Akceptowane są tylko ciągi daty lub znacznika czasu. Na przykład "2019-01-01" i "2019-01-01T00:00:00.000Z". Zobacz następujący kod, aby uzyskać przykład składnię:

SQL

SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z'
SELECT * FROM delta.`/tmp/delta/people10m` VERSION AS OF 123

Python

df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).load("/tmp/delta/people10m")

Składnia służy również @ do określania sygnatury czasowej lub wersji jako części nazwy tabeli. Sygnatura czasowa musi być w yyyyMMddHHmmssSSS formacie. Możesz określić wersję po @ , poprzedzając v element do wersji. Zobacz następujący kod, aby uzyskać przykład składnię:

SQL

SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123

Python

spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")

spark.read.load("/tmp/delta/people10m@20190101000000000")
spark.read.load("/tmp/delta/people10m@v123")

Co to są punkty kontrolne dziennika transakcji?

Usługa Delta Lake rejestruje wersje tabel jako pliki JSON w _delta_log katalogu, które są przechowywane obok danych tabeli. Aby zoptymalizować wykonywanie zapytań dotyczących punktów kontrolnych, usługa Delta Lake agreguje wersje tabel do plików punktów kontrolnych Parquet, co uniemożliwia odczytywanie wszystkich wersji historii tabel w formacie JSON. Usługa Azure Databricks optymalizuje częstotliwość punktów kontrolnych dla rozmiaru i obciążenia danych. Użytkownicy nie powinni bezpośrednio korzystać z punktów kontrolnych. Częstotliwość punktów kontrolnych może ulec zmianie bez powiadomienia.

Konfigurowanie przechowywania danych na potrzeby podróży czasowych

Aby można było przeprowadzić podróż do poprzedniej wersji, należy zachować zarówno dziennik, jak i pliki danych dla tej wersji.

Pliki danych kopii zapasowej tabeli delty nigdy nie są usuwane automatycznie; pliki danych są usuwane tylko wtedy, gdy uruchamiasz program VACUUM. VACUUMnie usuwa plików dziennika delty; pliki dziennika są automatycznie czyszczone po zapisaniu punktów kontrolnych.

Domyślnie możesz przejść do tabeli delta do 30 dni, chyba że masz:

  • Uruchom VACUUM polecenie w tabeli delta.
  • Zmieniono okresy przechowywania danych lub plików dziennika przy użyciu następujących właściwości tabeli:
    • delta.logRetentionDuration = "interval <interval>": określa, jak długo jest przechowywana historia tabeli. Wartość domyślna to interval 30 days.

      Za każdym razem, gdy punkt kontrolny jest zapisywany, usługa Azure Databricks automatycznie czyści wpisy dziennika starsze niż interwał przechowywania. Jeśli ustawisz tę konfigurację na wystarczająco dużą wartość, wiele wpisów dziennika zostanie zachowanych. Nie powinno to wpływać na wydajność, ponieważ operacje względem dziennika są stałe. Operacje na historii są równoległe, ale staną się droższe w miarę wzrostu rozmiaru dziennika.

    • delta.deletedFileRetentionDuration = "interval <interval>": określa, jak dawno temu plik musiał zostać usunięty przed kandydatem.VACUUM Wartość domyślna to interval 7 days.

      Aby uzyskać dostęp do 30 dni danych historycznych, nawet jeśli uruchomisz polecenie VACUUM w tabeli delta, ustaw wartość delta.deletedFileRetentionDuration = "interval 30 days". To ustawienie może spowodować, że koszty magazynowania będą się prowadzić do góry.

Przywracanie tabeli delty do wcześniejszego stanu

Uwaga

Dostępne w środowisku Databricks Runtime 7.4 lub nowszym.

Tabelę delty można przywrócić do wcześniejszego RESTORE stanu przy użyciu polecenia . Tabela różnicowa wewnętrznie przechowuje historyczne wersje tabeli, które umożliwiają przywrócenie jej do wcześniejszego stanu. Wersja odpowiadająca wcześniejszemu stanowi lub znacznikowi czasu utworzenia wcześniejszego stanu jest obsługiwana jako opcje za pomocą RESTORE polecenia .

Ważne

  • Możesz przywrócić już przywróconą tabelę.
  • Sklonowaną tabelę można przywrócić.
  • Musisz mieć MODIFY uprawnienia do przywracanej tabeli.
  • Przywracanie tabeli do starszej wersji, w której pliki danych zostały usunięte ręcznie lub przez vacuum program zakończy się niepowodzeniem. Przywracanie do tej wersji częściowo jest możliwe, jeśli spark.sql.files.ignoreMissingFiles ustawiono wartość true.
  • Format znacznika czasu przywracania do wcześniejszego stanu to yyyy-MM-dd HH:mm:ss. Udostępnianie tylko ciągu date(yyyy-MM-dd) jest również obsługiwane.
RESTORE TABLE db.target_table TO VERSION AS OF <version>
RESTORE TABLE delta.`/data/target/` TO TIMESTAMP AS OF <timestamp>

Aby uzyskać szczegółowe informacje o składni, zobacz PRZYWRACANIE.

Ważne

Przywracanie jest uznawane za operację zmiany danych. Wpisy dziennika usługi Delta Lake dodane przez RESTORE polecenie zawierają wartość dataChange ustawioną na wartość true. Jeśli istnieje aplikacja podrzędna, taka jak zadanie przesyłania strumieniowego ze strukturą , które przetwarza aktualizacje tabeli usługi Delta Lake, wpisy dziennika zmian danych dodane przez operację przywracania są traktowane jako nowe aktualizacje danych i przetwarzanie może spowodować zduplikowanie danych.

Na przykład:

Wersja tabeli Operacja Aktualizacje dziennika różnicowego Rekordy w aktualizacjach dziennika zmian danych
0 INSERT AddFile(/path/to/file-1, dataChange = true) (name = Viktor, age = 29, (name = George, age = 55)
1 INSERT AddFile(/path/to/file-2, dataChange = true) (name = George, age = 39)
2 OPTIMIZE AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) (Brak rekordów, ponieważ optymalizowanie kompaktacji nie zmienia danych w tabeli)
3 RESTORE(version=1) RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) (name = Viktor, age = 29), (name = George, age = 55), (name = George, age = 39)

W poprzednim przykładzie RESTORE polecenie powoduje wyświetlenie aktualizacji, które były już widoczne podczas odczytywania tabeli delta w wersji 0 i 1. Jeśli zapytanie przesyłane strumieniowo odczytuje tę tabelę, te pliki będą traktowane jako nowo dodane dane i będą przetwarzane ponownie.

Przywracanie metryk

Uwaga

Dostępne w środowisku Databricks Runtime 8.2 lub nowszym.

RESTORE raportuje następujące metryki jako ramkę danych z jednym wierszem po zakończeniu operacji:

  • table_size_after_restore: rozmiar tabeli po przywróceniu.

  • num_of_files_after_restore: liczba plików w tabeli po przywróceniu.

  • num_removed_files: liczba plików usuniętych (logicznie usuniętych) z tabeli.

  • num_restored_files: liczba przywróconych plików z powodu wycofywania.

  • removed_files_size: całkowity rozmiar w bajtach plików, które są usuwane z tabeli.

  • restored_files_size: całkowity rozmiar w bajtach przywróconych plików.

    Przykładowe metryki przywracania

Przykłady korzystania z podróży czasowej usługi Delta Lake

  • Napraw przypadkowe usunięcie tabeli dla użytkownika 111:

    INSERT INTO my_table
      SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)
      WHERE userId = 111
    
  • Napraw przypadkowe nieprawidłowe aktualizacje tabeli:

    MERGE INTO my_table target
      USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source
      ON source.userId = target.userId
      WHEN MATCHED THEN UPDATE SET *
    
  • Wykonaj zapytanie dotyczące liczby nowych klientów dodanych w ciągu ostatniego tygodnia.

    SELECT count(distinct userId) - (
      SELECT count(distinct userId)
      FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
    

Jak mogę znaleźć wersję ostatniego zatwierdzenia w sesji platformy Spark?

Aby uzyskać numer wersji ostatniego zatwierdzenia zapisanego przez bieżące SparkSession we wszystkich wątkach i wszystkich tabelach, wykonaj zapytanie dotyczące konfiguracji spark.databricks.delta.lastCommitVersionInSessionSQL .

SQL

SET spark.databricks.delta.lastCommitVersionInSession

Python

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

Scala

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

Jeśli żadne zatwierdzenia nie zostały dokonane przez SparkSessionelement , zapytanie o klucz zwraca pustą wartość.

Uwaga

Jeśli współużytkujesz to samo SparkSession w wielu wątkach, jest ona podobna do udostępniania zmiennej w wielu wątkach; warunki wyścigu mogą być osiągane, ponieważ wartość konfiguracji jest aktualizowana współbieżnie.