Správa kvality dat pomocí dynamických tabulek Delta

Pomocí očekávání definujete omezení kvality dat v obsahu datové sady. Očekávání umožňují zaručit, že data přicházející do tabulek splňují požadavky na kvalitu dat a poskytují přehled o kvalitě dat pro každou aktualizaci kanálu. Na dotazy s využitím dekorátorů Pythonu nebo klauzulí omezení SQL použijete očekávání.

Co jsou očekávání Delta Live Tables?

Očekávání jsou volitelné klauzule, které přidáte do deklarací datových sad Delta Live Tables, které používají kontroly kvality dat u každého záznamu předávajícího dotazem.

Očekávání se skládá ze tří věcí:

  • Popis, který funguje jako jedinečný identifikátor a umožňuje sledovat metriky omezení.
  • Logický příkaz, který vždy vrátí hodnotu true nebo false na základě některé stavové podmínky.
  • Akce, která se má provést, když záznam selže s očekáváním, což znamená, že logická hodnota vrátí hodnotu false.

Následující matice ukazuje tři akce, které můžete použít u neplatných záznamů:

Akce Výsledek
upozornění (výchozí) Do cíle jsou zapsány neplatné záznamy; selhání je hlášeno jako metrika pro datovou sadu.
Drop Neplatné záznamy se zahodí před zápisem dat do cíle; selhání je hlášeno jako metrika pro datovou sadu.
Selhání Neplatné záznamy brání úspěšné aktualizaci. Před opětovným zpracováním je vyžadován ruční zásah.

Metriky kvality dat, například počet záznamů, které porušují očekávání, můžete zobrazit dotazováním protokolu událostí Delta Live Tables. Viz Monitorování kanálů dynamických tabulek Delta.

Úplný odkaz na syntaxi deklarací datové sady Delta Live Tables najdete v referenční dokumentaci jazyka Pythonu pro rozdílové živé tabulky nebo referenční informace k jazyku SQL Delta Live Tables.

Poznámka:

V jakémkoli očekávání můžete zahrnout více klauzulí, ale Python podporuje definování akcí na základě více očekávání. Podívejte se na několik očekávání.

Zachování neplatných záznamů

Operátor expect použijte, pokud chcete uchovávat záznamy, které porušují očekávání. Záznamy, které porušují očekávání, se přidají do cílové datové sady spolu s platnými záznamy:

Python

@dlt.expect("valid timestamp", "col(“timestamp”) > '2012-01-01'")

SQL

CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')

Odstranění neplatných záznamů

Pomocí operátoru expect or drop můžete zabránit dalšímu zpracování neplatných záznamů. Záznamy, které porušují očekávání, se z cílové datové sady zahodí:

Python

@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")

SQL

CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW

Selhání u neplatných záznamů

Pokud jsou neplatné záznamy nepřijatelné, ukončete expect or fail spuštění okamžitě, když záznam selže s ověřením. Pokud je operace aktualizace tabulky, systém atomicky vrátí zpět transakci:

Python

@dlt.expect_or_fail("valid_count", "count > 0")

SQL

CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE

Pokud kanál selže z důvodu porušení očekávání, musíte před opětovným spuštěním kanálu opravit kód kanálu, který zpracuje neplatná data správně.

Očekávání selhání upravují plán dotazů Sparku vašich transformací tak, aby sledovaly informace potřebné k detekci a hlášení porušení. U mnoha dotazů můžete pomocí těchto informací zjistit, který vstupní záznam způsobil narušení. Následuje příklad výjimky:

Expectation Violated:
{
  "flowName": "a-b",
  "verboseInfo": {
    "expectationsViolated": [
      "x1 is negative"
    ],
    "inputData": {
      "a": {"x1": 1,"y1": "a },
      "b": {
        "x2": 1,
        "y2": "aa"
      }
    },
    "outputRecord": {
      "x1": 1,
      "y1": "a",
      "x2": 1,
      "y2": "aa"
    },
    "missingInputData": false
  }
}

Více očekávání

V kanálech Pythonu můžete definovat očekávání s jedním nebo více omezeními kvality dat. Tyto dekorátory přijímají slovník Pythonu jako argument, kde klíč je očekávaný název a hodnota je omezení očekávání.

Slouží expect_all k určení více omezení kvality dat, pokud by záznamy, které selžou ověření, měly být zahrnuty do cílové datové sady:

@dlt.expect_all({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

Slouží expect_all_or_drop k určení více omezení kvality dat, když se záznamy, které selžou ověření, by se měly z cílové datové sady vynechat:

@dlt.expect_all_or_drop({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

Slouží expect_all_or_fail k určení více omezení kvality dat, pokud by záznamy, které selžou ověření, zastavily provádění kanálu:

@dlt.expect_all_or_fail({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

Můžete také definovat kolekci očekávání jako proměnnou a předat ji jednomu nebo více dotazům v kanálu:

valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}

@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
  # Create raw dataset

@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
  # Create cleaned and prepared dataset

Umístit neplatná data do karantény

Následující příklad používá očekávání v kombinaci s dočasnými tabulkami a zobrazeními. Tento model poskytuje metriky pro záznamy, které během aktualizací kanálu procházejí očekávanými kontrolami, a poskytuje způsob zpracování platných a neplatných záznamů prostřednictvím různých podřízených cest.

Poznámka:

Tento příklad načte ukázková data zahrnutá v datových sadách Databricks. Vzhledem k tomu, že datové sady Databricks nejsou podporovány kanálem, který se publikuje do katalogu Unity, tento příklad funguje jenom s kanálem nakonfigurovaným pro publikování do metastoru Hive. Tento model ale funguje také s kanály s povoleným katalogem Unity, ale musíte číst data z externích umístění. Další informace o používání katalogu Unity s dynamickými tabulkami Delta najdete v tématu Použití katalogu Unity s kanály Delta Live Tables.

import dlt
from pyspark.sql.functions import expr

rules = {}
rules["valid_website"] = "(Website IS NOT NULL)"
rules["valid_location"] = "(Location IS NOT NULL)"
quarantine_rules = "NOT({0})".format(" AND ".join(rules.values()))

@dlt.table(
  name="raw_farmers_market"
)
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="farmers_market_quarantine",
  temporary=True,
  partition_cols=["is_quarantined"]
)
@dlt.expect_all(rules)
def farmers_market_quarantine():
  return (
    dlt.read("raw_farmers_market")
      .select("MarketName", "Website", "Location", "State",
              "Facebook", "Twitter", "Youtube", "Organic", "updateTime")
      .withColumn("is_quarantined", expr(quarantine_rules))
  )

@dlt.view(
  name="valid_farmers_market"
)
def get_valid_farmers_market():
  return (
    dlt.read("farmers_market_quarantine")
      .filter("is_quarantined=false")
  )

@dlt.view(
  name="invalid_farmers_market"
)
def get_invalid_farmers_market():
  return (
    dlt.read("farmers_market_quarantine")
      .filter("is_quarantined=true")
  )

Ověření počtu řádků napříč tabulkami

Do kanálu můžete přidat další tabulku, která definuje očekávané porovnání počtu řádků mezi dvěma živými tabulkami. Výsledky tohoto očekávání se zobrazí v protokolu událostí a v uživatelském rozhraní Delta Live Tables. Následující příklad ověří počet stejných řádků mezi tabulkami tbla a tblb tabulkami:

CREATE OR REFRESH LIVE TABLE count_verification(
  CONSTRAINT no_rows_dropped EXPECT (a_count == b_count)
) AS SELECT * FROM
  (SELECT COUNT(*) AS a_count FROM LIVE.tbla),
  (SELECT COUNT(*) AS b_count FROM LIVE.tblb)

Provádění rozšířeného ověřování s očekáváními rozdílových živých tabulek

Živé tabulky můžete definovat pomocí agregačních dotazů a spojit dotazy a výsledky těchto dotazů používat jako součást kontroly očekávání. To je užitečné, pokud chcete provádět složité kontroly kvality dat, například zajistit, aby odvozená tabulka obsahovala všechny záznamy ze zdrojové tabulky nebo zaručit rovnost číselného sloupce v tabulkách. Pomocí klíčového TEMPORARY slova můžete zabránit publikování těchto tabulek do cílového schématu.

Následující příklad ověří, že jsou v report tabulce přítomny všechny očekávané záznamy:

CREATE TEMPORARY LIVE TABLE report_compare_tests(
  CONSTRAINT no_missing_records EXPECT (r.key IS NOT NULL)
)
AS SELECT * FROM LIVE.validation_copy v
LEFT OUTER JOIN LIVE.report r ON v.key = r.key

Následující příklad používá agregaci k zajištění jedinečnosti primárního klíče:

CREATE TEMPORARY LIVE TABLE report_pk_tests(
  CONSTRAINT unique_pk EXPECT (num_entries = 1)
)
AS SELECT pk, count(*) as num_entries
FROM LIVE.report
GROUP BY pk

Přenosná a opakovaně použitelná očekávání

Pravidla kvality dat můžete udržovat odděleně od implementací kanálů.

Databricks doporučuje ukládat pravidla v tabulce Delta s jednotlivými pravidly zařazenými do kategorií podle značky. Tuto značku použijete v definicích datové sady k určení pravidel, která se mají použít.

Následující příklad vytvoří tabulku s názvem rules pro údržbu pravidel:

CREATE OR REPLACE TABLE
  rules
AS SELECT
  col1 AS name,
  col2 AS constraint,
  col3 AS tag
FROM (
  VALUES
  ("website_not_null","Website IS NOT NULL","validity"),
  ("location_not_null","Location IS NOT NULL","validity"),
  ("state_not_null","State IS NOT NULL","validity"),
  ("fresh_data","to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'","maintained"),
  ("social_media_access","NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)","maintained")
)

Následující příklad Pythonu definuje očekávání kvality dat na základě pravidel uložených rules v tabulce. Funkce get_rules() čte pravidla z rules tabulky a vrátí slovník Pythonu obsahující pravidla odpovídající tag argumentu předaného funkci. Slovník se použije v @dlt.expect_all_*() dekorátorech k vynucení omezení kvality dat. Například všechny záznamy, které validity selhávají, se pravidla označená znaménou z tabulky zahodí raw_farmers_market :

Poznámka:

Tento příklad načte ukázková data zahrnutá v datových sadách Databricks. Vzhledem k tomu, že datové sady Databricks nejsou podporovány kanálem, který se publikuje do katalogu Unity, tento příklad funguje jenom s kanálem nakonfigurovaným pro publikování do metastoru Hive. Tento model ale funguje také s kanály s povoleným katalogem Unity, ale musíte číst data z externích umístění. Další informace o používání katalogu Unity s dynamickými tabulkami Delta najdete v tématu Použití katalogu Unity s kanály Delta Live Tables.

import dlt
from pyspark.sql.functions import expr, col

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  rules = {}
  df = spark.read.table("rules")
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules

@dlt.table(
  name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
      .select("MarketName", "Website", "State",
        "Facebook", "Twitter", "Youtube", "Organic",
        "updateTime"
      )
  )

Místo vytvoření tabulky s názvem rules pro správu pravidel můžete vytvořit modul Pythonu pro hlavní pravidla, například v souboru rules_module.py pojmenovaném ve stejné složce jako poznámkový blok:

def get_rules_as_list_of_dict():
  return [
    {
      "name": "website_not_null",
      "constraint": "Website IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "location_not_null",
      "constraint": "Location IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "state_not_null",
      "constraint": "State IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "fresh_data",
      "constraint": "to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'",
      "tag": "maintained"
    },
    {
      "name": "social_media_access",
      "constraint": "NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)",
      "tag": "maintained"
    }
  ]

Potom upravte předchozí poznámkový blok importem modulu a změnou get_rules() funkce na čtení z modulu místo z rules tabulky:

import dlt
from rules_module import *
from pyspark.sql.functions import expr, col

df = spark.createDataFrame(get_rules_as_list_of_dict())

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  rules = {}
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules

@dlt.table(
  name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
      .select("MarketName", "Website", "State",
        "Facebook", "Twitter", "Youtube", "Organic",
        "updateTime"
      )
  )