Menggunakan umpan data perubahan Delta Lake pada Azure Databricks

Catatan

Mengubah umpan data memungkinkan Azure Databricks melacak perubahan tingkat baris antara versi tabel Delta. Saat diaktifkan pada tabel Delta, runtime bahasa umum mencatat peristiwa perubahan untuk semua data yang ditulis pada tabel. Ini mencakup data baris bersama dengan metadata yang menunjukkan apakah baris yang ditentukan disisipkan, dihapus, atau diperbarui.

Anda bisa membaca peristiwa perubahan dalam kueri batch menggunakan Spark SQL, Apache Spark DataFrames, dan Structured Streaming.

Penting

Ubah umpan data bekerja bersama dengan riwayat tabel untuk memberikan informasi perubahan. Karena mengkloning tabel Delta membuat riwayat terpisah, umpan data perubahan pada tabel kloning tidak cocok dengan tabel asli.

Kasus penggunaan

Umpan data perubahan tidak diaktifkan secara default. Kasus penggunaan berikut akan diaktifkan saat Anda mengaktifkan umpan data perubahan.

  • Tabel Perak dan Emas: Meningkatkan performa Delta dengan hanya memproses perubahan tingkat baris setelah awal operasi MERGE, UPDATE, atau DELETE guna mempercepat dan menyederhanakan operasi ETL dan ELT.
  • Tampilan materialisasi: Buat tampilan informasi yang up-to-date dan agregat untuk digunakan di BI dan analitik tanpa harus memproses ulang tabel dasar lengkap, alih-alih hanya memperbarui di mana perubahan telah terjadi.
  • Transmisikan perubahan: Kirim umpan data perubahan ke sistem hilir seperti Kafka atau RDBMS yang dapat menggunakannya untuk memproses secara bertahap di tahap selanjutnya dari alur data.
  • Tabel jejak audit: Ambil umpan data perubahan sebagai tabel Delta menyediakan penyimpanan abadi dan kemampuan kueri yang efisien untuk melihat semua perubahan dari waktu ke waktu, termasuk kapan penghapusan terjadi dan pembaruan apa yang dibuat.

Mengaktifkan umpan data perubahan

Anda harus secara eksplisit mengaktifkan opsi ubah umpan data menggunakan salah satu metode berikut:

  • Tabel baru: Atur properti delta.enableChangeDataFeed = true tabel dalam CREATE TABLE perintah.

    CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Tabel yang sudah ada: Atur properti delta.enableChangeDataFeed = true tabel dalam ALTER TABLE perintah.

    ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Semua tabel baru:

    set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
    

Penting

Hanya perubahan yang dilakukan setelah Anda mengaktifkan umpan data perubahan yang dicatat; perubahan masa lalu ke tabel tidak diambil.

Mengubah penyimpanan data

Catatan Azure Databricks mengubah data bagi operasi UPDATE, DELETE, dan MERGE pada folder _change_data di bawah direktori tabel Delta. Sejumlah operasi, seperti operasi hanya-masukkan dan penghapusan partisi penuh tidak menghasilkan data pada direktori _change_data, karena Azure Databricks dapat menghitung umpan data perubahan secara langsung dari log transaksi.

File di _change_data folder mengikuti kebijakan retensi tabel. Oleh karena itu, jika Anda menjalankan perintah VACUUM, mengubah data umpan data juga dihapus.

Membaca perubahan dalam kueri batch

Anda dapat menyediakan versi atau stempel waktu untuk awal dan akhir. Versi awal dan akhir dan stempel waktu inklusif dalam kueri. Untuk membaca perubahan dari versi mulai tertentu ke versi terbaru tabel, tentukan hanya versi awal atau stempel waktu.

Anda menentukan versi sebagai bilangan bulat dan stempel waktu sebagai string dalam format yyyy-MM-dd[ HH:mm:ss[.SSS]].

Jika Anda memberikan versi yang lebih rendah atau stempel waktu yang lebih tua dari yang telah mencatat peristiwa perubahan, yaitu saat umpan data perubahan diaktifkan, kesalahan dilemparkan yang menunjukkan bahwa umpan data perubahan tidak diaktifkan.

SQL

-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)

-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')

-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)

-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')

-- path based tables
SELECT * FROM table_changes_by_path('\path', '2021-04-21 05:45:46')

Python

# version as ints or longs
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 10) \
  .table("myDeltaTable")

# timestamps as formatted timestamp
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .option("endingTimestamp", '2021-05-21 12:00:00') \
  .table("myDeltaTable")

# providing only the startingVersion/timestamp
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

# path based tables
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .load("pathToMyDeltaTable")

Scala

// version as ints or longs
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 10)
  .table("myDeltaTable")

// timestamps as formatted timestamp
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .option("endingTimestamp", "2021-05-21 12:00:00")
  .table("myDeltaTable")

// providing only the startingVersion/timestamp
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

// path based tables
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .load("pathToMyDeltaTable")

Membaca perubahan dalam kueri streaming

Python

# providing a starting version
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

# providing a starting timestamp
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", "2021-04-21 05:35:43") \
  .load("/pathToMyDeltaTable")

# not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .table("myDeltaTable")

Scala

// providing a starting version
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

// providing a starting timestamp
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", "2021-04-21 05:35:43")
  .load("/pathToMyDeltaTable")

// not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .table("myDeltaTable")

Untuk mendapatkan data perubahan saat membaca tabel, atur opsi readChangeFeed ke true. startingVersionataustartingTimestamp bersifat opsional dan jika tidak disediakan, aliran mengembalikan snapshot terbaru tabel pada saat streaming sebagai INSERT perubahan dan masa mendatang sebagai data perubahan. Opsi seperti batas tarif (maxFilesPerTrigger, maxBytesPerTrigger) dan excludeRegex juga didukung saat membaca data perubahan.

Catatan

Pembatasan tarif dapat menjadi atom untuk versi selain versi snapshot awal. Artinya, seluruh versi komit akan dibatasi tarif atau seluruh komit akan dikembalikan.

Secara default, jika pengguna meneruskan versi atau stempel waktu yang melebihi penerapan terakhir pada tabel, kesalahan timestampGreaterThanLatestCommit akan muncul. Di Databricks Runtime 11.3 LTS ke atas, umpan data perubahan dapat menangani kasus versi di luar rentang jika pengguna mengatur konfigurasi berikut ke true:

set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;

Jika Anda memberikan versi awal yang lebih besar dari penerapan terakhir pada tabel atau stempel waktu mulai yang lebih baru dari penerapan terakhir pada tabel, maka saat konfigurasi sebelumnya diaktifkan, hasil baca kosong akan ditampilkan.

Jika Anda memberikan versi akhir yang lebih besar dari penerapan terakhir pada tabel atau stempel waktu akhir yang lebih baru dari penerapan terakhir pada tabel, maka saat konfigurasi sebelumnya diaktifkan dalam mode baca batch, semua perubahan antara versi awal dan penerapan terakhir akan ditampilkan.

Apa skema untuk umpan data perubahan?

Saat Anda membaca dari umpan data perubahan untuk tabel, skema untuk versi tabel terbaru digunakan.

Catatan

Sebagian besar operasi perubahan skema dan evolusi didukung penuh. Tabel dengan pemetaan kolom diaktifkan tidak mendukung semua kasus penggunaan dan menunjukkan perilaku yang berbeda. Lihat Mengubah batasan umpan data untuk tabel dengan pemetaan kolom diaktifkan.

Selain kolom data dari skema tabel Delta, ubah umpan data berisi kolom metadata yang mengidentifikasi jenis peristiwa perubahan:

Nama kolom Jenis Nilai
_change_type String insert, update_preimage , update_postimage, delete(1)
_commit_version Panjang Log Delta atau versi tabel yang berisi perubahan.
_commit_timestamp Tanda Waktu Stempel waktu terkait saat penerapan dibuat.

(1)preimage adalah nilai sebelum pembaruan, postimage adalah nilai setelah pembaruan.

Catatan

Anda tidak dapat mengaktifkan umpan data perubahan pada tabel jika skema berisi kolom dengan nama yang sama dengan kolom yang ditambahkan ini. Ganti nama kolom dalam tabel untuk mengatasi konflik ini sebelum mencoba mengaktifkan umpan data perubahan.

Mengubah batasan umpan data untuk tabel dengan pemetaan kolom diaktifkan

Dengan pemetaan kolom diaktifkan pada tabel Delta, Anda dapat menghilangkan atau mengganti nama kolom dalam tabel tanpa menulis ulang file data untuk data yang sudah ada. Dengan pemetaan kolom diaktifkan, umpan data perubahan memiliki batasan setelah melakukan perubahan skema non-aditif seperti mengganti nama atau menghilangkan kolom, mengubah jenis data, atau perubahan nullability.

Penting

  • Anda tidak dapat membaca mengubah umpan data untuk transaksi atau rentang di mana perubahan skema non-aditif terjadi menggunakan semantik batch.
  • Di Databricks Runtime 12.2 LTS dan di bawahnya, tabel dengan pemetaan kolom diaktifkan yang mengalami perubahan skema non-aditif tidak mendukung bacaan streaming pada umpan data perubahan. Untuk detailnya, lihat Streaming dengan pemetaan kolom dan perubahan skema.
  • Di Databricks Runtime 11.3 LTS dan di bawahnya, Anda tidak dapat membaca mengubah umpan data untuk tabel dengan pemetaan kolom diaktifkan yang telah mengalami penggantian nama kolom atau penurunan.

Di Databricks Runtime 12.2 LTS ke atas, Anda dapat melakukan pembacaan batch pada umpan data perubahan untuk tabel dengan pemetaan kolom diaktifkan yang telah mengalami perubahan skema non-aditif. Alih-alih menggunakan skema tabel versi terbaru, operasi baca menggunakan skema versi akhir tabel yang ditentukan dalam kueri. Kueri masih gagal jika rentang versi yang ditentukan mencakup perubahan skema non-aditif.

Pertanyaan Umum (FAQ)

Berapa overhead untuk mengaktifkan umpan data perubahan?

Tidak ada dampak yang signifikan. Catatan data perubahan dibuat sejalan selama proses eksekusi kueri, dan umumnya jauh lebih kecil dari ukuran total file yang ditulis ulang.

Apa kebijakan retensi untuk catatan perubahan?

Mengubah catatan mengikuti kebijakan retensi yang sama dengan versi tabel yang sudah kedaluwarsa, dan akan dibersihkan melalui VACUUM jika berada di luar periode retensi yang ditentukan.

Kapan rekaman baru tersedia di feed data perubahan?

Data perubahan dilakukan bersama dengan transaksi Delta Lake, dan akan tersedia pada saat yang sama dengan data baru tersedia dalam tabel.

Contoh buku catatan: Menyebarkan perubahan dengan umpan data perubahan Delta

Notebook ini menunjukkan cara menyebarkan perubahan yang dibuat ke tabel perak jumlah mutlak vaksinasi ke tabel emas tingkat vaksinasi.

Mengubah buku catatan umpan data

Dapatkan buku catatan