Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Dalam arsitektur medali, Anda harus memberlakukan kualitas data di setiap tahap. Kualitas data yang buruk dapat menyebabkan wawasan yang salah dan inefisiensi operasional.
Artikel ini menjelaskan cara menerapkan pemeriksaan kualitas data dalam tampilan danau terwujud (MLV) di Microsoft Fabric.
Menerapkan kualitas data
Dalam tampilan danau terwujud (MLV) di Microsoft Fabric, Anda mempertahankan kualitas data dengan menetapkan batasan pada definisi tampilan Anda. Tanpa pemeriksaan eksplisit, masalah data kecil dapat meningkatkan waktu pemrosesan atau gagal dalam alur.
Saat terjadi pelanggaran batasan pada suatu baris, Anda dapat menggunakan salah satu tindakan berikut:
FAIL: Menghentikan pembaruan MLV pada pelanggaran kendala pertama. Ini adalah perilaku default, bahkan ketika Anda tidak menentukan
FAIL.DROP: Melanjutkan pemrosesan dan menghapus rekaman yang melanggar batasan. Tampilan silsilah memperlihatkan jumlah rekaman yang dihilangkan.
Nota
Jika Anda menentukan tindakan DROP dan FAIL dalam MLV, tindakan FAIL lebih diutamakan.
Menentukan pemeriksaan kualitas data dalam tampilan lake materialisasi
Saat membuat tampilan danau materialisasi, Anda dapat menentukan batasan — aturan kualitas data yang memvalidasi setiap baris saat pembaruan. Batasan adalah ekspresi Boolean yang harus dipenuhi setiap baris. Baris yang memenuhi syarat akan ditulis ke dalam tabel keluaran. Baris yang gagal ditangani sesuai dengan pengaturan saat pelanggaran: baris tersebut dihapus tanpa pemberitahuan atau menyebabkan seluruh refresh gagal.
Contoh berikut menentukan batasan cust_blank, yang memeriksa apakah customerName bidang tidak null. Batasan mengecualikan baris dengan null customerName dari pemrosesan.
CREATE OR REPLACE MATERIALIZED LAKE VIEW IF NOT EXISTS silver.customers_enriched
(CONSTRAINT cust_blank CHECK (customerName is not null) on MISMATCH DROP)
AS
SELECT
c.customerID,
c.customerName,
c.contact,
CASE
WHEN COUNT(o.orderID) OVER (PARTITION BY c.customerID) > 0 THEN TRUE
ELSE FALSE
END AS has_orders
FROM bronze.customers c LEFT JOIN bronze.orders o
ON c.customerID = o.customerID;
Fungsi Bawaan Sistem
Fungsi Spark/SQL bawaan seperti UPPER(), LOWER(), TRIM(), COALESCE(), INITCAP(), dan DATE_FORMAT() didukung sepenuhnya dalam semua konteks MLV untuk refresh CREATE dan LINEAGE.
CREATE MATERIALIZED LAKE VIEW sample_lakehouse.silver.names (
CONSTRAINT substring_check
CHECK (SUBSTRING(name, 1, 2) = 'Al') ON MISMATCH drop
) AS
SELECT id, name
FROM (VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Ann')) AS t(id, name)
Nota
Fungsi sistem adalah opsi yang paling sederhana dan paling dapat diandalkan. Mereka tidak memerlukan pendaftaran, dan berfungsi dalam semua konteks serta didukung sepenuhnya selama proses refresh LINEAGE.
UDF – Didefinisikan dan Terdaftar di Notebook yang Sama
UDF yang terdaftar di spark.udf.register() di notebook yang sama didukung untuk CREATE di semua konteks. Untuk refresh LINEAGE, hanya konteks PySpark yang didukung karena definisi UDF berjalan sebagai bagian dari eksekusi notebook terjadwal.
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.sql.functions import expr
spark = SparkSession.builder.getOrCreate()
# UDF: Extract domain from email
def extract_email_domain(email):
if email is None or '@' not in email:
return None
return email.split('@')[1]
# Registration
spark.udf.register(
"udf_email_domain",
extract_email_domain,
StringType()
)
@fmlv.materialized_lake_view(
name="udf_testing_silver.mlv_high_value_customers",
comment="High-value customers identified by UDF criteria",
table_properties={"delta.enableChangeDataFeed": "true"}
)
def mlv_high_value_customers():
return spark.sql("""
SELECT
c.customer_id,
c.name,
c.email,
udf_email_domain(c.email) as email_domain,
c.segment,
c.lifetime_value,
total_transactions.total_amount,
total_transactions.txn_count
FROM udf_testing_bronze.customers c
INNER JOIN (
SELECT
customer_id,
SUM(amount) as total_amount,
COUNT(*) as txn_count
FROM udf_testing_bronze.transactions
WHERE udf_is_positive(amount)
GROUP BY customer_id
HAVING SUM(amount) > 1000
) total_transactions ON c.customer_id = total_transactions.customer_id
WHERE udf_validate_customer(c.email, c.age)
AND c.segment IN ('premium', 'vip')
""")
print("✓ Created mlv_high_value_customers")
Perpustakaan pihak ketiga - Pandas UDFs
Pustaka pihak ketiga seperti Pandas UDF memungkinkan aturan kualitas data diterapkan dengan pemrosesan vektor. Mereka mengaktifkan validasi lanjutan seperti logika bisnis kustom, pemeriksaan statistik, atau deteksi pola yang tidak dimungkinkan dengan fungsi bawaan. Ini membantu membangun batasan kualitas data yang dapat diskalakan dan dapat digunakan kembali selama pembuatan dan refresh MLV.
import fmlv
from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, StringType, DoubleType, BooleanType
from datetime import datetime
import pandas as pd
from pyspark.sql.types import BooleanType
def pandas_check_impl(val):
# Reject if value < median of [100, 200, 300]
return val >= pd.Series([100, 200, 300]).median()
spark.udf.register("pandas_check", pandas_check_impl, BooleanType())
@fmlv.materialized_lake_view(
name="silver.pyspark_from_two_sqlmlv_inner_pandas",
comment="PySpark MLV INNER JOIN using pandas-based constraint and DROP violations"
)
@fmlv.check(
name="dq_pandas_check",
condition="pandas_check(l3)",
action="DROP"
)
def pyspark_from_two_sqlmlv_inner_pandas():
# Define the function
# Register the function as a Spark UDF
# Read source tables
df1 = spark.table("silver.base_sqlmlv")
df2 = spark.table("silver.base_sqlmlv")
# Rename columns for unique join
df_left = df1.select([df1[col].alias(f"l{i+1}") for i, col in enumerate(df1.columns)])
df_right = df2.select([df2[col].alias(f"r{i+1}") for i, col in enumerate(df2.columns)])
# Perform INNER JOIN
df = df_left.join(df_right, df_left.l1 == df_right.r1, "inner")
return df
df = spark.table("silver.pyspark_from_two_sqlmlv_inner_pandas")
# All amounts should be >= median (200)
assert all(df.select("l3").rdd.map(lambda r: r[0] >= 200).collect()), "Unexpected low-value rows found"
print("PySpark MLV INNER JOIN with pandas-based DQ DROP passed")
Pustaka Kustom – Roda Python (.whl)
Fungsi yang dikemas sebagai file jar atau file roda dapat diinstal pada kluster Fabric (melalui pengaturan Lingkungan) dan digunakan dalam definisi MLV. CREATE dan LINEAGE REFRESH didukung untuk konteks PySpark. Lihat Mengelola pustaka kustom di lingkungan Fabric untuk detail selengkapnya.
%%pyspark
import fmlv
from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, StringType, DoubleType, BooleanType
from datetime import datetime
from pyspark.sql.types import BooleanType
from custom_dq_lib import threshold_check
def custom_check_impl(val):
return threshold_check(val, threshold=200)
spark.udf.register("custom_check", custom_check_impl, BooleanType())
@fmlv.materialized_lake_view(
name="silver.pyspark_from_two_sqlmlv_inner_custom_whl",
comment="PySpark MLV INNER JOIN using custom DQ library and DROP violations",
replace=True
)
@fmlv.check(
name="dq_custom_check",
condition="custom_check(l3)",
action="DROP"
)
def pyspark_from_two_sqlmlv_inner_custom():
# Wrap the custom function as Spark UDF
# Read source tables
df1 = spark.table("silver.base_sqlmlv")
df2 = spark.table("silver.base_sqlmlv")
# Rename columns for unique join
df_left = df1.select([df1[col].alias(f"l{i+1}") for i, col in enumerate(df1.columns)])
df_right = df2.select([df2[col].alias(f"r{i+1}") for i, col in enumerate(df2.columns)])
# Perform INNER JOIN
df = df_left.join(df_right, df_left.l1 == df_right.r1, "inner")
return df
df = spark.table("silver.pyspark_from_two_sqlmlv_inner_custom_whl")
# All amounts should be >= threshold (200)
assert all(df.select("l3").rdd.map(lambda r: r[0] >= 200).collect()), "Unexpected low-value rows found"
print("PySpark MLV INNER JOIN with custom DQ library passed")
Fungsi Data Pengguna Fabric
Fabric User Data Functions (UDF) ditentukan secara terpusat dan dikelola di ruang kerja Fabric. Mereka tersedia untuk notebook atau alur apa pun tanpa perlu didaftarkan ulang per sesi, membuatnya ideal untuk alur MLV produksi. Fitur ini hanya didukung dalam konteks PySpark untuk refresh CREATE dan LINEAGE. Pelajari selengkapnya tentang Fungsi Data Pengguna di sini. Untuk informasi selengkapnya, lihat Ringkasan Fungsi Data Pengguna Fabric.
%%pyspark
import fmlv
from pyspark.sql import functions as F
from notebookutils import udf
myFuncs = udf.getFunctions("UserDataFunction_1")
def add_greeting_column(df):
pdf = df.toPandas()
pdf["greeting"] = pdf["name"].apply(lambda n: myFuncs.hello_fabric(n))
return spark.createDataFrame(pdf)
import fmlv
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
# -------------------------------------------------------------
# Define base PySpark MLV (no SQL)
# -------------------------------------------------------------
@fmlv.materialized_lake_view(
name="silver.base_pysparkmlv",
comment="Base MLV created using PySpark",
replace=True
)
def base_pysparkmlv():
schema = StructType([
StructField("id", IntegerType(), True),
StructField("name", StringType(), True),
StructField("amount", DoubleType(), True),
StructField("country", StringType(), True)
])
data = [
(1, "Alice", 100.0, "US"),
(2, "Bob", 200.0, "UK"),
(3, "Charlie", 300.0, "UK")
]
return spark.createDataFrame(data, schema)
@fmlv.materialized_lake_view(
name="silver.mlv_udfn_null_test",
replace=False
)
@fmlv.check(
name="null_check",
condition="greeting IS NOT NULL",
action="FAIL"
)
def mlv_udfn_null_test():
df = spark.table("silver.base_pysparkmlv")
return add_greeting_column(df)