Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Gunakan pernyataan AUTO CDC ... INTO untuk membuat alur yang memanfaatkan fungsionalitas change data capture (CDC) dari Lakeflow Spark Declarative Pipelines. Pernyataan ini membaca perubahan dari sumber CDC dan menerapkannya ke target streaming.
- Untuk mempelajari tentang CDC, lihat Apa itu pengubahan pengambilan data (CDC)?.
- Untuk detail selengkapnya tentang menggunakan
AUTO CDC, lihat API CDC OTOMATIS: Menyederhanakan perubahan pengambilan data dengan alur. - Untuk informasi lebih lanjut mengenai
CREATE FLOW, lihat CREATE FLOW (pipelines).
Syntax
CREATE OR REFRESH STREAMING TABLE table_name;
CREATE FLOW flow_name AS AUTO CDC INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
Anda menentukan batasan kualitas data untuk target menggunakan klausul yang sama CONSTRAINT dengan kueri alur lainnya. Lihat Mengelola kualitas data dengan ekspektasi alur kerja.
Perilaku default untuk INSERT peristiwa dan UPDATE adalah meningkatkan peristiwa CDC dari sumbernya: memperbarui baris apa pun dalam tabel target yang cocok dengan kunci yang ditentukan atau menyisipkan baris baru saat rekaman yang cocok tidak ada di tabel target. Penanganan untuk DELETE peristiwa dapat ditentukan dengan kondisi APPLY AS DELETE WHEN.
Penting
Anda harus mendeklarasikan tabel streaming target untuk menerapkan perubahan ke dalamnya. Anda dapat secara opsional menentukan skema untuk tabel target Anda. Untuk tabel SCD tipe 2, saat menentukan skema tabel target, Anda juga harus menyertakan kolom __START_AT dan __END_AT dengan tipe data yang sama dengan bidang sequence_by.
Lihat API CDC OTOMATIS: Menyederhanakan perubahan pengambilan data dengan alur.
Parameter-parameternya
flow_nameNama alur yang akan dibuat.
sourceSumber data. Sumber harus berupa sumber streaming . Gunakan kata kunci STREAM untuk menggunakan semantik streaming untuk membaca dari sumbernya. Jika pembacaan mengalami perubahan atau penghapusan pada rekaman yang ada, akan menghasilkan kesalahan. Paling aman untuk membaca dari sumber statis atau yang hanya bisa ditambahkan. Untuk memasukkan data yang memiliki komit perubahan, Anda dapat menggunakan Python dan opsi
SkipChangeCommitsuntuk menangani kesalahan.Untuk informasi selengkapnya tentang data streaming, lihat Mengubah data dengan alur.
KEYSKolom atau kombinasi kolom yang secara unik mengidentifikasi baris dalam data sumber. Nilai dalam kolom ini digunakan untuk mengidentifikasi peristiwa CDC mana yang berlaku untuk rekaman tertentu dalam tabel target.
Untuk menentukan kombinasi kolom, gunakan daftar kolom yang dipisahkan koma.
Klausul ini diperlukan.
IGNORE NULL UPDATESMemungkinkan pengambilan pembaruan yang berisi sebagian dari kolom target. Ketika peristiwa CDC cocok dengan baris yang ada dan pengaturan IGNORE NULL UPDATES telah ditentukan, kolom yang mempunyai nilai
nullakan mempertahankan nilai yang ada di target. Ini juga berlaku untuk kolom bersarang dengan nilainull.Klausa ini bersifat opsional.
Secara default, menimpa kolom yang ada dengan nilai
null.APPLY AS DELETE WHENMenentukan kapan peristiwa CDC harus diperlakukan sebagai
DELETEbukan upsert.Untuk sumber SCD tipe 2, untuk menangani data yang tidak berurutan, baris yang dihapus untuk sementara dipertahankan sebagai batu nisan dalam tabel Delta yang mendasar, dan tampilan dibuat di metastore yang memfilter batu nisan ini. Interval retensi dapat dikonfigurasi dengan
pipelines.cdc.tombstoneGCThresholdInSecondsproperti tabel.Klausa ini bersifat opsional.
APPLY AS TRUNCATE WHENMenentukan kapan peristiwa CDC harus diperlakukan sebagai tabel lengkap
TRUNCATE. Karena klausa ini memicu pemotongan penuh tabel target, klausul ini harus digunakan hanya untuk kasus penggunaan tertentu yang memerlukan fungsionalitas ini.Klausa
APPLY AS TRUNCATE WHENhanya didukung untuk SCD tipe 1. SCD tipe 2 tidak mendukung operasi pemotongan.Klausa ini bersifat opsional.
SEQUENCE BYNama kolom yang menentukan urutan logis peristiwa CDC dalam data sumber. Pemrosesan alur menggunakan urutan ini untuk menangani peristiwa perubahan yang tiba tidak berurutan.
Jika beberapa kolom diperlukan untuk pengurutan, gunakan
STRUCTekspresi: kolom akan diurutkan berdasarkan bidang struct pertama terlebih dahulu, lalu dengan bidang kedua jika ada dasi, dan sebagainya.Kolom yang ditentukan harus tipe data yang dapat diurutkan.
Klausa ini diperlukan.
COLUMNSMenentukan subset kolom untuk disertakan dalam tabel target. Anda dapat:
- Tentukan daftar lengkap kolom yang akan disertakan:
COLUMNS (userId, name, city). - Tentukan daftar kolom yang akan dikecualikan:
COLUMNS * EXCEPT (operation, sequenceNum)
Klausa ini bersifat opsional.
Defaultnya adalah menyertakan semua kolom dalam tabel target saat
COLUMNSklausul tidak ditentukan.- Tentukan daftar lengkap kolom yang akan disertakan:
STORED ASApakah akan menyimpan rekaman sebagai SCD tipe 1 atau SCD tipe 2.
Klausa ini bersifat opsional.
Defaultnya adalah SCD tipe 1.
TRACK HISTORY ONMenentukan subset kolom output untuk menghasilkan rekaman riwayat saat ada perubahan pada kolom yang ditentukan. Anda dapat:
- Tentukan daftar lengkap kolom yang akan dilacak:
COLUMNS (userId, name, city). - Tentukan daftar kolom yang akan dikecualikan dari pelacakan:
COLUMNS * EXCEPT (operation, sequenceNum)
Klausa ini bersifat opsional. Defaultnya adalah melacak riwayat untuk semua kolom output ketika ada perubahan apa pun, setara dengan
TRACK HISTORY ON *.- Tentukan daftar lengkap kolom yang akan dilacak:
Examples
-- Create a streaming table, then use AUTO CDC to populate it:
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW flow
AS AUTO CDC INTO
target
FROM stream(cdc_data.users)
KEYS (userId)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2
TRACK HISTORY ON * EXCEPT (city);