Bagikan melalui


Menggunakan umpan data perubahan Delta Lake pada Azure Databricks

Mengubah umpan data memungkinkan Azure Databricks melacak perubahan tingkat baris antara versi tabel Delta. Saat diaktifkan pada tabel Delta, runtime bahasa umum mencatat peristiwa perubahan untuk semua data yang ditulis pada tabel. Ini mencakup data baris bersama dengan metadata yang menunjukkan apakah baris yang ditentukan disisipkan, dihapus, atau diperbarui.

Penting

Ubah umpan data bekerja bersama dengan riwayat tabel untuk memberikan informasi perubahan. Karena mengkloning tabel Delta membuat riwayat terpisah, umpan data perubahan pada tabel kloning tidak cocok dengan tabel asli.

Memproses perubahan data secara bertahap

Databricks merekomendasikan penggunaan umpan data perubahan dalam kombinasi dengan Streaming Terstruktur untuk memproses perubahan secara bertahap dari tabel Delta. Anda harus menggunakan Streaming Terstruktur untuk Azure Databricks untuk melacak versi secara otomatis untuk umpan data perubahan tabel Anda.

Catatan

Tabel Langsung Delta menyediakan fungsionalitas untuk penyebaran data perubahan yang mudah dan menyimpan hasil sebagai tabel SCD (dimensi yang berubah perlahan) tipe 1 atau tipe 2. Lihat TERAPKAN PERUBAHAN API: Menyederhanakan perubahan pengambilan data dengan Delta Live Tables.

Untuk membaca umpan data perubahan dari tabel, Anda harus mengaktifkan umpan data perubahan pada tabel tersebut. Lihat Mengaktifkan umpan data perubahan.

Atur opsi readChangeFeed ke true saat mengonfigurasi aliran terhadap tabel untuk membaca umpan data perubahan, seperti yang ditunjukkan dalam contoh sintaks berikut:

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .table("myDeltaTable")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .table("myDeltaTable")

Secara default, aliran mengembalikan rekam jepret terbaru tabel saat aliran pertama kali dimulai sebagai INSERT perubahan dan di masa mendatang saat mengubah data.

Ubah penerapan data sebagai bagian dari transaksi Delta Lake, dan menjadi tersedia pada saat yang sama data baru berkomitmen pada tabel.

Anda dapat secara opsional menentukan versi awal. Lihat Haruskah saya menentukan versi awal?.

Umpan data perubahan juga mendukung eksekusi batch, yang mengharuskan menentukan versi awal. Lihat Membaca perubahan dalam kueri batch.

Opsi seperti batas tarif (maxFilesPerTrigger, maxBytesPerTrigger) dan excludeRegex juga didukung saat membaca data perubahan.

Pembatasan tarif dapat menjadi atom untuk versi selain versi snapshot awal. Artinya, seluruh versi komit akan dibatasi tarif atau seluruh komit akan dikembalikan.

Haruskah saya menentukan versi awal?

Anda dapat secara opsional menentukan versi awal jika Anda ingin mengabaikan perubahan yang terjadi sebelum versi tertentu. Anda dapat menentukan versi menggunakan tanda waktu atau nomor ID versi yang direkam dalam log transaksi Delta.

Catatan

Versi awal diperlukan untuk pembacaan batch, dan banyak pola batch dapat memperoleh manfaat dari pengaturan versi akhir opsional.

Saat Anda mengonfigurasi beban kerja Streaming Terstruktur yang melibatkan umpan data perubahan, penting untuk memahami bagaimana menentukan versi awal memengaruhi pemrosesan.

Banyak beban kerja streaming, terutama alur pemrosesan data baru, mendapat manfaat dari perilaku default. Dengan perilaku default, batch pertama diproses saat aliran pertama kali merekam semua rekaman yang ada dalam tabel sebagai INSERT operasi dalam umpan data perubahan.

Jika tabel target Anda sudah berisi semua rekaman dengan perubahan yang sesuai hingga titik tertentu, tentukan versi awal untuk menghindari pemrosesan status tabel sumber sebagai INSERT peristiwa.

Contoh sintaks berikut memulihkan dari kegagalan streaming di mana titik pemeriksaan rusak. Dalam contoh ini, asumsikan kondisi berikut:

  1. Umpan data perubahan diaktifkan pada tabel sumber pada pembuatan tabel.
  2. Tabel hilir target telah memproses semua perubahan hingga dan termasuk versi 75.
  3. Riwayat versi untuk tabel sumber tersedia untuk versi 70 ke atas.

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")

Dalam contoh ini, Anda juga harus menentukan lokasi titik pemeriksaan baru.

Penting

Jika Anda menentukan versi awal, aliran gagal dimulai dari titik pemeriksaan baru jika versi awal tidak lagi ada dalam riwayat tabel. Delta Lake membersihkan versi historis secara otomatis, yang berarti bahwa semua versi awal yang ditentukan akhirnya dihapus.

Lihat Bisakah saya menggunakan umpan data perubahan untuk memutar ulang seluruh riwayat tabel?.

Membaca perubahan dalam kueri batch

Anda dapat menggunakan sintaks kueri batch untuk membaca semua perubahan mulai dari versi tertentu atau membaca perubahan dalam rentang versi tertentu.

Anda menentukan versi sebagai bilangan bulat dan stempel waktu sebagai string dalam format yyyy-MM-dd[ HH:mm:ss[.SSS]].

Versi awal dan akhir sudah termasuk dalam kueri. Untuk membaca perubahan dari versi mulai tertentu ke versi terbaru tabel, tentukan hanya versi awal.

Jika Anda memberikan versi yang lebih rendah atau stempel waktu yang lebih tua dari yang telah mencatat peristiwa perubahan, yaitu saat umpan data perubahan diaktifkan, kesalahan dilemparkan yang menunjukkan bahwa umpan data perubahan tidak diaktifkan.

Contoh sintaks berikut menunjukkan menggunakan opsi versi awal dan akhir dengan pembacaan batch:

SQL

-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)

-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')

-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)

-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')

Python

# version as ints or longs
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 10) \
  .table("myDeltaTable")

# timestamps as formatted timestamp
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .option("endingTimestamp", '2021-05-21 12:00:00') \
  .table("myDeltaTable")

# providing only the startingVersion/timestamp
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

Scala

// version as ints or longs
spark.read
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 10)
  .table("myDeltaTable")

// timestamps as formatted timestamp
spark.read
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .option("endingTimestamp", "2021-05-21 12:00:00")
  .table("myDeltaTable")

// providing only the startingVersion/timestamp
spark.read
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

Catatan

Secara default, jika pengguna meneruskan versi atau stempel waktu yang melebihi penerapan terakhir pada tabel, kesalahan timestampGreaterThanLatestCommit akan muncul. Di Databricks Runtime 11.3 LTS ke atas, umpan data perubahan dapat menangani kasus versi di luar rentang jika pengguna mengatur konfigurasi berikut ke true:

set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;

Jika Anda memberikan versi awal yang lebih besar dari penerapan terakhir pada tabel atau stempel waktu mulai yang lebih baru dari penerapan terakhir pada tabel, maka saat konfigurasi sebelumnya diaktifkan, hasil baca kosong akan ditampilkan.

Jika Anda memberikan versi akhir yang lebih besar dari penerapan terakhir pada tabel atau stempel waktu akhir yang lebih baru dari penerapan terakhir pada tabel, maka saat konfigurasi sebelumnya diaktifkan dalam mode baca batch, semua perubahan antara versi awal dan penerapan terakhir akan ditampilkan.

Apa skema untuk umpan data perubahan?

Saat Anda membaca dari umpan data perubahan untuk tabel, skema untuk versi tabel terbaru digunakan.

Catatan

Sebagian besar operasi perubahan skema dan evolusi didukung penuh. Tabel dengan pemetaan kolom diaktifkan tidak mendukung semua kasus penggunaan dan menunjukkan perilaku yang berbeda. Lihat Mengubah batasan umpan data untuk tabel dengan pemetaan kolom diaktifkan.

Selain kolom data dari skema tabel Delta, ubah umpan data berisi kolom metadata yang mengidentifikasi jenis peristiwa perubahan:

Nama kolom Jenis Nilai
_change_type String insert, , update_preimage update_postimage, delete (1)
_commit_version Panjang Log Delta atau versi tabel yang berisi perubahan.
_commit_timestamp Tanda Waktu Stempel waktu terkait saat penerapan dibuat.

(1) preimage adalah nilai sebelum pembaruan, postimage adalah nilai setelah pembaruan.

Catatan

Anda tidak dapat mengaktifkan umpan data perubahan pada tabel jika skema berisi kolom dengan nama yang sama dengan kolom yang ditambahkan ini. Ganti nama kolom dalam tabel untuk mengatasi konflik ini sebelum mencoba mengaktifkan umpan data perubahan.

Mengaktifkan umpan data perubahan

Anda hanya dapat membaca umpan data perubahan untuk tabel yang diaktifkan. Anda harus secara eksplisit mengaktifkan opsi ubah umpan data menggunakan salah satu metode berikut:

  • Tabel baru: Atur properti delta.enableChangeDataFeed = true tabel dalam CREATE TABLE perintah.

    CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Tabel yang sudah ada: Atur properti delta.enableChangeDataFeed = true tabel dalam ALTER TABLE perintah.

    ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Semua tabel baru:

    set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
    

Penting

Hanya perubahan yang dilakukan setelah Anda mengaktifkan umpan data perubahan yang direkam. Perubahan sebelumnya pada tabel tidak diambil.

Mengubah penyimpanan data

Mengaktifkan umpan data perubahan menyebabkan sedikit peningkatan biaya penyimpanan untuk tabel. Rekaman data perubahan dihasilkan saat kueri berjalan, dan umumnya jauh lebih kecil dari ukuran total file yang ditulis ulang.

Catatan Azure Databricks mengubah data bagi operasi UPDATE, DELETE, dan MERGE pada folder _change_data di bawah direktori tabel Delta. Beberapa operasi, seperti operasi khusus sisipan dan penghapusan partisi penuh, tidak menghasilkan data di _change_data direktori karena Azure Databricks dapat secara efisien menghitung umpan data perubahan langsung dari log transaksi.

Semua bacaan terhadap file data dalam _change_data folder harus melalui API Delta Lake yang didukung.

File di _change_data folder mengikuti kebijakan retensi tabel. Mengubah data umpan data dihapus saat VACUUM perintah berjalan.

Bisakah saya menggunakan umpan data perubahan untuk memutar ulang seluruh riwayat tabel?

Umpan data perubahan tidak dimaksudkan untuk berfungsi sebagai rekaman permanen dari semua perubahan pada tabel. Ubah umpan data hanya merekam perubahan yang terjadi setelah diaktifkan.

Mengubah umpan data dan Delta Lake memungkinkan Anda untuk selalu membangun ulang rekam jepret lengkap tabel sumber, yang berarti Anda dapat memulai baca streaming baru terhadap tabel dengan umpan data perubahan diaktifkan dan mengambil versi tabel tersebut saat ini dan semua perubahan yang terjadi setelahnya.

Anda harus memperlakukan rekaman dalam umpan data perubahan sebagai sementara dan hanya dapat diakses untuk jendela retensi tertentu. Log transaksi Delta menghapus versi tabel dan versi umpan data perubahan yang sesuai secara berkala. Saat versi dihapus dari log transaksi, Anda tidak dapat lagi membaca umpan data perubahan untuk versi tersebut.

Jika kasus penggunaan Anda mengharuskan mempertahankan riwayat permanen semua perubahan pada tabel, Anda harus menggunakan logika inkremental untuk menulis rekaman dari umpan data perubahan ke tabel baru. Contoh kode berikut menunjukkan penggunaan trigger.AvailableNow, yang memanfaatkan pemrosesan inkremental Streaming Terstruktur tetapi memproses data yang tersedia sebagai beban kerja batch. Anda dapat menjadwalkan beban kerja ini secara asinkron dengan alur pemrosesan utama Anda untuk membuat cadangan umpan data perubahan untuk tujuan audit atau pemutaran ulang penuh.

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(availableNow=True)
  .toTable("target_table")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.AvailableNow)
  .toTable("target_table")

Mengubah batasan umpan data untuk tabel dengan pemetaan kolom diaktifkan

Dengan pemetaan kolom diaktifkan pada tabel Delta, Anda dapat menghilangkan atau mengganti nama kolom dalam tabel tanpa menulis ulang file data untuk data yang sudah ada. Dengan pemetaan kolom diaktifkan, umpan data perubahan memiliki batasan setelah melakukan perubahan skema non-aditif seperti mengganti nama atau menghilangkan kolom, mengubah jenis data, atau perubahan nullability.

Penting

  • Anda tidak dapat membaca mengubah umpan data untuk transaksi atau rentang di mana perubahan skema non-aditif terjadi menggunakan semantik batch.
  • Di Databricks Runtime 12.2 LTS dan di bawahnya, tabel dengan pemetaan kolom diaktifkan yang mengalami perubahan skema non-aditif tidak mendukung bacaan streaming pada umpan data perubahan. Untuk detailnya, lihat Streaming dengan pemetaan kolom dan perubahan skema.
  • Di Databricks Runtime 11.3 LTS dan di bawahnya, Anda tidak dapat membaca mengubah umpan data untuk tabel dengan pemetaan kolom diaktifkan yang telah mengalami penggantian nama kolom atau penurunan.

Di Databricks Runtime 12.2 LTS ke atas, Anda dapat melakukan pembacaan batch pada umpan data perubahan untuk tabel dengan pemetaan kolom diaktifkan yang telah mengalami perubahan skema non-aditif. Alih-alih menggunakan skema tabel versi terbaru, operasi baca menggunakan skema versi akhir tabel yang ditentukan dalam kueri. Kueri masih gagal jika rentang versi yang ditentukan mencakup perubahan skema non-aditif.