Delta Live Tables を使用してデータ品質を管理する

期待値を使用して、データセットの内容に対してデータ品質制約を定義します。 期待値により、テーブルに到着するデータがデータ品質要件を満たすことを保証し、パイプラインの更新ごとにデータ品質に関する分析情報を提供できます。 Python デコレーターまたは SQL 制約句を使用して、期待値をクエリに適用します。

Delta Live Tables の期待値とは

期待値は、クエリを通過する各レコードにデータ品質チェックを適用する Delta Live Tables データセット宣言に追加する省略可能な句です。

期待値は、次の 3 つで構成されます。

  • 説明。一意識別子として機能し、制約のメトリックを追跡できるようにします。
  • 指定された条件に基づいて常に true または false を返すブール値ステートメント。
  • レコードが期待どおりでなかった場合、つまり、ブール値が false を返したときに実行するアクション。

次の表は、無効なレコードに適用できる 3 つのアクションを示しています。

アクション 結果
warn (既定値) 無効なレコードはターゲットに書き込まれます。データセットのメトリックとして失敗が報告されます。
drop データがターゲットに書き込まれる前に無効なレコードはドロップされます。データセットのメトリックとして失敗が報告されます。
fail 無効なレコードにより更新が失敗します。 再処理を行う前に、手動による介入が必要です。

Delta Live Tables イベント ログに対してクエリを実行すると、期待値に違反したレコードの数などのデータ品質メトリックを表示できます。 「Delta Live Tables パイプラインを監視する」をご覧ください。

Delta Live Tables データセットの宣言構文の完全なリファレンスについては、「Delta Live Tables Python 言語リファレンス」または「Delta Live Tables SQL 言語リファレンス」を参照してください。

Note

任意の期待値に複数の句を含めることができますが、複数の期待値に基づくアクションの定義は Python でのみサポートされます。 「複数の期待値」を参照してください。

無効なレコードを保持する

期待値に違反するレコードを保持する場合は、expect 演算子を使用します。 期待値に違反したレコードは、有効なレコードとともにターゲット データセットに追加されます。

Python

@dlt.expect("valid timestamp", "col(“timestamp”) > '2012-01-01'")

SQL

CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')

無効なレコードを破棄する

無効なレコードがそれ以上処理されないようにするには、expect or drop 演算子を使用します。 期待値に違反したレコードは、ターゲット データセットから破棄されます。

Python

@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")

SQL

CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW

無効なレコードで失敗する

無効なレコードが許容できない場合は、expect or fail 演算子を使用して、レコードが検証に失敗したときに即座に実行を停止します。 操作がテーブルの更新である場合、システムはアトミックにトランザクションをロール バックします。

Python

@dlt.expect_or_fail("valid_count", "count > 0")

SQL

CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE

期待値違反が原因でパイプラインが失敗した場合は、パイプラインを再実行する前に、無効なデータを正しく処理するようにパイプライン コードを修正する必要があります。

失敗した期待値の場合、違反を検出し報告するために必要な情報を追跡するように、変換の Spark クエリ プランが変更されます。 多くのクエリで、この情報を使用して、違反が発生した入力レコードを特定できます。 例外の例を次に示します。

Expectation Violated:
{
  "flowName": "a-b",
  "verboseInfo": {
    "expectationsViolated": [
      "x1 is negative"
    ],
    "inputData": {
      "a": {"x1": 1,"y1": "a },
      "b": {
        "x2": 1,
        "y2": "aa"
      }
    },
    "outputRecord": {
      "x1": 1,
      "y1": "a",
      "x2": 1,
      "y2": "aa"
    },
    "missingInputData": false
  }
}

複数の期待値

Python パイプラインで 1 つ以上のデータ品質制約を使用して期待値を定義できます。 これらのデコレーターでは、Python ディクショナリを引数として受け入れます。ここでキーは期待値名で、値は期待値制約です。

検証に失敗したレコードをターゲット データセットに含める必要がある場合は、expect_all を使用して、複数のデータ品質制約を指定します。

@dlt.expect_all({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

検証に失敗したレコードをターゲット データセットから破棄する必要がある場合は、expect_all_or_drop を使用して、複数のデータ品質制約を指定します。

@dlt.expect_all_or_drop({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

レコードの検証に失敗したときにパイプラインの実行を停止する必要がある場合は、expect_all_or_fail を使用して、複数のデータ品質制約を指定します。

@dlt.expect_all_or_fail({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

また、期待値のコレクションを変数として定義し、パイプライン内の 1 つ以上のクエリに渡すこともできます。

valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}

@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
  # Create raw dataset

@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
  # Create cleaned and prepared dataset

無効なデータの検疫

次の例では、一時テーブルおよびビューと組み合わせて期待値を使用しています。 このパターンでは、パイプラインの更新中に期待値チェックに合格するレコードのメトリックが提供され、有効なレコードと無効なレコードをさまざまなダウンストリーム パスで処理する方法が提供されます。

Note

この例では、Databricks データセットに含まれるサンプル データを読み取ります。 Databricks データセットは Unity Catalog に発行されるパイプラインではサポートされていないため、この例は Hive メタストアに発行されるように構成されたパイプラインでのみ使用できます。 ただし、このパターンは Unity Catalog 対応パイプラインでも機能しますが、外部の場所からデータを読み取る必要があります。 Delta Live Tables での Unity Catalog の使用の詳細については、「Delta Live Tables パイプラインでの Unity Catalog の使用」を参照してください。

import dlt
from pyspark.sql.functions import expr

rules = {}
rules["valid_website"] = "(Website IS NOT NULL)"
rules["valid_location"] = "(Location IS NOT NULL)"
quarantine_rules = "NOT({0})".format(" AND ".join(rules.values()))

@dlt.table(
  name="raw_farmers_market"
)
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="farmers_market_quarantine",
  temporary=True,
  partition_cols=["is_quarantined"]
)
@dlt.expect_all(rules)
def farmers_market_quarantine():
  return (
    dlt.read("raw_farmers_market")
      .select("MarketName", "Website", "Location", "State",
              "Facebook", "Twitter", "Youtube", "Organic", "updateTime")
      .withColumn("is_quarantined", expr(quarantine_rules))
  )

@dlt.view(
  name="valid_farmers_market"
)
def get_valid_farmers_market():
  return (
    dlt.read("farmers_market_quarantine")
      .filter("is_quarantined=false")
  )

@dlt.view(
  name="invalid_farmers_market"
)
def get_invalid_farmers_market():
  return (
    dlt.read("farmers_market_quarantine")
      .filter("is_quarantined=true")
  )

テーブル間での行数の検証

2 つのライブ テーブル間の行数を比較するための期待値を定義する追加のテーブルをパイプラインに追加できます。 この想定の結果は、イベント ログと Delta Live Tables の UI に表示されます。 次の例では、tbla テーブルと tblb テーブルの間の行数が等しいかどうかを検証します。

CREATE OR REFRESH LIVE TABLE count_verification(
  CONSTRAINT no_rows_dropped EXPECT (a_count == b_count)
) AS SELECT * FROM
  (SELECT COUNT(*) AS a_count FROM LIVE.tbla),
  (SELECT COUNT(*) AS b_count FROM LIVE.tblb)

Delta Live Tables の期待値を使用して高度な検証を実行する

集計クエリと結合クエリを使ってライブ テーブルを定義し、これらのクエリの結果を期待値チェックの一環として使用できます。 複雑なデータ品質チェックを実行する、たとえば、派生テーブルにソース テーブルのすべてのレコードが含まれていることを確認する場合や、テーブル間で数値列が等しいことを保証する場合などにこれが役に立ちます。 TEMPORARY キーワードを使用して、これらのテーブルがターゲット スキーマに発行されないようにすることができます。

次の例では、期待されるすべてのレコードが report テーブルに存在することを検証しています。

CREATE TEMPORARY LIVE TABLE report_compare_tests(
  CONSTRAINT no_missing_records EXPECT (r.key IS NOT NULL)
)
AS SELECT * FROM LIVE.validation_copy v
LEFT OUTER JOIN LIVE.report r ON v.key = r.key

次の例では、集計を使って、主キーの一意性を保証しています。

CREATE TEMPORARY LIVE TABLE report_pk_tests(
  CONSTRAINT unique_pk EXPECT (num_entries = 1)
)
AS SELECT pk, count(*) as num_entries
FROM LIVE.report
GROUP BY pk

想定を移植可能で再利用可能にする

データ品質ルールは、パイプラインの実装とは分離して維持できます。

Databricks では、各ルールをタグで分類して Delta テーブルにルールを格納することをお勧めしています。 このタグをデータセット定義で使用して、適用するルールを決定します。

次の例では、rules という名前のテーブルを作成してルールを保持します。

CREATE OR REPLACE TABLE
  rules
AS SELECT
  col1 AS name,
  col2 AS constraint,
  col3 AS tag
FROM (
  VALUES
  ("website_not_null","Website IS NOT NULL","validity"),
  ("location_not_null","Location IS NOT NULL","validity"),
  ("state_not_null","State IS NOT NULL","validity"),
  ("fresh_data","to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'","maintained"),
  ("social_media_access","NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)","maintained")
)

次の Python の例では、rules テーブルに格納されているルールに基づいて、データ品質の期待値を定義しています。 get_rules() 関数は rules テーブルからルールを読み取り、関数に渡された tag 引数に一致するルールを含む Python ディクショナリを返します。 ディクショナリは、データ品質制約を適用するために @dlt.expect_all_*() デコレーターに適用されます。 たとえば、validity というタグが付いているルールで不合格となったレコードは、raw_farmers_market テーブルから削除されます。

Note

この例では、Databricks データセットに含まれるサンプル データを読み取ります。 Databricks データセットは Unity Catalog に発行されるパイプラインではサポートされていないため、この例は Hive メタストアに発行されるように構成されたパイプラインでのみ使用できます。 ただし、このパターンは Unity Catalog 対応パイプラインでも機能しますが、外部の場所からデータを読み取る必要があります。 Delta Live Tables での Unity Catalog の使用の詳細については、「Delta Live Tables パイプラインでの Unity Catalog の使用」を参照してください。

import dlt
from pyspark.sql.functions import expr, col

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  rules = {}
  df = spark.read.table("rules")
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules

@dlt.table(
  name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
      .select("MarketName", "Website", "State",
        "Facebook", "Twitter", "Youtube", "Organic",
        "updateTime"
      )
  )

ルールを維持するために rules という名前のテーブルを作成する代わりに、たとえば、ノートブックと同じフォルダー内の rules_module.py という名前のファイルにルールを維持する Python モジュールを作成できます。

def get_rules_as_list_of_dict():
  return [
    {
      "name": "website_not_null",
      "constraint": "Website IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "location_not_null",
      "constraint": "Location IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "state_not_null",
      "constraint": "State IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "fresh_data",
      "constraint": "to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'",
      "tag": "maintained"
    },
    {
      "name": "social_media_access",
      "constraint": "NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)",
      "tag": "maintained"
    }
  ]

次に、モジュールをインポートし、rules テーブルからではなくモジュールから読み取る get_rules() 関数を変更し、上記のノートブックを変更します。

import dlt
from rules_module import *
from pyspark.sql.functions import expr, col

df = spark.createDataFrame(get_rules_as_list_of_dict())

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  rules = {}
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules

@dlt.table(
  name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
      .select("MarketName", "Website", "State",
        "Facebook", "Twitter", "Youtube", "Organic",
        "updateTime"
      )
  )