Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Alur Deklaratif Lakeflow Spark menyederhanakan penangkapan perubahan data (CDC) dengan AUTO CDC dan AUTO CDC FROM SNAPSHOT API. API ini mengotomatiskan kompleksitas komputasi dimensi yang berubah secara perlahan (SCD) Tipe 1 dan Tipe 2 dari umpan CDC atau rekam jepret database. Untuk mempelajari selengkapnya tentang konsep ini, lihat Mengubah tangkapan data dan rekam jepret.
Nota
AUTO CDC API menggantikan APPLY CHANGES API dan memiliki sintaks yang sama.
APPLY CHANGES API masih tersedia, tetapi Databricks merekomendasikan penggunaan AUTO CDC API di tempatnya.
API yang Anda gunakan bergantung pada sumber data perubahan Anda:
-
AUTO CDC: Gunakan ini saat database sumber mengaktifkan umpan CDC.AUTO CDCmemproses perubahan dari aliran data perubahan (CDF). Ini didukung dalam antarmuka SQL dan Python yang berbasis pipelining. -
AUTO CDC FROM SNAPSHOT: Gunakan ini ketika CDC tidak diaktifkan pada database sumber dan hanya rekam jepret yang tersedia. API ini membandingkan rekam jepret untuk menentukan perubahan lalu memprosesnya. Ini hanya didukung di antarmuka Python.
Kedua API mendukung pembaruan tabel menggunakan SCD Tipe 1 dan Tipe 2:
- Gunakan SCD Tipe 1 untuk memperbarui rekaman secara langsung. Riwayat tidak dipertahankan untuk rekaman yang diperbarui.
- Gunakan SCD Tipe 2 untuk menyimpan riwayat rekaman, baik pada semua pembaruan atau pembaruan untuk sekumpulan kolom tertentu.
AUTO CDC API tidak didukung oleh Alur Deklaratif Apache Spark.
Untuk sintaksis dan referensi lainnya, lihat AUTO CDC INTO (pipelines), create_auto_cdc_flow, dan create_auto_cdc_from_snapshot_flow.
Nota
Halaman ini menjelaskan cara memperbarui tabel di alur Anda berdasarkan perubahan data sumber. Untuk mempelajari cara merekam dan mengkueri informasi perubahan tingkat baris untuk tabel Delta, lihat Menggunakan umpan data perubahan Delta Lake di Azure Databricks.
Persyaratan
Untuk menggunakan CDC API, alur Anda harus dikonfigurasi untuk menggunakan SDP tanpa server atau SDP Pro atau Advancededisi.
Cara kerja AUTO CDC
Untuk melakukan pemrosesan CDC dengan AUTO CDC, buat tabel streaming lalu gunakan AUTO CDC ... INTO pernyataan di SQL atau create_auto_cdc_flow() fungsi di Python untuk menentukan sumber, kunci, dan pengurutan umpan perubahan. Untuk penjelasan tentang cara kerja pengurutan dan logika SCD, lihat Mengubah pengambilan data dan rekam jepret. Lihat contoh OTOMATIS CDC.
Untuk hidrasi awal dari sumber dengan umpan perubahan, gunakan AUTO CDC dengan once alur lalu lanjutkan pemrosesan umpan perubahan. Lihat Mereplikasi tabel RDBMS eksternal menggunakan CDC OTOMATIS.
Untuk detail sintaks, lihat AUTO CDC INTO (pipelines) atau create_auto_cdc_flow.
Cara kerja AUTO CDC FROM SNAPSHOT
AUTO CDC FROM SNAPSHOT menentukan perubahan dalam data sumber dengan membandingkan rekam jepret dalam urutan. Ini hanya didukung di antarmuka alur Python. Anda dapat membaca rekam jepret dari tabel Delta, file penyimpanan cloud, atau JDBC secara langsung.
Untuk melakukan pemrosesan CDC dengan AUTO CDC FROM SNAPSHOT, buat tabel streaming lalu gunakan create_auto_cdc_from_snapshot_flow() fungsi untuk menentukan rekam jepret, kunci, dan argumen lainnya. Untuk detail tentang dua pola penyerapan dan kapan menggunakan masing-masing pola, lihat Pola pemrosesan rekam jepret. Lihat contoh AUTO CDC FROM SNAPSHOT.
Untuk detail sintaks, lihat create_auto_cdc_from_snapshot_flow.
Menggunakan beberapa kolom untuk pengurutan
Untuk mengurutkan menurut beberapa kolom (misalnya, tanda waktu dan ID untuk memutuskan ikatan), gunakan STRUCT untuk menggabungkannya. API mengurutkan berdasarkan bidang pertama terlebih dahulu, dan jika terjadi kedudukan seri, mempertimbangkan bidang kedua, dan seterusnya.
SQL
SEQUENCE BY STRUCT(timestamp_col, id_col)
Phyton
sequence_by = struct("timestamp_col", "id_col")
Contoh CDC OTOMATIS
Contoh berikut menunjukkan pemrosesan SCD Tipe 1 dan Tipe 2 menggunakan sumber umpan data perubahan. Data sampel membuat rekaman pengguna baru, menghapus rekaman pengguna, dan memperbarui rekaman pengguna. Dalam contoh SCD Tipe 1, operasi terakhir UPDATE datang terlambat dan dihilangkan dari tabel target, menunjukkan penanganan peristiwa yang tidak berurutan.
Berikut ini adalah catatan input yang digunakan dalam contoh ini. Data ini dibuat dengan menjalankan kueri di bagian Buat data sampel .
| userId | Nama | city | operasi | nomorUrut |
|---|---|---|---|---|
| 124 | Raul | Oaxaca | INSERT | 1 |
| 123 | Isabel | Monterrey | INSERT | 1 |
| 125 | Mercedes | Tijuana | INSERT | 2 |
| 126 | Lily | Cancun | INSERT | 2 |
| 123 | nol | nol | DELETE | 6 |
| 125 | Mercedes | Guadalajara | UPDATE | 6 |
| 125 | Mercedes | Mexicali | UPDATE | 5 |
| 123 | Isabel | Chihuahua | UPDATE | 5 |
Jika Anda menghapus komentar baris akhir dalam kueri pembuatan data sampel, baris tersebut menambahkan catatan berikut yang menentukan untuk mengosongkan tabel (mengosongkan tabel) di sequenceNum=3.
| userId | Nama | city | operasi | nomorUrut |
|---|---|---|---|---|
| nol | nol | nol | MENGHAPUS | 3 |
Nota
Semua contoh berikut menyertakan opsi untuk menentukan operasi DELETE dan TRUNCATE , tetapi masing-masing bersifat opsional.
Membuat data sampel
Jalankan pernyataan berikut untuk membuat himpunan data sampel. Kode ini tidak dimaksudkan untuk dijalankan sebagai bagian dari definisi alur. Jalankan dari folder eksplorasi alur Anda, bukan folder transformasi.
CREATE SCHEMA IF NOT EXISTS main.cdc_tutorial;
CREATE TABLE main.cdc_tutorial.users_cdf
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The batch at sequenceNum 6 is the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);
Proses pembaruan SCD Tipe 1
SCD Tipe 1 hanya menyimpan versi terbaru dari setiap rekaman. Contoh berikut membaca dari umpan data perubahan yang dibuat di atas dan menerapkan perubahan pada target tabel streaming. Kembangkan Alur Deklaratif Lakeflow Spark untuk menjalankan kode ini.
Phyton
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("main.cdc_tutorial.users_cdf")
dp.create_streaming_table("users_current")
dp.create_auto_cdc_flow(
target = "users_current",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
SQL
CREATE OR REFRESH STREAMING TABLE users_current;
CREATE FLOW apply_cdc AS AUTO CDC INTO
users_current
FROM
stream(main.cdc_tutorial.users_cdf)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
APPLY AS TRUNCATE WHEN
operation = "TRUNCATE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 1;
Setelah menjalankan contoh SCD Tipe 1, tabel target berisi rekaman berikut:
| userId | Nama | city |
|---|---|---|
| 124 | Raul | Oaxaca |
| 125 | Mercedes | Guadalajara |
| 126 | Lily | Cancun |
Pengguna 123 (Isabel) dihapus dan tidak muncul. Pengguna 125 (Mercedes) memperlihatkan hanya kota terbaru (Guadalajara) karena SCD Tipe 1 mengganti nilai sebelumnya. Pembaruan sebelumnya di UPDATE pada sequenceNum=5 dihapus karena pembaruan yang lebih baru di sequenceNum=6 sudah tersedia.
Setelah menjalankan contoh dengan rekaman yang TRUNCATE telah dihapus komentar, tabel dibersihkan di sequenceNum=3. Ini berarti bahwa rekaman 124 dan 126 tidak ada dalam tabel, dan tabel target akhir hanya berisi rekaman berikut:
| userId | Nama | city |
|---|---|---|
| 125 | Mercedes | Guadalajara |
Proses pembaruan SCD Tipe 2
SCD Tipe 2 mempertahankan riwayat lengkap perubahan dengan membuat baris baru untuk setiap versi catatan, dengan kolom __START_AT dan kolom __END_AT yang menunjukkan periode aktif setiap versi.
Phyton
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("main.cdc_tutorial.users_cdf")
dp.create_streaming_table("users_history")
dp.create_auto_cdc_flow(
target = "users_history",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)
SQL
CREATE OR REFRESH STREAMING TABLE users_history;
CREATE FLOW apply_cdc AS AUTO CDC INTO
users_history
FROM
stream(main.cdc_tutorial.users_cdf)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2;
Setelah menjalankan contoh SCD Tipe 2, tabel target berisi rekaman berikut:
| userId | Nama | city | __START_AT | __END_AT |
|---|---|---|---|---|
| 123 | Isabel | Monterrey | 1 | 5 |
| 123 | Isabel | Chihuahua | 5 | 6 |
| 124 | Raul | Oaxaca | 1 | nol |
| 125 | Mercedes | Tijuana | 2 | 5 |
| 125 | Mercedes | Mexicali | 5 | 6 |
| 125 | Mercedes | Guadalajara | 6 | nol |
| 126 | Lily | Cancun | 2 | nol |
Tabel menyimpan seluruh riwayat. Pengguna 123 memiliki dua versi (berakhir pada urutan 6 saat dihapus). Pengguna 125 memiliki tiga versi yang menunjukkan perubahan kota. Rekaman dengan __END_AT = null saat ini aktif.
Melacak subkumpulan kolom dengan Tipe 2 SCD
Secara default, SCD Tipe 2 membuat versi baru setiap kali nilai kolom berubah. Anda dapat menentukan subset kolom yang akan dilacak, sehingga perubahan pada kolom lain memperbarui versi saat ini di tempat daripada membuat rekaman riwayat baru.
Contoh berikut mengecualikan city kolom dari pelacakan riwayat:
Phyton
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("main.cdc_tutorial.users_cdf")
dp.create_streaming_table("users_history")
dp.create_auto_cdc_flow(
target = "users_history",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)
SQL
CREATE OR REFRESH STREAMING TABLE users_history;
CREATE FLOW apply_cdc AS AUTO CDC INTO
users_history
FROM
stream(main.cdc_tutorial.users_cdf)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2
TRACK HISTORY ON * EXCEPT
(city)
Karena city perubahan tidak dilacak, update kota menggantikan baris yang ada alih-alih membuat versi baru. Tabel target berisi rekaman berikut:
| userId | Nama | city | __START_AT | __END_AT |
|---|---|---|---|---|
| 123 | Isabel | Chihuahua | 1 | 6 |
| 124 | Raul | Oaxaca | 1 | nol |
| 125 | Mercedes | Guadalajara | 2 | nol |
| 126 | Lily | Cancun | 2 | nol |
Contoh CDC Otomatis dari Cuplikan
Bagian berikut ini menyediakan contoh penggunaan AUTO CDC FROM SNAPSHOT untuk memproses rekam jepret ke dalam tabel target SCD Tipe 1 atau Tipe 2. Untuk latar belakang tentang kapan menggunakan API ini, lihat Mengubah tangkapan data dan rekam jepret.
Contoh: Memproses cuplikan menggunakan waktu pemasukan ke dalam alur pemrosesan
Gunakan pendekatan ini ketika cuplikan tiba dengan teratur dalam urutan yang tepat dan Anda dapat mengandalkan tanda waktu pelaksanaan alur untuk penentuan versi. Cuplikan baru diintegrasikan ke dalam setiap pembaruan pipeline.
Anda dapat membaca rekam jepret dari beberapa jenis sumber, termasuk tabel Delta, file penyimpanan cloud, dan koneksi JDBC.
Langkah 1: Membuat data sampel
Buat tabel yang berisi data rekam jepret. Jalankan kode berikut dari notebook atau Databricks SQL di explorations folder alur Anda:
CREATE SCHEMA IF NOT EXISTS main.cdc_tutorial;
CREATE TABLE main.cdc_tutorial.snapshot (
userId INT,
city STRING
);
INSERT INTO main.cdc_tutorial.snapshot VALUES
(1, 'Oaxaca'),
(2, 'Monterrey'),
(3, 'Tijuana');
Langkah 2: Jalankan CDC OTOMATIS DARI REKAM JEPRET
Kembangkan Alur Deklaratif Lakeflow Spark untuk menjalankan kode dalam langkah ini.
Pilih jenis sumber untuk tampilan rekam jepret (kode pembuatan sampel menghasilkan tabel Delta):
Opsi A: Membaca dari tabel Delta
from pyspark import pipelines as dp
@dp.view(name="source")
def source():
return spark.read.table("main.cdc_tutorial.snapshot")
Opsi B: Membaca dari penyimpanan awan
from pyspark import pipelines as dp
@dp.view(name="source")
def source():
return spark.read.format("csv").option("header", True).load("<snapshot-path>")
Opsi C: Baca dari JDBC (hanya komputasi klasik)
from pyspark import pipelines as dp
@dp.view(name="source")
def source():
return (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
)
Semua opsi, tulis ke target
Kemudian tambahkan tabel dan alur target:
dp.create_streaming_table("target")
dp.create_auto_cdc_from_snapshot_flow(
target = "target",
source = "source",
keys = ["userId"],
stored_as_scd_type = 2
)
Setelah pemrosesan jalur pertama, semua catatan dimasukkan sebagai baris aktif.
| userId | city | __START_AT | __END_AT |
|---|---|---|---|
| 1 | Oaxaca | 0 | nol |
| 2 | Monterrey | 0 | nol |
| 3 | Tijuana | 0 | nol |
Nota
Untuk menggunakan SCD Tipe 1 sebagai gantinya dan hanya mempertahankan status saat ini, atur stored_as_scd_type=1. Dalam hal ini, tabel target tidak menyertakan kolom __START_AT dan __END_AT.
Langkah 3: Mensimulasikan rekam jepret baru dan menjalankan ulang
Perbarui tabel sumber untuk mensimulasikan rekam jepret baru yang tiba (jalankan kode ini dari buku catatan atau file SQL di explorations folder pipline Anda):
TRUNCATE TABLE main.cdc_tutorial.snapshot;
INSERT INTO main.cdc_tutorial.snapshot VALUES
(2, 'Carmel'),
(3, 'Los Angeles'),
(4, 'Death Valley'),
(6, 'Kings Canyon');
Jalankan pipeline lagi.
AUTO CDC FROM SNAPSHOT membandingkan rekam jepret baru dengan yang sebelumnya dan mendeteksi bahwa pengguna 1 dihapus, pengguna 2 dan 3 diperbarui, dan pengguna 4 dan 6 dimasukkan. Ini menghasilkan umpan perubahan, dan menggunakan AUTO CDC untuk membuat tabel output.
Setelah eksekusi kedua dengan SCD Tipe 2, tabel target berisi rekaman berikut:
| userId | city | __START_AT | __END_AT |
|---|---|---|---|
| 1 | Oaxaca | 0 | 1 |
| 2 | Monterrey | 0 | 1 |
| 2 | Carmel | 1 | nol |
| 3 | Tijuana | 0 | 1 |
| 3 | Los Angeles | 1 | nol |
| 4 | Lembah Kematian | 1 | nol |
| 6 | Kings Canyon | 1 | nol |
Pengguna 1 berakhir (dihapus). Pengguna 2 dan 3 masing-masing memiliki dua versi yang menunjukkan perubahan kota mereka. Pengguna 4 dan 6 baru disisipkan.
Setelah eksekusi kedua dengan SCD Tipe 1, tabel target hanya memperlihatkan status saat ini:
| userId | city |
|---|---|
| 2 | Carmel |
| 3 | Los Angeles |
| 4 | Lembah Kematian |
| 6 | Kings Canyon |
Contoh: Memproses rekam jepret menggunakan fungsi versi
Gunakan pendekatan ini saat Anda memerlukan kontrol eksplisit atas pengurutan rekam jepret. Misalnya, gunakan pendekatan ini saat beberapa rekam jepret tiba pada saat yang sama, atau rekam jepret tidak berurutan. Anda menulis fungsi yang menentukan rekam jepret mana yang akan diproses berikutnya dan nomor versinya. API memproses rekam jepret dalam urutan versi naik:
- Jika beberapa rekam jepret berada dalam penyimpanan, semuanya diproses secara berurutan.
- Jika rekam jepret tiba tidak berurutan (misalnya,
snapshot_3tiba setelahsnapshot_4), rekam jepret dilewati. - Jika tidak ada rekam jepret baru, fungsi akan kembali
Nonedan tidak ada pemrosesan yang terjadi.
Langkah 1: Menyiapkan file rekam jepret
Buat file CSV yang berisi data rekam jepret dan tambahkan ke volume atau lokasi penyimpanan cloud. Beri nama file secara kronologis (misalnya, snapshot_1.csv, snapshot_2.csv).
Setiap file harus berisi kolom untuk userId dan city. Contohnya:
snapshot_1.csv:
| userId | city |
|---|---|
| 1 | Oaxaca |
| 2 | Monterrey |
| 3 | Tijuana |
snapshot_2.csv:
| userId | city |
|---|---|
| 2 | Carmel |
| 3 | Los Angeles |
| 4 | Lembah Kematian |
Langkah 2: Jalankan AUTO CDC FROM SNAPSHOT dengan fungsi versi
Buat buku catatan baru dan tempelkan kode alur berikut. Kemudian Kembangkan Alur Deklaratif Lakeflow Spark.
from pyspark import pipelines as dp
from typing import Optional, Tuple
from pyspark.sql import DataFrame
def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Optional[Tuple[DataFrame, int]]:
snapshot_dir = "/Volumes/main/cdc_tutorial/snapshots/" # or the location you created the sample data
files = dbutils.fs.ls(snapshot_dir)
snapshot_files = [f.name for f in files if f.name.startswith("snapshot_") and f.name.endswith(".csv")]
snapshot_versions = []
for filename in snapshot_files:
try:
version = int(filename.replace("snapshot_", "").replace(".csv", ""))
snapshot_versions.append(version)
except ValueError:
continue
snapshot_versions.sort()
if latest_snapshot_version is None:
if snapshot_versions:
next_version = snapshot_versions[0]
else:
return None
else:
next_versions = [v for v in snapshot_versions if v > latest_snapshot_version]
if next_versions:
next_version = next_versions[0]
else:
return None
snapshot_path = f"{snapshot_dir}snapshot_{next_version}.csv"
df = spark.read.format("csv").option("header", True).load(snapshot_path)
return (df, next_version)
dp.create_streaming_table("main.cdc_tutorial.target_versioned")
dp.create_auto_cdc_from_snapshot_flow(
target = "main.cdc_tutorial.target_versioned",
source = next_snapshot_and_version,
keys = ["userId"],
stored_as_scd_type = 2
)
Nota
Untuk menggunakan SCD Tipe 1, atur stored_as_scd_type=1.
Setelah memproses snapshot_1.csv, tabel target berisi rekaman berikut:
| userId | city | __START_AT | __END_AT |
|---|---|---|---|
| 1 | Oaxaca | 1 | nol |
| 2 | Monterrey | 1 | nol |
| 3 | Tijuana | 1 | nol |
Setelah memproses snapshot_2.csv, tabel target berisi rekaman berikut:
| userId | city | __START_AT | __END_AT |
|---|---|---|---|
| 1 | Oaxaca | 1 | 2 |
| 2 | Monterrey | 1 | 2 |
| 2 | Carmel | 2 | nol |
| 3 | Tijuana | 1 | 2 |
| 3 | Los Angeles | 2 | nol |
| 4 | Lembah Kematian | 2 | nol |
Nota
Ingatlah bahwa, untuk SCD Tipe 1, tabel terlihat persis seperti rekam jepret terbaru. Perbedaannya adalah bahwa kueri hilir dapat menggunakan umpan perubahan untuk hanya memproses rekaman yang diubah.
Langkah 3: Tambahkan rekam jepret baru
Tambahkan file CSV baru ke lokasi penyimpanan dengan data yang dimodifikasi (misalnya, nilai kota yang diubah, baris baru, atau baris yang dihapus). Kemudian jalankan alur lagi untuk memproses rekam jepret baru.
Keterbatasan
- Kolom pengurutan harus merupakan tipe data yang dapat diurutkan.
NULLnilai urutan tidak didukung. -
AUTO CDC FROM SNAPSHOThanya didukung di antarmuka alur Python; antarmuka SQL tidak didukung.
Sumber daya tambahan
- Mengubah tangkapan data dan rekam jepret: Pelajari tentang konsep CDC, rekam jepret, dan jenis SCD.
-
Replikasi tabel RDBMS eksternal menggunakan
AUTO CDC: Pelajari cara melakukan hidrasi awal denganoncealur lalu lanjutkan pemrosesan perubahan. - Topik CDC OTOMATIS Tingkat Lanjut: Pelajari tentang operasi perubahan pada target CDC OTOMATIS, membaca umpan data perubahan, dan metrik pemrosesan.
- Tutorial: Membangun alur ETL menggunakan penangkapan data perubahan