إدارة جودة البيانات باستخدام جداول Delta Live
يمكنك استخدام التوقعات لتحديد قيود جودة البيانات على محتويات مجموعة البيانات. تسمح لك التوقعات بضمان أن البيانات التي تصل إلى الجداول تفي بمتطلبات جودة البيانات وتوفر رؤى حول جودة البيانات لكل تحديث للبنية الأساسية لبرنامج ربط العمليات التجارية. يمكنك تطبيق التوقعات على الاستعلامات باستخدام مزخرفات Python أو عبارات قيد SQL.
ما هي توقعات Delta Live Tables؟
التوقعات هي عبارات اختيارية تضيفها إلى إعلانات مجموعة بيانات Delta Live Tables التي تطبق عمليات التحقق من جودة البيانات على كل سجل يمر عبر استعلام.
يتكون التوقع من ثلاثة أشياء:
- وصف، والذي يعمل كمعرف فريد ويسمح لك بتعقب مقاييس القيد.
- عبارة منطقية ترجع دائما صوابا أو خطأ استنادا إلى بعض الشروط المذكورة.
- إجراء يجب اتخاذه عندما يفشل السجل في التوقع، ما يعني أن القيمة المنطقية ترجع خطأ.
تعرض المصفوفة التالية الإجراءات الثلاثة التي يمكنك تطبيقها على السجلات غير الصالحة:
الإجراء | نتيجة |
---|---|
تحذير (افتراضي) | تتم كتابة السجلات غير الصالحة إلى الهدف؛ يتم الإبلاغ عن الفشل كمقياس لمجموعة البيانات. |
قطرة | يتم إسقاط السجلات غير الصالحة قبل كتابة البيانات إلى الهدف؛ يتم الإبلاغ عن الفشل كمقاييس لمجموعة البيانات. |
استعصى | تمنع السجلات غير الصالحة نجاح التحديث. مطلوب تدخل يدوي قبل إعادة المعالجة. |
يمكنك عرض مقاييس جودة البيانات مثل عدد السجلات التي تنتهك التوقعات عن طريق الاستعلام عن سجل أحداث Delta Live Tables. راجع مراقبة خطوط أنابيب Delta Live Tables.
للحصول على مرجع كامل لبناء جملة تعريف مجموعة بيانات Delta Live Tables، راجع مرجع لغة Delta Live Tables Python أو مرجع لغة Delta Live Tables SQL.
إشعار
- بينما يمكنك تضمين عبارات متعددة في أي توقع، تدعم Python فقط تحديد الإجراءات بناء على توقعات متعددة. راجع توقعات متعددة.
- يجب تحديد التوقعات باستخدام تعبيرات SQL. لا يمكنك استخدام بناء جملة غير SQL (على سبيل المثال، وظائف Python) عند تحديد توقع.
الاحتفاظ بالسجلات غير الصالحة
expect
استخدم عامل التشغيل عندما تريد الاحتفاظ بالسجلات التي تنتهك التوقعات. تتم إضافة السجلات التي تنتهك التوقعات إلى مجموعة البيانات الهدف مع سجلات صالحة:
Python
@dlt.expect("valid timestamp", "timestamp > '2012-01-01'")
SQL
CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')
إسقاط سجلات غير صحيحة
expect or drop
استخدم عامل التشغيل لمنع المزيد من معالجة السجلات غير الصالحة. يتم إسقاط السجلات التي تنتهك التوقعات من مجموعة البيانات الهدف:
Python
@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
SQL
CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW
فشل في السجلات غير الصالحة
عندما تكون السجلات غير صالحة غير مقبولة expect or fail
، استخدم عامل التشغيل لإيقاف التنفيذ فورا عند فشل التحقق من صحة السجل. إذا كانت العملية عبارة عن تحديث جدول، يتراجع النظام تلقائيا عن المعاملة:
Python
@dlt.expect_or_fail("valid_count", "count > 0")
SQL
CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE
عند فشل البنية الأساسية لبرنامج ربط العمليات التجارية بسبب انتهاك توقع، يجب إصلاح التعليمات البرمجية للبنية الأساسية لبرنامج ربط العمليات التجارية لمعالجة البيانات غير الصالحة بشكل صحيح قبل إعادة تشغيل البنية الأساسية لبرنامج ربط العمليات التجارية.
تعدل توقعات الفشل خطة استعلام Spark للتحويلات الخاصة بك لتتبع المعلومات المطلوبة للكشف عن الانتهاكات والإبلاغ عنها. بالنسبة للعديد من الاستعلامات، يمكنك استخدام هذه المعلومات لتحديد سجل الإدخال الذي أدى إلى الانتهاك. فيما يلي مثال على استثناء:
Expectation Violated:
{
"flowName": "a-b",
"verboseInfo": {
"expectationsViolated": [
"x1 is negative"
],
"inputData": {
"a": {"x1": 1,"y1": "a },
"b": {
"x2": 1,
"y2": "aa"
}
},
"outputRecord": {
"x1": 1,
"y1": "a",
"x2": 1,
"y2": "aa"
},
"missingInputData": false
}
}
توقعات متعددة
يمكنك تحديد التوقعات مع واحد أو أكثر من قيود جودة البيانات في مسارات Python. تقبل هذه المصممات قاموس Python كوسيطة، حيث المفتاح هو اسم التوقع والقيمة هي قيد التوقع.
استخدم expect_all
لتحديد قيود جودة بيانات متعددة عندما يجب تضمين السجلات التي تفشل في التحقق من الصحة في مجموعة البيانات الهدف:
@dlt.expect_all({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})
استخدم expect_all_or_drop
لتحديد قيود جودة بيانات متعددة عندما يجب إسقاط السجلات التي تفشل في التحقق من الصحة من مجموعة البيانات الهدف:
@dlt.expect_all_or_drop({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})
يستخدم expect_all_or_fail
لتحديد قيود جودة بيانات متعددة عندما يجب أن توقف السجلات التي تفشل عملية التحقق من الصحة تنفيذ البنية الأساسية لبرنامج ربط العمليات التجارية:
@dlt.expect_all_or_fail({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})
يمكنك أيضا تحديد مجموعة من التوقعات كمتغير وتمريرها إلى استعلامات واحدة أو أكثر في البنية الأساسية لبرنامج ربط العمليات التجارية الخاصة بك:
valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}
@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
# Create raw dataset
@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
# Create cleaned and prepared dataset
عزل البيانات غير الصالحة
يستخدم المثال التالي التوقعات مع الجداول وطرق العرض المؤقتة. يوفر لك هذا النمط مقاييس للسجلات التي تمرر عمليات التحقق من التوقعات أثناء تحديثات البنية الأساسية لبرنامج ربط العمليات التجارية، ويوفر طريقة لمعالجة السجلات الصالحة وغير الصالحة من خلال مسارات انتقال البيانات من الخادم المختلفة.
إشعار
يقرأ هذا المثال البيانات النموذجية المضمنة في مجموعات بيانات Databricks. نظرا لأن مجموعات بيانات Databricks غير مدعومة مع البنية الأساسية لبرنامج ربط العمليات التجارية التي تنشر إلى كتالوج Unity، يعمل هذا المثال فقط مع البنية الأساسية لبرنامج ربط العمليات التجارية المكونة للنشر إلى Hive metastore. ومع ذلك، يعمل هذا النمط أيضا مع البنية الأساسية لبرنامج ربط العمليات التجارية الممكنة لكتالوج Unity، ولكن يجب قراءة البيانات من مواقع خارجية. لمعرفة المزيد حول استخدام كتالوج Unity مع Delta Live Tables، راجع استخدام كتالوج Unity مع خطوط أنابيب Delta Live Tables.
import dlt
from pyspark.sql.functions import expr
rules = {}
rules["valid_website"] = "(Website IS NOT NULL)"
rules["valid_location"] = "(Location IS NOT NULL)"
quarantine_rules = "NOT({0})".format(" AND ".join(rules.values()))
@dlt.table(
name="raw_farmers_market"
)
def get_farmers_market_data():
return (
spark.read.format('csv').option("header", "true")
.load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
)
@dlt.table(
name="farmers_market_quarantine",
temporary=True,
partition_cols=["is_quarantined"]
)
@dlt.expect_all(rules)
def farmers_market_quarantine():
return (
dlt.read("raw_farmers_market")
.select("MarketName", "Website", "Location", "State",
"Facebook", "Twitter", "Youtube", "Organic", "updateTime")
.withColumn("is_quarantined", expr(quarantine_rules))
)
@dlt.view(
name="valid_farmers_market"
)
def get_valid_farmers_market():
return (
dlt.read("farmers_market_quarantine")
.filter("is_quarantined=false")
)
@dlt.view(
name="invalid_farmers_market"
)
def get_invalid_farmers_market():
return (
dlt.read("farmers_market_quarantine")
.filter("is_quarantined=true")
)
التحقق من صحة عدد الصفوف عبر الجداول
يمكنك إضافة جدول إضافي إلى البنية الأساسية لبرنامج ربط العمليات التجارية الخاصة بك يحدد توقعا لمقارنة عدد الصفوف بين اثنين من طرق العرض المجسدة أو جداول الدفق. تظهر نتائج هذا التوقع في سجل الأحداث وواجهة مستخدم Delta Live Tables. يتحقق المثال التالي من صحة عدد الصفوف المتساوية tbla
بين الجدولين و tblb
:
CREATE OR REFRESH MATERIALIZED VIEW count_verification(
CONSTRAINT no_rows_dropped EXPECT (a_count == b_count)
) AS SELECT * FROM
(SELECT COUNT(*) AS a_count FROM LIVE.tbla),
(SELECT COUNT(*) AS b_count FROM LIVE.tblb)
إجراء التحقق المتقدم من الصحة مع توقعات جداول Delta Live
يمكنك تحديد طرق العرض المجسدة باستخدام الاستعلامات التجميعية والانضمام واستخدام نتائج هذه الاستعلامات كجزء من التحقق من توقعاتك. وهذا مفيد إذا كنت ترغب في إجراء عمليات فحص معقدة لجودة البيانات، على سبيل المثال، ضمان احتواء جدول مشتق على جميع السجلات من الجدول المصدر أو ضمان المساواة في عمود رقمي عبر الجداول. يمكنك استخدام TEMPORARY
الكلمة الأساسية لمنع نشر هذه الجداول إلى المخطط الهدف.
يتحقق المثال التالي من وجود كافة السجلات المتوقعة في report
الجدول:
CREATE TEMPORARY MATERIALIZED VIEW report_compare_tests(
CONSTRAINT no_missing_records EXPECT (r.key IS NOT NULL)
)
AS SELECT * FROM LIVE.validation_copy v
LEFT OUTER JOIN LIVE.report r ON v.key = r.key
يستخدم المثال التالي تجميعا لضمان تفرد المفتاح الأساسي:
CREATE MATERIALIZED VIEW report_pk_tests(
CONSTRAINT unique_pk EXPECT (num_entries = 1)
)
AS SELECT pk, count(*) as num_entries
FROM LIVE.report
GROUP BY pk
جعل التوقعات قابلة للنقل وإعادة الاستخدام
يمكنك الاحتفاظ بقواعد جودة البيانات بشكل منفصل عن تطبيقات البنية الأساسية لبرنامج ربط العمليات التجارية الخاصة بك.
توصي Databricks بتخزين القواعد في جدول Delta مع كل قاعدة مصنفة حسب علامة. يمكنك استخدام هذه العلامة في تعريفات مجموعة البيانات لتحديد القواعد التي يجب تطبيقها.
ينشئ المثال التالي جدولا باسم rules
للحفاظ على القواعد:
CREATE OR REPLACE TABLE
rules
AS SELECT
col1 AS name,
col2 AS constraint,
col3 AS tag
FROM (
VALUES
("website_not_null","Website IS NOT NULL","validity"),
("location_not_null","Location IS NOT NULL","validity"),
("state_not_null","State IS NOT NULL","validity"),
("fresh_data","to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'","maintained"),
("social_media_access","NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)","maintained")
)
يحدد مثال Python التالي توقعات جودة البيانات استنادا إلى القواعد المخزنة في rules
الجدول. get_rules()
تقرأ الدالة القواعد من rules
الجدول وترجع قاموس Python يحتوي على قواعد مطابقة للوسيطة التي tag
تم تمريرها إلى الدالة. يتم تطبيق القاموس في @dlt.expect_all_*()
المحسنات لفرض قيود جودة البيانات. على سبيل المثال، سيتم إسقاط أي سجلات تفشل في القواعد التي تم validity
وضع علامة عليها من raw_farmers_market
الجدول:
إشعار
يقرأ هذا المثال البيانات النموذجية المضمنة في مجموعات بيانات Databricks. نظرا لأن مجموعات بيانات Databricks غير مدعومة مع البنية الأساسية لبرنامج ربط العمليات التجارية التي تنشر إلى كتالوج Unity، يعمل هذا المثال فقط مع البنية الأساسية لبرنامج ربط العمليات التجارية المكونة للنشر إلى Hive metastore. ومع ذلك، يعمل هذا النمط أيضا مع البنية الأساسية لبرنامج ربط العمليات التجارية الممكنة لكتالوج Unity، ولكن يجب قراءة البيانات من مواقع خارجية. لمعرفة المزيد حول استخدام كتالوج Unity مع Delta Live Tables، راجع استخدام كتالوج Unity مع خطوط أنابيب Delta Live Tables.
import dlt
from pyspark.sql.functions import expr, col
def get_rules(tag):
"""
loads data quality rules from a table
:param tag: tag to match
:return: dictionary of rules that matched the tag
"""
rules = {}
df = spark.read.table("rules")
for row in df.filter(col("tag") == tag).collect():
rules[row['name']] = row['constraint']
return rules
@dlt.table(
name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
return (
spark.read.format('csv').option("header", "true")
.load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
)
@dlt.table(
name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
return (
dlt.read("raw_farmers_market")
.filter(expr("Organic = 'Y'"))
.select("MarketName", "Website", "State",
"Facebook", "Twitter", "Youtube", "Organic",
"updateTime"
)
)
بدلا من إنشاء جدول باسم rules
للحفاظ على القواعد، يمكنك إنشاء وحدة Python إلى القواعد الرئيسية، على سبيل المثال، في ملف يسمى rules_module.py
في نفس المجلد مثل دفتر الملاحظات:
def get_rules_as_list_of_dict():
return [
{
"name": "website_not_null",
"constraint": "Website IS NOT NULL",
"tag": "validity"
},
{
"name": "location_not_null",
"constraint": "Location IS NOT NULL",
"tag": "validity"
},
{
"name": "state_not_null",
"constraint": "State IS NOT NULL",
"tag": "validity"
},
{
"name": "fresh_data",
"constraint": "to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'",
"tag": "maintained"
},
{
"name": "social_media_access",
"constraint": "NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)",
"tag": "maintained"
}
]
ثم قم بتعديل دفتر الملاحظات السابق عن طريق استيراد الوحدة النمطية وتغيير الدالة get_rules()
للقراءة من الوحدة النمطية بدلا من rules
الجدول:
import dlt
from rules_module import *
from pyspark.sql.functions import expr, col
df = spark.createDataFrame(get_rules_as_list_of_dict())
def get_rules(tag):
"""
loads data quality rules from a table
:param tag: tag to match
:return: dictionary of rules that matched the tag
"""
rules = {}
for row in df.filter(col("tag") == tag).collect():
rules[row['name']] = row['constraint']
return rules
@dlt.table(
name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
return (
spark.read.format('csv').option("header", "true")
.load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
)
@dlt.table(
name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
return (
dlt.read("raw_farmers_market")
.filter(expr("Organic = 'Y'"))
.select("MarketName", "Website", "State",
"Facebook", "Twitter", "Youtube", "Organic",
"updateTime"
)
)