Управление качеством данных с помощью разностных динамических таблиц

Для определения ограничений качества данных в содержимом наборов данных используются ожидания. Ожидания позволяют гарантировать, что данные, поступающие в таблицы, соответствуют требованиям к качеству данных и предоставляют аналитические сведения о качестве данных для каждого обновления конвейера. Применяйте ожидания к запросам с помощью декораторов Python или предложений ограничений SQL.

Что такое ожидания разностных динамических таблиц?

Ожидания — это необязательные предложения, добавляемые в объявления набора данных Delta Live Tables, которые применяют проверка качества данных к каждой записи, передаваемой через запрос.

Ожидание состоит из трех вещей:

  • Описание, которое выступает в качестве уникального идентификатора и позволяет отслеживать метрики ограничения.
  • Логический оператор, который всегда возвращает значение true или false на основе определенного указанного условия.
  • Действие, выполняемое при сбое ожидания записи, то есть логическое значение возвращает значение false.

В следующей матрице показаны три действия, которые можно применить к недопустимым записям:

Действие Результат
предупреждение (по умолчанию) Недопустимые записи записываются в целевой объект; Сбой сообщается как метрика для набора данных.
Падение Недопустимые записи удаляются перед записью данных в целевой объект; сбой сообщается как метрики для набора данных.
fail Недопустимые записи препятствуют успешному обновлению. Перед повторной обработкой требуется вмешательство вручную.

Вы можете просмотреть метрики качества данных, например число записей, которые отличаются от ожидания, запросив журнал событий разностных динамических таблиц. См. статью "Мониторинг конвейеров разностных динамических таблиц".

Полный справочник по синтаксису объявления набора данных Delta Live Table см . в справочнике по языку Python delta Live Table или справочнике по языку SQL Delta Live Table.

Примечание.

Хотя вы можете включить несколько предложений в любое ожидание, только Python поддерживает определение действий на основе нескольких ожиданий. См . несколько ожиданий.

Хранение недопустимых записей

Используйте оператор expect, когда нужно сохранять записи, отличающиеся от ожидания. Записи, отличающиеся от ожидания, добавляются в целевой набор данных вместе с допустимыми записями:

Python

@dlt.expect("valid timestamp", "col(“timestamp”) > '2012-01-01'")

SQL

CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')

Удаление недопустимых записей

expect or drop Используйте оператор, чтобы предотвратить дальнейшую обработку недопустимых записей. Записи, отличающиеся от ожидания, удаляются из целевого набора данных:

Python

@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")

SQL

CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW

Сбой при наличии недопустимых записей

Если недопустимые записи неприемлемы, используйте expect or fail оператор, чтобы остановить выполнение сразу после сбоя проверки записи. Если операция является обновлением таблиц, система атомарно откатывает транзакцию:

Python

@dlt.expect_or_fail("valid_count", "count > 0")

SQL

CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE

При сбое конвейера из-за отклонения от ожидаемых результатов необходимо исправить код конвейера, чтобы правильно обработать недопустимые данные и только после этого повторно запустить конвейер.

Отклонения от ожиданий изменяют план запроса Spark для преобразований, что позволяет отслеживать сведения, необходимые для обнаружения отклонений и создания отчетов по ним. Для многих запросов эти сведения можно использовать для указания того, какая входная запись привела к отклонению. Ниже приведен пример исключения:

Expectation Violated:
{
  "flowName": "a-b",
  "verboseInfo": {
    "expectationsViolated": [
      "x1 is negative"
    ],
    "inputData": {
      "a": {"x1": 1,"y1": "a },
      "b": {
        "x2": 1,
        "y2": "aa"
      }
    },
    "outputRecord": {
      "x1": 1,
      "y1": "a",
      "x2": 1,
      "y2": "aa"
    },
    "missingInputData": false
  }
}

Множественные ожидания

Вы можете определить ожидания с одним или несколькими ограничениями качества данных в конвейерах Python. Эти декораторы принимают словарь Python в качестве аргумента, где ключ является именем ожидания, а значение — ограничением ожидания.

Используйте expect_all, чтобы указать несколько ограничений качества данных, если записи, которые не прошли проверку, следует включить в целевой набор данных:

@dlt.expect_all({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

Используйте expect_all_or_drop, чтобы указать несколько ограничений качества данных, если записи, которые не прошли проверку, следует удалить из целевого набора данных:

@dlt.expect_all_or_drop({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

Используйте expect_all_or_fail, чтобы указать несколько ограничений качества данных, если записи, которые не прошли проверку, должны быть причиной остановки выполнения конвейера:

@dlt.expect_all_or_fail({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

Можно также задать коллекцию ожиданий как переменную и передать ее в один или несколько запросов в конвейере:

valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}

@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
  # Create raw dataset

@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
  # Create cleaned and prepared dataset

Карантин недопустимых данных

В следующем примере используются ожидания в сочетании с временными таблицами и представлениями. Этот шаблон предоставляет метрики для записей, которые передают ожидаемые проверка во время обновлений конвейера, и предоставляет способ обработки допустимых и недопустимых записей с помощью различных подчиненных путей.

Примечание.

В этом примере считываются примеры данных, включенные в наборы данных Databricks. Так как наборы данных Databricks не поддерживаются конвейером, который публикуется в каталоге Unity, этот пример работает только с конвейером, настроенным для публикации в хранилище метаданных Hive. Однако этот шаблон также работает с конвейерами с включенными конвейерами каталога Unity, но необходимо считывать данные из внешних расположений. Дополнительные сведения об использовании каталога Unity с разностными динамическими таблицами см. в статье Использование каталога Unity с конвейерами таблиц Delta Live Tables.

import dlt
from pyspark.sql.functions import expr

rules = {}
rules["valid_website"] = "(Website IS NOT NULL)"
rules["valid_location"] = "(Location IS NOT NULL)"
quarantine_rules = "NOT({0})".format(" AND ".join(rules.values()))

@dlt.table(
  name="raw_farmers_market"
)
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="farmers_market_quarantine",
  temporary=True,
  partition_cols=["is_quarantined"]
)
@dlt.expect_all(rules)
def farmers_market_quarantine():
  return (
    dlt.read("raw_farmers_market")
      .select("MarketName", "Website", "Location", "State",
              "Facebook", "Twitter", "Youtube", "Organic", "updateTime")
      .withColumn("is_quarantined", expr(quarantine_rules))
  )

@dlt.view(
  name="valid_farmers_market"
)
def get_valid_farmers_market():
  return (
    dlt.read("farmers_market_quarantine")
      .filter("is_quarantined=false")
  )

@dlt.view(
  name="invalid_farmers_market"
)
def get_invalid_farmers_market():
  return (
    dlt.read("farmers_market_quarantine")
      .filter("is_quarantined=true")
  )

Проверка количества строк в таблицах

В конвейер можно добавить дополнительную таблицу, которая определяет ожидание сравнения количества строк между двумя динамическими таблицами. Результаты этого ожидания отображаются в журнале событий и пользовательском интерфейсе разностной динамической таблицы. В следующем примере проверяется количество равных строк между tbla таблицами и tblb таблицами.

CREATE OR REFRESH LIVE TABLE count_verification(
  CONSTRAINT no_rows_dropped EXPECT (a_count == b_count)
) AS SELECT * FROM
  (SELECT COUNT(*) AS a_count FROM LIVE.tbla),
  (SELECT COUNT(*) AS b_count FROM LIVE.tblb)

Выполнение расширенной проверки с ожиданиями разностных динамических таблиц

Динамические таблицы можно определить с помощью запросов агрегирования и объединения и использовать результаты этих запросов в ходе проверки ожиданий. Это полезно, если вы хотите выполнить сложные проверка качества данных, например, гарантируя, что производная таблица содержит все записи из исходной таблицы или гарантирует равенство числовых столбцов между таблицами. Вы можете использовать TEMPORARY ключевое слово, чтобы предотвратить публикацию этих таблиц в целевой схеме.

В следующем примере проверяется, присутствуют ли в таблице report все ожидаемые записи:

CREATE TEMPORARY LIVE TABLE report_compare_tests(
  CONSTRAINT no_missing_records EXPECT (r.key IS NOT NULL)
)
AS SELECT * FROM LIVE.validation_copy v
LEFT OUTER JOIN LIVE.report r ON v.key = r.key

В следующем примере используется агрегат для обеспечения уникальности первичного ключа:

CREATE TEMPORARY LIVE TABLE report_pk_tests(
  CONSTRAINT unique_pk EXPECT (num_entries = 1)
)
AS SELECT pk, count(*) as num_entries
FROM LIVE.report
GROUP BY pk

Обеспечение возможности переноса и повторного использования ожиданий

Правила качества данных можно поддерживать отдельно от реализаций конвейера.

Databricks рекомендует хранить правила в таблице Delta с каждым правилом, классифицированным по тегу. Этот тег используется в определениях набора данных, чтобы определить, какие правила следует применять.

В следующем примере создается таблица с именем rules для поддержания правил:

CREATE OR REPLACE TABLE
  rules
AS SELECT
  col1 AS name,
  col2 AS constraint,
  col3 AS tag
FROM (
  VALUES
  ("website_not_null","Website IS NOT NULL","validity"),
  ("location_not_null","Location IS NOT NULL","validity"),
  ("state_not_null","State IS NOT NULL","validity"),
  ("fresh_data","to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'","maintained"),
  ("social_media_access","NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)","maintained")
)

В следующем примере Python определяются ожидания качества данных на основе правил, хранящихся в rules таблице. Функция get_rules() считывает правила из rules таблицы и возвращает словарь Python, содержащий правила, tag соответствующие аргументу, переданном функции. Словарь применяется в декораторах @dlt.expect_all_*() для обеспечения ограничений качества данных. Например, все записи, которые не соответствуют правилам и помеченные с помощью validity, будут удалены из таблицы raw_farmers_market:

Примечание.

В этом примере считываются примеры данных, включенные в наборы данных Databricks. Так как наборы данных Databricks не поддерживаются конвейером, который публикуется в каталоге Unity, этот пример работает только с конвейером, настроенным для публикации в хранилище метаданных Hive. Однако этот шаблон также работает с конвейерами с включенными конвейерами каталога Unity, но необходимо считывать данные из внешних расположений. Дополнительные сведения об использовании каталога Unity с разностными динамическими таблицами см. в статье Использование каталога Unity с конвейерами таблиц Delta Live Tables.

import dlt
from pyspark.sql.functions import expr, col

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  rules = {}
  df = spark.read.table("rules")
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules

@dlt.table(
  name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
      .select("MarketName", "Website", "State",
        "Facebook", "Twitter", "Youtube", "Organic",
        "updateTime"
      )
  )

Вместо создания таблицы с именем rules для поддержания правил можно создать модуль Python для основных правил, например в файле с именем rules_module.py в той же папке, что и записная книжка:

def get_rules_as_list_of_dict():
  return [
    {
      "name": "website_not_null",
      "constraint": "Website IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "location_not_null",
      "constraint": "Location IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "state_not_null",
      "constraint": "State IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "fresh_data",
      "constraint": "to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'",
      "tag": "maintained"
    },
    {
      "name": "social_media_access",
      "constraint": "NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)",
      "tag": "maintained"
    }
  ]

Затем измените предыдущую записную книжку, импортируя модуль и изменив get_rules() функцию для чтения из модуля вместо rules таблицы:

import dlt
from rules_module import *
from pyspark.sql.functions import expr, col

df = spark.createDataFrame(get_rules_as_list_of_dict())

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  rules = {}
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules

@dlt.table(
  name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
      .select("MarketName", "Website", "State",
        "Facebook", "Twitter", "Youtube", "Organic",
        "updateTime"
      )
  )