Nota
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Utilizar expectativas para aplicar restricciones de calidad que validen los datos a medida que fluyen a través de tuberías de ETL. Las expectativas proporcionan una mayor comprensión de las métricas de calidad de los datos y permiten rechazar actualizaciones o eliminar registros cuando se detectan registros no válidos.
En este artículo se ofrece información general sobre las expectativas, incluidos ejemplos de sintaxis y opciones de comportamiento. Para obtener casos de uso más avanzados y procedimientos recomendados, consulte Recomendaciones de expectativas y patrones avanzados.
¿Cuáles son las expectativas?
Las expectativas son cláusulas opcionales en la vista materializada de canalización, tabla de streaming o instrucciones de creación de vistas que aplican comprobaciones de calidad de datos en cada registro que pasa por una consulta. Las expectativas utilizan declaraciones Booleanas SQL estándar para especificar restricciones. Puedes combinar varias expectativas para un único conjunto de datos y establecer expectativas en todas las declaraciones de conjuntos de datos en una canalización.
En las secciones siguientes se presentan los tres componentes de una expectativa y se proporcionan ejemplos de sintaxis.
Nombre de la expectativa
Cada expectativa debe tener un nombre, que se usa como identificador para realizar un seguimiento y supervisar la expectativa. Elija un nombre que comunique las métricas que se validan. En el ejemplo siguiente se define la expectativa valid_customer_age
para confirmar que la edad está entre 0 y 120 años:
Importante
Un nombre de expectativa debe ser único para un conjunto de datos determinado. Puedes reutilizar las expectativas en varios conjuntos de datos de una canalización. Consulta Expectativas portátiles y reutilizables.
Pitón
@dlt.table
@dlt.expect("valid_customer_age", "age BETWEEN 0 AND 120")
def customers():
return spark.readStream.table("datasets.samples.raw_customers")
SQL
CREATE OR REFRESH STREAMING TABLE customers(
CONSTRAINT valid_customer_age EXPECT (age BETWEEN 0 AND 120)
) AS SELECT * FROM STREAM(datasets.samples.raw_customers);
Restricción para evaluar
La cláusula constraint es una instrucción condicional de SQL que debe evaluarse como true o false para cada registro. La restricción contiene la lógica real para lo que se está validando. Cuando un registro no cumple esta condición, se activa la expectativa.
Las restricciones deben usar la sintaxis SQL válida y no pueden contener lo siguiente:
- Funciones personalizadas de Python
- Llamadas de servicio externo
- Subconsultas que hacen referencia a otras tablas
A continuación se muestran ejemplos de restricciones que se pueden agregar a las instrucciones de creación del conjunto de datos:
Pitón
La sintaxis de una restricción en Python es:
@dlt.expect(<constraint-name>, <constraint-clause>)
Se pueden especificar varias restricciones:
@dlt.expect(<constraint-name>, <constraint-clause>)
@dlt.expect(<constraint2-name>, <constraint2-clause>)
Ejemplos:
# Simple constraint
@dlt.expect("non_negative_price", "price >= 0")
# SQL functions
@dlt.expect("valid_date", "year(transaction_date) >= 2020")
# CASE statements
@dlt.expect("valid_order_status", """
CASE
WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
ELSE false
END
""")
# Multiple constraints
@dlt.expect("non_negative_price", "price >= 0")
@dlt.expect("valid_purchase_date", "date <= current_date()")
# Complex business logic
@dlt.expect(
"valid_subscription_dates",
"""start_date <= end_date
AND end_date <= current_date()
AND start_date >= '2020-01-01'"""
)
# Complex boolean logic
@dlt.expect("valid_order_state", """
(status = 'ACTIVE' AND balance > 0)
OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
""")
SQL
La sintaxis de una restricción en SQL es:
CONSTRAINT <constraint-name> EXPECT ( <constraint-clause> )
Varias restricciones deben estar separadas por una coma:
CONSTRAINT <constraint-name> EXPECT ( <constraint-clause> ),
CONSTRAINT <constraint2-name> EXPECT ( <constraint2-clause> )
Ejemplos:
-- Simple constraint
CONSTRAINT non_negative_price EXPECT (price >= 0)
-- SQL functions
CONSTRAINT valid_date EXPECT (year(transaction_date) >= 2020)
-- CASE statements
CONSTRAINT valid_order_status EXPECT (
CASE
WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
ELSE false
END
)
-- Multiple constraints
CONSTRAINT non_negative_price EXPECT (price >= 0),
CONSTRAINT valid_purchase_date EXPECT (date <= current_date())
-- Complex business logic
CONSTRAINT valid_subscription_dates EXPECT (
start_date <= end_date
AND end_date <= current_date()
AND start_date >= '2020-01-01'
)
-- Complex boolean logic
CONSTRAINT valid_order_state EXPECT (
(status = 'ACTIVE' AND balance > 0)
OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
)
Acción sobre un registro no válido
Debe especificar una acción para determinar qué ocurre cuando un registro produce un error en la comprobación de validación. En la tabla siguiente se describen las acciones disponibles:
Acción | Sintaxis SQL | Sintaxis de Python | Resultado |
---|---|---|---|
advertencia (predeterminado) | EXPECT |
dlt.expect |
Los registros no válidos se escriben en el destino. |
anular | EXPECT ... ON VIOLATION DROP ROW |
dlt.expect_or_drop |
Los registros no válidos se quitan antes de que los datos se escriban en el destino. El recuento de registros descartados se registra junto con otras métricas del conjunto de datos. |
error | EXPECT ... ON VIOLATION FAIL UPDATE |
dlt.expect_or_fail |
Los registros no válidos impiden que la actualización se realice correctamente. Se requiere la intervención manual antes de volver a procesar. Esta expectativa provoca un error de un único flujo y no hace que se produzcan errores en otros flujos de la canalización. |
También puede implementar lógica avanzada para poner en cuarentena registros no válidos sin errores ni quitar datos. Consulte Poner en cuarentena registros no válidos.
Métricas de seguimiento de las expectativas
Puedes ver las métricas de seguimiento de las acciones warn
o drop
en la interfaz de usuario del pipeline. Dado que fail
hace que se produzca un error en la actualización cuando se detecta un registro no válido, no se registran las métricas.
Para ver las métricas de expectativas, complete los pasos siguientes:
- En la barra lateral del área de trabajo de Azure Databricks, haga clic en Trabajos y canalizaciones.
- Haz clic en el Nombre de la canalización.
- Haga clic en un conjunto de datos con una expectativa definida.
- Selecciona la pestaña Calidad de datos en la barra lateral derecha.
Puede ver las métricas de calidad de datos a través de la consulta del registro de eventos de las Canalizaciones Declarativas de Lakeflow. Accede a Consultar la calidad de los datos desde el registro de eventos.
Conservación de registros no válidos
Conservar registros no válidos es el comportamiento predeterminado de las expectativas. Use el operador expect
cuando desee conservar registros que infringen la expectativa, pero recopile métricas sobre cuántos registros pasan o producen un error en una restricción. Los registros que infringen la expectativa se agregan al conjunto de datos de destino, junto con los registros válidos:
Pitón
@dlt.expect("valid timestamp", "timestamp > '2012-01-01'")
SQL
CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')
Anulación de registros no válidos
Use el operador expect_or_drop
para evitar que se sigan procesando registros no válidos. Los registros que infringen la expectativa se anulan del conjunto de datos de destino:
Pitón
@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
SQL
CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW
Error en registros no válidos
Cuando los registros no válidos son inaceptables, use el operador expect_or_fail
para detener la ejecución inmediatamente, en cuanto se produzca un error en la validación de un registro. Si la operación es una actualización de tabla, el sistema revierte la transacción de forma atómica:
Pitón
@dlt.expect_or_fail("valid_count", "count > 0")
SQL
CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE
Importante
Si tiene varios flujos paralelos definidos en una canalización, el error de un único flujo no hace que se produzcan errores en otros flujos.
Solución de problemas de actualizaciones fallidas respecto a las expectativas
Cuando se produce un error en una canalización, debido a una infracción de expectativa, debe corregir el código de canalización para controlar correctamente los datos no válidos, antes de volver a ejecutar la canalización.
Las expectativas configuradas para producir errores en las canalizaciones modifican el plan de consulta de Spark de las transformaciones para realizar un seguimiento de la información necesaria a fin de detectar y notificar infracciones. Puede usar esta información para identificar qué registro de entrada produjo la infracción de muchas consultas. Las canalizaciones declarativas de Lakeflow proporcionan un mensaje de error dedicado para notificar estas infracciones. Este es un ejemplo de un mensaje de error de incumplimiento de expectativas:
[EXPECTATION_VIOLATION.VERBOSITY_ALL] Flow 'sensor-pipeline' failed to meet the expectation. Violated expectations: 'temperature_in_valid_range'. Input data: '{"id":"TEMP_001","temperature":-500,"timestamp_ms":"1710498600"}'. Output record: '{"sensor_id":"TEMP_001","temperature":-500,"change_time":"2024-03-15 10:30:00"}'. Missing input data: false
Gestión de múltiples expectativas
Nota:
Aunque SQL y Python admiten varias expectativas en un único conjunto de datos, solo Python permite agrupar varias expectativas y especificar acciones colectivas.
Puede agrupar varias expectativas y especificar acciones colectivas mediante las funciones expect_all
, expect_all_or_drop
y expect_all_or_fail
.
Estos decoradores aceptan un diccionario de Python como argumento, donde la clave es el nombre de la expectativa y el valor es la restricción de la expectativa. Puedes reutilizar el mismo conjunto de expectativas en varios conjuntos de datos de la canalización. A continuación se muestran ejemplos de cada uno de los operadores de python de expect_all
:
valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}
@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
# Create a raw dataset
@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
# Create a cleaned and prepared dataset
@dlt.table
@dlt.expect_all_or_fail(valid_pages)
def customer_facing_data():
# Create cleaned and prepared to share the dataset
Limitaciones
- Dado que solo las tablas de streaming y las vistas materializadas admiten expectativas, las métricas de calidad de datos solo se admiten para estos tipos de objetos.
- Las métricas de calidad de los datos no están disponibles cuando:
- No se definen expectativas en una consulta.
- Un flujo usa un operador que no admite expectativas.
- El tipo de flujo, como los receptores de canalizaciones declarativas de Lakeflow, no admite expectativas.
- No hay actualizaciones en la tabla de streaming asociada ni en la vista materializada de una ejecución de flujo determinada.
- La configuración de la canalización no incluye los ajustes necesarios para capturar métricas, como
pipelines.metrics.flowTimeReporter.enabled
.
- En algunos casos, es posible que un
COMPLETED
flujo no contenga métricas. En su lugar, las métricas se notifican en cada microlote en un eventoflow_progress
con el estadoRUNNING
.