TERAPKAN PERUBAHAN API: Menyederhanakan perubahan pengambilan data di Tabel Langsung Delta
Tabel Langsung Delta menyederhanakan pengubahan pengambilan data (CDC) dengan APPLY CHANGES
API. Sebelumnya, pernyataan ini MERGE INTO
umumnya digunakan untuk memproses rekaman CDC di Azure Databricks. Namun, MERGE INTO
dapat menghasilkan hasil yang salah karena rekaman yang tidak berurutan, atau memerlukan logika kompleks untuk memesan ulang rekaman.
Dengan menangani rekaman yang tidak berurutan secara otomatis, APPLY CHANGES
API dalam Tabel Langsung Delta memastikan pemrosesan rekaman CDC yang benar dan menghapus kebutuhan untuk mengembangkan logika kompleks untuk menangani rekaman yang tidak berurutan.
APPLY CHANGES
API didukung di antarmuka Delta Live Tables SQL dan Python, termasuk dukungan untuk memperbarui tabel dengan SCD jenis 1 dan tipe 2:
- Gunakan SCD jenis 1 untuk memperbarui rekaman secara langsung. Riwayat tidak dipertahankan bagi rekaman yang diperbarui.
- Gunakan SCD tipe 2 untuk menyimpan riwayat rekaman, baik pada semua pembaruan atau pembaruan untuk sekumpulan kolom tertentu.
Untuk sintaks dan referensi lainnya, lihat:
- Mengubah pengambilan data dengan Python di Delta Live Tables
- Mengubah pengambilan data dengan SQL di Tabel Langsung Delta
- Mengontrol manajemen batu nisan untuk kueri SCD tipe 1
Catatan
Artikel ini menjelaskan cara memperbarui tabel di saluran Delta Live Tables Anda berdasarkan perubahan pada data sumber. Untuk mempelajari cara merekam dan mengkueri informasi perubahan tingkat baris untuk tabel Delta, lihat Menggunakan umpan data perubahan Delta Lake di Azure Databricks.
Bagaimana CDC diimplementasikan dengan Delta Live Tables?
Anda harus menentukan kolom dalam data sumber untuk mengurutkan rekaman, yang ditafsirkan Delta Live Tables sebagai representasi yang meningkat secara monoton dari pengurutan data sumber yang tepat. Tabel Langsung Delta secara otomatis menangani data yang tiba di luar urutan. Untuk perubahan SCD Tipe 2, Tabel Langsung Delta menyebarluaskan nilai urutan yang sesuai ke __START_AT
kolom dan __END_AT
tabel target. Harus ada satu pembaruan yang berbeda per kunci pada setiap nilai urutan, dan nilai urutan NULL tidak didukung.
Untuk melakukan pemrosesan CDC dengan Tabel Langsung Delta, Anda terlebih dahulu membuat tabel streaming, lalu menggunakan APPLY CHANGES INTO
pernyataan untuk menentukan sumber, kunci, dan pengurutan umpan perubahan. Untuk membuat tabel streaming target, gunakan CREATE OR REFRESH STREAMING TABLE
pernyataan di SQL atau create_streaming_table()
fungsi di Python. Untuk membuat pernyataan yang menentukan pemrosesan CDC, gunakan APPLY CHANGES
pernyataan di SQL atau apply_changes()
fungsi di Python. Untuk detail sintaks, lihat Mengubah pengambilan data dengan SQL di Tabel Langsung Delta atau Mengubah pengambilan data dengan Python di Tabel Langsung Delta.
Objek data apa yang digunakan untuk pemrosesan CDC Delta Live Tables?
Saat Anda mendeklarasikan tabel target di metastore Apache Hive, dua struktur data dibuat:
- Tampilan menggunakan nama yang ditetapkan ke tabel target.
- Tabel dukungan internal yang digunakan oleh Delta Live Tables untuk mengelola pemrosesan CDC. Tabel ini dinamai dengan prepending
__apply_changes_storage_
ke nama tabel target.
Misalnya, jika Anda mendeklarasikan tabel target bernama dlt_cdc_target
, Anda akan melihat tampilan bernama dlt_cdc_target
dan tabel bernama __apply_changes_storage_dlt_cdc_target
di metastore. Membuat tampilan memungkinkan Tabel Langsung Delta memfilter informasi tambahan (misalnya, batu nisan dan versi) yang diperlukan untuk menangani data yang tidak berurutan. Untuk menampilkan data yang diproses, kueri tampilan target. Karena skema __apply_changes_storage_
tabel mungkin berubah untuk mendukung fitur atau penyempurnaan di masa mendatang, Anda tidak boleh mengkueri tabel untuk penggunaan produksi. Jika Anda menambahkan data secara manual ke tabel, rekaman diasumsikan datang sebelum perubahan lain karena kolom versi hilang.
Jika alur diterbitkan ke Katalog Unity, tabel dukungan internal tidak dapat diakses oleh pengguna.
Mendapatkan data tentang rekaman yang diproses oleh kueri CDC Tabel Langsung Delta
Metrik berikut diambil oleh apply changes
kueri:
num_upserted_rows
: Jumlah baris output yang di-upsert ke dalam himpunan data selama pembaruan.num_deleted_rows
: Jumlah baris output yang ada yang dihapus dari himpunan data selama pembaruan.
num_output_rows
Metrik, yang merupakan output untuk alur non-CDC, tidak diambil untuk apply changes
kueri.
Batasan
Target APPLY CHANGES INTO
kueri atau apply_changes
fungsi tidak dapat digunakan sebagai sumber untuk tabel streaming. Tabel yang membaca dari target APPLY CHANGES INTO
kueri atau apply_changes
fungsi harus berupa tampilan materialisasi.
SCD jenis 1 dan SCD tipe 2 di Azure Databricks
Bagian berikut ini menyediakan contoh yang menunjukkan kueri SCD Tabel Langsung Delta tipe 1 dan tipe 2 yang memperbarui tabel target berdasarkan peristiwa sumber yang:
- Buat catatan pengguna baru.
- Menghapus catatan pengguna.
- Memperbarui rekaman milik pengguna. Dalam contoh SCD jenis 1, operasi terakhir
UPDATE
datang terlambat dan dihilangkan dari tabel target, menunjukkan penanganan peristiwa yang tidak berurutan.
Contoh berikut mengasumsikan keakraban dengan mengonfigurasi dan memperbarui alur Delta Live Tables. Lihat Tutorial: Menjalankan alur Tabel Langsung Delta pertama Anda.
Untuk menjalankan contoh ini, Anda harus mulai dengan membuat himpunan data sampel. Lihat Membuat data pengujian.
Berikut ini adalah catatan input untuk contoh tersebut:
userId | nama | kota | operasi | sequenceNum |
---|---|---|---|---|
124 | Raul | Oaxaca | INSERT | 1 |
123 | Isabel | Monterrey | INSERT | 1 |
125 | Mercedes | Tijuana | INSERT | 2 |
126 | Lily | Cancun | INSERT | 2 |
123 | null | null | DELETE | 6 |
125 | Mercedes | Guadalajara | UPDATE | 6 |
125 | Mercedes | Mexicali | UPDATE | 5 |
123 | Isabel | Chihuahua | UPDATE | 5 |
Jika Anda membatalkan komentar baris akhir dalam contoh data, baris tersebut akan menyisipkan catatan berikut yang menentukan di mana rekaman harus dipotong:
userId | nama | kota | operasi | sequenceNum |
---|---|---|---|---|
null | null | null | TRUNCATE | 3 |
Catatan
Semua contoh berikut mencakup opsi untuk menentukan operasi DELETE
dan TRUNCATE
, tetapi masing-masing bersifat opsional.
Proses pembaruan SCD tipe 1
Contoh kode berikut menunjukkan pemrosesan pembaruan SCD jenis 1:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.format("delta").table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
APPLY AS TRUNCATE WHEN
operation = "TRUNCATE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 1;
Setelah menjalankan SCD jenis 1 dalam contoh ini, tabel target akan berisi catatan berikut:
userId | nama | kota |
---|---|---|
124 | Raul | Oaxaca |
125 | Mercedes | Guadalajara |
126 | Lily | Cancun |
Setelah menjalankan contoh SCD jenis 1 dengan catatan tambahan TRUNCATE
, catatan 124
dan 126
dipotong karena operasi TRUNCATE
di sequenceNum=3
, dan tabel target berisi catatan berikut:
userId | nama | kota |
---|---|---|
125 | Mercedes | Guadalajara |
Proses pembaruan SCD tipe 2
Contoh kode berikut menunjukkan pemrosesan pembaruan SCD jenis 2:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.format("delta").table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2;
Setelah menjalankan contoh SCD jenis 2, tabel target akan berisi catatan berikut:
userId | nama | kota | __START_AT | __END_AT |
---|---|---|---|---|
123 | Isabel | Monterrey | 1 | 5 |
123 | Isabel | Chihuahua | 5 | 6 |
124 | Raul | Oaxaca | 1 | nihil |
125 | Mercedes | Tijuana | 2 | 5 |
125 | Mercedes | Mexicali | 5 | 6 |
125 | Mercedes | Guadalajara | 6 | nihil |
126 | Lily | Cancun | 2 | nihil |
Kueri SCD tipe 2 juga dapat menentukan subset kolom output yang akan dilacak untuk riwayat dalam tabel target. Perubahan pada kolom lain diperbarui di tempat daripada menghasilkan rekaman riwayat baru. Contoh berikut menunjukkan mengecualikan city
kolom dari pelacakan:
Contoh berikut menunjukkan menggunakan riwayat trek dengan SCD tipe 2:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.format("delta").table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2
TRACK HISTORY ON * EXCEPT
(city)
Setelah menjalankan contoh ini tanpa rekaman tambahan TRUNCATE
, tabel target berisi rekaman berikut:
userId | nama | kota | __START_AT | __END_AT |
---|---|---|---|---|
123 | Isabel | Chihuahua | 1 | 6 |
124 | Raul | Oaxaca | 1 | nihil |
125 | Mercedes | Guadalajara | 2 | nihil |
126 | Lily | Cancun | 2 | nihil |
Hasilkan data pengujian
Kode di bawah ini disediakan untuk menghasilkan contoh himpunan data untuk digunakan dalam contoh kueri yang ada dalam tutorial ini. Dengan asumsi Bahwa Anda memiliki kredensial yang tepat untuk membuat skema baru dan membuat tabel baru, Anda dapat menjalankan pernyataan ini dengan notebook atau Databricks SQL. Kode berikut tidak dimaksudkan untuk dijalankan sebagai bagian dari alur Delta Live Tables:
CREATE SCHEMA IF NOT EXISTS cdc_data;
CREATE TABLE
cdc_data.users
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The above batch at sequenceNum 5 will be the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);
Menambahkan, mengubah, atau menghapus data dalam tabel streaming target
Jika alur Anda menerbitkan tabel ke Unity Catalog, Anda dapat menggunakan pernyataan bahasa manipulasi data (DML), termasuk pernyataan sisipkan, perbarui, hapus, dan gabungkan, untuk mengubah tabel streaming target yang dibuat oleh APPLY CHANGES INTO
pernyataan.
Catatan
- Pernyataan DML yang mengubah skema tabel tabel streaming tidak didukung. Pastikan bahwa pernyataan DML Anda tidak mencoba mengembangkan skema tabel.
- Pernyataan DML yang memperbarui tabel streaming hanya dapat dijalankan di kluster Unity Catalog bersama atau gudang SQL menggunakan Databricks Runtime 13.3 LTS ke atas.
- Karena streaming memerlukan sumber data khusus tambahan, jika pemrosesan Anda memerlukan streaming dari tabel streaming sumber dengan perubahan (misalnya, menurut pernyataan DML), atur bendera skipChangeCommits saat membaca tabel streaming sumber. Saat
skipChangeCommits
diatur, transaksi yang menghapus atau mengubah rekaman pada tabel sumber diabaikan. Jika pemrosesan Anda tidak memerlukan tabel streaming, Anda dapat menggunakan tampilan materialisasi (yang tidak memiliki batasan khusus tambahan) sebagai tabel target.
Karena Tabel Langsung Delta menggunakan kolom tertentu dan menyebarluaskan SEQUENCE BY
nilai urutan yang sesuai ke __START_AT
kolom dan __END_AT
tabel target (untuk SCD tipe 2), Anda harus memastikan bahwa pernyataan DML menggunakan nilai yang valid untuk kolom ini untuk mempertahankan urutan rekaman yang tepat. Lihat Bagaimana CDC diimplementasikan dengan Delta Live Tables?.
Untuk informasi selengkapnya tentang menggunakan pernyataan DML dengan tabel streaming, lihat Menambahkan, mengubah, atau menghapus data dalam tabel streaming.
Contoh berikut menyisipkan rekaman aktif dengan urutan awal 5:
INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);