Bagikan melalui


create_auto_cdc_flow

Fungsi ini create_auto_cdc_flow() membuat alur yang menggunakan fungsionalitas Lakeflow Spark Declarative Pipelines change data capture (CDC) untuk memproses data sumber dari umpan data perubahan (CDF).

Nota

Fungsi ini menggantikan fungsi apply_changes()sebelumnya . Kedua fungsi memiliki tanda tangan yang sama. Databricks merekomendasikan pembaruan untuk menggunakan nama baru.

Penting

Anda harus menentukan tabel streaming target tempat menerapkan perubahan. Anda dapat secara opsional menentukan skema untuk tabel target Anda. Saat menentukan skema tabel target create_auto_cdc_flow(), Anda harus menyertakan kolom __START_AT dan __END_AT dengan tipe data yang sama seperti bidang sequence_by.

Untuk membuat tabel target yang diperlukan, Anda dapat menggunakan fungsi create_streaming_table() di antarmuka Python alur.

Syntax

from pyspark import pipelines as dp

dp.create_auto_cdc_flow(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = <bool>,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None,
  name = None,
  once = <bool>
)

Untuk pemrosesan create_auto_cdc_flow, perilaku default untuk kejadian INSERT dan UPDATE adalah melakukan upsert untuk kejadian CDC dari sumber: memperbarui baris apa pun dalam tabel target yang cocok dengan kunci yang ditentukan, atau menyisipkan baris baru saat tidak ada catatan yang cocok dalam tabel target. Penanganan untuk DELETE peristiwa dapat ditentukan dengan apply_as_deletes parameter .

Untuk mempelajari selengkapnya tentang pemrosesan CDC dengan umpan perubahan, lihat API CDC OTOMATIS: Menyederhanakan perubahan pengambilan data dengan alur. Untuk contoh penggunaan create_auto_cdc_flow() fungsi, lihat Contoh: pemrosesan SCD tipe 1 dan SCD tipe 2 dengan data sumber CDF.

Parameter-parameternya

Pengaturan Tipe Description
target str Dibutuhkan. Nama tabel yang akan diperbarui. Anda dapat menggunakan fungsi create_streaming_table() untuk membuat tabel target sebelum menjalankan create_auto_cdc_flow() fungsi.
source str Dibutuhkan. Sumber data yang memuat catatan CDC.
keys list Dibutuhkan. Kolom atau kombinasi kolom yang secara unik mengidentifikasi baris dalam data sumber. Ini digunakan untuk mengidentifikasi peristiwa CDC mana yang berlaku untuk rekaman tertentu dalam tabel target. Anda dapat menentukan:
  • Daftar string: ["userId", "orderId"]
  • Daftar fungsi Spark SQL col() : [col("userId"), col("orderId")]. Argumen ke col() fungsi tidak dapat menyertakan kualifikasi. Misalnya, Anda dapat menggunakan col(userId), tetapi Anda tidak dapat menggunakan col(source.userId).
sequence_by str, col(), atau struct() Dibutuhkan. Nama kolom yang menentukan urutan logis peristiwa CDC dalam data sumber. Lakeflow Spark Declarative Pipelines menggunakan urutan ini untuk menangani peristiwa perubahan yang tiba tidak berurutan. Kolom yang ditentukan harus berupa tipe data yang dapat diurutkan. Anda dapat menentukan:
  • Sebuah string: "sequenceNum"
  • Fungsi Spark SQL col() : col("sequenceNum"). Argumen ke col() fungsi tidak dapat menyertakan kualifikasi. Misalnya, Anda dapat menggunakan col(userId), tetapi Anda tidak dapat menggunakan col(source.userId).
  • Menggabungkan struct() beberapa kolom untuk memecahkan kebuntuan: struct("timestamp_col", "id_col"), akan diurutkan berdasarkan field struct pertama terlebih dahulu, lalu dengan field kedua jika ada kebuntuan, dan sebagainya.
ignore_null_updates bool Izinkan pengolahan pembaruan yang berisi subset kolom target. Saat peristiwa CDC cocok dengan baris yang ada dan ignore_null_updates adalah True, kolom dengan null mempertahankan nilai yang ada dalam target. Ini juga berlaku untuk kolom berlapis dengan nilai null. Ketika ignore_null_updates adalah False, nilai yang ada ditimpa dengan nilai null.
Defaultnya adalah False.
apply_as_deletes str atau expr() Menentukan kapan peristiwa CDC harus diperlakukan sebagai DELETE bukan upsert. Anda dapat menentukan:
  • Sebuah string: "Operation = 'DELETE'"
  • Fungsi Spark SQL expr() : expr("Operation = 'DELETE'")

Untuk menangani data yang tidak berurutan, baris yang dihapus untuk sementara dipertahankan sebagai batu nisan dalam tabel Delta yang mendasar, dan tampilan dibuat di metastore yang memfilter batu nisan ini. Interval retensi secara default adalah dua hari, dan dapat dikonfigurasi ulang dengan properti tabel pipelines.cdc.tombstoneGCThresholdInSeconds.
apply_as_truncates str atau expr() Menentukan kapan peristiwa CDC harus diperlakukan sebagai tabel lengkap TRUNCATE. Anda dapat menentukan:
  • Sebuah string: "Operation = 'TRUNCATE'"
  • Fungsi Spark SQL expr() : expr("Operation = 'TRUNCATE'")

Karena klausa ini memicu pemotongan penuh tabel target, klausul ini harus digunakan hanya untuk kasus penggunaan tertentu yang memerlukan fungsionalitas ini. Parameter apply_as_truncates hanya didukung untuk SCD tipe 1. SCD tipe 2 tidak mendukung operasi truncasi.
column_list atau except_column_list list Subset kolom yang akan disertakan dalam tabel target. Gunakan column_list untuk menentukan daftar lengkap kolom yang akan disertakan. Gunakan except_column_list untuk menentukan kolom yang akan dikecualikan. Anda dapat mendeklarasikan nilai sebagai daftar string atau sebagai fungsi Spark SQL col() :
  • column_list = ["userId", "name", "city"]
  • column_list = [col("userId"), col("name"), col("city")]
  • except_column_list = ["operation", "sequenceNum"]
  • except_column_list = [col("operation"), col("sequenceNum")

Argumen ke col() fungsi tidak dapat menyertakan kualifikasi. Misalnya, Anda dapat menggunakan col(userId), tetapi Anda tidak dapat menggunakan col(source.userId). Secara default, ketika tidak ada column_list atau except_column_list argumen diteruskan ke fungsi, semua kolom disertakan ke dalam tabel target.
stored_as_scd_type str atau int Apakah akan menyimpan rekaman sebagai SCD tipe 1 atau SCD tipe 2. Atur ke 1 untuk SCD tipe 1 atau 2 untuk SCD tipe 2. Defaultnya adalah SCD tipe 1.
track_history_column_list atau track_history_except_column_list list Sebagian kolom keluaran yang perlu dilacak untuk riwayat dalam tabel sasaran. Gunakan track_history_column_list untuk menentukan daftar lengkap kolom yang akan dilacak. Gunakan track_history_except_column_list untuk menentukan kolom yang akan dikecualikan dari pelacakan. Anda dapat mendeklarasikan nilai sebagai daftar string atau sebagai fungsi Spark SQL col() :
  • track_history_column_list = ["userId", "name", "city"]
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")

Argumen ke col() fungsi tidak dapat menyertakan kualifikasi. Misalnya, Anda dapat menggunakan col(userId), tetapi Anda tidak dapat menggunakan col(source.userId). Secara default, ketika tidak ada track_history_column_list atau track_history_except_column_list argumen diteruskan ke fungsi, semua kolom disertakan ke dalam tabel target.
name str Nama alur. Jika tidak disediakan, default ke nilai yang sama dengan target.
once bool Secara opsional, tentukan alur sebagai alur satu kali, seperti isi ulang. Dengan menggunakan once=True, alurnya dapat diubah dengan dua cara:
  • Nilai yang dikembalikan. streaming-query. harus berupa DataFrame batch dalam hal ini, bukan DataFrame streaming.
  • Alur dijalankan satu kali secara default. Jika alur diperbarui dengan pembaruan lengkap, maka ONCE alur akan berjalan kembali untuk membuat ulang data.