Mengelola kualitas data dengan Tabel Langsung Delta
Anda menggunakan harapan untuk menentukan batasan kualitas pada konten himpunan data. Ekspektasi memungkinkan Anda menjamin data yang tiba dalam tabel memenuhi persyaratan kualitas data dan memberikan wawasan tentang kualitas data untuk setiap pembaruan alur. Anda menerapkan harapan untuk kueri menggunakan dekorator Python atau klausul batasan SQL.
Apa itu ekspektasi Tabel Langsung Delta?
Ekspektasi adalah klausa opsional yang Anda tambahkan ke deklarasi himpunan data Delta Live Tables yang menerapkan pemeriksaan kualitas data pada setiap rekaman yang melewati kueri.
Harapan terdiri dari tiga hal:
- Deskripsi, yang bertindak sebagai pengidentifikasi unik dan memungkinkan Anda melacak metrik untuk batasan.
- Pernyataan boolean yang selalu mengembalikan true atau false berdasarkan beberapa kondisi yang dinyatakan.
- Tindakan yang harus diambil ketika rekaman gagal harapan, yang berarti boolean mengembalikan false.
Matriks berikut ini memperlihatkan tiga tindakan yang bisa Anda terapkan ke rekaman yang tidak valid:
Perbuatan | Hasil |
---|---|
peringatkan (default) | Rekaman yang tidak valid ditulis ke target; kegagalan dilaporkan sebagai metrik untuk himpunan data. |
Drop | Rekaman yang tidak valid dihilangkan sebelum data ditulis ke target; kegagalan dilaporkan sebagai metrik untuk himpunan data. |
Gagal | Rekaman yang tidak valid mencegah pembaruan berhasil. Intervensi manual diperlukan sebelum pemrosesan ulang. |
Anda dapat melihat metrik kualitas data seperti jumlah rekaman yang melanggar harapan dengan meminta log peristiwa Delta Live Tables. Lihat Memantau alur Tabel Langsung Delta.
Untuk referensi lengkap sintaks deklarasi himpunan data Delta Live Tables, lihat Referensi bahasa Delta Live Tables Python atau referensi bahasa Delta Live Tables SQL.
Catatan
Meskipun Anda dapat menyertakan beberapa klausa dalam harapan apa pun, hanya Python yang mendukung penentuan tindakan berdasarkan beberapa ekspektasi. Lihat Beberapa ekspektasi.
Menyimpan rekaman yang tidak valid
Gunakan expect
operator saat Anda ingin menyimpan rekaman yang melanggar harapan. Catatan yang melanggar harapan ditambahkan ke himpunan data target bersama dengan catatan yang valid:
Python
@dlt.expect("valid timestamp", "col(“timestamp”) > '2012-01-01'")
SQL
CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')
Hilangkan rekaman tidak valid
expect or drop
Gunakan operator untuk mencegah pemrosesan rekaman yang tidak valid lebih lanjut. Rekaman yang melanggar harapan dikeluarkan dari himpunan data target:
Python
@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
SQL
CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW
Gagal dalam rekaman tidak valid
Ketika rekaman yang tidak valid tidak dapat diterima, gunakan expect or fail
operator untuk segera menghentikan eksekusi ketika rekaman gagal validasi. Jika operasi adalah pembaruan tabel, sistem secara atomik memutar kembali transaksi:
Python
@dlt.expect_or_fail("valid_count", "count > 0")
SQL
CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE
Ketika alur gagal karena pelanggaran harapan, Anda harus memperbaiki kode alur untuk menangani data yang tidak valid dengan benar sebelum menjalankan kembali alur.
Gagalkan harapan mengubah rencana kueri Spark dari transformasi Anda untuk melacak informasi yang diperlukan untuk mendeteksi dan melaporkan pelanggaran. Untuk banyak kueri, Anda dapat menggunakan informasi ini untuk mengidentifikasi rekaman input mana yang mengakibatkan pelanggaran. Berikut ini adalah contoh pengecualian:
Expectation Violated:
{
"flowName": "a-b",
"verboseInfo": {
"expectationsViolated": [
"x1 is negative"
],
"inputData": {
"a": {"x1": 1,"y1": "a },
"b": {
"x2": 1,
"y2": "aa"
}
},
"outputRecord": {
"x1": 1,
"y1": "a",
"x2": 1,
"y2": "aa"
},
"missingInputData": false
}
}
Beberapa harapan
Anda dapat menentukan ekspektasi dengan satu atau beberapa batasan kualitas data dalam alur Python. Dekorator ini menerima kamus Python sebagai argumen, yang mana kuncinya adalah nama harapan dan nilainya adalah kendala harapan.
Gunakan expect_all
untuk menentukan beberapa batasan kualitas data saat rekaman yang gagal validasi harus disertakan dalam himpunan data target:
@dlt.expect_all({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})
Gunakan expect_all_or_drop
untuk menentukan beberapa batasan kualitas data saat rekaman yang gagal validasi harus dihilangkan dari himpunan data target:
@dlt.expect_all_or_drop({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})
Gunakan expect_all_or_fail
untuk menentukan beberapa batasan kualitas data saat rekaman yang gagal validasi harus menghentikan eksekusi alur:
@dlt.expect_all_or_fail({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})
Anda juga dapat menentukan kumpulan harapan sebagai variabel dan meneruskannya ke satu atau beberapa kueri dalam alur Anda:
valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}
@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
# Create raw dataset
@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
# Create cleaned and prepared dataset
Mengkarantina data yang tidak valid
Contoh berikut menggunakan ekspektasi dalam kombinasi dengan tabel dan tampilan sementara. Pola ini memberi Anda metrik untuk rekaman yang melewati pemeriksaan ekspektasi selama pembaruan alur, dan menyediakan cara untuk memproses rekaman yang valid dan tidak valid melalui jalur hilir yang berbeda.
Catatan
Contoh ini membaca data sampel yang disertakan dalam himpunan data Databricks. Karena himpunan data Databricks tidak didukung dengan alur yang diterbitkan ke Unity Catalog, contoh ini hanya berfungsi dengan alur yang dikonfigurasi untuk diterbitkan ke metastore Apache Hive. Namun, pola ini juga berfungsi dengan alur yang diaktifkan Unity Catalog, tetapi Anda harus membaca data dari lokasi eksternal. Untuk mempelajari selengkapnya tentang menggunakan Katalog Unity dengan Delta Live Tables, lihat Menggunakan Katalog Unity dengan alur Delta Live Tables Anda.
import dlt
from pyspark.sql.functions import expr
rules = {}
rules["valid_website"] = "(Website IS NOT NULL)"
rules["valid_location"] = "(Location IS NOT NULL)"
quarantine_rules = "NOT({0})".format(" AND ".join(rules.values()))
@dlt.table(
name="raw_farmers_market"
)
def get_farmers_market_data():
return (
spark.read.format('csv').option("header", "true")
.load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
)
@dlt.table(
name="farmers_market_quarantine",
temporary=True,
partition_cols=["is_quarantined"]
)
@dlt.expect_all(rules)
def farmers_market_quarantine():
return (
dlt.read("raw_farmers_market")
.select("MarketName", "Website", "Location", "State",
"Facebook", "Twitter", "Youtube", "Organic", "updateTime")
.withColumn("is_quarantined", expr(quarantine_rules))
)
@dlt.view(
name="valid_farmers_market"
)
def get_valid_farmers_market():
return (
dlt.read("farmers_market_quarantine")
.filter("is_quarantined=false")
)
@dlt.view(
name="invalid_farmers_market"
)
def get_invalid_farmers_market():
return (
dlt.read("farmers_market_quarantine")
.filter("is_quarantined=true")
)
Validasi jumlah baris di seluruh tabel
Anda dapat menambahkan tabel tambahan ke alur Anda yang menentukan harapan untuk membandingkan jumlah baris di antara dua tabel langsung. Hasil dari ekspektasi ini muncul di log peristiwa dan UI Delta Live Tables. Contoh berikut memvalidasi jumlah baris yang sama antara tbla
tabel dan tblb
:
CREATE OR REFRESH LIVE TABLE count_verification(
CONSTRAINT no_rows_dropped EXPECT (a_count == b_count)
) AS SELECT * FROM
(SELECT COUNT(*) AS a_count FROM LIVE.tbla),
(SELECT COUNT(*) AS b_count FROM LIVE.tblb)
Lakukan validasi tingkat lanjut dengan harapan Tabel Langsung Delta
Anda dapat menentukan tabel langsung menggunakan kueri agregat dan gabungan dan menggunakan hasil kueri tersebut sebagai bagian dari pemeriksaan harapan Anda. Ini berguna jika Anda ingin melakukan pemeriksaan kualitas data yang kompleks, misalnya, memastikan tabel turunan berisi semua rekaman dari tabel sumber atau menjamin kesetaraan kolom numerik di seluruh tabel. Anda dapat menggunakan TEMPORARY
kata kunci untuk mencegah tabel ini diterbitkan ke skema target.
Contoh berikut memvalidasi bahwa semua rekaman yang diharapkan ada dalam report
tabel:
CREATE TEMPORARY LIVE TABLE report_compare_tests(
CONSTRAINT no_missing_records EXPECT (r.key IS NOT NULL)
)
AS SELECT * FROM LIVE.validation_copy v
LEFT OUTER JOIN LIVE.report r ON v.key = r.key
Contoh berikut menggunakan agregat untuk memastikan keunikan kunci primer:
CREATE TEMPORARY LIVE TABLE report_pk_tests(
CONSTRAINT unique_pk EXPECT (num_entries = 1)
)
AS SELECT pk, count(*) as num_entries
FROM LIVE.report
GROUP BY pk
Menjadikan ekspektasi portabel dan dapat digunakan kembali
Anda dapat mempertahankan aturan kualitas data secara terpisah dari implementasi alur Anda.
Databricks merekomendasikan untuk menyimpan aturan dalam tabel Delta dengan setiap aturan dikategorikan oleh tag. Anda menggunakan tag ini dalam definisi himpunan data untuk menentukan aturan mana yang akan diterapkan.
Contoh berikut membuat tabel bernama rules
untuk mempertahankan aturan:
CREATE OR REPLACE TABLE
rules
AS SELECT
col1 AS name,
col2 AS constraint,
col3 AS tag
FROM (
VALUES
("website_not_null","Website IS NOT NULL","validity"),
("location_not_null","Location IS NOT NULL","validity"),
("state_not_null","State IS NOT NULL","validity"),
("fresh_data","to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'","maintained"),
("social_media_access","NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)","maintained")
)
Contoh Python berikut menentukan ekspektasi kualitas data berdasarkan aturan yang disimpan dalam rules
tabel. Fungsi get_rules()
membaca aturan dari rules
tabel dan mengembalikan kamus Python yang berisi aturan yang cocok dengan argumen yang tag
diteruskan ke fungsi. Kamus diterapkan di dekorator @dlt.expect_all_*()
untuk menerapkan batasan kualitas data. Misalnya, catatan apa pun yang melanggar aturan yang ditandai dengan validity
akan dibuang dari tabel raw_farmers_market
:
Catatan
Contoh ini membaca data sampel yang disertakan dalam himpunan data Databricks. Karena himpunan data Databricks tidak didukung dengan alur yang diterbitkan ke Unity Catalog, contoh ini hanya berfungsi dengan alur yang dikonfigurasi untuk diterbitkan ke metastore Apache Hive. Namun, pola ini juga berfungsi dengan alur yang diaktifkan Unity Catalog, tetapi Anda harus membaca data dari lokasi eksternal. Untuk mempelajari selengkapnya tentang menggunakan Katalog Unity dengan Delta Live Tables, lihat Menggunakan Katalog Unity dengan alur Delta Live Tables Anda.
import dlt
from pyspark.sql.functions import expr, col
def get_rules(tag):
"""
loads data quality rules from a table
:param tag: tag to match
:return: dictionary of rules that matched the tag
"""
rules = {}
df = spark.read.table("rules")
for row in df.filter(col("tag") == tag).collect():
rules[row['name']] = row['constraint']
return rules
@dlt.table(
name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
return (
spark.read.format('csv').option("header", "true")
.load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
)
@dlt.table(
name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
return (
dlt.read("raw_farmers_market")
.filter(expr("Organic = 'Y'"))
.select("MarketName", "Website", "State",
"Facebook", "Twitter", "Youtube", "Organic",
"updateTime"
)
)
Alih-alih membuat tabel bernama rules
untuk mempertahankan aturan, Anda dapat membuat modul Python ke aturan utama, misalnya, dalam file bernama rules_module.py
di folder yang sama dengan buku catatan:
def get_rules_as_list_of_dict():
return [
{
"name": "website_not_null",
"constraint": "Website IS NOT NULL",
"tag": "validity"
},
{
"name": "location_not_null",
"constraint": "Location IS NOT NULL",
"tag": "validity"
},
{
"name": "state_not_null",
"constraint": "State IS NOT NULL",
"tag": "validity"
},
{
"name": "fresh_data",
"constraint": "to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'",
"tag": "maintained"
},
{
"name": "social_media_access",
"constraint": "NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)",
"tag": "maintained"
}
]
Kemudian ubah notebook sebelumnya dengan mengimpor modul dan mengubah get_rules()
fungsi untuk membaca dari modul alih-alih dari rules
tabel:
import dlt
from rules_module import *
from pyspark.sql.functions import expr, col
df = spark.createDataFrame(get_rules_as_list_of_dict())
def get_rules(tag):
"""
loads data quality rules from a table
:param tag: tag to match
:return: dictionary of rules that matched the tag
"""
rules = {}
for row in df.filter(col("tag") == tag).collect():
rules[row['name']] = row['constraint']
return rules
@dlt.table(
name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
return (
spark.read.format('csv').option("header", "true")
.load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
)
@dlt.table(
name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
return (
dlt.read("raw_farmers_market")
.filter(expr("Organic = 'Y'"))
.select("MarketName", "Website", "State",
"Facebook", "Twitter", "Youtube", "Organic",
"updateTime"
)
)