Lakeflow Spark 聲明式管線(SDP)使用AUTO CDC 和 AUTO CDC FROM SNAPSHOT API 簡化變更資料擷取(CDC)。
備註
AUTO CDC API 會取代 APPLY CHANGES API,並具有相同的語法。
APPLY CHANGES API 仍然可用,但 Databricks 建議使用 AUTO CDC API 取而代之。
您使用的介面取決於變更資料的來源:
- 用
AUTO CDC來處理來自變更資料匯流排(CDF)的變更。 - 使用
AUTO CDC FROM SNAPSHOT(公開預覽,且僅適用於 Python) 來處理資料庫快照集中的變更。
先前,此 MERGE INTO 陳述式通常用於處理 Azure Databricks 上的 CDC 記錄。 不過,可能會 MERGE INTO 因為記錄順序不正確而產生不正確的結果,或需要複雜的邏輯來重新排序記錄。
在管道 SQL 和 Python 介面中支援AUTO CDC API。 Python 介面支援 AUTO CDC FROM SNAPSHOT API。 Apache Spark 宣告式管線不支援這些 AUTO CDC API。
兩者都支持 AUTO CDC 和 AUTO CDC FROM SNAPSHOT 使用 SCD 類型 1 及類型 2 更新資料表。
- 使用 SCD 類型 1 來直接更新記錄。 更新記錄不會保存歷史。
- 使用 SCD 第二類型來保留記錄歷史,不論是針對所有更新,還是針對一組指定欄位的更新。
如需語法和其他參考,請參閱適用於管線的 AUTO CDC (SQL)、適用於管線的 AUTO CDC (Python) 和適用於管線的 AUTO CDC FROM SNAPSHOT (Python)。
備註
本文說明如何根據來源資料的變更來更新管線中的資料表。 若要瞭解如何記錄和查詢 Delta 資料表的資料列層級變更資訊,請參閱在 Azure Databricks 上使用 Delta Lake 變更資料摘要。
需求
若要使用 CDC API,您的管線必須設定為使用 無伺服器 SDP 或 SDP Pro 或 Advanced版本。
如何使用 AUTO CDC API 實現 CDC?
透過自動處理順序不對的記錄,AUTO CDC API 可確保正確處理 CDC 記錄,並消除開發處理順序不對的記錄的複雜邏輯的需要。 您必須在來源資料中指定要排序記錄的直欄,API 會將其解譯為來源資料正確排序的單調遞增表示法。 管線會自動處理無序到達的資料。 對於 SCD 類型 2 變更,管線會將適當的排序值傳播至目標資料表的 __START_AT 和 __END_AT 資料行。 每個排序值的每個索引鍵都應該有一個不同的更新,而且不支援 NULL 排序值。
若要使用 AUTO CDC 執行 CDC 處理,請先建立串流資料表,然後在 SQL 中使用 AUTO CDC ... INTO 陳述式或在 Python 中使用 create_auto_cdc_flow() 函式來指定變更摘要的來源、索引鍵和排序。 若要建立目標串流資料表,請使用 CREATE OR REFRESH STREAMING TABLE SQL 中的陳述式或 create_streaming_table() Python 中的函數。 請參閱 SCD 類型 1 及類型 2 處理 範例。
如需語法詳細資料,請參閱管線 SQL 參考或Python 參考。
CDC 如何使用 API 實 AUTO CDC FROM SNAPSHOT 作?
這很重要
API AUTO CDC FROM SNAPSHOT 處於 公開預覽狀態。
AUTO CDC FROM SNAPSHOT 是一個宣告式 API,可透過比較一系列依序快照來有效地判斷來源資料的變更,然後執行 CDC 處理快照中記錄所需的處理。
AUTO CDC FROM SNAPSHOT 僅受 Python 管線介面支援。
AUTO CDC FROM SNAPSHOT 支援從多個來源類型擷取快照:
- 利用定期的快照匯入,將現有資料表或檢視中的快照匯入。
AUTO CDC FROM SNAPSHOT具有簡單、簡化的介面,可支援定期從現有資料庫物件擷取快照。 每次管線更新都會載入新的快照,且載入時間會作為快照版本。 當管線以連續模式執行時,會擷取多個快照集,每個管線都會在包含處理之流程的AUTO CDC FROM SNAPSHOT設定所決定的期間內更新。 - 使用歷史快照擷取來處理包含資料庫快照的檔案,例如從 Oracle 或 MySQL 資料庫或資料倉儲產生的快照。
若要使用 AUTO CDC FROM SNAPSHOT從任何來源類型執行 CDC 處理,請先建立串流資料表,然後使用 create_auto_cdc_from_snapshot_flow() Python 中的函數來指定實作處理所需的快照、索引鍵和其他引數。 請參閱 定期快照擷取 和 歷史快照擷取 範例。
傳遞給 API 的快照必須依版本遞增順序。 如果 SDP 偵測到無序快照,則會拋出錯誤。
如需語法詳細資料,請參閱資料處理管道的 Python 參考。
使用多欄進行定序
您可以按多個列進行排序(例如,時間戳記和 ID 以中斷平局),您可以使用 STRUCT 將它們組合起來:它首先按 STRUCT 的第一個欄位排序,如果出現平局,則考慮第二個欄位,依此類推。
SQL 中的範例:
SEQUENCE BY STRUCT(timestamp_col, id_col)
Python 範例:
sequence_by = struct("timestamp_col", "id_col")
局限性
用於排序的直欄必須是可排序的資料類型。
範例:使用 CDF 來源資料的 SCD 類型 1 及 SCD 類型 2 處理
下列各節提供 SCD 類型 1 和類型 2 查詢的範例,這些查詢會參照變更資料饋送中的來源事件來更新目標資料表:
- 建立新的使用者記錄。
- 刪除使用者記錄。
- 更新使用者記錄。 在 SCD 類型 1 範例中,最後的
UPDATE作業順序延遲到達,並被從目標表格中捨棄,這展示了亂序事件的處理方式。
下列範例假設您熟悉設定和更新管線。 請參閱 教學課程:使用變更資料擷取建置 ETL 管線。
若要執行這些範例,您必須先建立範例資料集。 請參閱 產生測試資料。
以下是這些範例的輸入記錄:
| userId | 名稱 | city | 作業 | 序列編號 |
|---|---|---|---|---|
| 124 | 勞爾 | 瓦哈卡州 | INSERT | 1 |
| 123 | Isabel | 蒙特雷 | INSERT | 1 |
| 125 | 梅賽德斯 | 蒂華納 | INSERT | 2 |
| 126 | 莉莉 | 坎昆 | INSERT | 2 |
| 123 | null | null | 刪除 | 6 |
| 125 | 梅賽德斯 | Guadalajara | UPDATE | 6 |
| 125 | 梅賽德斯 | 墨西卡利 | UPDATE | 5 |
| 123 | Isabel | Chihuahua | UPDATE | 5 |
如果您取消註解範例資料中的最後一列,它會插入下列記錄,以指定記錄應截斷的位置:
| userId | 名稱 | city | 作業 | 序列編號 |
|---|---|---|---|---|
| null | null | null | 截斷 | 3 |
備註
下列所有範例都包含指定DELETE和TRUNCATE作業的選項,但每個選項都是可選的。
處理 SCD 類型 1 更新
下列範例示範處理 SCD 類型 1 更新項目:
Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("cdc_data.users")
dp.create_streaming_table("target")
dp.create_auto_cdc_flow(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW flowname AS AUTO CDC INTO
target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
APPLY AS TRUNCATE WHEN
operation = "TRUNCATE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 1;
執行 SCD 類型 1 範例之後,目標表格包含下列記錄:
| userId | 名稱 | city |
|---|---|---|
| 124 | 勞爾 | 瓦哈卡州 |
| 125 | 梅賽德斯 | Guadalajara |
| 126 | 莉莉 | 坎昆 |
使用額外的TRUNCATE記錄執行 SCD 類型 1 範例之後,因為124作業在126處理,TRUNCATE和 sequenceNum=3 的記錄被截斷,而目標表格包含下列記錄:
| userId | 名稱 | city |
|---|---|---|
| 125 | 梅賽德斯 | Guadalajara |
處理 SCD 類型 2 更新
下列範例示範如何處理 SCD 類型 2 更新:
Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("cdc_data.users")
dp.create_streaming_table("target")
dp.create_auto_cdc_flow(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW target_flow
AS AUTO CDC INTO
target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2;
執行 SCD 類型 2 範例之後,目標表格包含下列記錄:
| userId | 名稱 | city | __START_AT | __END_AT |
|---|---|---|---|---|
| 123 | Isabel | 蒙特雷 | 1 | 5 |
| 123 | Isabel | Chihuahua | 5 | 6 |
| 124 | 勞爾 | 瓦哈卡州 | 1 | null |
| 125 | 梅賽德斯 | 蒂華納 | 2 | 5 |
| 125 | 梅賽德斯 | 墨西卡利 | 5 | 6 |
| 125 | 梅賽德斯 | Guadalajara | 6 | null |
| 126 | 莉莉 | 坎昆 | 2 | null |
SCD 類型 2 查詢也可以指定要在目標表格中追蹤歷程的輸出欄位子集。 對其他欄位的變更會直接更新,而不是生成新的歷史紀錄。 下列範例示範如何排除追蹤中的 city 欄:
下列範例示範如何使用 SCD Type 2 來追蹤歷程:
Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("cdc_data.users")
dp.create_streaming_table("target")
dp.create_auto_cdc_flow(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW target_flow
AS AUTO CDC INTO
target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2
TRACK HISTORY ON * EXCEPT
(city)
在沒有其他 TRUNCATE 記錄的情況下執行此範例之後,目標表格會包含下列記錄:
| userId | 名稱 | city | __START_AT | __END_AT |
|---|---|---|---|---|
| 123 | Isabel | Chihuahua | 1 | 6 |
| 124 | 勞爾 | 瓦哈卡州 | 1 | null |
| 125 | 梅賽德斯 | Guadalajara | 2 | null |
| 126 | 莉莉 | 坎昆 | 2 | null |
產生測試資料
下列程式碼是用來產生範例資料集,以用於本教學課程中的範例查詢。 假設您擁有建立新結構描述和建立新資料表的適當認證,您可以使用筆記本或 Databricks SQL 執行這些陳述式。 以下程式碼不被設計為管線定義的一部分執行:
CREATE SCHEMA IF NOT EXISTS cdc_data;
CREATE TABLE
cdc_data.users
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The above batch at sequenceNum 6 will be the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);
範例:定期快照處理
下列範例示範 SCD 類型 2 處理,該處理程序會擷取儲存在 mycatalog.myschema.mytable的資料表快照集。 處理結果會寫入名為 target的表格。
mycatalog.myschema.mytable 時間戳記的記錄 2024-01-01 00:00:00
| Key | 價值觀 |
|---|---|
| 1 | a1 |
| 2 | a2 |
mycatalog.myschema.mytable 時間戳記處的記錄 2024-01-01 12:00:00
| Key | 價值觀 |
|---|---|
| 2 | B2 |
| 3 | a3 |
from pyspark import pipelines as dp
@dp.view(name="source")
def source():
return spark.read.table("mycatalog.myschema.mytable")
dp.create_streaming_table("target")
dp.create_auto_cdc_from_snapshot_flow(
target="target",
source="source",
keys=["key"],
stored_as_scd_type=2
)
處理快照之後,目標資料表會包含下列記錄:
| Key | 價值觀 | __START_AT | __END_AT |
|---|---|---|---|
| 1 | a1 | 2024年01月01日 00時00分00秒 | 2024-01-01 12:00:00 |
| 2 | a2 | 2024年01月01日 00時00分00秒 | 2024-01-01 12:00:00 |
| 2 | B2 | 2024-01-01 12:00:00 | null |
| 3 | a3 | 2024-01-01 12:00:00 | null |
範例:歷史快照處理
下列範例示範 SCD 類型 2 處理程序,該處理程序會根據儲存在雲端儲存體系統中的兩個 Snapshot 的來源事件來更新目標表格:
快照位於 timestamp,儲存在 /<PATH>/filename1.csv
| Key | 追蹤欄 | 不追蹤欄 |
|---|---|---|
| 1 | a1 | b1 |
| 2 | a2 | B2 |
| 4 | 答覆4 | B4 |
快照位於 timestamp + 5,儲存在 /<PATH>/filename2.csv
| Key | 追蹤欄 | 不追蹤欄 |
|---|---|---|
| 2 | a2_new | B2 |
| 3 | a3 | B3 |
| 4 | 答覆4 | b4_new |
下列程式碼範例示範如何使用這些快照集處理 SCD 類型 2 更新:
from pyspark import pipelines as dp
def exist(file_name):
# Storage system-dependent function that returns true if file_name exists, false otherwise
# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
latest_snapshot_version = latest_snapshot_version or 0
next_version = latest_snapshot_version + 1
file_name = "dir_path/filename_" + next_version + ".csv"
if (exist(file_name)):
return (spark.read.load(file_name), next_version)
else:
# No snapshot available
return None
dp.create_streaming_live_table("target")
dp.create_auto_cdc_from_snapshot_flow(
target = "target",
source = next_snapshot_and_version,
keys = ["Key"],
stored_as_scd_type = 2,
track_history_column_list = ["TrackingCol"]
)
處理快照之後,目標資料表會包含下列記錄:
| Key | 追蹤欄 | 不追蹤欄 | __START_AT | __END_AT |
|---|---|---|---|---|
| 1 | a1 | b1 | 1 | 2 |
| 2 | a2 | B2 | 1 | 2 |
| 2 | a2_new | B2 | 2 | null |
| 3 | a3 | B3 | 2 | null |
| 4 | 答覆4 | b4_new | 1 | null |
新增、變更或刪除目標串流資料表中的資料
如果您的管線將資料表發佈至 Unity 目錄,您可以使用 資料操作語言 (DML) 陳述式,包括插入、更新、刪除和合併陳述式,來修改陳述式所 AUTO CDC ... INTO 建立的目標串流資料表。
備註
- 不支援修改串流數據表之數據表架構的 DML 語句。 請確定您的 DML 語句不會嘗試演進數據表架構。
- 更新串流數據表的 DML 語句只能在共用的 Unity 目錄叢集或 SQL 倉儲中使用 Databricks Runtime 13.3 LTS 和更新版本來執行。
- 因為串流處理需要只能附加的資料來源,如果您的處理需要從具有變更的來源串流資料表進行串流處理(例如 DML 語句),請在讀取來源串流資料表時設定 skipChangeCommits 旗標。 設定
skipChangeCommits時,會忽略刪除或修改源數據表上記錄的交易。 如果您的處理不需要串流數據表,您可以使用具象化視圖(不受僅附加限制)作為目標數據表。
由於 Lakeflow Spark 宣告式管線會使用指定的 SEQUENCE BY 資料行,並將適當的排序值傳播至 __START_AT 目標資料表的 和 __END_AT 資料行 (適用於 SCD 類型 2),因此您必須確保 DML 陳述式使用這些資料行的有效值,以維護記錄的正確順序。 請參閱如何使用 AUTO CDC API 實作 CDC?。
如需搭配串流資料表使用 DML 陳述式的詳細資訊,請參閱 新增、變更或刪除串流資料表中的資料。
下列範例會插入起始序列為 5 的有效記錄:
INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);
從 AUTO CDC 目標資料表讀取變更資料摘要
在 Databricks Runtime 15.2 和更新版本中,您可以從作為 AUTO CDC 或 AUTO CDC FROM SNAPSHOT 查詢目標的串流資料表中,讀取變更資料提要,其方式與從其他 Delta 表讀取變更資料提要的方法相同。 從目標串流資料表讀取變更資料提要需要下列項目:
- 目標串流資料表必須發佈至 Unity 目錄。 請參閱 使用 Unity 目錄搭配管線。
- 若要從目標串流數據表讀取變更數據摘要,您必須使用 Databricks Runtime 15.2 或更新版本。 若要讀取不同管線中的變更數據摘要,管線必須設定為使用 Databricks Runtime 15.2 或更新版本。
您可以從在 Lakeflow Spark 宣告式管線中建立的目標串流資料表讀取變更資料摘要,其方式與從其他 Delta 資料表讀取變更資料摘要的方式相同。 若要深入瞭解如何使用 Delta 變更資料摘要功能,包括 Python 和 SQL 中的範例,請參閱在 Azure Databricks 上使用 Delta Lake 變更資料摘要。
備註
變更資料摘要記錄包含識別變更事件類型的 中繼資料 。 在表格中更新記錄時,相關聯變更記錄的中繼資料通常包括 _change_type 設定為 update_preimage 和 update_postimage 事件的值。
不過, _change_type 如果對目標串流資料表進行更新,包括變更主索引鍵值,則值會有所不同。 當變更包含主索引鍵的更新時, _change_type 中繼資料欄位會設定為 insert 和 delete 事件。 當使用 UPDATE 或 MERGE 陳述式手動更新其中一個鍵欄位時,或者對於 SCD 類型 2 表格,當 __start_at 欄位變更以反映較早的啟動順序值時,可能會發生主鍵的變更。
查詢會決定主鍵值,這些值在 SCD 類型 1 和 SCD 類型 2 的處理過程中有所不同:
- 對於 SCD 類型 1 處理及管線 Python 介面,主鍵是
keys函數中create_auto_cdc_flow()參數的值。 對於 SQL 介面,主鍵是由KEYS子句在AUTO CDC ... INTO陳述式中定義的欄。 - 對於 SCD 類型 2,主鍵是
keys參數或KEYS子句加上coalesce(__START_AT, __END_AT)作業的傳回值,其中__START_AT和__END_AT是目標串流表格中的對應欄位。
取得管線中 CDC 查詢所處理之記錄的相關資料
備註
下列指標只會由 AUTO CDC 查詢擷取,而不是由 AUTO CDC FROM SNAPSHOT 查詢擷取。
查詢 AUTO CDC 會收集以下指標:
-
num_upserted_rows:更新期間更新插入資料集的輸出列數。 -
num_deleted_rows:更新期間從資料集中刪除的現有輸出列數。
num_output_rows 指標(非 CDC 流程的輸出)不會針對 AUTO CDC 查詢擷取。
哪些資料物件用於管線中的 CDC 處理?
備註
- 這些資料結構僅適用於
AUTO CDC處理,不適用於AUTO CDC FROM SNAPSHOT處理。 - 只有在目標資料表發佈至 Hive 中繼存放區時,才會套用這些資料結構。 如果管線發佈至 Unity 目錄,使用者就無法存取內部備份資料表。
當您在 Hive 中繼存放區中宣告目標資料表時,會建立兩個資料結構:
- 使用目標表格所指派名稱的檢視。
- 管線用來管理 CDC 處理的內部備份資料表。 此表格的命名方式是將
__apply_changes_storage_添加在目標表格名稱之前。
例如,如果您宣告名為 dp_cdc_target的目標資料表,您會在中繼存放區中看到名為 dp_cdc_target 的檢視和名為 __apply_changes_storage_dp_cdc_target 的資料表。 建立檢視可讓 Lakeflow Spark 宣告式管線篩選掉為了處理亂序資料而需要的額外資訊(例如,墓碑記錄和版本資訊)。 若要檢視已處理的資料,請查詢目標檢視。 因為資料表的 __apply_changes_storage_ 結構描述可能會變更以支援未來的功能或增強功能,所以您不應該查詢資料表以供生產使用。 如果您手動將資料新增至資料表,則會假設記錄先於其他變更,因為缺少版本直欄。