Menggunakan umpan data perubahan Delta Lake pada Azure Databricks
Catatan
- Artikel ini menjelaskan cara merekam dan mengkueri informasi perubahan tingkat baris untuk tabel Delta menggunakan fitur ubah umpan data. Untuk mempelajari cara memperbarui tabel dalam alur Tabel Langsung Delta berdasarkan perubahan data sumber, lihat MENERAPKAN PERUBAHAN API: Menyederhanakan perubahan pengambilan data di Tabel Langsung Delta.
Mengubah umpan data memungkinkan Azure Databricks melacak perubahan tingkat baris antara versi tabel Delta. Saat diaktifkan pada tabel Delta, runtime bahasa umum mencatat peristiwa perubahan untuk semua data yang ditulis pada tabel. Ini mencakup data baris bersama dengan metadata yang menunjukkan apakah baris yang ditentukan disisipkan, dihapus, atau diperbarui.
Anda bisa membaca peristiwa perubahan dalam kueri batch menggunakan Spark SQL, Apache Spark DataFrames, dan Structured Streaming.
Penting
Ubah umpan data bekerja bersama dengan riwayat tabel untuk memberikan informasi perubahan. Karena mengkloning tabel Delta membuat riwayat terpisah, umpan data perubahan pada tabel kloning tidak cocok dengan tabel asli.
Kasus penggunaan
Umpan data perubahan tidak diaktifkan secara default. Kasus penggunaan berikut akan diaktifkan saat Anda mengaktifkan umpan data perubahan.
- Tabel Perak dan Emas: Meningkatkan performa Delta dengan hanya memproses perubahan tingkat baris setelah awal operasi
MERGE
,UPDATE
, atauDELETE
guna mempercepat dan menyederhanakan operasi ETL dan ELT. - Tampilan materialisasi: Buat tampilan informasi yang up-to-date dan agregat untuk digunakan di BI dan analitik tanpa harus memproses ulang tabel dasar lengkap, alih-alih hanya memperbarui di mana perubahan telah terjadi.
- Transmisikan perubahan: Kirim umpan data perubahan ke sistem hilir seperti Kafka atau RDBMS yang dapat menggunakannya untuk memproses secara bertahap di tahap selanjutnya dari alur data.
- Tabel jejak audit: Ambil umpan data perubahan sebagai tabel Delta menyediakan penyimpanan abadi dan kemampuan kueri yang efisien untuk melihat semua perubahan dari waktu ke waktu, termasuk kapan penghapusan terjadi dan pembaruan apa yang dibuat.
Mengaktifkan umpan data perubahan
Anda harus secara eksplisit mengaktifkan opsi ubah umpan data menggunakan salah satu metode berikut:
Tabel baru: Atur properti
delta.enableChangeDataFeed = true
tabel dalamCREATE TABLE
perintah.CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
Tabel yang sudah ada: Atur properti
delta.enableChangeDataFeed = true
tabel dalamALTER TABLE
perintah.ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
Semua tabel baru:
set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
Penting
Hanya perubahan yang dilakukan setelah Anda mengaktifkan umpan data perubahan yang dicatat; perubahan masa lalu ke tabel tidak diambil.
Mengubah penyimpanan data
Catatan Azure Databricks mengubah data bagi operasi UPDATE
, DELETE
, dan MERGE
pada folder _change_data
di bawah direktori tabel Delta. Sejumlah operasi, seperti operasi hanya-masukkan dan penghapusan partisi penuh tidak menghasilkan data pada direktori _change_data
, karena Azure Databricks dapat menghitung umpan data perubahan secara langsung dari log transaksi.
File di _change_data
folder mengikuti kebijakan retensi tabel. Oleh karena itu, jika Anda menjalankan perintah VACUUM, mengubah data umpan data juga dihapus.
Membaca perubahan dalam kueri batch
Anda dapat menyediakan versi atau stempel waktu untuk awal dan akhir. Versi awal dan akhir dan stempel waktu inklusif dalam kueri. Untuk membaca perubahan dari versi mulai tertentu ke versi terbaru tabel, tentukan hanya versi awal atau stempel waktu.
Anda menentukan versi sebagai bilangan bulat dan stempel waktu sebagai string dalam format yyyy-MM-dd[ HH:mm:ss[.SSS]]
.
Jika Anda memberikan versi yang lebih rendah atau stempel waktu yang lebih tua dari yang telah mencatat peristiwa perubahan, yaitu saat umpan data perubahan diaktifkan, kesalahan dilemparkan yang menunjukkan bahwa umpan data perubahan tidak diaktifkan.
SQL
-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)
-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')
-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)
-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')
-- path based tables
SELECT * FROM table_changes_by_path('\path', '2021-04-21 05:45:46')
Python
# version as ints or longs
spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.option("endingVersion", 10) \
.table("myDeltaTable")
# timestamps as formatted timestamp
spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingTimestamp", '2021-04-21 05:45:46') \
.option("endingTimestamp", '2021-05-21 12:00:00') \
.table("myDeltaTable")
# providing only the startingVersion/timestamp
spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("myDeltaTable")
# path based tables
spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingTimestamp", '2021-04-21 05:45:46') \
.load("pathToMyDeltaTable")
Scala
// version as ints or longs
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.option("endingVersion", 10)
.table("myDeltaTable")
// timestamps as formatted timestamp
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingTimestamp", "2021-04-21 05:45:46")
.option("endingTimestamp", "2021-05-21 12:00:00")
.table("myDeltaTable")
// providing only the startingVersion/timestamp
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("myDeltaTable")
// path based tables
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingTimestamp", "2021-04-21 05:45:46")
.load("pathToMyDeltaTable")
Membaca perubahan dalam kueri streaming
Python
# providing a starting version
spark.readStream.format("delta") \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("myDeltaTable")
# providing a starting timestamp
spark.readStream.format("delta") \
.option("readChangeFeed", "true") \
.option("startingTimestamp", "2021-04-21 05:35:43") \
.load("/pathToMyDeltaTable")
# not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta") \
.option("readChangeFeed", "true") \
.table("myDeltaTable")
Scala
// providing a starting version
spark.readStream.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("myDeltaTable")
// providing a starting timestamp
spark.readStream.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", "2021-04-21 05:35:43")
.load("/pathToMyDeltaTable")
// not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta")
.option("readChangeFeed", "true")
.table("myDeltaTable")
Untuk mendapatkan data perubahan saat membaca tabel, atur opsi readChangeFeed
ke true
.
startingVersion
ataustartingTimestamp
bersifat opsional dan jika tidak disediakan, aliran mengembalikan snapshot terbaru tabel pada saat streaming sebagai INSERT
perubahan dan masa mendatang sebagai data perubahan.
Opsi seperti batas tarif (maxFilesPerTrigger
, maxBytesPerTrigger
) dan excludeRegex
juga didukung saat membaca data perubahan.
Catatan
Pembatasan tarif dapat menjadi atom untuk versi selain versi snapshot awal. Artinya, seluruh versi komit akan dibatasi tarif atau seluruh komit akan dikembalikan.
Secara default, jika pengguna meneruskan versi atau stempel waktu yang melebihi penerapan terakhir pada tabel, kesalahan timestampGreaterThanLatestCommit
akan muncul. Di Databricks Runtime 11.3 LTS ke atas, umpan data perubahan dapat menangani kasus versi di luar rentang jika pengguna mengatur konfigurasi berikut ke true
:
set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;
Jika Anda memberikan versi awal yang lebih besar dari penerapan terakhir pada tabel atau stempel waktu mulai yang lebih baru dari penerapan terakhir pada tabel, maka saat konfigurasi sebelumnya diaktifkan, hasil baca kosong akan ditampilkan.
Jika Anda memberikan versi akhir yang lebih besar dari penerapan terakhir pada tabel atau stempel waktu akhir yang lebih baru dari penerapan terakhir pada tabel, maka saat konfigurasi sebelumnya diaktifkan dalam mode baca batch, semua perubahan antara versi awal dan penerapan terakhir akan ditampilkan.
Apa skema untuk umpan data perubahan?
Saat Anda membaca dari umpan data perubahan untuk tabel, skema untuk versi tabel terbaru digunakan.
Catatan
Sebagian besar operasi perubahan skema dan evolusi didukung penuh. Tabel dengan pemetaan kolom diaktifkan tidak mendukung semua kasus penggunaan dan menunjukkan perilaku yang berbeda. Lihat Mengubah batasan umpan data untuk tabel dengan pemetaan kolom diaktifkan.
Selain kolom data dari skema tabel Delta, ubah umpan data berisi kolom metadata yang mengidentifikasi jenis peristiwa perubahan:
Nama kolom | Jenis | Nilai |
---|---|---|
_change_type |
String | insert , update_preimage , update_postimage , delete (1) |
_commit_version |
Panjang | Log Delta atau versi tabel yang berisi perubahan. |
_commit_timestamp |
Tanda Waktu | Stempel waktu terkait saat penerapan dibuat. |
(1)preimage
adalah nilai sebelum pembaruan, postimage
adalah nilai setelah pembaruan.
Catatan
Anda tidak dapat mengaktifkan umpan data perubahan pada tabel jika skema berisi kolom dengan nama yang sama dengan kolom yang ditambahkan ini. Ganti nama kolom dalam tabel untuk mengatasi konflik ini sebelum mencoba mengaktifkan umpan data perubahan.
Mengubah batasan umpan data untuk tabel dengan pemetaan kolom diaktifkan
Dengan pemetaan kolom diaktifkan pada tabel Delta, Anda dapat menghilangkan atau mengganti nama kolom dalam tabel tanpa menulis ulang file data untuk data yang sudah ada. Dengan pemetaan kolom diaktifkan, umpan data perubahan memiliki batasan setelah melakukan perubahan skema non-aditif seperti mengganti nama atau menghilangkan kolom, mengubah jenis data, atau perubahan nullability.
Penting
- Anda tidak dapat membaca mengubah umpan data untuk transaksi atau rentang di mana perubahan skema non-aditif terjadi menggunakan semantik batch.
- Di Databricks Runtime 12.2 LTS dan di bawahnya, tabel dengan pemetaan kolom diaktifkan yang mengalami perubahan skema non-aditif tidak mendukung bacaan streaming pada umpan data perubahan. Untuk detailnya, lihat Streaming dengan pemetaan kolom dan perubahan skema.
- Di Databricks Runtime 11.3 LTS dan di bawahnya, Anda tidak dapat membaca mengubah umpan data untuk tabel dengan pemetaan kolom diaktifkan yang telah mengalami penggantian nama kolom atau penurunan.
Di Databricks Runtime 12.2 LTS ke atas, Anda dapat melakukan pembacaan batch pada umpan data perubahan untuk tabel dengan pemetaan kolom diaktifkan yang telah mengalami perubahan skema non-aditif. Alih-alih menggunakan skema tabel versi terbaru, operasi baca menggunakan skema versi akhir tabel yang ditentukan dalam kueri. Kueri masih gagal jika rentang versi yang ditentukan mencakup perubahan skema non-aditif.
Pertanyaan Umum (FAQ)
Berapa overhead untuk mengaktifkan umpan data perubahan?
Tidak ada dampak yang signifikan. Catatan data perubahan dibuat sejalan selama proses eksekusi kueri, dan umumnya jauh lebih kecil dari ukuran total file yang ditulis ulang.
Apa kebijakan retensi untuk catatan perubahan?
Mengubah catatan mengikuti kebijakan retensi yang sama dengan versi tabel yang sudah kedaluwarsa, dan akan dibersihkan melalui VACUUM jika berada di luar periode retensi yang ditentukan.
Kapan rekaman baru tersedia di feed data perubahan?
Data perubahan dilakukan bersama dengan transaksi Delta Lake, dan akan tersedia pada saat yang sama dengan data baru tersedia dalam tabel.
Contoh buku catatan: Menyebarkan perubahan dengan umpan data perubahan Delta
Notebook ini menunjukkan cara menyebarkan perubahan yang dibuat ke tabel perak jumlah mutlak vaksinasi ke tabel emas tingkat vaksinasi.