Kualitas data dalam tampilan lake materialisasi

Dalam arsitektur medali, Anda harus memberlakukan kualitas data di setiap tahap. Kualitas data yang buruk dapat menyebabkan wawasan yang salah dan inefisiensi operasional.

Artikel ini menjelaskan cara menerapkan pemeriksaan kualitas data dalam tampilan danau terwujud (MLV) di Microsoft Fabric.

Menerapkan kualitas data

Dalam tampilan danau terwujud (MLV) di Microsoft Fabric, Anda mempertahankan kualitas data dengan menetapkan batasan pada definisi tampilan Anda. Tanpa pemeriksaan eksplisit, masalah data kecil dapat meningkatkan waktu pemrosesan atau gagal dalam alur.

Saat terjadi pelanggaran batasan pada suatu baris, Anda dapat menggunakan salah satu tindakan berikut:

  • FAIL: Menghentikan pembaruan MLV pada pelanggaran kendala pertama. Ini adalah perilaku default, bahkan ketika Anda tidak menentukan FAIL.

  • DROP: Melanjutkan pemrosesan dan menghapus rekaman yang melanggar batasan. Tampilan silsilah memperlihatkan jumlah rekaman yang dihilangkan.

Nota

Jika Anda menentukan tindakan DROP dan FAIL dalam MLV, tindakan FAIL lebih diutamakan.

Menentukan pemeriksaan kualitas data dalam tampilan lake materialisasi

Saat membuat tampilan danau materialisasi, Anda dapat menentukan batasan — aturan kualitas data yang memvalidasi setiap baris saat pembaruan. Batasan adalah ekspresi Boolean yang harus dipenuhi setiap baris. Baris yang memenuhi syarat akan ditulis ke dalam tabel keluaran. Baris yang gagal ditangani sesuai dengan pengaturan saat pelanggaran: baris tersebut dihapus tanpa pemberitahuan atau menyebabkan seluruh refresh gagal.

Contoh berikut menentukan batasan cust_blank, yang memeriksa apakah customerName bidang tidak null. Batasan mengecualikan baris dengan null customerName dari pemrosesan.

CREATE OR REPLACE MATERIALIZED LAKE VIEW IF NOT EXISTS silver.customers_enriched  
(CONSTRAINT cust_blank CHECK (customerName is not null) on MISMATCH DROP)
AS
SELECT
    c.customerID,
    c.customerName,
    c.contact, 
    CASE  
       WHEN COUNT(o.orderID) OVER (PARTITION BY c.customerID) > 0 THEN TRUE  
       ELSE FALSE  
    END AS has_orders 
FROM bronze.customers c LEFT JOIN bronze.orders o 
ON c.customerID = o.customerID;

Fungsi Bawaan Sistem

Fungsi Spark/SQL bawaan seperti UPPER(), LOWER(), TRIM(), COALESCE(), INITCAP(), dan DATE_FORMAT() didukung sepenuhnya dalam semua konteks MLV untuk refresh CREATE dan LINEAGE.

CREATE MATERIALIZED LAKE VIEW sample_lakehouse.silver.names (
CONSTRAINT substring_check
CHECK (SUBSTRING(name, 1, 2) = 'Al') ON MISMATCH drop
) AS
SELECT id, name
FROM (VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Ann')) AS t(id, name)

Nota

Fungsi sistem adalah opsi yang paling sederhana dan paling dapat diandalkan. Mereka tidak memerlukan pendaftaran, dan berfungsi dalam semua konteks serta didukung sepenuhnya selama proses refresh LINEAGE.

UDF – Didefinisikan dan Terdaftar di Notebook yang Sama

UDF yang terdaftar di spark.udf.register() di notebook yang sama didukung untuk CREATE di semua konteks. Untuk refresh LINEAGE, hanya konteks PySpark yang didukung karena definisi UDF berjalan sebagai bagian dari eksekusi notebook terjadwal.

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.sql.functions import expr

spark = SparkSession.builder.getOrCreate()

# UDF: Extract domain from email
def extract_email_domain(email):
    if email is None or '@' not in email:
        return None
    return email.split('@')[1]

# Registration
spark.udf.register(
    "udf_email_domain",
    extract_email_domain,
    StringType()
)

@fmlv.materialized_lake_view(
    name="udf_testing_silver.mlv_high_value_customers",
    comment="High-value customers identified by UDF criteria",
    table_properties={"delta.enableChangeDataFeed": "true"}
)
def mlv_high_value_customers():
    return spark.sql("""
        SELECT 
            c.customer_id,
            c.name,
            c.email,
            udf_email_domain(c.email) as email_domain,
            c.segment,
            c.lifetime_value,
            total_transactions.total_amount,
            total_transactions.txn_count
        FROM udf_testing_bronze.customers c
        INNER JOIN (
            SELECT 
                customer_id,
                SUM(amount) as total_amount,
                COUNT(*) as txn_count
            FROM udf_testing_bronze.transactions
            WHERE udf_is_positive(amount)
            GROUP BY customer_id
            HAVING SUM(amount) > 1000
        ) total_transactions ON c.customer_id = total_transactions.customer_id
        WHERE udf_validate_customer(c.email, c.age)
            AND c.segment IN ('premium', 'vip')
    """)

print("✓ Created mlv_high_value_customers")

Perpustakaan pihak ketiga - Pandas UDFs

Pustaka pihak ketiga seperti Pandas UDF memungkinkan aturan kualitas data diterapkan dengan pemrosesan vektor. Mereka mengaktifkan validasi lanjutan seperti logika bisnis kustom, pemeriksaan statistik, atau deteksi pola yang tidak dimungkinkan dengan fungsi bawaan. Ini membantu membangun batasan kualitas data yang dapat diskalakan dan dapat digunakan kembali selama pembuatan dan refresh MLV.

  import fmlv
  from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, StringType, DoubleType, BooleanType
  from datetime import datetime

  import pandas as pd
  from pyspark.sql.types import BooleanType
  def pandas_check_impl(val):
    # Reject if value < median of [100, 200, 300]
    return val >= pd.Series([100, 200, 300]).median()
  spark.udf.register("pandas_check", pandas_check_impl, BooleanType())

  @fmlv.materialized_lake_view(
    name="silver.pyspark_from_two_sqlmlv_inner_pandas",
    comment="PySpark MLV INNER JOIN using pandas-based constraint and DROP violations"
  )
  @fmlv.check(
    name="dq_pandas_check",
    condition="pandas_check(l3)",
    action="DROP"
  )
  def pyspark_from_two_sqlmlv_inner_pandas():
    # Define the function

    # Register the function as a Spark UDF

    # Read source tables
    df1 = spark.table("silver.base_sqlmlv")
    df2 = spark.table("silver.base_sqlmlv")

    # Rename columns for unique join
    df_left = df1.select([df1[col].alias(f"l{i+1}") for i, col in enumerate(df1.columns)])
    df_right = df2.select([df2[col].alias(f"r{i+1}") for i, col in enumerate(df2.columns)])

    # Perform INNER JOIN
    df = df_left.join(df_right, df_left.l1 == df_right.r1, "inner")
    return df


  df = spark.table("silver.pyspark_from_two_sqlmlv_inner_pandas")
  # All amounts should be >= median (200)
  assert all(df.select("l3").rdd.map(lambda r: r[0] >= 200).collect()), "Unexpected low-value rows found"
  print("PySpark MLV INNER JOIN with pandas-based DQ DROP passed")

Pustaka Kustom – Roda Python (.whl)

Fungsi yang dikemas sebagai file jar atau file roda dapat diinstal pada kluster Fabric (melalui pengaturan Lingkungan) dan digunakan dalam definisi MLV. CREATE dan LINEAGE REFRESH didukung untuk konteks PySpark. Lihat Mengelola pustaka kustom di lingkungan Fabric untuk detail selengkapnya.

  %%pyspark
  import fmlv
  from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, StringType, DoubleType, BooleanType
  from datetime import datetime

  from pyspark.sql.types import BooleanType
  from custom_dq_lib import threshold_check
  def custom_check_impl(val):
    return threshold_check(val, threshold=200)
  spark.udf.register("custom_check", custom_check_impl, BooleanType())

  @fmlv.materialized_lake_view(
    name="silver.pyspark_from_two_sqlmlv_inner_custom_whl",
    comment="PySpark MLV INNER JOIN using custom DQ library and DROP violations",
    replace=True
  )
  @fmlv.check(
    name="dq_custom_check",
    condition="custom_check(l3)",
    action="DROP"
  )
  def pyspark_from_two_sqlmlv_inner_custom():
    # Wrap the custom function as Spark UDF


    # Read source tables
    df1 = spark.table("silver.base_sqlmlv")
    df2 = spark.table("silver.base_sqlmlv")

    # Rename columns for unique join
    df_left = df1.select([df1[col].alias(f"l{i+1}") for i, col in enumerate(df1.columns)])
    df_right = df2.select([df2[col].alias(f"r{i+1}") for i, col in enumerate(df2.columns)])

    # Perform INNER JOIN
    df = df_left.join(df_right, df_left.l1 == df_right.r1, "inner")
    return df


    df = spark.table("silver.pyspark_from_two_sqlmlv_inner_custom_whl")
    # All amounts should be >= threshold (200)
    assert all(df.select("l3").rdd.map(lambda r: r[0] >= 200).collect()), "Unexpected low-value rows found"
    print("PySpark MLV INNER JOIN with custom DQ library passed")

Fungsi Data Pengguna Fabric

Fabric User Data Functions (UDF) ditentukan secara terpusat dan dikelola di ruang kerja Fabric. Mereka tersedia untuk notebook atau alur apa pun tanpa perlu didaftarkan ulang per sesi, membuatnya ideal untuk alur MLV produksi. Fitur ini hanya didukung dalam konteks PySpark untuk refresh CREATE dan LINEAGE. Pelajari selengkapnya tentang Fungsi Data Pengguna di sini. Untuk informasi selengkapnya, lihat Ringkasan Fungsi Data Pengguna Fabric.

  %%pyspark
  import fmlv
  from pyspark.sql import functions as F
  from notebookutils import udf

  myFuncs = udf.getFunctions("UserDataFunction_1")

  def add_greeting_column(df):
    pdf = df.toPandas()
    pdf["greeting"] = pdf["name"].apply(lambda n: myFuncs.hello_fabric(n))
    return spark.createDataFrame(pdf)
  import fmlv
  from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

  # -------------------------------------------------------------
  # Define base PySpark MLV (no SQL)
  # -------------------------------------------------------------
  @fmlv.materialized_lake_view(
    name="silver.base_pysparkmlv",
    comment="Base MLV created using PySpark",
    replace=True
  )
  def base_pysparkmlv():

    schema = StructType([
        StructField("id", IntegerType(), True),
        StructField("name", StringType(), True),
        StructField("amount", DoubleType(), True),
        StructField("country", StringType(), True)
    ])

    data = [
        (1, "Alice", 100.0, "US"),
        (2, "Bob", 200.0, "UK"),
        (3, "Charlie", 300.0, "UK")
    ]

    return spark.createDataFrame(data, schema)

  @fmlv.materialized_lake_view(
    name="silver.mlv_udfn_null_test",
    replace=False
  )
  @fmlv.check(
    name="null_check",
    condition="greeting IS NOT NULL",
    action="FAIL"
  )
  def mlv_udfn_null_test():
    df = spark.table("silver.base_pysparkmlv")
    return add_greeting_column(df)