Umpan Perubahan Data Lakebase

Note

Fitur Lakebase Change Data Feed berada dalam Pratinjau Publik.

Apa itu Umpan Perubahan Data Lakebase?

Lakebase memperkenalkan Change Data Feed (CDF) bawaan, yang membuka akses ke data operasional Anda untuk pipeline, model, dan aplikasi downstream. Setiap penyisipan, pembaruan, dan penghapusan pada tabel Postgres Lakebase dicatat dari log write-ahead dan disimpan sebagai baris baru dalam tabel Delta terkelola Unity Catalog, diproses secara batch dan ditulis setiap ~15 detik. Riwayat perubahan disimpan dalam format terbuka yang dapat dibaca oleh mesin komputasi apa pun.

Tabel tujuan mengikuti bentuk yang sama dengan Delta Change Data Feed: setiap baris membawa _pg_change_type, LSN, ID transaksi, dan tanda waktu. Perubahan operasional menjadi sumber utama untuk ETL, audit, dan konsumen downstream — tanpa perlu menyiapkan stack CDC eksternal.

Data Lakebase CDF mengalir dari Postgres melalui wal2delta ke tabel Delta di Unity Catalog.

Skenario penggunaan

Lakebase CDF membawa data operasional ke lakehouse sehingga pipeline dan aplikasi hilir dapat bereaksi terhadap perubahan secara real time.

Skenario penggunaan Description
Pipeline ETL Gunakan Lakebase sebagai sumber perunggu untuk alur medali. Bangun pekerjaan SDP atau Spark Structured Streaming bertahap terhadap umpan perubahan dan perbarui tabel perak dan emas hilir.
Audit log Pertahankan riwayat lengkap yang dapat dikueri dari setiap sisipan, pembaruan, dan penghapusan pada tabel Lakebase untuk kepatuhan dan forensik. Sejarahnya adalah Delta yang tidak dapat diubah.
Sistem eksternal Simpan data perubahan Lakebase dalam format terbuka yang dapat dikonsumsi mesin apa pun. Karena tujuannya adalah tabel Delta di Unity Catalog, sistem eksternal dan pembaca non-Databricks dapat mengakses umpan secara langsung.

Aktifkan pratinjau ini

Administrator ruang kerja harus mengaktifkan pratinjau Lakebase Change Data Feed dari halaman Pratinjau pada ruang kerja.

Requirements

  • Autoscaling Lakebase:Proyek Autoscaling Lakebase yang menjalankan Postgres 17.
  • Database sumber: Tabel harus berada di databricks_postgres database di Lakebase. Setiap proyek dibuat dengan database default ini. Ini adalah keterbatasan yang diketahui.
  • Katalog Unity: Identitas yang mengonfigurasi CDF membutuhkan USE CATALOG, , USE SCHEMAdan CREATE TABLE pada katalog dan skema tujuan. Lihat Memberikan izin pada objek.
  • Penyimpanan default: Katalog tujuan yang dikonfigurasi dengan penyimpanan default tidak didukung.
  • Proyek Lakebase: Peran Postgres Anda memerlukan izin CAN MANAGE pada proyek Lakebase. Pemilik proyek memiliki DAPAT MENGELOLA secara bawaan. Lihat Mengelola izin proyek.
  • Jenis data: Lihat Pemetaan jenis data. Tipe data yang tidak memiliki padanan langsung dalam Delta akan disimpan sebagai STRING.

Siapkan Lakebase CDF

Untuk memulai, atur identitas replika penuh pada tabel yang Anda inginkan di umpan (Langkah 1), lalu mulai CDF di aplikasi Lakebase (Langkah 2). Data Anda muncul sebagai lb_<table_name>_history tabel Delta di katalog dan skema Unity Catalog yang Anda pilih.

Langkah 1: Atur identitas replika penuh

Agar tabel Lakebase dapat berpartisipasi dalam CDF, tabel tersebut harus menetapkan REPLICA IDENTITY FULL. Secara default Postgres hanya mencatat kunci utama saat baris diperbarui atau dihapus. Menetapkan full identity memberi tahu Postgres untuk merekam status baris sebelum dan sesudah perubahan dalam write-ahead log, yang diperlukan CDF untuk membangun riwayat perubahan yang lengkap.

Anda dapat menjalankan perintah ini di Editor Lakebase SQL atau klien Postgres apa pun.

Tabel tunggal

ALTER TABLE <table_name> REPLICA IDENTITY FULL;

Semua tabel yang ada dalam skema

Untuk mengatur identitas replika pada setiap tabel yang ada dalam skema (public dalam contoh ini), jalankan:

DO $$
DECLARE r record;
BEGIN
  FOR r IN
    SELECT table_schema, table_name
    FROM information_schema.tables
    WHERE table_schema = 'public'
      AND table_type = 'BASE TABLE'
  LOOP
    EXECUTE format(
      'ALTER TABLE %I.%I REPLICA IDENTITY FULL;',
      r.table_schema, r.table_name
    );
  END LOOP;
END $$;

Terapkan otomatis ke tabel mendatang

Untuk membuat setiap tabel yang baru dibuat secara otomatis menerima REPLICA IDENTITY FULL, instal pemicu peristiwa Postgres. Ini berjalan setelah setiap CREATE TABLE dan mengatur identitas pada tabel baru:

CREATE OR REPLACE FUNCTION public.set_full_replica_identity()
RETURNS event_trigger
LANGUAGE plpgsql
AS $$
DECLARE
  obj record;
BEGIN
  FOR obj IN
    SELECT * FROM pg_event_trigger_ddl_commands()
    WHERE command_tag = 'CREATE TABLE'
  LOOP
    EXECUTE format(
      'ALTER TABLE %s REPLICA IDENTITY FULL;',
      obj.object_identity
    );
  END LOOP;
END $$;

CREATE EVENT TRIGGER set_full_replica_identity_on_create
ON ddl_command_end
WHEN TAG IN ('CREATE TABLE')
EXECUTE FUNCTION public.set_full_replica_identity();

Gabungkan pemicu peristiwa dengan perulangan di tab sebelumnya untuk mencakup tabel yang sudah ada maupun yang akan datang dalam satu konfigurasi.

Periksa tabel mana yang memiliki set identitas replika

Untuk melihat tabel mana dalam skema yang memiliki identitas replika yang dikonfigurasi, jalankan:

SELECT n.nspname AS table_schema,
       c.relname AS table_name,
       CASE c.relreplident
         WHEN 'd' THEN 'default'
         WHEN 'n' THEN 'nothing'
         WHEN 'f' THEN 'full'
         WHEN 'i' THEN 'index'
       END AS replica_identity
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE c.relkind = 'r'
  AND n.nspname = 'public'
ORDER BY n.nspname, c.relname;

Hanya baris dengan replica_identity = 'full' yang siap untuk CDF.

Langkah 2: Mulai umpan data perubahan

Lakebase CDF dikonfigurasi pada tingkat skema. Setelah dimulai, setiap tabel saat ini dan di masa mendatang dalam skema sumber disertakan dalam umpan.

  1. Di ruang kerja Azure Databricks Anda, buka Lakebase Postgres dari pengalih aplikasi (kanan atas).
  2. Pilih proyek Lakebase Anda dan cabang yang ingin Anda gunakan (misalnya, produksi atau utama).
  3. Buka Gambaran umum cabang dan klik tab Ubah Umpan Data .
  4. Klik Mulai.
  5. Dalam dialog konfigurasi:
    • Database: Default ke nilai databricks_postgres.
    • Skema: Pilih skema Postgres sumber.
    • Katalog Tujuan: Pilih katalog tujuan Unity Catalog.
    • Skema: Pilih skema Unity Catalog tujuan.
  6. Klik Mulai untuk memulai umpan.

Ikhtisar cabang dengan tab Change Data Feed yang menampilkan Start dan konfigurasi skema.

Tabel ditampilkan di tujuan sebagai lb_<table_name>_history. Untuk menemukannya, buka Katalog di bar samping, navigasikan ke katalog dan skema tujuan, dan buka tab Tabel .

Tab Ubah Umpan Data di Lakebase memiliki dua sub-tab:

Sub-tab memperlihatkan pemetaan dan kemajuan per tabel.

  • Skema: Mencantumkan setiap skema sumber, katalog tujuan dan skemanya di Katalog Unity, dan status.
  • Tabel: Mencantumkan setiap tabel sumber, tabel tujuan lb_<table_name>_history , status (Streaming atau Snapshotting), LSN yang diterapkan (seberapa jauh umpan telah ditulis ke Delta, ditampilkan saat - masih dalam rekam jepret awal), dan Pembaruan terakhir (terakhir kali tabel menerima perubahan).

Anda juga dapat memeriksa status umpan dari Postgres dengan menjalankan ini di Editor Lakebase SQL:

SELECT * FROM wal2delta.tables;

Hasilnya mencakup table_oid, (statusSTREAMING atau SNAPSHOTTING), committed_lsn, dan last_write_time per tabel.

Important

Apa itu wal2delta? Lakebase CDF didukung oleh ekstensi wal2delta Postgres, yang berjalan di dalam komputasi Lakebase. Ini menggunakan dekode logis untuk menangkap perubahan pada write-ahead log (WAL) dan menuliskannya ke tabel Delta di Unity Catalog.

Skema tabel tujuan

CDF menulis satu tabel Delta per tabel sumber, bernama lb_<table_name>_history di katalog dan skema tujuan Anda. Selain kolom sumber Anda, setiap baris memiliki kolom sistem berikut:

kolom Type Description
_pg_change_type TEKS Jenis operasi: insert, delete, update_preimage, atau update_postimage.
_pg_lsn BIGINT Nomor Urutan Log Postgres.
_pg_xid INTEGER ID Transaksi Postgres.
_timestamp TIMESTAMP Tanda waktu ketika perubahan diproses (tanpa zona waktu).
_sort_by BIGINT Kunci pengurutan monotonik digunakan untuk mengurutkan semua perubahan.

Pola perubahan umum

  • Rekam jepret awal: Pertama kali CDF berjalan pada tabel Lakebase yang ada, setiap baris yang ada ditulis dengan _pg_change_type = 'insert'.
  • Update: Pembaruan menghasilkan dua baris: satu dengan _pg_change_type = 'update_preimage' (baris lama) dan satu dengan _pg_change_type = 'update_postimage' (baris baru).
  • Menghapus: Penghapusan menghasilkan satu baris dengan _pg_change_type = 'delete'.

Ini adalah peristiwa perubahan yang sama dengan Umpan Data Perubahan Delta, sehingga pola hilir yang sama berlaku.

Perilaku operasional

  • Konflik penamaan: Jika dua tabel sumber dipetakan ke nama tujuan yang sama (misalnya, sales.users dan marketing.users keduanya dipetakan ke lb_users_history), CDF menulis yang pertama ke lb_users_history dan secara otomatis menambahkan sufiks pada yang kedua menjadi lb_users_history_1. Anda dapat mengganti nama tabel tujuan di Katalog Unity dan umpan terus berfungsi.
  • Cakupan tingkat skema: Saat Anda memulai CDF pada skema Lakebase, setiap tabel saat ini dan di masa mendatang dalam skema tersebut disertakan. Tabel kosong akan dilewati — tabel harus memiliki setidaknya satu baris agar ditampilkan di tujuan.
  • Tabel sumber yang dihapus: Jika Anda menghapus tabel di Lakebase, tabel Delta tujuan di Unity Catalog tetap dipertahankan.

Membangun alur hilir

Lakebase CDF dirancang untuk alur hilir yang bereaksi terhadap perubahan operasional. Pola di bawah ini menunjukkan tiga cara untuk mengonsumsi umpan, diurutkan dari yang paling sederhana hingga yang paling fleksibel.

Contoh skenario. Aplikasi e-commerce mencatat pesanan ke dalam tabel Postgres orders, dengan setiap baris berisi item_id dan quantity. Tim logistik memerlukan tingkat stok secara real-time. Dengan CDF, setiap perubahan pada orders disimpan dalam tabel Delta lb_orders_history di Unity Catalog. Pipeline downstream membaca feed perubahan tersebut dan memperbarui tabel inventory_levels setiap kali pesanan dibuat, diedit, atau dibatalkan.

Menghitung inventarisasi saat ini dengan tampilan materialisasi

Pola paling sederhana adalah SQL materialized view pada tabel riwayat. MV diperbarui secara bertahap saat event perubahan baru tiba, dan konsumen hilir melakukan kueri padanya seperti pada tabel lain.

CREATE MATERIALIZED VIEW inventory_levels AS
SELECT
  item_id,
  SUM(
    CASE
      -- New orders (and the "new half" of updates) decrement inventory
      WHEN _pg_change_type IN ('insert', 'update_postimage') THEN -quantity
      -- Cancellations (and the "old half" of updates) restore inventory
      WHEN _pg_change_type IN ('delete', 'update_preimage') THEN quantity
      ELSE 0
    END
  ) AS current_inventory,
  MAX(_timestamp) AS last_transaction_ts,
  MAX(_pg_lsn) AS last_lsn
FROM lb_orders_history
GROUP BY item_id;

Dua baris yang dihasilkan untuk setiap pembaruan saling meniadakan satu sama lain kecuali pada perubahan neto, sehingga jumlah kumulatif tetap benar saat pesanan diedit.

Streaming perubahan dengan Spark Declarative Pipelines

Untuk arsitektur medali yang terstruktur, gunakan Spark Declarative Pipelines (SDP) untuk mendeklarasikan tabel perunggu, perak, dan emas. SDP menjalankannya sebagai pipeline yang saling terhubung, dengan checkpoint dan manajemen dependensi ditangani secara otomatis untuk Anda.

import dlt
from pyspark.sql import functions as F

@dlt.table
def inventory_adjustments():
    return (
        spark.readStream.table("<catalog>.<schema>.lb_orders_history")
        .withColumn(
            "delta",
            F.when(F.col("_pg_change_type").isin("insert", "update_postimage"), -F.col("quantity"))
             .when(F.col("_pg_change_type").isin("delete", "update_preimage"), F.col("quantity"))
             .otherwise(0),
        )
        .select("item_id", "delta", "_timestamp")
    )

@dlt.expect_or_drop("non_negative_stock", "on_hand >= 0")
@dlt.table
def inventory_levels():
    return (
        spark.read.table("LIVE.inventory_adjustments")
        .groupBy("item_id")
        .agg(F.sum("delta").alias("on_hand"))
    )

inventory_adjustments membaca lb_orders_history secara inkremental dengan readStream dan menghasilkan delta per peristiwa. inventory_levels mengagregasi dengan item_id untuk menghitung stok saat ini. Asumsinya menghapus baris yang akan membuat stok menjadi negatif, yang menandakan adanya bug di proses sebelumnya.

Untuk panduan lengkap menyeluruh, lihat Tutorial: Membangun alur ETL menggunakan change data capture.

Pemrosesan kustom dengan Streaming Terstruktur Spark

Saat Anda memerlukan kontrol penuh — misalnya, penggabungan kustom, efek samping, atau beberapa sink — baca tabel riwayat langsung dengan Spark Structured Streaming dan gunakan foreachBatch untuk menulis ke tujuan Anda.

from pyspark.sql import functions as F
from delta.tables import DeltaTable

def update_inventory(batch_df, batch_id):
    deltas = (
        batch_df
        .withColumn(
            "delta",
            F.when(F.col("_pg_change_type").isin("insert", "update_postimage"), -F.col("quantity"))
             .when(F.col("_pg_change_type").isin("delete", "update_preimage"), F.col("quantity"))
             .otherwise(0),
        )
        .groupBy("item_id")
        .agg(F.sum("delta").alias("delta"))
    )

    target = DeltaTable.forName(spark, "<catalog>.<schema>.inventory_levels")
    (target.alias("t")
        .merge(deltas.alias("s"), "t.item_id = s.item_id")
        .whenMatchedUpdate(set={"on_hand": F.expr("t.on_hand + s.delta")})
        .whenNotMatchedInsert(values={"item_id": "s.item_id", "on_hand": "s.delta"})
        .execute())

(spark.readStream.table("<catalog>.<schema>.lb_orders_history")
    .writeStream
    .foreachBatch(update_inventory)
    .option("checkpointLocation", "/Volumes/<catalog>/<schema>/checkpoints/inventory_levels")
    .start())

Setiap microbatch mengagregasi peristiwa perubahan dengan item_id dan menggabungkan delta net menjadi inventory_levels.

Inkremental berdasarkan desain. Setiap lb_<table_name>_history tabel adalah tabel Delta yang hanya dapat ditambahkan. Setiap perubahan sumber dicatat sebagai baris baru dengan _pg_change_type menandai operasi. Tampilan terwujud Databricks SQL , alur Lakeflow Spark Declarative Pipelines, dan job Spark Structured Streaming semuanya memproses baris baru secara inkremental dari log transaksi Delta, sehingga pipeline hilir hanya melakukan pemrosesan yang sebanding dengan perubahan yang terjadi. Anda tidak perlu mengaktifkan Umpan Data Perubahan Delta pada tabel riwayat karena semantik perubahan sudah dikodekan dalam data baris.

Pemetaan jenis data

CDF mendukung sebagian besar jenis primitif PostgreSQL standar. Tipe data yang tidak memiliki padanan langsung dalam Delta akan disimpan sebagai STRING.

Jenis PostgreSQL jenis Delta di Azure Databricks Notes
BOOLEAN BOOLEAN
INT, SMALLINT, BIGINT INT, SMALLINT, BIGINT
TEKS, VARCHAR, KARAKTER STRING
JSONB STRING Disimpan sebagai string JSON.
ENUM STRING Disimpan sebagai label enum.
NUMERIK / DESIMAL desimal atau string Menggunakan presisi/skala sumber jika memungkinkan. Melakukan penskalaan ulang tanpa kerugian untuk nilai presisi/skala yang tidak kompatibel. Kembali ke STRING ketika presisi melebihi 38 atau ketika presisi/skala tidak terdefinisi (NUMERIC tidak terbatas). Semua kolom NUMERIC/DECIMAL dapat bernilai NULL karena nilai NaN dipetakan ke NULL. Lihat Jenis numerik PostgreSQL.
DATE DATE
TIMESTAMP TIMESTAMP_NTZ
TIMESTAMPTZ TIMESTAMP
FLOAT, DOUBLE FLOAT, DOUBLE

Jenis yang disimpan sebagai STRING:

  • Geografi/Geometri (PostGIS): Jenis dari ekstensi PostGIS (misalnya, geometry, geography).
  • Vektor (pgvector): Jenis vector dari ekstensi pgvector.
  • Jenis komposit/struktur: Jenis kustom yang ditentukan dengan CREATE TYPE ... AS (field_name type, ...). Ini adalah tipe mirip baris dengan bidang bernama.
  • Peta: Jenis nilai kunci seperti peta seperti hstore (dari hstore ekstensi). Postgres tidak memiliki jenis peta bawaan. hstore adalah cara umum untuk menyimpan pasangan kunci-nilai dalam kolom.

Mengelola perubahan skema

  • Mengganti nama tabel di Postgres (misalnya, ALTER TABLE users RENAME TO customers) memungkinkan umpan berlanjut. Nama tabel Delta tujuan tidak berubah — tetap lb_users_history.
  • Perubahan skema (menambahkan kolom, menghilangkan kolom, atau mengubah jenis data kolom) memicu rekam jepret ulang tabel yang terpengaruh. CDF membaca ulang seluruh tabel dari Postgres dan menulis ulang tabel Delta tujuan.

Menonaktifkan Lakebase CDF

Menonaktifkan CDF menghentikan umpan untuk semua skema Lakebase dalam proyek.

  1. Di ruang kerja Azure Databricks Anda, buka Lakebase Postgres dari pengalih aplikasi (kanan atas).
  2. Pilih proyek Lakebase dan cabang tempat Anda mengonfigurasi CDF.
  3. Buka Gambaran umum cabang dan klik tab Ubah Umpan Data .
  4. Klik Nonaktifkan. Dalam dialog konfirmasi, tinjau peringatan bahwa perubahan akan berhenti mengalir ke tabel Delta, lalu klik Nonaktifkan lagi untuk mengonfirmasi.

Menonaktifkan CDF tidak memulai ulang komputasi Anda.

Warning

Jika Anda mengaktifkan kembali CDF nanti, sistem tidak melakukan rekam jepret ulang penuh. Setiap perubahan yang terjadi saat CDF dinonaktifkan hilang secara permanen dari tabel Delta tujuan.

Batasan dan pemecahan masalah

Anda dapat melihat status per tabel (rekam jepret, dilewati, atau streaming) di tab Ubah Umpan Data , atau dengan menjalankan ini di Lakebase:

SELECT * FROM wal2delta.tables;

Alasan umum tabel tidak muncul di umpan:

  • REPLICA IDENTITY FULL belum diatur: Jalankan ALTER TABLE <table_name> REPLICA IDENTITY FULL; pada tabel. Lihat Langkah 1: Atur identitas replika penuh.
  • Tabel yang dipartisi: Tabel yang dipartisi Lakebase tidak didukung. Skema yang berisi tabel yang dipartisi menyebabkan tabel tersebut gagal.
  • Tabel kosong: Tabel dengan nol baris dilewati hingga setidaknya satu baris ada.

Langkah berikutnya