Nota
O acesso a esta página requer autorização. Podes tentar iniciar sessão ou mudar de diretório.
O acesso a esta página requer autorização. Podes tentar mudar de diretório.
Monitorar o desempenho, o custo e a integridade de seus aplicativos de streaming é essencial para criar pipelines de ETL confiáveis e eficientes. O Azure Databricks fornece um conjunto avançado de recursos de observabilidade em Jobs, Lakeflow Spark Declarative Pipelines e Lakeflow Connect para ajudar a diagnosticar gargalos, otimizar o desempenho e gerenciar o uso de recursos e os custos.
Este artigo descreve as melhores práticas nas seguintes áreas:
- Principais métricas de desempenho de streaming
- Esquemas de log de eventos e consultas de exemplo
- Monitoramento de consultas de streaming
- Observabilidade de custos usando tabelas do sistema
- Exportando logs e métricas para ferramentas externas
Principais métricas para a observabilidade de streaming
Ao operar pipelines de streaming, monitore as seguintes métricas principais:
| Metric | Purpose |
|---|---|
| Backpressure | Monitora o número de ficheiros e deslocamentos (tamanhos). Ajuda a identificar gargalos e garante que o sistema possa lidar com os dados recebidos sem ficar para trás. |
| Throughput | Rastreia o número de mensagens processadas por microlote. Avalie a eficiência do pipeline e verifique se ele acompanha a ingestão de dados. |
| Duration | Mede a duração média de um microlote. Indica a velocidade de processamento e ajuda a ajustar os intervalos de lote. |
| Latency | Indica quantos registros/mensagens são processados ao longo do tempo. Ajuda a entender os atrasos de pipeline de ponta a ponta e a otimizar para latências mais baixas. |
| Utilização do cluster | Reflete o uso da CPU e da memória (%). Garante o uso eficiente de recursos e ajuda a dimensionar clusters para atender às demandas de processamento. |
| Network | Mede os dados transferidos e recebidos. Útil para identificar gargalos de rede e melhorar o desempenho da transferência de dados. |
| Checkpoint | Identifica dados processados e compensações. Garante consistência e permite tolerância a falhas durante problemas. |
| Cost | Mostra os custos horários, diários e mensais de um aplicativo de streaming. Auxilia na orçamentação e otimização de recursos. |
| Lineage | Exibe conjuntos de dados e camadas criadas no aplicativo de streaming. Facilita a transformação, o rastreamento, a garantia de qualidade e a depuração de dados. |
Registos e métricas do cluster
Os logs e métricas de cluster do Azure Databricks fornecem informações detalhadas sobre o desempenho e a utilização do cluster. Esses logs e métricas incluem informações sobre CPU, memória, E/S de disco, tráfego de rede e outras métricas do sistema. O monitoramento dessas métricas é crucial para otimizar o desempenho do cluster, gerenciar recursos de forma eficiente e solucionar problemas.
Os logs e métricas de cluster do Azure Databricks oferecem informações detalhadas sobre o desempenho do cluster e a utilização de recursos. Isso inclui uso de CPU e memória, E/S de disco e tráfego de rede. O monitoramento dessas métricas é fundamental para:
- Otimizando o desempenho do cluster.
- Gerir os recursos de forma eficiente.
- Solução de problemas operacionais.
As métricas podem ser aproveitadas por meio da interface do usuário do Databricks ou exportadas para ferramentas de monitoramento pessoais. Consulte Exemplo de bloco de anotações: métricas Datadog.
Interface do usuário do Spark
A interface do usuário do Spark mostra informações detalhadas sobre o progresso de trabalhos e estágios, incluindo o número de tarefas concluídas, pendentes e com falha. Isso ajuda a entender o fluxo de execução e identificar gargalos.
Para aplicativos de streaming, a guia Streaming mostra métricas como taxa de entrada, taxa de processamento e duração do lote. Ele ajuda você a monitorar o desempenho de seus trabalhos de streaming e identificar quaisquer problemas de ingestão ou processamento de dados.
Consulte Depuração com a interface do usuário do Spark para obter mais informações.
Métricas de computação
As métricas de computação ajudarão você a entender a utilização do cluster. À medida que seu trabalho é executado, você pode ver como ele é dimensionado e como seus recursos são afetados. Você será capaz de encontrar pressão de memória que pode levar a falhas por falta de memória ou pressão sobre a CPU que pode causar longos atrasos. Aqui estão as métricas específicas que você verá:
- Distribuição de carga do servidor: a utilização da CPU de cada nó no último minuto.
- Utilização da CPU: A porcentagem de tempo que a CPU gastou em vários modos (por exemplo, usuário, sistema, ocioso e iowait).
- Utilização da memória: Uso total da memória por cada modo (por exemplo, usado, livre, buffer e armazenado em cache).
- Utilização de troca de memória: Uso total de troca de memória.
- Espaço livre do sistema de arquivos: Uso total do sistema de arquivos por cada ponto de montagem.
- Taxa de transferência de rede: O número de bytes recebidos e transmitidos através da rede por cada dispositivo.
- Número de nós ativos: o número de nós ativos em cada carimbo de data/hora para a computação dada.
Consulte Monitorização de desempenho e Gráficos de métricas de hardware para mais informações.
Tabelas do sistema
Monitorização de custos
As tabelas de sistema do Azure Databricks fornecem uma abordagem estruturada para monitorar o custo e o desempenho do trabalho. Estas tabelas incluem:
- Detalhes da execução do trabalho.
- Utilização de recursos.
- Custos associados.
Use estas tabelas para entender a saúde operacional e o impacto financeiro.
Requirements
Para usar tabelas do sistema para monitoramento de custos:
- Um administrador de conta deve ativar o
system.lakeflow schema. - Os utilizadores devem:
- Seja um administrador de metastore e um administrador de conta, ou
- Ter permissões de
USEeSELECTnos esquemas do sistema.
Exemplo de consulta: trabalhos mais caros (últimos 30 dias)
Esta consulta identifica os trabalhos mais caros nos últimos 30 dias, auxiliando na análise e otimização de custos.
WITH list_cost_per_job AS (
SELECT
t1.workspace_id,
t1.usage_metadata.job_id,
COUNT(DISTINCT t1.usage_metadata.job_run_id) AS runs,
SUM(t1.usage_quantity * list_prices.pricing.default) AS list_cost,
FIRST(identity_metadata.run_as, true) AS run_as,
FIRST(t1.custom_tags, true) AS custom_tags,
MAX(t1.usage_end_time) AS last_seen_date
FROM system.billing.usage t1
INNER JOIN system.billing.list_prices list_prices ON
t1.cloud = list_prices.cloud AND
t1.sku_name = list_prices.sku_name AND
t1.usage_start_time >= list_prices.price_start_time AND
(t1.usage_end_time <= list_prices.price_end_time OR list_prices.price_end_time IS NULL)
WHERE
t1.billing_origin_product = "JOBS"
AND t1.usage_date >= CURRENT_DATE() - INTERVAL 30 DAY
GROUP BY ALL
),
most_recent_jobs AS (
SELECT
*,
ROW_NUMBER() OVER(PARTITION BY workspace_id, job_id ORDER BY change_time DESC) AS rn
FROM
system.lakeflow.jobs QUALIFY rn=1
)
SELECT
t2.name,
t1.job_id,
t1.workspace_id,
t1.runs,
t1.run_as,
SUM(list_cost) AS list_cost,
t1.last_seen_date
FROM list_cost_per_job t1
LEFT JOIN most_recent_jobs t2 USING (workspace_id, job_id)
GROUP BY ALL
ORDER BY list_cost DESC
Oleodutos declarativos Lakeflow Spark
O log de eventos do Lakeflow Spark Declarative Pipelines captura um registro abrangente de todos os eventos do pipeline, incluindo:
- Logs de auditoria.
- Verificações da qualidade dos dados.
- Progresso do gasoduto.
- Linhagem de dados.
O log de eventos é ativado automaticamente para todos os Lakeflow Spark Declarative Pipelines e pode ser acessado via:
- Interface do usuário do pipeline: visualize os logs diretamente.
- API de pipelines: acesso programático.
- Consulta direta: consulte a tabela do log de eventos.
Para obter mais informações, consulte esquema de log de eventos para Lakeflow Spark Declarative Pipelines.
Exemplos de consultas
Essas consultas de exemplo ajudam a monitorar o desempenho e a integridade dos pipelines, fornecendo principais métricas, como duração do lote, taxa de transferência, contrapressão e uso de recursos.
Duração média do lote
Esta consulta calcula o tempo médio de processamento dos lotes pelo pipeline.
SELECT
(max_t - min_t) / batch_count as avg_batch_duration_seconds,
batch_count,
min_t,
max_t,
date_hr,
message
FROM
-- /60 for minutes
(
SELECT
count(*) as batch_count,
unix_timestamp(
min(timestamp)
) as min_t,
unix_timestamp(
max(timestamp)
) as max_t,
date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
message
FROM
event_log
WHERE
event_type = 'flow_progress'
AND level = 'METRICS'
GROUP BY
date_hr,
message
)
ORDER BY
date_hr desc
Rendimento médio
Esta consulta calcula o desempenho médio do pipeline em termos de linhas processadas por segundo.
SELECT
(max_t - min_t) / total_rows as avg_throughput_rps,
total_rows,
min_t,
max_t,
date_hr,
message
FROM
-- /60 for minutes
(
SELECT
sum(
details:flow_progress:metrics:num_output_rows
) as total_rows,
unix_timestamp(
min(timestamp)
) as min_t,
unix_timestamp(
max(timestamp)
) as max_t,
date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
message
FROM
event_log
WHERE
event_type = 'flow_progress'
AND level = 'METRICS'
GROUP BY
date_hr,
message
)
ORDER BY
date_hr desc
Backpressure
Esta consulta mede a retropressão do fluxo de dados verificando a lista de pendências de dados.
SELECT
timestamp,
DOUBLE(
details:flow_progress:metrics:backlog_bytes
) AS backlog_bytes,
DOUBLE(
details:flow_progress:metrics:backlog_files
) AS backlog_files
FROM
event_log
WHERE
event_type = 'flow_progress'
Utilização de clusters e slots
Esta consulta fornece informações sobre como os clusters ou slots são utilizados pelo pipeline.
SELECT
date_trunc("hour", timestamp) AS hour,
AVG (
DOUBLE (
details:cluster_resources:num_task_slots
)
) AS num_task_slots,
AVG (
DOUBLE (
details:cluster_resources:avg_num_task_slots
)
) AS avg_num_task_slots,
AVG (
DOUBLE (
details:cluster_resources:num_executors
)
) AS num_executors,
AVG (
DOUBLE (
details:cluster_resources:avg_task_slot_utilization
)
) AS avg_utilization,
AVG (
DOUBLE (
details:cluster_resources:avg_num_queued_tasks
)
) AS queue_size
FROM
event_log
WHERE
details : cluster_resources : avg_num_queued_tasks IS NOT NULL
AND origin.update_id = '${latest_update_id}'
GROUP BY
1;
Jobs
Você pode monitorar consultas de streaming em trabalhos por meio do Streaming Query Listener.
Anexe um ouvinte à sessão do Spark para habilitar o Streaming Query Listener no Azure Databricks. Esse ouvinte monitorará o progresso e as métricas de suas consultas de streaming. Ele pode ser usado para enviar métricas para ferramentas de monitoramento externas ou registrá-las para análise posterior.
Exemplo: exportar métricas para ferramentas de monitoramento externo
Note
Isso está disponível no Databricks Runtime 11.3 LTS e superior para Python e Scala.
Você pode exportar métricas de streaming para serviços externos para alertas ou painéis usando a StreamingQueryListener interface.
Aqui está um exemplo básico de como implementar um ouvinte:
from pyspark.sql.streaming import StreamingQueryListener
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
print("Query started: ", event.id)
def onQueryProgress(self, event):
print("Query made progress: ", event.progress)
def onQueryTerminated(self, event):
print("Query terminated: ", event.id)
spark.streams.addListener(MyListener())
Exemplo: Usar ouvinte de consulta no Azure Databricks
Abaixo está um exemplo de um registo de eventos do StreamingQueryListener para um fluxo de dados Kafka to Delta Lake:
{
"id": "210f4746-7caa-4a51-bd08-87cabb45bdbe",
"runId": "42a2f990-c463-4a9c-9aae-95d6990e63f4",
"timestamp": "2024-05-15T21:57:50.782Z",
"batchId": 0,
"batchDuration": 3601,
"numInputRows": 20,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 5.55401277422938,
"durationMs": {
"addBatch": 1544,
"commitBatch": 686,
"commitOffsets": 27,
"getBatch": 12,
"latestOffset": 577,
"queryPlanning": 105,
"triggerExecution": 3600,
"walCommit": 34
},
"stateOperators": [
{
"operatorName": "symmetricHashJoin",
"numRowsTotal": 20,
"numRowsUpdated": 20,
"allUpdatesTimeMs": 473,
"numRowsRemoved": 0,
"allRemovalsTimeMs": 0,
"commitTimeMs": 277,
"memoryUsedBytes": 13120,
"numRowsDroppedByWatermark": 0,
"numShufflePartitions": 5,
"numStateStoreInstances": 20,
"customMetrics": {
"loadedMapCacheHitCount": 0,
"loadedMapCacheMissCount": 0,
"stateOnCurrentVersionSizeBytes": 5280
}
}
],
"sources": [
{
"description": "KafkaV2[Subscribe[topic-1]]",
"numInputRows": 10,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 2.77700638711469,
"metrics": {
"avgOffsetsBehindLatest": "0.0",
"estimatedTotalBytesBehindLatest": "0.0",
"maxOffsetsBehindLatest": "0",
"minOffsetsBehindLatest": "0"
}
},
{
"description": "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
"numInputRows": 10,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 2.77700638711469,
"metrics": {
"numBytesOutstanding": "0",
"numFilesOutstanding": "0"
}
}
]
}
Para mais exemplos, consulte: Exemplos.
Métricas de progresso da consulta
As métricas de progresso da consulta são essenciais para monitorar o desempenho e a integridade de suas consultas de streaming. Essas métricas incluem o número de linhas de entrada, taxas de processamento e várias durações relacionadas à execução da consulta. Você pode observar essas métricas anexando um StreamingQueryListener à sessão do Spark. O ouvinte emitirá eventos contendo essas métricas no final de cada época de streaming.
Pode aceder a métricas utilizando o mapa StreamingQueryProgress.observedMetrics no método onQueryProgress do ouvinte. Isso permite que você acompanhe e analise o desempenho de suas consultas de streaming em tempo real.
class MyListener(StreamingQueryListener):
def onQueryProgress(self, event):
print("Query made progress: ", event.progress.observedMetrics)