Partager via


Gérer la qualité des données avec les attentes de pipeline

Utilisez des attentes pour appliquer des contraintes de qualité qui valident les données au fur et à mesure qu’elles transitent par des pipelines ETL. Les attentes fournissent un meilleur aperçu des métriques de qualité des données et vous permettent d’échouer des mises à jour ou de supprimer des enregistrements lors de la détection d’enregistrements non valides.

Cet article présente une vue d’ensemble des attentes, notamment des exemples de syntaxe et des options de comportement. Pour connaître les cas d’usage plus avancés et les meilleures pratiques recommandées, consultez recommandations d’attente et les modèles avancés.

Diagramme de flux des attentes des pipelines déclaratifs Lakeflow

Quelles sont les attentes ?

Les attentes sont des clauses facultatives dans la vue matérialisée du pipeline, la table de diffusion en continu ou les instructions de création de vues qui appliquent des vérifications de qualité des données sur chaque enregistrement passant par une requête. Les attentes utilisent des instructions booléennes SQL standard pour spécifier des contraintes. Vous pouvez combiner plusieurs attentes pour un jeu de données unique et définir des attentes dans toutes les déclarations de jeu de données dans un pipeline.

Les sections suivantes présentent les trois composants d’une attente et fournissent des exemples de syntaxe.

Nom de l’attente

Chaque attente doit avoir un nom, qui est utilisé comme identificateur pour suivre et surveiller l’attente. Choisissez un nom qui communique les métriques validées. L’exemple suivant définit l’attente valid_customer_age pour confirmer que l’âge est compris entre 0 et 120 ans :

Importante

Un nom d’attente doit être unique pour un jeu de données donné. Vous pouvez réutiliser les attentes dans plusieurs jeux de données d’un pipeline. Consultez Attentes portables et réutilisables.

Python

@dlt.table
@dlt.expect("valid_customer_age", "age BETWEEN 0 AND 120")
def customers():
  return spark.readStream.table("datasets.samples.raw_customers")

SQL

CREATE OR REFRESH STREAMING TABLE customers(
  CONSTRAINT valid_customer_age EXPECT (age BETWEEN 0 AND 120)
) AS SELECT * FROM STREAM(datasets.samples.raw_customers);

Contrainte à évaluer

La clause de contrainte est une instruction conditionnelle SQL qui doit évaluer true ou false pour chaque enregistrement. La contrainte contient la logique réelle pour ce qui est validé. Lorsqu’un enregistrement échoue à satisfaire cette condition, l’attente est déclenchée.

Les contraintes doivent utiliser la syntaxe SQL valide et ne peuvent pas contenir les éléments suivants :

  • Fonctions Python personnalisées
  • Appels de service externe
  • Sous-requêtes référençant d’autres tables

Voici des exemples de contraintes qui peuvent être ajoutées aux instructions de création de jeu de données :

Python

La syntaxe d’une contrainte dans Python est la suivante :

@dlt.expect(<constraint-name>, <constraint-clause>)

Plusieurs contraintes peuvent être spécifiées :

@dlt.expect(<constraint-name>, <constraint-clause>)
@dlt.expect(<constraint2-name>, <constraint2-clause>)

Exemples:

# Simple constraint
@dlt.expect("non_negative_price", "price >= 0")

# SQL functions
@dlt.expect("valid_date", "year(transaction_date) >= 2020")

# CASE statements
@dlt.expect("valid_order_status", """
   CASE
     WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
     WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
     ELSE false
   END
""")

# Multiple constraints
@dlt.expect("non_negative_price", "price >= 0")
@dlt.expect("valid_purchase_date", "date <= current_date()")

# Complex business logic
@dlt.expect(
  "valid_subscription_dates",
  """start_date <= end_date
    AND end_date <= current_date()
    AND start_date >= '2020-01-01'"""
)

# Complex boolean logic
@dlt.expect("valid_order_state", """
   (status = 'ACTIVE' AND balance > 0)
   OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
""")

SQL

La syntaxe d’une contrainte dans SQL est la suivante :

CONSTRAINT <constraint-name> EXPECT ( <constraint-clause> )

Plusieurs contraintes doivent être séparées par une virgule :

CONSTRAINT <constraint-name> EXPECT ( <constraint-clause> ),
CONSTRAINT <constraint2-name> EXPECT ( <constraint2-clause> )

Exemples:

-- Simple constraint
CONSTRAINT non_negative_price EXPECT (price >= 0)

-- SQL functions
CONSTRAINT valid_date EXPECT (year(transaction_date) >= 2020)

-- CASE statements
CONSTRAINT valid_order_status EXPECT (
  CASE
    WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
    WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
    ELSE false
  END
)

-- Multiple constraints
CONSTRAINT non_negative_price EXPECT (price >= 0),
CONSTRAINT valid_purchase_date EXPECT (date <= current_date())

-- Complex business logic
CONSTRAINT valid_subscription_dates EXPECT (
  start_date <= end_date
  AND end_date <= current_date()
  AND start_date >= '2020-01-01'
)

-- Complex boolean logic
CONSTRAINT valid_order_state EXPECT (
  (status = 'ACTIVE' AND balance > 0)
  OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
)

Action sur un enregistrement non valide

Vous devez spécifier une action pour déterminer ce qui se passe lorsqu’un enregistrement échoue à la vérification de validation. Le tableau suivant décrit les actions disponibles :

Action Syntaxe SQL Syntaxe Python Résultat
avertir (par défaut) EXPECT dlt.expect Les enregistrements non valides sont écrits dans la cible.
supprimer EXPECT ... ON VIOLATION DROP ROW dlt.expect_or_drop Les enregistrements non valides sont supprimés avant l’écriture des données vers la destination. Le nombre d’enregistrements supprimés est enregistré en même temps que d’autres métriques de jeu de données.
échouer EXPECT ... ON VIOLATION FAIL UPDATE dlt.expect_or_fail Les enregistrements non valides empêchent la mise à jour de réussir. L’intervention manuelle est nécessaire avant le retraitement. Cette attente entraîne une défaillance d’un flux unique et n’entraîne pas l’échec d’autres flux dans votre pipeline.

Vous pouvez également implémenter une logique avancée pour mettre en quarantaine les enregistrements non valides sans échec ou suppression de données. Consultez Enregistrements non valides des mises en quarantaine.

métriques de suivi des attentes

Vous pouvez voir les métriques de suivi pour warn ou drop actions à partir de l’interface utilisateur du pipeline. Étant donné que fail provoque l’échec de la mise à jour lorsqu’un enregistrement non valide est détecté, les métriques ne sont pas enregistrées.

Pour afficher les métriques attendues, effectuez les étapes suivantes :

  1. Dans la barre latérale de votre espace de travail Azure Databricks, cliquez sur Travaux & Pipelines.
  2. Cliquez sur le Nom de votre pipeline.
  3. Cliquez sur un jeu de données avec une attente définie.
  4. Sélectionnez l’onglet Qualité des données dans la barre latérale droite.

Vous pouvez afficher les métriques de qualité des données en interrogeant le journal des événements des pipelines déclaratifs de Lakeflow. Consultez Interroger la qualité des données du journal des événements.

Conserver les enregistrements non valides

La conservation d’enregistrements non valides est le comportement par défaut des attentes. Utilisez l’opérateur expect lorsque vous souhaitez conserver des enregistrements qui violent l'attente, mais collecter des métriques sur le nombre d’enregistrements qui passent ou échouent à une contrainte. Les enregistrements qui violent l’attente sont ajoutés au jeu de données cible avec des enregistrements valides :

Python

@dlt.expect("valid timestamp", "timestamp > '2012-01-01'")

SQL

CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')

Supprimer les enregistrements non valides

Utilisez l’opérateur expect_or_drop pour empêcher le traitement ultérieur des enregistrements non valides. Les enregistrements qui violent l’attente sont supprimés du jeu de données cible :

Python

@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")

SQL

CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW

Échec sur les enregistrements non valides

Lorsque des enregistrements non valides sont inacceptables, utilisez l’opérateur expect_or_fail pour arrêter immédiatement l’exécution lorsque la validation de l’enregistrement échoue. Si l’opération est une mise à jour de table, le système restaure atomiquement la transaction :

Python

@dlt.expect_or_fail("valid_count", "count > 0")

SQL

CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE

Importante

Si vous avez plusieurs flux parallèles définis dans un pipeline, l’échec d’un flux unique n’entraîne pas l’échec d’autres flux.

Graphique d'explication de l'échec des flux de pipelines déclaratifs Lakeflow

Résolution des problèmes liés aux mises à jour ayant échoué par rapport aux attentes

Lorsqu’un pipeline échoue en raison d’une violation des attentes, vous devez corriger le code du pipeline pour gérer correctement les données non valides avant de réexécuter le pipeline.

Les attentes configurées pour faire échouer les pipelines modifient le plan de requête Spark de vos transformations pour suivre les informations nécessaires à la détection et au signalement des violations. Vous pouvez utiliser ces informations pour identifier l’enregistrement d’entrée qui a entraîné la violation pour de nombreuses requêtes. Lakeflow Declarative Pipelines fournit un message d’erreur dédié pour signaler ces violations. Voici un exemple de message d'erreur de violation des attentes :

[EXPECTATION_VIOLATION.VERBOSITY_ALL] Flow 'sensor-pipeline' failed to meet the expectation. Violated expectations: 'temperature_in_valid_range'. Input data: '{"id":"TEMP_001","temperature":-500,"timestamp_ms":"1710498600"}'. Output record: '{"sensor_id":"TEMP_001","temperature":-500,"change_time":"2024-03-15 10:30:00"}'. Missing input data: false

Gestion de plusieurs attentes

Remarque

Bien que SQL et Python prennent en charge plusieurs attentes dans un jeu de données unique, seul Python vous permet de regrouper plusieurs attentes et de spécifier des actions collectives.

Pipelines déclaratifs Lakeflow avec plusieurs attentes pour le graphe de flux

Vous pouvez regrouper plusieurs attentes et spécifier des actions collectives à l’aide des fonctions expect_all, expect_all_or_dropet expect_all_or_fail.

Ces décorateurs acceptent un dictionnaire Python comme argument, où la clé est le nom de l'expectation et la valeur est la contrainte de l'expectation. Vous pouvez réutiliser le même ensemble d’attentes dans plusieurs jeux de données de votre pipeline. Les exemples suivants illustrent chacun des opérateurs Python expect_all :

valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}

@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
  # Create a raw dataset

@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
  # Create a cleaned and prepared dataset

@dlt.table
@dlt.expect_all_or_fail(valid_pages)
def customer_facing_data():
  # Create cleaned and prepared to share the dataset

Limitations de

  • Étant donné que seules les tables de diffusion en continu et les vues matérialisées prennent en charge les attentes, les métriques de qualité des données ne sont prises en charge que pour ces types d’objets.
  • Les métriques de qualité des données ne sont pas disponibles lorsque :
    • Aucune attente n’est définie sur une requête.
    • Un flux utilise un opérateur qui ne prend pas en charge les attentes.
    • Le type de flux, tel que les récepteurs de pipelines déclaratifs Lakeflow, ne prend pas en charge les attentes.
    • Il n'y a aucune mise à jour de la table de streaming ou de la vue matérialisée associée pour une exécution de flux donnée.
    • La configuration du pipeline n’inclut pas les paramètres nécessaires pour capturer les métriques, telles que pipelines.metrics.flowTimeReporter.enabled.
  • Dans certains cas, un COMPLETED flux peut ne pas contenir de métriques. Au lieu de cela, les métriques sont signalées dans chaque micro-lot dans un événement flow_progress avec le statut RUNNING.