Supervisión de canalizaciones de Delta Live Tables

En este artículo se describe cómo usar características integradas para supervisar y observar canalizaciones de Delta Live Tables, incluyendo el linaje de datos, el historial de actualizaciones y los informes de calidad de los datos.

Es posible revisar la mayoría de datos de supervisión manualmente a través de la interfaz de usuario de detalles de la canalización. Algunas tareas son más fáciles de realizar consultando los metadatos del registro de eventos. Consulte ¿Qué es el registro de eventos de Delta Live Tables?.

¿Qué detalles de canalización están disponibles en la interfaz de usuario?

El gráfico de canalización se muestra en cuanto se ha iniciado correctamente una actualización de una canalización. Las flechas representan dependencias entre conjuntos de datos de la canalización. De forma predeterminada, la página de detalles de la canalización muestra la actualización más reciente de la tabla, pero puede seleccionar actualizaciones anteriores desde un menú desplegable.

Los detalles mostrados incluyen el identificador de canalización, las bibliotecas de origen, el costo de proceso, la edición del producto y el canal configurado para la canalización.

Para ver una vista tabular de conjuntos de datos, haga clic en la pestaña Lista. La vista lista permite ver todos los conjuntos de datos de la canalización representados como una fila de una tabla y resulta útil cuando el DAG de canalización es demasiado grande para visualizar en la vista Grafo. Puede controlar los conjuntos de datos que se muestran en la tabla mediante varios filtros, como el nombre, el tipo y el estado del conjunto de datos. Para volver a la visualización de DAG, haga clic en Grafo.

El usuario de Ejecutar como es el propietario de la canalización y las actualizaciones de canalización se ejecutan con los permisos de este usuario. Para cambiar el usuario run as, haga clic en Permisos y cambie el propietario de la canalización.

¿Cómo se pueden ver los detalles del conjunto de datos?

Al hacer clic en un conjunto de datos en el Grafo de canalización o en la lista de conjuntos de datos se muestran detalles sobre el conjunto de datos. Los detalles incluyen el esquema del conjunto de datos, las métricas de calidad de los datos y un vínculo al código fuente que define el conjunto de datos.

Visualización del historial de actualizaciones

Para ver el historial y el estado de las actualizaciones de canalización, haga clic en el menú desplegable Historial de actualizaciones de la barra superior.

Para ver el gráfico, los detalles y los eventos de una actualización, seleccione la actualización en el menú desplegable. Para volver a la actualización más reciente, haga clic en Mostrar la actualización más reciente.

Obtención de notificaciones para eventos de canalización

Para recibir notificaciones en tiempo real para los eventos de canalización como la finalización correcta de una actualización de canalización o un error de una actualización de canalización, agregue Agregar notificaciones por correo electrónico para los eventos de canalización al crear o editar una canalización.

¿Qué es el registro de eventos de Delta Live Tables?

El registro de eventos de Delta Live Tables contiene toda la información relacionada con una canalización, incluidos los registros de auditoría, las comprobaciones de la calidad de los datos, el progreso de la canalización y el linaje de datos. Puede usar el registro de eventos para realizar un seguimiento, comprender y supervisar el estado de las canalizaciones de datos.

Puede ver las entradas del registro de eventos en la interfaz de usuario o la API de Delta Live Tables, o bien mediante la consulta directa del registro de eventos. Este artículo se centra en consultar el registro de eventos directamente.

También puede definir acciones personalizadas para ejecutarse cuando se registran eventos, por ejemplo, enviar alertas, con enlaces de eventos.

Esquema del registro de eventos

En la tabla siguiente se describe el esquema del registro de eventos. Algunos de estos campos contienen datos JSON que requieren análisis para realizar algunas consultas, como el campo details. Azure Databricks admite el operador : para analizar campos JSON. Consulte : operador (signo de dos puntos).

Campo Descripción
id Un identificador único para el registro de eventos.
sequence Documento JSON que contiene metadatos para identificar y ordenar los eventos.
origin Un documento JSON que contiene metadatos para el origen del evento, por ejemplo, el proveedor de nube, la región del proveedor de nube, user_id, pipeline_id o pipeline_type para mostrar dónde se creó la canalización, ya sea DBSQL o WORKSPACE.
timestamp Hora a la que se registró el evento.
message Mensaje en lenguaje natural que describe el evento.
level Tipo de evento. Por ejemplo: INFO, WARN, ERROR o METRICS.
error Detalles que describen un error, si se ha producido.
details Documento JSON que contiene los detalles estructurados del evento. Este es el campo principal que se usa para analizar eventos.
event_type El tipo de evento.
maturity_level Estabilidad del esquema de eventos. Los valores posibles son:

* STABLE: el esquema es estable y no cambiará.
* NULL: el esquema es estable y no cambiará. El valor puede ser NULL si el registro se creó antes de agregar el maturity_level campo (versión 2022.37).
* EVOLVING: el esquema no es estable y puede cambiar.
* DEPRECATED: el esquema está en desuso y el tiempo de ejecución de Delta Live Tables puede dejar de producir este evento en cualquier momento.

Consulta del registro de eventos

La ubicación del registro de eventos y la interfaz para consultarlo dependen de si la canalización está configurada para usar el metastore de Hive o el catálogo de Unity.

Metastore de Hive

Si la canalización publica tablas en el metastore de Hive, el registro de eventos se almacenará en /system/events, bajo la ubicación de storage. Por ejemplo, si ha configurado el valor storage de la canalización como /Users/username/data, el registro de eventos se almacena en la ruta de acceso /Users/username/data/system/events en DBFS.

Si no ha configurado el valor storage, la ubicación predeterminada del registro de eventos es /pipelines/<pipeline-id>/system/events en DBFS. Por ejemplo, si el identificador de la canalización es 91de5e48-35ed-11ec-8d3d-0242ac130003, la ubicación de almacenamiento es /pipelines/91de5e48-35ed-11ec-8d3d-0242ac130003/system/events.

Puede crear una vista para simplificar la consulta del registro de eventos. En el ejemplo siguiente, se crea una vista temporal llamada event_log_raw. Esta vista se usa en las consultas de registro de eventos de ejemplo incluidas en este artículo:

CREATE OR REPLACE TEMP VIEW event_log_raw AS SELECT * FROM delta.`<event-log-path>`;

Reemplace <event-log-path> por la ubicación del registro de eventos.

Cada instancia de una ejecución de canalización se denomina actualización. A menudo, querrá extraer información de la actualización más reciente. Ejecute las consultas siguientes para buscar el identificador de la actualización más reciente y guardarlo en la vista temporal latest_update_id. Esta vista se usa en las consultas de registro de eventos de ejemplo incluidas en este artículo:

CREATE OR REPLACE TEMP VIEW latest_update AS SELECT origin.update_id AS id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1;

Puede consultar el registro de eventos en un cuaderno de Azure Databricks o en el editor de SQL. Use un cuaderno o el editor de SQL para ejecutar las consultas de registro de eventos de ejemplo.

Catálogo de Unity

Si la canalización publica tablas en Unity Catalog, será necesario usar la event_logfunción con valores de tabla (TVF) para capturar el registro de eventos de la canalización. Para recuperar el registro de eventos de una canalización, pase el identificador de canalización o un nombre de tabla a la TVF. Por ejemplo, para recuperar los registros de eventos de la canalización con el identificador 04c78631-3dd7-4856-b2a6-7d84e9b2638b:

SELECT * FROM event_log("04c78631-3dd7-4856-b2a6-7d84e9b2638b")

Para recuperar los registros de eventos de la canalización que creó o posee la tabla my_catalog.my_schema.table1:

SELECT * FROM event_log(TABLE(my_catalog.my_schema.table1))

Para llamar a la TVF, deberá usar un clúster compartido o una instancia de SQL Warehouse. Por ejemplo, podría usar un cuaderno adjunto a un clúster compartido o usar el editor de SQL conectado a una instancia de SQL Warehouse.

Para simplificar la consulta de eventos de una canalización, el propietario de la canalización puede crear una vista a través de la TVF event_log. En el ejemplo siguiente, se crea una vista sobre el registro de eventos de una canalización. Esta vista se usa en las consultas de registro de eventos de ejemplo incluidas en este artículo.

Nota:

Solo el propietario de la canalización puede llamar a la TVF event_log y solo el propietario de la canalización puede consultar una vista creada a través de la TVF event_log. La vista no se puede compartir con otros usuarios.

CREATE VIEW event_log_raw AS SELECT * FROM event_log("<pipeline-ID>");

Reemplace <pipeline-ID> por el identificador único de la canalización de Delta Live Tables. Es posible encontrar el identificador en el panel Detalles de la canalización, en la interfaz de usuario de Delta Live Tables.

Cada instancia de una ejecución de canalización se denomina actualización. A menudo, querrá extraer información de la actualización más reciente. Ejecute las consultas siguientes para buscar el identificador de la actualización más reciente y guardarlo en la vista temporal latest_update_id. Esta vista se usa en las consultas de registro de eventos de ejemplo incluidas en este artículo:

CREATE OR REPLACE TEMP VIEW latest_update AS SELECT origin.update_id AS id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1;

Consultar la información de linaje desde el registro de eventos

Los eventos que contienen información sobre el linaje tienen el tipo de evento flow_definition. El objeto details:flow_definition contiene output_dataset y input_datasets definiendo cada relación en el gráfico.

Es posible usar la siguiente consulta para extraer los conjuntos de datos de entrada y salida para ver la información de linaje:

SELECT
  details:flow_definition.output_dataset as output_dataset,
  details:flow_definition.input_datasets as input_dataset
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'flow_definition'
  AND
  origin.update_id = latest_update.id
output_dataset input_datasets
1 customers null
2 sales_orders_raw null
3 sales_orders_cleaned ["customers", "sales_orders_raw"]
4 sales_order_in_la ["sales_orders_cleaned"]

Consultar la calidad de los datos desde el registro de eventos

Si define expectativas en los conjuntos de datos de la canalización, las métricas de calidad de datos se almacenarán en el objeto details:flow_progress.data_quality.expectations. Los eventos que contienen información sobre la calidad de los datos tienen el tipo de evento flow_progress. En el ejemplo siguiente se consultan las métricas de calidad de los datos de la última actualización de la canalización:

SELECT
  row_expectations.dataset as dataset,
  row_expectations.name as expectation,
  SUM(row_expectations.passed_records) as passing_records,
  SUM(row_expectations.failed_records) as failing_records
FROM
  (
    SELECT
      explode(
        from_json(
          details :flow_progress :data_quality :expectations,
          "array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
        )
      ) row_expectations
    FROM
      event_log_raw,
      latest_update
    WHERE
      event_type = 'flow_progress'
      AND origin.update_id = latest_update.id
  )
GROUP BY
  row_expectations.dataset,
  row_expectations.name
dataset expectation passing_records failing_records
1 sales_orders_cleaned valid_order_number 4083 0

Supervisar el trabajo pendiente de datos consultando el registro de eventos

Delta Live Tables realiza un seguimiento de la cantidad de datos presentes en los trabajos pendiente del objeto details:flow_progress.metrics.backlog_bytes. Los eventos donde se incluyen las métricas de trabajo pendiente tienen el tipo de evento flow_progress. En el siguiente ejemplo, se consultan las métricas de trabajo pendiente de la última actualización de la canalización:

SELECT
  timestamp,
  Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
  event_log_raw,
  latest_update
WHERE
  event_type ='flow_progress'
  AND
  origin.update_id = latest_update.id

Nota:

Es posible que las métricas del trabajo pendiente no estén disponibles según el tipo de origen de datos de la canalización y la versión de Databricks Runtime.

Supervisar los eventos de escalado automático mejorados desde el registro de eventos

El registro de eventos captura el tamaño del clúster cuando el escalado automático mejorado está habilitado en las canalizaciones. Los eventos que contienen información sobre el autoescalado mejorado tienen el tipo de evento autoscale. La información de solicitud de cambio de tamaño del clúster se almacena en el objeto details:autoscale. En el ejemplo siguiente se consultan las solicitudes de cambio de tamaño del clúster de escalado automático mejorado para la última actualización de canalización:

SELECT
  timestamp,
  Double(
    case
      when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
      else null
    end
  ) as starting_num_executors,
  Double(
    case
      when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as partially_succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
      else null
    end
  ) as failed_num_executors
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'autoscale'
  AND
  origin.update_id = latest_update.id

Supervisar la utilización de recursos de proceso

Los eventos cluster_resources proporcionan métricas sobre el número de ranuras de tarea del clúster, la cantidad de ranuras de tarea que se usan y el número de tareas que están a la espera de ser programadas.

Cuando el escalado automático mejorado está habilitado, los eventos cluster_resources también contienen métricas para el algoritmo de escalado automático, incluidos latest_requested_num_executors y optimal_num_executors. Los eventos también muestran el estado del algoritmo como estados diferentes, como CLUSTER_AT_DESIRED_SIZE, SCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORS y BLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATION. Esta información se puede ver junto con los eventos de escalado automático para tener una perspectiva general del escalado automático mejorado.

En el ejemplo siguiente se consulta el historial de tamaño de la cola de tareas correspondiente a la última actualización de la canalización:

SELECT
  timestamp,
  Double(details :cluster_resources.avg_num_queued_tasks) as queue_size
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id

En el ejemplo siguiente se consulta el historial de uso correspondiente a la última actualización de la canalización:

SELECT
  timestamp,
  Double(details :cluster_resources.avg_task_slot_utilization) as utilization
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id

En el siguiente ejemplo se consulta el historial de recuento de ejecutores, junto con las métricas disponibles únicamente para las canalizaciones de escalado automático mejorado, incluido el número de ejecutores solicitados por el algoritmo en la solicitud más reciente, el número óptimo de ejecutores recomendado por el algoritmo en función de las métricas más recientes y el estado del algoritmo de escalado automático:

SELECT
  timestamp,
  Double(details :cluster_resources.num_executors) as current_executors,
  Double(details :cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
  Double(details :cluster_resources.optimal_num_executors) as optimal_num_executors,
  details :cluster_resources.state as autoscaling_state
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id

Auditoría de canalizaciones de Delta Live Tables

Puede usar los registros de eventos de Delta Live Tables y otros registros de auditoría de Azure Databricks para obtener una imagen completa de cómo se actualizan los datos en Delta Live Tables.

Delta Live Tables usa las credenciales del propietario de la canalización para ejecutar las actualizaciones. Puede cambiar las credenciales usadas mediante la actualización del propietario de la canalización. Delta Live Tables registra al usuario para las acciones de canalización, incluida la creación de canalizaciones, las modificaciones en la configuración y el desencadenamiento de actualizaciones.

Consulte Eventos del catálogo de Unity para obtener una referencia de los eventos de auditoría del catálogo de Unity.

Consulta de acciones de usuario en el registro de eventos

Puede usar el registro de eventos para auditar eventos, por ejemplo, acciones del usuario. Los eventos que contienen información sobre las acciones del usuario tienen el tipo de evento user_action.

La información sobre la acción se almacena en el objeto user_action, en el campo details. Use la consulta siguiente para construir un registro de auditoría de eventos de usuario. Para crear la vista event_log_raw usada en esta consulta, consulte Consulta del registro de eventos.

SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'
timestamp action user_name
1 2021-05-20T19:36:03.517+0000 START user@company.com
2 2021-05-20T19:35:59.913+0000 CREATE user@company.com
3 2021-05-27T00:35:51.971+0000 START user@company.com

Información en tiempo de ejecución

Puede ver la información en tiempo de ejecución de un proceso de actualización de canalización; por ejemplo, la versión de Databricks Runtime para la siguiente actualización:

SELECT details:create_update:runtime_version:dbr_version FROM event_log_raw WHERE event_type = 'create_update'
dbr_version
1 11.0