Az adatminőség kezelése Delta Live Tables használatával
Az elvárásokat az adathalmaz tartalmára vonatkozó adatminőségi korlátozások meghatározására használja. Az elvárások lehetővé teszik, hogy a táblákba érkező adatok megfeleljenek az adatminőségi követelményeknek, és betekintést nyújt az egyes folyamatfrissítések adatminőségébe. Elvárásokat alkalmazhat a Python-dekorátorok vagy SQL-kényszer záradékok használatával végzett lekérdezésekre.
Mik a Delta Live Tables elvárásai?
Az elvárások olyan opcionális záradékok, amelyeket a Delta Live Tables adatkészlet-deklarációihoz ad hozzá, amelyek adatminőség-ellenőrzést alkalmaznak a lekérdezésen áthaladó minden rekordon.
A várakozás három dologból áll:
- Egy leírás, amely egyedi azonosítóként működik, és lehetővé teszi a korlátozás metrikáinak nyomon követését.
- Logikai utasítás, amely mindig igaz vagy hamis értéket ad vissza valamilyen megadott feltétel alapján.
- Olyan művelet, amely akkor hajtható végre, ha egy rekord nem felel meg a várakozásnak, vagyis a logikai érték hamis értéket ad vissza.
Az alábbi mátrix az érvénytelen rekordokra alkalmazható három műveletet mutatja be:
Művelet | Eredmény |
---|---|
figyelmeztetés (alapértelmezett) | Érvénytelen rekordok vannak megírva a célhoz; hibajelentés az adathalmaz metrikájaként történik. |
csepp | Az adatok célba írása előtt érvénytelen rekordok kerülnek elvetésére; hibajelentés az adathalmaz metrikájaként történik. |
megbukik | Érvénytelen rekordok megakadályozzák a frissítés sikerességét. Az újrafeldolgozás előtt manuális beavatkozásra van szükség. |
A Delta Live Tables eseménynaplójának lekérdezésével megtekintheti az adatminőségi metrikákat, például a várakozást sértő rekordok számát. Lásd: Delta Live Tables-folyamatok figyelése.
A Delta Live Tables adatkészlet-deklarációs szintaxisának teljes referenciáját lásd: Delta Live Tables Python nyelvi referencia vagy Delta Live Tables SQL nyelvi referencia.
Feljegyzés
- Bár bármilyen elvárásba több záradékot is belefoglalhat, csak a Python támogatja a műveletek több elváráson alapuló definiálását. Tekintse meg a több elvárást.
- Az elvárásokat SQL-kifejezések használatával kell meghatározni. Nem SQL-szintaxist (például Python-függvényeket) nem használhat az elvárás meghatározásakor.
Érvénytelen rekordok megőrzése
Használja az operátort expect
, ha olyan rekordokat szeretne tárolni, amelyek megsértik a várakozást. A várakozást sértő rekordok hozzáadódnak a céladatkészlethez az érvényes rekordokkal együtt:
Python
@dlt.expect("valid timestamp", "timestamp > '2012-01-01'")
SQL
CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')
Érvénytelen rekordok elvetése
Az operátorral expect or drop
megakadályozhatja az érvénytelen rekordok további feldolgozását. A várakozást sértő rekordokat a rendszer elveti a céladatkészletből:
Python
@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
SQL
CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW
Érvénytelen rekordok meghiúsulnak
Ha érvénytelen rekordok nem fogadhatók el, az operátorral azonnal leállíthatja a expect or fail
végrehajtást, ha egy rekord érvényesítése meghiúsul. Ha a művelet táblafrissítés, a rendszer atomi módon visszaállítja a tranzakciót:
Python
@dlt.expect_or_fail("valid_count", "count > 0")
SQL
CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE
Fontos
Ha egy folyamatban több párhuzamos folyamat van definiálva, egyetlen folyamat meghibásodása nem okoz más folyamatok meghiúsulását.
Ha egy folyamat egy várakozási szabálysértés miatt meghiúsul, a folyamat újrafuttatása előtt ki kell javítania a folyamat kódját, hogy megfelelően kezelje az érvénytelen adatokat.
A sikertelen várakozások úgy módosítják az átalakítások Spark-lekérdezési tervét, hogy nyomon kövessék a szabálysértések észleléséhez és jelentéséhez szükséges információkat. Számos lekérdezés esetén ezekkel az információkkal azonosíthatja, hogy melyik bemeneti rekord eredményezte a szabálysértést. A következő példa kivétel:
Expectation Violated:
{
"flowName": "a-b",
"verboseInfo": {
"expectationsViolated": [
"x1 is negative"
],
"inputData": {
"a": {"x1": 1,"y1": "a },
"b": {
"x2": 1,
"y2": "aa"
}
},
"outputRecord": {
"x1": 1,
"y1": "a",
"x2": 1,
"y2": "aa"
},
"missingInputData": false
}
}
Több elvárás
A Python-folyamatokban egy vagy több adatminőségi korlátozással definiálhat elvárásokat. Ezek a dekorátorok argumentumként elfogadnak egy Python-szótárat, ahol a kulcs a várakozás neve, az érték pedig a várakozási kényszer.
Több adatminőségi korlátozás megadására használható expect_all
, ha a sikertelen érvényesítést tartalmazó rekordokat fel kell venni a céladatkészletbe:
@dlt.expect_all({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})
Több adatminőségi korlátozás megadására használható expect_all_or_drop
, ha a sikertelen érvényesítést tartalmazó rekordokat el kell dobni a céladatkészletből:
@dlt.expect_all_or_drop({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})
Több adatminőségi korlátozás megadására használható expect_all_or_fail
, ha a sikertelen érvényesítést tartalmazó rekordoknak le kell állítaniuk a folyamat végrehajtását:
@dlt.expect_all_or_fail({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})
Az elvárások gyűjteményét változóként is definiálhatja, és átadhatja a folyamat egy vagy több lekérdezésének:
valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}
@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
# Create raw dataset
@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
# Create cleaned and prepared dataset
Érvénytelen adatok karanténba helyezése
Az alábbi példa az elvárásokat használja ideiglenes táblákkal és nézetekkel kombinálva. Ez a minta metrikákat biztosít a folyamatfrissítések során a várakozási ellenőrzéseket átadó rekordokhoz, és lehetővé teszi az érvényes és érvénytelen rekordok feldolgozását különböző lefelé irányuló útvonalakon keresztül.
Feljegyzés
Ez a példa a Databricks-adathalmazokban szereplő mintaadatokat olvassa be. Mivel a Databricks-adatkészletek nem támogatottak a Unity Catalogban közzétett folyamatokkal, ez a példa csak a Hive metaadattárban való közzétételre konfigurált folyamattal működik. Ez a minta azonban a Unity Catalog által engedélyezett folyamatokkal is működik, de külső helyekről kell adatokat olvasnia. Ha többet szeretne tudni a Unity Catalog és a Delta Live Tables használatáról, olvassa el a Unity Catalog használata a Delta Live Tables-folyamatokkal című témakört.
import dlt
from pyspark.sql.functions import expr
rules = {}
rules["valid_website"] = "(Website IS NOT NULL)"
rules["valid_location"] = "(Location IS NOT NULL)"
quarantine_rules = "NOT({0})".format(" AND ".join(rules.values()))
@dlt.table(
name="raw_farmers_market"
)
def get_farmers_market_data():
return (
spark.read.format('csv').option("header", "true")
.load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
)
@dlt.table(
name="farmers_market_quarantine",
temporary=True,
partition_cols=["is_quarantined"]
)
@dlt.expect_all(rules)
def farmers_market_quarantine():
return (
dlt.read("raw_farmers_market")
.select("MarketName", "Website", "Location", "State",
"Facebook", "Twitter", "Youtube", "Organic", "updateTime")
.withColumn("is_quarantined", expr(quarantine_rules))
)
@dlt.view(
name="valid_farmers_market"
)
def get_valid_farmers_market():
return (
dlt.read("farmers_market_quarantine")
.filter("is_quarantined=false")
)
@dlt.view(
name="invalid_farmers_market"
)
def get_invalid_farmers_market():
return (
dlt.read("farmers_market_quarantine")
.filter("is_quarantined=true")
)
Sorok számának ellenőrzése táblák között
Hozzáadhat egy további táblát a folyamathoz, amely meghatározza azt a várakozást, hogy összehasonlítsa a sorok számát két materializált nézet vagy streamelési tábla között. Ennek a várakozásnak az eredményei megjelennek az eseménynaplóban és a Delta Live Tables felhasználói felületén. Az alábbi példa az egyenlő sorok számát ellenőrzi a tbla
táblák között tblb
:
CREATE OR REFRESH MATERIALIZED VIEW count_verification(
CONSTRAINT no_rows_dropped EXPECT (a_count == b_count)
) AS SELECT * FROM
(SELECT COUNT(*) AS a_count FROM LIVE.tbla),
(SELECT COUNT(*) AS b_count FROM LIVE.tblb)
Speciális ellenőrzés végrehajtása a Delta Live Tables elvárásaival
A materializált nézeteket összesítő és összekapcsoló lekérdezésekkel határozhatja meg, és a lekérdezések eredményeit a várakozási ellenőrzés részeként használhatja. Ez akkor hasznos, ha összetett adatminőség-ellenőrzéseket szeretne végezni, például biztosíthatja, hogy egy származtatott tábla tartalmazza a forrástáblából származó összes rekordot, vagy garantálja a numerikus oszlopok egyenlőségét a táblák között.
Az alábbi példa ellenőrzi, hogy az összes várt rekord szerepel-e a report
táblában:
CREATE MATERIALIZED VIEW report_compare_tests(
CONSTRAINT no_missing_records EXPECT (r.key IS NOT NULL)
)
AS SELECT * FROM LIVE.validation_copy v
LEFT OUTER JOIN LIVE.report r ON v.key = r.key
Az alábbi példa egy összesítést használ az elsődleges kulcs egyediségének biztosítására:
CREATE MATERIALIZED VIEW report_pk_tests(
CONSTRAINT unique_pk EXPECT (num_entries = 1)
)
AS SELECT pk, count(*) as num_entries
FROM LIVE.report
GROUP BY pk
Az elvárások hordozhatóvá és újrafelhasználhatóvá tétele
Az adatminőségi szabályokat a folyamat implementációitól elkülönítve tarthatja fenn.
A Databricks azt javasolja, hogy a szabályokat egy Delta-táblában tárolja, minden szabályt címkével kategorizálva. Ezt a címkét az adathalmaz-definíciókban használja annak meghatározásához, hogy mely szabályokat kell alkalmazni.
Az alábbi példa létrehoz egy táblát a rules
szabályok fenntartásához:
CREATE OR REPLACE TABLE
rules
AS SELECT
col1 AS name,
col2 AS constraint,
col3 AS tag
FROM (
VALUES
("website_not_null","Website IS NOT NULL","validity"),
("location_not_null","Location IS NOT NULL","validity"),
("state_not_null","State IS NOT NULL","validity"),
("fresh_data","to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'","maintained"),
("social_media_access","NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)","maintained")
)
Az alábbi Python-példa a táblában tárolt szabályok alapján határozza meg az adatminőséggel rules
kapcsolatos elvárásokat. A get_rules()
függvény beolvassa a szabályokat a rules
táblából, és visszaad egy Python-szótárt, amely a függvénynek átadott argumentumnak megfelelő tag
szabályokat tartalmazza. A szótár a dekorátorokban @dlt.expect_all_*()
az adatminőségi korlátozások kikényszerítéséhez lesz alkalmazva. Ha például a szabályok nem hajthatóak be, a rendszer elveti a táblából a raw_farmers_market
következő validity
rekordokat:
Feljegyzés
Ez a példa a Databricks-adathalmazokban szereplő mintaadatokat olvassa be. Mivel a Databricks-adatkészletek nem támogatottak a Unity Catalogban közzétett folyamatokkal, ez a példa csak a Hive metaadattárban való közzétételre konfigurált folyamattal működik. Ez a minta azonban a Unity Catalog által engedélyezett folyamatokkal is működik, de külső helyekről kell adatokat olvasnia. Ha többet szeretne tudni a Unity Catalog és a Delta Live Tables használatáról, olvassa el a Unity Catalog használata a Delta Live Tables-folyamatokkal című témakört.
import dlt
from pyspark.sql.functions import expr, col
def get_rules(tag):
"""
loads data quality rules from a table
:param tag: tag to match
:return: dictionary of rules that matched the tag
"""
rules = {}
df = spark.read.table("rules")
for row in df.filter(col("tag") == tag).collect():
rules[row['name']] = row['constraint']
return rules
@dlt.table(
name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
return (
spark.read.format('csv').option("header", "true")
.load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
)
@dlt.table(
name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
return (
dlt.read("raw_farmers_market")
.filter(expr("Organic = 'Y'"))
.select("MarketName", "Website", "State",
"Facebook", "Twitter", "Youtube", "Organic",
"updateTime"
)
)
A szabályok fenntartásához elnevezett rules
tábla létrehozása helyett létrehozhat egy Python-modult a fő szabályokhoz, például egy olyan fájlban, amely ugyanabban a mappában található rules_module.py
, mint a jegyzetfüzet:
def get_rules_as_list_of_dict():
return [
{
"name": "website_not_null",
"constraint": "Website IS NOT NULL",
"tag": "validity"
},
{
"name": "location_not_null",
"constraint": "Location IS NOT NULL",
"tag": "validity"
},
{
"name": "state_not_null",
"constraint": "State IS NOT NULL",
"tag": "validity"
},
{
"name": "fresh_data",
"constraint": "to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'",
"tag": "maintained"
},
{
"name": "social_media_access",
"constraint": "NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)",
"tag": "maintained"
}
]
Ezután módosítsa az előző jegyzetfüzetet úgy, hogy importálja a modult, és a get_rules()
függvényt úgy módosítja, hogy a táblázat helyett a modulból olvasson rules
:
import dlt
from rules_module import *
from pyspark.sql.functions import expr, col
df = spark.createDataFrame(get_rules_as_list_of_dict())
def get_rules(tag):
"""
loads data quality rules from a table
:param tag: tag to match
:return: dictionary of rules that matched the tag
"""
rules = {}
for row in df.filter(col("tag") == tag).collect():
rules[row['name']] = row['constraint']
return rules
@dlt.table(
name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
return (
spark.read.format('csv').option("header", "true")
.load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
)
@dlt.table(
name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
return (
dlt.read("raw_farmers_market")
.filter(expr("Organic = 'Y'"))
.select("MarketName", "Website", "State",
"Facebook", "Twitter", "Youtube", "Organic",
"updateTime"
)
)