Administración de la calidad de los datos con Delta Live Tables

Las expectativas se usan para definir restricciones de calidad de datos en los contenidos de un conjunto de datos. Las expectativas permiten garantizar que los datos que llegan a las tablas cumplen los requisitos de calidad de los datos y proporcionan información sobre la calidad de los datos para cada actualización de canalización. Las expectativas se aplican a las consultas mediante decoradores de Python o cláusulas de restricción SQL.

¿Cuáles son las expectativas de Delta Live Tables?

Las expectativas son cláusulas opcionales que se agregan a las declaraciones de conjuntos de datos de Delta Live Tables que aplican comprobaciones de calidad de datos en cada registro que pasa a través de una consulta.

Una expectativa consta de tres cosas:

  • Una descripción, que actúa como un identificador único y permite realizar un seguimiento de las métricas de la restricción.
  • Una instrucción booleana que siempre devuelve true o false en función de alguna condición indicada.
  • Una acción que se debe realizar cuando un registro no cumple la expectativa, lo que significa que el valor booleano devuelve false.

En la matriz siguiente se muestran las tres acciones que se pueden aplicar a registros no válidos:

Acción Resultado
advertir (valor predeterminado) Los registros no válidos se escriben en el destino; el error se notifica como una métrica del conjunto de datos.
anular Los registros no válidos se quitan antes de que los datos se escriban en el destino; el error se notifica como una métrica para el conjunto de datos.
fail Los registros no válidos impiden que la actualización se realice correctamente. Es necesario intervenir manualmente antes de volver a procesar.

Puede ver métricas de calidad de datos, como el número de registros que infringen una expectativa, consultando el registro de eventos de Delta Live Tables. Vea Supervisión de canalizaciones de Delta Live Tables.

Para obtener una referencia completa de la sintaxis de declaración de conjunto de datos de Delta Live Tables, consulte Referencia del lenguaje Python de Delta Live Tables o Referencia del lenguaje SQL de Delta Live Tables.

Nota:

Aunque puede incluir varias cláusulas en cualquier expectativa, solo Python admite la definición de acciones basadas en varias expectativas. Consulte Varias expectativas.

Conservación de registros no válidos

Use el operador expect cuando desee mantener registros que infrinjan la expectativa. Los registros que infringen la expectativa se agregan al conjunto de datos de destino, junto con los registros válidos:

Python

@dlt.expect("valid timestamp", "col(“timestamp”) > '2012-01-01'")

SQL

CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')

Anulación de registros no válidos

Use el operador expect or drop para evitar que se sigan procesando registros no válidos. Los registros que infringen la expectativa se anulan del conjunto de datos de destino:

Python

@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")

SQL

CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW

Errores en registros no válidos

Cuando los registros no válidos son inaceptables, use el operador expect or fail para detener la ejecución inmediatamente, en cuanto se produzca un error en la validación de un registro. Si la operación es una actualización de tabla, el sistema revierte la transacción de forma atómica:

Python

@dlt.expect_or_fail("valid_count", "count > 0")

SQL

CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE

Cuando se produce un error en una canalización, debido a una infracción de expectativa, debe corregir el código de canalización para controlar correctamente los datos no válidos, antes de volver a ejecutar la canalización.

Las expectativas con error modifican el plan de consulta de Spark de las transformaciones, para realizar un seguimiento de la información necesaria para detectar y notificar las infracciones. Para muchas consultas, puede usar esta información para identificar qué registro de entrada ha dado lugar a la infracción. A continuación se muestra una excepción de ejemplo:

Expectation Violated:
{
  "flowName": "a-b",
  "verboseInfo": {
    "expectationsViolated": [
      "x1 is negative"
    ],
    "inputData": {
      "a": {"x1": 1,"y1": "a },
      "b": {
        "x2": 1,
        "y2": "aa"
      }
    },
    "outputRecord": {
      "x1": 1,
      "y1": "a",
      "x2": 1,
      "y2": "aa"
    },
    "missingInputData": false
  }
}

Varias expectativas

Puede definir expectativas con una o varias restricciones de calidad de datos, en canalizaciones de Python. Estos decoradores aceptan un diccionario de Python como argumento, donde la clave es el nombre de la expectativa y el valor es la restricción de la expectativa.

Use expect_all para especificar varias restricciones de calidad de datos, cuando los registros que no se validen deban incluirse en el conjunto de datos de destino:

@dlt.expect_all({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

Use expect_all_or_drop para especificar varias restricciones de calidad de datos, cuando los registros que no se validen deban anularse en el conjunto de datos de destino:

@dlt.expect_all_or_drop({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

Use expect_all_or_fail para especificar varias restricciones de calidad de datos, cuando los registros que no se validen deban detener la ejecución de la canalización:

@dlt.expect_all_or_fail({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

También puede definir una colección de expectativas como una variable, y pasarla a una o varias consultas de la canalización:

valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}

@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
  # Create raw dataset

@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
  # Create cleaned and prepared dataset

Poner en cuarentena datos no válidos

En el ejemplo siguiente se usan expectativas en combinación con vistas y tablas temporales. Este patrón proporciona métricas para registros que pasan comprobaciones de expectativas durante las actualizaciones de canalización y proporciona una manera de procesar registros válidos y no válidos a través de diferentes rutas de acceso de bajada.

Nota:

En este ejemplo se leen los datos de ejemplo incluidos en los conjuntos de datos de Databricks. Dado que los conjuntos de datos de Databricks no son compatibles con una canalización que publique en Unity Catalog, este ejemplo solo funciona con una canalización configurada para publicar en el metastore de Hive. Sin embargo, este patrón también funciona con canalizaciones habilitadas para Unity Catalog, pero debe leer datos de ubicaciones externas. Para obtener más información sobre el uso del catálogo de Unity con Delta Live Tables, consulte Uso del catálogo de Unity con las canalizaciones de Delta Live Tables.

import dlt
from pyspark.sql.functions import expr

rules = {}
rules["valid_website"] = "(Website IS NOT NULL)"
rules["valid_location"] = "(Location IS NOT NULL)"
quarantine_rules = "NOT({0})".format(" AND ".join(rules.values()))

@dlt.table(
  name="raw_farmers_market"
)
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="farmers_market_quarantine",
  temporary=True,
  partition_cols=["is_quarantined"]
)
@dlt.expect_all(rules)
def farmers_market_quarantine():
  return (
    dlt.read("raw_farmers_market")
      .select("MarketName", "Website", "Location", "State",
              "Facebook", "Twitter", "Youtube", "Organic", "updateTime")
      .withColumn("is_quarantined", expr(quarantine_rules))
  )

@dlt.view(
  name="valid_farmers_market"
)
def get_valid_farmers_market():
  return (
    dlt.read("farmers_market_quarantine")
      .filter("is_quarantined=false")
  )

@dlt.view(
  name="invalid_farmers_market"
)
def get_invalid_farmers_market():
  return (
    dlt.read("farmers_market_quarantine")
      .filter("is_quarantined=true")
  )

Validación de recuentos de filas a lo largo de las tablas

Puede agregar una tabla adicional a la canalización que defina una expectativa para comparar los recuentos de filas entre dos tablas dinámicas. Los resultados de esta expectativa aparecen en el registro de eventos y en la UI de Delta Live Tables. En este ejemplo siguiente se validan los mismos recuentos de filas entre las tablas tbla y tblb:

CREATE OR REFRESH LIVE TABLE count_verification(
  CONSTRAINT no_rows_dropped EXPECT (a_count == b_count)
) AS SELECT * FROM
  (SELECT COUNT(*) AS a_count FROM LIVE.tbla),
  (SELECT COUNT(*) AS b_count FROM LIVE.tblb)

Validación avanzada con las expectativas de Delta Live Tables

Puede definir tablas dinámicas mediante consultas de agregado y combinación y usar los resultados de esas consultas como parte de la comprobación de expectativas. Esto resulta útil si desea realizar comprobaciones complejas de calidad de datos, por ejemplo, para garantizar que una tabla derivada contenga todos los registros de la tabla de origen o para garantizar la igualdad de una columna numérica entre tablas. Puede usar la palabra clave TEMPORARY para evitar que estas tablas se publiquen en el esquema de destino.

En el ejemplo siguiente se valida que todos los registros esperados estén presentes en la tabla report:

CREATE TEMPORARY LIVE TABLE report_compare_tests(
  CONSTRAINT no_missing_records EXPECT (r.key IS NOT NULL)
)
AS SELECT * FROM LIVE.validation_copy v
LEFT OUTER JOIN LIVE.report r ON v.key = r.key

En el ejemplo siguiente se usa un agregado para garantizar la unicidad de una clave principal:

CREATE TEMPORARY LIVE TABLE report_pk_tests(
  CONSTRAINT unique_pk EXPECT (num_entries = 1)
)
AS SELECT pk, count(*) as num_entries
FROM LIVE.report
GROUP BY pk

Hacer que las expectativas sean portables y reutilizables

Puede mantener las reglas de calidad de datos separadas de las implementaciones de canalización.

Databricks recomienda almacenar las reglas en una tabla Delta con cada regla clasificada por una etiqueta. Esta etiqueta se usa en las definiciones de conjunto de datos, para determinar qué reglas se deben aplicar.

En el ejemplo siguiente se crea una tabla denominada rules para mantener las reglas:

CREATE OR REPLACE TABLE
  rules
AS SELECT
  col1 AS name,
  col2 AS constraint,
  col3 AS tag
FROM (
  VALUES
  ("website_not_null","Website IS NOT NULL","validity"),
  ("location_not_null","Location IS NOT NULL","validity"),
  ("state_not_null","State IS NOT NULL","validity"),
  ("fresh_data","to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'","maintained"),
  ("social_media_access","NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)","maintained")
)

En el siguiente ejemplo de Python se definen las expectativas de calidad de los datos, en función de las reglas almacenadas en la tabla rules. La función get_rules() lee las reglas de la tabla rules y devuelve un diccionario de Python, que contiene reglas que coinciden con el argumento tag pasado a la función. El diccionario se aplica en los decoradores @dlt.expect_all_*() para aplicar restricciones de calidad de datos. Por ejemplo, los registros con errores de las reglas etiquetadas con validity se anularán en la tabla raw_farmers_market:

Nota:

En este ejemplo se leen los datos de ejemplo incluidos en los conjuntos de datos de Databricks. Dado que los conjuntos de datos de Databricks no son compatibles con una canalización que publique en Unity Catalog, este ejemplo solo funciona con una canalización configurada para publicar en el metastore de Hive. Sin embargo, este patrón también funciona con canalizaciones habilitadas para Unity Catalog, pero debe leer datos de ubicaciones externas. Para obtener más información sobre el uso del catálogo de Unity con Delta Live Tables, consulte Uso del catálogo de Unity con las canalizaciones de Delta Live Tables.

import dlt
from pyspark.sql.functions import expr, col

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  rules = {}
  df = spark.read.table("rules")
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules

@dlt.table(
  name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
      .select("MarketName", "Website", "State",
        "Facebook", "Twitter", "Youtube", "Organic",
        "updateTime"
      )
  )

En lugar de crear una tabla denominada rules para mantener reglas, podría crear un módulo de Python para las reglas principales, por ejemplo, en un archivo denominado rules_module.py en la misma carpeta que el cuaderno:

def get_rules_as_list_of_dict():
  return [
    {
      "name": "website_not_null",
      "constraint": "Website IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "location_not_null",
      "constraint": "Location IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "state_not_null",
      "constraint": "State IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "fresh_data",
      "constraint": "to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'",
      "tag": "maintained"
    },
    {
      "name": "social_media_access",
      "constraint": "NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)",
      "tag": "maintained"
    }
  ]

A continuación, modifique el cuaderno anterior importando el módulo y cambiando la función get_rules() para que lea del módulo en lugar de leer de la tabla rules:

import dlt
from rules_module import *
from pyspark.sql.functions import expr, col

df = spark.createDataFrame(get_rules_as_list_of_dict())

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  rules = {}
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules

@dlt.table(
  name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
      .select("MarketName", "Website", "State",
        "Facebook", "Twitter", "Youtube", "Organic",
        "updateTime"
      )
  )