Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Mereplikasi tabel RDBMS eksternal menggunakan
Halaman ini memandu Anda melalui cara mereplikasi tabel dari sistem manajemen database relasional eksternal (RDBMS) ke Azure Databricks menggunakan AUTO CDC API dalam alur. Anda akan mempelajari:
- Pola umum untuk menyiapkan sumber.
- Cara melakukan penyalinan penuh satu kali dari data yang ada menggunakan alur kerja
once. - Cara terus menyerap perubahan baru menggunakan
changealur.
Pola ini sangat ideal untuk membangun tabel dimensi (SCD) yang berubah perlahan atau menjaga tabel target tetap sinkron dengan sistem rekaman eksternal.
Sebelum Anda mulai
Panduan ini mengasumsikan bahwa Anda memiliki akses ke himpunan data berikut dari sumber Anda:
- Rekam jepret lengkap tabel sumber di penyimpanan cloud. Himpunan data ini digunakan untuk beban awal.
- Umpan perubahan berkelanjutan, diisi ke lokasi penyimpanan cloud yang sama (misalnya, menggunakan Debezium, Kafka, atau CDC berbasis log). Umpan ini adalah input untuk proses yang sedang
AUTO CDCberlangsung.
Menyiapkan tampilan sumber
Pertama, tentukan dua tampilan sumber untuk mengisi rdbms_orders tabel target dari jalur orders_snapshot_path penyimpanan cloud. Keduanya dibangun sebagai tampilan streaming atas data mentah di penyimpanan cloud. Menggunakan tampilan memberikan efisiensi yang lebih tinggi karena data tidak harus ditulis sebelum digunakan dalam AUTO CDC proses.
- Tampilan sumber pertama adalah rekam jepret lengkap (
full_orders_snapshot) - Yang kedua adalah umpan perubahan berkelanjutan (
rdbms_orders_change_feed).
Contoh dalam panduan ini menggunakan penyimpanan cloud sebagai sumbernya, tetapi Anda dapat menggunakan sumber apa pun yang didukung oleh tabel streaming.
full_orders_snapshot()
Langkah ini membuat pipeline dengan tampilan yang membaca cuplikan lengkap awal data pesanan.
Phyton
Contoh Python berikut:
- Menggunakan
spark.readStreamdengan Auto Loader (format("cloudFiles")) - Membaca file JSON dari direktori yang ditentukan oleh
orders_snapshot_path - Mengonfigurasi
includeExistingFilesmenjaditrueuntuk memastikan data historis yang sudah ada di jalur diproses -
inferColumnTypesMengatur ketrueuntuk menyimpulkan skema secara otomatis - Mengembalikan semua kolom dengan
.select("\*")
@dp.view()
def full_orders_snapshot():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.inferColumnTypes", "true")
.load(orders_snapshot_path)
.select("*")
)
SQL
Contoh SQL berikut meneruskan opsi sebagai peta pasangan kunci-nilai string.
orders_snapshot_path harus tersedia sebagai variabel SQL (misalnya, didefinisikan menggunakan parameter alur atau diinterpolasi secara manual).
CREATE OR REFRESH VIEW full_orders_snapshot
AS SELECT *
FROM STREAM read_files("${orders_snapshot_path}", "json", map(
"cloudFiles.includeExistingFiles", "true",
"cloudFiles.inferColumnTypes", "true"
));
rdbms_orders_change_feed()
Langkah ini membuat tampilan kedua yang membaca data perubahan inkremental (misalnya, dari log CDC atau mengubah tabel). Ini membaca dari orders_cdc_path dan mengasumsikan bahwa file JSON bergaya CDC ditempatkan ke jalur ini secara teratur.
Phyton
@dp.view()
def rdbms_orders_change_feed():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.inferColumnTypes", "true")
.load(orders_cdc_path)
SQL
Dalam contoh SQL berikut, ${orders_cdc_path} adalah variabel dan dapat diinterpolasi dengan mengatur nilai dalam pengaturan alur Anda atau secara eksplisit mengatur variabel dalam kode Anda.
CREATE OR REFRESH VIEW rdbms_orders_change_feed
AS SELECT *
FROM STREAM read_files("${orders_cdc_path}", "json", map(
"cloudFiles.includeExistingFiles", "true",
"cloudFiles.inferColumnTypes", "true"
));
Hidrasi awal (setelah aliran)
Sekarang setelah sumber disiapkan, AUTO CDC logika menggabungkan kedua sumber ke dalam tabel streaming target. Pertama, gunakan alur satu kali AUTO CDC dengan ONCE=TRUE untuk menyalin konten lengkap tabel RDBMS ke dalam tabel streaming. Ini menyiapkan tabel target dengan data historis tanpa memutarnya kembali di pembaruan mendatang.
Phyton
from pyspark import pipelines as dp
# Step 1: Create the target streaming table
dp.create_streaming_table("rdbms_orders")
# Step 2: Once Flow — Load initial snapshot of full RDBMS table
dp.create_auto_cdc_flow(
flow_name = "initial_load_orders",
once = True, # one-time load
target = "rdbms_orders",
source = "full_orders_snapshot", # e.g., ingested from JDBC into bronze
keys = ["order_id"],
sequence_by = "timestamp",
stored_as_scd_type = "1"
)
SQL
-- Step 1: Create the target streaming table
CREATE OR REFRESH STREAMING TABLE rdbms_orders;
-- Step 2: Once Flow for initial snapshot
CREATE FLOW rdbms_orders_hydrate
AS AUTO CDC ONCE INTO rdbms_orders
FROM stream(full_orders_snapshot)
KEYS (order_id)
SEQUENCE BY timestamp
STORED AS SCD TYPE 1;
Alur once hanya dijalankan satu kali. File baru yang ditambahkan ke full_orders_snapshot setelah pembuatan alur diabaikan.
Penting
Melakukan refresh penuh pada tabel streaming rdbms_orders menjalankan alur once kembali. Jika data snapshot awal dalam penyimpanan cloud telah dihapus, ini mengakibatkan kehilangan data.
Umpan perubahan berkelanjutan (alur perubahan)
Setelah beban rekam jepret awal, gunakan alur lain AUTO CDC untuk terus menyerap perubahan dari umpan CDC RDBMS. Ini membuat tabel Anda rdbms_orders tetap terbarui dengan sisipan, pembaruan, dan penghapusan.
Phyton
from pyspark import pipelines as dp
# Step 3: Change Flow — Ingest ongoing CDC stream from source system
dp.create_auto_cdc_flow(
flow_name = "orders_incremental_cdc",
target = "rdbms_orders",
source = "rdbms_orders_change_feed", # e.g., ingested from Kafka or Debezium
keys = ["order_id"],
sequence_by = "timestamp",
stored_as_scd_type = "1"
)
SQL
-- Step 3: Continuous CDC ingestion
CREATE FLOW rdbms_orders_continuous
AS AUTO CDC INTO rdbms_orders
FROM stream(rdbms_orders_change_feed)
KEYS (order_id)
SEQUENCE BY timestamp
STORED AS SCD TYPE 1;
Pertimbangan
| Idempotensi pengisian ulang data | Alur once hanya berjalan kembali saat tabel target sepenuhnya di-refresh. |
|---|---|
| Beberapa alur | Anda dapat menggunakan beberapa alur perubahan untuk menggabungkan koreksi, data yang terlambat tiba, atau umpan alternatif, tetapi semua harus berbagi skema dan kunci. |
| Pembaruan penuh | Refresh penuh pada tabel rdbms_orders streaming memulai ulang proses once. Ini dapat menyebabkan kehilangan data jika lokasi penyimpanan cloud awal telah menghapus data snapshot awal. |
| Urutan eksekusi alur | Urutan pelaksanaan alur tidak penting. Hasil akhirnya sama. |
Sumber daya tambahan
- Konektor SQL Server yang dikelola sepenuhnya di Lakeflow Connect