Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Berlaku untuk:
Databricks SQL
Penting
Fitur ini ada di Beta. Memerlukan Databricks Runtime 17.3 ke atas.
FLOW AUTO CDC Gunakan klausa dengan CREATE STREAMING TABLE untuk memproses perubahan rekaman pengambilan data (CDC) dari sumber ke dalam tabel streaming.
Sebelumnya, pernyataan ini MERGE INTO umumnya digunakan untuk memproses rekaman CDC di Azure Databricks. Namun, MERGE INTO dapat menghasilkan hasil yang salah karena rekaman yang tidak berurutan atau memerlukan logika kompleks untuk mengurutkan ulang rekaman.
AUTO CDC menyederhanakan CDC dengan menangani rekaman yang tidak berurutan secara otomatis. Anda menentukan kunci untuk mengidentifikasi rekaman, kolom urutan untuk pemesanan, dan apakah akan menyimpan hasil sebagai SCD tipe 1 (pembaruan langsung) atau SCD tipe 2 (pelacakan riwayat).
Sintaksis
CREATE OR REFRESH STREAMING TABLE table_name
FLOW AUTO CDC
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
Perilaku default untuk INSERT peristiwa dan UPDATE adalah meningkatkan peristiwa CDC dari sumbernya: memperbarui baris apa pun dalam tabel target yang cocok dengan kunci yang ditentukan atau menyisipkan baris baru saat rekaman yang cocok tidak ada di tabel target. Penanganan untuk DELETE peristiwa dapat ditentukan dengan kondisi APPLY AS DELETE WHEN.
Parameter-parameternya
sourceSumber data. Sumber harus berupa sumber streaming. Gunakan kata kunci
STREAMuntuk memanfaatkan semantik streaming dalam membaca dari sumber. Jika pembacaan mengalami perubahan atau penghapusan pada rekaman yang ada, akan menghasilkan kesalahan. Paling aman untuk membaca dari sumber statis atau yang hanya bisa ditambahkan.Untuk informasi selengkapnya tentang data streaming, lihat Mengubah data dengan alur.
KEYSKolom atau kombinasi kolom yang secara unik mengidentifikasi baris dalam data sumber. Nilai dalam kolom ini digunakan untuk mengidentifikasi peristiwa CDC mana yang berlaku untuk rekaman tertentu dalam tabel target.
Untuk menentukan kombinasi kolom, gunakan daftar kolom yang dipisahkan koma.
Klausa ini diperlukan.
IGNORE NULL UPDATESMemungkinkan pengambilan pembaruan yang berisi sebagian dari kolom target. Saat peristiwa CDC cocok dengan baris yang ada dan
IGNORE NULL UPDATESditentukan, kolom dengannullnilai mempertahankan nilai yang ada dalam target. Ini juga berlaku untuk kolom bersarang dengan nilainull.Klausa ini bersifat opsional.
Secara default, menimpa kolom yang ada dengan nilai
null.APPLY AS DELETE WHENMenentukan kapan peristiwa CDC harus diperlakukan sebagai
DELETEbukan upsert.Untuk sumber SCD tipe 2, untuk menangani data yang tidak berurutan, baris yang dihapus untuk sementara dipertahankan sebagai batu nisan dalam tabel Delta yang mendasar, dan tampilan dibuat di metastore yang memfilter batu nisan ini. Interval retensi dapat dikonfigurasi dengan
pipelines.cdc.tombstoneGCThresholdInSecondsproperti tabel.Klausa ini bersifat opsional.
APPLY AS TRUNCATE WHENMenentukan kapan peristiwa CDC harus diperlakukan sebagai tabel lengkap
TRUNCATE. Karena klausa ini memicu pemotongan penuh tabel target, klausul ini harus digunakan hanya untuk kasus penggunaan tertentu yang memerlukan fungsionalitas ini.Klausa
APPLY AS TRUNCATE WHENhanya didukung untuk SCD tipe 1. SCD tipe 2 tidak mendukung operasi pemotongan.Klausa ini bersifat opsional.
SEQUENCE BYNama kolom yang menentukan urutan logis peristiwa CDC dalam data sumber. Pemrosesan alur menggunakan urutan ini untuk menangani peristiwa perubahan yang tiba tidak berurutan.
Jika beberapa kolom diperlukan untuk pengurutan, gunakan
STRUCTekspresi: kolom akan diurutkan berdasarkan bidang struct pertama terlebih dahulu, lalu dengan bidang kedua jika ada dasi, dan sebagainya.Kolom yang ditentukan harus tipe data yang dapat diurutkan.
Klausa ini diperlukan.
COLUMNSMenentukan subset kolom untuk disertakan dalam tabel target. Anda dapat:
- Tentukan daftar lengkap kolom yang akan disertakan:
COLUMNS (userId, name, city). - Tentukan daftar kolom yang akan dikecualikan:
COLUMNS * EXCEPT (operation, sequenceNum)
Klausa ini bersifat opsional.
Defaultnya adalah menyertakan semua kolom dalam tabel target saat
COLUMNSklausul tidak ditentukan.- Tentukan daftar lengkap kolom yang akan disertakan:
STORED ASApakah akan menyimpan rekaman sebagai SCD tipe 1 atau SCD tipe 2.
Klausa ini bersifat opsional.
Defaultnya adalah SCD tipe 1.
TRACK HISTORY ONMenentukan subset kolom output untuk menghasilkan rekaman riwayat saat ada perubahan pada kolom yang ditentukan. Anda dapat:
- Tentukan daftar lengkap kolom yang akan dilacak:
COLUMNS (userId, name, city). - Tentukan daftar kolom yang akan dikecualikan dari pelacakan:
COLUMNS * EXCEPT (operation, sequenceNum)
Klausa ini bersifat opsional. Defaultnya adalah melacak riwayat untuk semua kolom output ketika ada perubahan apa pun, setara dengan
TRACK HISTORY ON *.- Tentukan daftar lengkap kolom yang akan dilacak:
Examples
-- SCD type 1: apply CDC changes with direct updates (no history)
> CREATE OR REFRESH STREAMING TABLE target
TBLPROPERTIES(pipelines.channel = "PREVIEW")
FLOW AUTO CDC
FROM stream(cdc_data.users)
KEYS (userId)
SEQUENCE BY sequenceNum
STORED AS SCD TYPE 1;
-- SCD type 2: retain a history of changes, with delete handling
> CREATE OR REFRESH STREAMING TABLE target
TBLPROPERTIES(pipelines.channel = "PREVIEW")
FLOW AUTO CDC
FROM stream(cdc_data.users)
KEYS (userId)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2;
-- SCD type 2 with history tracking on specific columns
> CREATE OR REFRESH STREAMING TABLE target
TBLPROPERTIES(pipelines.channel = "PREVIEW")
FLOW AUTO CDC
FROM stream(cdc_data.users)
KEYS (userId)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2
TRACK HISTORY ON * EXCEPT (city);