Gegevenskwaliteit beheren met Delta Live Tables
U gebruikt verwachtingen om beperkingen voor gegevenskwaliteit te definiëren voor de inhoud van een gegevensset. Met de verwachtingen kunt u garanderen dat gegevens die binnenkomen in tabellen voldoen aan de vereisten voor gegevenskwaliteit en inzicht krijgen in de gegevenskwaliteit voor elke pijplijnupdate. U past verwachtingen toe op query's met behulp van Python-decorators of SQL-beperkingsclausules.
Wat zijn de verwachtingen van Delta Live Tables?
Verwachtingen zijn optionele componenten die u toevoegt aan declaraties van gegevenssets van Delta Live Tables die controles van gegevenskwaliteit toepassen op elke record die via een query wordt doorgegeven.
Een verwachting bestaat uit drie dingen:
- Een beschrijving, die fungeert als een unieke id en waarmee u metrische gegevens voor de beperking kunt bijhouden.
- Een Booleaanse instructie die altijd waar of onwaar retourneert op basis van een bepaalde voorwaarde.
- Een actie die moet worden uitgevoerd wanneer een record de verwachting mislukt, wat betekent dat de Booleaanse waarde onwaar retourneert.
In de volgende matrix ziet u de drie acties die u kunt toepassen op ongeldige records:
Actie | Result |
---|---|
waarschuwen (standaard) | Ongeldige records worden naar het doel geschreven; fout wordt gerapporteerd als een metrische waarde voor de gegevensset. |
druppel | Ongeldige records worden verwijderd voordat gegevens naar het doel worden geschreven; fout wordt gerapporteerd als metrische gegevens voor de gegevensset. |
mislukken | Ongeldige records verhinderen dat de update slaagt. Handmatige interventie is vereist voordat de verwerking opnieuw wordt uitgevoerd. |
U kunt metrische gegevens over de kwaliteit van gegevens bekijken, zoals het aantal records dat een verwachting schendt door een query uit te voeren op het gebeurtenislogboek van Delta Live Tables. Zie Delta Live Tables-pijplijnen bewaken.
Zie voor een volledig overzicht van de declaratiesyntaxis van Delta Live Tables-gegevenssets delta livetabellen python of sql-taalreferentie voor Delta Live Tables.
Notitie
- Hoewel u meerdere componenten in elke verwachting kunt opnemen, ondersteunt alleen Python het definiëren van acties op basis van meerdere verwachtingen. Bekijk meerdere verwachtingen.
- Verwachtingen moeten worden gedefinieerd met behulp van SQL-expressies. U kunt geen niet-SQL-syntaxis (bijvoorbeeld Python-functies) gebruiken bij het definiëren van een verwachting.
Ongeldige records behouden
Gebruik de expect
operator als u records wilt bewaren die in strijd zijn met de verwachting. Records die in strijd zijn met de verwachting, worden toegevoegd aan de doelgegevensset, samen met geldige records:
Python
@dlt.expect("valid timestamp", "timestamp > '2012-01-01'")
SQL
CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')
Ongeldige records verwijderen
Gebruik de expect or drop
operator om verdere verwerking van ongeldige records te voorkomen. Records die in strijd zijn met de verwachting, worden verwijderd uit de doelgegevensset:
Python
@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
SQL
CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW
Mislukt bij ongeldige records
Wanneer ongeldige records onacceptabel zijn, gebruikt u de operator om de expect or fail
uitvoering onmiddellijk te stoppen wanneer de validatie van een record mislukt. Als de bewerking een tabelupdate is, wordt de transactie door het systeem atomisch teruggedraaid:
Python
@dlt.expect_or_fail("valid_count", "count > 0")
SQL
CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE
Belangrijk
Als u meerdere parallelle stromen hebt gedefinieerd in een pijplijn, leidt het mislukken van één stroom er niet toe dat andere stromen mislukken.
Wanneer een pijplijn mislukt vanwege een schending van de verwachting, moet u de pijplijncode corrigeren om de ongeldige gegevens correct te verwerken voordat u de pijplijn opnieuw uitvoert.
Bij het mislukken van de verwachtingen wordt het Spark-queryplan van uw transformaties gewijzigd om informatie bij te houden die nodig is om schendingen te detecteren en te rapporteren. Voor veel query's kunt u deze informatie gebruiken om te bepalen welke invoerrecord heeft geleid tot de schending. Hier volgt een voorbeeld van een uitzondering:
Expectation Violated:
{
"flowName": "a-b",
"verboseInfo": {
"expectationsViolated": [
"x1 is negative"
],
"inputData": {
"a": {"x1": 1,"y1": "a },
"b": {
"x2": 1,
"y2": "aa"
}
},
"outputRecord": {
"x1": 1,
"y1": "a",
"x2": 1,
"y2": "aa"
},
"missingInputData": false
}
}
Meerdere verwachtingen
U kunt verwachtingen definiëren met een of meer beperkingen voor gegevenskwaliteit in Python-pijplijnen. Deze decorators accepteren een Python-woordenlijst als argument, waarbij de sleutel de naam van de verwachting is en de waarde de verwachtingsbeperking is.
Gebruik expect_all
dit om meerdere beperkingen voor gegevenskwaliteit op te geven wanneer records die niet kunnen worden gevalideerd, moeten worden opgenomen in de doelgegevensset:
@dlt.expect_all({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})
Gebruik expect_all_or_drop
dit om meerdere beperkingen voor gegevenskwaliteit op te geven wanneer records die niet kunnen worden gevalideerd, moeten worden verwijderd uit de doelgegevensset:
@dlt.expect_all_or_drop({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})
Gebruik expect_all_or_fail
dit om meerdere beperkingen voor gegevenskwaliteit op te geven wanneer records die mislukken bij validatie, de uitvoering van pijplijnen moeten stoppen:
@dlt.expect_all_or_fail({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})
U kunt ook een verzameling verwachtingen definiëren als een variabele en deze doorgeven aan een of meer query's in uw pijplijn:
valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}
@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
# Create raw dataset
@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
# Create cleaned and prepared dataset
Ongeldige gegevens in quarantaine plaatsen
In het volgende voorbeeld worden verwachtingen gebruikt in combinatie met tijdelijke tabellen en weergaven. Dit patroon biedt u metrische gegevens voor records die verwachtingen tijdens pijplijnupdates doorgeven en biedt een manier om geldige en ongeldige records via verschillende downstreampaden te verwerken.
Notitie
In dit voorbeeld worden voorbeeldgegevens gelezen die zijn opgenomen in de Databricks-gegevenssets. Omdat de Databricks-gegevenssets niet worden ondersteund met een pijplijn die naar Unity Catalog wordt gepubliceerd, werkt dit voorbeeld alleen met een pijplijn die is geconfigureerd om te publiceren naar de Hive-metastore. Dit patroon werkt echter ook met pijplijnen met Unity Catalog, maar u moet gegevens van externe locaties lezen. Raadpleeg Unity Catalog gebruiken met uw Delta Live Tables-pijplijnen voor meer informatie over het gebruik van Unity Catalog met Delta Live Tables.
import dlt
from pyspark.sql.functions import expr
rules = {}
rules["valid_website"] = "(Website IS NOT NULL)"
rules["valid_location"] = "(Location IS NOT NULL)"
quarantine_rules = "NOT({0})".format(" AND ".join(rules.values()))
@dlt.table(
name="raw_farmers_market"
)
def get_farmers_market_data():
return (
spark.read.format('csv').option("header", "true")
.load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
)
@dlt.table(
name="farmers_market_quarantine",
temporary=True,
partition_cols=["is_quarantined"]
)
@dlt.expect_all(rules)
def farmers_market_quarantine():
return (
dlt.read("raw_farmers_market")
.select("MarketName", "Website", "Location", "State",
"Facebook", "Twitter", "Youtube", "Organic", "updateTime")
.withColumn("is_quarantined", expr(quarantine_rules))
)
@dlt.view(
name="valid_farmers_market"
)
def get_valid_farmers_market():
return (
dlt.read("farmers_market_quarantine")
.filter("is_quarantined=false")
)
@dlt.view(
name="invalid_farmers_market"
)
def get_invalid_farmers_market():
return (
dlt.read("farmers_market_quarantine")
.filter("is_quarantined=true")
)
Aantal rijen valideren in verschillende tabellen
U kunt een extra tabel toevoegen aan uw pijplijn die een verwachting definieert voor het vergelijken van het aantal rijen tussen twee gerealiseerde weergaven of streamingtabellen. De resultaten van deze verwachting worden weergegeven in het gebeurtenislogboek en de gebruikersinterface van Delta Live Tables. In het volgende voorbeeld worden gelijke aantal rijen tussen de tbla
en tblb
tabellen gevalideerd:
CREATE OR REFRESH MATERIALIZED VIEW count_verification(
CONSTRAINT no_rows_dropped EXPECT (a_count == b_count)
) AS SELECT * FROM
(SELECT COUNT(*) AS a_count FROM LIVE.tbla),
(SELECT COUNT(*) AS b_count FROM LIVE.tblb)
Geavanceerde validatie uitvoeren met de verwachtingen van Delta Live Tables
U kunt gerealiseerde weergaven definiëren met behulp van statistische query's en joinquery's en de resultaten van deze query's gebruiken als onderdeel van uw verwachtingscontrole. Dit is handig als u complexe gegevenskwaliteitscontroles wilt uitvoeren, bijvoorbeeld om ervoor te zorgen dat een afgeleide tabel alle records uit de brontabel bevat of de gelijkheid van een numerieke kolom tussen tabellen garandeert.
In het volgende voorbeeld wordt gecontroleerd of alle verwachte records aanwezig zijn in de report
tabel:
CREATE MATERIALIZED VIEW report_compare_tests(
CONSTRAINT no_missing_records EXPECT (r.key IS NOT NULL)
)
AS SELECT * FROM LIVE.validation_copy v
LEFT OUTER JOIN LIVE.report r ON v.key = r.key
In het volgende voorbeeld wordt een statistische functie gebruikt om de uniekheid van een primaire sleutel te garanderen:
CREATE MATERIALIZED VIEW report_pk_tests(
CONSTRAINT unique_pk EXPECT (num_entries = 1)
)
AS SELECT pk, count(*) as num_entries
FROM LIVE.report
GROUP BY pk
Verwachtingen draagbaar en herbruikbaar maken
U kunt regels voor gegevenskwaliteit afzonderlijk van uw pijplijn-implementaties onderhouden.
Databricks raadt aan de regels op te slaan in een Delta-tabel, waarbij elke regel wordt gecategoriseerd door een tag. U gebruikt deze tag in gegevenssetdefinities om te bepalen welke regels moeten worden toegepast.
In het volgende voorbeeld wordt een tabel gemaakt met de naam rules
voor het onderhouden van regels:
CREATE OR REPLACE TABLE
rules
AS SELECT
col1 AS name,
col2 AS constraint,
col3 AS tag
FROM (
VALUES
("website_not_null","Website IS NOT NULL","validity"),
("location_not_null","Location IS NOT NULL","validity"),
("state_not_null","State IS NOT NULL","validity"),
("fresh_data","to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'","maintained"),
("social_media_access","NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)","maintained")
)
In het volgende Python-voorbeeld worden de verwachtingen voor gegevenskwaliteit gedefinieerd op basis van de regels die zijn opgeslagen in de rules
tabel. De get_rules()
functie leest de regels uit de rules
tabel en retourneert een Python-woordenlijst met regels die overeenkomen met het tag
argument dat aan de functie is doorgegeven. De woordenlijst wordt toegepast in de @dlt.expect_all_*()
decorators om beperkingen voor gegevenskwaliteit af te dwingen. Records die niet voldoen aan de regels die zijn validity
getagd, worden bijvoorbeeld verwijderd uit de raw_farmers_market
tabel:
Notitie
In dit voorbeeld worden voorbeeldgegevens gelezen die zijn opgenomen in de Databricks-gegevenssets. Omdat de Databricks-gegevenssets niet worden ondersteund met een pijplijn die naar Unity Catalog wordt gepubliceerd, werkt dit voorbeeld alleen met een pijplijn die is geconfigureerd om te publiceren naar de Hive-metastore. Dit patroon werkt echter ook met pijplijnen met Unity Catalog, maar u moet gegevens van externe locaties lezen. Raadpleeg Unity Catalog gebruiken met uw Delta Live Tables-pijplijnen voor meer informatie over het gebruik van Unity Catalog met Delta Live Tables.
import dlt
from pyspark.sql.functions import expr, col
def get_rules(tag):
"""
loads data quality rules from a table
:param tag: tag to match
:return: dictionary of rules that matched the tag
"""
rules = {}
df = spark.read.table("rules")
for row in df.filter(col("tag") == tag).collect():
rules[row['name']] = row['constraint']
return rules
@dlt.table(
name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
return (
spark.read.format('csv').option("header", "true")
.load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
)
@dlt.table(
name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
return (
dlt.read("raw_farmers_market")
.filter(expr("Organic = 'Y'"))
.select("MarketName", "Website", "State",
"Facebook", "Twitter", "Youtube", "Organic",
"updateTime"
)
)
In plaats van een tabel met de naam rules
regels te maken, kunt u een Python-module maken voor de belangrijkste regels, bijvoorbeeld in een bestand met de naam rules_module.py
in dezelfde map als het notebook:
def get_rules_as_list_of_dict():
return [
{
"name": "website_not_null",
"constraint": "Website IS NOT NULL",
"tag": "validity"
},
{
"name": "location_not_null",
"constraint": "Location IS NOT NULL",
"tag": "validity"
},
{
"name": "state_not_null",
"constraint": "State IS NOT NULL",
"tag": "validity"
},
{
"name": "fresh_data",
"constraint": "to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'",
"tag": "maintained"
},
{
"name": "social_media_access",
"constraint": "NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)",
"tag": "maintained"
}
]
Wijzig vervolgens het voorgaande notebook door de module te importeren en de get_rules()
functie te wijzigen die uit de module moet worden gelezen in plaats van uit de rules
tabel:
import dlt
from rules_module import *
from pyspark.sql.functions import expr, col
df = spark.createDataFrame(get_rules_as_list_of_dict())
def get_rules(tag):
"""
loads data quality rules from a table
:param tag: tag to match
:return: dictionary of rules that matched the tag
"""
rules = {}
for row in df.filter(col("tag") == tag).collect():
rules[row['name']] = row['constraint']
return rules
@dlt.table(
name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
return (
spark.read.format('csv').option("header", "true")
.load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
)
@dlt.table(
name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
return (
dlt.read("raw_farmers_market")
.filter(expr("Organic = 'Y'"))
.select("MarketName", "Website", "State",
"Facebook", "Twitter", "Youtube", "Organic",
"updateTime"
)
)