إدارة جودة البيانات باستخدام جداول Delta Live

يمكنك استخدام التوقعات لتحديد قيود جودة البيانات على محتويات مجموعة البيانات. تسمح لك التوقعات بضمان أن البيانات التي تصل إلى الجداول تفي بمتطلبات جودة البيانات وتوفر رؤى حول جودة البيانات لكل تحديث للبنية الأساسية لبرنامج ربط العمليات التجارية. يمكنك تطبيق التوقعات على الاستعلامات باستخدام مزخرفات Python أو عبارات قيد SQL.

ما هي توقعات Delta Live Tables؟

التوقعات هي عبارات اختيارية تضيفها إلى إعلانات مجموعة بيانات Delta Live Tables التي تطبق عمليات التحقق من جودة البيانات على كل سجل يمر عبر استعلام.

يتكون التوقع من ثلاثة أشياء:

  • وصف، والذي يعمل كمعرف فريد ويسمح لك بتعقب مقاييس القيد.
  • عبارة منطقية ترجع دائما صوابا أو خطأ استنادا إلى بعض الشروط المذكورة.
  • إجراء يجب اتخاذه عندما يفشل السجل في التوقع، ما يعني أن القيمة المنطقية ترجع خطأ.

تعرض المصفوفة التالية الإجراءات الثلاثة التي يمكنك تطبيقها على السجلات غير الصالحة:

الإجراء النتيجة
تحذير (افتراضي) تتم كتابة السجلات غير الصالحة إلى الهدف؛ يتم الإبلاغ عن الفشل كمقياس لمجموعة البيانات.
اسقاط يتم إسقاط السجلات غير الصالحة قبل كتابة البيانات إلى الهدف؛ يتم الإبلاغ عن الفشل كمقاييس لمجموعة البيانات.
تفشل تمنع السجلات غير الصالحة نجاح التحديث. مطلوب تدخل يدوي قبل إعادة المعالجة.

يمكنك عرض مقاييس جودة البيانات مثل عدد السجلات التي تنتهك التوقعات عن طريق الاستعلام عن سجل أحداث Delta Live Tables. راجع مراقبة خطوط أنابيب Delta Live Tables.

للحصول على مرجع كامل لبناء جملة تعريف مجموعة بيانات Delta Live Tables، راجع مرجع لغة Delta Live Tables Python أو مرجع لغة Delta Live Tables SQL.

إشعار

بينما يمكنك تضمين عبارات متعددة في أي توقع، تدعم Python فقط تحديد الإجراءات بناء على توقعات متعددة. راجع توقعات متعددة.

الاحتفاظ بالسجلات غير الصالحة

expect استخدم عامل التشغيل عندما تريد الاحتفاظ بالسجلات التي تنتهك التوقعات. تتم إضافة السجلات التي تنتهك التوقعات إلى مجموعة البيانات الهدف مع سجلات صالحة:

Python

@dlt.expect("valid timestamp", "col(“timestamp”) > '2012-01-01'")

SQL

CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')

إسقاط سجلات غير صحيحة

expect or drop استخدم عامل التشغيل لمنع المزيد من معالجة السجلات غير الصالحة. يتم إسقاط السجلات التي تنتهك التوقعات من مجموعة البيانات الهدف:

Python

@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")

SQL

CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW

فشل في السجلات غير الصالحة

عندما تكون السجلات غير صالحة غير مقبولة expect or fail ، استخدم عامل التشغيل لإيقاف التنفيذ فورا عند فشل التحقق من صحة السجل. إذا كانت العملية عبارة عن تحديث جدول، يتراجع النظام تلقائيا عن المعاملة:

Python

@dlt.expect_or_fail("valid_count", "count > 0")

SQL

CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE

عند فشل البنية الأساسية لبرنامج ربط العمليات التجارية بسبب انتهاك توقع، يجب إصلاح التعليمات البرمجية للبنية الأساسية لبرنامج ربط العمليات التجارية لمعالجة البيانات غير الصالحة بشكل صحيح قبل إعادة تشغيل البنية الأساسية لبرنامج ربط العمليات التجارية.

تعدل توقعات الفشل خطة استعلام Spark للتحويلات الخاصة بك لتتبع المعلومات المطلوبة للكشف عن الانتهاكات والإبلاغ عنها. بالنسبة للعديد من الاستعلامات، يمكنك استخدام هذه المعلومات لتحديد سجل الإدخال الذي أدى إلى الانتهاك. فيما يلي مثال على استثناء:

Expectation Violated:
{
  "flowName": "a-b",
  "verboseInfo": {
    "expectationsViolated": [
      "x1 is negative"
    ],
    "inputData": {
      "a": {"x1": 1,"y1": "a },
      "b": {
        "x2": 1,
        "y2": "aa"
      }
    },
    "outputRecord": {
      "x1": 1,
      "y1": "a",
      "x2": 1,
      "y2": "aa"
    },
    "missingInputData": false
  }
}

توقعات متعددة

يمكنك تحديد التوقعات مع واحد أو أكثر من قيود جودة البيانات في مسارات Python. تقبل هذه المصممات قاموس Python كوسيطة، حيث المفتاح هو اسم التوقع والقيمة هي قيد التوقع.

استخدم expect_all لتحديد قيود جودة بيانات متعددة عندما يجب تضمين السجلات التي تفشل في التحقق من الصحة في مجموعة البيانات الهدف:

@dlt.expect_all({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

استخدم expect_all_or_drop لتحديد قيود جودة بيانات متعددة عندما يجب إسقاط السجلات التي تفشل في التحقق من الصحة من مجموعة البيانات الهدف:

@dlt.expect_all_or_drop({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

يستخدم expect_all_or_fail لتحديد قيود جودة بيانات متعددة عندما يجب أن توقف السجلات التي تفشل عملية التحقق من الصحة تنفيذ البنية الأساسية لبرنامج ربط العمليات التجارية:

@dlt.expect_all_or_fail({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

يمكنك أيضا تحديد مجموعة من التوقعات كمتغير وتمريرها إلى استعلامات واحدة أو أكثر في البنية الأساسية لبرنامج ربط العمليات التجارية الخاصة بك:

valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}

@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
  # Create raw dataset

@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
  # Create cleaned and prepared dataset

عزل البيانات غير الصالحة

يستخدم المثال التالي التوقعات مع الجداول وطرق العرض المؤقتة. يوفر لك هذا النمط مقاييس للسجلات التي تمرر عمليات التحقق من التوقعات أثناء تحديثات البنية الأساسية لبرنامج ربط العمليات التجارية، ويوفر طريقة لمعالجة السجلات الصالحة وغير الصالحة من خلال مسارات انتقال البيانات من الخادم المختلفة.

إشعار

يقرأ هذا المثال البيانات النموذجية المضمنة في مجموعات بيانات Databricks. نظرا لأن مجموعات بيانات Databricks غير مدعومة مع البنية الأساسية لبرنامج ربط العمليات التجارية التي تنشر إلى كتالوج Unity، يعمل هذا المثال فقط مع البنية الأساسية لبرنامج ربط العمليات التجارية المكونة للنشر إلى Hive metastore. ومع ذلك، يعمل هذا النمط أيضا مع البنية الأساسية لبرنامج ربط العمليات التجارية الممكنة لكتالوج Unity، ولكن يجب قراءة البيانات من مواقع خارجية. لمعرفة المزيد حول استخدام كتالوج Unity مع Delta Live Tables، راجع استخدام كتالوج Unity مع خطوط أنابيب Delta Live Tables.

import dlt
from pyspark.sql.functions import expr

rules = {}
rules["valid_website"] = "(Website IS NOT NULL)"
rules["valid_location"] = "(Location IS NOT NULL)"
quarantine_rules = "NOT({0})".format(" AND ".join(rules.values()))

@dlt.table(
  name="raw_farmers_market"
)
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="farmers_market_quarantine",
  temporary=True,
  partition_cols=["is_quarantined"]
)
@dlt.expect_all(rules)
def farmers_market_quarantine():
  return (
    dlt.read("raw_farmers_market")
      .select("MarketName", "Website", "Location", "State",
              "Facebook", "Twitter", "Youtube", "Organic", "updateTime")
      .withColumn("is_quarantined", expr(quarantine_rules))
  )

@dlt.view(
  name="valid_farmers_market"
)
def get_valid_farmers_market():
  return (
    dlt.read("farmers_market_quarantine")
      .filter("is_quarantined=false")
  )

@dlt.view(
  name="invalid_farmers_market"
)
def get_invalid_farmers_market():
  return (
    dlt.read("farmers_market_quarantine")
      .filter("is_quarantined=true")
  )

التحقق من صحة عدد الصفوف عبر الجداول

يمكنك إضافة جدول إضافي إلى البنية الأساسية لبرنامج ربط العمليات التجارية الخاصة بك يحدد توقع مقارنة عدد الصفوف بين جدولين مباشرين. تظهر نتائج هذا التوقع في سجل الأحداث وواجهة مستخدم Delta Live Tables. يتحقق هذا المثال التالي من صحة عدد الصفوف المتساوية بين الجدولين tbla و tblb :

CREATE OR REFRESH LIVE TABLE count_verification(
  CONSTRAINT no_rows_dropped EXPECT (a_count == b_count)
) AS SELECT * FROM
  (SELECT COUNT(*) AS a_count FROM LIVE.tbla),
  (SELECT COUNT(*) AS b_count FROM LIVE.tblb)

إجراء التحقق المتقدم من الصحة مع توقعات جداول Delta Live

يمكنك تعريف الجداول المباشرة باستخدام الاستعلامات التجميعية والانضمام واستخدام نتائج هذه الاستعلامات كجزء من التحقق من توقعاتك. وهذا مفيد إذا كنت ترغب في إجراء عمليات فحص معقدة لجودة البيانات، على سبيل المثال، ضمان احتواء جدول مشتق على جميع السجلات من الجدول المصدر أو ضمان المساواة في عمود رقمي عبر الجداول. يمكنك استخدام TEMPORARY الكلمة الأساسية لمنع نشر هذه الجداول إلى المخطط الهدف.

يتحقق المثال التالي من وجود كافة السجلات المتوقعة في report الجدول:

CREATE TEMPORARY LIVE TABLE report_compare_tests(
  CONSTRAINT no_missing_records EXPECT (r.key IS NOT NULL)
)
AS SELECT * FROM LIVE.validation_copy v
LEFT OUTER JOIN LIVE.report r ON v.key = r.key

يستخدم المثال التالي تجميعا لضمان تفرد المفتاح الأساسي:

CREATE TEMPORARY LIVE TABLE report_pk_tests(
  CONSTRAINT unique_pk EXPECT (num_entries = 1)
)
AS SELECT pk, count(*) as num_entries
FROM LIVE.report
GROUP BY pk

جعل التوقعات قابلة للنقل وإعادة الاستخدام

يمكنك الاحتفاظ بقواعد جودة البيانات بشكل منفصل عن تطبيقات البنية الأساسية لبرنامج ربط العمليات التجارية الخاصة بك.

توصي Databricks بتخزين القواعد في جدول Delta مع كل قاعدة مصنفة حسب علامة. يمكنك استخدام هذه العلامة في تعريفات مجموعة البيانات لتحديد القواعد التي يجب تطبيقها.

ينشئ المثال التالي جدولا باسم rules للحفاظ على القواعد:

CREATE OR REPLACE TABLE
  rules
AS SELECT
  col1 AS name,
  col2 AS constraint,
  col3 AS tag
FROM (
  VALUES
  ("website_not_null","Website IS NOT NULL","validity"),
  ("location_not_null","Location IS NOT NULL","validity"),
  ("state_not_null","State IS NOT NULL","validity"),
  ("fresh_data","to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'","maintained"),
  ("social_media_access","NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)","maintained")
)

يحدد مثال Python التالي توقعات جودة البيانات استنادا إلى القواعد المخزنة في rules الجدول. get_rules() تقرأ الدالة القواعد من rules الجدول وترجع قاموس Python يحتوي على قواعد مطابقة للوسيطة التي tag تم تمريرها إلى الدالة. يتم تطبيق القاموس في @dlt.expect_all_*() المحسنات لفرض قيود جودة البيانات. على سبيل المثال، سيتم إسقاط أي سجلات تفشل في القواعد التي تم validity وضع علامة عليها من raw_farmers_market الجدول:

إشعار

يقرأ هذا المثال البيانات النموذجية المضمنة في مجموعات بيانات Databricks. نظرا لأن مجموعات بيانات Databricks غير مدعومة مع البنية الأساسية لبرنامج ربط العمليات التجارية التي تنشر إلى كتالوج Unity، يعمل هذا المثال فقط مع البنية الأساسية لبرنامج ربط العمليات التجارية المكونة للنشر إلى Hive metastore. ومع ذلك، يعمل هذا النمط أيضا مع البنية الأساسية لبرنامج ربط العمليات التجارية الممكنة لكتالوج Unity، ولكن يجب قراءة البيانات من مواقع خارجية. لمعرفة المزيد حول استخدام كتالوج Unity مع Delta Live Tables، راجع استخدام كتالوج Unity مع خطوط أنابيب Delta Live Tables.

import dlt
from pyspark.sql.functions import expr, col

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  rules = {}
  df = spark.read.table("rules")
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules

@dlt.table(
  name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
      .select("MarketName", "Website", "State",
        "Facebook", "Twitter", "Youtube", "Organic",
        "updateTime"
      )
  )

بدلا من إنشاء جدول باسم rules للحفاظ على القواعد، يمكنك إنشاء وحدة Python إلى القواعد الرئيسية، على سبيل المثال، في ملف يسمى rules_module.py في نفس المجلد مثل دفتر الملاحظات:

def get_rules_as_list_of_dict():
  return [
    {
      "name": "website_not_null",
      "constraint": "Website IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "location_not_null",
      "constraint": "Location IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "state_not_null",
      "constraint": "State IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "fresh_data",
      "constraint": "to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'",
      "tag": "maintained"
    },
    {
      "name": "social_media_access",
      "constraint": "NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)",
      "tag": "maintained"
    }
  ]

ثم قم بتعديل دفتر الملاحظات السابق عن طريق استيراد الوحدة النمطية وتغيير الدالة get_rules() للقراءة من الوحدة النمطية بدلا من rules الجدول:

import dlt
from rules_module import *
from pyspark.sql.functions import expr, col

df = spark.createDataFrame(get_rules_as_list_of_dict())

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  rules = {}
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules

@dlt.table(
  name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
      .select("MarketName", "Website", "State",
        "Facebook", "Twitter", "Youtube", "Organic",
        "updateTime"
      )
  )