Share via


使用 Delta Live Tables 管理數據品質

您可以使用預期來定義資料集內容的數據品質條件約束。 預期可讓您保證數據抵達數據表符合數據品質需求,並提供每個管線更新數據品質的深入解析。 您可以使用 Python 裝飾專案或 SQL 條件約束子句將期望套用至查詢。

Delta Live Tables 的期望為何?

預期是選擇性子句,您可以新增至 Delta Live Tables 數據集宣告,以對傳遞查詢的每個記錄套用數據質量檢查。

預期包含三件事:

  • 描述,做為唯一標識符,並可讓您追蹤條件約束的計量。
  • 布爾語句,一律會根據某些陳述條件傳回 true 或 false。
  • 當記錄失敗預期時要採取的動作,表示布爾值會傳回 false。

下列矩陣顯示您可以套用至無效記錄的三個動作:

動作 結果
warn (預設值) 無效的記錄會寫入目標;失敗會回報為數據集的計量。
下降 將數據寫入目標之前,會卸除無效的記錄;失敗會回報為數據集的計量。
fail 無效的記錄可防止更新成功。 在重新處理之前,需要手動介入。

您可以藉由查詢 Delta Live Tables 事件記錄來檢視數據品質計量,例如違反預期記錄的記錄數目。 請參閱 監視 Delta Live Tables 管線

如需 Delta Live Tables 數據集宣告語法的完整參考,請參閱 Delta Live Tables Python 語言參考Delta Live Tables SQL 語言參考

注意

雖然您可以在任何期望中包含多個子句,但只有 Python 支援根據多個期望來定義動作。 請參閱 多個預期

保留無效的記錄

expect當您想要保留違反預期之記錄時,請使用 運算符。 違反預期記錄的記錄會連同有效的記錄一起新增至目標數據集:

Python

@dlt.expect("valid timestamp", "col(“timestamp”) > '2012-01-01'")

SQL

CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')

卸除無效的記錄

expect or drop使用運算符來防止進一步處理無效的記錄。 違反預期的記錄會從目標數據集卸除:

Python

@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")

SQL

CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW

無效的記錄失敗

當無效的記錄無法接受時,請使用 expect or fail 運算元,在記錄驗證失敗時立即停止執行。 如果作業是數據表更新,系統會以不可部分完成的方式回復交易:

Python

@dlt.expect_or_fail("valid_count", "count > 0")

SQL

CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE

當管線因為預期違規而失敗時,您必須修正管線程序代碼,以在重新執行管線之前正確處理無效的數據。

失敗預期會修改轉換的 Spark 查詢計劃,以追蹤偵測和報告違規所需的資訊。 對於許多查詢,您可以使用這項資訊來識別導致違規的輸入記錄。 以下是範例例外狀況:

Expectation Violated:
{
  "flowName": "a-b",
  "verboseInfo": {
    "expectationsViolated": [
      "x1 is negative"
    ],
    "inputData": {
      "a": {"x1": 1,"y1": "a },
      "b": {
        "x2": 1,
        "y2": "aa"
      }
    },
    "outputRecord": {
      "x1": 1,
      "y1": "a",
      "x2": 1,
      "y2": "aa"
    },
    "missingInputData": false
  }
}

多重預期

您可以在 Python 管線中使用一或多個資料品質條件約束來定義預期。 這些裝飾專案接受 Python 字典做為自變數,其中索引鍵是預期名稱,而值是預期條件約束。

當目標資料集中應包含失敗驗證的記錄時,請使用 expect_all 指定多個資料品質條件約束:

@dlt.expect_all({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

當應該從目標數據集卸除驗證失敗的記錄時,請使用 expect_all_or_drop 指定多個數據品質條件約束:

@dlt.expect_all_or_drop({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

當失敗驗證的記錄應該停止管線執行時,請使用 expect_all_or_fail 來指定多個數據品質條件約束:

@dlt.expect_all_or_fail({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

您也可以將預期集合定義為變數,並將其傳遞至管線中的一或多個查詢:

valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}

@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
  # Create raw dataset

@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
  # Create cleaned and prepared dataset

隔離無效的數據

下列範例會使用預期搭配臨時表和檢視表。 此模式提供您在管線更新期間通過預期檢查的記錄計量,並提供一種方式,透過不同的下游路徑處理有效和無效的記錄。

注意

此範例會讀取 Databricks 數據集中包含的範例數據。 由於發佈至 Unity 目錄的管線不支援 Databricks 數據集,因此此範例只適用於設定為發行至 Hive 中繼存放區的管線。 不過,此模式也適用於已啟用 Unity 目錄的管線,但您必須從 外部位置讀取數據。 若要深入了解搭配 Delta Live Tables 使用 Unity 目錄,請參閱搭配您的 Delta Live Tables 管線使用 Unity 目錄 (英文)。

import dlt
from pyspark.sql.functions import expr

rules = {}
rules["valid_website"] = "(Website IS NOT NULL)"
rules["valid_location"] = "(Location IS NOT NULL)"
quarantine_rules = "NOT({0})".format(" AND ".join(rules.values()))

@dlt.table(
  name="raw_farmers_market"
)
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="farmers_market_quarantine",
  temporary=True,
  partition_cols=["is_quarantined"]
)
@dlt.expect_all(rules)
def farmers_market_quarantine():
  return (
    dlt.read("raw_farmers_market")
      .select("MarketName", "Website", "Location", "State",
              "Facebook", "Twitter", "Youtube", "Organic", "updateTime")
      .withColumn("is_quarantined", expr(quarantine_rules))
  )

@dlt.view(
  name="valid_farmers_market"
)
def get_valid_farmers_market():
  return (
    dlt.read("farmers_market_quarantine")
      .filter("is_quarantined=false")
  )

@dlt.view(
  name="invalid_farmers_market"
)
def get_invalid_farmers_market():
  return (
    dlt.read("farmers_market_quarantine")
      .filter("is_quarantined=true")
  )

驗證跨數據表的數據列計數

您可以將額外的數據表新增至管線,以定義預期來比較兩個實時數據表之間的數據列計數。 此預期的結果會出現在事件記錄檔和 Delta Live Tables UI 中。 下列範例會驗證 和 tblb 資料表之間的tbla相等數據列計數:

CREATE OR REFRESH LIVE TABLE count_verification(
  CONSTRAINT no_rows_dropped EXPECT (a_count == b_count)
) AS SELECT * FROM
  (SELECT COUNT(*) AS a_count FROM LIVE.tbla),
  (SELECT COUNT(*) AS b_count FROM LIVE.tblb)

使用 Delta Live Tables 預期執行進階驗證

您可以使用匯總和聯結查詢來定義即時數據表,並使用這些查詢的結果作為預期檢查的一部分。 如果您想要執行複雜的數據質量檢查,例如,確保衍生數據表包含源數據表中的所有記錄,或保證數據表之間的數值數據行相等,這非常有用。 您可以使用 TEMPORARY 關鍵詞來防止這些數據表發佈至目標架構。

下列範例會驗證數據表中 report 是否有所有預期的記錄:

CREATE TEMPORARY LIVE TABLE report_compare_tests(
  CONSTRAINT no_missing_records EXPECT (r.key IS NOT NULL)
)
AS SELECT * FROM LIVE.validation_copy v
LEFT OUTER JOIN LIVE.report r ON v.key = r.key

下列範例會使用匯總來確保主鍵的唯一性:

CREATE TEMPORARY LIVE TABLE report_pk_tests(
  CONSTRAINT unique_pk EXPECT (num_entries = 1)
)
AS SELECT pk, count(*) as num_entries
FROM LIVE.report
GROUP BY pk

讓預期可攜式且可重複使用

您可以將資料質量規則與管線實作分開維護。

Databricks 建議將規則儲存在 Delta 數據表中,每個規則都會依標記分類。 您可以在資料集定義中使用此標籤來判斷要套用的規則。

下列範例會建立名為 rules 的數據表,以維護規則:

CREATE OR REPLACE TABLE
  rules
AS SELECT
  col1 AS name,
  col2 AS constraint,
  col3 AS tag
FROM (
  VALUES
  ("website_not_null","Website IS NOT NULL","validity"),
  ("location_not_null","Location IS NOT NULL","validity"),
  ("state_not_null","State IS NOT NULL","validity"),
  ("fresh_data","to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'","maintained"),
  ("social_media_access","NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)","maintained")
)

下列 Python 範例會根據儲存在數據表中的 rules 規則來定義數據質量預期。 函 get_rules() 式會從 rules 數據表讀取規則,並傳回 Python 字典,其中包含符合 tag 傳遞至函式之自變數的規則。 字典會套用在裝飾專案中 @dlt.expect_all_*() ,以強制執行數據品質條件約束。 例如,任何失敗規則的記錄都會從raw_farmers_market數據表中卸除標記的規則validity

注意

此範例會讀取 Databricks 數據集中包含的範例數據。 由於發佈至 Unity 目錄的管線不支援 Databricks 數據集,因此此範例只適用於設定為發行至 Hive 中繼存放區的管線。 不過,此模式也適用於已啟用 Unity 目錄的管線,但您必須從 外部位置讀取數據。 若要深入了解搭配 Delta Live Tables 使用 Unity 目錄,請參閱搭配您的 Delta Live Tables 管線使用 Unity 目錄 (英文)。

import dlt
from pyspark.sql.functions import expr, col

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  rules = {}
  df = spark.read.table("rules")
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules

@dlt.table(
  name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
      .select("MarketName", "Website", "State",
        "Facebook", "Twitter", "Youtube", "Organic",
        "updateTime"
      )
  )

例如,您可以在與筆記本相同資料夾中名為 rules_module.py 的檔案中,建立名為 rules 的數據表來維護規則,而不是建立 Python 模組至主要規則:

def get_rules_as_list_of_dict():
  return [
    {
      "name": "website_not_null",
      "constraint": "Website IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "location_not_null",
      "constraint": "Location IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "state_not_null",
      "constraint": "State IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "fresh_data",
      "constraint": "to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'",
      "tag": "maintained"
    },
    {
      "name": "social_media_access",
      "constraint": "NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)",
      "tag": "maintained"
    }
  ]

然後,藉由匯入模組並將函式變更為從模組讀取,而不是從rules數據表中讀取,以修改get_rules()上述筆記本:

import dlt
from rules_module import *
from pyspark.sql.functions import expr, col

df = spark.createDataFrame(get_rules_as_list_of_dict())

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  rules = {}
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules

@dlt.table(
  name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
      .select("MarketName", "Website", "State",
        "Facebook", "Twitter", "Youtube", "Organic",
        "updateTime"
      )
  )