Partager via


Surveiller les pipelines Delta Live Tables

Cet article explique comment utiliser les fonctionnalités intégrées de surveillance et d’observabilité des pipelines Delta Live Tables, notamment la traçabilité des données, l’historique des mises à jour et les rapports sur la qualité des données.

Vous pouvez passer en revue la plupart des données de surveillance manuellement via l’interface utilisateur des détails du pipeline. Certaines tâches sont plus faciles à accomplir en interrogeant les métadonnées du journal des événements. Consultez Qu’est-ce que le journal des événements Delta Live Tables ?.

Quels détails du pipeline sont disponibles dans l’interface utilisateur ?

Le graphe de pipeline s’affiche dès qu’une mise à jour d’un pipeline a démarré avec succès. Les flèches représentent les dépendances entre les jeux de données de votre pipeline. Par défaut, la page des détails du pipeline affiche la mise à jour la plus récente pour la table, mais vous pouvez sélectionner des mises à jour plus anciennes dans un menu déroulant.

Les détails affichés comprennent l’ID de pipeline, les bibliothèques sources, le coût de calcul, l’édition du produit et le canal configuré pour le pipeline.

Pour afficher une vue tabulaire des jeux de données, cliquez sur l’onglet Liste. La vue Liste vous permet de voir tous les jeux de données de votre pipeline représentés sous forme de ligne dans une table et est utile lorsque votre DAG de pipeline est trop volumineux pour visualiser dans la vue Graphe. Vous pouvez contrôler les jeux de données affichés dans la table à l’aide de plusieurs filtres tels que le nom, le type et l’état du jeu de données. Pour revenir à la visualisation DAG, cliquez sur Graphe.

L’utilisateur Exécuter en tant que est le propriétaire du pipeline, et les mises à jour de pipeline s’exécutent avec les autorisations de cet utilisateur. Pour modifier l’utilisateur run as, cliquez sur Autorisations et modifiez le propriétaire du pipeline.

Comment pouvez-vous afficher les détails du jeu de données ?

Le fait de cliquer sur un jeu de données dans le graphe de pipeline ou la liste de jeu de données affiche des détails sur le jeu de données. Les détails comprennent le schéma du jeu de données, les métriques de qualité des données et un lien vers le code source qui définissent le jeu de données.

Consulter l’historique des mises à jour

Pour afficher l’historique et l’état des mises à jour du pipeline, cliquez sur le menu déroulant Historique des mises à jour dans la barre supérieure.

Pour afficher le graphique, les détails et les événements d’une mise à jour, sélectionnez la mise à jour dans le menu déroulant. Pour revenir à la dernière mise à jour, cliquez sur Afficher la dernière mise à jour.

Obtenir des notifications pour les événements de pipeline

Pour recevoir des notifications en temps réel pour les événements de pipeline, tels que la réussite ou l’échec d’une mise à jour du pipeline, ajoutez Ajouter des notifications par courrier électronique pour les événements de pipeline lorsque vous créez ou modifiez un pipeline.

Qu’est-ce que le journal des événements Delta Live Tables ?

Le journal des événements Delta Live Tables contient toutes les informations ayant trait à un pipeline, y compris les journaux d’audit, les vérifications de la qualité des données, la progression du pipeline et la traçabilité des données. Vous pouvez utiliser le journal des événements pour suivre, comprendre et surveiller l’état de vos pipelines de données.

Vous pouvez examiner les entrées du journal des événements dans l’interface utilisateur Delta Live Tables, dans l’API Delta Live Tables ou en interrogeant directement le journal des événements. Cette section se concentre sur la façon d’interroger directement le journal des événements.

Vous pouvez également définir des actions personnalisées à exécuter lorsque des événements sont enregistrés, par exemple en envoyant des alertes, avec des hooks d’événements.

Schéma du journal des événements

Le tableau suivant décrit le schéma du journal des événements. Certains de ces champs contiennent des données JSON qui nécessitent une analyse pour effectuer des requêtes, comme le champ details. Azure Databricks prend en charge l’opérateur : pour analyser les champs JSON. Consultez  : opérateur (signe deux-points).

Champ Description
id Identificateur unique de l’enregistrement de journal des événements.
sequence Document JSON contenant les métadonnées d’identification et d’ordre des événements.
origin Document JSON contenant des métadonnées pour l’origine de l’événement, par exemple, le fournisseur de cloud, la région du fournisseur de cloud, user_id, pipeline_id ou pipeline_type pour montrer où le pipeline a été créé, soit DBSQL, soit WORKSPACE.
timestamp Heure à laquelle l’événement a été enregistré.
message Message explicite décrivant l’événement.
level Type d’événement, par exemple INFOWARN, ERROR ou METRICS.
error Détails décrivant l’erreur qui est survenue, le cas échéant.
details Document JSON contenant les détails structurés de l’événement. Il s’agit du champ principal utilisé pour l’analyse des événements.
event_type Le type d'événement.
maturity_level Stabilité du schéma d’événement. Les valeurs possibles sont les suivantes :

- STABLE : le schéma est stable et ne changera pas.
- NULL : le schéma est stable et ne changera pas. La valeur peut être NULL si l’enregistrement a été créé avant l’ajout du champ maturity_level (version 2022.37).
- EVOLVING : le schéma n’est pas stable et peut changer.
- DEPRECATED : le schéma est déprécié et le runtime Delta Live Tables peut cesser de produire cet événement à tout moment.

Interrogation du journal des événements

L’emplacement du journal des événements et l’interface pour interroger le journal des événements varient selon que votre pipeline est configuré pour utiliser le metastore Hive ou Unity Catalog.

Metastore Hive

Si votre pipeline publie des tables dans le metastore Hive, le journal des événements est stocké dans /system/events sous l’emplacement storage. Par exemple, si vous avez configuré le paramètre storage de votre pipeline sur /Users/username/data, le journal des événements est stocké dans le chemin /Users/username/data/system/events dans DBFS.

Si vous n’avez pas configuré le paramètre storage, l’emplacement du journal des événements par défaut est /pipelines/<pipeline-id>/system/events dans DBFS. Par exemple, si l’ID de votre pipeline est 91de5e48-35ed-11ec-8d3d-0242ac130003, l’emplacement de stockage est /pipelines/91de5e48-35ed-11ec-8d3d-0242ac130003/system/events.

Vous pouvez créer une vue pour simplifier l’interrogation du journal des événements. L’exemple suivant crée une vue temporaire appelée event_log_raw. Cette vue est utilisée dans les exemples de requêtes de journal des événements compris dans cet article :

CREATE OR REPLACE TEMP VIEW event_log_raw AS SELECT * FROM delta.`<event-log-path>`;

Remplacez <event-log-path> par l’emplacement du journal des événements.

Chaque instance d’une exécution de pipeline est appelée mise à jour. Vous souhaitez souvent extraire des informations pour la mise à jour la plus récente. Exécutez la requête suivante pour rechercher l’identificateur de la dernière mise à jour et l’enregistrer dans la vue temporaire latest_update_id. Cette vue est utilisée dans les exemples de requêtes de journal des événements compris dans cet article :

CREATE OR REPLACE TEMP VIEW latest_update AS SELECT origin.update_id AS id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1;

Vous pouvez interroger le journal des événements dans un notebook Azure Databricks ou dans l’éditeur SQL. Utilisez un notebook ou l’éditeur SQL pour exécuter les exemples de requêtes de journal des événements.

Unity Catalog

Si votre pipeline publie des tables dans Unity Catalog, vous devez utiliser la event_log fonction de table (TVF) pour récupérer le journal des événements du pipeline. Vous récupérez le journal des événements d’un pipeline en passant l’ID de pipeline ou un nom de table à la fonction table. Par exemple, pour récupérer les enregistrements du journal des événements pour le pipeline avec l’ID 04c78631-3dd7-4856-b2a6-7d84e9b2638b :

SELECT * FROM event_log("04c78631-3dd7-4856-b2a6-7d84e9b2638b")

Pour récupérer les enregistrements du journal des événements pour le pipeline qui a créé ou possède la table my_catalog.my_schema.table1 :

SELECT * FROM event_log(TABLE(my_catalog.my_schema.table1))

Pour appeler la fonction table, vous devez utiliser un cluster partagé ou un entrepôt SQL. Par exemple, vous pouvez utiliser un notebook attaché à un cluster partagé ou utiliser l’éditeur SQL connecté à un entrepôt SQL.

Pour simplifier les événements d’interrogation d’un pipeline, le propriétaire du pipeline peut créer une vue sur la fonction table event_log. L’exemple suivant crée une vue sur le journal des événements d’un pipeline. Cette vue est utilisée dans les exemples de requêtes de journal des événements compris dans cet article.

Remarque

La fonction table event_log peut être appelée uniquement par le propriétaire du pipeline et une vue créée sur la fonction table event_log peut être interrogée uniquement par le propriétaire du pipeline. La vue ne peut pas être partagée avec d’autres utilisateurs.

CREATE VIEW event_log_raw AS SELECT * FROM event_log("<pipeline-ID>");

Remplacez <pipeline-ID> par l’identificateur unique du pipeline Delta Live Tables. Vous trouverez l’ID dans le panneau Détails du pipeline dans l’interface utilisateur Delta Live Tables.

Chaque instance d’une exécution de pipeline est appelée mise à jour. Vous souhaitez souvent extraire des informations pour la mise à jour la plus récente. Exécutez la requête suivante pour rechercher l’identificateur de la dernière mise à jour et l’enregistrer dans la vue temporaire latest_update_id. Cette vue est utilisée dans les exemples de requêtes de journal des événements compris dans cet article :

CREATE OR REPLACE TEMP VIEW latest_update AS SELECT origin.update_id AS id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1;

Interroger les informations de traçabilité du journal des événements

Les événements contenant des informations de traçabilité ont le type d’événement flow_definition. L’objet details:flow_definition contient output_dataset et input_datasets qui définissent chaque relation dans le graphe.

Vous pouvez utiliser la requête suivante pour extraire les jeux de données d’entrée et de sortie afin de voir les informations de traçabilité :

SELECT
  details:flow_definition.output_dataset as output_dataset,
  details:flow_definition.input_datasets as input_dataset
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'flow_definition'
  AND
  origin.update_id = latest_update.id
output_dataset input_datasets
customers null
sales_orders_raw null
sales_orders_cleaned ["customers", "sales_orders_raw"]
sales_order_in_la ["sales_orders_cleaned"]

Interroger la qualité des données du journal des événements

Si vous définissez des attentes sur les jeux de données dans votre pipeline, les métriques de qualité des données sont stockées dans l’objet details:flow_progress.data_quality.expectations. Les événements contenant des informations sur la qualité des données ont le type d’événement flow_progress. L’exemple suivant interroge les métriques de qualité des données relatives à la dernière mise à jour du pipeline :

SELECT
  row_expectations.dataset as dataset,
  row_expectations.name as expectation,
  SUM(row_expectations.passed_records) as passing_records,
  SUM(row_expectations.failed_records) as failing_records
FROM
  (
    SELECT
      explode(
        from_json(
          details :flow_progress :data_quality :expectations,
          "array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
        )
      ) row_expectations
    FROM
      event_log_raw,
      latest_update
    WHERE
      event_type = 'flow_progress'
      AND origin.update_id = latest_update.id
  )
GROUP BY
  row_expectations.dataset,
  row_expectations.name
dataset expectation passing_records failing_records
sales_orders_cleaned valid_order_number 4083 0

Surveiller le backlog de données en interrogeant le journal des événements

Delta Live Tables suit la quantité de données présentes dans le backlog de l’objet details:flow_progress.metrics.backlog_bytes. Les événements contenant les métriques de backlog ont le type d’événement flow_progress. L’exemple suivant interroge les métriques du backlog relatives à la dernière mise à jour du pipeline :

SELECT
  timestamp,
  Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
  event_log_raw,
  latest_update
WHERE
  event_type ='flow_progress'
  AND
  origin.update_id = latest_update.id

Notes

Les métriques du backlog peuvent ne pas être disponibles selon le type de source de données du pipeline et la version de Databricks Runtime.

Surveiller les événements de mise à l’échelle automatique améliorée du journal des événements

Le journal des événements capture les redimensionnements de cluster lorsque la mise à l’échelle automatique améliorée est activée dans vos pipelines. Les événements contenant des informations sur la mise à l’échelle automatique améliorée dont de type autoscale. Les informations de demande de redimensionnement de cluster sont stockées dans l’objet details:autoscale. L’exemple suivant interroge les demandes de redimensionnement de cluster de mise à l’échelle automatique améliorée sur la dernière mise à jour du pipeline :

SELECT
  timestamp,
  Double(
    case
      when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
      else null
    end
  ) as starting_num_executors,
  Double(
    case
      when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as partially_succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
      else null
    end
  ) as failed_num_executors
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'autoscale'
  AND
  origin.update_id = latest_update.id

Surveiller l’utilisation des ressources de calcul

Des événements cluster_resources fournissent des métriques sur le nombre d’emplacements de tâches dans le cluster, le nombre d’emplacements de tâches utilisés et le nombre de tâches en attente de planification.

Lorsque la mise à l’échelle automatique améliorée est activée, les événements cluster_resources contiennent également des métriques pour l’algorithme de mise à l’échelle automatique, notamment latest_requested_num_executors et optimal_num_executors. Les événements montrent également l’état de l’algorithme sous la forme d’états différents tels que CLUSTER_AT_DESIRED_SIZE, SCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORS et BLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATION. Ces informations peuvent être consultées conjointement avec les événements de mise à l’échelle automatique pour fournir une vue d’ensemble de la mise à l’échelle automatique améliorée.

L’exemple suivant interroge l’historique de taille de la file d’attente des tâches pour la dernière mise à jour du pipeline :

SELECT
  timestamp,
  Double(details :cluster_resources.avg_num_queued_tasks) as queue_size
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id

L’exemple suivant interroge l’historique d’utilisation de la dernière mise à jour du pipeline :

SELECT
  timestamp,
  Double(details :cluster_resources.avg_task_slot_utilization) as utilization
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id

L’exemple suivant interroge l’historique du nombre d’exécuteurs, accompagné de métriques disponibles uniquement pour des pipelines de mise à l’échelle automatique améliorés, y compris le nombre d’exécuteurs demandés par l’algorithme dans la dernière requête, le nombre optimal d’exécuteurs recommandé par l’algorithme en fonction des métriques les plus récentes et l’état de l’algorithme de mise à l’échelle automatique :

SELECT
  timestamp,
  Double(details :cluster_resources.num_executors) as current_executors,
  Double(details :cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
  Double(details :cluster_resources.optimal_num_executors) as optimal_num_executors,
  details :cluster_resources.state as autoscaling_state
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id

Auditer les pipelines Delta Live Tables

Vous pouvez utiliser les enregistrements du journal des événements Delta Live Tables et d’autres journaux d’audit Azure Databricks pour obtenir une image complète de la façon dont les données sont mises à jour dans Delta Live Tables.

Delta Live Tables utilise les informations d’identification du propriétaire du pipeline pour exécuter les mises à jour. Vous pouvez modifier les informations d’identification utilisées en mettant à jour le propriétaire du pipeline. Delta Live Tables enregistre les actions de l’utilisateur sur le pipeline, notamment la création du pipeline, les modifications apportées à la configuration et le déclenchement des mises à jour.

Consultez Événements Unity Catalog pour obtenir des informations de référence sur les événements d’audit Unity Catalog.

Interroger les actions utilisateur dans le journal des événements

Vous pouvez utiliser le journal des événements pour auditer des événements, comme les actions utilisateur. Les événements contenant des informations sur les actions utilisateur ont le type d’événement user_action.

Les informations relatives à chaque action sont stockées dans l’objet user_action dans le champ details. Utilisez la requête suivante pour créer un journal d’audit des événements utilisateur. Pour créer la vue event_log_raw utilisée dans cette requête, consultez Interrogation du journal des événements.

SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'
timestamp action user_name
2021-05-20T19:36:03.517+0000 START user@company.com
2021-05-20T19:35:59.913+0000 CREATE user@company.com
2021-05-27T00:35:51.971+0000 START user@company.com

Informations sur le runtime

Vous pouvez afficher des informations sur le runtime pour une mise à jour de pipeline, par exemple, la version de Databricks Runtime pour la mise à jour :

SELECT details:create_update:runtime_version:dbr_version FROM event_log_raw WHERE event_type = 'create_update'
dbr_version
11.0