Správa kvality dat pomocí dynamických tabulek Delta
Pomocí očekávání definujete omezení kvality dat v obsahu datové sady. Očekávání umožňují zaručit, že data přicházející do tabulek splňují požadavky na kvalitu dat a poskytují přehled o kvalitě dat pro každou aktualizaci kanálu. Na dotazy s využitím dekorátorů Pythonu nebo klauzulí omezení SQL použijete očekávání.
Co jsou očekávání Delta Live Tables?
Očekávání jsou volitelné klauzule, které přidáte do deklarací datových sad Delta Live Tables, které používají kontroly kvality dat u každého záznamu předávajícího dotazem.
Očekávání se skládá ze tří věcí:
- Popis, který funguje jako jedinečný identifikátor a umožňuje sledovat metriky omezení.
- Logický příkaz, který vždy vrátí hodnotu true nebo false na základě některé stavové podmínky.
- Akce, která se má provést, když záznam selže s očekáváním, což znamená, že logická hodnota vrátí hodnotu false.
Následující matice ukazuje tři akce, které můžete použít u neplatných záznamů:
Akce | Výsledek |
---|---|
upozornění (výchozí) | Do cíle jsou zapsány neplatné záznamy; selhání je hlášeno jako metrika pro datovou sadu. |
kapka | Neplatné záznamy se zahodí před zápisem dat do cíle; selhání je hlášeno jako metrika pro datovou sadu. |
selhat | Neplatné záznamy brání úspěšné aktualizaci. Před opětovným zpracováním je vyžadován ruční zásah. |
Metriky kvality dat, například počet záznamů, které porušují očekávání, můžete zobrazit dotazováním protokolu událostí Delta Live Tables. Viz Monitorování kanálů dynamických tabulek Delta.
Úplný odkaz na syntaxi deklarací datové sady Delta Live Tables najdete v referenční dokumentaci jazyka Pythonu pro rozdílové živé tabulky nebo referenční informace k jazyku SQL Delta Live Tables.
Poznámka:
- V jakémkoli očekávání můžete zahrnout více klauzulí, ale Python podporuje definování akcí na základě více očekávání. Podívejte se na několik očekávání.
- Očekávání musí být definována pomocí výrazů SQL. Syntaxi bez SQL (například funkce Pythonu) nelze použít při definování očekávání.
Zachování neplatných záznamů
Operátor expect
použijte, pokud chcete uchovávat záznamy, které porušují očekávání. Záznamy, které porušují očekávání, se přidají do cílové datové sady spolu s platnými záznamy:
Python
@dlt.expect("valid timestamp", "timestamp > '2012-01-01'")
SQL
CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')
Odstranění neplatných záznamů
Pomocí operátoru expect or drop
můžete zabránit dalšímu zpracování neplatných záznamů. Záznamy, které porušují očekávání, se z cílové datové sady zahodí:
Python
@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
SQL
CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW
Selhání u neplatných záznamů
Pokud jsou neplatné záznamy nepřijatelné, ukončete expect or fail
spuštění okamžitě, když záznam selže s ověřením. Pokud je operace aktualizace tabulky, systém atomicky vrátí zpět transakci:
Python
@dlt.expect_or_fail("valid_count", "count > 0")
SQL
CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE
Důležité
Pokud máte v kanálu definovaných více paralelních toků, selhání jednoho toku nezpůsobí selhání jiných toků.
Pokud kanál selže z důvodu porušení očekávání, musíte před opětovným spuštěním kanálu opravit kód kanálu, který zpracuje neplatná data správně.
Očekávání selhání upravují plán dotazů Sparku vašich transformací tak, aby sledovaly informace potřebné k detekci a hlášení porušení. U mnoha dotazů můžete pomocí těchto informací zjistit, který vstupní záznam způsobil narušení. Následuje příklad výjimky:
Expectation Violated:
{
"flowName": "a-b",
"verboseInfo": {
"expectationsViolated": [
"x1 is negative"
],
"inputData": {
"a": {"x1": 1,"y1": "a },
"b": {
"x2": 1,
"y2": "aa"
}
},
"outputRecord": {
"x1": 1,
"y1": "a",
"x2": 1,
"y2": "aa"
},
"missingInputData": false
}
}
Více očekávání
V kanálech Pythonu můžete definovat očekávání s jedním nebo více omezeními kvality dat. Tyto dekorátory přijímají slovník Pythonu jako argument, kde klíč je očekávaný název a hodnota je omezení očekávání.
Slouží expect_all
k určení více omezení kvality dat, pokud by záznamy, které selžou ověření, měly být zahrnuty do cílové datové sady:
@dlt.expect_all({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})
Slouží expect_all_or_drop
k určení více omezení kvality dat, když se záznamy, které selžou ověření, by se měly z cílové datové sady vynechat:
@dlt.expect_all_or_drop({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})
Slouží expect_all_or_fail
k určení více omezení kvality dat, pokud by záznamy, které selžou ověření, zastavily provádění kanálu:
@dlt.expect_all_or_fail({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})
Můžete také definovat kolekci očekávání jako proměnnou a předat ji jednomu nebo více dotazům v kanálu:
valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}
@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
# Create raw dataset
@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
# Create cleaned and prepared dataset
Umístit neplatná data do karantény
Následující příklad používá očekávání v kombinaci s dočasnými tabulkami a zobrazeními. Tento model poskytuje metriky pro záznamy, které během aktualizací kanálu procházejí očekávanými kontrolami, a poskytuje způsob zpracování platných a neplatných záznamů prostřednictvím různých podřízených cest.
Poznámka:
Tento příklad načte ukázková data zahrnutá v datových sadách Databricks. Vzhledem k tomu, že datové sady Databricks nejsou podporovány kanálem, který se publikuje do katalogu Unity, tento příklad funguje jenom s kanálem nakonfigurovaným pro publikování do metastoru Hive. Tento model ale funguje také s kanály s povoleným katalogem Unity, ale musíte číst data z externích umístění. Další informace o používání katalogu Unity s dynamickými tabulkami Delta najdete v tématu Použití katalogu Unity s kanály Delta Live Tables.
import dlt
from pyspark.sql.functions import expr
rules = {}
rules["valid_website"] = "(Website IS NOT NULL)"
rules["valid_location"] = "(Location IS NOT NULL)"
quarantine_rules = "NOT({0})".format(" AND ".join(rules.values()))
@dlt.table(
name="raw_farmers_market"
)
def get_farmers_market_data():
return (
spark.read.format('csv').option("header", "true")
.load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
)
@dlt.table(
name="farmers_market_quarantine",
temporary=True,
partition_cols=["is_quarantined"]
)
@dlt.expect_all(rules)
def farmers_market_quarantine():
return (
dlt.read("raw_farmers_market")
.select("MarketName", "Website", "Location", "State",
"Facebook", "Twitter", "Youtube", "Organic", "updateTime")
.withColumn("is_quarantined", expr(quarantine_rules))
)
@dlt.view(
name="valid_farmers_market"
)
def get_valid_farmers_market():
return (
dlt.read("farmers_market_quarantine")
.filter("is_quarantined=false")
)
@dlt.view(
name="invalid_farmers_market"
)
def get_invalid_farmers_market():
return (
dlt.read("farmers_market_quarantine")
.filter("is_quarantined=true")
)
Ověření počtu řádků napříč tabulkami
Do kanálu můžete přidat další tabulku, která definuje očekávané porovnání počtu řádků mezi dvěma materializovanými zobrazeními nebo tabulkami streamování. Výsledky tohoto očekávání se zobrazí v protokolu událostí a v uživatelském rozhraní Delta Live Tables. Následující příklad ověří počet stejných řádků mezi tabulkami tbla
a tblb
tabulkami:
CREATE OR REFRESH MATERIALIZED VIEW count_verification(
CONSTRAINT no_rows_dropped EXPECT (a_count == b_count)
) AS SELECT * FROM
(SELECT COUNT(*) AS a_count FROM LIVE.tbla),
(SELECT COUNT(*) AS b_count FROM LIVE.tblb)
Provádění rozšířeného ověřování s očekáváními rozdílových živých tabulek
Materializovaná zobrazení můžete definovat pomocí agregačních dotazů a spojit dotazy a použít výsledky těchto dotazů jako součást kontroly očekávání. To je užitečné, pokud chcete provádět složité kontroly kvality dat, například zajistit, aby odvozená tabulka obsahovala všechny záznamy ze zdrojové tabulky nebo zaručit rovnost číselného sloupce v tabulkách.
Následující příklad ověří, že jsou v report
tabulce přítomny všechny očekávané záznamy:
CREATE MATERIALIZED VIEW report_compare_tests(
CONSTRAINT no_missing_records EXPECT (r.key IS NOT NULL)
)
AS SELECT * FROM LIVE.validation_copy v
LEFT OUTER JOIN LIVE.report r ON v.key = r.key
Následující příklad používá agregaci k zajištění jedinečnosti primárního klíče:
CREATE MATERIALIZED VIEW report_pk_tests(
CONSTRAINT unique_pk EXPECT (num_entries = 1)
)
AS SELECT pk, count(*) as num_entries
FROM LIVE.report
GROUP BY pk
Přenosná a opakovaně použitelná očekávání
Pravidla kvality dat můžete udržovat odděleně od implementací kanálů.
Databricks doporučuje ukládat pravidla v tabulce Delta s jednotlivými pravidly zařazenými do kategorií podle značky. Tuto značku použijete v definicích datové sady k určení pravidel, která se mají použít.
Následující příklad vytvoří tabulku s názvem rules
pro údržbu pravidel:
CREATE OR REPLACE TABLE
rules
AS SELECT
col1 AS name,
col2 AS constraint,
col3 AS tag
FROM (
VALUES
("website_not_null","Website IS NOT NULL","validity"),
("location_not_null","Location IS NOT NULL","validity"),
("state_not_null","State IS NOT NULL","validity"),
("fresh_data","to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'","maintained"),
("social_media_access","NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)","maintained")
)
Následující příklad Pythonu definuje očekávání kvality dat na základě pravidel uložených rules
v tabulce. Funkce get_rules()
čte pravidla z rules
tabulky a vrátí slovník Pythonu obsahující pravidla odpovídající tag
argumentu předaného funkci. Slovník se použije v @dlt.expect_all_*()
dekorátorech k vynucení omezení kvality dat. Například všechny záznamy, které validity
selhávají, se pravidla označená znaménou z tabulky zahodí raw_farmers_market
:
Poznámka:
Tento příklad načte ukázková data zahrnutá v datových sadách Databricks. Vzhledem k tomu, že datové sady Databricks nejsou podporovány kanálem, který se publikuje do katalogu Unity, tento příklad funguje jenom s kanálem nakonfigurovaným pro publikování do metastoru Hive. Tento model ale funguje také s kanály s povoleným katalogem Unity, ale musíte číst data z externích umístění. Další informace o používání katalogu Unity s dynamickými tabulkami Delta najdete v tématu Použití katalogu Unity s kanály Delta Live Tables.
import dlt
from pyspark.sql.functions import expr, col
def get_rules(tag):
"""
loads data quality rules from a table
:param tag: tag to match
:return: dictionary of rules that matched the tag
"""
rules = {}
df = spark.read.table("rules")
for row in df.filter(col("tag") == tag).collect():
rules[row['name']] = row['constraint']
return rules
@dlt.table(
name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
return (
spark.read.format('csv').option("header", "true")
.load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
)
@dlt.table(
name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
return (
dlt.read("raw_farmers_market")
.filter(expr("Organic = 'Y'"))
.select("MarketName", "Website", "State",
"Facebook", "Twitter", "Youtube", "Organic",
"updateTime"
)
)
Místo vytvoření tabulky s názvem rules
pro správu pravidel můžete vytvořit modul Pythonu pro hlavní pravidla, například v souboru rules_module.py
pojmenovaném ve stejné složce jako poznámkový blok:
def get_rules_as_list_of_dict():
return [
{
"name": "website_not_null",
"constraint": "Website IS NOT NULL",
"tag": "validity"
},
{
"name": "location_not_null",
"constraint": "Location IS NOT NULL",
"tag": "validity"
},
{
"name": "state_not_null",
"constraint": "State IS NOT NULL",
"tag": "validity"
},
{
"name": "fresh_data",
"constraint": "to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'",
"tag": "maintained"
},
{
"name": "social_media_access",
"constraint": "NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)",
"tag": "maintained"
}
]
Potom upravte předchozí poznámkový blok importem modulu a změnou get_rules()
funkce na čtení z modulu místo z rules
tabulky:
import dlt
from rules_module import *
from pyspark.sql.functions import expr, col
df = spark.createDataFrame(get_rules_as_list_of_dict())
def get_rules(tag):
"""
loads data quality rules from a table
:param tag: tag to match
:return: dictionary of rules that matched the tag
"""
rules = {}
for row in df.filter(col("tag") == tag).collect():
rules[row['name']] = row['constraint']
return rules
@dlt.table(
name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
return (
spark.read.format('csv').option("header", "true")
.load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
)
@dlt.table(
name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
return (
dlt.read("raw_farmers_market")
.filter(expr("Organic = 'Y'"))
.select("MarketName", "Website", "State",
"Facebook", "Twitter", "Youtube", "Organic",
"updateTime"
)
)