Compartir a través de


Observabilidad en Azure Databricks para trabajos, canalizaciones declarativas de Lakeflow y Lakeflow Connect

La supervisión del rendimiento, el costo y el estado de las aplicaciones de streaming es esencial para crear canalizaciones de ETL confiables y eficaces. Azure Databricks proporciona un amplio conjunto de funcionalidades de observabilidad en Jobs, Canalizaciones Declarativas de Lakeflow y Lakeflow Connect para ayudar a diagnosticar cuellos de botella, optimizar el rendimiento y administrar el uso y costos de recursos.

En este artículo se describen los procedimientos recomendados en las siguientes áreas:

  • Métricas clave de rendimiento de streaming
  • Esquemas de registro de eventos y consultas de ejemplo
  • Supervisión de consultas de streaming
  • Observabilidad de costos mediante tablas del sistema
  • Exportación de registros y métricas a herramientas externas

Métricas clave para la observabilidad de streaming

Al operar canalizaciones de streaming, supervise las siguientes métricas clave:

Métrica propósito
Contrapresión Supervisa el número de archivos y desplazamientos (tamaños). Ayuda a identificar cuellos de botella y garantiza que el sistema pueda controlar los datos entrantes sin que se quede atrás.
Rendimiento Realiza un seguimiento del número de mensajes procesados por microproceso. Evalúe la eficacia de la canalización y compruebe que sigue el ritmo de la ingesta de datos.
Duración Mide la duración media de un microproceso. Indica la velocidad de procesamiento y ayuda a optimizar los intervalos por lotes.
Latencia Indica cuántos registros o mensajes se procesan con el tiempo. Ayuda a comprender los retrasos de la canalización de un extremo a otro y a optimizar las latencias más bajas.
Uso del clúster Refleja el uso de cpu y memoria (%). Garantiza un uso eficaz de los recursos y ayuda a escalar clústeres para satisfacer las demandas de procesamiento.
Red Mide los datos transferidos y recibidos. Resulta útil para identificar cuellos de botella de red y mejorar el rendimiento de la transferencia de datos.
Punto de control Identifica los datos procesados y los desplazamientos. Garantiza la coherencia y habilita la tolerancia a errores durante los errores.
Costo Muestra los costos por hora, diario y mensual de una aplicación de streaming. Ayuda en la optimización de presupuestos y recursos.
Linaje Muestra conjuntos de datos y capas creados en la aplicación de streaming. Facilita la transformación, el seguimiento, la garantía de calidad y la depuración de datos.

Registros y métricas del clúster

Los registros y métricas del clúster de Azure Databricks proporcionan información detallada sobre el rendimiento y el uso del clúster. Estos registros y métricas incluyen información sobre cpu, memoria, E/S de disco, tráfico de red y otras métricas del sistema. La supervisión de estas métricas es fundamental para optimizar el rendimiento del clúster, administrar los recursos de forma eficaz y solucionar problemas.

Los registros y las métricas del clúster de Azure Databricks ofrecen información detallada sobre el rendimiento del clúster y el uso de recursos. Entre ellas se incluyen el uso de CPU y memoria, la E/S de disco y el tráfico de red. La supervisión de estas métricas es fundamental para:

  • Optimización del rendimiento del clúster.
  • Administrar recursos de forma eficaz.
  • Solución de problemas operativos.

Las métricas se pueden aprovechar a través de la interfaz de usuario de Databricks o exportarlas a herramientas de supervisión personales. Vea Ejemplo de Notebook: Métricas de Datadog.

Interfaz de usuario de Spark

La interfaz de usuario de Spark muestra información detallada sobre el progreso de los trabajos y las fases, incluido el número de tareas completadas, pendientes y con errores. Esto te ayuda a entender el flujo de ejecución y a identificar cuellos de botella.

En el caso de las aplicaciones de streaming, la pestaña Streaming muestra métricas como la velocidad de entrada, la velocidad de procesamiento y la duración del lote. Ayuda a supervisar el rendimiento de los trabajos de streaming e identificar cualquier problema de ingesta o procesamiento de datos.

Consulte Depuración con la interfaz de usuario de Apache Spark para obtener más información.

Métricas de cálculo

Las métricas de proceso le ayudarán a comprender el uso del clúster. A medida que se ejecuta el trabajo, puede ver cómo se escala y cómo se ven afectados los recursos. Podrá encontrar presión de memoria que podría provocar errores de OOM o presión de CPU que podrían provocar retrasos prolongados. Estas son las métricas específicas que verá:

  • Distribución de carga del servidor: el uso de CPU de cada nodo durante el último minuto.
  • Uso de CPU: el porcentaje de tiempo que la CPU pasó en varios modos (por ejemplo, usuario, sistema, inactivo y iowait).
  • Uso de memoria: uso total de memoria por cada modo (por ejemplo, usado, libre, búfer y almacenado en caché).
  • Utilización del swap de memoria: uso total del swap de memoria.
  • Espacio libre del sistema de archivos: uso total del sistema de archivos por cada punto de montaje.
  • Rendimiento de red: el número de bytes recibidos y transmitidos a través de la red por cada dispositivo.
  • Número de nodos activos: número de nodos activos en cada marca de tiempo para el proceso especificado.

Consulte Supervisar el rendimiento y Gráficos de métricas de hardware para obtener más información.

Tablas del sistema

Supervisión de costos

Las tablas del sistema de Azure Databricks proporcionan un enfoque estructurado para supervisar el costo y el rendimiento del trabajo. Estas tablas incluyen:

  • Detalles de ejecución del trabajo.
  • Uso de recursos.
  • Costos asociados.

Use estas tablas para comprender el estado operativo y el impacto financiero.

Requisitos

Para usar tablas del sistema para la supervisión de costos:

  • Un administrador de cuenta debe habilitar el system.lakeflow schema.
  • Los usuarios deben, o bien:
    • Ser un administrador de metastore y un administrador de cuenta, o
    • Tener permisos USE y SELECT en los esquemas del sistema.

Consulta de ejemplo: Trabajos más caros (últimos 30 días)

Esta consulta identifica los trabajos más caros en los últimos 30 días, lo que ayuda en el análisis y la optimización de costos.

WITH list_cost_per_job AS (
     SELECT
       t1.workspace_id,
       t1.usage_metadata.job_id,
       COUNT(DISTINCT t1.usage_metadata.job_run_id) AS runs,
       SUM(t1.usage_quantity * list_prices.pricing.default) AS list_cost,
       FIRST(identity_metadata.run_as, true) AS run_as,
       FIRST(t1.custom_tags, true) AS custom_tags,
       MAX(t1.usage_end_time) AS last_seen_date
     FROM system.billing.usage t1
     INNER JOIN system.billing.list_prices list_prices ON
       t1.cloud = list_prices.cloud AND
       t1.sku_name = list_prices.sku_name AND
       t1.usage_start_time >= list_prices.price_start_time AND
       (t1.usage_end_time <= list_prices.price_end_time OR list_prices.price_end_time IS NULL)
     WHERE
       t1.billing_origin_product = "JOBS"
       AND t1.usage_date >= CURRENT_DATE() - INTERVAL 30 DAY
     GROUP BY ALL
   ),
   most_recent_jobs AS (
     SELECT
       *,
       ROW_NUMBER() OVER(PARTITION BY workspace_id, job_id ORDER BY change_time DESC) AS rn
     FROM
       system.lakeflow.jobs QUALIFY rn=1
   )
   SELECT
     t2.name,
     t1.job_id,
     t1.workspace_id,
     t1.runs,
     t1.run_as,
     SUM(list_cost) AS list_cost,
     t1.last_seen_date
   FROM list_cost_per_job t1
   LEFT JOIN most_recent_jobs t2 USING (workspace_id, job_id)
   GROUP BY ALL
   ORDER BY list_cost DESC

Canalizaciones declarativas de Lakeflow

El registro de eventos de Canalizaciones declarativas de Lakeflow captura un registro completo de todos los eventos de canalización, incluidos:

  • Registros de auditoría.
  • Comprobaciones de calidad de datos.
  • Progreso de la canalización.
  • Rastreo de origen de datos.

El registro de eventos se habilita automáticamente para todas las canalizaciones declarativas de Lakeflow y se puede acceder a él a través de:

  • Interfaz de usuario de Pipeline: vea los registros directamente.
  • API DLT: acceso mediante programación.
  • Consulta directa: consulte la tabla de registro de eventos.

Para obtener más información, consulte Esquema de registro de eventos para canalizaciones declarativas de Lakeflow.

Consultas de ejemplo

Estas consultas de ejemplo ayudan a supervisar el rendimiento y el estado de las canalizaciones proporcionando métricas clave, como la duración del lote, el rendimiento, la represión y el uso de recursos.

Duración media del lote

Esta consulta calcula la duración media de los lotes procesados por la canalización.

SELECT
  (max_t - min_t) / batch_count as avg_batch_duration_seconds,
  batch_count,
  min_t,
  max_t,
  date_hr,
  message
FROM
  -- /60 for minutes
  (
    SELECT
      count(*) as batch_count,
      unix_timestamp(
        min(timestamp)
      ) as min_t,
      unix_timestamp(
        max(timestamp)
      ) as max_t,
      date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
      message
    FROM
      event_log
    WHERE
      event_type = 'flow_progress'
      AND level = 'METRICS'
    GROUP BY
      date_hr,
      message
  )
ORDER BY
  date_hr desc

Rendimiento medio

Esta consulta calcula el rendimiento medio de la canalización en términos de filas procesadas por segundo.

SELECT
  (max_t - min_t) / total_rows as avg_throughput_rps,
  total_rows,
  min_t,
  max_t,
  date_hr,
  message
FROM
  -- /60 for minutes
  (
    SELECT
      sum(
        details:flow_progress:metrics:num_output_rows
      ) as total_rows,
      unix_timestamp(
        min(timestamp)
      ) as min_t,
      unix_timestamp(
        max(timestamp)
      ) as max_t,
      date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
      message
    FROM
      event_log
    WHERE
      event_type = 'flow_progress'
      AND level = 'METRICS'
    GROUP BY
      date_hr,
      message
  )
ORDER BY
  date_hr desc

Contrapresión

Esta consulta mide la contrapresión de la canalización comprobando la acumulación de datos.

SELECT
  timestamp,
  DOUBLE(
    details:flow_progress:metrics:backlog_bytes
  ) AS backlog_bytes,
  DOUBLE(
    details:flow_progress:metrics:backlog_files
  ) AS backlog_files
FROM
  event_log
WHERE
  event_type = 'flow_progress'

Uso de clústeres y ranuras

Esta consulta tiene información sobre el uso de clústeres o ranuras utilizadas por la canalización.

SELECT
  date_trunc("hour", timestamp) AS hour,
  AVG (
    DOUBLE (
      details:cluster_resources:num_task_slots
    )
  ) AS num_task_slots,
  AVG (
    DOUBLE (
      details:cluster_resources:avg_num_task_slots
    )
  ) AS avg_num_task_slots,
  AVG (
    DOUBLE (
      details:cluster_resources:num_executors
    )
  ) AS num_executors,
  AVG (
    DOUBLE (
      details:cluster_resources:avg_task_slot_utilization
    )
  ) AS avg_utilization,
  AVG (
    DOUBLE (
      details:cluster_resources:avg_num_queued_tasks
    )
  ) AS queue_size
FROM
  event_log
WHERE
  details : cluster_resources : avg_num_queued_tasks IS NOT NULL
  AND origin.update_id = '${latest_update_id}'
GROUP BY
  1;

Trabajos

Puede supervisar las consultas de streaming en trabajos a través del agente de escucha de consultas de streaming.

Adjunte un agente de escucha a la sesión de Spark para habilitar el agente de escucha de consultas de streaming enAzure Databricks. Este agente de escucha supervisará el progreso y las métricas de las consultas de streaming. Se puede usar para insertar métricas en herramientas de supervisión externas o registrarlas para su posterior análisis.

Ejemplo: Exportación de métricas a herramientas de supervisión externas

:::nota

Esto está disponible en Databricks Runtime 11.3 LTS y versiones posteriores para Python y Scala.

:::

Puede exportar métricas de streaming a servicios externos para alertas o paneles mediante la StreamingQueryListener interfaz .

Este es un ejemplo básico de cómo implementar un agente de escucha:

from pyspark.sql.streaming import StreamingQueryListener

class MyListener(StreamingQueryListener):
   def onQueryStarted(self, event):
       print("Query started: ", event.id)

   def onQueryProgress(self, event):
       print("Query made progress: ", event.progress)

   def onQueryTerminated(self, event):
       print("Query terminated: ", event.id)

spark.streams.addListener(MyListener())

Ejemplo: Uso del agente de escucha de consultas en Azure Databricks

A continuación se muestra un ejemplo de un registro de eventos de StreamingQueryListener para una consulta de streaming de Kafka a Delta Lake:

{
  "id": "210f4746-7caa-4a51-bd08-87cabb45bdbe",
  "runId": "42a2f990-c463-4a9c-9aae-95d6990e63f4",
  "timestamp": "2024-05-15T21:57:50.782Z",
  "batchId": 0,
  "batchDuration": 3601,
  "numInputRows": 20,
  "inputRowsPerSecond": 0.0,
  "processedRowsPerSecond": 5.55401277422938,
  "durationMs": {
    "addBatch": 1544,
    "commitBatch": 686,
    "commitOffsets": 27,
    "getBatch": 12,
    "latestOffset": 577,
    "queryPlanning": 105,
    "triggerExecution": 3600,
    "walCommit": 34
  },
  "stateOperators": [
    {
      "operatorName": "symmetricHashJoin",
      "numRowsTotal": 20,
      "numRowsUpdated": 20,
      "allUpdatesTimeMs": 473,
      "numRowsRemoved": 0,
      "allRemovalsTimeMs": 0,
      "commitTimeMs": 277,
      "memoryUsedBytes": 13120,
      "numRowsDroppedByWatermark": 0,
      "numShufflePartitions": 5,
      "numStateStoreInstances": 20,
      "customMetrics": {
        "loadedMapCacheHitCount": 0,
        "loadedMapCacheMissCount": 0,
        "stateOnCurrentVersionSizeBytes": 5280
      }
    }
  ],
  "sources": [
    {
      "description": "KafkaV2[Subscribe[topic-1]]",
      "numInputRows": 10,
      "inputRowsPerSecond": 0.0,
      "processedRowsPerSecond": 2.77700638711469,
      "metrics": {
        "avgOffsetsBehindLatest": "0.0",
        "estimatedTotalBytesBehindLatest": "0.0",
        "maxOffsetsBehindLatest": "0",
        "minOffsetsBehindLatest": "0"
      }
    },
    {
      "description": "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
      "numInputRows": 10,
      "inputRowsPerSecond": 0.0,
      "processedRowsPerSecond": 2.77700638711469,
      "metrics": {
        "numBytesOutstanding": "0",
        "numFilesOutstanding": "0"
      }
    }
  ]
}

Para obtener más ejemplos, vea: Ejemplos.

Métricas de progreso de la consulta

Las métricas de progreso de las consultas son esenciales para supervisar el rendimiento y el estado de las consultas de streaming. Estas métricas incluyen el número de filas de entrada, velocidades de procesamiento y varias duraciones relacionadas con la ejecución de la consulta. Para observar estas métricas, adjunte un StreamingQueryListener elemento a la sesión de Spark. El oyente emitirá eventos que contengan estas métricas al final de cada ciclo de transmisión.

Por ejemplo, puede acceder a las métricas mediante StreamingQueryProgress.observedMetrics mapa en el método del oyente onQueryProgress. Esto le permite realizar un seguimiento y analizar el rendimiento de las consultas de streaming en tiempo real.

class MyListener(StreamingQueryListener):
   def onQueryProgress(self, event):
       print("Query made progress: ", event.progress.observedMetrics)