Partilhar via


Observabilidade no Azure Databricks para trabalhos, Lakeflow Spark Declarative Pipelines e Lakeflow Connect

Monitorar o desempenho, o custo e a integridade de seus aplicativos de streaming é essencial para criar pipelines de ETL confiáveis e eficientes. O Azure Databricks fornece um conjunto avançado de recursos de observabilidade em Jobs, Lakeflow Spark Declarative Pipelines e Lakeflow Connect para ajudar a diagnosticar gargalos, otimizar o desempenho e gerenciar o uso de recursos e os custos.

Este artigo descreve as melhores práticas nas seguintes áreas:

  • Principais métricas de desempenho de streaming
  • Esquemas de log de eventos e consultas de exemplo
  • Monitoramento de consultas de streaming
  • Observabilidade de custos usando tabelas do sistema
  • Exportando logs e métricas para ferramentas externas

Principais métricas para a observabilidade de streaming

Ao operar pipelines de streaming, monitore as seguintes métricas principais:

Metric Purpose
Backpressure Monitora o número de ficheiros e deslocamentos (tamanhos). Ajuda a identificar gargalos e garante que o sistema possa lidar com os dados recebidos sem ficar para trás.
Throughput Rastreia o número de mensagens processadas por microlote. Avalie a eficiência do pipeline e verifique se ele acompanha a ingestão de dados.
Duration Mede a duração média de um microlote. Indica a velocidade de processamento e ajuda a ajustar os intervalos de lote.
Latency Indica quantos registros/mensagens são processados ao longo do tempo. Ajuda a entender os atrasos de pipeline de ponta a ponta e a otimizar para latências mais baixas.
Utilização do cluster Reflete o uso da CPU e da memória (%). Garante o uso eficiente de recursos e ajuda a dimensionar clusters para atender às demandas de processamento.
Network Mede os dados transferidos e recebidos. Útil para identificar gargalos de rede e melhorar o desempenho da transferência de dados.
Checkpoint Identifica dados processados e compensações. Garante consistência e permite tolerância a falhas durante problemas.
Cost Mostra os custos horários, diários e mensais de um aplicativo de streaming. Auxilia na orçamentação e otimização de recursos.
Lineage Exibe conjuntos de dados e camadas criadas no aplicativo de streaming. Facilita a transformação, o rastreamento, a garantia de qualidade e a depuração de dados.

Registos e métricas do cluster

Os logs e métricas de cluster do Azure Databricks fornecem informações detalhadas sobre o desempenho e a utilização do cluster. Esses logs e métricas incluem informações sobre CPU, memória, E/S de disco, tráfego de rede e outras métricas do sistema. O monitoramento dessas métricas é crucial para otimizar o desempenho do cluster, gerenciar recursos de forma eficiente e solucionar problemas.

Os logs e métricas de cluster do Azure Databricks oferecem informações detalhadas sobre o desempenho do cluster e a utilização de recursos. Isso inclui uso de CPU e memória, E/S de disco e tráfego de rede. O monitoramento dessas métricas é fundamental para:

  • Otimizando o desempenho do cluster.
  • Gerir os recursos de forma eficiente.
  • Solução de problemas operacionais.

As métricas podem ser aproveitadas por meio da interface do usuário do Databricks ou exportadas para ferramentas de monitoramento pessoais. Consulte Exemplo de bloco de anotações: métricas Datadog.

Interface do usuário do Spark

A interface do usuário do Spark mostra informações detalhadas sobre o progresso de trabalhos e estágios, incluindo o número de tarefas concluídas, pendentes e com falha. Isso ajuda a entender o fluxo de execução e identificar gargalos.

Para aplicativos de streaming, a guia Streaming mostra métricas como taxa de entrada, taxa de processamento e duração do lote. Ele ajuda você a monitorar o desempenho de seus trabalhos de streaming e identificar quaisquer problemas de ingestão ou processamento de dados.

Consulte Depuração com a interface do usuário do Spark para obter mais informações.

Métricas de computação

As métricas de computação ajudarão você a entender a utilização do cluster. À medida que seu trabalho é executado, você pode ver como ele é dimensionado e como seus recursos são afetados. Você será capaz de encontrar pressão de memória que pode levar a falhas por falta de memória ou pressão sobre a CPU que pode causar longos atrasos. Aqui estão as métricas específicas que você verá:

  • Distribuição de carga do servidor: a utilização da CPU de cada nó no último minuto.
  • Utilização da CPU: A porcentagem de tempo que a CPU gastou em vários modos (por exemplo, usuário, sistema, ocioso e iowait).
  • Utilização da memória: Uso total da memória por cada modo (por exemplo, usado, livre, buffer e armazenado em cache).
  • Utilização de troca de memória: Uso total de troca de memória.
  • Espaço livre do sistema de arquivos: Uso total do sistema de arquivos por cada ponto de montagem.
  • Taxa de transferência de rede: O número de bytes recebidos e transmitidos através da rede por cada dispositivo.
  • Número de nós ativos: o número de nós ativos em cada carimbo de data/hora para a computação dada.

Consulte Monitorização de desempenho e Gráficos de métricas de hardware para mais informações.

Tabelas do sistema

Monitorização de custos

As tabelas de sistema do Azure Databricks fornecem uma abordagem estruturada para monitorar o custo e o desempenho do trabalho. Estas tabelas incluem:

  • Detalhes da execução do trabalho.
  • Utilização de recursos.
  • Custos associados.

Use estas tabelas para entender a saúde operacional e o impacto financeiro.

Requirements

Para usar tabelas do sistema para monitoramento de custos:

  • Um administrador de conta deve ativar o system.lakeflow schema.
  • Os utilizadores devem:
    • Seja um administrador de metastore e um administrador de conta, ou
    • Ter permissões de USE e SELECT nos esquemas do sistema.

Exemplo de consulta: trabalhos mais caros (últimos 30 dias)

Esta consulta identifica os trabalhos mais caros nos últimos 30 dias, auxiliando na análise e otimização de custos.

WITH list_cost_per_job AS (
     SELECT
       t1.workspace_id,
       t1.usage_metadata.job_id,
       COUNT(DISTINCT t1.usage_metadata.job_run_id) AS runs,
       SUM(t1.usage_quantity * list_prices.pricing.default) AS list_cost,
       FIRST(identity_metadata.run_as, true) AS run_as,
       FIRST(t1.custom_tags, true) AS custom_tags,
       MAX(t1.usage_end_time) AS last_seen_date
     FROM system.billing.usage t1
     INNER JOIN system.billing.list_prices list_prices ON
       t1.cloud = list_prices.cloud AND
       t1.sku_name = list_prices.sku_name AND
       t1.usage_start_time >= list_prices.price_start_time AND
       (t1.usage_end_time <= list_prices.price_end_time OR list_prices.price_end_time IS NULL)
     WHERE
       t1.billing_origin_product = "JOBS"
       AND t1.usage_date >= CURRENT_DATE() - INTERVAL 30 DAY
     GROUP BY ALL
   ),
   most_recent_jobs AS (
     SELECT
       *,
       ROW_NUMBER() OVER(PARTITION BY workspace_id, job_id ORDER BY change_time DESC) AS rn
     FROM
       system.lakeflow.jobs QUALIFY rn=1
   )
   SELECT
     t2.name,
     t1.job_id,
     t1.workspace_id,
     t1.runs,
     t1.run_as,
     SUM(list_cost) AS list_cost,
     t1.last_seen_date
   FROM list_cost_per_job t1
   LEFT JOIN most_recent_jobs t2 USING (workspace_id, job_id)
   GROUP BY ALL
   ORDER BY list_cost DESC

Oleodutos declarativos Lakeflow Spark

O log de eventos do Lakeflow Spark Declarative Pipelines captura um registro abrangente de todos os eventos do pipeline, incluindo:

  • Logs de auditoria.
  • Verificações da qualidade dos dados.
  • Progresso do gasoduto.
  • Linhagem de dados.

O log de eventos é ativado automaticamente para todos os Lakeflow Spark Declarative Pipelines e pode ser acessado via:

  • Interface do usuário do pipeline: visualize os logs diretamente.
  • API de pipelines: acesso programático.
  • Consulta direta: consulte a tabela do log de eventos.

Para obter mais informações, consulte esquema de log de eventos para Lakeflow Spark Declarative Pipelines.

Exemplos de consultas

Essas consultas de exemplo ajudam a monitorar o desempenho e a integridade dos pipelines, fornecendo principais métricas, como duração do lote, taxa de transferência, contrapressão e uso de recursos.

Duração média do lote

Esta consulta calcula o tempo médio de processamento dos lotes pelo pipeline.

SELECT
  (max_t - min_t) / batch_count as avg_batch_duration_seconds,
  batch_count,
  min_t,
  max_t,
  date_hr,
  message
FROM
  -- /60 for minutes
  (
    SELECT
      count(*) as batch_count,
      unix_timestamp(
        min(timestamp)
      ) as min_t,
      unix_timestamp(
        max(timestamp)
      ) as max_t,
      date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
      message
    FROM
      event_log
    WHERE
      event_type = 'flow_progress'
      AND level = 'METRICS'
    GROUP BY
      date_hr,
      message
  )
ORDER BY
  date_hr desc

Rendimento médio

Esta consulta calcula o desempenho médio do pipeline em termos de linhas processadas por segundo.

SELECT
  (max_t - min_t) / total_rows as avg_throughput_rps,
  total_rows,
  min_t,
  max_t,
  date_hr,
  message
FROM
  -- /60 for minutes
  (
    SELECT
      sum(
        details:flow_progress:metrics:num_output_rows
      ) as total_rows,
      unix_timestamp(
        min(timestamp)
      ) as min_t,
      unix_timestamp(
        max(timestamp)
      ) as max_t,
      date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
      message
    FROM
      event_log
    WHERE
      event_type = 'flow_progress'
      AND level = 'METRICS'
    GROUP BY
      date_hr,
      message
  )
ORDER BY
  date_hr desc

Backpressure

Esta consulta mede a retropressão do fluxo de dados verificando a lista de pendências de dados.

SELECT
  timestamp,
  DOUBLE(
    details:flow_progress:metrics:backlog_bytes
  ) AS backlog_bytes,
  DOUBLE(
    details:flow_progress:metrics:backlog_files
  ) AS backlog_files
FROM
  event_log
WHERE
  event_type = 'flow_progress'

Utilização de clusters e slots

Esta consulta fornece informações sobre como os clusters ou slots são utilizados pelo pipeline.

SELECT
  date_trunc("hour", timestamp) AS hour,
  AVG (
    DOUBLE (
      details:cluster_resources:num_task_slots
    )
  ) AS num_task_slots,
  AVG (
    DOUBLE (
      details:cluster_resources:avg_num_task_slots
    )
  ) AS avg_num_task_slots,
  AVG (
    DOUBLE (
      details:cluster_resources:num_executors
    )
  ) AS num_executors,
  AVG (
    DOUBLE (
      details:cluster_resources:avg_task_slot_utilization
    )
  ) AS avg_utilization,
  AVG (
    DOUBLE (
      details:cluster_resources:avg_num_queued_tasks
    )
  ) AS queue_size
FROM
  event_log
WHERE
  details : cluster_resources : avg_num_queued_tasks IS NOT NULL
  AND origin.update_id = '${latest_update_id}'
GROUP BY
  1;

Jobs

Você pode monitorar consultas de streaming em trabalhos por meio do Streaming Query Listener.

Anexe um ouvinte à sessão do Spark para habilitar o Streaming Query Listener no Azure Databricks. Esse ouvinte monitorará o progresso e as métricas de suas consultas de streaming. Ele pode ser usado para enviar métricas para ferramentas de monitoramento externas ou registrá-las para análise posterior.

Exemplo: exportar métricas para ferramentas de monitoramento externo

Note

Isso está disponível no Databricks Runtime 11.3 LTS e superior para Python e Scala.

Você pode exportar métricas de streaming para serviços externos para alertas ou painéis usando a StreamingQueryListener interface.

Aqui está um exemplo básico de como implementar um ouvinte:

from pyspark.sql.streaming import StreamingQueryListener

class MyListener(StreamingQueryListener):
   def onQueryStarted(self, event):
       print("Query started: ", event.id)

   def onQueryProgress(self, event):
       print("Query made progress: ", event.progress)

   def onQueryTerminated(self, event):
       print("Query terminated: ", event.id)

spark.streams.addListener(MyListener())

Exemplo: Usar ouvinte de consulta no Azure Databricks

Abaixo está um exemplo de um registo de eventos do StreamingQueryListener para um fluxo de dados Kafka to Delta Lake:

{
  "id": "210f4746-7caa-4a51-bd08-87cabb45bdbe",
  "runId": "42a2f990-c463-4a9c-9aae-95d6990e63f4",
  "timestamp": "2024-05-15T21:57:50.782Z",
  "batchId": 0,
  "batchDuration": 3601,
  "numInputRows": 20,
  "inputRowsPerSecond": 0.0,
  "processedRowsPerSecond": 5.55401277422938,
  "durationMs": {
    "addBatch": 1544,
    "commitBatch": 686,
    "commitOffsets": 27,
    "getBatch": 12,
    "latestOffset": 577,
    "queryPlanning": 105,
    "triggerExecution": 3600,
    "walCommit": 34
  },
  "stateOperators": [
    {
      "operatorName": "symmetricHashJoin",
      "numRowsTotal": 20,
      "numRowsUpdated": 20,
      "allUpdatesTimeMs": 473,
      "numRowsRemoved": 0,
      "allRemovalsTimeMs": 0,
      "commitTimeMs": 277,
      "memoryUsedBytes": 13120,
      "numRowsDroppedByWatermark": 0,
      "numShufflePartitions": 5,
      "numStateStoreInstances": 20,
      "customMetrics": {
        "loadedMapCacheHitCount": 0,
        "loadedMapCacheMissCount": 0,
        "stateOnCurrentVersionSizeBytes": 5280
      }
    }
  ],
  "sources": [
    {
      "description": "KafkaV2[Subscribe[topic-1]]",
      "numInputRows": 10,
      "inputRowsPerSecond": 0.0,
      "processedRowsPerSecond": 2.77700638711469,
      "metrics": {
        "avgOffsetsBehindLatest": "0.0",
        "estimatedTotalBytesBehindLatest": "0.0",
        "maxOffsetsBehindLatest": "0",
        "minOffsetsBehindLatest": "0"
      }
    },
    {
      "description": "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
      "numInputRows": 10,
      "inputRowsPerSecond": 0.0,
      "processedRowsPerSecond": 2.77700638711469,
      "metrics": {
        "numBytesOutstanding": "0",
        "numFilesOutstanding": "0"
      }
    }
  ]
}

Para mais exemplos, consulte: Exemplos.

Métricas de progresso da consulta

As métricas de progresso da consulta são essenciais para monitorar o desempenho e a integridade de suas consultas de streaming. Essas métricas incluem o número de linhas de entrada, taxas de processamento e várias durações relacionadas à execução da consulta. Você pode observar essas métricas anexando um StreamingQueryListener à sessão do Spark. O ouvinte emitirá eventos contendo essas métricas no final de cada época de streaming.

Pode aceder a métricas utilizando o mapa StreamingQueryProgress.observedMetrics no método onQueryProgress do ouvinte. Isso permite que você acompanhe e analise o desempenho de suas consultas de streaming em tempo real.

class MyListener(StreamingQueryListener):
   def onQueryProgress(self, event):
       print("Query made progress: ", event.progress.observedMetrics)