Bagikan melalui


AUTO CDC INTO (jalur)

Gunakan pernyataan AUTO CDC ... INTO untuk membuat alur yang memanfaatkan fungsionalitas change data capture (CDC) dari Lakeflow Spark Declarative Pipelines. Pernyataan ini membaca perubahan dari sumber CDC dan menerapkannya ke target streaming.

Syntax

CREATE OR REFRESH STREAMING TABLE table_name;

CREATE FLOW flow_name AS AUTO CDC INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

Anda menentukan batasan kualitas data untuk target menggunakan klausul yang sama CONSTRAINT dengan kueri alur lainnya. Lihat Mengelola kualitas data dengan ekspektasi alur kerja.

Perilaku default untuk INSERT peristiwa dan UPDATE adalah meningkatkan peristiwa CDC dari sumbernya: memperbarui baris apa pun dalam tabel target yang cocok dengan kunci yang ditentukan atau menyisipkan baris baru saat rekaman yang cocok tidak ada di tabel target. Penanganan untuk DELETE peristiwa dapat ditentukan dengan kondisi APPLY AS DELETE WHEN.

Penting

Anda harus mendeklarasikan tabel streaming target untuk menerapkan perubahan ke dalamnya. Anda dapat secara opsional menentukan skema untuk tabel target Anda. Untuk tabel SCD tipe 2, saat menentukan skema tabel target, Anda juga harus menyertakan kolom __START_AT dan __END_AT dengan tipe data yang sama dengan bidang sequence_by.

Lihat API CDC OTOMATIS: Menyederhanakan perubahan pengambilan data dengan alur.

Parameter-parameternya

  • flow_name

    Nama alur yang akan dibuat.

  • source

    Sumber data. Sumber harus berupa sumber streaming . Gunakan kata kunci STREAM untuk menggunakan semantik streaming untuk membaca dari sumbernya. Jika pembacaan mengalami perubahan atau penghapusan pada rekaman yang ada, akan menghasilkan kesalahan. Paling aman untuk membaca dari sumber statis atau yang hanya bisa ditambahkan. Untuk memasukkan data yang memiliki komit perubahan, Anda dapat menggunakan Python dan opsi SkipChangeCommits untuk menangani kesalahan.

    Untuk informasi selengkapnya tentang data streaming, lihat Mengubah data dengan alur.

  • KEYS

    Kolom atau kombinasi kolom yang secara unik mengidentifikasi baris dalam data sumber. Nilai dalam kolom ini digunakan untuk mengidentifikasi peristiwa CDC mana yang berlaku untuk rekaman tertentu dalam tabel target.

    Untuk menentukan kombinasi kolom, gunakan daftar kolom yang dipisahkan koma.

    Klausul ini diperlukan.

  • IGNORE NULL UPDATES

    Memungkinkan pengambilan pembaruan yang berisi sebagian dari kolom target. Ketika peristiwa CDC cocok dengan baris yang ada dan pengaturan IGNORE NULL UPDATES telah ditentukan, kolom yang mempunyai nilai null akan mempertahankan nilai yang ada di target. Ini juga berlaku untuk kolom bersarang dengan nilai null.

    Klausa ini bersifat opsional.

    Secara default, menimpa kolom yang ada dengan nilai null.

  • APPLY AS DELETE WHEN

    Menentukan kapan peristiwa CDC harus diperlakukan sebagai DELETE bukan upsert.

    Untuk sumber SCD tipe 2, untuk menangani data yang tidak berurutan, baris yang dihapus untuk sementara dipertahankan sebagai batu nisan dalam tabel Delta yang mendasar, dan tampilan dibuat di metastore yang memfilter batu nisan ini. Interval retensi dapat dikonfigurasi dengan pipelines.cdc.tombstoneGCThresholdInSecondsproperti tabel.

    Klausa ini bersifat opsional.

  • APPLY AS TRUNCATE WHEN

    Menentukan kapan peristiwa CDC harus diperlakukan sebagai tabel lengkap TRUNCATE. Karena klausa ini memicu pemotongan penuh tabel target, klausul ini harus digunakan hanya untuk kasus penggunaan tertentu yang memerlukan fungsionalitas ini.

    Klausa APPLY AS TRUNCATE WHEN hanya didukung untuk SCD tipe 1. SCD tipe 2 tidak mendukung operasi pemotongan.

    Klausa ini bersifat opsional.

  • SEQUENCE BY

    Nama kolom yang menentukan urutan logis peristiwa CDC dalam data sumber. Pemrosesan alur menggunakan urutan ini untuk menangani peristiwa perubahan yang tiba tidak berurutan.

    Jika beberapa kolom diperlukan untuk pengurutan, gunakan STRUCT ekspresi: kolom akan diurutkan berdasarkan bidang struct pertama terlebih dahulu, lalu dengan bidang kedua jika ada dasi, dan sebagainya.

    Kolom yang ditentukan harus tipe data yang dapat diurutkan.

    Klausa ini diperlukan.

  • COLUMNS

    Menentukan subset kolom untuk disertakan dalam tabel target. Anda dapat:

    • Tentukan daftar lengkap kolom yang akan disertakan: COLUMNS (userId, name, city).
    • Tentukan daftar kolom yang akan dikecualikan: COLUMNS * EXCEPT (operation, sequenceNum)

    Klausa ini bersifat opsional.

    Defaultnya adalah menyertakan semua kolom dalam tabel target saat COLUMNS klausul tidak ditentukan.

  • STORED AS

    Apakah akan menyimpan rekaman sebagai SCD tipe 1 atau SCD tipe 2.

    Klausa ini bersifat opsional.

    Defaultnya adalah SCD tipe 1.

  • TRACK HISTORY ON

    Menentukan subset kolom output untuk menghasilkan rekaman riwayat saat ada perubahan pada kolom yang ditentukan. Anda dapat:

    • Tentukan daftar lengkap kolom yang akan dilacak: COLUMNS (userId, name, city).
    • Tentukan daftar kolom yang akan dikecualikan dari pelacakan: COLUMNS * EXCEPT (operation, sequenceNum)

    Klausa ini bersifat opsional. Defaultnya adalah melacak riwayat untuk semua kolom output ketika ada perubahan apa pun, setara dengan TRACK HISTORY ON *.

Examples

-- Create a streaming table, then use AUTO CDC to populate it:
CREATE OR REFRESH STREAMING TABLE target;

CREATE FLOW flow
AS AUTO CDC INTO
  target
FROM stream(cdc_data.users)
  KEYS (userId)
  APPLY AS DELETE WHEN operation = "DELETE"
  SEQUENCE BY sequenceNum
  COLUMNS * EXCEPT (operation, sequenceNum)
  STORED AS SCD TYPE 2
  TRACK HISTORY ON * EXCEPT (city);