Mengelola kualitas data dengan ekspektasi alur

Gunakan ekspektasi untuk menerapkan batasan kualitas yang memvalidasi data saat mengalir melalui alur ETL. Ekspektasi memberikan wawasan yang lebih besar tentang metrik kualitas data dan memungkinkan Anda untuk gagal memperbarui atau menghilangkan rekaman saat mendeteksi rekaman yang tidak valid.

Artikel ini memiliki gambaran umum ekspektasi, termasuk contoh sintaks dan opsi perilaku. Untuk kasus penggunaan tingkat lanjut dan praktik terbaik yang direkomendasikan, lihat Rekomendasi ekspektasi dan pola tingkat lanjut.

Grafik alur ekspektasi alur

Apa itu ekspektasi?

Ekspektasi adalah klausul opsional dalam tampilan materialisasi jalur, tabel streaming, atau pernyataan pembuatan tampilan yang menerapkan pemeriksaan kualitas data pada setiap rekaman yang melewati kueri. Ekspektasi menggunakan pernyataan SQL Boolean standar untuk menentukan batasan. Anda dapat menggabungkan beberapa harapan untuk satu himpunan data dan menetapkan harapan di semua deklarasi himpunan data dalam pipeline.

Bagian berikut memperkenalkan tiga komponen ekspektasi dan memberikan contoh sintaks.

Nama ekspektasi

Setiap harapan harus memiliki nama, yang digunakan sebagai pengidentifikasi untuk melacak dan memantau harapan. Pilih nama yang mengomunikasikan metrik yang sedang divalidasi. Contoh berikut mendefinisikan harapan untuk mengonfirmasi valid_customer_age bahwa usia antara 0 dan 120 tahun:

Penting

Nama harapan harus unik untuk himpunan data tertentu. Anda dapat menggunakan kembali ekspektasi di beberapa himpunan data dalam pipeline. Lihat Ekspektasi portabel dan dapat digunakan kembali.

Phyton

@dp.table
@dp.expect("valid_customer_age", "age BETWEEN 0 AND 120")
def customers():
  return spark.readStream.table("datasets.samples.raw_customers")

SQL

CREATE OR REFRESH STREAMING TABLE customers(
  CONSTRAINT valid_customer_age EXPECT (age BETWEEN 0 AND 120)
) AS SELECT * FROM STREAM(datasets.samples.raw_customers);

Batasan untuk mengevaluasi

Klausa batasan adalah pernyataan kondisional SQL yang harus dievaluasi ke true atau false untuk setiap rekaman. Batasan berisi logika aktual untuk apa yang sedang divalidasi. Ketika rekaman gagal dalam kondisi ini, harapan dipicu.

Batasan harus menggunakan sintaks SQL yang valid dan tidak boleh berisi yang berikut ini:

  • Fungsi Python kustom
  • Panggilan layanan eksternal
  • Subkueri yang mereferensikan tabel lain

Berikut ini adalah contoh batasan yang dapat ditambahkan ke pernyataan pembuatan himpunan data:

Phyton

Sintaks untuk batasan dalam Python adalah:

@dp.expect(<constraint-name>, <constraint-clause>)

Beberapa batasan dapat ditentukan:

@dp.expect(<constraint-name>, <constraint-clause>)
@dp.expect(<constraint2-name>, <constraint2-clause>)

Examples:

# Simple constraint
@dp.expect("non_negative_price", "price >= 0")

# SQL functions
@dp.expect("valid_date", "year(transaction_date) >= 2020")

# CASE statements
@dp.expect("valid_order_status", """
   CASE
     WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
     WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
     ELSE false
   END
""")

# Multiple constraints
@dp.expect("non_negative_price", "price >= 0")
@dp.expect("valid_purchase_date", "date <= current_date()")

# Complex business logic
@dp.expect(
  "valid_subscription_dates",
  """start_date <= end_date
    AND end_date <= current_date()
    AND start_date >= '2020-01-01'"""
)

# Complex boolean logic
@dp.expect("valid_order_state", """
   (status = 'ACTIVE' AND balance > 0)
   OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
""")

SQL

Sintaks untuk batasan dalam SQL adalah:

CONSTRAINT <constraint-name> EXPECT ( <constraint-clause> )

Beberapa batasan harus dipisahkan oleh koma:

CONSTRAINT <constraint-name> EXPECT ( <constraint-clause> ),
CONSTRAINT <constraint2-name> EXPECT ( <constraint2-clause> )

Examples:

-- Simple constraint
CONSTRAINT non_negative_price EXPECT (price >= 0)

-- SQL functions
CONSTRAINT valid_date EXPECT (year(transaction_date) >= 2020)

-- CASE statements
CONSTRAINT valid_order_status EXPECT (
  CASE
    WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
    WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
    ELSE false
  END
)

-- Multiple constraints
CONSTRAINT non_negative_price EXPECT (price >= 0),
CONSTRAINT valid_purchase_date EXPECT (date <= current_date())

-- Complex business logic
CONSTRAINT valid_subscription_dates EXPECT (
  start_date <= end_date
  AND end_date <= current_date()
  AND start_date >= '2020-01-01'
)

-- Complex boolean logic
CONSTRAINT valid_order_state EXPECT (
  (status = 'ACTIVE' AND balance > 0)
  OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
)

Tindakan pada catatan yang tidak valid

Anda harus menentukan tindakan untuk menentukan apa yang terjadi ketika rekaman gagal dalam pemeriksaan validasi. Tabel berikut ini menjelaskan tindakan yang tersedia:

Tindakan Sintaks SQL Sintaks Python Result
peringatkan (default) EXPECT dp.expect Rekaman yang tidak valid ditulis ke target.
menjatuhkan EXPECT ... ON VIOLATION DROP ROW dp.expect_or_drop Rekaman yang tidak valid dihilangkan sebelum data ditulis ke target. Jumlah rekaman yang dihilangkan dicatat bersama metrik himpunan data lainnya.
gagal EXPECT ... ON VIOLATION FAIL UPDATE dp.expect_or_fail Rekaman yang tidak valid mencegah pembaruan berhasil. Intervensi manual diperlukan sebelum pemrosesan ulang. Ekspektasi ini menyebabkan kegagalan satu alur dan tidak menyebabkan alur lain di pipeline Anda gagal.

Anda juga dapat menerapkan logika tingkat lanjut untuk mengkarantina rekaman yang tidak valid tanpa gagal atau menghilangkan data. Lihat Mengkarantina rekaman yang tidak valid.

Metrik pelacakan harapan

Anda dapat melihat metrik pelacakan untuk tindakan warn atau drop dari UI pipeline. Karena fail menyebabkan pembaruan gagal ketika rekaman yang tidak valid terdeteksi, metrik tidak direkam.

Untuk melihat metrik ekspektasi, selesaikan langkah-langkah berikut:

  1. Di bilah sisi ruang kerja Azure Databricks Anda, klik Tugas & Pipeline.
  2. Klik Nama alur Anda.
  3. Klik himpunan data dengan harapan yang ditentukan.
  4. Pilih tab Kualitas data di bilah sisi kanan.

Anda dapat melihat metrik kualitas data dengan mengkueri log peristiwa Lakeflow Spark Declarative Pipelines. Lihat Metrik kualitas data kueri atau metrik ekspektasi.

Pertahankan rekaman yang tidak valid

Menyimpan rekaman yang tidak valid adalah perilaku default untuk ekspektasi. expect Gunakan operator saat Anda ingin menyimpan rekaman yang melanggar ekspektasi tetapi mengumpulkan metrik tentang berapa banyak rekaman yang lolos atau gagal batasan. Rekaman yang melanggar harapan ditambahkan ke himpunan data target bersama dengan rekaman yang valid:

Phyton

@dp.expect("valid timestamp", "timestamp > '2012-01-01'")

SQL

CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')

Menghapus rekaman yang tidak valid

expect_or_drop Gunakan operator untuk mencegah pemrosesan rekaman yang tidak valid lebih lanjut. Rekaman yang melanggar harapan dihilangkan dari himpunan data target:

Phyton

@dp.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")

SQL

CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW

Gagal pada entri yang tidak valid

Ketika rekaman yang tidak valid tidak dapat diterima, gunakan expect_or_fail operator untuk segera menghentikan eksekusi ketika rekaman gagal validasi. Jika operasi adalah pembaruan tabel, sistem secara atomik mengembalikan transaksi.

Phyton

@dp.expect_or_fail("valid_count", "count > 0")

SQL

CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE

Penting

Jika Anda memiliki beberapa alur paralel yang ditentukan dalam jalur proses, kegagalan satu alur tidak menyebabkan alur-alur lain gagal.

Grafik penjelasan kegagalan aliran LDP

Pemecahan masalah pembaruan yang gagal sesuai ekspektasi

Ketika alur gagal karena pelanggaran ekspektasi, Anda harus memperbaiki kode alur untuk menangani data yang tidak valid dengan benar sebelum menjalankan kembali alur.

Ekspektasi yang dikonfigurasi untuk membuat pipeline gagal memodifikasi rencana kueri Spark pada transformasi Anda untuk melacak informasi yang diperlukan guna mendeteksi dan melaporkan pelanggaran. Anda dapat menggunakan informasi ini untuk mengidentifikasi rekaman input mana yang menghasilkan pelanggaran pada berbagai kueri. Lakeflow Spark Declarative Pipelines menyediakan pesan kesalahan khusus untuk melaporkan pelanggaran tersebut. Berikut adalah contoh pesan kesalahan pelanggaran ekspektasi:

[EXPECTATION_VIOLATION.VERBOSITY_ALL] Flow 'sensor-pipeline' failed to meet the expectation. Violated expectations: 'temperature_in_valid_range'. Input data: '{"id":"TEMP_001","temperature":-500,"timestamp_ms":"1710498600"}'. Output record: '{"sensor_id":"TEMP_001","temperature":-500,"change_time":"2024-03-15 10:30:00"}'. Missing input data: false

Manajemen beberapa ekspektasi

Nota

Meskipun SQL dan Python mendukung beberapa ekspektasi dalam satu himpunan data, hanya Python yang memungkinkan Anda mengelompokkan beberapa harapan dan menentukan tindakan kolektif.

LDP dengan beberapa ekspektasi pada graf aliran

Anda dapat mengelompokkan beberapa ekspektasi bersama-sama dan menentukan tindakan kolektif menggunakan fungsi expect_all, expect_all_or_drop, dan expect_all_or_fail.

Dekorator ini menerima kamus Python sebagai argumen, di mana kuncinya adalah nama ekspektasi dan nilainya adalah kendala ekspektasi. Anda dapat menggunakan kembali serangkaian ekspektasi yang sama dalam beberapa himpunan data di alur Anda. Berikut ini menunjukkan contoh masing-masing expect_all operator Python:

valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}

@dp.table
@dp.expect_all(valid_pages)
def raw_data():
  # Create a raw dataset

@dp.table
@dp.expect_all_or_drop(valid_pages)
def prepared_data():
  # Create a cleaned and prepared dataset

@dp.table
@dp.expect_all_or_fail(valid_pages)
def customer_facing_data():
  # Create cleaned and prepared to share the dataset

Batasan

  • Karena hanya tabel streaming, tampilan materialisasi, dan tampilan sementara yang mendukung ekspetasi, metrik kualitas data hanya didukung untuk jenis objek ini.
  • Metrik kualitas data tidak tersedia saat:
    • Tidak ada ekspektasi yang ditetapkan pada kueri.
    • Alur menggunakan operator yang tidak mendukung fungsi yang diharapkan.
    • Jenis alur, seperti sink, tidak mendukung ekspektasi.
    • Tidak ada pembaruan untuk tabel streaming terkait atau tampilan materialisasi untuk proses alur kerja tertentu.
    • Konfigurasi alur tidak menyertakan pengaturan yang diperlukan untuk menangkap metrik, seperti pipelines.metrics.flowTimeReporter.enabled.
  • Untuk beberapa kasus, COMPLETED alur mungkin tidak berisi metrik. Sebagai gantinya, metrik dilaporkan di setiap mikro-batch dalam peristiwa flow_progress dengan status RUNNING.
  • Karena tampilan hanya dihitung saat dikueri, metrik kualitas data mungkin tidak tersedia untuk tampilan yang ditentukan. Atau, tampilan yang ditanyakan dalam beberapa himpunan data hilir mungkin terdapat beberapa set metrik kualitas data.