Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Gunakan ekspektasi untuk menerapkan batasan kualitas yang memvalidasi data saat mengalir melalui alur ETL. Ekspektasi memberikan wawasan yang lebih besar tentang metrik kualitas data dan memungkinkan Anda untuk gagal memperbarui atau menghilangkan rekaman saat mendeteksi rekaman yang tidak valid.
Artikel ini memiliki gambaran umum ekspektasi, termasuk contoh sintaks dan opsi perilaku. Untuk kasus penggunaan tingkat lanjut dan praktik terbaik yang direkomendasikan, lihat Rekomendasi ekspektasi dan pola tingkat lanjut.
Apa itu ekspektasi?
Ekspektasi adalah klausul opsional dalam tampilan materialisasi jalur, tabel streaming, atau pernyataan pembuatan tampilan yang menerapkan pemeriksaan kualitas data pada setiap rekaman yang melewati kueri. Ekspektasi menggunakan pernyataan SQL Boolean standar untuk menentukan batasan. Anda dapat menggabungkan beberapa harapan untuk satu himpunan data dan menetapkan harapan di semua deklarasi himpunan data dalam pipeline.
Bagian berikut memperkenalkan tiga komponen ekspektasi dan memberikan contoh sintaks.
Nama ekspektasi
Setiap harapan harus memiliki nama, yang digunakan sebagai pengidentifikasi untuk melacak dan memantau harapan. Pilih nama yang mengomunikasikan metrik yang sedang divalidasi. Contoh berikut mendefinisikan harapan untuk mengonfirmasi valid_customer_age bahwa usia antara 0 dan 120 tahun:
Penting
Nama harapan harus unik untuk himpunan data tertentu. Anda dapat menggunakan kembali ekspektasi di beberapa himpunan data dalam pipeline. Lihat Ekspektasi portabel dan dapat digunakan kembali.
Phyton
@dp.table
@dp.expect("valid_customer_age", "age BETWEEN 0 AND 120")
def customers():
return spark.readStream.table("datasets.samples.raw_customers")
SQL
CREATE OR REFRESH STREAMING TABLE customers(
CONSTRAINT valid_customer_age EXPECT (age BETWEEN 0 AND 120)
) AS SELECT * FROM STREAM(datasets.samples.raw_customers);
Batasan untuk mengevaluasi
Klausa batasan adalah pernyataan kondisional SQL yang harus dievaluasi ke true atau false untuk setiap rekaman. Batasan berisi logika aktual untuk apa yang sedang divalidasi. Ketika rekaman gagal dalam kondisi ini, harapan dipicu.
Batasan harus menggunakan sintaks SQL yang valid dan tidak boleh berisi yang berikut ini:
- Fungsi Python kustom
- Panggilan layanan eksternal
- Subkueri yang mereferensikan tabel lain
Berikut ini adalah contoh batasan yang dapat ditambahkan ke pernyataan pembuatan himpunan data:
Phyton
Sintaks untuk batasan dalam Python adalah:
@dp.expect(<constraint-name>, <constraint-clause>)
Beberapa batasan dapat ditentukan:
@dp.expect(<constraint-name>, <constraint-clause>)
@dp.expect(<constraint2-name>, <constraint2-clause>)
Examples:
# Simple constraint
@dp.expect("non_negative_price", "price >= 0")
# SQL functions
@dp.expect("valid_date", "year(transaction_date) >= 2020")
# CASE statements
@dp.expect("valid_order_status", """
CASE
WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
ELSE false
END
""")
# Multiple constraints
@dp.expect("non_negative_price", "price >= 0")
@dp.expect("valid_purchase_date", "date <= current_date()")
# Complex business logic
@dp.expect(
"valid_subscription_dates",
"""start_date <= end_date
AND end_date <= current_date()
AND start_date >= '2020-01-01'"""
)
# Complex boolean logic
@dp.expect("valid_order_state", """
(status = 'ACTIVE' AND balance > 0)
OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
""")
SQL
Sintaks untuk batasan dalam SQL adalah:
CONSTRAINT <constraint-name> EXPECT ( <constraint-clause> )
Beberapa batasan harus dipisahkan oleh koma:
CONSTRAINT <constraint-name> EXPECT ( <constraint-clause> ),
CONSTRAINT <constraint2-name> EXPECT ( <constraint2-clause> )
Examples:
-- Simple constraint
CONSTRAINT non_negative_price EXPECT (price >= 0)
-- SQL functions
CONSTRAINT valid_date EXPECT (year(transaction_date) >= 2020)
-- CASE statements
CONSTRAINT valid_order_status EXPECT (
CASE
WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
ELSE false
END
)
-- Multiple constraints
CONSTRAINT non_negative_price EXPECT (price >= 0),
CONSTRAINT valid_purchase_date EXPECT (date <= current_date())
-- Complex business logic
CONSTRAINT valid_subscription_dates EXPECT (
start_date <= end_date
AND end_date <= current_date()
AND start_date >= '2020-01-01'
)
-- Complex boolean logic
CONSTRAINT valid_order_state EXPECT (
(status = 'ACTIVE' AND balance > 0)
OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
)
Tindakan pada catatan yang tidak valid
Anda harus menentukan tindakan untuk menentukan apa yang terjadi ketika rekaman gagal dalam pemeriksaan validasi. Tabel berikut ini menjelaskan tindakan yang tersedia:
| Tindakan | Sintaks SQL | Sintaks Python | Result |
|---|---|---|---|
| peringatkan (default) | EXPECT |
dp.expect |
Rekaman yang tidak valid ditulis ke target. |
| menjatuhkan | EXPECT ... ON VIOLATION DROP ROW |
dp.expect_or_drop |
Rekaman yang tidak valid dihilangkan sebelum data ditulis ke target. Jumlah rekaman yang dihilangkan dicatat bersama metrik himpunan data lainnya. |
| gagal | EXPECT ... ON VIOLATION FAIL UPDATE |
dp.expect_or_fail |
Rekaman yang tidak valid mencegah pembaruan berhasil. Intervensi manual diperlukan sebelum pemrosesan ulang. Ekspektasi ini menyebabkan kegagalan satu alur dan tidak menyebabkan alur lain di pipeline Anda gagal. |
Anda juga dapat menerapkan logika tingkat lanjut untuk mengkarantina rekaman yang tidak valid tanpa gagal atau menghilangkan data. Lihat Mengkarantina rekaman yang tidak valid.
Metrik pelacakan harapan
Anda dapat melihat metrik pelacakan untuk tindakan warn atau drop dari UI pipeline. Karena fail menyebabkan pembaruan gagal ketika rekaman yang tidak valid terdeteksi, metrik tidak direkam.
Untuk melihat metrik ekspektasi, selesaikan langkah-langkah berikut:
- Di bilah sisi ruang kerja Azure Databricks Anda, klik
Tugas & Pipeline . - Klik Nama alur Anda.
- Klik himpunan data dengan harapan yang ditentukan.
- Pilih tab Kualitas data di bilah sisi kanan.
Anda dapat melihat metrik kualitas data dengan mengkueri log peristiwa Lakeflow Spark Declarative Pipelines. Lihat Metrik kualitas data kueri atau metrik ekspektasi.
Pertahankan rekaman yang tidak valid
Menyimpan rekaman yang tidak valid adalah perilaku default untuk ekspektasi.
expect Gunakan operator saat Anda ingin menyimpan rekaman yang melanggar ekspektasi tetapi mengumpulkan metrik tentang berapa banyak rekaman yang lolos atau gagal batasan. Rekaman yang melanggar harapan ditambahkan ke himpunan data target bersama dengan rekaman yang valid:
Phyton
@dp.expect("valid timestamp", "timestamp > '2012-01-01'")
SQL
CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')
Menghapus rekaman yang tidak valid
expect_or_drop Gunakan operator untuk mencegah pemrosesan rekaman yang tidak valid lebih lanjut. Rekaman yang melanggar harapan dihilangkan dari himpunan data target:
Phyton
@dp.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
SQL
CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW
Gagal pada entri yang tidak valid
Ketika rekaman yang tidak valid tidak dapat diterima, gunakan expect_or_fail operator untuk segera menghentikan eksekusi ketika rekaman gagal validasi. Jika operasi adalah pembaruan tabel, sistem secara atomik mengembalikan transaksi.
Phyton
@dp.expect_or_fail("valid_count", "count > 0")
SQL
CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE
Penting
Jika Anda memiliki beberapa alur paralel yang ditentukan dalam jalur proses, kegagalan satu alur tidak menyebabkan alur-alur lain gagal.
Pemecahan masalah pembaruan yang gagal sesuai ekspektasi
Ketika alur gagal karena pelanggaran ekspektasi, Anda harus memperbaiki kode alur untuk menangani data yang tidak valid dengan benar sebelum menjalankan kembali alur.
Ekspektasi yang dikonfigurasi untuk membuat pipeline gagal memodifikasi rencana kueri Spark pada transformasi Anda untuk melacak informasi yang diperlukan guna mendeteksi dan melaporkan pelanggaran. Anda dapat menggunakan informasi ini untuk mengidentifikasi rekaman input mana yang menghasilkan pelanggaran pada berbagai kueri. Lakeflow Spark Declarative Pipelines menyediakan pesan kesalahan khusus untuk melaporkan pelanggaran tersebut. Berikut adalah contoh pesan kesalahan pelanggaran ekspektasi:
[EXPECTATION_VIOLATION.VERBOSITY_ALL] Flow 'sensor-pipeline' failed to meet the expectation. Violated expectations: 'temperature_in_valid_range'. Input data: '{"id":"TEMP_001","temperature":-500,"timestamp_ms":"1710498600"}'. Output record: '{"sensor_id":"TEMP_001","temperature":-500,"change_time":"2024-03-15 10:30:00"}'. Missing input data: false
Manajemen beberapa ekspektasi
Nota
Meskipun SQL dan Python mendukung beberapa ekspektasi dalam satu himpunan data, hanya Python yang memungkinkan Anda mengelompokkan beberapa harapan dan menentukan tindakan kolektif.
Anda dapat mengelompokkan beberapa ekspektasi bersama-sama dan menentukan tindakan kolektif menggunakan fungsi expect_all, expect_all_or_drop, dan expect_all_or_fail.
Dekorator ini menerima kamus Python sebagai argumen, di mana kuncinya adalah nama ekspektasi dan nilainya adalah kendala ekspektasi. Anda dapat menggunakan kembali serangkaian ekspektasi yang sama dalam beberapa himpunan data di alur Anda. Berikut ini menunjukkan contoh masing-masing expect_all operator Python:
valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}
@dp.table
@dp.expect_all(valid_pages)
def raw_data():
# Create a raw dataset
@dp.table
@dp.expect_all_or_drop(valid_pages)
def prepared_data():
# Create a cleaned and prepared dataset
@dp.table
@dp.expect_all_or_fail(valid_pages)
def customer_facing_data():
# Create cleaned and prepared to share the dataset
Batasan
- Karena hanya tabel streaming, tampilan materialisasi, dan tampilan sementara yang mendukung ekspetasi, metrik kualitas data hanya didukung untuk jenis objek ini.
- Metrik kualitas data tidak tersedia saat:
- Tidak ada ekspektasi yang ditetapkan pada kueri.
- Alur menggunakan operator yang tidak mendukung fungsi yang diharapkan.
- Jenis alur, seperti sink, tidak mendukung ekspektasi.
- Tidak ada pembaruan untuk tabel streaming terkait atau tampilan materialisasi untuk proses alur kerja tertentu.
- Konfigurasi alur tidak menyertakan pengaturan yang diperlukan untuk menangkap metrik, seperti
pipelines.metrics.flowTimeReporter.enabled.
- Untuk beberapa kasus,
COMPLETEDalur mungkin tidak berisi metrik. Sebagai gantinya, metrik dilaporkan di setiap mikro-batch dalam peristiwaflow_progressdengan statusRUNNING. - Karena tampilan hanya dihitung saat dikueri, metrik kualitas data mungkin tidak tersedia untuk tampilan yang ditentukan. Atau, tampilan yang ditanyakan dalam beberapa himpunan data hilir mungkin terdapat beberapa set metrik kualitas data.