Mereplikasi tabel RDBMS eksternal menggunakan AUTO CDC

Halaman ini memandu Anda melalui cara mereplikasi tabel dari sistem manajemen database relasional eksternal (RDBMS) ke Azure Databricks menggunakan AUTO CDC API dalam alur. Anda akan mempelajari:

  • Pola umum untuk menyiapkan sumber.
  • Cara melakukan penyalinan penuh satu kali dari data yang ada menggunakan alur kerja once.
  • Cara terus menyerap perubahan baru menggunakan change alur.

Pola ini sangat ideal untuk membangun tabel dimensi (SCD) yang berubah perlahan atau menjaga tabel target tetap sinkron dengan sistem rekaman eksternal.

Sebelum Anda mulai

Panduan ini mengasumsikan bahwa Anda memiliki akses ke himpunan data berikut dari sumber Anda:

  • Rekam jepret lengkap tabel sumber di penyimpanan cloud. Himpunan data ini digunakan untuk beban awal.
  • Umpan perubahan berkelanjutan, diisi ke lokasi penyimpanan cloud yang sama (misalnya, menggunakan Debezium, Kafka, atau CDC berbasis log). Umpan ini adalah input untuk proses yang sedang AUTO CDC berlangsung.

Menyiapkan tampilan sumber

Pertama, tentukan dua tampilan sumber untuk mengisi rdbms_orders tabel target dari jalur orders_snapshot_path penyimpanan cloud. Keduanya dibangun sebagai tampilan streaming atas data mentah di penyimpanan cloud. Menggunakan tampilan memberikan efisiensi yang lebih tinggi karena data tidak harus ditulis sebelum digunakan dalam AUTO CDC proses.

  • Tampilan sumber pertama adalah rekam jepret lengkap (full_orders_snapshot)
  • Yang kedua adalah umpan perubahan berkelanjutan (rdbms_orders_change_feed).

Contoh dalam panduan ini menggunakan penyimpanan cloud sebagai sumbernya, tetapi Anda dapat menggunakan sumber apa pun yang didukung oleh tabel streaming.

full_orders_snapshot()

Langkah ini membuat pipeline dengan tampilan yang membaca cuplikan lengkap awal data pesanan.

Phyton

Contoh Python berikut:

  • Menggunakan spark.readStream dengan Auto Loader (format("cloudFiles"))
  • Membaca file JSON dari direktori yang ditentukan oleh orders_snapshot_path
  • Mengonfigurasi includeExistingFiles menjadi true untuk memastikan data historis yang sudah ada di jalur diproses
  • inferColumnTypes Mengatur ke true untuk menyimpulkan skema secara otomatis
  • Mengembalikan semua kolom dengan .select("\*")
@dp.view()
def full_orders_snapshot():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.includeExistingFiles", "true")
        .option("cloudFiles.inferColumnTypes", "true")
        .load(orders_snapshot_path)
        .select("*")
    )

SQL

Contoh SQL berikut meneruskan opsi sebagai peta pasangan kunci-nilai string. orders_snapshot_path harus tersedia sebagai variabel SQL (misalnya, didefinisikan menggunakan parameter alur atau diinterpolasi secara manual).

CREATE OR REFRESH VIEW full_orders_snapshot
AS SELECT *
FROM STREAM read_files("${orders_snapshot_path}", "json", map(
  "cloudFiles.includeExistingFiles", "true",
  "cloudFiles.inferColumnTypes", "true"
));

rdbms_orders_change_feed()

Langkah ini membuat tampilan kedua yang membaca data perubahan inkremental (misalnya, dari log CDC atau mengubah tabel). Ini membaca dari orders_cdc_path dan mengasumsikan bahwa file JSON bergaya CDC ditempatkan ke jalur ini secara teratur.

Phyton

@dp.view()
def rdbms_orders_change_feed():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.inferColumnTypes", "true")
.load(orders_cdc_path)

SQL

Dalam contoh SQL berikut, ${orders_cdc_path} adalah variabel dan dapat diinterpolasi dengan mengatur nilai dalam pengaturan alur Anda atau secara eksplisit mengatur variabel dalam kode Anda.

CREATE OR REFRESH VIEW rdbms_orders_change_feed
AS SELECT *
FROM STREAM read_files("${orders_cdc_path}", "json", map(
"cloudFiles.includeExistingFiles", "true",
"cloudFiles.inferColumnTypes", "true"
));

Hidrasi awal (setelah aliran)

Sekarang setelah sumber disiapkan, AUTO CDC logika menggabungkan kedua sumber ke dalam tabel streaming target. Pertama, gunakan alur satu kali AUTO CDC dengan ONCE=TRUE untuk menyalin konten lengkap tabel RDBMS ke dalam tabel streaming. Ini menyiapkan tabel target dengan data historis tanpa memutarnya kembali di pembaruan mendatang.

Phyton

from pyspark import pipelines as dp

# Step 1: Create the target streaming table

dp.create_streaming_table("rdbms_orders")

# Step 2: Once Flow — Load initial snapshot of full RDBMS table

dp.create_auto_cdc_flow(
  flow_name = "initial_load_orders",
  once = True,  # one-time load
  target = "rdbms_orders",
  source = "full_orders_snapshot",  # e.g., ingested from JDBC into bronze
  keys = ["order_id"],
  sequence_by = "timestamp",
  stored_as_scd_type = "1"
)

SQL


-- Step 1: Create the target streaming table
CREATE OR REFRESH STREAMING TABLE rdbms_orders;

-- Step 2: Once Flow for initial snapshot
CREATE FLOW rdbms_orders_hydrate
AS AUTO CDC ONCE INTO rdbms_orders
FROM stream(full_orders_snapshot)
KEYS (order_id)
SEQUENCE BY timestamp
STORED AS SCD TYPE 1;

Alur once hanya dijalankan satu kali. File baru yang ditambahkan ke full_orders_snapshot setelah pembuatan alur diabaikan.

Penting

Melakukan refresh penuh pada tabel streaming rdbms_orders menjalankan alur once kembali. Jika data snapshot awal dalam penyimpanan cloud telah dihapus, ini mengakibatkan kehilangan data.

Umpan perubahan berkelanjutan (alur perubahan)

Setelah beban rekam jepret awal, gunakan alur lain AUTO CDC untuk terus menyerap perubahan dari umpan CDC RDBMS. Ini membuat tabel Anda rdbms_orders tetap terbarui dengan sisipan, pembaruan, dan penghapusan.

Phyton

from pyspark import pipelines as dp

# Step 3: Change Flow — Ingest ongoing CDC stream from source system

dp.create_auto_cdc_flow(
flow_name = "orders_incremental_cdc",
target = "rdbms_orders",
source = "rdbms_orders_change_feed", # e.g., ingested from Kafka or Debezium
keys = ["order_id"],
sequence_by = "timestamp",
stored_as_scd_type = "1"
)

SQL

-- Step 3: Continuous CDC ingestion
CREATE FLOW rdbms_orders_continuous
AS AUTO CDC INTO rdbms_orders
FROM stream(rdbms_orders_change_feed)
KEYS (order_id)
SEQUENCE BY timestamp
STORED AS SCD TYPE 1;

Pertimbangan

Idempotensi pengisian ulang data Alur once hanya berjalan kembali saat tabel target sepenuhnya di-refresh.
Beberapa alur Anda dapat menggunakan beberapa alur perubahan untuk menggabungkan koreksi, data yang terlambat tiba, atau umpan alternatif, tetapi semua harus berbagi skema dan kunci.
Pembaruan penuh Refresh penuh pada tabel rdbms_orders streaming memulai ulang proses once. Ini dapat menyebabkan kehilangan data jika lokasi penyimpanan cloud awal telah menghapus data snapshot awal.
Urutan eksekusi alur Urutan pelaksanaan alur tidak penting. Hasil akhirnya sama.

Sumber daya tambahan