Share via


套用變更 API:簡化差異實時數據表中的異動數據擷取

Delta Live Tables 可簡化 API 的 APPLY CHANGES 異動數據擷取 (CDC)。 先前, MERGE INTO 語句通常用於處理 Azure Databricks 上的 CDC 記錄。 不過, MERGE INTO 由於順序錯亂的記錄,或需要複雜的邏輯來重新排序記錄,可能會產生不正確的結果。

藉由自動處理順序外記錄, APPLY CHANGES Delta 即時數據表中的 API 可確保正確處理 CDC 記錄,並移除開發處理順序外記錄的複雜邏輯的需求。

差異 APPLY CHANGES 實時數據表 SQL 和 Python 介面支援 API,包括支援使用 SCD 類型 1 和類型 2 更新資料表:

  • 使用 SCD 類型 1 直接更新記錄。 更新的記錄不會保留記錄。
  • 使用 SCD 類型 2 來保留記錄的歷程記錄,無論是在所有更新或更新到一組指定的數據行上。

如需語法和其他參考,請參閱:

注意

本文說明如何根據源數據變更,更新 Delta Live Tables 管線中的數據表。 若要瞭解如何記錄和查詢 Delta 數據表的數據列層級變更資訊,請參閱 在 Azure Databricks 上使用 Delta Lake 變更數據摘要。

CDC 如何使用 Delta Live Tables 來實作?

您必須在源數據中指定要排序記錄的數據行,而 Delta Live Tables 會將其解譯為單調增加源數據順序的表示法。 Delta Live Tables 會自動處理依序抵達的數據。 針對 SCD 類型 2 變更,Delta Live Tables 會將適當的排序值傳播至 __START_AT 目標數據表的 和 __END_AT 數據行。 每個排序值的每個索引鍵都應該有一個不同的更新,而且不支援 NULL 排序值。

若要使用 Delta Live Tables 執行 CDC 處理,請先建立串流數據表,然後使用 APPLY CHANGES INTO 語句來指定變更摘要的來源、索引鍵和排序。 若要建立目標串流數據表,請在 SQL 中使用 CREATE OR REFRESH STREAMING TABLE 語句,或在 Python 中使用 函 create_streaming_table() 式。 若要建立定義 CDC 處理的語句,請在 APPLY CHANGES SQL 中使用 語句,或在 apply_changes() Python 中使用 函式。 如需語法詳細數據,請參閱 在 Delta Live Tables 中使用 SQL 變更數據擷取,或在 Delta Live Tables 中使用 Python 變更數據擷取。

哪些數據對象用於 Delta Live Tables CDC 處理?

當您在Hive中繼存放區中宣告目標數據表時,會建立兩個數據結構:

  • 使用指派給目標數據表的名稱檢視。
  • Delta Live Tables 用來管理 CDC 處理的內部備份數據表。 此數據表是以目標數據表名稱前面加上 __apply_changes_storage_ 來命名。

例如,如果您宣告名為 dlt_cdc_target的目標數據表,您會看到名為 dlt_cdc_target 的檢視和中繼存放區中名為 __apply_changes_storage_dlt_cdc_target 的數據表。 建立檢視可讓 Delta Live Tables 篩選出處理順序錯亂數據所需的額外資訊(例如,墓碑和版本)。 若要檢視已處理的數據,請查詢目標檢視。 因為數據表的 __apply_changes_storage_ 架構可能會變更以支持未來的功能或增強功能,因此您不應該查詢數據表以供生產環境使用。 如果您手動將數據新增至數據表,則會假設記錄在其他變更之前,因為版本數據行遺失。

如果管線發佈至 Unity 目錄,則使用者無法存取內部備份數據表。

取得差異實時數據表 CDC 查詢所處理記錄的相關數據

查詢會 apply changes 擷取下列計量:

  • num_upserted_rows:更新期間,插入數據集的輸出數據列數目。
  • num_deleted_rows:更新期間從數據集刪除的現有輸出數據列數目。

num_output_rows 擷取非 CDC 流程輸出的計量不會擷取查詢 apply changes

限制

查詢或apply_changes函式的目標APPLY CHANGES INTO不能做為串流數據表的來源。 從查詢或apply_changes函式目標讀取的APPLY CHANGES INTO數據表必須是具體化檢視。

Azure Databricks 上的 SCD 類型 1 和 SCD 類型 2

下列各節提供示範 Delta Live Tables SCD 類型 1 和類型 2 查詢的範例,這些查詢會根據來源事件來更新目標數據表:

  1. 建立新的用戶記錄。
  2. 刪除用戶記錄。
  3. 更新用戶記錄。 在 SCD 類型 1 範例中,最後一個 UPDATE 作業遲到且從目標數據表卸除,示範如何處理順序錯亂的事件。

下列範例假設熟悉設定和更新 Delta Live Tables 管線。 請參閱 教學課程:執行您的第一個 Delta Live Tables 管線

若要執行這些範例,您必須從建立範例數據集開始。 請參閱 產生測試數據

以下是這些範例的輸入記錄:

userId NAME 市/鎮 作業 sequenceNum
124 勞爾 瓦哈卡州 INSERT 1
123 伊莎貝爾 蒙特雷 INSERT 1
125 梅 賽 德 斯 提 華納 INSERT 2
126 莉莉 坎昆 INSERT 2
123 null null DELETE 6
125 梅 賽 德 斯 瓜達拉哈拉 UPDATE 6
125 梅 賽 德 斯 Mexicali UPDATE 5
123 伊莎貝爾 吉 娃娃 UPDATE 5

如果您取消批注範例數據中的最後一個數據列,它會插入下列記錄,指定應截斷記錄的位置:

userId NAME 市/鎮 作業 sequenceNum
null null null TRUNCATE 3

注意

下列所有範例都包含可同時指定 DELETETRUNCATE 作業的選項,但每個選項都是選擇性的。

處理 SCD 類型 1 更新

下列程式代碼範例示範處理 SCD 型態 1 更新:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;

執行 SCD 類型 1 範例之後,目標資料表會包含下列記錄:

userId NAME 市/鎮
124 勞爾 瓦哈卡州
125 梅 賽 德 斯 瓜達拉哈拉
126 莉莉 坎昆

使用其他TRUNCATE記錄執行 SCD 類型 1 範例之後,會124126因為 TRUNCATE 位於sequenceNum=3的作業而截斷 和 ,而目標數據表包含下列記錄:

userId NAME 市/鎮
125 梅 賽 德 斯 瓜達拉哈拉

處理 SCD 類型 2 更新

下列程式代碼範例示範處理 SCD 型態 2 更新:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;

執行 SCD 類型 2 範例之後,目標資料表會包含下列記錄:

userId NAME 市/鎮 __START_AT __END_AT
123 伊莎貝爾 蒙特雷 1 5
123 伊莎貝爾 吉 娃娃 5 6
124 勞爾 瓦哈卡州 1 null
125 梅 賽 德 斯 提 華納 2 5
125 梅 賽 德 斯 Mexicali 5 6
125 梅 賽 德 斯 瓜達拉哈拉 6 null
126 莉莉 坎昆 2 null

SCD 類型 2 查詢也可以指定要追蹤目標資料表中記錄的輸出資料行子集。 其他數據行的變更會就地更新,而不是產生新的記錄記錄。 下列範例示範從追蹤中排除數據 city 行:

下列範例示範搭配 SCD 類型 2 使用追蹤歷程記錄:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2",
  track_history_except_column_list = ["city"]
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT
  (city)

在沒有其他 TRUNCATE 記錄的情況下執行此範例之後,目標數據表會包含下列記錄:

userId NAME 市/鎮 __START_AT __END_AT
123 伊莎貝爾 吉 娃娃 1 6
124 勞爾 瓦哈卡州 1 null
125 梅 賽 德 斯 瓜達拉哈拉 2 null
126 莉莉 坎昆 2 null

產生測試數據

下列程式代碼會提供來產生範例數據集,以用於本教學課程中的範例查詢。 假設您有適當的認證來建立新的架構並建立新的數據表,您可以使用筆記本或 Databricks SQL 來執行這些語句。 下列程式代碼 不是 要當做 Delta Live Tables 管線的一部分執行:

CREATE SCHEMA IF NOT EXISTS cdc_data;

CREATE TABLE
  cdc_data.users
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  -- New users.
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  (126, "Lily",     "Cancun",      "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moved to Guadalajara.
  (123, null,       null,          "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  -- This batch of updates arrived out of order. The above batch at sequenceNum 5 will be the final state.
  (125, "Mercedes", "Mexicali",    "UPDATE", 5),
  (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
  -- Uncomment to test TRUNCATE.
  -- ,(null, null,      null,          "TRUNCATE", 3)
);

在目標串流數據表中新增、變更或刪除數據

如果您的管線將數據表發佈至 Unity 目錄,您可以使用 資料操作語言 (DML) 語句,包括插入、更新、刪除和合併語句,修改語句所 APPLY CHANGES INTO 建立的目標串流數據表。

注意

  • 不支援修改串流數據表之數據表架構的 DML 語句。 請確定您的 DML 語句不會嘗試演進數據表架構。
  • 更新串流數據表的 DML 語句只能在共用的 Unity 目錄叢集或 SQL 倉儲中使用 Databricks Runtime 13.3 LTS 和更新版本來執行。
  • 因為串流需要僅附加數據源,如果您的處理需要從具有變更的來源串流數據表進行串流處理(例如 DML 語句),請在讀取來源串流數據表時設定 skipChangeCommits 旗 標。 設定時 skipChangeCommits ,會忽略刪除或修改源數據表上記錄的交易。 如果您的處理不需要串流數據表,您可以使用具體化檢視表(沒有僅附加限制)作為目標數據表。

由於 Delta Live Tables 會使用指定的 SEQUENCE BY 數據行,並將適當的排序值傳播至 __START_AT 目標數據表的 和 __END_AT 數據行(針對 SCD 類型 2),因此您必須確保 DML 語句使用這些數據行的有效值,以維護記錄的適當順序。 請參閱 CDC 如何使用 Delta Live Tables 實作?

如需搭配串流數據表使用 DML 語句的詳細資訊,請參閱 在串流數據表中新增、變更或刪除數據。

下列範例會插入使用中記錄,開始序列為 5:

INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);