Mengelola kualitas data dengan Tabel Langsung Delta

Anda menggunakan harapan untuk menentukan batasan kualitas pada konten himpunan data. Ekspektasi memungkinkan Anda menjamin data yang tiba dalam tabel memenuhi persyaratan kualitas data dan memberikan wawasan tentang kualitas data untuk setiap pembaruan alur. Anda menerapkan harapan untuk kueri menggunakan dekorator Python atau klausul batasan SQL.

Apa itu ekspektasi Tabel Langsung Delta?

Ekspektasi adalah klausa opsional yang Anda tambahkan ke deklarasi himpunan data Delta Live Tables yang menerapkan pemeriksaan kualitas data pada setiap rekaman yang melewati kueri.

Harapan terdiri dari tiga hal:

  • Deskripsi, yang bertindak sebagai pengidentifikasi unik dan memungkinkan Anda melacak metrik untuk batasan.
  • Pernyataan boolean yang selalu mengembalikan true atau false berdasarkan beberapa kondisi yang dinyatakan.
  • Tindakan yang harus diambil ketika rekaman gagal harapan, yang berarti boolean mengembalikan false.

Matriks berikut ini memperlihatkan tiga tindakan yang bisa Anda terapkan ke rekaman yang tidak valid:

Perbuatan Hasil
peringatkan (default) Rekaman yang tidak valid ditulis ke target; kegagalan dilaporkan sebagai metrik untuk himpunan data.
Drop Rekaman yang tidak valid dihilangkan sebelum data ditulis ke target; kegagalan dilaporkan sebagai metrik untuk himpunan data.
Gagal Rekaman yang tidak valid mencegah pembaruan berhasil. Intervensi manual diperlukan sebelum pemrosesan ulang.

Anda dapat melihat metrik kualitas data seperti jumlah rekaman yang melanggar harapan dengan meminta log peristiwa Delta Live Tables. Lihat Memantau alur Tabel Langsung Delta.

Untuk referensi lengkap sintaks deklarasi himpunan data Delta Live Tables, lihat Referensi bahasa Delta Live Tables Python atau referensi bahasa Delta Live Tables SQL.

Catatan

Meskipun Anda dapat menyertakan beberapa klausa dalam harapan apa pun, hanya Python yang mendukung penentuan tindakan berdasarkan beberapa ekspektasi. Lihat Beberapa ekspektasi.

Menyimpan rekaman yang tidak valid

Gunakan expect operator saat Anda ingin menyimpan rekaman yang melanggar harapan. Catatan yang melanggar harapan ditambahkan ke himpunan data target bersama dengan catatan yang valid:

Python

@dlt.expect("valid timestamp", "col(“timestamp”) > '2012-01-01'")

SQL

CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')

Hilangkan rekaman tidak valid

expect or drop Gunakan operator untuk mencegah pemrosesan rekaman yang tidak valid lebih lanjut. Rekaman yang melanggar harapan dikeluarkan dari himpunan data target:

Python

@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")

SQL

CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW

Gagal dalam rekaman tidak valid

Ketika rekaman yang tidak valid tidak dapat diterima, gunakan expect or fail operator untuk segera menghentikan eksekusi ketika rekaman gagal validasi. Jika operasi adalah pembaruan tabel, sistem secara atomik memutar kembali transaksi:

Python

@dlt.expect_or_fail("valid_count", "count > 0")

SQL

CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE

Ketika alur gagal karena pelanggaran harapan, Anda harus memperbaiki kode alur untuk menangani data yang tidak valid dengan benar sebelum menjalankan kembali alur.

Gagalkan harapan mengubah rencana kueri Spark dari transformasi Anda untuk melacak informasi yang diperlukan untuk mendeteksi dan melaporkan pelanggaran. Untuk banyak kueri, Anda dapat menggunakan informasi ini untuk mengidentifikasi rekaman input mana yang mengakibatkan pelanggaran. Berikut ini adalah contoh pengecualian:

Expectation Violated:
{
  "flowName": "a-b",
  "verboseInfo": {
    "expectationsViolated": [
      "x1 is negative"
    ],
    "inputData": {
      "a": {"x1": 1,"y1": "a },
      "b": {
        "x2": 1,
        "y2": "aa"
      }
    },
    "outputRecord": {
      "x1": 1,
      "y1": "a",
      "x2": 1,
      "y2": "aa"
    },
    "missingInputData": false
  }
}

Beberapa harapan

Anda dapat menentukan ekspektasi dengan satu atau beberapa batasan kualitas data dalam alur Python. Dekorator ini menerima kamus Python sebagai argumen, yang mana kuncinya adalah nama harapan dan nilainya adalah kendala harapan.

Gunakan expect_all untuk menentukan beberapa batasan kualitas data saat rekaman yang gagal validasi harus disertakan dalam himpunan data target:

@dlt.expect_all({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

Gunakan expect_all_or_drop untuk menentukan beberapa batasan kualitas data saat rekaman yang gagal validasi harus dihilangkan dari himpunan data target:

@dlt.expect_all_or_drop({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

Gunakan expect_all_or_fail untuk menentukan beberapa batasan kualitas data saat rekaman yang gagal validasi harus menghentikan eksekusi alur:

@dlt.expect_all_or_fail({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

Anda juga dapat menentukan kumpulan harapan sebagai variabel dan meneruskannya ke satu atau beberapa kueri dalam alur Anda:

valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}

@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
  # Create raw dataset

@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
  # Create cleaned and prepared dataset

Mengkarantina data yang tidak valid

Contoh berikut menggunakan ekspektasi dalam kombinasi dengan tabel dan tampilan sementara. Pola ini memberi Anda metrik untuk rekaman yang melewati pemeriksaan ekspektasi selama pembaruan alur, dan menyediakan cara untuk memproses rekaman yang valid dan tidak valid melalui jalur hilir yang berbeda.

Catatan

Contoh ini membaca data sampel yang disertakan dalam himpunan data Databricks. Karena himpunan data Databricks tidak didukung dengan alur yang diterbitkan ke Unity Catalog, contoh ini hanya berfungsi dengan alur yang dikonfigurasi untuk diterbitkan ke metastore Apache Hive. Namun, pola ini juga berfungsi dengan alur yang diaktifkan Unity Catalog, tetapi Anda harus membaca data dari lokasi eksternal. Untuk mempelajari selengkapnya tentang menggunakan Katalog Unity dengan Delta Live Tables, lihat Menggunakan Katalog Unity dengan alur Delta Live Tables Anda.

import dlt
from pyspark.sql.functions import expr

rules = {}
rules["valid_website"] = "(Website IS NOT NULL)"
rules["valid_location"] = "(Location IS NOT NULL)"
quarantine_rules = "NOT({0})".format(" AND ".join(rules.values()))

@dlt.table(
  name="raw_farmers_market"
)
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="farmers_market_quarantine",
  temporary=True,
  partition_cols=["is_quarantined"]
)
@dlt.expect_all(rules)
def farmers_market_quarantine():
  return (
    dlt.read("raw_farmers_market")
      .select("MarketName", "Website", "Location", "State",
              "Facebook", "Twitter", "Youtube", "Organic", "updateTime")
      .withColumn("is_quarantined", expr(quarantine_rules))
  )

@dlt.view(
  name="valid_farmers_market"
)
def get_valid_farmers_market():
  return (
    dlt.read("farmers_market_quarantine")
      .filter("is_quarantined=false")
  )

@dlt.view(
  name="invalid_farmers_market"
)
def get_invalid_farmers_market():
  return (
    dlt.read("farmers_market_quarantine")
      .filter("is_quarantined=true")
  )

Validasi jumlah baris di seluruh tabel

Anda dapat menambahkan tabel tambahan ke alur Anda yang menentukan harapan untuk membandingkan jumlah baris di antara dua tabel langsung. Hasil dari ekspektasi ini muncul di log peristiwa dan UI Delta Live Tables. Contoh berikut memvalidasi jumlah baris yang sama antara tbla tabel dan tblb :

CREATE OR REFRESH LIVE TABLE count_verification(
  CONSTRAINT no_rows_dropped EXPECT (a_count == b_count)
) AS SELECT * FROM
  (SELECT COUNT(*) AS a_count FROM LIVE.tbla),
  (SELECT COUNT(*) AS b_count FROM LIVE.tblb)

Lakukan validasi tingkat lanjut dengan harapan Tabel Langsung Delta

Anda dapat menentukan tabel langsung menggunakan kueri agregat dan gabungan dan menggunakan hasil kueri tersebut sebagai bagian dari pemeriksaan harapan Anda. Ini berguna jika Anda ingin melakukan pemeriksaan kualitas data yang kompleks, misalnya, memastikan tabel turunan berisi semua rekaman dari tabel sumber atau menjamin kesetaraan kolom numerik di seluruh tabel. Anda dapat menggunakan TEMPORARY kata kunci untuk mencegah tabel ini diterbitkan ke skema target.

Contoh berikut memvalidasi bahwa semua rekaman yang diharapkan ada dalam report tabel:

CREATE TEMPORARY LIVE TABLE report_compare_tests(
  CONSTRAINT no_missing_records EXPECT (r.key IS NOT NULL)
)
AS SELECT * FROM LIVE.validation_copy v
LEFT OUTER JOIN LIVE.report r ON v.key = r.key

Contoh berikut menggunakan agregat untuk memastikan keunikan kunci primer:

CREATE TEMPORARY LIVE TABLE report_pk_tests(
  CONSTRAINT unique_pk EXPECT (num_entries = 1)
)
AS SELECT pk, count(*) as num_entries
FROM LIVE.report
GROUP BY pk

Menjadikan ekspektasi portabel dan dapat digunakan kembali

Anda dapat mempertahankan aturan kualitas data secara terpisah dari implementasi alur Anda.

Databricks merekomendasikan untuk menyimpan aturan dalam tabel Delta dengan setiap aturan dikategorikan oleh tag. Anda menggunakan tag ini dalam definisi himpunan data untuk menentukan aturan mana yang akan diterapkan.

Contoh berikut membuat tabel bernama rules untuk mempertahankan aturan:

CREATE OR REPLACE TABLE
  rules
AS SELECT
  col1 AS name,
  col2 AS constraint,
  col3 AS tag
FROM (
  VALUES
  ("website_not_null","Website IS NOT NULL","validity"),
  ("location_not_null","Location IS NOT NULL","validity"),
  ("state_not_null","State IS NOT NULL","validity"),
  ("fresh_data","to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'","maintained"),
  ("social_media_access","NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)","maintained")
)

Contoh Python berikut menentukan ekspektasi kualitas data berdasarkan aturan yang disimpan dalam rules tabel. Fungsi get_rules() membaca aturan dari rules tabel dan mengembalikan kamus Python yang berisi aturan yang cocok dengan argumen yang tag diteruskan ke fungsi. Kamus diterapkan di dekorator @dlt.expect_all_*() untuk menerapkan batasan kualitas data. Misalnya, catatan apa pun yang melanggar aturan yang ditandai dengan validity akan dibuang dari tabel raw_farmers_market:

Catatan

Contoh ini membaca data sampel yang disertakan dalam himpunan data Databricks. Karena himpunan data Databricks tidak didukung dengan alur yang diterbitkan ke Unity Catalog, contoh ini hanya berfungsi dengan alur yang dikonfigurasi untuk diterbitkan ke metastore Apache Hive. Namun, pola ini juga berfungsi dengan alur yang diaktifkan Unity Catalog, tetapi Anda harus membaca data dari lokasi eksternal. Untuk mempelajari selengkapnya tentang menggunakan Katalog Unity dengan Delta Live Tables, lihat Menggunakan Katalog Unity dengan alur Delta Live Tables Anda.

import dlt
from pyspark.sql.functions import expr, col

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  rules = {}
  df = spark.read.table("rules")
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules

@dlt.table(
  name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
      .select("MarketName", "Website", "State",
        "Facebook", "Twitter", "Youtube", "Organic",
        "updateTime"
      )
  )

Alih-alih membuat tabel bernama rules untuk mempertahankan aturan, Anda dapat membuat modul Python ke aturan utama, misalnya, dalam file bernama rules_module.py di folder yang sama dengan buku catatan:

def get_rules_as_list_of_dict():
  return [
    {
      "name": "website_not_null",
      "constraint": "Website IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "location_not_null",
      "constraint": "Location IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "state_not_null",
      "constraint": "State IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "fresh_data",
      "constraint": "to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'",
      "tag": "maintained"
    },
    {
      "name": "social_media_access",
      "constraint": "NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)",
      "tag": "maintained"
    }
  ]

Kemudian ubah notebook sebelumnya dengan mengimpor modul dan mengubah get_rules() fungsi untuk membaca dari modul alih-alih dari rules tabel:

import dlt
from rules_module import *
from pyspark.sql.functions import expr, col

df = spark.createDataFrame(get_rules_as_list_of_dict())

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  rules = {}
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules

@dlt.table(
  name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
      .select("MarketName", "Website", "State",
        "Facebook", "Twitter", "Youtube", "Organic",
        "updateTime"
      )
  )