Bekerja dengan riwayat tabel Delta Lake

Setiap operasi yang memodifikasi tabel Delta Lake membuat versi tabel baru. Anda dapat menggunakan informasi riwayat untuk mengaudit operasi, memutar kembali tabel, atau mengkueri tabel pada titik waktu tertentu menggunakan perjalanan waktu.

Catatan

Databricks tidak merekomendasikan penggunaan riwayat tabel Delta Lake sebagai solusi pencadangan jangka panjang untuk pengarsipan data. Databricks merekomendasikan penggunaan hanya 7 hari terakhir untuk operasi perjalanan waktu kecuali Anda telah mengatur konfigurasi retensi data dan log ke nilai yang lebih besar.

Mengambil riwayat tabel Delta

Anda dapat mengambil informasi termasuk operasi, pengguna, dan tanda waktu untuk setiap penulisan ke tabel Delta dengan menjalankan history perintah . Operasi dikembalikan dalam urutan kronologis terbalik.

Retensi riwayat tabel ditentukan oleh pengaturan delta.logRetentionDurationtabel , yaitu 30 hari secara default.

Catatan

Perjalanan waktu dan riwayat tabel dikendalikan oleh ambang batas retensi yang berbeda. Lihat Apa itu perjalanan waktu Delta Lake?.

DESCRIBE HISTORY '/data/events/'          -- get the full history of the table

DESCRIBE HISTORY delta.`/data/events/`

DESCRIBE HISTORY '/data/events/' LIMIT 1  -- get the last operation only

DESCRIBE HISTORY eventsTable

Untuk detail sintaks Spark SQL, lihat MENJELASKAN RIWAYAT.

Lihat dokumentasi Delta Lake API untuk detail sintaks Scala/Java/Python.

Catalog Explorer menyediakan tampilan visual informasi dan riwayat tabel terperinci ini untuk tabel Delta. Selain skema tabel dan data sampel, Anda dapat mengklik tab Riwayat untuk melihat riwayat tabel yang ditampilkan dengan DESCRIBE HISTORY.

Skema riwayat

Output operasi history memiliki kolom berikut.

Column Tipe Deskripsi
versi long Versi tabel yang dihasilkan oleh operasi.
rentang waktu rentang waktu Saat versi ini diterapkan.
userId string ID pengguna yang menjalankan operasi.
userName string Nama pengguna yang menjalankan operasi.
operasi string Nama operasi.
operationParameters peta Parameter operasi (misalnya, predikat.)
tugas struktur Rincian pekerjaan yang menjalankan operasi.
buku catatan struktur Detail buku catatan tempat operasi dijalankan.
clusterId string ID kluster tempat operasi dijalankan.
readVersion long Versi tabel yang dibaca untuk melakukan operasi tulis.
isolationLevel string Tingkat isolasi yang digunakan untuk operasi ini.
isBlindAppend Boolean Apakah operasi ini menambahkan data.
operationMetrics peta Metrik operasi (misalnya, jumlah baris dan file yang dimodifikasi.)
userMetadata string Metadata penerapan yang ditentukan pengguna jika ditentukan
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|      5|2019-07-29 14:07:47|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          4|WriteSerializable|        false|[numTotalRows -> ...|
|      4|2019-07-29 14:07:41|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          3|WriteSerializable|        false|[numTotalRows -> ...|
|      3|2019-07-29 14:07:29|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          2|WriteSerializable|        false|[numTotalRows -> ...|
|      2|2019-07-29 14:06:56|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          1|WriteSerializable|        false|[numTotalRows -> ...|
|      1|2019-07-29 14:04:31|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          0|WriteSerializable|        false|[numTotalRows -> ...|
|      0|2019-07-29 14:01:40|   ###|     ###|    WRITE|[mode -> ErrorIfE...|null|     ###|      ###|       null|WriteSerializable|         true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+

Catatan

Kunci metrik operasi

Operasi history mengembalikan kumpulan metrik operasi di peta kolom operationMetrics.

Tabel berikut mencantumkan definisi kunci peta berdasarkan operasi.

Operasi Nama metrik Deskripsi
TULIS, BUAT TABEL TERPILIH, GANTI TABEL TERPILIH, SALIN KE
numFiles Jumlah file yang ditulis.
numOutputBytes Ukuran dalam byte dari konten tertulis.
numOutputRows Jumlah baris yang ditulis.
PEMBARUAN STREAMING
numAddedFiles Jumlah file yang ditambahkan.
numRemovedFiles Jumlah file yang dihapus.
numOutputRows Jumlah baris yang ditulis.
numOutputBytes Ukuran tulis dalam byte.
DELETE
numAddedFiles Jumlah file yang ditambahkan. Tidak disediakan saat partisi tabel dihapus.
numRemovedFiles Jumlah file yang dihapus.
numDeletedRows Jumlah baris yang dihapus. Tidak disediakan saat partisi tabel dihapus.
numCopiedRows Jumlah baris yang disalin dalam proses menghapus file.
executionTimeMs Waktu yang dibutuhkan untuk menjalankan seluruh operasi.
scanTimeMs Waktu yang dibutuhkan untuk memindai file untuk kecocokan.
rewriteTimeMs Waktu yang dibutuhkan untuk menulis ulang file yang cocok.
TRUNCATE
numRemovedFiles Jumlah file yang dihapus.
executionTimeMs Waktu yang dibutuhkan untuk menjalankan seluruh operasi.
GABUNG
numSourceRows Jumlah baris di DataFrame sumber.
numTargetRowsInserted Jumlah baris yang dimasukkan ke dalam tabel target.
numTargetRowsUpdated Jumlah baris yang diperbarui dalam tabel target.
numTargetRowsDeleted Jumlah baris yang dihapus dalam tabel target.
numTargetRowsCopied Jumlah baris target yang disalin.
numOutputRows Jumlah total baris yang ditulis.
numTargetFilesAdded Jumlah file yang ditambahkan ke sink(target).
numTargetFilesRemoved Jumlah file yang dihapus dari sink(target).
executionTimeMs Waktu yang dibutuhkan untuk menjalankan seluruh operasi.
scanTimeMs Waktu yang dibutuhkan untuk memindai file untuk kecocokan.
rewriteTimeMs Waktu yang dibutuhkan untuk menulis ulang file yang cocok.
UPDATE
numAddedFiles Jumlah file yang ditambahkan.
numRemovedFiles Jumlah file yang dihapus.
numUpdatedRows Jumlah baris yang diperbarui.
numCopiedRows Jumlah baris yang baru saja disalin dalam proses memperbarui file.
executionTimeMs Waktu yang dibutuhkan untuk menjalankan seluruh operasi.
scanTimeMs Waktu yang dibutuhkan untuk memindai file untuk kecocokan.
rewriteTimeMs Waktu yang dibutuhkan untuk menulis ulang file yang cocok.
FSCK numRemovedFiles Jumlah file yang dihapus.
CONVERT numConvertedFiles Jumlah file Parket yang telah dikonversi.
OPTIMIZE
numAddedFiles Jumlah file yang ditambahkan.
numRemovedFiles Jumlah file yang dioptimalkan.
numAddedBytes Jumlah byte yang ditambahkan setelah tabel dioptimalkan.
numRemovedBytes Jumlah byte yang dihapus.
minFileSize Ukuran file terkecil setelah tabel dioptimalkan.
p25FileSize Ukuran file persentil ke-25 setelah tabel dioptimalkan.
p50FileSize Ukuran file median setelah tabel dioptimalkan.
p75FileSize Ukuran file persentil ke-75 setelah tabel dioptimalkan.
maxFileSize Ukuran file terbesar setelah tabel dioptimalkan.
CLONE
sourceTableSize Ukuran dalam byte tabel sumber pada versi yang dikloning.
sourceNumOfFiles Jumlah file dalam tabel sumber pada versi yang dikloning.
numRemovedFiles Jumlah file yang dihapus dari tabel target jika tabel Delta sebelumnya diganti.
removedFilesSize Ukuran total dalam byte file dihapus dari tabel target jika tabel Delta sebelumnya diganti.
numCopiedFiles Jumlah file yang disalin ke lokasi baru. 0 untuk klon dangkal.
copiedFilesSize Ukuran total dalam byte file yang disalin ke lokasi baru. 0 untuk klon dangkal.
MEMULIHKAN
tableSizeAfterRestore Ukuran tabel dalam byte setelah pemulihan.
numOfFilesAfterRestore Jumlah file dalam tabel setelah pemulihan.
numRemovedFiles Jumlah file yang dihapus oleh operasi pemulihan.
numRestoredFiles Jumlah file yang ditambahkan sebagai hasil pemulihan.
removedFilesSize Ukuran dalam byte file yang dihapus oleh pemulihan.
restoredFilesSize Ukuran dalam byte file yang ditambahkan oleh pemulihan.
VAKUM
numDeletedFiles Jumlah file yang dihapus.
numVacuumedDirectories Jumlah direktori yang dikosongkan.
numFilesToDelete Jumlah file yang akan dihapus.

Apa itu perjalanan waktu Delta Lake?

Perjalanan waktu Delta Lake mendukung kueri versi tabel sebelumnya berdasarkan tanda waktu atau versi tabel (seperti yang dicatat dalam log transaksi). Anda dapat menggunakan perjalanan waktu untuk aplikasi seperti berikut:

  • Membuat ulang analisis, laporan, atau keluaran (misalnya, keluaran model pembelajaran mesin). Ini bisa berguna untuk debugging atau audit, terutama di industri yang diatur.
  • Menulis pertanyaan temporal yang kompleks.
  • Memperbaiki kesalahan dalam data Anda.
  • Menyediakan isolasi snapshot untuk sekumpulan kueri untuk tabel yang berubah dengan cepat.

Penting

Versi tabel yang dapat diakses dengan perjalanan waktu ditentukan oleh kombinasi ambang retensi untuk file log transaksi dan frekuensi dan retensi yang ditentukan untuk VACUUM operasi. Jika Anda menjalankan VACUUM setiap hari dengan nilai default, data 7 hari tersedia untuk perjalanan waktu.

Sintaks perjalanan waktu delta

Anda mengkueri tabel Delta dengan perjalanan waktu dengan menambahkan klausa setelah spesifikasi nama tabel.

  • timestamp_expression dapat berupa salah satu dari:
    • '2018-10-18T22:15:12.013Z', yaitu, string yang dapat ditransmisikan ke stempel waktu
    • cast('2018-10-18 13:36:32 CEST' as timestamp)
    • '2018-10-18', yaitu, string tanggal
    • current_timestamp() - interval 12 hours
    • date_sub(current_date(), 1)
    • Ekspresi lain yang sedang atau dapat ditransmisikan ke stempel waktu
  • version adalah nilai panjang yang dapat diperoleh dari output DESCRIBE HISTORY table_spec.

Baik timestamp_expression maupun version tidak boleh berupa subkueri.

Hanya string tanggal atau tanda waktu yang diterima. Misalnya, "2019-01-01" dan "2019-01-01T00:00:00.000Z". Lihat kode berikut untuk contoh sintaks:

SQL

SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z'
SELECT * FROM delta.`/tmp/delta/people10m` VERSION AS OF 123

Python

df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).load("/tmp/delta/people10m")

Anda juga dapat menggunakan @ sintaks untuk menentukan tanda waktu atau versi sebagai bagian dari nama tabel. Stempel waktu harus dalam format yyyyMMddHHmmssSSS. Anda dapat menentukan versi setelah @ dengan menambahkan v ke versi tersebut. Lihat kode berikut untuk contoh sintaks:

SQL

SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123

Python

spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")

spark.read.load("/tmp/delta/people10m@20190101000000000")
spark.read.load("/tmp/delta/people10m@v123")

Apa itu titik pemeriksaan log transaksi?

Delta Lake merekam versi tabel sebagai file JSON dalam _delta_log direktori, yang disimpan bersama data tabel. Untuk mengoptimalkan kueri titik pemeriksaan, Delta Lake mengagregasi versi tabel ke file titik pemeriksaan Parquet, mencegah kebutuhan untuk membaca semua riwayat tabel versi JSON. Azure Databricks mengoptimalkan frekuensi titik pemeriksaan untuk ukuran data dan beban kerja. Pengguna tidak perlu berinteraksi dengan titik pemeriksaan secara langsung. Frekuensi titik pemeriksaan dapat berubah tanpa pemberitahuan.

Mengonfigurasi retensi data untuk kueri perjalanan waktu

Untuk mengkueri versi tabel sebelumnya, Anda harus menyimpan log dan file data untuk versi tersebut.

File data dihapus saat VACUUM dijalankan terhadap tabel. Delta Lake mengelola penghapusan file log secara otomatis setelah mengecek versi tabel.

Karena sebagian besar tabel Delta telah VACUUM berjalan terhadapnya secara teratur, kueri point-in-time harus menghormati ambang retensi untuk VACUUM, yaitu 7 hari secara default.

Untuk meningkatkan ambang retensi data untuk tabel Delta, Anda harus mengonfigurasi properti tabel berikut:

  • delta.logRetentionDuration = "interval <interval>": mengontrol lama riwayat untuk tabel disimpan. Default adalah interval 30 days.
  • delta.deletedFileRetentionDuration = "interval <interval>": menentukan ambang batas VACUUM yang digunakan untuk menghapus file data yang tidak lagi dirujuk dalam versi tabel saat ini. Default adalah interval 7 days.

Anda dapat menentukan properti Delta selama pembuatan tabel atau mengaturnya dengan ALTER TABLE pernyataan. Lihat Referensi properti tabel Delta.

Catatan

Anda harus mengatur kedua properti ini untuk memastikan riwayat tabel dipertahankan untuk durasi yang lebih lama untuk tabel dengan operasi yang sering VACUUM . Misalnya, untuk mengakses 30 hari data historis, atur delta.deletedFileRetentionDuration = "interval 30 days" (yang cocok dengan pengaturan default untuk delta.logRetentionDuration).

Meningkatkan ambang retensi data dapat menyebabkan biaya penyimpanan Anda naik, karena lebih banyak file data dipertahankan.

Memulihkan tabel Delta ke kondisi sebelumnya

Anda dapat memulihkan tabel Delta ke kondisi sebelumnya dengan menggunakan perintah RESTORE. Tabel Delta secara internal mempertahankan versi historis tabel yang memungkinkannya dipulihkan ke kondisi sebelumnya. Versi yang sesuai dengan kondisi sebelumnya atau stempel waktu saat kondisi sebelumnya dibuat didukung sebagai opsi oleh perintah RESTORE.

Penting

  • Anda dapat memulihkan tabel yang sudah dipulihkan.
  • Anda dapat memulihkan tabel kloning.
  • Anda harus memiliki izin MODIFY pada tabel yang sedang dipulihkan.
  • Anda tidak dapat memulihkan tabel ke versi yang lebih lama di mana file data dihapus secara manual atau oleh vacuum. Memulihkan ke versi ini sebagian masih memungkinkan jika spark.sql.files.ignoreMissingFiles diatur ke true.
  • Format stempel waktu untuk memulihkan ke kondisi sebelumnya adalah yyyy-MM-dd HH:mm:ss. Hanya menyediakan string tanggal(yyyy-MM-dd) juga didukung.
RESTORE TABLE db.target_table TO VERSION AS OF <version>
RESTORE TABLE delta.`/data/target/` TO TIMESTAMP AS OF <timestamp>

Untuk detail sintaks, lihat PULIHKAN.

Penting

Pemulihan dianggap sebagai operasi pengubahan data. Entri log Delta Lake yang ditambahkan oleh perintah RESTORE berisi dataChange yang diatur ke true. Jika ada aplikasi hilir, seperti pekerjaan Streaming terstruktur yang memproses pembaruan ke tabel Delta Lake, entri log perubahan data yang ditambahkan oleh operasi pemulihan dianggap sebagai pembaruan data baru, dan memprosesnya mungkin mengakibatkan data duplikat.

Contohnya:

Versi tabel Operasi Pembaruan log Delta Rekaman dalam pembaruan log perubahan data
0 INSERT AddFile(/path/to/file-1, dataChange = true) (name = Viktor, age = 29, (name = George, age = 55)
1 INSERT AddFile(/path/to/file-2, dataChange = true) (nama = George, usia = 39)
2 OPTIMIZE AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) (Tidak ada rekaman sebagai Optimalkan pemadatan tidak mengubah data dalam tabel)
3 KEMBALIKAN(versi=1) RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) (nama = Viktor, usia = 29), (nama = George, usia = 55), (nama = George, usia = 39)

Pada contoh sebelumnya, perintah RESTORE menghasilkan pembaruan yang sudah terlihat saat membaca tabel Delta versi 0 dan 1. Jika kueri streaming membaca tabel ini, maka file ini akan dianggap sebagai data yang baru ditambahkan dan akan diproses lagi.

Pulihkan metrik

RESTORE melaporkan metrik berikut sebagai DataFrame baris tunggal setelah operasi selesai:

  • table_size_after_restore: Ukuran tabel setelah memulihkan.

  • num_of_files_after_restore: Jumlah file dalam tabel setelah memulihkan.

  • num_removed_files: Jumlah file yang dihapus (dihapus secara logis) dari tabel.

  • num_restored_files: Jumlah file yang dipulihkan karena bergulir kembali.

  • removed_files_size: Ukuran total dalam byte file yang dihapus dari tabel.

  • restored_files_size: Ukuran total dalam byte file yang dipulihkan.

    Contoh memulihkan metrik

Contoh penggunaan perjalanan waktu Delta Lake

  • Memperbaiki penghapusan yang tidak disengaja ke tabel untuk pengguna 111:

    INSERT INTO my_table
      SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)
      WHERE userId = 111
    
  • Memperbaiki pembaruan yang salah secara tidak sengaja ke tabel:

    MERGE INTO my_table target
      USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source
      ON source.userId = target.userId
      WHEN MATCHED THEN UPDATE SET *
    
  • Tanyakan jumlah pelanggan baru yang ditambahkan selama seminggu terakhir.

    SELECT count(distinct userId)
    FROM my_table  - (
      SELECT count(distinct userId)
      FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
    

Bagaimana cara menemukan versi penerapan terakhir dalam sesi Spark?

Untuk mendapatkan nomor versi dari penerapan terakhir yang ditulis oleh SparkSession saat ini di semua utas dan semua tabel, kueri konfigurasi SQL spark.databricks.delta.lastCommitVersionInSession.

SQL

SET spark.databricks.delta.lastCommitVersionInSession

Python

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

Scala

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

Jika tidak ada komit yang dibuat oleh SparkSession, mengkueri kunci mengembalikan nilai kosong.

Catatan

Jika Anda berbagi SparkSession yang sama di beberapa utas, ini serupa dengan berbagi variabel di beberapa utas; Anda dapat mencapai kondisi balap karena nilai konfigurasi diperbarui secara bersamaan.