Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Fungsi ini create_auto_cdc_flow() membuat alur yang menggunakan fungsionalitas Lakeflow Spark Declarative Pipelines change data capture (CDC) untuk memproses data sumber dari umpan data perubahan (CDF).
Nota
Fungsi ini menggantikan fungsi apply_changes()sebelumnya . Kedua fungsi memiliki tanda tangan yang sama. Databricks merekomendasikan pembaruan untuk menggunakan nama baru.
Penting
Anda harus menentukan tabel streaming target tempat menerapkan perubahan. Anda dapat secara opsional menentukan skema untuk tabel target Anda. Saat menentukan skema tabel target create_auto_cdc_flow(), Anda harus menyertakan kolom __START_AT dan __END_AT dengan tipe data yang sama seperti bidang sequence_by.
Untuk membuat tabel target yang diperlukan, Anda dapat menggunakan fungsi create_streaming_table() di antarmuka Python alur.
Syntax
from pyspark import pipelines as dp
dp.create_auto_cdc_flow(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = <bool>,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None,
name = None,
once = <bool>
)
Untuk pemrosesan create_auto_cdc_flow, perilaku default untuk kejadian INSERT dan UPDATE adalah melakukan upsert untuk kejadian CDC dari sumber: memperbarui baris apa pun dalam tabel target yang cocok dengan kunci yang ditentukan, atau menyisipkan baris baru saat tidak ada catatan yang cocok dalam tabel target. Penanganan untuk DELETE peristiwa dapat ditentukan dengan apply_as_deletes parameter .
Untuk mempelajari selengkapnya tentang pemrosesan CDC dengan umpan perubahan, lihat API CDC OTOMATIS: Menyederhanakan perubahan pengambilan data dengan alur. Untuk contoh penggunaan create_auto_cdc_flow() fungsi, lihat Contoh: pemrosesan SCD tipe 1 dan SCD tipe 2 dengan data sumber CDF.
Parameter-parameternya
| Pengaturan | Tipe | Description |
|---|---|---|
target |
str |
Dibutuhkan. Nama tabel yang akan diperbarui. Anda dapat menggunakan fungsi create_streaming_table() untuk membuat tabel target sebelum menjalankan create_auto_cdc_flow() fungsi. |
source |
str |
Dibutuhkan. Sumber data yang memuat catatan CDC. |
keys |
list |
Dibutuhkan. Kolom atau kombinasi kolom yang secara unik mengidentifikasi baris dalam data sumber. Ini digunakan untuk mengidentifikasi peristiwa CDC mana yang berlaku untuk rekaman tertentu dalam tabel target. Anda dapat menentukan:
|
sequence_by |
str, col(), atau struct() |
Dibutuhkan. Nama kolom yang menentukan urutan logis peristiwa CDC dalam data sumber. Lakeflow Spark Declarative Pipelines menggunakan urutan ini untuk menangani peristiwa perubahan yang tiba tidak berurutan. Kolom yang ditentukan harus berupa tipe data yang dapat diurutkan. Anda dapat menentukan:
|
ignore_null_updates |
bool |
Izinkan pengolahan pembaruan yang berisi subset kolom target. Saat peristiwa CDC cocok dengan baris yang ada dan ignore_null_updates adalah True, kolom dengan null mempertahankan nilai yang ada dalam target. Ini juga berlaku untuk kolom berlapis dengan nilai null. Ketika ignore_null_updates adalah False, nilai yang ada ditimpa dengan nilai null.Defaultnya adalah False. |
apply_as_deletes |
str atau expr() |
Menentukan kapan peristiwa CDC harus diperlakukan sebagai DELETE bukan upsert. Anda dapat menentukan:
Untuk menangani data yang tidak berurutan, baris yang dihapus untuk sementara dipertahankan sebagai batu nisan dalam tabel Delta yang mendasar, dan tampilan dibuat di metastore yang memfilter batu nisan ini. Interval retensi secara default adalah dua hari, dan dapat dikonfigurasi ulang dengan properti tabel pipelines.cdc.tombstoneGCThresholdInSeconds. |
apply_as_truncates |
str atau expr() |
Menentukan kapan peristiwa CDC harus diperlakukan sebagai tabel lengkap TRUNCATE. Anda dapat menentukan:
Karena klausa ini memicu pemotongan penuh tabel target, klausul ini harus digunakan hanya untuk kasus penggunaan tertentu yang memerlukan fungsionalitas ini. Parameter apply_as_truncates hanya didukung untuk SCD tipe 1. SCD tipe 2 tidak mendukung operasi truncasi. |
column_list atau except_column_list |
list |
Subset kolom yang akan disertakan dalam tabel target. Gunakan column_list untuk menentukan daftar lengkap kolom yang akan disertakan. Gunakan except_column_list untuk menentukan kolom yang akan dikecualikan. Anda dapat mendeklarasikan nilai sebagai daftar string atau sebagai fungsi Spark SQL col() :
Argumen ke col() fungsi tidak dapat menyertakan kualifikasi. Misalnya, Anda dapat menggunakan col(userId), tetapi Anda tidak dapat menggunakan col(source.userId). Secara default, ketika tidak ada column_list atau except_column_list argumen diteruskan ke fungsi, semua kolom disertakan ke dalam tabel target. |
stored_as_scd_type |
str atau int |
Apakah akan menyimpan rekaman sebagai SCD tipe 1 atau SCD tipe 2. Atur ke 1 untuk SCD tipe 1 atau 2 untuk SCD tipe 2. Defaultnya adalah SCD tipe 1. |
track_history_column_list atau track_history_except_column_list |
list |
Sebagian kolom keluaran yang perlu dilacak untuk riwayat dalam tabel sasaran. Gunakan track_history_column_list untuk menentukan daftar lengkap kolom yang akan dilacak. Gunakan track_history_except_column_list untuk menentukan kolom yang akan dikecualikan dari pelacakan. Anda dapat mendeklarasikan nilai sebagai daftar string atau sebagai fungsi Spark SQL col() :
Argumen ke col() fungsi tidak dapat menyertakan kualifikasi. Misalnya, Anda dapat menggunakan col(userId), tetapi Anda tidak dapat menggunakan col(source.userId). Secara default, ketika tidak ada track_history_column_list atau track_history_except_column_list argumen diteruskan ke fungsi, semua kolom disertakan ke dalam tabel target. |
name |
str |
Nama alur. Jika tidak disediakan, default ke nilai yang sama dengan target. |
once |
bool |
Secara opsional, tentukan alur sebagai alur satu kali, seperti isi ulang. Dengan menggunakan once=True, alurnya dapat diubah dengan dua cara:
|