Compartilhar via


Observabilidade no Azure Databricks para trabalhos, Pipelines Declarativos do Lakeflow e Lakeflow Connect

Monitorar o desempenho, o custo e a integridade dos aplicativos de streaming é essencial para criar pipelines ETL confiáveis e eficientes. O Azure Databricks fornece um conjunto avançado de recursos de observabilidade em Jobs, Lakeflow Declarative Pipelines e Lakeflow Connect para ajudar a diagnosticar gargalos, otimizar o desempenho e gerenciar o uso e os custos de recursos.

Este artigo descreve as práticas recomendadas nas seguintes áreas:

  • Principais métricas de desempenho de streaming
  • Esquemas de log de eventos e consultas de exemplo
  • Monitoramento de consulta de streaming
  • Observabilidade de custo usando tabelas do sistema
  • Exportando logs e métricas para ferramentas externas

Principais métricas para a observabilidade de streaming

Ao operar pipelines de streaming, monitore as seguintes principais métricas:

Métrica Propósito
Contrapressão Monitora o número de arquivos e deslocamentos (tamanhos). Ajuda a identificar gargalos e garante que o sistema possa lidar com dados de entrada sem ficar para trás.
Taxa de transferência Controla o número de mensagens processadas por microlote. Avalie a eficiência do pipeline e verifique se ele mantém o ritmo com a ingestão de dados.
Duração Mede a duração média de um lote pequeno. Indica a velocidade de processamento e ajuda a ajustar intervalos em lotes.
Latência Indica quantos registros/mensagens são processados ao longo do tempo. Auxilia na compreensão dos atrasos da pipeline de ponta a ponta e na otimização visando latências mais baixas.
Utilização de cluster Reflete o uso da CPU e da memória (%). Garante o uso eficiente de recursos e ajuda a dimensionar clusters para atender às demandas de processamento.
Rede Mede os dados transferidos e recebidos. Útil para identificar gargalos de rede e melhorar o desempenho de transferência de dados.
Ponto de verificação Identifica dados processados e deslocamentos. Garante a consistência e habilita a tolerância a falhas em situações de falhas.
Custo Mostra os custos por hora, diários e mensais de um aplicativo de streaming. Ajuda no orçamento e na otimização de recursos.
Linhagem Exibe conjuntos de dados e camadas criados no aplicativo de streaming. Facilita a transformação de dados, o acompanhamento, a garantia de qualidade e a depuração.

Logs e métricas de cluster

Os logs e as métricas do cluster do Azure Databricks fornecem insights detalhados sobre o desempenho e a utilização do cluster. Esses logs e métricas incluem informações sobre CPU, memória, E/S de disco, tráfego de rede e outras métricas do sistema. Monitorar essas métricas é crucial para otimizar o desempenho do cluster, gerenciar recursos com eficiência e solucionar problemas.

Os logs de cluster e as métricas do Azure Databricks oferecem insights detalhados sobre o desempenho do cluster e a utilização de recursos. Elas incluem uso de CPU e memória, E/S de disco e tráfego de rede. O monitoramento dessas métricas é fundamental para:

  • Otimizando o desempenho do cluster.
  • Gerenciando recursos com eficiência.
  • Solução de problemas operacionais.

As métricas podem ser aproveitadas por meio da interface do usuário do Databricks ou exportadas para ferramentas de monitoramento pessoal. Veja exemplo de Notebook: métricas do Datadog.

Interface do usuário do Spark

A interface do usuário do Spark mostra informações detalhadas sobre o progresso de trabalhos e estágios, incluindo o número de tarefas concluídas, pendentes e com falha. Isso ajuda você a entender o fluxo de execução e identificar gargalos.

Para aplicativos de streaming, a guia Streaming mostra métricas como taxa de entrada, taxa de processamento e duração do lote. Ele ajuda você a monitorar o desempenho dos trabalhos de streaming e a identificar problemas de ingestão ou processamento de dados.

Consulte Depuração na interface do Apache Spark para obter mais informações.

Métricas de computação

As métricas de computação ajudarão você a entender a utilização do cluster. À medida que seu trabalho é executado, você pode ver como ele é dimensionado e como seus recursos são afetados. Você poderá encontrar uma sobrecarga de memória que pode levar a falhas por falta de memória ou uma sobrecarga da CPU que pode causar longos atrasos. Aqui estão as métricas específicas que você verá:

  • Distribuição de Carga do Servidor: utilização da CPU de cada nó no último minuto.
  • Utilização da CPU: o percentual de tempo gasto pela CPU em vários modos (por exemplo, usuário, sistema, ocioso e iowait).
  • Utilização de memória: uso total de memória por cada modo (por exemplo, usado, gratuito, buffer e armazenado em cache).
  • Utilização de troca de memória: uso total de troca de memória.
  • Espaço livre do sistema de arquivos: total de uso do sistema de arquivos por cada ponto de montagem.
  • Taxa de transferência de rede: o número de bytes recebidos e transmitidos pela rede por cada dispositivo.
  • Número de nós ativos: o número de nós ativos em cada timestamp para o cálculo fornecido.

Consulte o desempenho do Monitor e gráficos de métricas de hardware para obter mais informações.

Tabelas do sistema

Monitoramento de custos

As tabelas do sistema do Azure Databricks fornecem uma abordagem estruturada para monitorar o custo e o desempenho do trabalho. Essas tabelas incluem:

  • Detalhes da execução do trabalho.
  • Utilização de recursos.
  • Custos associados.

Use essas tabelas para entender a integridade operacional e o impacto financeiro.

Requisitos

Para usar tabelas do sistema para monitoramento de custos:

  • Um administrador de conta deve habilitar o system.lakeflow schema.
  • Os usuários devem optar por:
    • Ser um administrador do metastore e um administrador de conta ou
    • Tenha permissões de USE e SELECT nos esquemas do sistema.

Consulta de exemplo: trabalhos mais caros (últimos 30 dias)

Essa consulta identifica os trabalhos mais caros nos últimos 30 dias, auxiliando na análise e otimização de custos.

WITH list_cost_per_job AS (
     SELECT
       t1.workspace_id,
       t1.usage_metadata.job_id,
       COUNT(DISTINCT t1.usage_metadata.job_run_id) AS runs,
       SUM(t1.usage_quantity * list_prices.pricing.default) AS list_cost,
       FIRST(identity_metadata.run_as, true) AS run_as,
       FIRST(t1.custom_tags, true) AS custom_tags,
       MAX(t1.usage_end_time) AS last_seen_date
     FROM system.billing.usage t1
     INNER JOIN system.billing.list_prices list_prices ON
       t1.cloud = list_prices.cloud AND
       t1.sku_name = list_prices.sku_name AND
       t1.usage_start_time >= list_prices.price_start_time AND
       (t1.usage_end_time <= list_prices.price_end_time OR list_prices.price_end_time IS NULL)
     WHERE
       t1.billing_origin_product = "JOBS"
       AND t1.usage_date >= CURRENT_DATE() - INTERVAL 30 DAY
     GROUP BY ALL
   ),
   most_recent_jobs AS (
     SELECT
       *,
       ROW_NUMBER() OVER(PARTITION BY workspace_id, job_id ORDER BY change_time DESC) AS rn
     FROM
       system.lakeflow.jobs QUALIFY rn=1
   )
   SELECT
     t2.name,
     t1.job_id,
     t1.workspace_id,
     t1.runs,
     t1.run_as,
     SUM(list_cost) AS list_cost,
     t1.last_seen_date
   FROM list_cost_per_job t1
   LEFT JOIN most_recent_jobs t2 USING (workspace_id, job_id)
   GROUP BY ALL
   ORDER BY list_cost DESC

Pipelines Declarativos do Lakeflow

O log de eventos do Lakeflow Declarative Pipelines captura um registro abrangente de todos os eventos de pipeline, incluindo:

  • Logs de auditoria.
  • Verificações de qualidade de dados.
  • Progresso do pipeline.
  • Linhagem de dados.

O log de eventos é habilitado automaticamente para todos os Pipelines Declarativos do Lakeflow e pode ser acessado por meio de:

  • Interface do usuário do pipeline: exibir logs diretamente.
  • API DLT: acesso programático.
  • Consulta direta: consulte a tabela de log de eventos.

Para obter mais informações, consulte o esquema de log de eventos para Pipelines Declarativos do Lakeflow.

Consultas de exemplo

Essas consultas de exemplo ajudam a monitorar o desempenho e a integridade dos pipelines fornecendo métricas importantes, como duração do lote, taxa de transferência, backpressure e utilização de recursos.

Duração média do lote

Essa consulta calcula a duração média dos lotes processados pelo pipeline.

SELECT
  (max_t - min_t) / batch_count as avg_batch_duration_seconds,
  batch_count,
  min_t,
  max_t,
  date_hr,
  message
FROM
  -- /60 for minutes
  (
    SELECT
      count(*) as batch_count,
      unix_timestamp(
        min(timestamp)
      ) as min_t,
      unix_timestamp(
        max(timestamp)
      ) as max_t,
      date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
      message
    FROM
      event_log
    WHERE
      event_type = 'flow_progress'
      AND level = 'METRICS'
    GROUP BY
      date_hr,
      message
  )
ORDER BY
  date_hr desc

Taxa de transferência média

Essa consulta calcula o desempenho médio do pipeline em termos de linhas processadas por segundo.

SELECT
  (max_t - min_t) / total_rows as avg_throughput_rps,
  total_rows,
  min_t,
  max_t,
  date_hr,
  message
FROM
  -- /60 for minutes
  (
    SELECT
      sum(
        details:flow_progress:metrics:num_output_rows
      ) as total_rows,
      unix_timestamp(
        min(timestamp)
      ) as min_t,
      unix_timestamp(
        max(timestamp)
      ) as max_t,
      date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
      message
    FROM
      event_log
    WHERE
      event_type = 'flow_progress'
      AND level = 'METRICS'
    GROUP BY
      date_hr,
      message
  )
ORDER BY
  date_hr desc

Contrapressão

Esta consulta mede a pressão de retorno do pipeline verificando o acúmulo de dados.

SELECT
  timestamp,
  DOUBLE(
    details:flow_progress:metrics:backlog_bytes
  ) AS backlog_bytes,
  DOUBLE(
    details:flow_progress:metrics:backlog_files
  ) AS backlog_files
FROM
  event_log
WHERE
  event_type = 'flow_progress'

Utilização de cluster e slots

Essa consulta tem insights sobre a utilização de clusters ou slots usados pelo pipeline.

SELECT
  date_trunc("hour", timestamp) AS hour,
  AVG (
    DOUBLE (
      details:cluster_resources:num_task_slots
    )
  ) AS num_task_slots,
  AVG (
    DOUBLE (
      details:cluster_resources:avg_num_task_slots
    )
  ) AS avg_num_task_slots,
  AVG (
    DOUBLE (
      details:cluster_resources:num_executors
    )
  ) AS num_executors,
  AVG (
    DOUBLE (
      details:cluster_resources:avg_task_slot_utilization
    )
  ) AS avg_utilization,
  AVG (
    DOUBLE (
      details:cluster_resources:avg_num_queued_tasks
    )
  ) AS queue_size
FROM
  event_log
WHERE
  details : cluster_resources : avg_num_queued_tasks IS NOT NULL
  AND origin.update_id = '${latest_update_id}'
GROUP BY
  1;

Trabalhos

Você pode monitorar consultas de streaming em tarefas por meio do Ouvinte de Consulta de Streaming.

Anexe um ouvinte à sessão do Spark para habilitar o Ouvinte de Consulta de Streaming no Databricks do Azure. Esse ouvinte monitorará o progresso e as métricas de suas consultas de streaming. Ele pode ser usado para enviar métricas por push para ferramentas de monitoramento externas ou registrá-las para análise posterior.

Exemplo: exportar métricas para ferramentas de monitoramento externas

:::nota

Isso está disponível no Databricks Runtime 11.3 LTS e superior para Python e Scala.

:::

Você pode exportar métricas de streaming para serviços externos para alertas ou dashboards usando a StreamingQueryListener interface.

Aqui está um exemplo básico de como implementar um ouvinte:

from pyspark.sql.streaming import StreamingQueryListener

class MyListener(StreamingQueryListener):
   def onQueryStarted(self, event):
       print("Query started: ", event.id)

   def onQueryProgress(self, event):
       print("Query made progress: ", event.progress)

   def onQueryTerminated(self, event):
       print("Query terminated: ", event.id)

spark.streams.addListener(MyListener())

Exemplo: usar o ouvinte de consulta no Azure Databricks

Veja abaixo um exemplo de um log de eventos StreamingQueryListener para uma consulta de streaming de Kafka para o Delta Lake.

{
  "id": "210f4746-7caa-4a51-bd08-87cabb45bdbe",
  "runId": "42a2f990-c463-4a9c-9aae-95d6990e63f4",
  "timestamp": "2024-05-15T21:57:50.782Z",
  "batchId": 0,
  "batchDuration": 3601,
  "numInputRows": 20,
  "inputRowsPerSecond": 0.0,
  "processedRowsPerSecond": 5.55401277422938,
  "durationMs": {
    "addBatch": 1544,
    "commitBatch": 686,
    "commitOffsets": 27,
    "getBatch": 12,
    "latestOffset": 577,
    "queryPlanning": 105,
    "triggerExecution": 3600,
    "walCommit": 34
  },
  "stateOperators": [
    {
      "operatorName": "symmetricHashJoin",
      "numRowsTotal": 20,
      "numRowsUpdated": 20,
      "allUpdatesTimeMs": 473,
      "numRowsRemoved": 0,
      "allRemovalsTimeMs": 0,
      "commitTimeMs": 277,
      "memoryUsedBytes": 13120,
      "numRowsDroppedByWatermark": 0,
      "numShufflePartitions": 5,
      "numStateStoreInstances": 20,
      "customMetrics": {
        "loadedMapCacheHitCount": 0,
        "loadedMapCacheMissCount": 0,
        "stateOnCurrentVersionSizeBytes": 5280
      }
    }
  ],
  "sources": [
    {
      "description": "KafkaV2[Subscribe[topic-1]]",
      "numInputRows": 10,
      "inputRowsPerSecond": 0.0,
      "processedRowsPerSecond": 2.77700638711469,
      "metrics": {
        "avgOffsetsBehindLatest": "0.0",
        "estimatedTotalBytesBehindLatest": "0.0",
        "maxOffsetsBehindLatest": "0",
        "minOffsetsBehindLatest": "0"
      }
    },
    {
      "description": "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
      "numInputRows": 10,
      "inputRowsPerSecond": 0.0,
      "processedRowsPerSecond": 2.77700638711469,
      "metrics": {
        "numBytesOutstanding": "0",
        "numFilesOutstanding": "0"
      }
    }
  ]
}

Para obter mais exemplos, consulte: Exemplos.

Métricas de progresso da consulta

As métricas de progresso da consulta são essenciais para monitorar o desempenho e a integridade de suas consultas de streaming. Essas métricas incluem o número de linhas de entrada, taxas de processamento e várias durações relacionadas à execução da consulta. Você pode observar essas métricas anexando um StreamingQueryListener à sessão do Spark. O ouvinte emitirá eventos que contêm essas métricas no final de cada época de streaming.

Por exemplo, você pode acessar métricas usando o StreamingQueryProgress.observedMetrics mapa no método onQueryProgress do ouvinte. Isso permite que você acompanhe e analise o desempenho de suas consultas de streaming em tempo real.

class MyListener(StreamingQueryListener):
   def onQueryProgress(self, event):
       print("Query made progress: ", event.progress.observedMetrics)