Nota
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
La supervisión del rendimiento, el costo y el estado de las aplicaciones de streaming es esencial para crear canalizaciones de ETL confiables y eficaces. Azure Databricks proporciona un amplio conjunto de funcionalidades de observabilidad en Jobs, Canalizaciones Declarativas de Lakeflow y Lakeflow Connect para ayudar a diagnosticar cuellos de botella, optimizar el rendimiento y administrar el uso y costos de recursos.
En este artículo se describen los procedimientos recomendados en las siguientes áreas:
- Métricas clave de rendimiento de streaming
- Esquemas de registro de eventos y consultas de ejemplo
- Supervisión de consultas de streaming
- Observabilidad de costos mediante tablas del sistema
- Exportación de registros y métricas a herramientas externas
Métricas clave para la observabilidad de streaming
Al operar canalizaciones de streaming, supervise las siguientes métricas clave:
Métrica | propósito |
---|---|
Contrapresión | Supervisa el número de archivos y desplazamientos (tamaños). Ayuda a identificar cuellos de botella y garantiza que el sistema pueda controlar los datos entrantes sin que se quede atrás. |
Rendimiento | Realiza un seguimiento del número de mensajes procesados por microproceso. Evalúe la eficacia de la canalización y compruebe que sigue el ritmo de la ingesta de datos. |
Duración | Mide la duración media de un microproceso. Indica la velocidad de procesamiento y ayuda a optimizar los intervalos por lotes. |
Latencia | Indica cuántos registros o mensajes se procesan con el tiempo. Ayuda a comprender los retrasos de la canalización de un extremo a otro y a optimizar las latencias más bajas. |
Uso del clúster | Refleja el uso de cpu y memoria (%). Garantiza un uso eficaz de los recursos y ayuda a escalar clústeres para satisfacer las demandas de procesamiento. |
Red | Mide los datos transferidos y recibidos. Resulta útil para identificar cuellos de botella de red y mejorar el rendimiento de la transferencia de datos. |
Punto de control | Identifica los datos procesados y los desplazamientos. Garantiza la coherencia y habilita la tolerancia a errores durante los errores. |
Costo | Muestra los costos por hora, diario y mensual de una aplicación de streaming. Ayuda en la optimización de presupuestos y recursos. |
Linaje | Muestra conjuntos de datos y capas creados en la aplicación de streaming. Facilita la transformación, el seguimiento, la garantía de calidad y la depuración de datos. |
Registros y métricas del clúster
Los registros y métricas del clúster de Azure Databricks proporcionan información detallada sobre el rendimiento y el uso del clúster. Estos registros y métricas incluyen información sobre cpu, memoria, E/S de disco, tráfico de red y otras métricas del sistema. La supervisión de estas métricas es fundamental para optimizar el rendimiento del clúster, administrar los recursos de forma eficaz y solucionar problemas.
Los registros y las métricas del clúster de Azure Databricks ofrecen información detallada sobre el rendimiento del clúster y el uso de recursos. Entre ellas se incluyen el uso de CPU y memoria, la E/S de disco y el tráfico de red. La supervisión de estas métricas es fundamental para:
- Optimización del rendimiento del clúster.
- Administrar recursos de forma eficaz.
- Solución de problemas operativos.
Las métricas se pueden aprovechar a través de la interfaz de usuario de Databricks o exportarlas a herramientas de supervisión personales. Vea Ejemplo de Notebook: Métricas de Datadog.
Interfaz de usuario de Spark
La interfaz de usuario de Spark muestra información detallada sobre el progreso de los trabajos y las fases, incluido el número de tareas completadas, pendientes y con errores. Esto te ayuda a entender el flujo de ejecución y a identificar cuellos de botella.
En el caso de las aplicaciones de streaming, la pestaña Streaming muestra métricas como la velocidad de entrada, la velocidad de procesamiento y la duración del lote. Ayuda a supervisar el rendimiento de los trabajos de streaming e identificar cualquier problema de ingesta o procesamiento de datos.
Consulte Depuración con la interfaz de usuario de Apache Spark para obtener más información.
Métricas de cálculo
Las métricas de proceso le ayudarán a comprender el uso del clúster. A medida que se ejecuta el trabajo, puede ver cómo se escala y cómo se ven afectados los recursos. Podrá encontrar presión de memoria que podría provocar errores de OOM o presión de CPU que podrían provocar retrasos prolongados. Estas son las métricas específicas que verá:
- Distribución de carga del servidor: el uso de CPU de cada nodo durante el último minuto.
- Uso de CPU: el porcentaje de tiempo que la CPU pasó en varios modos (por ejemplo, usuario, sistema, inactivo y iowait).
- Uso de memoria: uso total de memoria por cada modo (por ejemplo, usado, libre, búfer y almacenado en caché).
- Utilización del swap de memoria: uso total del swap de memoria.
- Espacio libre del sistema de archivos: uso total del sistema de archivos por cada punto de montaje.
- Rendimiento de red: el número de bytes recibidos y transmitidos a través de la red por cada dispositivo.
- Número de nodos activos: número de nodos activos en cada marca de tiempo para el proceso especificado.
Consulte Supervisar el rendimiento y Gráficos de métricas de hardware para obtener más información.
Tablas del sistema
Supervisión de costos
Las tablas del sistema de Azure Databricks proporcionan un enfoque estructurado para supervisar el costo y el rendimiento del trabajo. Estas tablas incluyen:
- Detalles de ejecución del trabajo.
- Uso de recursos.
- Costos asociados.
Use estas tablas para comprender el estado operativo y el impacto financiero.
Requisitos
Para usar tablas del sistema para la supervisión de costos:
- Un administrador de cuenta debe habilitar el
system.lakeflow schema
. - Los usuarios deben, o bien:
- Ser un administrador de metastore y un administrador de cuenta, o
- Tener permisos
USE
ySELECT
en los esquemas del sistema.
Consulta de ejemplo: Trabajos más caros (últimos 30 días)
Esta consulta identifica los trabajos más caros en los últimos 30 días, lo que ayuda en el análisis y la optimización de costos.
WITH list_cost_per_job AS (
SELECT
t1.workspace_id,
t1.usage_metadata.job_id,
COUNT(DISTINCT t1.usage_metadata.job_run_id) AS runs,
SUM(t1.usage_quantity * list_prices.pricing.default) AS list_cost,
FIRST(identity_metadata.run_as, true) AS run_as,
FIRST(t1.custom_tags, true) AS custom_tags,
MAX(t1.usage_end_time) AS last_seen_date
FROM system.billing.usage t1
INNER JOIN system.billing.list_prices list_prices ON
t1.cloud = list_prices.cloud AND
t1.sku_name = list_prices.sku_name AND
t1.usage_start_time >= list_prices.price_start_time AND
(t1.usage_end_time <= list_prices.price_end_time OR list_prices.price_end_time IS NULL)
WHERE
t1.billing_origin_product = "JOBS"
AND t1.usage_date >= CURRENT_DATE() - INTERVAL 30 DAY
GROUP BY ALL
),
most_recent_jobs AS (
SELECT
*,
ROW_NUMBER() OVER(PARTITION BY workspace_id, job_id ORDER BY change_time DESC) AS rn
FROM
system.lakeflow.jobs QUALIFY rn=1
)
SELECT
t2.name,
t1.job_id,
t1.workspace_id,
t1.runs,
t1.run_as,
SUM(list_cost) AS list_cost,
t1.last_seen_date
FROM list_cost_per_job t1
LEFT JOIN most_recent_jobs t2 USING (workspace_id, job_id)
GROUP BY ALL
ORDER BY list_cost DESC
Canalizaciones declarativas de Lakeflow
El registro de eventos de Canalizaciones declarativas de Lakeflow captura un registro completo de todos los eventos de canalización, incluidos:
- Registros de auditoría.
- Comprobaciones de calidad de datos.
- Progreso de la canalización.
- Rastreo de origen de datos.
El registro de eventos se habilita automáticamente para todas las canalizaciones declarativas de Lakeflow y se puede acceder a él a través de:
- Interfaz de usuario de Pipeline: vea los registros directamente.
- API DLT: acceso mediante programación.
- Consulta directa: consulte la tabla de registro de eventos.
Para obtener más información, consulte Esquema de registro de eventos para canalizaciones declarativas de Lakeflow.
Consultas de ejemplo
Estas consultas de ejemplo ayudan a supervisar el rendimiento y el estado de las canalizaciones proporcionando métricas clave, como la duración del lote, el rendimiento, la represión y el uso de recursos.
Duración media del lote
Esta consulta calcula la duración media de los lotes procesados por la canalización.
SELECT
(max_t - min_t) / batch_count as avg_batch_duration_seconds,
batch_count,
min_t,
max_t,
date_hr,
message
FROM
-- /60 for minutes
(
SELECT
count(*) as batch_count,
unix_timestamp(
min(timestamp)
) as min_t,
unix_timestamp(
max(timestamp)
) as max_t,
date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
message
FROM
event_log
WHERE
event_type = 'flow_progress'
AND level = 'METRICS'
GROUP BY
date_hr,
message
)
ORDER BY
date_hr desc
Rendimiento medio
Esta consulta calcula el rendimiento medio de la canalización en términos de filas procesadas por segundo.
SELECT
(max_t - min_t) / total_rows as avg_throughput_rps,
total_rows,
min_t,
max_t,
date_hr,
message
FROM
-- /60 for minutes
(
SELECT
sum(
details:flow_progress:metrics:num_output_rows
) as total_rows,
unix_timestamp(
min(timestamp)
) as min_t,
unix_timestamp(
max(timestamp)
) as max_t,
date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
message
FROM
event_log
WHERE
event_type = 'flow_progress'
AND level = 'METRICS'
GROUP BY
date_hr,
message
)
ORDER BY
date_hr desc
Contrapresión
Esta consulta mide la contrapresión de la canalización comprobando la acumulación de datos.
SELECT
timestamp,
DOUBLE(
details:flow_progress:metrics:backlog_bytes
) AS backlog_bytes,
DOUBLE(
details:flow_progress:metrics:backlog_files
) AS backlog_files
FROM
event_log
WHERE
event_type = 'flow_progress'
Uso de clústeres y ranuras
Esta consulta tiene información sobre el uso de clústeres o ranuras utilizadas por la canalización.
SELECT
date_trunc("hour", timestamp) AS hour,
AVG (
DOUBLE (
details:cluster_resources:num_task_slots
)
) AS num_task_slots,
AVG (
DOUBLE (
details:cluster_resources:avg_num_task_slots
)
) AS avg_num_task_slots,
AVG (
DOUBLE (
details:cluster_resources:num_executors
)
) AS num_executors,
AVG (
DOUBLE (
details:cluster_resources:avg_task_slot_utilization
)
) AS avg_utilization,
AVG (
DOUBLE (
details:cluster_resources:avg_num_queued_tasks
)
) AS queue_size
FROM
event_log
WHERE
details : cluster_resources : avg_num_queued_tasks IS NOT NULL
AND origin.update_id = '${latest_update_id}'
GROUP BY
1;
Trabajos
Puede supervisar las consultas de streaming en trabajos a través del agente de escucha de consultas de streaming.
Adjunte un agente de escucha a la sesión de Spark para habilitar el agente de escucha de consultas de streaming enAzure Databricks. Este agente de escucha supervisará el progreso y las métricas de las consultas de streaming. Se puede usar para insertar métricas en herramientas de supervisión externas o registrarlas para su posterior análisis.
Ejemplo: Exportación de métricas a herramientas de supervisión externas
:::nota
Esto está disponible en Databricks Runtime 11.3 LTS y versiones posteriores para Python y Scala.
:::
Puede exportar métricas de streaming a servicios externos para alertas o paneles mediante la StreamingQueryListener
interfaz .
Este es un ejemplo básico de cómo implementar un agente de escucha:
from pyspark.sql.streaming import StreamingQueryListener
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
print("Query started: ", event.id)
def onQueryProgress(self, event):
print("Query made progress: ", event.progress)
def onQueryTerminated(self, event):
print("Query terminated: ", event.id)
spark.streams.addListener(MyListener())
Ejemplo: Uso del agente de escucha de consultas en Azure Databricks
A continuación se muestra un ejemplo de un registro de eventos de StreamingQueryListener para una consulta de streaming de Kafka a Delta Lake:
{
"id": "210f4746-7caa-4a51-bd08-87cabb45bdbe",
"runId": "42a2f990-c463-4a9c-9aae-95d6990e63f4",
"timestamp": "2024-05-15T21:57:50.782Z",
"batchId": 0,
"batchDuration": 3601,
"numInputRows": 20,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 5.55401277422938,
"durationMs": {
"addBatch": 1544,
"commitBatch": 686,
"commitOffsets": 27,
"getBatch": 12,
"latestOffset": 577,
"queryPlanning": 105,
"triggerExecution": 3600,
"walCommit": 34
},
"stateOperators": [
{
"operatorName": "symmetricHashJoin",
"numRowsTotal": 20,
"numRowsUpdated": 20,
"allUpdatesTimeMs": 473,
"numRowsRemoved": 0,
"allRemovalsTimeMs": 0,
"commitTimeMs": 277,
"memoryUsedBytes": 13120,
"numRowsDroppedByWatermark": 0,
"numShufflePartitions": 5,
"numStateStoreInstances": 20,
"customMetrics": {
"loadedMapCacheHitCount": 0,
"loadedMapCacheMissCount": 0,
"stateOnCurrentVersionSizeBytes": 5280
}
}
],
"sources": [
{
"description": "KafkaV2[Subscribe[topic-1]]",
"numInputRows": 10,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 2.77700638711469,
"metrics": {
"avgOffsetsBehindLatest": "0.0",
"estimatedTotalBytesBehindLatest": "0.0",
"maxOffsetsBehindLatest": "0",
"minOffsetsBehindLatest": "0"
}
},
{
"description": "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
"numInputRows": 10,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 2.77700638711469,
"metrics": {
"numBytesOutstanding": "0",
"numFilesOutstanding": "0"
}
}
]
}
Para obtener más ejemplos, vea: Ejemplos.
Métricas de progreso de la consulta
Las métricas de progreso de las consultas son esenciales para supervisar el rendimiento y el estado de las consultas de streaming. Estas métricas incluyen el número de filas de entrada, velocidades de procesamiento y varias duraciones relacionadas con la ejecución de la consulta. Para observar estas métricas, adjunte un StreamingQueryListener
elemento a la sesión de Spark. El oyente emitirá eventos que contengan estas métricas al final de cada ciclo de transmisión.
Por ejemplo, puede acceder a las métricas mediante StreamingQueryProgress.observedMetrics
mapa en el método del oyente onQueryProgress
. Esto le permite realizar un seguimiento y analizar el rendimiento de las consultas de streaming en tiempo real.
class MyListener(StreamingQueryListener):
def onQueryProgress(self, event):
print("Query made progress: ", event.progress.observedMetrics)