變更資料擷取 (CDC) 是一種資料整合型樣,可擷取對來源系統中資料所做的變更,例如插入、更新及刪除。 這些變更以清單表示,通常稱為 CDC 摘要。 如果您在 CDC 摘要上操作,而不是讀取整個來源資料集,則可以更快地處理資料。 交易式資料庫 (例如 SQL Server、MySQL 和 Oracle) 會產生 CDC 摘要。 Delta 資料表會產生自己的 CDC 摘要,稱為變更資料摘要 (CDF)。
下圖顯示,當來源資料表中包含員工資料的資料列更新時,它會在 CDC 摘要中產生一組新的資料列,其中 僅 包含變更。 CDC 摘要的每一行通常都包含其他元資料,包括 UPDATE 的作業,例如,以及可用來確定地對 CDC 摘要中的每一行進行排序的資料行,以便您可以處理亂序更新。 例如, sequenceNum 下圖中的欄會決定 CDC 摘要中的資料列順序:
處理變更資料摘要:只保留最新資料,而不是保留資料的歷史版本
變更的資料饋送處理被稱為緩慢變動維度 (SCD)。 當您處理 CDC 資料流時,您可以做出選擇:
- 您是否只保留最新的資料(即覆蓋現有資料)? 這稱為 SCD 類型 1。
- 或者,您是否保留資料變更的歷程記錄? 這稱為 SCD 類型 2。
SCD 類型 1 處理涉及每當發生變更時,以新資料改寫舊資料。 這表示不會保留任何變更歷程記錄。 只有最新版本的資料可用。 這是一種簡單的方法,通常在變更歷史記錄不重要時使用,例如更正錯誤或更新客戶電子郵件地址等非關鍵欄位。
SCD 類型 2 處理會透過建立其他記錄來擷取一段時間內不同版本的資料,以維護資料變更的歷程記錄。 每個版本的資料都會加上時間戳記或標記中繼資料,讓使用者追蹤變更發生的時間。 當追蹤資料的演變很重要時,例如追蹤客戶地址隨時間的變化以進行分析,這非常有用。
使用 Lakeflow Spark 宣告式管線進行 SCD 類型 1 和類型 2 處理的範例
本節中的範例顯示如何使用SCD Type 1和Type 2。
步驟 1:準備範例資料
在此範例中,您將建立範例CDC feed。 首先,建立一個筆記本,並將以下程式碼貼到其中。 請將程式碼區塊開頭的變數更新為您有權限在其中建立表格和檢視的目錄和資料庫結構。
此程式碼建立一個包含多個變更記錄的 Delta 表格。 結構描述如下:
-
id- 整數,此員工的唯一識別碼 -
name- 字串,員工姓名 -
role- 字串,員工角色 -
country- 字串、國家/地區代碼、員工工作地點 -
operation- 變更類型 (例如,INSERT、UPDATE或DELETE) -
sequenceNum- 整數,識別來源資料中 CDC 事件的邏輯順序。 Lakeflow Spark 宣告式管道使用此順序來處理無序到達的變更事件。
# update these to the catalog and schema where you have permissions
# to create tables and views.
catalog = "mycatalog"
schema = "myschema"
employees_cdf_table = "employees_cdf"
def write_employees_cdf_to_delta():
data = [
(1, "Alex", "chef", "FR", "INSERT", 1),
(2, "Jessica", "owner", "US", "INSERT", 2),
(3, "Mikhail", "security", "UK", "INSERT", 3),
(4, "Gary", "cleaner", "UK", "INSERT", 4),
(5, "Chris", "owner", "NL", "INSERT", 6),
# out of order update, this should be dropped from SCD Type 1
(5, "Chris", "manager", "NL", "UPDATE", 5),
(6, "Pat", "mechanic", "NL", "DELETE", 8),
(6, "Pat", "mechanic", "NL", "INSERT", 7)
]
columns = ["id", "name", "role", "country", "operation", "sequenceNum"]
df = spark.createDataFrame(data, columns)
df.write.format("delta").mode("overwrite").saveAsTable(f"{catalog}.{schema}.{employees_cdf_table}")
write_employees_cdf_to_delta()
您可以使用下列 SQL 命令預覽此資料:
SELECT *
FROM mycatalog.myschema.employees_cdf
步驟 2:使用 SCD 類型 1 僅保留最新資料
建議您在 Lakeflow Spark 的宣告式管線中使用 AUTO CDC API,將變更數據提要處理為 SCD 類型 1 資料表。
- 建立新的筆記本。
- 將以下代碼粘貼到其中。
- 建立並連線到管線。
函式會將我們剛在上面建立的資料表以資料串流的形式讀取,因為用於變更資料擷取處理的 API 預期接受變更資料的串流作為輸入。 您用裝飾器 @dp.temporary_view 包裹它,因為您不想將此流實體化成表格。
然後,您可以使用 dp.create_target_table 來建立一個串流處理資料表,其中包含處理此變更資料饋送的結果。
最後,您使用 dp.create_auto_cdc_flow 來處理變更資料提要。 讓我們來看看每個論點:
-
target- 您先前定義的目標串流表格。 -
source- 您先前定義的變更記錄串流視圖。 -
keys- 識別變更摘要中的唯一資料列。 因為您使用id作為唯一識別碼,所以只需提供id作為唯一的識別資料行即可。 -
sequence_by- 指定來源資料中 CDC 事件邏輯順序的直欄名稱。 您需要此排序來處理無序到達的變更事件。 指定sequenceNum作為排序欄位。 -
apply_as_deletes- 因為範例資料包含刪除作業,所以您可以使用apply_as_deletes來指出何時應該將 CDC 事件視為DELETE而不是更新或插入。 -
except_column_list- 包含您不想包含在目標表格中的欄位清單。 在此範例中,您將使用此引數來排除sequenceNum和operation。 -
stored_as_scd_type- 指出您要使用的 SCD 類型。
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr, lit, when
from pyspark.sql.types import StringType, ArrayType
catalog = "mycatalog"
schema = "myschema"
employees_cdf_table = "employees_cdf"
employees_table_current = "employees_current"
employees_table_historical = "employees_historical"
@dp.temporary_view
def employees_cdf():
return spark.readStream.format("delta").table(f"{catalog}.{schema}.{employees_cdf_table}")
dp.create_target_table(f"{catalog}.{schema}.{employees_table_current}")
dp.create_auto_cdc_flow(
target=f"{catalog}.{schema}.{employees_table_current}",
source=employees_cdf_table,
keys=["id"],
sequence_by=col("sequenceNum"),
apply_as_deletes=expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
按一下 [開始] 來運行此管道。
然後,在 SQL 編輯器中執行下列查詢,以確認變更記錄已正確處理:
SELECT *
FROM mycatalog.myschema.employees_current
備註
員工 Chris 的異常更新已正確捨棄,因為其角色仍設定為 [所有者] 而不是 [經理]。
步驟 3:使用 SCD 類型 2 保留歷程資料
在此範例中,您會建立第二個目標表格,稱為 employees_historical,其中包含員工記錄變更的完整歷程記錄。
將此程式碼新增至您的管線。 這裡唯一的區別是設定 stored_as_scd_type 為 2 而不是 1。
dp.create_target_table(f"{catalog}.{schema}.{employees_table_historical}")
dp.create_auto_cdc_flow(
target=f"{catalog}.{schema}.{employees_table_historical}",
source=employees_cdf_table,
keys=["id"],
sequence_by=col("sequenceNum"),
apply_as_deletes=expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 2
)
按一下 [開始] 來運行此管道。
然後,在 SQL 編輯器中執行下列查詢,以確認變更記錄已正確處理:
SELECT *
FROM mycatalog.myschema.employees_historical
您將看到所有員工的變更,包括已刪除的員工,例如 Pat。
步驟 4:清理資源
完成時,請依照下列步驟清除資源:
刪除管線:
備註
當您刪除管線時,它會自動刪除
employees和employees_historical資料表。- 按一下 工作與管線,然後尋找要刪除的管線名稱。
- 按一下
在同一列中,輸入管線名稱,然後按一下刪除。
刪除筆記本。
刪除包含變更資料摘要的資料表:
- 按一下 [新增 > 查詢]。
- 貼上並執行下列 SQL 程式碼,視需要調整目錄和結構描述:
DROP TABLE mycatalog.myschema.employees_cdf
使用 MERGE INTO 和 foreachBatch 進行變更資料擷取的缺點
Databricks 提供的 MERGE INTO SQL 命令可與 foreachBatch API 一起使用,將資料列「更新或插入」至 Delta 資料表。 本節探討如何將此技術用於簡單的用例,但當應用於現實場景時,這種方法會變得越來越複雜和脆弱。
在此範例中,您將使用與先前範例相同的變更資料摘要。
基本實作 MERGE INTO 和 foreachBatch
建立筆記本,並將下列程式碼複製到其中。 視需要變更 catalog、 schema和 employees_table 變數。
catalog和schema變數應該設定為 Unity 目錄中可以建立資料表的位置。
當您執行筆記本時,它會執行下列動作:
- 在
create_table中建立目標表。 與自動處理此步驟的create_auto_cdc_flow不同,您必須指定結構描述。 - 將變更資料饋送讀取為串流。 每個微批次都使用
upsertToDelta方法進行處理,該方法會執行MERGE INTO命令。
catalog = "jobs"
schema = "myschema"
employees_cdf_table = "employees_cdf"
employees_table = "employees_merge"
def upsertToDelta(microBatchDF, batchId):
microBatchDF.createOrReplaceTempView("updates")
microBatchDF.sparkSession.sql(f"""
MERGE INTO {catalog}.{schema}.{employees_table} t
USING updates s
ON s.id = t.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
def create_table():
spark.sql(f"DROP TABLE IF EXISTS {catalog}.{schema}.{employees_table}")
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {catalog}.{schema}.{employees_table}
(id INT, name STRING, age INT, country STRING)
""")
create_table()
cdcData = spark.readStream.table(f"{catalog}.{schema}.{employees_cdf_table}")
cdcData.writeStream \
.foreachBatch(upsertToDelta) \
.outputMode("append") \
.start()
若要查看結果,請執行下列 SQL 查詢:
SELECT *
FROM mycatalog.myschema.employees_merge
不幸的是,結果不正確,如下所示:
對相同微批次中相同金鑰的多次更新
第一個問題是程式碼無法處理對同一微批次中相同索引鍵的多次更新。 例如,您使用 INSERT 插入員工 Chris,然後將其角色從「所有者」更新為「經理」。 這應該會產生一列,但實際上有兩列。
當微批次中的相同索引鍵有多個更新時,哪個變更會勝出?
邏輯變得更加複雜。 下列程式碼範例會擷取最新的資料列 sequenceNum ,並僅將該資料合併到目標資料表中,如下所示:
- 根據主鍵
id分組。 - 取得該鍵值相對應的批次中具有最大
sequenceNum值的資料列的所有欄位。 - 將該行擴展回原狀。
將方法 upsertToDelta 更新如下,然後執行程式:
def upsertToDelta(microBatchDF, batchId):
microBatchDF = microBatchDF.groupBy("id").agg(
max_by(struct("*"), "sequenceNum").alias("row")
).select("row.*").createOrReplaceTempView("updates")
spark.sql(f"""
MERGE INTO {catalog}.{schema}.{employees_table} t
USING updates s
ON s.id = t.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
當您查詢目標資料表時,您會看到名為 Chris 的員工具有正確的角色,但仍有其他問題需要解決,因為目標資料表中仍顯示已刪除的記錄。
跨微批次的無序更新
本節探討跨微批次的亂序更新問題。 下圖說明問題:如果 Chris 的資料列在第一個微批次中有一個UPDATE作業,然後在後續的微批次中有一個INSERT作業,該怎麼辦? 程式碼無法正確處理此問題。
當多個微批次中相同金鑰的更新順序不正確時,哪個變更會獲勝?
若要修正此問題,請展開程式碼以在每一列中儲存版本,如下所示:
- 儲存
sequenceNum上次更新資料列的時間。 - 針對每個新資料列,檢查時間戳記是否大於儲存的時間戳記,然後套用下列邏輯:
- 如果較大,請使用目標中的新資料。
- 否則,請將資料保留在來源中。
首先,更新 createTable 方法以儲存 sequenceNum,因為您將使用它來對每一列進行版本控制:
def create_table():
spark.sql(f"DROP TABLE IF EXISTS {catalog}.{schema}.{employees_table}")
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {catalog}.{schema}.{employees_table}
(id INT, name STRING, age INT, country STRING, sequenceNum INT)
""")
接下來,更新 upsertToDelta 以處理列版本。
UPDATE SET的MERGE INTO子句需要分別處理每一列。
def upsertToDelta(microBatchDF, batchId):
microBatchDF = microBatchDF.groupBy("id").agg(
max_by(struct("*"), "sequenceNum").alias("row")
).select("row.*").createOrReplaceTempView("updates")
spark.sql(f"""
MERGE INTO {catalog}.{schema}.{employees_table} t
USING updates s
ON s.id = t.id
WHEN MATCHED THEN UPDATE SET
name=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.name ELSE t.name END,
age=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.age ELSE t.age END,
country=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.country ELSE t.country END
WHEN NOT MATCHED THEN INSERT *
""")
處理刪除
不幸的是,程式碼仍然有問題。 它不處理 DELETE 作業,這一點可以從員工 Pat 仍在目標資料表中看出。
假設刪除與微批次同時到達。 若要處理它們,請再次更新 upsertToDelta 方法,以便在變更資料記錄指示刪除時刪除資料列,如下所示:
def upsertToDelta(microBatchDF, batchId):
microBatchDF = microBatchDF.groupBy("id").agg(
max_by(struct("*"), "sequenceNum").alias("row")
).select("row.*").createOrReplaceTempView("updates")
spark.sql(f"""
MERGE INTO {catalog}.{schema}.{employees_table} t
USING updates s
ON s.id = t.id
WHEN MATCHED AND s.operation = 'DELETE' THEN DELETE
WHEN MATCHED THEN UPDATE SET
name=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.name ELSE t.name END,
age=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.age ELSE t.age END,
country=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.country ELSE t.country END
WHEN NOT MATCHED THEN INSERT *
""")
處理刪除後無序抵達的更新
不幸的是,上面的程式碼仍然不太正確,因為它無法處理在微批次中,DELETE 後接著出現順序錯亂的 UPDATE 的情況。
處理此案例的演算法需要保留刪除記錄,以便處理後續的順序錯亂的更新。 要達成此目的:
- 不要立即刪除資料列,而是使用時間戳記或
sequenceNum進行軟刪除。 虛刪除的資料列會被標記為墓碑狀態。 - 將所有使用者重新導向至過濾掉墓碑的檢視。
- 建立清理工作,以隨著時間移除墓碑。
使用下列程式碼:
def upsertToDelta(microBatchDF, batchId):
microBatchDF = microBatchDF.groupBy("id").agg(
max_by(struct("*"), "sequenceNum").alias("row")
).select("row.*").createOrReplaceTempView("updates")
spark.sql(f"""
MERGE INTO {catalog}.{schema}.{employees_table} t
USING updates s
ON s.id = t.id
WHEN MATCHED AND s.operation = 'DELETE' THEN UPDATE SET DELETED_AT=now()
WHEN MATCHED THEN UPDATE SET
name=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.name ELSE t.name END,
age=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.age ELSE t.age END,
country=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.country ELSE t.country END
WHEN NOT MATCHED THEN INSERT *
""")
您的使用者無法直接使用目標資料表,因此請建立可供查詢的檢視:
CREATE VIEW employees_v AS
SELECT * FROM employees_merge
WHERE DELETED_AT = NULL
最後,建立定期移除墓碑資料列的清除作業。
DELETE FROM employees_merge
WHERE DELETED_AT < now() - INTERVAL 1 DAY