Praca z historią tabel usługi Delta Lake

Każda operacja modyfikującą tabelę usługi Delta Lake tworzy nową wersję tabeli. Informacje o historii umożliwiają przeprowadzanie inspekcji operacji, wycofywanie tabeli lub wykonywanie zapytań względem tabeli w określonym punkcie w czasie przy użyciu podróży czasowej.

Uwaga

Rozwiązanie Databricks nie zaleca używania historii tabel Delta Lake jako długoterminowego rozwiązania do tworzenia kopii zapasowych w celu archiwizacji danych. Platforma Databricks zaleca używanie tylko ostatnich 7 dni dla operacji przechodzenia w czasie, chyba że konfiguracje przechowywania danych i dzienników ustawiono na większą wartość.

Pobieranie historii tabeli usługi Delta

Informacje, takie jak operacje, użytkownik i sygnatura czasowa dla każdego zapisu w tabeli delty, można pobrać, uruchamiając history polecenie . Operacje są zwracane w odwrotnej kolejności chronologicznej.

Przechowywanie historii tabel jest określane przez ustawienie delta.logRetentionDurationtabeli , czyli domyślnie 30 dni.

Uwaga

Przechodzenie w czasie i historia tabeli są kontrolowane przez różne progi przechowywania. Zobacz Co to jest przechodzenia w czasie na platformie Delta Lake?.

DESCRIBE HISTORY '/data/events/'          -- get the full history of the table

DESCRIBE HISTORY delta.`/data/events/`

DESCRIBE HISTORY '/data/events/' LIMIT 1  -- get the last operation only

DESCRIBE HISTORY eventsTable

Aby uzyskać szczegółowe informacje o składni spark SQL, zobacz OPIS HISTORII.

Zobacz dokumentację interfejsu API usługi Delta Lake, aby uzyskać szczegółowe informacje o składni języka Scala/Java/Python.

Eksplorator wykazu udostępnia wizualny widok tych szczegółowych informacji o tabeli i historii tabel różnicowych. Oprócz schematu tabeli i przykładowych danych możesz kliknąć kartę Historia , aby wyświetlić historię tabel, która jest wyświetlana za pomocą DESCRIBE HISTORYpolecenia .

Schemat historii

Dane wyjściowe history operacji zawierają następujące kolumny.

Kolumna Type Opis
version długi Wersja tabeli wygenerowana przez operację.
timestamp timestamp Gdy ta wersja została zatwierdzona.
Identyfikator użytkownika string Identyfikator użytkownika, który uruchomił operację.
userName string Nazwa użytkownika, który uruchomił operację.
rozdzielnicy string Nazwa operacji.
operationParameters map Parametry operacji (na przykład predykaty).
zadanie struktura Szczegóły zadania, w ramach którego uruchomiono operację.
notes struktura Szczegóły notesu, z którego została uruchomiona operacja.
clusterId string Identyfikator klastra, na którym uruchomiono operację.
readVersion długi Wersja tabeli odczytanej w celu wykonania operacji zapisu.
Isolationlevel string Poziom izolacji używany dla tej operacji.
isBlindAppend boolean Czy ta operacja dołączała dane.
operationMetrics map Metryki operacji (na przykład liczba wierszy i plików zmodyfikowanych).
userMetadata string Metadane zatwierdzenia zdefiniowane przez użytkownika, jeśli zostały określone
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|      5|2019-07-29 14:07:47|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          4|WriteSerializable|        false|[numTotalRows -> ...|
|      4|2019-07-29 14:07:41|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          3|WriteSerializable|        false|[numTotalRows -> ...|
|      3|2019-07-29 14:07:29|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          2|WriteSerializable|        false|[numTotalRows -> ...|
|      2|2019-07-29 14:06:56|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          1|WriteSerializable|        false|[numTotalRows -> ...|
|      1|2019-07-29 14:04:31|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          0|WriteSerializable|        false|[numTotalRows -> ...|
|      0|2019-07-29 14:01:40|   ###|     ###|    WRITE|[mode -> ErrorIfE...|null|     ###|      ###|       null|WriteSerializable|         true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+

Uwaga

Klucze metryk operacji

Operacja history zwraca kolekcję metryk operacji na mapie operationMetrics kolumn.

W poniższych tabelach wymieniono definicje kluczy mapy według operacji.

Operacja Nazwa metryki opis
WRITE, CREATE TABLE AS SELECT, REPLACE TABLE AS SELECT, COPY INTO
numFiles Liczba zapisanych plików.
numOutputBytes Rozmiar w bajtach zapisanej zawartości.
numOutputRows Liczba zapisanych wierszy.
AKTUALIZACJA PRZESYŁANIA STRUMIENIOWEGO
numAddedFiles Liczba dodanych plików.
numRemovedFiles Liczba usuniętych plików.
numOutputRows Liczba zapisanych wierszy.
numOutputBytes Rozmiar zapisu w bajtach.
DELETE
numAddedFiles Liczba dodanych plików. Nie podano, gdy partycje tabeli są usuwane.
numRemovedFiles Liczba usuniętych plików.
numDeletedRows Liczba usuniętych wierszy. Nie podano, gdy partycje tabeli są usuwane.
numCopiedRows Liczba wierszy skopiowanych w procesie usuwania plików.
executionTimeMs Czas potrzebny na wykonanie całej operacji.
scanTimeMs Czas potrzebny na skanowanie plików pod kątem dopasowań.
rewriteTimeMs Czas potrzebny na ponowne zapisywanie dopasowanych plików.
OBCIĄĆ
numRemovedFiles Liczba usuniętych plików.
executionTimeMs Czas potrzebny na wykonanie całej operacji.
SCALANIA
numSourceRows Liczba wierszy w źródłowej ramce danych.
numTargetRowsInserted Liczba wierszy wstawionych do tabeli docelowej.
numTargetRowsUpdated Liczba wierszy zaktualizowanych w tabeli docelowej.
numTargetRowsDeleted Liczba wierszy usuniętych w tabeli docelowej.
numTargetRows Skopiowane Liczba skopiowanych wierszy docelowych.
numOutputRows Całkowita liczba wypisana wierszy.
numTargetFilesAdded Liczba plików dodanych do ujścia(elementu docelowego).
numTargetFilesRemoved Liczba plików usuniętych z ujścia(elementu docelowego).
executionTimeMs Czas potrzebny na wykonanie całej operacji.
scanTimeMs Czas potrzebny na skanowanie plików pod kątem dopasowań.
rewriteTimeMs Czas potrzebny na ponowne zapisywanie dopasowanych plików.
UPDATE
numAddedFiles Liczba dodanych plików.
numRemovedFiles Liczba usuniętych plików.
numUpdatedRows Liczba zaktualizowanych wierszy.
numCopiedRows Liczba wierszy właśnie skopiowanych w procesie aktualizowania plików.
executionTimeMs Czas potrzebny na wykonanie całej operacji.
scanTimeMs Czas potrzebny na skanowanie plików pod kątem dopasowań.
rewriteTimeMs Czas potrzebny na ponowne zapisywanie dopasowanych plików.
FSCK numRemovedFiles Liczba usuniętych plików.
PRZEKONWERTOWAĆ numConvertedFiles Liczba przekonwertowanych plików Parquet.
OPTIMIZE
numAddedFiles Liczba dodanych plików.
numRemovedFiles Liczba zoptymalizowanych plików.
numAddedBytes Liczba bajtów dodanych po zoptymalizowaniu tabeli.
numRemovedBytes Liczba usuniętych bajtów.
minFileSize Rozmiar najmniejszego pliku po zoptymalizowaniu tabeli.
p25FileSize Rozmiar 25. pliku percentyla po zoptymalizowaniu tabeli.
p50FileSize Mediana rozmiaru pliku po zoptymalizowaniu tabeli.
p75FileSize Rozmiar pliku 75. percentyla po zoptymalizowaniu tabeli.
Maxfilesize Rozmiar największego pliku po zoptymalizowaniu tabeli.
CLONE
sourceTableSize Rozmiar w bajtach tabeli źródłowej w sklonowanej wersji.
sourceNumOfFiles Liczba plików w tabeli źródłowej w sklonowanej wersji.
numRemovedFiles Liczba plików usuniętych z tabeli docelowej, jeśli poprzednia tabela delty została zamieniona.
removedFilesSize Całkowity rozmiar w bajtach plików usuniętych z tabeli docelowej, jeśli poprzednia tabela delty została zamieniona.
numCopiedFiles Liczba plików skopiowanych do nowej lokalizacji. 0 dla płytkich klonów.
copiedFilesSize Całkowity rozmiar w bajtach plików, które zostały skopiowane do nowej lokalizacji. 0 dla płytkich klonów.
PRZYWRÓCIĆ
tableSizeAfterRestore Rozmiar tabeli w bajtach po przywróceniu.
numOfFilesAfterRestore Liczba plików w tabeli po przywróceniu.
numRemovedFiles Liczba plików usuniętych przez operację przywracania.
numRestoredFiles Liczba plików, które zostały dodane w wyniku przywracania.
removedFilesSize Rozmiar w bajtach plików usuniętych przez przywracanie.
restoredFilesSize Rozmiar w bajtach plików dodanych przez przywracanie.
VACUUM
numDeletedFiles Liczba usuniętych plików.
numVacuumedDirectories Liczba katalogów opróżnionych.
numFilesToDelete Liczba plików do usunięcia.

Co to jest podróż w czasie usługi Delta Lake?

Podróż czasowa usługi Delta Lake obsługuje wykonywanie zapytań dotyczących poprzednich wersji tabeli na podstawie sygnatury czasowej lub wersji tabeli (zarejestrowanej w dzienniku transakcji). Możesz użyć podróży w czasie dla aplikacji, takich jak:

  • Ponowne tworzenie analiz, raportów lub danych wyjściowych (na przykład danych wyjściowych modelu uczenia maszynowego). Może to być przydatne w przypadku debugowania lub inspekcji, zwłaszcza w branżach regulowanych.
  • Pisanie złożonych zapytań czasowych.
  • Naprawianie błędów w danych.
  • Zapewnienie izolacji migawek dla zestawu zapytań dotyczących szybko zmieniających się tabel.

Ważne

Wersje tabel dostępne z podróżą czasową są określane przez kombinację progu przechowywania dla plików dziennika transakcji oraz częstotliwości i określonego przechowywania dla VACUUM operacji. Jeśli uruchamiasz VACUUM codziennie z wartościami domyślnymi, 7 dni danych jest dostępnych na potrzeby podróży w czasie.

Składnia przechodzenia w czasie na poziomie Delta

Kwerenda tabeli delty z podróżą czasową przez dodanie klauzuli po specyfikacji nazwy tabeli.

  • timestamp_expression może być jednym z:
    • '2018-10-18T22:15:12.013Z', czyli ciąg, który można rzutować na znacznik czasu
    • cast('2018-10-18 13:36:32 CEST' as timestamp)
    • '2018-10-18', czyli ciąg daty
    • current_timestamp() - interval 12 hours
    • date_sub(current_date(), 1)
    • Dowolne inne wyrażenie, które jest lub można rzutować na znacznik czasu
  • version to długa wartość, którą można uzyskać z danych wyjściowych elementu DESCRIBE HISTORY table_spec.

Ani nie timestamp_expressionversion może być podzapytaniem.

Akceptowane są tylko ciągi daty lub znacznika czasu. Przykład: "2019-01-01" i "2019-01-01T00:00:00.000Z". Zobacz następujący kod, aby zapoznać się z przykładowym kodem:

SQL

SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z'
SELECT * FROM delta.`/tmp/delta/people10m` VERSION AS OF 123

Python

df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).load("/tmp/delta/people10m")

Można również użyć @ składni, aby określić znacznik czasu lub wersję w ramach nazwy tabeli. Znacznik czasu musi być w yyyyMMddHHmmssSSS formacie. Możesz określić wersję po @ , poprzedzając v element do wersji. Zobacz następujący kod, aby zapoznać się z przykładowym kodem:

SQL

SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123

Python

spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")

spark.read.load("/tmp/delta/people10m@20190101000000000")
spark.read.load("/tmp/delta/people10m@v123")

Co to są punkty kontrolne dziennika transakcji?

Usługa Delta Lake rejestruje wersje tabel jako pliki JSON w _delta_log katalogu, które są przechowywane wraz z danymi tabeli. Aby zoptymalizować wykonywanie zapytań dotyczących punktów kontrolnych, usługa Delta Lake agreguje wersje tabel do plików punktów kontrolnych Parquet, co uniemożliwia odczytywanie wszystkich wersji historii tabel w formacie JSON. Usługa Azure Databricks optymalizuje częstotliwość tworzenia punktów kontrolnych pod kątem rozmiaru i obciążenia danych. Użytkownicy nie powinni bezpośrednio korzystać z punktów kontrolnych. Częstotliwość punktów kontrolnych może ulec zmianie bez powiadomienia.

Konfigurowanie przechowywania danych dla zapytań dotyczących podróży w czasie

Aby wykonać zapytanie dotyczące poprzedniej wersji tabeli, należy zachować zarówno dziennik, jak i pliki danych dla tej wersji.

Pliki danych są usuwane podczas VACUUM uruchamiania względem tabeli. Usługa Delta Lake automatycznie zarządza usuwaniem plików dziennika po wersjach tabeli punktów kontrolnych.

Ponieważ większość tabel delty jest VACUUM regularnie uruchamiana, zapytania do punktu w czasie powinny uwzględniać próg przechowywania dla VACUUMwartości , która domyślnie wynosi 7 dni.

Aby zwiększyć próg przechowywania danych dla tabel delty, należy skonfigurować następujące właściwości tabeli:

  • delta.logRetentionDuration = "interval <interval>": określa, jak długo jest przechowywana historia tabeli. Wartość domyślna to interval 30 days.
  • delta.deletedFileRetentionDuration = "interval <interval>": określa wartość progową VACUUM używaną do usuwania plików danych, do których nie odwołuje się już bieżąca wersja tabeli. Wartość domyślna to interval 7 days.

Właściwości delty można określić podczas tworzenia tabeli lub ustawić za pomocą instrukcji ALTER TABLE . Zobacz Informacje o właściwościach tabeli delty.

Uwaga

Obie te właściwości należy ustawić, aby zapewnić, że historia tabeli jest przechowywana przez dłuższy czas dla tabel z częstymi VACUUM operacjami. Na przykład aby uzyskać dostęp do 30 dni danych historycznych, ustaw wartość delta.deletedFileRetentionDuration = "interval 30 days" (która jest zgodna z ustawieniem domyślnym dla elementu delta.logRetentionDuration).

Zwiększenie progu przechowywania danych może spowodować zwiększenie kosztów magazynowania, ponieważ utrzymuje się więcej plików danych.

Przywracanie tabeli delty do wcześniejszego stanu

Tabelę delty można przywrócić do wcześniejszego RESTORE stanu za pomocą polecenia . Tabela usługi Delta wewnętrznie przechowuje historyczne wersje tabeli, co umożliwia przywrócenie jej do wcześniejszego stanu. Wersja odpowiadająca wcześniejszemu stanowi lub znacznikowi czasu wskazującemu moment utworzenia wcześniejszego stanu są obsługiwane jako opcje przy użyciu polecenia RESTORE.

Ważne

  • Możesz przywrócić już przywróconą tabelę.
  • Możesz przywrócić sklonowaną tabelę.
  • Musisz mieć MODIFY uprawnienia do przywracanej tabeli.
  • Nie można przywrócić tabeli do starszej wersji, w której pliki danych zostały usunięte ręcznie lub przez vacuumprogram . Przywracanie do tej wersji częściowo jest nadal możliwe, jeśli spark.sql.files.ignoreMissingFiles jest ustawiona wartość true.
  • Format znacznika czasu przywracania do wcześniejszego stanu to yyyy-MM-dd HH:mm:ss. Udostępnianie tylko ciągu date(yyyy-MM-dd) jest również obsługiwane.
RESTORE TABLE db.target_table TO VERSION AS OF <version>
RESTORE TABLE delta.`/data/target/` TO TIMESTAMP AS OF <timestamp>

Aby uzyskać szczegółowe informacje o składni, zobacz PRZYWRACANIE.

Ważne

Przywracanie jest uznawane za operację zmiany danych. Wpisy dziennika usługi Delta Lake dodane przez RESTORE polecenie zawierają wartość dataChange ustawioną na true. Jeśli istnieje aplikacja podrzędna, taka jak zadanie przesyłania strumieniowego ze strukturą, które przetwarza aktualizacje tabeli usługi Delta Lake, wpisy dziennika zmian danych dodane przez operację przywracania są traktowane jako nowe aktualizacje danych i przetwarzanie może spowodować zduplikowanie danych.

Na przykład:

Wersja tabeli Operacja Aktualizacje dziennika różnicowego Rekordy w aktualizacjach dziennika zmian danych
0 INSERT AddFile(/path/to/file-1, dataChange = true) (name = Viktor, age = 29, (name = George, age = 55)
1 INSERT AddFile(/path/to/file-2, dataChange = true) (name = George, age = 39)
2 OPTIMIZE AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) (Brak rekordów, ponieważ optymalizacja kompaktowania nie zmienia danych w tabeli)
3 RESTORE(wersja=1) RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) (name = Viktor, age = 29), (name = George, age = 55), (name = George, age = 39)

W poprzednim przykładzie polecenie powoduje wyświetlenie aktualizacji, RESTORE które były już widoczne podczas odczytywania tabeli delty w wersji 0 i 1. Jeśli zapytanie przesyłane strumieniowo odczytuje tę tabelę, te pliki będą traktowane jako nowo dodane dane i będą przetwarzane ponownie.

Przywracanie metryk

RESTORE raportuje następujące metryki jako ramkę danych z pojedynczym wierszem po zakończeniu operacji:

  • table_size_after_restore: rozmiar tabeli po przywróceniu.

  • num_of_files_after_restore: liczba plików w tabeli po przywróceniu.

  • num_removed_files: liczba plików usuniętych (logicznie usuniętych) z tabeli.

  • num_restored_files: liczba przywróconych plików z powodu wycofywania.

  • removed_files_size: całkowity rozmiar w bajtach plików usuniętych z tabeli.

  • restored_files_size: całkowity rozmiar w bajtach przywróconych plików.

    Przykład przywracania metryk

Przykłady korzystania z przechodzenia w czasie usługi Delta Lake

  • Napraw przypadkowe usunięcie tabeli dla użytkownika 111:

    INSERT INTO my_table
      SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)
      WHERE userId = 111
    
  • Napraw przypadkowe nieprawidłowe aktualizacje tabeli:

    MERGE INTO my_table target
      USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source
      ON source.userId = target.userId
      WHEN MATCHED THEN UPDATE SET *
    
  • Wykonaj zapytanie dotyczące liczby nowych klientów dodanych w ciągu ostatniego tygodnia.

    SELECT count(distinct userId)
    FROM my_table  - (
      SELECT count(distinct userId)
      FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
    

Jak mogę znaleźć wersję ostatniego zatwierdzenia w sesji platformy Spark?

Aby uzyskać numer wersji ostatniego zatwierdzenia zapisanego przez bieżący SparkSession we wszystkich wątkach i wszystkich tabelach, wykonaj zapytanie dotyczące konfiguracji spark.databricks.delta.lastCommitVersionInSessionSQL .

SQL

SET spark.databricks.delta.lastCommitVersionInSession

Python

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

Scala

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

Jeśli żadne zatwierdzenia nie zostały wykonane przez SparkSessionelement , zapytanie dotyczące klucza zwraca pustą wartość.

Uwaga

Jeśli współużytkujesz to samo SparkSession w wielu wątkach, jest ona podobna do udostępniania zmiennej w wielu wątkach. Możesz trafić w warunki wyścigu, ponieważ wartość konfiguracji jest aktualizowana współbieżnie.