Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Penting
Fungsionalitas ini ada di Pratinjau Umum.
Fungsi create_auto_cdc_from_snapshot_flow ini membuat alur yang menggunakan fungsionalitas change data capture (CDC) dari Lakeflow Spark Declarative Pipelines untuk memproses data sumber dari cuplikan database. Lihat Bagaimana CDC diimplementasikan dengan AUTO CDC FROM SNAPSHOT API?.
Nota
Fungsi ini menggantikan fungsi apply_changes_from_snapshot()sebelumnya . Kedua fungsi memiliki tanda tangan yang sama. Databricks merekomendasikan pembaruan untuk menggunakan nama baru.
Penting
Anda harus memiliki tabel streaming target untuk operasi ini.
Untuk membuat tabel target yang diperlukan, Anda bisa menggunakan fungsi create_streaming_table().
Syntax
from pyspark import pipelines as dp
dp.create_auto_cdc_from_snapshot_flow(
target = "<target-table>",
source = Any,
keys = ["key1", "key2", "keyN"],
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
Nota
Untuk AUTO CDC FROM SNAPSHOT pemrosesan, perilaku defaultnya adalah menyisipkan baris baru saat rekaman yang cocok dengan kunci yang sama tidak ada di target. Jika rekaman yang cocok memang ada, rekaman tersebut hanya diperbarui jika salah satu nilai dalam baris telah berubah. Baris dengan kunci yang ada di target tetapi tidak lagi ada di sumber akan dihapus.
Untuk mempelajari selengkapnya tentang pemrosesan CDC dengan rekam jepret, lihat API CDC OTOMATIS: Menyederhanakan perubahan pengambilan data dengan alur. Untuk contoh penggunaan create_auto_cdc_from_snapshot_flow() fungsi, lihat contoh penyerapan rekam jepret berkala dan penyerapan rekam jepret historis .
Parameter-parameternya
| Pengaturan | Tipe | Description |
|---|---|---|
target |
str |
Dibutuhkan. Nama tabel yang akan diperbarui. Anda dapat menggunakan fungsi create_streaming_table() untuk membuat tabel target sebelum menjalankan create_auto_cdc_from_snapshot_flow() fungsi. |
source |
str atau lambda function |
Dibutuhkan. Baik nama tabel atau view untuk diambil cuplikannya secara berkala atau fungsi lambda Python yang mengembalikan DataFrame cuplikan yang akan diproses dan versi cuplikan tersebut. Lihat Menerapkan source argumen. |
keys |
list |
Dibutuhkan. Kolom atau kombinasi kolom yang secara unik mengidentifikasi baris dalam data sumber. Ini digunakan untuk mengidentifikasi peristiwa CDC mana yang berlaku untuk rekaman tertentu dalam tabel target. Anda dapat menentukan:
|
stored_as_scd_type |
str atau int |
Apakah akan menyimpan rekaman sebagai SCD tipe 1 atau SCD tipe 2. Atur ke 1 untuk SCD tipe 1 atau 2 untuk SCD tipe 2. Defaultnya adalah SCD tipe 1. |
track_history_column_list atau track_history_except_column_list |
list |
Sebagian kolom keluaran yang perlu dilacak untuk riwayat dalam tabel sasaran. Gunakan track_history_column_list untuk menentukan daftar lengkap kolom yang akan dilacak. Gunakan track_history_except_column_list untuk menentukan kolom yang akan dikecualikan dari pelacakan. Anda dapat mendeklarasikan nilai sebagai daftar string atau sebagai fungsi Spark SQL col() :
Argumen ke col() fungsi tidak dapat menyertakan kualifikasi. Misalnya, Anda dapat menggunakan col(userId), tetapi Anda tidak dapat menggunakan col(source.userId). Secara default, semua kolom dalam tabel target disertakan ketika tidak ada argumen track_history_column_list atau track_history_except_column_list diberikan ke fungsi. |
source Menerapkan argumen
Fungsi create_auto_cdc_from_snapshot_flow() mengandung argumen source. Untuk memproses rekam jepret historis, source argumen diharapkan menjadi fungsi lambda Python yang mengembalikan dua nilai ke create_auto_cdc_from_snapshot_flow() fungsi: Python DataFrame yang berisi data rekam jepret yang akan diproses dan versi rekam jepret.
Berikut ini adalah tanda tangan fungsi lambda:
lambda Any => Optional[(DataFrame, Any)]
- Argumen untuk fungsi lambda adalah versi rekam jepret yang terakhir diproses.
- Nilai pengembalian fungsi lambda adalah
Noneatau tuple dari dua nilai: Nilai pertama tuple adalah DataFrame yang berisi rekam jepret yang akan diproses. Nilai kedua dari tuple adalah versi cuplikan yang mewakili urutan logis dari cuplikan tersebut.
Contoh yang mengimplementasikan dan memanggil fungsi lambda:
def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Tuple[Optional[int], DataFrame]:
if latest_snapshot_version is None:
return (spark.read.load("filename.csv"), 1)
else:
return None
create_auto_cdc_from_snapshot_flow(
# ...
source = next_snapshot_and_version,
# ...
)
Runtime Lakeflow Spark Declarative Pipelines melakukan langkah-langkah berikut setiap kali alur kerja yang mengandung fungsi create_auto_cdc_from_snapshot_flow() dipicu:
-
next_snapshot_and_versionMenjalankan fungsi untuk memuat DataFrame rekam jepret berikutnya dan versi rekam jepret yang sesuai. - Jika tidak ada DataFrame yang dikembalikan, eksekusi dihentikan dan pembaruan alur ditandai sebagai selesai.
- Mendeteksi perubahan dalam rekam jepret baru dan menerapkannya secara bertahap ke tabel target.
- Kembali ke langkah #1 untuk memuat rekam jepret berikutnya dan versinya.