Bagikan melalui


TERAPKAN PERUBAHAN API: Menyederhanakan perubahan pengambilan data dengan Tabel Langsung Delta

Tabel Langsung Delta menyederhanakan pengubahan pengambilan data (CDC) dengan APPLY CHANGES API dan APPLY CHANGES FROM SNAPSHOT . Antarmuka yang Anda gunakan tergantung pada sumber data perubahan:

  • Gunakan APPLY CHANGES untuk memproses perubahan dari umpan data perubahan (CDF).
  • Gunakan APPLY CHANGES FROM SNAPSHOT (Pratinjau Umum) untuk memproses perubahan dalam rekam jepret database.

Sebelumnya, pernyataan ini MERGE INTO umumnya digunakan untuk memproses rekaman CDC di Azure Databricks. Namun, MERGE INTO dapat menghasilkan hasil yang salah karena rekaman yang tidak berurutan atau memerlukan logika kompleks untuk mengurutkan ulang rekaman.

APPLY CHANGES API didukung di antarmuka Delta Live Tables SQL dan Python. APPLY CHANGES FROM SNAPSHOT API didukung di antarmuka Python Delta Live Tables.

Baik APPLY CHANGES dan APPLY CHANGES FROM SNAPSHOT dukung pembaruan tabel menggunakan SCD tipe 1 dan tipe 2:

  • Gunakan SCD jenis 1 untuk memperbarui rekaman secara langsung. Riwayat tidak dipertahankan untuk rekaman yang diperbarui.
  • Gunakan SCD tipe 2 untuk menyimpan riwayat rekaman, baik pada semua pembaruan atau pembaruan untuk sekumpulan kolom tertentu.

Untuk sintaks dan referensi lainnya, lihat:

Catatan

Artikel ini menjelaskan cara memperbarui tabel di saluran Delta Live Tables Anda berdasarkan perubahan pada data sumber. Untuk mempelajari cara merekam dan mengkueri informasi perubahan tingkat baris untuk tabel Delta, lihat Menggunakan umpan data perubahan Delta Lake di Azure Databricks.

Persyaratan

Untuk menggunakan CDC API, alur Anda harus dikonfigurasi untuk menggunakan alur DLT tanpa server atau Tabel Pro Langsung Delta atau Advanced edisi.

Bagaimana CDC diimplementasikan dengan APPLY CHANGES API?

Dengan menangani rekaman yang tidak berurutan secara otomatis, APPLY CHANGES API dalam Tabel Langsung Delta memastikan pemrosesan rekaman CDC yang benar dan menghapus kebutuhan untuk mengembangkan logika kompleks untuk menangani rekaman yang tidak berurutan. Anda harus menentukan kolom dalam data sumber untuk mengurutkan rekaman, yang ditafsirkan Delta Live Tables sebagai representasi yang meningkat secara monoton dari pengurutan data sumber yang tepat. Tabel Langsung Delta secara otomatis menangani data yang tiba di luar urutan. Untuk perubahan SCD tipe 2, Tabel Langsung Delta menyebarluaskan nilai urutan yang sesuai ke tabel __START_AT dan __END_AT kolom target. Harus ada satu pembaruan yang berbeda per kunci pada setiap nilai urutan, dan nilai urutan NULL tidak didukung.

Untuk melakukan pemrosesan CDC dengan APPLY CHANGES, Anda terlebih dahulu membuat tabel streaming lalu menggunakan APPLY CHANGES INTO pernyataan di SQL atau apply_changes() fungsi di Python untuk menentukan sumber, kunci, dan pengurutan untuk umpan perubahan. Untuk membuat tabel streaming target, gunakan CREATE OR REFRESH STREAMING TABLE pernyataan di SQL atau create_streaming_table() fungsi di Python. Lihat contoh pemrosesan SCD tipe 1 dan tipe 2.

Untuk detail sintaks, lihat referensi Delta Live Tables SQL atau referensi Python.

Bagaimana CDC diimplementasikan dengan APPLY CHANGES FROM SNAPSHOT API?

Penting

APPLY CHANGES FROM SNAPSHOT API berada di Pratinjau Umum.

APPLY CHANGES FROM SNAPSHOT adalah API deklaratif yang secara efisien menentukan perubahan data sumber dengan membandingkan serangkaian rekam jepret dalam urutan dan kemudian menjalankan pemrosesan yang diperlukan untuk pemrosesan CDC rekaman dalam rekam jepret. APPLY CHANGES FROM SNAPSHOT hanya didukung oleh antarmuka Python Tabel Langsung Delta.

APPLY CHANGES FROM SNAPSHOT mendukung penyerapan rekam jepret dari beberapa jenis sumber:

  • Gunakan penyerapan rekam jepret berkala untuk menyerap rekam jepret dari tabel atau tampilan yang ada. APPLY CHANGES FROM SNAPSHOT memiliki antarmuka yang sederhana dan efisien untuk mendukung penyerapan rekam jepret secara berkala dari objek database yang ada. Rekam jepret baru diserap dengan setiap pembaruan alur, dan waktu penyerapan digunakan sebagai versi rekam jepret. Saat alur dijalankan dalam mode berkelanjutan, beberapa rekam jepret diserap dengan setiap pembaruan alur pada periode yang ditentukan oleh pengaturan interval pemicu untuk alur yang berisi pemrosesan APPLY CHANGES FROM SNAPSHOT.
  • Gunakan penyerapan rekam jepret historis untuk memproses file yang berisi rekam jepret database, seperti rekam jepret yang dihasilkan dari database Oracle atau MySQL atau gudang data.

Untuk melakukan pemrosesan CDC dari jenis sumber apa pun dengan APPLY CHANGES FROM SNAPSHOT, Anda terlebih dahulu membuat tabel streaming lalu menggunakan apply_changes_from_snapshot() fungsi di Python untuk menentukan rekam jepret, kunci, dan argumen lain yang diperlukan untuk mengimplementasikan pemrosesan. Lihat contoh penyerapan rekam jepret berkala dan penyerapan rekam jepret historis.

Rekam jepret yang diteruskan ke API harus dalam urutan naik berdasarkan versi. Jika Tabel Langsung Delta mendeteksi rekam jepret yang tidak berurutan, kesalahan akan muncul.

Untuk detail sintaks, lihat referensi Python Tabel Langsung Delta.

Batasan

Target APPLY CHANGES kueri atau APPLY CHANGES FROM SNAPSHOT tidak dapat digunakan sebagai sumber untuk tabel streaming. Tabel yang membaca dari target APPLY CHANGES kueri atau APPLY CHANGES FROM SNAPSHOT harus berupa tampilan materialisasi.

Contoh: Pemrosesan SCD tipe 1 dan SCD tipe 2 dengan data sumber CDF

Bagian berikut ini menyediakan contoh kueri SCD Delta Live Tables tipe 1 dan tipe 2 yang memperbarui tabel target berdasarkan peristiwa sumber dari umpan data perubahan yang:

  1. Membuat rekaman pengguna baru.
  2. Menghapus rekaman pengguna.
  3. Memperbarui rekaman pengguna. Dalam contoh SCD jenis 1, operasi terakhir UPDATE datang terlambat dan dihilangkan dari tabel target, menunjukkan penanganan peristiwa yang tidak berurutan.

Contoh berikut mengasumsikan keakraban dengan mengonfigurasi dan memperbarui alur Delta Live Tables. Lihat Tutorial: Menjalankan alur Tabel Langsung Delta pertama Anda.

Untuk menjalankan contoh ini, Anda harus mulai dengan membuat himpunan data sampel. Lihat Membuat data pengujian.

Berikut ini adalah catatan input untuk contoh tersebut:

userId nama kota operasi sequenceNum
124 Raul Oaxaca INSERT 1
123 Isabel Monterrey INSERT 1
125 Mercedes Tijuana INSERT 2
126 Lily Cancun INSERT 2
123 null null DELETE 6
125 Mercedes Guadalajara UPDATE 6
125 Mercedes Mexicali UPDATE 5
123 Isabel Chihuahua UPDATE 5

Jika Anda membatalkan komentar baris akhir dalam contoh data, baris tersebut akan menyisipkan catatan berikut yang menentukan di mana rekaman harus dipotong:

userId nama kota operasi sequenceNum
null null null TRUNCATE 3

Catatan

Semua contoh berikut menyertakan opsi untuk menentukan operasi DELETE dan TRUNCATE , tetapi masing-masing bersifat opsional.

Proses pembaruan SCD tipe 1

Contoh berikut menunjukkan pemrosesan pembaruan SCD jenis 1:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;

Setelah menjalankan SCD jenis 1 dalam contoh ini, tabel target akan berisi catatan berikut:

userId nama kota
124 Raul Oaxaca
125 Mercedes Guadalajara
126 Lily Cancun

Setelah menjalankan contoh SCD jenis 1 dengan catatan tambahan TRUNCATE, catatan 124 dan 126 dipotong karena operasi TRUNCATE di sequenceNum=3, dan tabel target berisi catatan berikut:

userId nama kota
125 Mercedes Guadalajara

Proses pembaruan SCD tipe 2

Contoh berikut menunjukkan pemrosesan pembaruan SCD tipe 2:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;

Setelah menjalankan contoh SCD jenis 2, tabel target akan berisi catatan berikut:

userId nama kota __START_AT __END_AT
123 Isabel Monterrey 1 5
123 Isabel Chihuahua 5 6
124 Raul Oaxaca 1 nihil
125 Mercedes Tijuana 2 5
125 Mercedes Mexicali 5 6
125 Mercedes Guadalajara 6 nihil
126 Lily Cancun 2 nihil

Kueri SCD tipe 2 juga dapat menentukan subset kolom output yang akan dilacak untuk riwayat dalam tabel target. Perubahan pada kolom lain diperbarui di tempat daripada menghasilkan rekaman riwayat baru. Contoh berikut menunjukkan mengecualikan city kolom dari pelacakan:

Contoh berikut menunjukkan menggunakan riwayat trek dengan SCD tipe 2:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2",
  track_history_except_column_list = ["city"]
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT
  (city)

Setelah menjalankan contoh ini tanpa rekaman tambahan TRUNCATE , tabel target berisi rekaman berikut:

userId nama kota __START_AT __END_AT
123 Isabel Chihuahua 1 6
124 Raul Oaxaca 1 nihil
125 Mercedes Guadalajara 2 nihil
126 Lily Cancun 2 nihil

Hasilkan data pengujian

Kode di bawah ini disediakan untuk menghasilkan contoh himpunan data untuk digunakan dalam contoh kueri yang ada dalam tutorial ini. Dengan asumsi Anda memiliki kredensial yang tepat untuk membuat skema baru dan membuat tabel baru, Anda dapat menjalankan pernyataan ini dengan notebook atau Databricks SQL. Kode berikut tidak dimaksudkan untuk dijalankan sebagai bagian dari alur Delta Live Tables:

CREATE SCHEMA IF NOT EXISTS cdc_data;

CREATE TABLE
  cdc_data.users
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  -- New users.
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  (126, "Lily",     "Cancun",      "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moved to Guadalajara.
  (123, null,       null,          "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  -- This batch of updates arrived out of order. The above batch at sequenceNum 5 will be the final state.
  (125, "Mercedes", "Mexicali",    "UPDATE", 5),
  (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
  -- Uncomment to test TRUNCATE.
  -- ,(null, null,      null,          "TRUNCATE", 3)
);

Contoh: Pemrosesan rekam jepret berkala

Contoh berikut menunjukkan pemrosesan SCD tipe 2 yang menyerap rekam jepret tabel yang disimpan di mycatalog.myschema.mytable. Hasil pemrosesan ditulis ke tabel bernama target.

mycatalog.myschema.mytable rekaman pada tanda waktu 2024-01-01 00:00:00

Tombol Nilai
1 a1
2 a2

mycatalog.myschema.mytable rekaman pada tanda waktu 2024-01-01 12:00:00

Tombol Nilai
2 b2
3 a3
import dlt

@dlt.view(name="source")
def source():
 return spark.read.table("mycatalog.myschema.mytable")

dlt.create_streaming_table("target")

dlt.apply_changes_from_snapshot(
 target="target",
 source="source",
 keys=["key"],
 stored_as_scd_type=2
)

Setelah memproses rekam jepret, tabel target berisi rekaman berikut:

Tombol Nilai __START_AT __END_AT
1 a1 2024-01-01 00:00:00 2024-01-01 12:00:00
2 a2 2024-01-01 00:00:00 2024-01-01 12:00:00
2 b2 2024-01-01 12:00:00 nihil
3 a3 2024-01-01 12:00:00 nihil

Contoh: Pemrosesan rekam jepret historis

Contoh berikut menunjukkan pemrosesan SCD tipe 2 yang memperbarui tabel target berdasarkan peristiwa sumber dari dua rekam jepret yang disimpan dalam sistem penyimpanan cloud:

Rekam jepret di timestamp, disimpan di /<PATH>/filename1.csv

Tombol TrackingColumn NonTrackingColumn
1 a1 b1
2 a2 b2
4 a4 b4

Rekam jepret di timestamp + 5, disimpan di /<PATH>/filename2.csv

Tombol TrackingColumn NonTrackingColumn
2 a2_new b2
3 a3 b3
4 a4 b4_new

Contoh kode berikut menunjukkan pemrosesan pembaruan SCD tipe 2 dengan rekam jepret ini:

import dlt

def exist(file_name):
  # Storage system-dependent function that returns true if file_name exists, false otherwise

# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
  latest_snapshot_version = latest_snapshot_version or 0
  next_version = latest_snapshot_version + 1
  file_name = "dir_path/filename_" + next_version + ".csv"
  if (exist(file_name)):
    return (spark.read.load(file_name), next_version)
   else:
     # No snapshot available
     return None

dlt.create_streaming_live_table("target")

dlt.apply_changes_from_snapshot(
  target = "target",
  source = next_snapshot_and_version,
  keys = ["Key"],
  stored_as_scd_type = 2,
  track_history_column_list = ["TrackingCol"]
)

Setelah memproses rekam jepret, tabel target berisi rekaman berikut:

Tombol TrackingColumn NonTrackingColumn __START_AT __END_AT
1 a1 b1 1 2
2 a2 b2 1 2
2 a2_new b2 2 nihil
3 a3 b3 2 nihil
4 a4 b4_new 1 nihil

Menambahkan, mengubah, atau menghapus data dalam tabel streaming target

Jika alur Anda menerbitkan tabel ke Unity Catalog, Anda dapat menggunakan pernyataan bahasa manipulasi data (DML), termasuk pernyataan sisipkan, perbarui, hapus, dan gabungkan, untuk mengubah tabel streaming target yang dibuat oleh APPLY CHANGES INTO pernyataan.

Catatan

  • Pernyataan DML yang mengubah skema tabel tabel streaming tidak didukung. Pastikan bahwa pernyataan DML Anda tidak mencoba mengembangkan skema tabel.
  • Pernyataan DML yang memperbarui tabel streaming hanya dapat dijalankan di kluster Unity Catalog bersama atau gudang SQL menggunakan Databricks Runtime 13.3 LTS ke atas.
  • Karena streaming memerlukan sumber data khusus tambahan, jika pemrosesan Anda memerlukan streaming dari tabel streaming sumber dengan perubahan (misalnya, menurut pernyataan DML), atur bendera skipChangeCommits saat membaca tabel streaming sumber. Saat skipChangeCommits diatur, transaksi yang menghapus atau mengubah rekaman pada tabel sumber diabaikan. Jika pemrosesan Anda tidak memerlukan tabel streaming, Anda dapat menggunakan tampilan materialisasi (yang tidak memiliki batasan khusus tambahan) sebagai tabel target.

Karena Tabel Langsung Delta menggunakan kolom tertentu dan menyebarluaskan SEQUENCE BY nilai urutan yang sesuai ke __START_AT kolom dan __END_AT tabel target (untuk SCD tipe 2), Anda harus memastikan bahwa pernyataan DML menggunakan nilai yang valid untuk kolom ini untuk mempertahankan urutan rekaman yang tepat. Lihat Bagaimana CDC diimplementasikan dengan APPLY CHANGES API?.

Untuk informasi selengkapnya tentang menggunakan pernyataan DML dengan tabel streaming, lihat Menambahkan, mengubah, atau menghapus data dalam tabel streaming.

Contoh berikut menyisipkan rekaman aktif dengan urutan awal 5:

INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);

Mendapatkan data tentang rekaman yang diproses oleh kueri CDC Tabel Langsung Delta

Catatan

Metrik berikut hanya diambil oleh APPLY CHANGES kueri, dan bukan oleh APPLY CHANGES FROM SNAPSHOT kueri.

Metrik berikut diambil oleh APPLY CHANGES kueri:

  • num_upserted_rows: Jumlah baris output yang di-upsert ke dalam himpunan data selama pembaruan.
  • num_deleted_rows: Jumlah baris output yang ada yang dihapus dari himpunan data selama pembaruan.

num_output_rows Metrik, yang merupakan output untuk alur non-CDC, tidak diambil untuk apply changes kueri.

Objek data apa yang digunakan untuk pemrosesan CDC Delta Live Tables?

Catatan: Struktur data berikut hanya berlaku untuk APPLY CHANGES pemrosesan, bukan APPLY CHANGES FROM SNAPSHOT pemrosesan.

Saat Anda mendeklarasikan tabel target di metastore Apache Hive, dua struktur data dibuat:

  • Tampilan menggunakan nama yang ditetapkan ke tabel target.
  • Tabel dukungan internal yang digunakan oleh Delta Live Tables untuk mengelola pemrosesan CDC. Tabel ini dinamai dengan prepending __apply_changes_storage_ ke nama tabel target.

Misalnya, jika Anda mendeklarasikan tabel target bernama dlt_cdc_target, Anda akan melihat tampilan bernama dlt_cdc_target dan tabel bernama __apply_changes_storage_dlt_cdc_target di metastore. Membuat tampilan memungkinkan Tabel Langsung Delta memfilter informasi tambahan (misalnya, batu nisan dan versi) yang diperlukan untuk menangani data yang tidak berurutan. Untuk menampilkan data yang diproses, kueri tampilan target. Karena skema __apply_changes_storage_ tabel mungkin berubah untuk mendukung fitur atau penyempurnaan di masa mendatang, Anda tidak boleh mengkueri tabel untuk penggunaan produksi. Jika Anda menambahkan data secara manual ke tabel, rekaman diasumsikan datang sebelum perubahan lain karena kolom versi hilang.

Jika alur diterbitkan ke Unity Catalog, tabel dukungan internal tidak dapat diakses oleh pengguna.