Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Monitorar o desempenho, o custo e a integridade dos aplicativos de streaming é essencial para criar pipelines ETL confiáveis e eficientes. O Azure Databricks fornece um conjunto avançado de recursos de observabilidade em Jobs, Lakeflow Declarative Pipelines e Lakeflow Connect para ajudar a diagnosticar gargalos, otimizar o desempenho e gerenciar o uso e os custos de recursos.
Este artigo descreve as práticas recomendadas nas seguintes áreas:
- Principais métricas de desempenho de streaming
- Esquemas de log de eventos e consultas de exemplo
- Monitoramento de consulta de streaming
- Observabilidade de custo usando tabelas do sistema
- Exportando logs e métricas para ferramentas externas
Principais métricas para a observabilidade de streaming
Ao operar pipelines de streaming, monitore as seguintes principais métricas:
Métrica | Propósito |
---|---|
Contrapressão | Monitora o número de arquivos e deslocamentos (tamanhos). Ajuda a identificar gargalos e garante que o sistema possa lidar com dados de entrada sem ficar para trás. |
Taxa de transferência | Controla o número de mensagens processadas por microlote. Avalie a eficiência do pipeline e verifique se ele mantém o ritmo com a ingestão de dados. |
Duração | Mede a duração média de um lote pequeno. Indica a velocidade de processamento e ajuda a ajustar intervalos em lotes. |
Latência | Indica quantos registros/mensagens são processados ao longo do tempo. Auxilia na compreensão dos atrasos da pipeline de ponta a ponta e na otimização visando latências mais baixas. |
Utilização de cluster | Reflete o uso da CPU e da memória (%). Garante o uso eficiente de recursos e ajuda a dimensionar clusters para atender às demandas de processamento. |
Rede | Mede os dados transferidos e recebidos. Útil para identificar gargalos de rede e melhorar o desempenho de transferência de dados. |
Ponto de verificação | Identifica dados processados e deslocamentos. Garante a consistência e habilita a tolerância a falhas em situações de falhas. |
Custo | Mostra os custos por hora, diários e mensais de um aplicativo de streaming. Ajuda no orçamento e na otimização de recursos. |
Linhagem | Exibe conjuntos de dados e camadas criados no aplicativo de streaming. Facilita a transformação de dados, o acompanhamento, a garantia de qualidade e a depuração. |
Logs e métricas de cluster
Os logs e as métricas do cluster do Azure Databricks fornecem insights detalhados sobre o desempenho e a utilização do cluster. Esses logs e métricas incluem informações sobre CPU, memória, E/S de disco, tráfego de rede e outras métricas do sistema. Monitorar essas métricas é crucial para otimizar o desempenho do cluster, gerenciar recursos com eficiência e solucionar problemas.
Os logs de cluster e as métricas do Azure Databricks oferecem insights detalhados sobre o desempenho do cluster e a utilização de recursos. Elas incluem uso de CPU e memória, E/S de disco e tráfego de rede. O monitoramento dessas métricas é fundamental para:
- Otimizando o desempenho do cluster.
- Gerenciando recursos com eficiência.
- Solução de problemas operacionais.
As métricas podem ser aproveitadas por meio da interface do usuário do Databricks ou exportadas para ferramentas de monitoramento pessoal. Veja exemplo de Notebook: métricas do Datadog.
Interface do usuário do Spark
A interface do usuário do Spark mostra informações detalhadas sobre o progresso de trabalhos e estágios, incluindo o número de tarefas concluídas, pendentes e com falha. Isso ajuda você a entender o fluxo de execução e identificar gargalos.
Para aplicativos de streaming, a guia Streaming mostra métricas como taxa de entrada, taxa de processamento e duração do lote. Ele ajuda você a monitorar o desempenho dos trabalhos de streaming e a identificar problemas de ingestão ou processamento de dados.
Consulte Depuração na interface do Apache Spark para obter mais informações.
Métricas de computação
As métricas de computação ajudarão você a entender a utilização do cluster. À medida que seu trabalho é executado, você pode ver como ele é dimensionado e como seus recursos são afetados. Você poderá encontrar uma sobrecarga de memória que pode levar a falhas por falta de memória ou uma sobrecarga da CPU que pode causar longos atrasos. Aqui estão as métricas específicas que você verá:
- Distribuição de Carga do Servidor: utilização da CPU de cada nó no último minuto.
- Utilização da CPU: o percentual de tempo gasto pela CPU em vários modos (por exemplo, usuário, sistema, ocioso e iowait).
- Utilização de memória: uso total de memória por cada modo (por exemplo, usado, gratuito, buffer e armazenado em cache).
- Utilização de troca de memória: uso total de troca de memória.
- Espaço livre do sistema de arquivos: total de uso do sistema de arquivos por cada ponto de montagem.
- Taxa de transferência de rede: o número de bytes recebidos e transmitidos pela rede por cada dispositivo.
- Número de nós ativos: o número de nós ativos em cada timestamp para o cálculo fornecido.
Consulte o desempenho do Monitor e gráficos de métricas de hardware para obter mais informações.
Tabelas do sistema
Monitoramento de custos
As tabelas do sistema do Azure Databricks fornecem uma abordagem estruturada para monitorar o custo e o desempenho do trabalho. Essas tabelas incluem:
- Detalhes da execução do trabalho.
- Utilização de recursos.
- Custos associados.
Use essas tabelas para entender a integridade operacional e o impacto financeiro.
Requisitos
Para usar tabelas do sistema para monitoramento de custos:
- Um administrador de conta deve habilitar o
system.lakeflow schema
. - Os usuários devem optar por:
- Ser um administrador do metastore e um administrador de conta ou
- Tenha permissões de
USE
eSELECT
nos esquemas do sistema.
Consulta de exemplo: trabalhos mais caros (últimos 30 dias)
Essa consulta identifica os trabalhos mais caros nos últimos 30 dias, auxiliando na análise e otimização de custos.
WITH list_cost_per_job AS (
SELECT
t1.workspace_id,
t1.usage_metadata.job_id,
COUNT(DISTINCT t1.usage_metadata.job_run_id) AS runs,
SUM(t1.usage_quantity * list_prices.pricing.default) AS list_cost,
FIRST(identity_metadata.run_as, true) AS run_as,
FIRST(t1.custom_tags, true) AS custom_tags,
MAX(t1.usage_end_time) AS last_seen_date
FROM system.billing.usage t1
INNER JOIN system.billing.list_prices list_prices ON
t1.cloud = list_prices.cloud AND
t1.sku_name = list_prices.sku_name AND
t1.usage_start_time >= list_prices.price_start_time AND
(t1.usage_end_time <= list_prices.price_end_time OR list_prices.price_end_time IS NULL)
WHERE
t1.billing_origin_product = "JOBS"
AND t1.usage_date >= CURRENT_DATE() - INTERVAL 30 DAY
GROUP BY ALL
),
most_recent_jobs AS (
SELECT
*,
ROW_NUMBER() OVER(PARTITION BY workspace_id, job_id ORDER BY change_time DESC) AS rn
FROM
system.lakeflow.jobs QUALIFY rn=1
)
SELECT
t2.name,
t1.job_id,
t1.workspace_id,
t1.runs,
t1.run_as,
SUM(list_cost) AS list_cost,
t1.last_seen_date
FROM list_cost_per_job t1
LEFT JOIN most_recent_jobs t2 USING (workspace_id, job_id)
GROUP BY ALL
ORDER BY list_cost DESC
Pipelines Declarativos do Lakeflow
O log de eventos do Lakeflow Declarative Pipelines captura um registro abrangente de todos os eventos de pipeline, incluindo:
- Logs de auditoria.
- Verificações de qualidade de dados.
- Progresso do pipeline.
- Linhagem de dados.
O log de eventos é habilitado automaticamente para todos os Pipelines Declarativos do Lakeflow e pode ser acessado por meio de:
- Interface do usuário do pipeline: exibir logs diretamente.
- API DLT: acesso programático.
- Consulta direta: consulte a tabela de log de eventos.
Para obter mais informações, consulte o esquema de log de eventos para Pipelines Declarativos do Lakeflow.
Consultas de exemplo
Essas consultas de exemplo ajudam a monitorar o desempenho e a integridade dos pipelines fornecendo métricas importantes, como duração do lote, taxa de transferência, backpressure e utilização de recursos.
Duração média do lote
Essa consulta calcula a duração média dos lotes processados pelo pipeline.
SELECT
(max_t - min_t) / batch_count as avg_batch_duration_seconds,
batch_count,
min_t,
max_t,
date_hr,
message
FROM
-- /60 for minutes
(
SELECT
count(*) as batch_count,
unix_timestamp(
min(timestamp)
) as min_t,
unix_timestamp(
max(timestamp)
) as max_t,
date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
message
FROM
event_log
WHERE
event_type = 'flow_progress'
AND level = 'METRICS'
GROUP BY
date_hr,
message
)
ORDER BY
date_hr desc
Taxa de transferência média
Essa consulta calcula o desempenho médio do pipeline em termos de linhas processadas por segundo.
SELECT
(max_t - min_t) / total_rows as avg_throughput_rps,
total_rows,
min_t,
max_t,
date_hr,
message
FROM
-- /60 for minutes
(
SELECT
sum(
details:flow_progress:metrics:num_output_rows
) as total_rows,
unix_timestamp(
min(timestamp)
) as min_t,
unix_timestamp(
max(timestamp)
) as max_t,
date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
message
FROM
event_log
WHERE
event_type = 'flow_progress'
AND level = 'METRICS'
GROUP BY
date_hr,
message
)
ORDER BY
date_hr desc
Contrapressão
Esta consulta mede a pressão de retorno do pipeline verificando o acúmulo de dados.
SELECT
timestamp,
DOUBLE(
details:flow_progress:metrics:backlog_bytes
) AS backlog_bytes,
DOUBLE(
details:flow_progress:metrics:backlog_files
) AS backlog_files
FROM
event_log
WHERE
event_type = 'flow_progress'
Utilização de cluster e slots
Essa consulta tem insights sobre a utilização de clusters ou slots usados pelo pipeline.
SELECT
date_trunc("hour", timestamp) AS hour,
AVG (
DOUBLE (
details:cluster_resources:num_task_slots
)
) AS num_task_slots,
AVG (
DOUBLE (
details:cluster_resources:avg_num_task_slots
)
) AS avg_num_task_slots,
AVG (
DOUBLE (
details:cluster_resources:num_executors
)
) AS num_executors,
AVG (
DOUBLE (
details:cluster_resources:avg_task_slot_utilization
)
) AS avg_utilization,
AVG (
DOUBLE (
details:cluster_resources:avg_num_queued_tasks
)
) AS queue_size
FROM
event_log
WHERE
details : cluster_resources : avg_num_queued_tasks IS NOT NULL
AND origin.update_id = '${latest_update_id}'
GROUP BY
1;
Trabalhos
Você pode monitorar consultas de streaming em tarefas por meio do Ouvinte de Consulta de Streaming.
Anexe um ouvinte à sessão do Spark para habilitar o Ouvinte de Consulta de Streaming no Databricks do Azure. Esse ouvinte monitorará o progresso e as métricas de suas consultas de streaming. Ele pode ser usado para enviar métricas por push para ferramentas de monitoramento externas ou registrá-las para análise posterior.
Exemplo: exportar métricas para ferramentas de monitoramento externas
:::nota
Isso está disponível no Databricks Runtime 11.3 LTS e superior para Python e Scala.
:::
Você pode exportar métricas de streaming para serviços externos para alertas ou dashboards usando a StreamingQueryListener
interface.
Aqui está um exemplo básico de como implementar um ouvinte:
from pyspark.sql.streaming import StreamingQueryListener
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
print("Query started: ", event.id)
def onQueryProgress(self, event):
print("Query made progress: ", event.progress)
def onQueryTerminated(self, event):
print("Query terminated: ", event.id)
spark.streams.addListener(MyListener())
Exemplo: usar o ouvinte de consulta no Azure Databricks
Veja abaixo um exemplo de um log de eventos StreamingQueryListener para uma consulta de streaming de Kafka para o Delta Lake.
{
"id": "210f4746-7caa-4a51-bd08-87cabb45bdbe",
"runId": "42a2f990-c463-4a9c-9aae-95d6990e63f4",
"timestamp": "2024-05-15T21:57:50.782Z",
"batchId": 0,
"batchDuration": 3601,
"numInputRows": 20,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 5.55401277422938,
"durationMs": {
"addBatch": 1544,
"commitBatch": 686,
"commitOffsets": 27,
"getBatch": 12,
"latestOffset": 577,
"queryPlanning": 105,
"triggerExecution": 3600,
"walCommit": 34
},
"stateOperators": [
{
"operatorName": "symmetricHashJoin",
"numRowsTotal": 20,
"numRowsUpdated": 20,
"allUpdatesTimeMs": 473,
"numRowsRemoved": 0,
"allRemovalsTimeMs": 0,
"commitTimeMs": 277,
"memoryUsedBytes": 13120,
"numRowsDroppedByWatermark": 0,
"numShufflePartitions": 5,
"numStateStoreInstances": 20,
"customMetrics": {
"loadedMapCacheHitCount": 0,
"loadedMapCacheMissCount": 0,
"stateOnCurrentVersionSizeBytes": 5280
}
}
],
"sources": [
{
"description": "KafkaV2[Subscribe[topic-1]]",
"numInputRows": 10,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 2.77700638711469,
"metrics": {
"avgOffsetsBehindLatest": "0.0",
"estimatedTotalBytesBehindLatest": "0.0",
"maxOffsetsBehindLatest": "0",
"minOffsetsBehindLatest": "0"
}
},
{
"description": "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
"numInputRows": 10,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 2.77700638711469,
"metrics": {
"numBytesOutstanding": "0",
"numFilesOutstanding": "0"
}
}
]
}
Para obter mais exemplos, consulte: Exemplos.
Métricas de progresso da consulta
As métricas de progresso da consulta são essenciais para monitorar o desempenho e a integridade de suas consultas de streaming. Essas métricas incluem o número de linhas de entrada, taxas de processamento e várias durações relacionadas à execução da consulta. Você pode observar essas métricas anexando um StreamingQueryListener
à sessão do Spark. O ouvinte emitirá eventos que contêm essas métricas no final de cada época de streaming.
Por exemplo, você pode acessar métricas usando o StreamingQueryProgress.observedMetrics
mapa no método onQueryProgress
do ouvinte. Isso permite que você acompanhe e analise o desempenho de suas consultas de streaming em tempo real.
class MyListener(StreamingQueryListener):
def onQueryProgress(self, event):
print("Query made progress: ", event.progress.observedMetrics)