Bekerja dengan riwayat tabel

Setiap operasi yang memodifikasi tabel membuat versi tabel baru. Gunakan informasi riwayat untuk mengaudit operasi, memutar kembali tabel, atau mengkueri tabel pada titik waktu tertentu menggunakan perjalanan waktu.

Catatan

Databricks tidak merekomendasikan penggunaan riwayat tabel sebagai solusi pencadangan jangka panjang untuk pengarsipan data. Gunakan hanya 7 hari terakhir untuk operasi perjalanan waktu kecuali Anda telah mengatur konfigurasi retensi data dan log ke nilai yang lebih besar.

Mengambil riwayat tabel

Ambil informasi termasuk operasi, pengguna, dan tanda waktu untuk setiap penulisan ke tabel dengan menjalankan perintah history. Operasi diurutkan kembali dalam urutan kronologis terbalik.

Retensi riwayat tabel ditentukan oleh pengaturan logRetentionDurationtabel , yaitu 30 hari secara default.

Catatan

Perjalanan waktu dan riwayat tabel dikendalikan oleh ambang batas retensi yang berbeda. Lihat Apa itu perjalanan waktu?.

DESCRIBE HISTORY table_name       -- get the full history of the table

DESCRIBE HISTORY table_name LIMIT 1  -- get the last operation only

Untuk detail sintaks Spark SQL, lihat DESCRIBE HISTORY.

Lihat dokumentasi Delta Lake API untuk detail sintaks Scala/Java/Python.

Catalog Explorer menyediakan tampilan visual informasi dan riwayat tabel terperinci ini. Selain skema tabel dan data sampel, Anda dapat mengklik tab Riwayat untuk melihat riwayat tabel yang ditampilkan dengan DESCRIBE HISTORY.

Skema riwayat

Output dari operasi history memiliki kolom-kolom berikut.

kolom Tipe Deskripsi
versi panjang Versi tabel yang dihasilkan oleh operasi.
stempel waktu stempel waktu Saat versi ini dikomitmenkan.
userId string ID pengguna yang menjalankan operasi.
userName string Nama pengguna yang menjalankan operasi.
operasi string Nama operasi.
operationParameters memetakan Parameter operasi (misalnya, predikat.)
pekerjaan struktur Rincian tugas yang menjalankan proses operasi.
buku catatan struktur Detail buku catatan tempat operasi dijalankan.
clusterId string ID kluster tempat operasi dijalankan.
bacaVersi panjang Versi tabel yang dibaca untuk melakukan operasi tulis.
tingkat isolasi string Tingkat isolasi yang digunakan untuk operasi ini.
isBlindAppend Boolean Apakah operasi ini menambahkan data.
operationMetrics memetakan Metrik operasi (misalnya, jumlah baris dan file yang dimodifikasi.)
userMetadata string Metadata commit yang ditentukan pengguna jika diatur
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|      5|2019-07-29 14:07:47|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          4|WriteSerializable|        false|[numTotalRows -> ...|
|      4|2019-07-29 14:07:41|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          3|WriteSerializable|        false|[numTotalRows -> ...|
|      3|2019-07-29 14:07:29|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          2|WriteSerializable|        false|[numTotalRows -> ...|
|      2|2019-07-29 14:06:56|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          1|WriteSerializable|        false|[numTotalRows -> ...|
|      1|2019-07-29 14:04:31|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          0|WriteSerializable|        false|[numTotalRows -> ...|
|      0|2019-07-29 14:01:40|   ###|     ###|    WRITE|[mode -> ErrorIfE...|null|     ###|      ###|       null|WriteSerializable|         true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+

Catatan

Memahami partitionBy dalam parameter operasi

Bidang partitionBy ini hanya bermakna untuk operasi CREATE dan OVERWRITE yang menentukan atau mengubah skema partisi tabel.

Untuk menambahkan operasi ke tabel yang ada (TAMBAHKAN, INSERT, , UPDATEHAPUS, GABUNGkan), bidang ini mungkin menampilkan kolom array [] atau partisi kosong tergantung pada metode tulis yang digunakan (.save() vs .saveAsTable()). Ketidakkonsistensi ini adalah perilaku yang diharapkan dan tidak boleh digunakan untuk memvalidasi penulisan.

Penting

Jangan mengandalkan partitionBy dalam riwayat untuk memvalidasi operasi penambahan. Nilai bervariasi berdasarkan detail implementasi tetapi tidak memengaruhi bagaimana data ditulis ke partisi.

Example

Pertimbangkan tabel yang dipartisi oleh date kolom:

# Initial table creation - partitionBy is populated
df.write.format("delta") \
  .partitionBy("date") \
  .saveAsTable("sales_data")

Pengoperasian CREATE yang ada dalam riwayat menunjukkan:

operationParameters: {
  "mode": "ErrorIfExists",
  "partitionBy": "[\"date\"]"
}

Saat Anda menambahkan data ke tabel ini:

# Subsequent append - partitionBy shows empty
new_df.write.format("delta") \
  .mode("append") \
  .saveAsTable("sales_data")

Operasi TAMBAHKAN menunjukkan:

operationParameters: {
  "mode": "Append",
  "partitionBy": "[]"
}

Nilai kosong partitionBy adalah yang diharapkan. Data masih ditulis ke partisi yang benar berdasarkan skema partisi tabel yang ada. Perhatikan bahwa .save() ke jalur dapat memperlihatkan kolom partisi pada bidang ini, tetapi perbedaan ini adalah detail implementasi dan tidak memengaruhi perilaku tulis.

Metrik operasi

Operasi history mengembalikan kumpulan metrik operasi di peta kolom operationMetrics.

Tabel berikut mencantumkan definisi kunci peta berdasarkan operasi.

Operasi Nama metrik Deskripsi
TULIS, CREATE TABLE AS SELECT, GANTI TABLE AS SELECT, COPY INTO
numFiles Jumlah file yang ditulis.
numOutputBytes Ukuran dalam byte dari konten tertulis.
numOutputRows Jumlah baris yang ditulis.
Streaming UPDATE
jumlahFileDitambahkan Jumlah file yang ditambahkan.
jumlahFileDihapus Jumlah file yang dihapus.
numOutputRows Jumlah baris yang ditulis.
numOutputBytes Ukuran tulis dalam byte.
MENGHAPUS
jumlahFileDitambahkan Jumlah file yang ditambahkan. Tidak disediakan saat partisi tabel dihapus.
jumlahFileDihapus Jumlah file yang dihapus.
numDeletedRows Jumlah baris yang dihapus. Tidak disediakan saat partisi tabel dihapus.
numCopiedRows Jumlah baris yang disalin dalam proses menghapus file.
executionTimeMs Waktu yang dibutuhkan untuk menjalankan seluruh operasi.
scanTimeMs Waktu yang dibutuhkan untuk memindai file untuk menemukan kecocokan.
rewriteTimeMs Waktu yang dibutuhkan untuk menulis ulang file yang cocok.
MEMANGKAS
jumlahFileDihapus Jumlah file yang dihapus.
executionTimeMs Waktu yang dibutuhkan untuk menjalankan seluruh operasi.
GABUNG
numSourceRows Jumlah baris di DataFrame sumber.
numTargetRowsInserted Jumlah baris yang dimasukkan ke dalam tabel target.
numTargetRowsUpdated Jumlah baris yang diperbarui dalam tabel target.
JumlahBarisTargetDihapus Jumlah baris yang dihapus dalam tabel target.
numTargetRowsCopied Jumlah baris target yang disalin.
numOutputRows Jumlah total baris yang telah dituliskan.
jumlahBerkasTargetDitambahkan Jumlah file yang ditambahkan ke sink(target).
JumlahFileTargetDihapus Jumlah file yang dihapus dari sink(target).
executionTimeMs Waktu yang dibutuhkan untuk menjalankan seluruh operasi.
scanTimeMs Waktu yang diperlukan untuk memindai file dan menemukan kecocokan.
rewriteTimeMs Waktu yang dibutuhkan untuk menulis ulang file yang cocok.
UPDATE
jumlahFileDitambahkan Jumlah file yang ditambahkan.
jumlahFileDihapus Jumlah file yang dihapus.
numUpdatedRows Jumlah baris yang diperbarui.
numCopiedRows Jumlah baris yang baru saja disalin dalam proses memperbarui file.
executionTimeMs Waktu yang dibutuhkan untuk menjalankan seluruh operasi.
scanTimeMs Waktu yang dibutuhkan untuk memindai file untuk kecocokan.
rewriteTimeMs Waktu yang dibutuhkan untuk menulis ulang file yang cocok.
FSCK jumlahFileDihapus Jumlah file yang dihapus.
MENGUBAH jumlah berkas yang dikonversi Jumlah file Parquet yang telah dikonversi.
OPTIMIZE
jumlahFileDitambahkan Jumlah file yang ditambahkan.
jumlahFileDihapus Jumlah file yang dioptimalkan.
numAddedBytes (jumlahByteDitambahkan) Jumlah byte yang ditambahkan setelah tabel dioptimalkan.
jumlahByteDihapus Jumlah byte yang dihapus.
minFileSize Ukuran file terkecil setelah tabel dioptimalkan.
p25FileSize Ukuran file persentil ke-25 setelah tabel dioptimalkan.
p50FileSize Ukuran file median setelah tabel dioptimalkan.
p75FileSize Ukuran file persentil ke-75 setelah tabel dioptimalkan.
ukuranFileMaksimum Ukuran file terbesar setelah tabel dioptimalkan.
Klon
sourceTableSize Ukuran dalam byte tabel sumber pada versi yang telah dikloning.
sourceNumOfFiles Jumlah file dalam tabel sumber pada versi yang dikloning.
jumlahFileDihapus Jumlah file yang dihapus dari tabel target jika tabel sebelumnya diganti.
removedFilesSize Ukuran total dalam byte file yang dihapus dari tabel target jika tabel sebelumnya diganti.
jumlahFileYangDisalin Jumlah file yang disalin ke lokasi baru. 0 untuk klon dangkal.
UkuranFileTersalin Ukuran total dalam byte file yang disalin ke lokasi baru. 0 untuk klon dangkal.
RESTORE
ukuranTabelSetelahPemulihan Ukuran tabel dalam byte setelah pemulihan.
jumlahFileSetelahPemulihan Jumlah file dalam tabel setelah pemulihan.
jumlahFileDihapus Jumlah file yang dihapus oleh operasi pemulihan.
jumlahFileDipulihkan Jumlah file yang ditambahkan sebagai hasil pemulihan.
removedFilesSize Ukuran dalam byte file yang dihapus oleh pemulihan.
restoredFilesSize Ukuran dalam byte file yang ditambahkan oleh proses pemulihan.
VACUUM
jumlahFileDihapus Jumlah file yang dihapus.
JumlahDirektoriYangDibersihkan Jumlah direktori yang dikosongkan.
numFilesToDelete Jumlah file yang akan dihapus.

Apa itu perjalanan waktu?

Perjalanan waktu mendukung kueri versi tabel sebelumnya berdasarkan tanda waktu atau versi tabel (seperti yang dicatat dalam log transaksi). Anda dapat menggunakan perjalanan waktu untuk aplikasi seperti berikut:

  • Membuat ulang analisis, laporan, atau keluaran (misalnya, keluaran model pembelajaran mesin). Ini bisa berguna untuk debugging atau audit, terutama di industri yang diatur.
  • Menulis pertanyaan temporal yang kompleks.
  • Memperbaiki kesalahan dalam data Anda.
  • Menyediakan isolasi snapshot untuk seperangkat kueri pada tabel yang sering mengalami perubahan.

Penting

Pada Databricks Runtime 18.0 dan lebih tinggi, kueri perjalanan waktu diblokir jika meminta versi yang lebih lama dari properti tabel deletedFileRetentionDuration (default 7 hari). Untuk tabel terkelola Unity Catalog, ini berlaku untuk Databricks Runtime 12.2 ke atas.

Sintaksis perjalanan waktu

Anda mengkueri tabel dengan perjalanan waktu dengan menambahkan klausa setelah spesifikasi nama tabel.

  • timestamp_expression dapat berupa salah satu dari:
    • '2018-10-18T22:15:12.013Z', yaitu, string yang dapat diubah menjadi penanda waktu
    • cast('2018-10-18 13:36:32 CEST' as timestamp)
    • '2018-10-18', yaitu, string tanggal
    • current_timestamp() - interval 12 hours
    • date_sub(current_date(), 1)
    • Ekspresi lain yang dapat diubah menjadi timestamp
  • version adalah nilai panjang yang dapat diperoleh dari output DESCRIBE HISTORY table_spec.

Baik timestamp_expression maupun version tidak boleh berupa subkueri.

Hanya string tanggal atau tanda waktu yang diterima. Misalnya, "2019-01-01" dan "2019-01-01T00:00:00.000Z". Lihat kode berikut untuk contoh sintaks:

SQL

SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z';
SELECT * FROM people10m VERSION AS OF 123;

Phyton

df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).table("people10m")

Anda juga dapat menggunakan @ sintaks untuk menentukan tanda waktu atau versi sebagai bagian dari nama tabel. Stempel waktu harus dalam format yyyyMMddHHmmssSSS. Anda dapat menentukan versi setelah @ dengan menambahkan v ke versi tersebut. Lihat kode berikut untuk contoh sintaks:

SQL

SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123

Phyton

spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")

Apa itu titik pemeriksaan log transaksi?

Versi tabel dicatat sebagai file JSON dalam direktori log transaksi, yang disimpan bersama data tabel. Untuk mengoptimalkan kueri checkpoint, versi tabel diagregasi ke dalam file checkpoint Parquet, yang mencegah perlunya membaca semua versi JSON dari riwayat tabel. Azure Databricks mengoptimalkan frekuensi titik pemeriksaan untuk ukuran data dan beban kerja. Pengguna tidak perlu berinteraksi dengan titik pemeriksaan secara langsung. Frekuensi titik pemeriksaan dapat berubah tanpa pemberitahuan.

Mengonfigurasi retensi data untuk kueri historis

Untuk mengkueri versi tabel sebelumnya, Anda harus menyimpan log dan file data untuk versi tersebut.

File data dihapus saat VACUUM dijalankan terhadap tabel. Penghapusan file log dikelola secara otomatis setelah mengecek versi tabel.

Karena sebagian besar tabel telah VACUUM berjalan terhadapnya secara teratur, kueri point-in-time harus menghormati ambang retensi untuk VACUUM, yaitu 7 hari secara default.

Untuk meningkatkan ambang retensi data untuk tabel, Anda harus mengonfigurasi properti tabel berikut:

  • delta.logRetentionDuration = "interval <interval>": mengontrol berapa lama riwayat tabel disimpan. Default adalah interval 30 days.
  • delta.deletedFileRetentionDuration = "interval <interval>": menentukan ambang batas VACUUM yang digunakan untuk menghapus file data yang tidak lagi dirujuk dalam versi tabel saat ini. Default adalah interval 7 days.

Anda dapat menentukan properti tabel selama pembuatan tabel atau mengaturnya dengan ALTER TABLE pernyataan. Lihat Referensi properti tabel.

Catatan

Dalam Databricks Runtime 18.0 ke atas, logRetentionDuration harus lebih besar dari atau sama dengan deletedFileRetentionDuration. Untuk tabel terkelola Unity Catalog, ini berlaku untuk Databricks Runtime 12.2 ke atas.

Untuk mengakses 30 hari data historis, atur delta.deletedFileRetentionDuration = "interval 30 days" (yang cocok dengan pengaturan default untuk delta.logRetentionDuration).

Meningkatkan ambang retensi data dapat menyebabkan biaya penyimpanan Anda naik, karena lebih banyak file data dipertahankan.

Memulihkan tabel ke status sebelumnya

Anda dapat memulihkan tabel ke status sebelumnya dengan menggunakan RESTORE perintah . Tabel secara internal mempertahankan versi historis yang memungkinkannya dipulihkan ke status sebelumnya. Versi yang sesuai dengan kondisi sebelumnya atau stempel waktu saat kondisi sebelumnya dibuat didukung sebagai opsi oleh perintah RESTORE.

Penting

  • Anda dapat memulihkan tabel yang sudah dipulihkan.
  • Anda dapat memulihkan tabel kloning.
  • Anda harus memiliki izin MODIFY pada tabel yang sedang dipulihkan.
  • Anda tidak dapat memulihkan tabel ke versi yang lebih lama di mana file data dihapus secara manual atau oleh vacuum. Memulihkan ke versi ini sebagian masih memungkinkan jika spark.sql.files.ignoreMissingFiles diatur ke true.
  • Format stempel waktu untuk memulihkan ke kondisi sebelumnya adalah yyyy-MM-dd HH:mm:ss. Penyediaan string tanggal saja (yyyy-MM-dd) juga didukung.
RESTORE TABLE target_table TO VERSION AS OF <version>;
RESTORE TABLE target_table TO TIMESTAMP AS OF <timestamp>;

Untuk detail sintaks, lihat RESTORE.

Penting

Pemulihan data dianggap sebagai operasi pengubahan data. Entri log RESTORE yang ditambahkan oleh perintah berisi dataChange diatur ke true. Jika ada aplikasi hilir, seperti pekerjaan streaming Terstruktur yang memproses pembaruan ke tabel, entri log perubahan data yang ditambahkan oleh operasi pemulihan dianggap sebagai pembaruan data baru, dan memprosesnya dapat mengakibatkan data duplikat.

Contohnya:

Versi tabel Operasi Pembaruan log Catatan dalam pembaruan log perubahan data
0 INSERT AddFile(/path/to/file-1, dataChange = true) (nama = Viktor, usia = 29, (nama = George, usia = 55)
1 INSERT AddFile(/path/to/file-2, dataChange = true) (nama = George, usia = 39)
2 OPTIMIZE AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) (Tidak ada catatan karena optimalisasi pemadatan tidak mengubah data dalam tabel)
3 RESTORE(versi=1) RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, perubahanData = benar), AddFile(/path/to/file-2, perubahanData = benar) (nama = Viktor, usia = 29), (nama = George, usia = 55), (nama = George, usia = 39)

Dalam contoh sebelumnya, RESTORE perintah menghasilkan pembaruan yang sudah terlihat saat membaca tabel versi 0 dan 1. Jika kueri streaming membaca tabel ini, maka file ini akan dianggap sebagai data yang baru ditambahkan dan akan diproses lagi.

Pulihkan metrik

RESTORE melaporkan metrik berikut sebagai DataFrame baris tunggal setelah operasi selesai:

  • table_size_after_restore: Ukuran tabel setelah memulihkan.

  • num_of_files_after_restore: Jumlah file dalam tabel setelah memulihkan.

  • num_removed_files: Jumlah file yang dihapus (dihapus secara logis) dari tabel.

  • num_restored_files: Jumlah file yang dipulihkan karena bergulir kembali.

  • removed_files_size: Ukuran total dalam byte file yang dihapus dari tabel.

  • restored_files_size: Ukuran total dalam byte file yang dipulihkan.

    Contoh memulihkan metrik

Contoh penggunaan perjalanan waktu

  • Memperbaiki penghapusan yang tidak disengaja ke tabel untuk pengguna 111:

    INSERT INTO my_table
      SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)
      WHERE userId = 111
    
  • Memperbaiki pembaruan yang salah secara tidak sengaja ke tabel:

    MERGE INTO my_table target
      USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source
      ON source.userId = target.userId
      WHEN MATCHED THEN UPDATE SET *
    
  • Tanyakan jumlah pelanggan baru yang ditambahkan selama seminggu terakhir.

    SELECT
    (
      SELECT count(distinct userId)
      FROM my_table
    )
    -
    (
      SELECT count(distinct userId)
      FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7)
    ) AS new_customers
    

Bagaimana cara menemukan versi komit terakhir dalam sesi Spark?

Untuk mendapatkan nomor versi dari komit terakhir yang ditulis oleh SparkSession di semua thread dan semua tabel, kueri konfigurasi SQL spark.databricks.delta.lastCommitVersionInSession.

Catatan

Untuk tabel Apache Iceberg, gunakan spark.databricks.iceberg.lastCommitVersionInSession alih-alih spark.databricks.delta.lastCommitVersionInSession.

SQL

SET spark.databricks.delta.lastCommitVersionInSession

Phyton

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

Scala

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

Jika tidak ada komit yang dibuat oleh SparkSession, mengkueri kunci mengembalikan nilai kosong.

Catatan

Jika Anda berbagi yang sama SparkSession di beberapa utas, itu mirip dengan berbagi variabel di beberapa utas; Anda dapat mengalami kondisi persaingan ketika nilai konfigurasi diperbarui secara bersamaan.