使用 Delta Live Tables 管理資料品質
您可以使用預期來定義資料集內容的資料品質限制式。 預期可讓您保證傳送至資料表中的資料符合資料品質要求,並可讓您深入瞭解每次管線更新的資料品質。 您可以使用 Python 裝飾項目或 SQL 限制式子句將預期套用至查詢。
Delta Live Tables 的預期為何?
預期您新增到 Delta Live Tables 資料集聲明中的選用子句,可在經過查詢的每條記錄上套用資料品質檢查。
預期包含三件事:
- 描述,做為唯一識別碼,並可讓您追蹤限制式的計量。
- 布林陳述式,一律會根據某些陳述條件傳回 true 或 false。
- 當記錄失敗預期時要採取的動作,表示布林值會傳回 false。
下列矩陣顯示您可以套用至無效記錄的三個動作:
動作 | 結果 |
---|---|
warn (預設值) | 無效的記錄會寫入目標;失敗會回報為資料集的計量。 |
卸除 | 將資料寫入目標之前,會捨棄無效的記錄;失敗會回報為資料集的計量。 |
fail | 無效的記錄可防止更新成功。 在重新處理之前,需要手動介入。 |
您可以藉由查詢 Delta Live Tables 事件記錄來檢視資料品質計量,例如違反預期記錄的記錄數目。 請參閱監視差異即時資料表管線。
如需 Delta Live Tables 資料集宣告語法的完整參考,請參閱 Delta Live Tables Python 語言參考 或 Delta Live Tables SQL 語言參考。
注意
- 雖然您可以在任何期望中包含多個子句,但只有 Python 支援根據多個期望來定義動作。 請參閱多個期望。
- 預期必須使用 SQL 運算式來定義。 在定義預期時,您無法使用非 SQL 語法 (例如 Python 函式)。
保留無效的記錄
當您想要保留違反預期之記錄時,請使用 expect
運算子。 違反預期記錄的記錄會連同有效的記錄一起新增至目標資料集:
Python
@dlt.expect("valid timestamp", "timestamp > '2012-01-01'")
SQL
CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')
捨棄無效的記錄
使用 expect or drop
運算子來防止進一步處理無效的記錄。 違反預期的記錄會從目標資料集捨棄:
Python
@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
SQL
CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW
無效的記錄失敗
當無效的記錄無法接受時,請使用 expect or fail
運算子,在記錄驗證失敗時立即停止執行。 如果作業是資料表更新,系統會以不可部分完成的方式復原交易:
Python
@dlt.expect_or_fail("valid_count", "count > 0")
SQL
CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE
重要
如果您在管線中定義了多個平行流程,單一流程失敗並不會造成其他流程失敗。
當管線因為預期違規而失敗時,您必須修正管線程式碼,以在重新執行管線之前正確處理無效的資料。
失敗預期會修改轉換的 Spark 查詢計劃,以追蹤偵測和報告違規所需的資訊。 對於許多查詢,您可以使用這項資訊來識別導致違規的輸入記錄。 以下是一個範例例外。
Expectation Violated:
{
"flowName": "a-b",
"verboseInfo": {
"expectationsViolated": [
"x1 is negative"
],
"inputData": {
"a": {"x1": 1,"y1": "a },
"b": {
"x2": 1,
"y2": "aa"
}
},
"outputRecord": {
"x1": 1,
"y1": "a",
"x2": 1,
"y2": "aa"
},
"missingInputData": false
}
}
多個期望。
您可以在 Python 管線中使用一或多個資料品質限制式來定義預期。 這些裝飾項目接受 Python 字典做為引數,其中索引鍵是預期名稱,而值是預期限制式。
當目標資料集中應包含失敗驗證的記錄時,請使用 expect_all
指定多個資料品質限制式:
@dlt.expect_all({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})
當應該從目標資料集捨棄驗證失敗的記錄時,請使用 expect_all_or_drop
指定多個資料品質限制式:
@dlt.expect_all_or_drop({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})
當失敗驗證的記錄應該停止管線執行時,請使用 expect_all_or_fail
來指定多個資料品質限制式:
@dlt.expect_all_or_fail({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})
您也可以將預期集合定義為變數,並將其傳遞至管線中的一或多個查詢:
valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}
@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
# Create raw dataset
@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
# Create cleaned and prepared dataset
隔離無效的資料
下列範例會使用預期搭配臨時表和檢視。 此模式提供您在管線更新期間通過預期檢查的記錄計量,並提供一種方式,透過不同的下游路徑處理有效和無效的記錄。
注意
此範例會讀取 Databricks 資料集中包含的範例資料。 由於發佈至 Unity 目錄的管線不支援 Databricks 資料集,因此此範例只適用於設定為發行至 Hive 中繼存放區的管線。 不過,此模式也適用於已啟用 Unity 目錄的管線,但您必須從 外部位置讀取資料。 若要深入了解搭配 Delta Live Tables 使用 Unity 目錄,請參閱搭配您的 Delta Live Tables 管線使用 Unity 目錄 (英文)。
import dlt
from pyspark.sql.functions import expr
rules = {}
rules["valid_website"] = "(Website IS NOT NULL)"
rules["valid_location"] = "(Location IS NOT NULL)"
quarantine_rules = "NOT({0})".format(" AND ".join(rules.values()))
@dlt.table(
name="raw_farmers_market"
)
def get_farmers_market_data():
return (
spark.read.format('csv').option("header", "true")
.load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
)
@dlt.table(
name="farmers_market_quarantine",
temporary=True,
partition_cols=["is_quarantined"]
)
@dlt.expect_all(rules)
def farmers_market_quarantine():
return (
dlt.read("raw_farmers_market")
.select("MarketName", "Website", "Location", "State",
"Facebook", "Twitter", "Youtube", "Organic", "updateTime")
.withColumn("is_quarantined", expr(quarantine_rules))
)
@dlt.view(
name="valid_farmers_market"
)
def get_valid_farmers_market():
return (
dlt.read("farmers_market_quarantine")
.filter("is_quarantined=false")
)
@dlt.view(
name="invalid_farmers_market"
)
def get_invalid_farmers_market():
return (
dlt.read("farmers_market_quarantine")
.filter("is_quarantined=true")
)
驗證跨資料表的資料列計數
您可以將其他資料表新增至管線,以定義預期來比較兩個具體化檢視或串流資料表之間的資料列計數。 此預期的結果會出現在事件記錄檔和 Delta Live Tables UI 中。 下列範例會驗證 tbla
和 tblb
資料表之間的相等資料列計數:
CREATE OR REFRESH MATERIALIZED VIEW count_verification(
CONSTRAINT no_rows_dropped EXPECT (a_count == b_count)
) AS SELECT * FROM
(SELECT COUNT(*) AS a_count FROM LIVE.tbla),
(SELECT COUNT(*) AS b_count FROM LIVE.tblb)
使用 Delta Live Tables 預期執行進階驗證
您可以使用匯總和聯結查詢來定義具體化檢視,並使用這些查詢的結果作為預期檢查的一部分。 如果您想要執行複雜的資料品質檢查,例如,確保衍生資料表包含來源資料表中的所有記錄,或保證資料表之間的數值資料行相等,這非常有用。
下列範例會驗證 report
資料表中是否有所有預期的記錄:
CREATE MATERIALIZED VIEW report_compare_tests(
CONSTRAINT no_missing_records EXPECT (r.key IS NOT NULL)
)
AS SELECT * FROM LIVE.validation_copy v
LEFT OUTER JOIN LIVE.report r ON v.key = r.key
下列範例會使用匯總來確保主索引鍵的唯一性:
CREATE MATERIALIZED VIEW report_pk_tests(
CONSTRAINT unique_pk EXPECT (num_entries = 1)
)
AS SELECT pk, count(*) as num_entries
FROM LIVE.report
GROUP BY pk
讓預期可攜且可重複使用
您可以將資料品質規則與管線實作分開維護。
Databricks 建議將規則儲存在 Delta 資料表中,每個規則都會依標記分類。 您可以在資料集定義中使用此標籤來判斷要套用的規則。
下列範例會建立名為 rules
的規則:
CREATE OR REPLACE TABLE
rules
AS SELECT
col1 AS name,
col2 AS constraint,
col3 AS tag
FROM (
VALUES
("website_not_null","Website IS NOT NULL","validity"),
("location_not_null","Location IS NOT NULL","validity"),
("state_not_null","State IS NOT NULL","validity"),
("fresh_data","to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'","maintained"),
("social_media_access","NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)","maintained")
)
下列 Python 範例會根據儲存在資料表中的 rules
規則來定義資料品質預期。 get_rules()
函式會從 rules
資料表讀取規則,並傳回 Python 字典,其中包含符合傳遞至函式之 tag
引數的規則。 字典會套用在 @dlt.expect_all_*()
裝飾項目中,以強制執行資料品質限制式。 例如,任何失敗規則的記錄都會從 raw_farmers_market
資料表中捨棄標記 validity
的規則:
注意
此範例會讀取 Databricks 資料集中包含的範例資料。 由於發佈至 Unity 目錄的管線不支援 Databricks 資料集,因此此範例只適用於設定為發行至 Hive 中繼存放區的管線。 不過,此模式也適用於已啟用 Unity 目錄的管線,但您必須從 外部位置讀取資料。 若要深入了解搭配 Delta Live Tables 使用 Unity 目錄,請參閱搭配您的 Delta Live Tables 管線使用 Unity 目錄 (英文)。
import dlt
from pyspark.sql.functions import expr, col
def get_rules(tag):
"""
loads data quality rules from a table
:param tag: tag to match
:return: dictionary of rules that matched the tag
"""
rules = {}
df = spark.read.table("rules")
for row in df.filter(col("tag") == tag).collect():
rules[row['name']] = row['constraint']
return rules
@dlt.table(
name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
return (
spark.read.format('csv').option("header", "true")
.load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
)
@dlt.table(
name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
return (
dlt.read("raw_farmers_market")
.filter(expr("Organic = 'Y'"))
.select("MarketName", "Website", "State",
"Facebook", "Twitter", "Youtube", "Organic",
"updateTime"
)
)
例如,您可以在與筆記本相同資料夾中名為 rules_module.py
的檔案中,建立名為 rules
的資料表來維護規則,而不是在主要規則中建立 Python 模組:
def get_rules_as_list_of_dict():
return [
{
"name": "website_not_null",
"constraint": "Website IS NOT NULL",
"tag": "validity"
},
{
"name": "location_not_null",
"constraint": "Location IS NOT NULL",
"tag": "validity"
},
{
"name": "state_not_null",
"constraint": "State IS NOT NULL",
"tag": "validity"
},
{
"name": "fresh_data",
"constraint": "to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'",
"tag": "maintained"
},
{
"name": "social_media_access",
"constraint": "NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)",
"tag": "maintained"
}
]
然後,藉由匯入模組並將 get_rules()
函式變更為從模組讀取,而不是從 rules
資料表中讀取,以修改上述筆記本:
import dlt
from rules_module import *
from pyspark.sql.functions import expr, col
df = spark.createDataFrame(get_rules_as_list_of_dict())
def get_rules(tag):
"""
loads data quality rules from a table
:param tag: tag to match
:return: dictionary of rules that matched the tag
"""
rules = {}
for row in df.filter(col("tag") == tag).collect():
rules[row['name']] = row['constraint']
return rules
@dlt.table(
name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
return (
spark.read.format('csv').option("header", "true")
.load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
)
@dlt.table(
name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
return (
dlt.read("raw_farmers_market")
.filter(expr("Organic = 'Y'"))
.select("MarketName", "Website", "State",
"Facebook", "Twitter", "Youtube", "Organic",
"updateTime"
)
)