Menggunakan umpan data perubahan Delta Lake pada Azure Databricks
Mengubah umpan data memungkinkan Azure Databricks melacak perubahan tingkat baris antara versi tabel Delta. Saat diaktifkan pada tabel Delta, runtime bahasa umum mencatat peristiwa perubahan untuk semua data yang ditulis pada tabel. Ini mencakup data baris bersama dengan metadata yang menunjukkan apakah baris yang ditentukan disisipkan, dihapus, atau diperbarui.
Penting
Ubah umpan data bekerja bersama dengan riwayat tabel untuk memberikan informasi perubahan. Karena mengkloning tabel Delta membuat riwayat terpisah, umpan data perubahan pada tabel kloning tidak cocok dengan tabel asli.
Memproses perubahan data secara bertahap
Databricks merekomendasikan penggunaan umpan data perubahan dalam kombinasi dengan Streaming Terstruktur untuk memproses perubahan secara bertahap dari tabel Delta. Anda harus menggunakan Streaming Terstruktur untuk Azure Databricks untuk melacak versi secara otomatis untuk umpan data perubahan tabel Anda.
Catatan
Tabel Langsung Delta menyediakan fungsionalitas untuk penyebaran data perubahan yang mudah dan menyimpan hasil sebagai tabel SCD (dimensi yang berubah perlahan) tipe 1 atau tipe 2. Lihat TERAPKAN PERUBAHAN API: Menyederhanakan perubahan pengambilan data dengan Delta Live Tables.
Untuk membaca umpan data perubahan dari tabel, Anda harus mengaktifkan umpan data perubahan pada tabel tersebut. Lihat Mengaktifkan umpan data perubahan.
Atur opsi readChangeFeed
ke true
saat mengonfigurasi aliran terhadap tabel untuk membaca umpan data perubahan, seperti yang ditunjukkan dalam contoh sintaks berikut:
Python
(spark.readStream
.option("readChangeFeed", "true")
.table("myDeltaTable")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.table("myDeltaTable")
Secara default, aliran mengembalikan rekam jepret terbaru tabel saat aliran pertama kali dimulai sebagai INSERT
perubahan dan di masa mendatang saat mengubah data.
Ubah penerapan data sebagai bagian dari transaksi Delta Lake, dan menjadi tersedia pada saat yang sama data baru berkomitmen pada tabel.
Anda dapat secara opsional menentukan versi awal. Lihat Haruskah saya menentukan versi awal?.
Umpan data perubahan juga mendukung eksekusi batch, yang mengharuskan menentukan versi awal. Lihat Membaca perubahan dalam kueri batch.
Opsi seperti batas tarif (maxFilesPerTrigger
, maxBytesPerTrigger
) dan excludeRegex
juga didukung saat membaca data perubahan.
Pembatasan tarif dapat menjadi atom untuk versi selain versi snapshot awal. Artinya, seluruh versi komit akan dibatasi tarif atau seluruh komit akan dikembalikan.
Haruskah saya menentukan versi awal?
Anda dapat secara opsional menentukan versi awal jika Anda ingin mengabaikan perubahan yang terjadi sebelum versi tertentu. Anda dapat menentukan versi menggunakan tanda waktu atau nomor ID versi yang direkam dalam log transaksi Delta.
Catatan
Versi awal diperlukan untuk pembacaan batch, dan banyak pola batch dapat memperoleh manfaat dari pengaturan versi akhir opsional.
Saat Anda mengonfigurasi beban kerja Streaming Terstruktur yang melibatkan umpan data perubahan, penting untuk memahami bagaimana menentukan versi awal memengaruhi pemrosesan.
Banyak beban kerja streaming, terutama alur pemrosesan data baru, mendapat manfaat dari perilaku default. Dengan perilaku default, batch pertama diproses saat aliran pertama kali merekam semua rekaman yang ada dalam tabel sebagai INSERT
operasi dalam umpan data perubahan.
Jika tabel target Anda sudah berisi semua rekaman dengan perubahan yang sesuai hingga titik tertentu, tentukan versi awal untuk menghindari pemrosesan status tabel sumber sebagai INSERT
peristiwa.
Contoh sintaks berikut memulihkan dari kegagalan streaming di mana titik pemeriksaan rusak. Dalam contoh ini, asumsikan kondisi berikut:
- Umpan data perubahan diaktifkan pada tabel sumber pada pembuatan tabel.
- Tabel hilir target telah memproses semua perubahan hingga dan termasuk versi 75.
- Riwayat versi untuk tabel sumber tersedia untuk versi 70 ke atas.
Python
(spark.readStream
.option("readChangeFeed", "true")
.option("startingVersion", 76)
.table("source_table")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.option("startingVersion", 76)
.table("source_table")
Dalam contoh ini, Anda juga harus menentukan lokasi titik pemeriksaan baru.
Penting
Jika Anda menentukan versi awal, aliran gagal dimulai dari titik pemeriksaan baru jika versi awal tidak lagi ada dalam riwayat tabel. Delta Lake membersihkan versi historis secara otomatis, yang berarti bahwa semua versi awal yang ditentukan akhirnya dihapus.
Lihat Bisakah saya menggunakan umpan data perubahan untuk memutar ulang seluruh riwayat tabel?.
Membaca perubahan dalam kueri batch
Anda dapat menggunakan sintaks kueri batch untuk membaca semua perubahan mulai dari versi tertentu atau membaca perubahan dalam rentang versi tertentu.
Anda menentukan versi sebagai bilangan bulat dan stempel waktu sebagai string dalam format yyyy-MM-dd[ HH:mm:ss[.SSS]]
.
Versi awal dan akhir sudah termasuk dalam kueri. Untuk membaca perubahan dari versi mulai tertentu ke versi terbaru tabel, tentukan hanya versi awal.
Jika Anda memberikan versi yang lebih rendah atau stempel waktu yang lebih tua dari yang telah mencatat peristiwa perubahan, yaitu saat umpan data perubahan diaktifkan, kesalahan dilemparkan yang menunjukkan bahwa umpan data perubahan tidak diaktifkan.
Contoh sintaks berikut menunjukkan menggunakan opsi versi awal dan akhir dengan pembacaan batch:
SQL
-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)
-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')
-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)
-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')
Python
# version as ints or longs
spark.read \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.option("endingVersion", 10) \
.table("myDeltaTable")
# timestamps as formatted timestamp
spark.read \
.option("readChangeFeed", "true") \
.option("startingTimestamp", '2021-04-21 05:45:46') \
.option("endingTimestamp", '2021-05-21 12:00:00') \
.table("myDeltaTable")
# providing only the startingVersion/timestamp
spark.read \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("myDeltaTable")
Scala
// version as ints or longs
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.option("endingVersion", 10)
.table("myDeltaTable")
// timestamps as formatted timestamp
spark.read
.option("readChangeFeed", "true")
.option("startingTimestamp", "2021-04-21 05:45:46")
.option("endingTimestamp", "2021-05-21 12:00:00")
.table("myDeltaTable")
// providing only the startingVersion/timestamp
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("myDeltaTable")
Catatan
Secara default, jika pengguna meneruskan versi atau stempel waktu yang melebihi penerapan terakhir pada tabel, kesalahan timestampGreaterThanLatestCommit
akan muncul. Di Databricks Runtime 11.3 LTS ke atas, umpan data perubahan dapat menangani kasus versi di luar rentang jika pengguna mengatur konfigurasi berikut ke true
:
set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;
Jika Anda memberikan versi awal yang lebih besar dari penerapan terakhir pada tabel atau stempel waktu mulai yang lebih baru dari penerapan terakhir pada tabel, maka saat konfigurasi sebelumnya diaktifkan, hasil baca kosong akan ditampilkan.
Jika Anda memberikan versi akhir yang lebih besar dari penerapan terakhir pada tabel atau stempel waktu akhir yang lebih baru dari penerapan terakhir pada tabel, maka saat konfigurasi sebelumnya diaktifkan dalam mode baca batch, semua perubahan antara versi awal dan penerapan terakhir akan ditampilkan.
Apa skema untuk umpan data perubahan?
Saat Anda membaca dari umpan data perubahan untuk tabel, skema untuk versi tabel terbaru digunakan.
Catatan
Sebagian besar operasi perubahan skema dan evolusi didukung penuh. Tabel dengan pemetaan kolom diaktifkan tidak mendukung semua kasus penggunaan dan menunjukkan perilaku yang berbeda. Lihat Mengubah batasan umpan data untuk tabel dengan pemetaan kolom diaktifkan.
Selain kolom data dari skema tabel Delta, ubah umpan data berisi kolom metadata yang mengidentifikasi jenis peristiwa perubahan:
Nama kolom | Jenis | Nilai |
---|---|---|
_change_type |
String | insert , , update_preimage update_postimage , delete (1) |
_commit_version |
Panjang | Log Delta atau versi tabel yang berisi perubahan. |
_commit_timestamp |
Tanda Waktu | Stempel waktu terkait saat penerapan dibuat. |
(1) preimage
adalah nilai sebelum pembaruan, postimage
adalah nilai setelah pembaruan.
Catatan
Anda tidak dapat mengaktifkan umpan data perubahan pada tabel jika skema berisi kolom dengan nama yang sama dengan kolom yang ditambahkan ini. Ganti nama kolom dalam tabel untuk mengatasi konflik ini sebelum mencoba mengaktifkan umpan data perubahan.
Mengaktifkan umpan data perubahan
Anda hanya dapat membaca umpan data perubahan untuk tabel yang diaktifkan. Anda harus secara eksplisit mengaktifkan opsi ubah umpan data menggunakan salah satu metode berikut:
Tabel baru: Atur properti
delta.enableChangeDataFeed = true
tabel dalamCREATE TABLE
perintah.CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
Tabel yang sudah ada: Atur properti
delta.enableChangeDataFeed = true
tabel dalamALTER TABLE
perintah.ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
Semua tabel baru:
set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
Penting
Hanya perubahan yang dilakukan setelah Anda mengaktifkan umpan data perubahan yang direkam. Perubahan sebelumnya pada tabel tidak diambil.
Mengubah penyimpanan data
Mengaktifkan umpan data perubahan menyebabkan sedikit peningkatan biaya penyimpanan untuk tabel. Rekaman data perubahan dihasilkan saat kueri berjalan, dan umumnya jauh lebih kecil dari ukuran total file yang ditulis ulang.
Catatan Azure Databricks mengubah data bagi operasi UPDATE
, DELETE
, dan MERGE
pada folder _change_data
di bawah direktori tabel Delta. Beberapa operasi, seperti operasi khusus sisipan dan penghapusan partisi penuh, tidak menghasilkan data di _change_data
direktori karena Azure Databricks dapat secara efisien menghitung umpan data perubahan langsung dari log transaksi.
Semua bacaan terhadap file data dalam _change_data
folder harus melalui API Delta Lake yang didukung.
File di _change_data
folder mengikuti kebijakan retensi tabel. Mengubah data umpan data dihapus saat VACUUM
perintah berjalan.
Bisakah saya menggunakan umpan data perubahan untuk memutar ulang seluruh riwayat tabel?
Umpan data perubahan tidak dimaksudkan untuk berfungsi sebagai rekaman permanen dari semua perubahan pada tabel. Ubah umpan data hanya merekam perubahan yang terjadi setelah diaktifkan.
Mengubah umpan data dan Delta Lake memungkinkan Anda untuk selalu membangun ulang rekam jepret lengkap tabel sumber, yang berarti Anda dapat memulai baca streaming baru terhadap tabel dengan umpan data perubahan diaktifkan dan mengambil versi tabel tersebut saat ini dan semua perubahan yang terjadi setelahnya.
Anda harus memperlakukan rekaman dalam umpan data perubahan sebagai sementara dan hanya dapat diakses untuk jendela retensi tertentu. Log transaksi Delta menghapus versi tabel dan versi umpan data perubahan yang sesuai secara berkala. Saat versi dihapus dari log transaksi, Anda tidak dapat lagi membaca umpan data perubahan untuk versi tersebut.
Jika kasus penggunaan Anda mengharuskan mempertahankan riwayat permanen semua perubahan pada tabel, Anda harus menggunakan logika inkremental untuk menulis rekaman dari umpan data perubahan ke tabel baru. Contoh kode berikut menunjukkan penggunaan trigger.AvailableNow
, yang memanfaatkan pemrosesan inkremental Streaming Terstruktur tetapi memproses data yang tersedia sebagai beban kerja batch. Anda dapat menjadwalkan beban kerja ini secara asinkron dengan alur pemrosesan utama Anda untuk membuat cadangan umpan data perubahan untuk tujuan audit atau pemutaran ulang penuh.
Python
(spark.readStream
.option("readChangeFeed", "true")
.table("source_table")
.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(availableNow=True)
.toTable("target_table")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.table("source_table")
.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(Trigger.AvailableNow)
.toTable("target_table")
Mengubah batasan umpan data untuk tabel dengan pemetaan kolom diaktifkan
Dengan pemetaan kolom diaktifkan pada tabel Delta, Anda dapat menghilangkan atau mengganti nama kolom dalam tabel tanpa menulis ulang file data untuk data yang sudah ada. Dengan pemetaan kolom diaktifkan, umpan data perubahan memiliki batasan setelah melakukan perubahan skema non-aditif seperti mengganti nama atau menghilangkan kolom, mengubah jenis data, atau perubahan nullability.
Penting
- Anda tidak dapat membaca mengubah umpan data untuk transaksi atau rentang di mana perubahan skema non-aditif terjadi menggunakan semantik batch.
- Di Databricks Runtime 12.2 LTS dan di bawahnya, tabel dengan pemetaan kolom diaktifkan yang mengalami perubahan skema non-aditif tidak mendukung bacaan streaming pada umpan data perubahan. Untuk detailnya, lihat Streaming dengan pemetaan kolom dan perubahan skema.
- Di Databricks Runtime 11.3 LTS dan di bawahnya, Anda tidak dapat membaca mengubah umpan data untuk tabel dengan pemetaan kolom diaktifkan yang telah mengalami penggantian nama kolom atau penurunan.
Di Databricks Runtime 12.2 LTS ke atas, Anda dapat melakukan pembacaan batch pada umpan data perubahan untuk tabel dengan pemetaan kolom diaktifkan yang telah mengalami perubahan skema non-aditif. Alih-alih menggunakan skema tabel versi terbaru, operasi baca menggunakan skema versi akhir tabel yang ditentukan dalam kueri. Kueri masih gagal jika rentang versi yang ditentukan mencakup perubahan skema non-aditif.