Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
In medaillonarchitecturen moet u in elke fase de gegevenskwaliteit waarborgen. Slechte gegevenskwaliteit kan leiden tot onjuiste inzichten en operationele inefficiënties.
In dit artikel wordt uitgelegd hoe u controles van gegevenskwaliteit implementeert in gerealiseerde LAKE-weergaven (MLV's) in Microsoft Fabric.
Gegevenskwaliteit implementeren
In gematerialiseerde lake-weergaven (MLV's) in Microsoft Fabric kun je de gegevenskwaliteit behouden door voorwaarden voor je weergaven te definiëren. Zonder expliciete controles kunnen kleine gegevensproblemen de verwerkingstijd verhogen of de pijplijn mislukken.
Wanneer een rij een beperking schendt, kunt u een van deze acties gebruiken:
FAIL: Hiermee wordt MLV-vernieuwing gestopt bij de eerste schending van de restrictie. Dit is het standaardgedrag, zelfs wanneer u niet opgeeft
FAIL.Waarschuwing
Het maken of vernieuwen van een MLV met een
FAILactie kan resulteren in een fout 'Delta-tabel niet gevonden'. Als dit gebeurt, maak de MLV opnieuw aan en vermijd deFAILhandeling.DROP: Hiermee wordt de verwerking voortgezet en worden records verwijderd die de beperking schenden. In de afstammingsweergave ziet u het aantal verwijderde records.
Opmerking
Als u zowel DROP- als FAIL-acties in een MLV definieert, heeft de FAIL-actie voorrang.
Gegevenskwaliteitscontroles definiëren in een gerealiseerde lakeweergave
DQ MLV (Data Quality MLV): een MLV die een of meer beperkingen voor gegevenskwaliteit bevat die zijn opgegeven in een WITH DATA QUALITY-component. Elke beperking definieert een Boole-expressie en een actie die moet worden uitgevoerd wanneer een rij deze schendt. Geslaagde rijen worden naar de uitvoertabel geschreven; mislukte rijen worden stilletjes verwijderd of zorgen ervoor dat de gehele verversing wordt afgebroken, afhankelijk van de instelling bij overtreding.
In het volgende voorbeeld wordt de beperking cust_blankgedefinieerd, waarmee wordt gecontroleerd of het customerName veld niet null is. De beperking sluit rijen met een null customerName uit van verwerking.
De tabel bevat een overzicht van ondersteunde bewerkingen. '—' betekent dat de bewerking niet wordt ondersteund. 'N/B' betekent dat het functietype niet van toepassing is op die context.
| Scenario | Beschrijving | SpSQL Non-DQ maken | SpSQL Niet-DQ-lijn | PySpark Non-DQ aanmaken | PySpark Niet-DQ Afstamming | SpSQL-DQ maken | SpSQL DQ LIJN | PySpark DQ Maken | PySpark DQ-HERKOMST |
|---|---|---|---|---|---|---|---|---|---|
| Systeemfuncties | Ingebouwd: UPPER, LOWER, COALESCE, enz. | Ja | Ja | Ja | Ja | Ja | Ja | Ja | Ja |
| UDF - Hetzelfde NB | spark.udf.register() in hetzelfde notebook | Ja | — | Ja | Ja | Ja | — | Ja | Ja |
| UDF - Diff NB | UDF gedefinieerd in afzonderlijke NB, geregistreerd in hetzelfde NB | Ja | — | Ja | Ja | Ja | — | Ja | Ja |
| Bibliotheken van derden | Gevectoriseerde UDF met pandas-framework | Ja | — | Ja | Ja | Ja | — | Ja | Ja |
| Op maat gemaakt wiel | Functie verpakt in Python wheel-bestand | Ja | — | Ja | Ja | Ja | — | Ja | Ja |
| Aangepaste jar | Functie gecompileerd in Java/Scala JAR | Ja | — | Ja | Ja | Ja | — | Ja | Ja |
| PyPI-bibliotheek | Functie van openbaar PyPI-pakket | Ja | N/A | Ja | Ja | Ja | — | Ja | Ja |
| Fabric UDF | Functie gedefinieerd in Fabric User Data Functions | N/A | N/A | Ja | Ja | N/A | N/A | Ja | Ja |
Systeem-ingebouwde functies
Ingebouwde Spark-/SQL-functies zoals UPPER(), LOWER(), TRIM(), COALESCE(), INITCAP() en DATE_FORMAT() worden volledig ondersteund in alle MLV-contexten voor zowel CREATE- als LINEAGE-vernieuwing.
CREATE MATERIALIZED LAKE VIEW sample_lakehouse.silver.names (
CONSTRAINT substring_check
CHECK (SUBSTRING(name, 1, 2) = 'Al')
ON MISMATCH drop
) AS
SELECT id, name
FROM (VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Ann')) AS t(id, name)
Opmerking
Systeemfuncties zijn de eenvoudigste en meest betrouwbare optie. Ze vereisen geen registratie, werken in alle contexten en worden volledig ondersteund tijdens de verversing van de LINEAGE.
UDF's : gedefinieerd en geregistreerd in hetzelfde notebook
UDF's die zijn geregistreerd bij spark.udf.register() in hetzelfde notebook, worden ondersteund voor CREATE in alle contexten. Voor LINEAGE-vernieuwing worden alleen PySpark-contexten ondersteund omdat de UDF-definitie wordt uitgevoerd als onderdeel van de geplande uitvoering van het notebook.
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.sql.functions import expr
spark = SparkSession.builder.getOrCreate()
# UDF: Extract domain from email
def extract_email_domain(email):
if email is None or '@' not in email:
return None
return email.split('@')[1]
# Registration
spark.udf.register(
"udf_email_domain",
extract_email_domain,
StringType()
)
@fmlv.materialized_lake_view(
name="udf_testing_silver.mlv_high_value_customers",
comment="High-value customers identified by UDF criteria",
table_properties={"delta.enableChangeDataFeed": "true"}
)
def mlv_high_value_customers():
return spark.sql("""
SELECT
c.customer_id,
c.name,
c.email,
udf_email_domain(c.email) as email_domain,
c.segment,
c.lifetime_value,
total_transactions.total_amount,
total_transactions.txn_count
FROM udf_testing_bronze.customers c
INNER JOIN (
SELECT
customer_id,
SUM(amount) as total_amount,
COUNT(*) as txn_count
FROM udf_testing_bronze.transactions
WHERE udf_is_positive(amount)
GROUP BY customer_id
HAVING SUM(amount) > 1000
) total_transactions ON c.customer_id = total_transactions.customer_id
WHERE udf_validate_customer(c.email, c.age)
AND c.segment IN ('premium', 'vip')
""")
print("✓ Created mlv_high_value_customers")
Bibliotheken van derden: Pandas UDF's
Bibliotheken van derden, zoals Pandas UDF's, maken het mogelijk om regels voor gegevenskwaliteit te implementeren met gevectoriseerde verwerking. Ze maken geavanceerde validaties mogelijk, zoals aangepaste bedrijfslogica, statistische controles of patroondetectie die niet mogelijk zijn met ingebouwde functies. Dit helpt bij het bouwen van schaalbare en herbruikbare beperkingen voor gegevenskwaliteit tijdens het maken en vernieuwen van MLV.
import fmlv
from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, StringType, DoubleType, BooleanType
from datetime import datetime
import pandas as pd
from pyspark.sql.types import BooleanType
def pandas_check_impl(val):
# Reject if value < median of [100, 200, 300]
return val >= pd.Series([100, 200, 300]).median()
spark.udf.register("pandas_check", pandas_check_impl, BooleanType())
@fmlv.materialized_lake_view(
name="silver.pyspark_from_two_sqlmlv_inner_pandas",
comment="PySpark MLV INNER JOIN using pandas-based constraint and DROP violations"
)
@fmlv.check(
name="dq_pandas_check",
condition="pandas_check(l3)",
action="DROP"
)
def pyspark_from_two_sqlmlv_inner_pandas():
# Define the function
# Register the function as a Spark UDF
# Read source tables
df1 = spark.table("silver.base_sqlmlv")
df2 = spark.table("silver.base_sqlmlv")
# Rename columns for unique join
df_left = df1.select([df1[col].alias(f"l{i+1}") for i, col in enumerate(df1.columns)])
df_right = df2.select([df2[col].alias(f"r{i+1}") for i, col in enumerate(df2.columns)])
# Perform INNER JOIN
df = df_left.join(df_right, df_left.l1 == df_right.r1, "inner")
return df
df = spark.table("silver.pyspark_from_two_sqlmlv_inner_pandas")
# All amounts should be >= median (200)
assert all(df.select("l3").rdd.map(lambda r: r[0] >= 200).collect()), "Unexpected low-value rows found"
print("PySpark MLV INNER JOIN with pandas-based DQ DROP passed")
Aangepaste bibliotheken - Python Wheel (.whl)
Functies die zijn verpakt als JAR-bestanden of wheel-bestanden kunnen worden geïnstalleerd op het Fabric-cluster (via omgevingsinstellingen) en worden gebruikt in MLV-definities. CREATE en lINEAGE REFRESH worden ondersteund voor PySpark-contexten. Zie Aangepaste bibliotheken beheren in Fabric-omgevingen voor meer informatie.
%%pyspark
import fmlv
from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, StringType, DoubleType, BooleanType
from datetime import datetime
from pyspark.sql.types import BooleanType
from custom_dq_lib import threshold_check
def custom_check_impl(val):
return threshold_check(val, threshold=200)
spark.udf.register("custom_check", custom_check_impl, BooleanType())
@fmlv.materialized_lake_view(
name="silver.pyspark_from_two_sqlmlv_inner_custom_whl",
comment="PySpark MLV INNER JOIN using custom DQ library and DROP violations",
replace=True
)
@fmlv.check(
name="dq_custom_check",
condition="custom_check(l3)",
action="DROP"
)
def pyspark_from_two_sqlmlv_inner_custom():
# Wrap the custom function as Spark UDF
# Read source tables
df1 = spark.table("silver.base_sqlmlv")
df2 = spark.table("silver.base_sqlmlv")
# Rename columns for unique join
df_left = df1.select([df1[col].alias(f"l{i+1}") for i, col in enumerate(df1.columns)])
df_right = df2.select([df2[col].alias(f"r{i+1}") for i, col in enumerate(df2.columns)])
# Perform INNER JOIN
df = df_left.join(df_right, df_left.l1 == df_right.r1, "inner")
return df
df = spark.table("silver.pyspark_from_two_sqlmlv_inner_custom_whl")
# All amounts should be >= threshold (200)
assert all(df.select("l3").rdd.map(lambda r: r[0] >= 200).collect()), "Unexpected low-value rows found"
print("PySpark MLV INNER JOIN with custom DQ library passed")
Fabric-functies voor gebruikersgegevens
UDF's (Fabric User Data Functions) zijn centraal gedefinieerd en beheerd in de werkruimte Fabric. Ze zijn beschikbaar voor elke notebook of pijplijn zonder dat ze opnieuw moeten worden geregistreerd per sessie, waardoor ze ideaal zijn voor productie-MLV-pijplijnen. Deze functie wordt alleen ondersteund in PySpark-contexten voor zowel CREATE- als LINEAGE-vernieuwing. Meer informatie over gebruikersgegevensfuncties vindt u hier. Zie overzicht van Fabric User Data Functions voor meer informatie.
%%pyspark
import fmlv
from pyspark.sql import functions as F
from notebookutils import udf
myFuncs = udf.getFunctions("UserDataFunction_1")
def add_greeting_column(df):
pdf = df.toPandas()
pdf["greeting"] = pdf["name"].apply(lambda n: myFuncs.hello_fabric(n))
return spark.createDataFrame(pdf)
import fmlv
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
# -------------------------------------------------------------
# Define base PySpark MLV (no SQL)
# -------------------------------------------------------------
@fmlv.materialized_lake_view(
name="silver.base_pysparkmlv",
comment="Base MLV created using PySpark",
replace=True
)
def base_pysparkmlv():
schema = StructType([
StructField("id", IntegerType(), True),
StructField("name", StringType(), True),
StructField("amount", DoubleType(), True),
StructField("country", StringType(), True)
])
data = [
(1, "Alice", 100.0, "US"),
(2, "Bob", 200.0, "UK"),
(3, "Charlie", 300.0, "UK")
]
return spark.createDataFrame(data, schema)
@fmlv.materialized_lake_view(
name="silver.mlv_udfn_null_test",
replace=False
)
@fmlv.check(
name="null_check",
condition="greeting IS NOT NULL",
action="FAIL"
)
def mlv_udfn_null_test():
df = spark.table("silver.base_pysparkmlv")
return add_greeting_column(df)