ETL di Databricks SQL

Saat berhadapan dengan data dalam jumlah besar, Anda memerlukan alur yang hanya dapat memproses rekaman baru dan yang diubah alih-alih memproses ulang seluruh himpunan data. Ini disebut ETL inkremental. Di Databricks SQL, Anda dapat membangun alur ETL inkremental menggunakan tabel streaming dan tampilan materialisasi, tanpa menulis kode prosedural atau menjadwalkan refresh manual.

Tutorial ini memandu Anda melalui pola umum: melacak perubahan produk dari waktu ke waktu. Anda membuat tabel sumber, menangkap peristiwa perubahan, membangun tabel dimensi yang mempertahankan riwayat lengkap setiap produk, dan menambahkan lapisan pelaporan agregat di atasnya.

Fitur utama dalam tutorial ini adalah AUTO CDC. Di gudang tradisional, Anda akan menulis pernyataan kompleks MERGE INTO untuk menggabungkan operasi penyisipan, pembaruan, dan penghapusan pada tabel target. Pendekatan ini rawan kesalahan, terutama ketika peristiwa tiba di luar urutan. AUTO CDC menangani ini untuk Anda. Anda mendeklarasikan kunci bisnis, kolom urutan, dan apakah Anda ingin SCD Tipe 1 (hanya nilai terbaru) atau SCD Tipe 2 (riwayat lengkap), dan Azure Databricks menerapkan logika penggabungan yang benar secara otomatis. Untuk gambaran umum CDC, lihat API CDC OTOMATIS: Menyederhanakan perubahan pengambilan data dengan alur.

Pada akhir tutorial ini, Anda akan memiliki:

  1. Membuat tabel sumber yang melacak perubahan dengan Change Data Feed.
  2. Memeriksa data perubahan mentah untuk memahami aliran peristiwa CDC.
  3. Digunakan AUTO CDC untuk membangun tabel dimensi SCD Tipe 2 dari peristiwa tersebut.
  4. Peristiwa penghapusan diproses secara bertahap dan berurutan melalui pipeline.
  5. Membuat tampilan materialisasi yang secara bertahap mempertahankan laporan agregat.
  6. SCHEDULE REFRESH EVERY 1 DAY Dikonfigurasi sehingga perubahan menyebar secara otomatis melalui alur.

Persyaratan

Untuk menyelesaikan tutorial ini, Anda harus memenuhi persyaratan berikut:

Langkah 1: Siapkan katalog dan skema Anda

Buka editor Databricks SQL dan atur katalog dan skema kerja Anda. Anda harus memiliki izin untuk mengakses USE katalog dan skema yang Anda pilih.

USE CATALOG <your-catalog>;
USE SCHEMA <your-schema>;

Langkah 2: Membuat tabel sumber dan memuat data

Buat tabel products dengan Gunakan umpan data perubahan Delta Lake pada Azure Databricks (CDF) diaktifkan. CDF adalah fitur Delta Lake yang merekam setiap sisipan, pembaruan, dan penghapusan sebagai log perubahan yang dapat dikueri. Ini mirip dengan aliran CDC dari sistem sumber transaksi, kecuali perubahan diambil langsung dalam tabel Delta daripada dari log eksternal. Anda menggunakan CDF di sini untuk menghasilkan peristiwa perubahan yang akan digunakan alur pemrosesan berikutnya.

  1. Buat tabel dan muat rekaman awal:

    CREATE OR REPLACE TABLE products (
      product_id INT,
      product_name STRING,
      category STRING,
      warehouse STRING
    )
    TBLPROPERTIES (delta.enableChangeDataFeed = true);
    
    INSERT INTO products VALUES
      (1, 'Spoon', 'Cutlery', 'Seattle'),
      (2, 'Fork', 'Cutlery', 'Portland'),
      (3, 'Knife', 'Cutlery', 'Denver'),
      (4, 'Chair', 'Furniture', 'Austin'),
      (5, 'Table', 'Furniture', 'Chicago'),
      (6, 'Lamp', 'Lighting', 'Boston'),
      (7, 'Mug', 'Kitchenware', 'Seattle'),
      (8, 'Plate', 'Kitchenware', 'Atlanta'),
      (9, 'Bowl', 'Kitchenware', 'Dallas'),
      (10, 'Glass', 'Kitchenware', 'Phoenix');
    
  2. Simulasikan perubahan hulu, termasuk produk baru, pemindahan gudang, dan penetapan ulang kategori:

    INSERT INTO products VALUES
      (11, 'Napkin', 'Dining', 'San Francisco'),
      (12, 'Coaster', 'Dining', 'New York');
    
    UPDATE products SET warehouse = 'Los Angeles' WHERE product_id = 1;
    UPDATE products SET category = 'Dining' WHERE product_id = 2;
    

Langkah 3: Mengkueri umpan data perubahan

Sebelum membangun alur hilir, ini membantu melihat peristiwa perubahan mentah sehingga Anda dapat memahami apa yang AUTO CDC akan diproses. Fungsi table_changes() membaca log CDF dan mengembalikan setiap operasi yang ditangkap bersama dengan kolom metadata.

SELECT
  product_id, product_name, warehouse,
  _change_type, _commit_version
FROM table_changes('products', 1)
ORDER BY _commit_version, product_id;

Misalnya, Spoon memiliki tiga acara: sebuah insert (Seattle), sebuah update_preimage (Seattle), dan sebuah update_postimage (Los Angeles).

Perhatikan bahwa perubahan logis tunggal (misalnya, memindahkan Sendok ke gudang yang berbeda) menghasilkan beberapa peristiwa: preimage dan postimage. Di gudang tradisional, Anda akan menulis MERGE pernyataan untuk mendamaikan semua peristiwa ini ke dalam tabel target, menangani sisipan, pembaruan, dan penghapusan dengan logika terpisah, dan memastikan peristiwa diterapkan dalam urutan yang benar. Ini adalah kompleksitas yang AUTO CDC menghilangkan pada langkah berikutnya.

Langkah 4: Bangun dimensi SCD Tipe 2 dengan AUTO CDC

Penting

AUTO CDC dalam Beta. Memerlukan Databricks Runtime 17.3 atau lebih tinggi.

Tabel streaming memproses data secara bertahap. Pada setiap refresh, hanya membaca baris baru sejak eksekusi terakhir, sehingga tidak perlu memproses ulang himpunan data lengkap. Ini membuatnya sangat cocok untuk sumber volume tinggi atau sering berubah.

AUTO CDC menambahkan pemrosesan pengambilan data perubahan di atas tabel streaming. Alih-alih menulis pernyataan MERGE INTO yang secara manual menangani sisipan, pembaruan, dan penghapusan, Anda mendeklarasikan kunci bisnis dan kolom pengurutan dan membiarkan Azure Databricks menerapkan logika yang benar. AUTO CDC juga menangani peristiwa yang tidak berurutan secara otomatis, yang merupakan masalah umum saat menggunakan MERGE INTO untuk menangani peristiwa yang tiba dari sistem terdistribusi atau beban batch dengan tanda waktu yang tumpang tindih.

Pernyataan berikut membuat tabel SCD Tipe 2 yang mempertahankan riwayat versi lengkap setiap produk. Setiap versi mendapatkan __START_AT dan __END_AT penanda waktu. NULL in __END_AT menandai versi saat ini.

CREATE OR REFRESH STREAMING TABLE products_history
SCHEDULE REFRESH EVERY 1 DAY
FLOW AUTO CDC
FROM STREAM products WITH (readChangeFeed = true)
KEYS (product_id)
APPLY AS DELETE WHEN _change_type = 'delete'
SEQUENCE BY _commit_timestamp
COLUMNS * EXCEPT (_change_type, _commit_version, _commit_timestamp)
STORED AS SCD TYPE 2;
  • SCHEDULE REFRESH EVERY 1 DAY: memperbarui tabel setiap hari.
  • FLOW AUTO CDC: menyatakan ini sebagai aliran CDC. Azure Databricks menerapkan semantik sisipkan, perbarui, dan hapus secara otomatis.
  • KEYS (product_id): kunci bisnis. Peristiwa dengan kunci yang sama digabungkan ke dalam baris versi.
  • APPLY AS DELETE WHEN _change_type = 'delete': menutup versi saat ini saat peristiwa penghapusan tiba. Ini memungkinkan Anda menentukan kondisi yang mengidentifikasi peristiwa penghapusan.
  • SEQUENCE BY _commit_timestamp: menetapkan pengurutan kejadian. Menangani kedatangan yang tidak berurutan dengan benar.
  • STORED AS SCD TYPE 2: menyimpan riwayat penuh. AUTO CDC mendukung SCD Tipe 1 dan SCD Tipe 2.

Kueri tabel dimensi:

SELECT product_id, product_name, warehouse, __START_AT, __END_AT
FROM products_history
ORDER BY product_id, __START_AT;
  • Sendok: dua versi. Seattle (ditutup, __END_AT diatur) dan Los Angeles (aktif, __END_AT = NULL).
  • Fork: dua versi. Kategori Alat Makan (tertutup) dan Kategori Ruang Makan (saat ini).
  • Serbet dan Coaster: masing-masing satu versi (baru dimasukkan, __END_AT = NULL).
  • Semua produk lainnya: masing-masing satu versi (__END_AT = NULL).

Langkah 5: Proses menghapus melalui alur

Sekarang simulasikan dua produk yang dihentikan dengan menghapusnya dari tabel sumber:

DELETE FROM products WHERE product_id = 9;
DELETE FROM products WHERE product_id = 10;

Peristiwa penghapusan ini direkam di log CDF tetapi tabel streaming belum melihatnya. Refresh tabel streaming untuk memproses peristiwa baru:

REFRESH STREAMING TABLE products_history;

Kueri tabel dimensi untuk memverifikasi bahwa penghapusan diterapkan:

SELECT product_id, product_name, warehouse, __START_AT, __END_AT
FROM products_history
ORDER BY product_id, __START_AT;

Mangkuk dan Gelas dinyatakan tutup dengan __END_AT set, menandakan bahwa mereka telah dihentikan produksi. Semua produk lain saat ini tetap tidak berubah. Tabel streaming hanya memproses peristiwa penghapusan baru tanpa memproses ulang penyisipan dan pembaruan dari refresh sebelumnya.

Langkah 6: Membuat tampilan materialisasi agregat

Sekarang setelah Anda memiliki tabel dimensi yang tetap terkini dengan perubahan sumber, Anda dapat menambahkan lapisan pelaporan di atasnya.

Tampilan materialisasi menyimpan hasil kueri yang telah dihitung sebelumnya sebagai tabel fisik. Tidak seperti tampilan biasa, yang menjalankan ulang kueri setiap kali Anda membaca darinya, tampilan materialisasi menyimpan hasil dan hanya menghitung ulang baris yang terpengaruh oleh perubahan upstream pada setiap refresh. Ini membuatnya sangat cocok untuk dasbor dan laporan di mana performa kueri penting.

CREATE OR REPLACE MATERIALIZED VIEW products_by_category
SCHEDULE REFRESH EVERY 1 DAY
AS
SELECT
  category,
  COUNT(*) AS active_products
FROM products_history
WHERE __END_AT IS NULL
GROUP BY category;

SCHEDULE REFRESH EVERY 1 DAY berarti tampilan ini di-refresh pada jadwal harian. Dikombinasikan dengan jadwal yang sama pada tabel streaming, Anda sekarang memiliki pipeline tiga tahap di mana perubahan pada tabel sumber mengalir melalui dimensi dan ke dalam keseluruhan pada setiap siklus penyegaran. Tidak ada refresh manual untuk dijalankan.

SELECT * FROM products_by_category ORDER BY active_products DESC;

Langkah 7: Verifikasi kaskade ujung-ke-ujung

Untuk memverifikasi kaskade alur lengkap, buat perubahan pada tabel sumber:

UPDATE products SET warehouse = 'Seattle' WHERE product_id = 3;

Pisau bergerak dari Denver ke Seattle. Perubahan DML tunggal ini memicu sebuah kaskade alur penuh, menunjukkan cara ketiga-tiga tahap bekerja sama.

  1. products merekam peristiwa perubahan melalui CDF.
  2. products_history memproses peristiwa dan menambahkan versi baru untuk Pisau.
  3. products_by_category hanya menghitung ulang baris Cutlery yang terpengaruh.

Memverifikasi:

SELECT product_id, product_name, warehouse, __START_AT, __END_AT
FROM products_history
WHERE product_id = 3
ORDER BY __START_AT;

SELECT * FROM products_by_category ORDER BY active_products DESC;

Pembersihan

Untuk membersihkan sumber daya yang dibuat oleh tutorial ini, gunakan SQL berikut:

DROP MATERIALIZED VIEW IF EXISTS products_by_category;
DROP STREAMING TABLE IF EXISTS products_history;
DROP TABLE IF EXISTS products;

Sumber daya tambahan