Compartir a través de


Administración de la calidad de los datos con las expectativas de canalización

Utilizar expectativas para aplicar restricciones de calidad que validen los datos a medida que fluyen a través de tuberías de ETL. Las expectativas proporcionan una mayor comprensión de las métricas de calidad de los datos y permiten rechazar actualizaciones o eliminar registros cuando se detectan registros no válidos.

En este artículo se ofrece información general sobre las expectativas, incluidos ejemplos de sintaxis y opciones de comportamiento. Para obtener casos de uso más avanzados y procedimientos recomendados, consulte Recomendaciones de expectativas y patrones avanzados.

Gráfico de flujo de expectativas de canalizaciones declarativas de Lakeflow

¿Cuáles son las expectativas?

Las expectativas son cláusulas opcionales en la vista materializada de canalización, tabla de streaming o instrucciones de creación de vistas que aplican comprobaciones de calidad de datos en cada registro que pasa por una consulta. Las expectativas utilizan declaraciones Booleanas SQL estándar para especificar restricciones. Puedes combinar varias expectativas para un único conjunto de datos y establecer expectativas en todas las declaraciones de conjuntos de datos en una canalización.

En las secciones siguientes se presentan los tres componentes de una expectativa y se proporcionan ejemplos de sintaxis.

Nombre de la expectativa

Cada expectativa debe tener un nombre, que se usa como identificador para realizar un seguimiento y supervisar la expectativa. Elija un nombre que comunique las métricas que se validan. En el ejemplo siguiente se define la expectativa valid_customer_age para confirmar que la edad está entre 0 y 120 años:

Importante

Un nombre de expectativa debe ser único para un conjunto de datos determinado. Puedes reutilizar las expectativas en varios conjuntos de datos de una canalización. Consulta Expectativas portátiles y reutilizables.

Pitón

@dlt.table
@dlt.expect("valid_customer_age", "age BETWEEN 0 AND 120")
def customers():
  return spark.readStream.table("datasets.samples.raw_customers")

SQL

CREATE OR REFRESH STREAMING TABLE customers(
  CONSTRAINT valid_customer_age EXPECT (age BETWEEN 0 AND 120)
) AS SELECT * FROM STREAM(datasets.samples.raw_customers);

Restricción para evaluar

La cláusula constraint es una instrucción condicional de SQL que debe evaluarse como true o false para cada registro. La restricción contiene la lógica real para lo que se está validando. Cuando un registro no cumple esta condición, se activa la expectativa.

Las restricciones deben usar la sintaxis SQL válida y no pueden contener lo siguiente:

  • Funciones personalizadas de Python
  • Llamadas de servicio externo
  • Subconsultas que hacen referencia a otras tablas

A continuación se muestran ejemplos de restricciones que se pueden agregar a las instrucciones de creación del conjunto de datos:

Pitón

La sintaxis de una restricción en Python es:

@dlt.expect(<constraint-name>, <constraint-clause>)

Se pueden especificar varias restricciones:

@dlt.expect(<constraint-name>, <constraint-clause>)
@dlt.expect(<constraint2-name>, <constraint2-clause>)

Ejemplos:

# Simple constraint
@dlt.expect("non_negative_price", "price >= 0")

# SQL functions
@dlt.expect("valid_date", "year(transaction_date) >= 2020")

# CASE statements
@dlt.expect("valid_order_status", """
   CASE
     WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
     WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
     ELSE false
   END
""")

# Multiple constraints
@dlt.expect("non_negative_price", "price >= 0")
@dlt.expect("valid_purchase_date", "date <= current_date()")

# Complex business logic
@dlt.expect(
  "valid_subscription_dates",
  """start_date <= end_date
    AND end_date <= current_date()
    AND start_date >= '2020-01-01'"""
)

# Complex boolean logic
@dlt.expect("valid_order_state", """
   (status = 'ACTIVE' AND balance > 0)
   OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
""")

SQL

La sintaxis de una restricción en SQL es:

CONSTRAINT <constraint-name> EXPECT ( <constraint-clause> )

Varias restricciones deben estar separadas por una coma:

CONSTRAINT <constraint-name> EXPECT ( <constraint-clause> ),
CONSTRAINT <constraint2-name> EXPECT ( <constraint2-clause> )

Ejemplos:

-- Simple constraint
CONSTRAINT non_negative_price EXPECT (price >= 0)

-- SQL functions
CONSTRAINT valid_date EXPECT (year(transaction_date) >= 2020)

-- CASE statements
CONSTRAINT valid_order_status EXPECT (
  CASE
    WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
    WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
    ELSE false
  END
)

-- Multiple constraints
CONSTRAINT non_negative_price EXPECT (price >= 0),
CONSTRAINT valid_purchase_date EXPECT (date <= current_date())

-- Complex business logic
CONSTRAINT valid_subscription_dates EXPECT (
  start_date <= end_date
  AND end_date <= current_date()
  AND start_date >= '2020-01-01'
)

-- Complex boolean logic
CONSTRAINT valid_order_state EXPECT (
  (status = 'ACTIVE' AND balance > 0)
  OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
)

Acción sobre un registro no válido

Debe especificar una acción para determinar qué ocurre cuando un registro produce un error en la comprobación de validación. En la tabla siguiente se describen las acciones disponibles:

Acción Sintaxis SQL Sintaxis de Python Resultado
advertencia (predeterminado) EXPECT dlt.expect Los registros no válidos se escriben en el destino.
anular EXPECT ... ON VIOLATION DROP ROW dlt.expect_or_drop Los registros no válidos se quitan antes de que los datos se escriban en el destino. El recuento de registros descartados se registra junto con otras métricas del conjunto de datos.
error EXPECT ... ON VIOLATION FAIL UPDATE dlt.expect_or_fail Los registros no válidos impiden que la actualización se realice correctamente. Se requiere la intervención manual antes de volver a procesar. Esta expectativa provoca un error de un único flujo y no hace que se produzcan errores en otros flujos de la canalización.

También puede implementar lógica avanzada para poner en cuarentena registros no válidos sin errores ni quitar datos. Consulte Poner en cuarentena registros no válidos.

Métricas de seguimiento de las expectativas

Puedes ver las métricas de seguimiento de las acciones warn o drop en la interfaz de usuario del pipeline. Dado que fail hace que se produzca un error en la actualización cuando se detecta un registro no válido, no se registran las métricas.

Para ver las métricas de expectativas, complete los pasos siguientes:

  1. En la barra lateral del área de trabajo de Azure Databricks, haga clic en Trabajos y canalizaciones.
  2. Haz clic en el Nombre de la canalización.
  3. Haga clic en un conjunto de datos con una expectativa definida.
  4. Selecciona la pestaña Calidad de datos en la barra lateral derecha.

Puede ver las métricas de calidad de datos a través de la consulta del registro de eventos de las Canalizaciones Declarativas de Lakeflow. Accede a Consultar la calidad de los datos desde el registro de eventos.

Conservación de registros no válidos

Conservar registros no válidos es el comportamiento predeterminado de las expectativas. Use el operador expect cuando desee conservar registros que infringen la expectativa, pero recopile métricas sobre cuántos registros pasan o producen un error en una restricción. Los registros que infringen la expectativa se agregan al conjunto de datos de destino, junto con los registros válidos:

Pitón

@dlt.expect("valid timestamp", "timestamp > '2012-01-01'")

SQL

CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')

Anulación de registros no válidos

Use el operador expect_or_drop para evitar que se sigan procesando registros no válidos. Los registros que infringen la expectativa se anulan del conjunto de datos de destino:

Pitón

@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")

SQL

CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW

Error en registros no válidos

Cuando los registros no válidos son inaceptables, use el operador expect_or_fail para detener la ejecución inmediatamente, en cuanto se produzca un error en la validación de un registro. Si la operación es una actualización de tabla, el sistema revierte la transacción de forma atómica:

Pitón

@dlt.expect_or_fail("valid_count", "count > 0")

SQL

CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE

Importante

Si tiene varios flujos paralelos definidos en una canalización, el error de un único flujo no hace que se produzcan errores en otros flujos.

Gráfico de explicación del error en el flujo de canalizaciones declarativas de Lakeflow

Solución de problemas de actualizaciones fallidas respecto a las expectativas

Cuando se produce un error en una canalización, debido a una infracción de expectativa, debe corregir el código de canalización para controlar correctamente los datos no válidos, antes de volver a ejecutar la canalización.

Las expectativas configuradas para producir errores en las canalizaciones modifican el plan de consulta de Spark de las transformaciones para realizar un seguimiento de la información necesaria a fin de detectar y notificar infracciones. Puede usar esta información para identificar qué registro de entrada produjo la infracción de muchas consultas. Las canalizaciones declarativas de Lakeflow proporcionan un mensaje de error dedicado para notificar estas infracciones. Este es un ejemplo de un mensaje de error de incumplimiento de expectativas:

[EXPECTATION_VIOLATION.VERBOSITY_ALL] Flow 'sensor-pipeline' failed to meet the expectation. Violated expectations: 'temperature_in_valid_range'. Input data: '{"id":"TEMP_001","temperature":-500,"timestamp_ms":"1710498600"}'. Output record: '{"sensor_id":"TEMP_001","temperature":-500,"change_time":"2024-03-15 10:30:00"}'. Missing input data: false

Gestión de múltiples expectativas

Nota:

Aunque SQL y Python admiten varias expectativas en un único conjunto de datos, solo Python permite agrupar varias expectativas y especificar acciones colectivas.

Canalizaciones declarativas de Lakeflow con múltiples diagramas de flujo de expectativas

Puede agrupar varias expectativas y especificar acciones colectivas mediante las funciones expect_all, expect_all_or_dropy expect_all_or_fail.

Estos decoradores aceptan un diccionario de Python como argumento, donde la clave es el nombre de la expectativa y el valor es la restricción de la expectativa. Puedes reutilizar el mismo conjunto de expectativas en varios conjuntos de datos de la canalización. A continuación se muestran ejemplos de cada uno de los operadores de python de expect_all:

valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}

@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
  # Create a raw dataset

@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
  # Create a cleaned and prepared dataset

@dlt.table
@dlt.expect_all_or_fail(valid_pages)
def customer_facing_data():
  # Create cleaned and prepared to share the dataset

Limitaciones

  • Dado que solo las tablas de streaming y las vistas materializadas admiten expectativas, las métricas de calidad de datos solo se admiten para estos tipos de objetos.
  • Las métricas de calidad de los datos no están disponibles cuando:
    • No se definen expectativas en una consulta.
    • Un flujo usa un operador que no admite expectativas.
    • El tipo de flujo, como los receptores de canalizaciones declarativas de Lakeflow, no admite expectativas.
    • No hay actualizaciones en la tabla de streaming asociada ni en la vista materializada de una ejecución de flujo determinada.
    • La configuración de la canalización no incluye los ajustes necesarios para capturar métricas, como pipelines.metrics.flowTimeReporter.enabled.
  • En algunos casos, es posible que un COMPLETED flujo no contenga métricas. En su lugar, las métricas se notifican en cada microlote en un evento flow_progress con el estado RUNNING.