Поделиться через


Рекомендации по ожиданиям и расширенные шаблоны

В этой статье содержатся рекомендации по реализации ожиданий в масштабе и примерах расширенных шаблонов, поддерживаемых ожиданиями. Эти шаблоны используют несколько наборов данных в сочетании с ожиданиями и требуют, чтобы пользователи понимали синтаксис и семантику материализованных представлений, потоковых таблиц и ожиданий.

Общие сведения о поведении и синтаксисе ожиданий см. в статье "Управление качеством данных с помощью ожиданий конвейера".

Переносимые и многократно используемые ожидаемые результаты

Databricks рекомендует следующие рекомендации при реализации ожиданий для повышения переносимости и снижения нагрузки на обслуживание:

Recommendation Воздействие
Храните определения ожиданий отдельно от логики конвейера. Легко применять ожидания к нескольким наборам данных или конвейерам. Обновление, аудит и поддержание ожиданий без изменения исходного кода конвейера.
Добавьте настраиваемые теги для создания групп связанных ожиданий. Фильтрация ожиданий на основе тегов.
Согласованное применение ожиданий для аналогичных наборов данных. Используйте одинаковые ожидания для нескольких наборов данных и конвейеров для оценки одинаковой логики.

В следующих примерах показано использование таблицы delta или словаря для создания центрального репозитория ожиданий. Затем пользовательские функции Python применяют эти ожидания к наборам данных в примере конвейера:

Таблица дельта

В следующем примере создается таблица с именем rules для поддержания правил:

CREATE OR REPLACE TABLE
  rules
AS SELECT
  col1 AS name,
  col2 AS constraint,
  col3 AS tag
FROM (
  VALUES
  ("website_not_null","Website IS NOT NULL","validity"),
  ("fresh_data","to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'","maintained"),
  ("social_media_access","NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)","maintained")
)

В следующем примере Python определяются ожидания качества данных на основе правил в rules таблице. Функция get_rules() считывает правила из rules таблицы и возвращает словарь Python, содержащий правила, tag соответствующие аргументу, переданном функции.

В этом примере словарь используется с помощью @dp.expect_all_or_drop() декораторов для обеспечения соблюдения ограничений на качество данных.

Например, все записи, не соответствующие правилам и помеченные тегом validity, будут удалены из таблицы raw_farmers_market.

from pyspark import pipelines as dp
from pyspark.sql.functions import expr, col

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  df = spark.read.table("rules").filter(col("tag") == tag).collect()
  return {
      row['name']: row['constraint']
      for row in df
  }

@dp.table
@dp.expect_all_or_drop(get_rules('validity'))
def raw_farmers_market():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dp.table
@dp.expect_all_or_drop(get_rules('maintained'))
def organic_farmers_market():
  return (
    spark.read.table("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
  )

Модуль Python

В следующем примере создается модуль Python для поддержания правил. В этом примере сохраните этот код в файле с именем rules_module.py в той же папке, что и записная книжка, используемая в качестве исходного кода для конвейера:

def get_rules_as_list_of_dict():
  return [
    {
      "name": "website_not_null",
      "constraint": "Website IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "fresh_data",
      "constraint": "to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'",
      "tag": "maintained"
    },
    {
      "name": "social_media_access",
      "constraint": "NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)",
      "tag": "maintained"
    }
  ]

В следующем примере Python определяются ожидания качества данных на основе правил, определенных в rules_module.py файле. Функция get_rules() возвращает словарь Python, содержащий правила, соответствующие аргументу, переданного tag в него.

В этом примере словарь используется с помощью @dp.expect_all_or_drop() декораторов для обеспечения соблюдения ограничений на качество данных.

Например, все записи, не соответствующие правилам и помеченные тегом validity, будут удалены из таблицы raw_farmers_market.

from pyspark import pipelines as dp
from rules_module import *
from pyspark.sql.functions import expr, col

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  return {
    row['name']: row['constraint']
    for row in get_rules_as_list_of_dict()
    if row['tag'] == tag
  }

@dp.table
@dp.expect_all_or_drop(get_rules('validity'))
def raw_farmers_market():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dp.table
@dp.expect_all_or_drop(get_rules('maintained'))
def organic_farmers_market():
  return (
    spark.read.table("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
  )

Проверка количества строк

В следующем примере проверяется равенство количества строк между table_a и table_b, чтобы убедиться, что никакие данные не теряются во время преобразований.

График проверки счетчика строк LDP с ожидаемым использованием

Питон

@dp.view(
  name="count_verification",
  comment="Validates equal row counts between tables"
)
@dp.expect_or_fail("no_rows_dropped", "a_count == b_count")
def validate_row_counts():
  return spark.sql("""
    SELECT * FROM
      (SELECT COUNT(*) AS a_count FROM table_a),
      (SELECT COUNT(*) AS b_count FROM table_b)""")

SQL

CREATE OR REFRESH MATERIALIZED VIEW count_verification(
  CONSTRAINT no_rows_dropped EXPECT (a_count == b_count)
) AS SELECT * FROM
  (SELECT COUNT(*) AS a_count FROM table_a),
  (SELECT COUNT(*) AS b_count FROM table_b)

Обнаружение отсутствующих записей

В следующем примере проверяется наличие всех ожидаемых записей в report таблице:

График обнаружения отсутствующих строк с использованием ожиданий

Питон

@dp.view(
  name="report_compare_tests",
  comment="Validates no records are missing after joining"
)
@dp.expect_or_fail("no_missing_records", "r_key IS NOT NULL")
def validate_report_completeness():
  return (
    spark.read.table("validation_copy").alias("v")
      .join(
        spark.read.table("report").alias("r"),
        on="key",
        how="left_outer"
      )
      .select(
        "v.*",
        "r.key as r_key"
      )
  )

SQL

CREATE OR REFRESH MATERIALIZED VIEW report_compare_tests(
  CONSTRAINT no_missing_records EXPECT (r_key IS NOT NULL)
)
AS SELECT v.*, r.key as r_key FROM validation_copy v
  LEFT OUTER JOIN report r ON v.key = r.key

Уникальность первичного ключа

В следующем примере проверяются ограничения первичного ключа в таблицах:

Граф уникальности первичного ключа LDP с ожидаемым использованием

Питон

@dp.view(
  name="report_pk_tests",
  comment="Validates primary key uniqueness"
)
@dp.expect_or_fail("unique_pk", "num_entries = 1")
def validate_pk_uniqueness():
  return (
    spark.read.table("report")
      .groupBy("pk")
      .count()
      .withColumnRenamed("count", "num_entries")
  )

SQL

CREATE OR REFRESH MATERIALIZED VIEW report_pk_tests(
  CONSTRAINT unique_pk EXPECT (num_entries = 1)
)
AS SELECT pk, count(*) as num_entries
  FROM report
  GROUP BY pk

Шаблон эволюции схемы

В следующем примере показано, как обрабатывать эволюцию схемы для дополнительных столбцов. Используйте этот шаблон при миграции источников данных или обработке нескольких версий исходных данных, обеспечивая обратную совместимость и соблюдение высокого качества данных.

Проверка эволюции схемы LDP с ожидаемым использованием

Питон

@dp.table
@dp.expect_all_or_fail({
  "required_columns": "col1 IS NOT NULL AND col2 IS NOT NULL",
  "valid_col3": "CASE WHEN col3 IS NOT NULL THEN col3 > 0 ELSE TRUE END"
})
def evolving_table():
  # Legacy data (V1 schema)
  legacy_data = spark.read.table("legacy_source")

  # New data (V2 schema)
  new_data = spark.read.table("new_source")

  # Combine both sources
  return legacy_data.unionByName(new_data, allowMissingColumns=True)

SQL

CREATE OR REFRESH MATERIALIZED VIEW evolving_table(
  -- Merging multiple constraints into one as expect_all is Python-specific API
  CONSTRAINT valid_migrated_data EXPECT (
    (col1 IS NOT NULL AND col2 IS NOT NULL) AND (CASE WHEN col3 IS NOT NULL THEN col3 > 0 ELSE TRUE END)
  ) ON VIOLATION FAIL UPDATE
) AS
  SELECT * FROM new_source
  UNION
  SELECT *, NULL as col3 FROM legacy_source;

Шаблон проверки на основе диапазона

В следующем примере демонстрируется, как проверить новые точки данных по историческим статистическим диапазонам, помогая выявлять выбросы данных и аномалии в потоке данных.

Валидация на основе диапазона LDP с использованием ожиданий

Питон

@dp.view
def stats_validation_view():
  # Calculate statistical bounds from historical data
  bounds = spark.sql("""
    SELECT
      avg(amount) - 3 * stddev(amount) as lower_bound,
      avg(amount) + 3 * stddev(amount) as upper_bound
    FROM historical_stats
    WHERE
      date >= CURRENT_DATE() - INTERVAL 30 DAYS
  """)

  # Join with new data and apply bounds
  return spark.read.table("new_data").crossJoin(bounds)

@dp.table
@dp.expect_or_drop(
  "within_statistical_range",
  "amount BETWEEN lower_bound AND upper_bound"
)
def validated_amounts():
  return spark.read.table("stats_validation_view")

SQL

CREATE OR REFRESH MATERIALIZED VIEW stats_validation_view AS
  WITH bounds AS (
    SELECT
    avg(amount) - 3 * stddev(amount) as lower_bound,
    avg(amount) + 3 * stddev(amount) as upper_bound
    FROM historical_stats
    WHERE date >= CURRENT_DATE() - INTERVAL 30 DAYS
  )
  SELECT
    new_data.*,
    bounds.*
  FROM new_data
  CROSS JOIN bounds;

CREATE OR REFRESH MATERIALIZED VIEW validated_amounts (
  CONSTRAINT within_statistical_range EXPECT (amount BETWEEN lower_bound AND upper_bound)
)
AS SELECT * FROM stats_validation_view;

Карантин недопустимых записей

Этот шаблон сочетает ожидания с временными таблицами и представлениями для отслеживания метрик качества данных во время обновлений конвейера и предоставляет отдельные пути обработки для допустимых и недопустимых записей в последующих операциях.

Шаблон карантина данных LDP с использованием ожиданий

Питон

from pyspark import pipelines as dp
from pyspark.sql.functions import expr

rules = {
  "valid_pickup_zip": "(pickup_zip IS NOT NULL)",
  "valid_dropoff_zip": "(dropoff_zip IS NOT NULL)",
}
quarantine_rules = "NOT({0})".format(" AND ".join(rules.values()))

@dp.view
def raw_trips_data():
  return spark.readStream.table("samples.nyctaxi.trips")

@dp.table(
  temporary=True,
  partition_cols=["is_quarantined"],
)
@dp.expect_all(rules)
def trips_data_quarantine():
  return (
    spark.readStream.table("raw_trips_data").withColumn("is_quarantined", expr(quarantine_rules))
  )

@dp.view
def valid_trips_data():
  return spark.read.table("trips_data_quarantine").filter("is_quarantined=false")

@dp.view
def invalid_trips_data():
  return spark.read.table("trips_data_quarantine").filter("is_quarantined=true")

SQL

CREATE TEMPORARY STREAMING LIVE VIEW raw_trips_data AS
  SELECT * FROM STREAM(samples.nyctaxi.trips);

CREATE OR REFRESH TEMPORARY STREAMING TABLE trips_data_quarantine(
  -- Option 1 - merge all expectations to have a single name in the pipeline event log
  CONSTRAINT quarantined_row EXPECT (pickup_zip IS NOT NULL OR dropoff_zip IS NOT NULL),
  -- Option 2 - Keep the expectations separate, resulting in multiple entries under different names
  CONSTRAINT invalid_pickup_zip EXPECT (pickup_zip IS NOT NULL),
  CONSTRAINT invalid_dropoff_zip EXPECT (dropoff_zip IS NOT NULL)
)
PARTITIONED BY (is_quarantined)
AS
  SELECT
    *,
    NOT ((pickup_zip IS NOT NULL) and (dropoff_zip IS NOT NULL)) as is_quarantined
  FROM STREAM(raw_trips_data);

CREATE TEMPORARY LIVE VIEW valid_trips_data AS
SELECT * FROM trips_data_quarantine WHERE is_quarantined=FALSE;

CREATE TEMPORARY LIVE VIEW invalid_trips_data AS
SELECT * FROM trips_data_quarantine WHERE is_quarantined=TRUE;