Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
La surveillance des performances, des coûts et de l’intégrité de vos applications de streaming est essentielle pour créer des pipelines ETL fiables et efficaces. Azure Databricks fournit un ensemble complet de fonctionnalités d’observabilité pour les Jobs, les pipelines déclaratifs Lakeflow Spark et Lakeflow Connect afin de diagnostiquer les goulots d’étranglement, optimiser les performances et gérer l’utilisation et les coûts des ressources.
Cet article décrit les meilleures pratiques dans les domaines suivants :
- Métriques de performances de diffusion en continu clés
- Schémas du journal des événements et exemples de requêtes
- Surveillance des requêtes de streaming
- Observabilité des coûts à l’aide de tables système
- Exportation des logs et des métriques vers des outils externes
Métriques clés pour l’observabilité de streaming
Lors de l’exploitation des pipelines de diffusion en continu, surveillez les métriques clés suivantes :
| Metric | Purpose |
|---|---|
| Backpressure | Surveille le nombre de fichiers et les décalages (tailles). Permet d’identifier les goulots d’étranglement et de s’assurer que le système peut gérer les données entrantes sans tomber en arrière. |
| Throughput | Suit le nombre de messages traités par micro-lot. Évaluez l’efficacité du pipeline et vérifiez qu’il suit le rythme de l’ingestion des données. |
| Duration | Mesure la durée moyenne d’un micro-lot. Indique la vitesse de traitement et aide à paramétrer les intervalles de traitement. |
| Latency | Indique le nombre d’enregistrements/messages traités au fil du temps. Permet de comprendre les retards de pipeline de bout en bout et d’optimiser les latences inférieures. |
| Utilisation du cluster | Reflète l’utilisation du processeur et de la mémoire (%). Garantit une utilisation efficace des ressources et aide à mettre à l’échelle les clusters pour répondre aux exigences en matière de traitement. |
| Network | Mesure les données transférées et reçues. Utile pour identifier les goulots d’étranglement réseau et améliorer les performances de transfert de données. |
| Checkpoint | Identifie les données traitées et les décalages. Garantit la cohérence et active la tolérance de panne pendant les défaillances. |
| Cost | Affiche les coûts horaires, quotidiens et mensuels d’une application de diffusion en continu. Aide à la planification budgétaire et à l’optimisation des ressources. |
| Lineage | Affiche les jeux de données et les couches créés dans l’application de diffusion en continu. Facilite la transformation des données, le suivi, l’assurance qualité et le débogage. |
Journaux et métriques de cluster
Les journaux et métriques du cluster Azure Databricks fournissent des insights détaillés sur les performances et l’utilisation du cluster. Ces journaux et métriques incluent des informations sur le processeur, la mémoire, les E/S de disque, le trafic réseau; et d’autres métriques systèmes. La surveillance de ces métriques est essentielle pour optimiser les performances du cluster, gérer efficacement les ressources et résoudre les problèmes.
Les journaux et métriques du cluster Azure Databricks offrent des insights détaillés sur les performances du cluster et l’utilisation des ressources. Il s’agit notamment de l’utilisation du processeur et de la mémoire, des E/S disque et du trafic réseau. La surveillance de ces métriques est essentielle pour :
- Optimisation des performances du cluster.
- Gestion efficace des ressources.
- Résolution des problèmes opérationnels.
Les métriques peuvent être exploitées via l’interface utilisateur Databricks ou exportées vers des outils de supervision personnelle. Consultez Exemple de notebook : métriques Datadog.
Interface utilisateur Spark
L’interface utilisateur Spark affiche des informations détaillées sur la progression des travaux et des étapes, notamment le nombre de tâches terminées, en attente et ayant échoué. Cela vous aide à comprendre le flux d’exécution et à identifier les goulots d’étranglement.
Pour les applications de diffusion en continu, l’onglet Streaming affiche des métriques telles que le taux d’entrée, le taux de traitement et la durée du traitement. Il vous aide à surveiller les performances de vos travaux de streaming et à identifier les problèmes d’ingestion ou de traitement des données.
Pour plus d’informations, consultez Débogage avec l’interface utilisateur Spark .
Métriques de calcul
Les métriques de calcul vous aideront à comprendre l’utilisation du cluster. Pendant que votre travail s’exécute, vous pouvez voir comment il évolue ainsi que l’impact sur vos ressources. Vous serez en mesure de détecter une pression exercée sur la mémoire qui pourrait entraîner des défaillances OOM ou une pression exercée sur le processeur qui pourrait causer de longs retards. Voici les métriques spécifiques que vous verrez :
- Distribution de charge du serveur : utilisation du processeur de chaque nœud au cours de la dernière minute.
- Utilisation du processeur : pourcentage de temps passé par l’UC dans différents modes (par exemple, utilisateur, système, inactif et iowait).
- Utilisation de la mémoire : utilisation totale de la mémoire par chaque mode (par exemple, utilisé, libre, tampon et mis en cache).
- Utilisation de l’échange de mémoire : utilisation totale de l’échange de mémoire.
- Espace système de fichiers libre : utilisation totale du système de fichiers par chaque point de montage.
- Débit réseau : nombre d’octets reçus et transmis via le réseau par chaque appareil.
- Nombre de nœuds actifs : nombre de nœuds actifs à chaque horodatage pour le calcul donné.
Pour plus d’informations, consultez Surveiller les performances et les graphiques de métriques matériels .
Tables système
Surveillance des coûts
Les tables système Azure Databricks fournissent une approche structurée pour surveiller les coûts et les performances des travaux. Ces tables incluent :
- Détails de l’exécution du travail.
- Utilisation des ressources.
- Coûts associés.
Utilisez ces tableaux pour comprendre l'impact opérationnel et financier.
Requirements
Pour utiliser des tables système pour la surveillance des coûts :
- Un administrateur de compte doit activer le
system.lakeflow schema. - Les utilisateurs doivent :
- Être à la fois un administrateur de metastore et un administrateur de compte, ou
- Disposez d’autorisations
USEetSELECTsur les schémas système.
Exemple de requête : travaux les plus coûteux (30 derniers jours)
Cette requête identifie les travaux les plus coûteux au cours des 30 derniers jours, ce qui facilite l’analyse et l’optimisation des coûts.
WITH list_cost_per_job AS (
SELECT
t1.workspace_id,
t1.usage_metadata.job_id,
COUNT(DISTINCT t1.usage_metadata.job_run_id) AS runs,
SUM(t1.usage_quantity * list_prices.pricing.default) AS list_cost,
FIRST(identity_metadata.run_as, true) AS run_as,
FIRST(t1.custom_tags, true) AS custom_tags,
MAX(t1.usage_end_time) AS last_seen_date
FROM system.billing.usage t1
INNER JOIN system.billing.list_prices list_prices ON
t1.cloud = list_prices.cloud AND
t1.sku_name = list_prices.sku_name AND
t1.usage_start_time >= list_prices.price_start_time AND
(t1.usage_end_time <= list_prices.price_end_time OR list_prices.price_end_time IS NULL)
WHERE
t1.billing_origin_product = "JOBS"
AND t1.usage_date >= CURRENT_DATE() - INTERVAL 30 DAY
GROUP BY ALL
),
most_recent_jobs AS (
SELECT
*,
ROW_NUMBER() OVER(PARTITION BY workspace_id, job_id ORDER BY change_time DESC) AS rn
FROM
system.lakeflow.jobs QUALIFY rn=1
)
SELECT
t2.name,
t1.job_id,
t1.workspace_id,
t1.runs,
t1.run_as,
SUM(list_cost) AS list_cost,
t1.last_seen_date
FROM list_cost_per_job t1
LEFT JOIN most_recent_jobs t2 USING (workspace_id, job_id)
GROUP BY ALL
ORDER BY list_cost DESC
Pipelines déclaratifs Lakeflow Spark
Le journal des événements des pipelines déclaratifs Spark de Lakeflow capture un enregistrement complet de tous les événements de pipeline, notamment :
- Journaux d’audit.
- Vérifications de la qualité des données.
- Progression du pipeline.
- Traçabilité des données.
Le journal des événements est automatiquement activé pour tous les pipelines déclaratifs Spark Lakeflow et est accessible via :
- Interface utilisateur du pipeline : afficher les journaux directement.
- API Pipelines : accès par programmation.
- Requête directe : interrogez la table du journal des événements.
Pour plus d’informations, consultez le schéma du journal des événements pour les pipelines déclaratifs Spark Lakeflow.
Exemples de requêtes
Ces exemples de requêtes permettent de surveiller les performances et l’intégrité des pipelines en fournissant des métriques clés telles que la durée du lot, le débit, la rétropression et l’utilisation des ressources.
Durée moyenne du lot
Cette requête calcule la durée moyenne des lots traités par le pipeline.
SELECT
(max_t - min_t) / batch_count as avg_batch_duration_seconds,
batch_count,
min_t,
max_t,
date_hr,
message
FROM
-- /60 for minutes
(
SELECT
count(*) as batch_count,
unix_timestamp(
min(timestamp)
) as min_t,
unix_timestamp(
max(timestamp)
) as max_t,
date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
message
FROM
event_log
WHERE
event_type = 'flow_progress'
AND level = 'METRICS'
GROUP BY
date_hr,
message
)
ORDER BY
date_hr desc
Débit moyen
Cette requête calcule le débit moyen du pipeline en termes de lignes traitées par seconde.
SELECT
(max_t - min_t) / total_rows as avg_throughput_rps,
total_rows,
min_t,
max_t,
date_hr,
message
FROM
-- /60 for minutes
(
SELECT
sum(
details:flow_progress:metrics:num_output_rows
) as total_rows,
unix_timestamp(
min(timestamp)
) as min_t,
unix_timestamp(
max(timestamp)
) as max_t,
date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
message
FROM
event_log
WHERE
event_type = 'flow_progress'
AND level = 'METRICS'
GROUP BY
date_hr,
message
)
ORDER BY
date_hr desc
Backpressure
Cette requête mesure la contre-pression du pipeline en vérifiant le retard accumulé des données.
SELECT
timestamp,
DOUBLE(
details:flow_progress:metrics:backlog_bytes
) AS backlog_bytes,
DOUBLE(
details:flow_progress:metrics:backlog_files
) AS backlog_files
FROM
event_log
WHERE
event_type = 'flow_progress'
Utilisation du cluster et des emplacements
Cette requête fournit des insights sur l’utilisation des clusters ou des emplacements utilisés par le pipeline.
SELECT
date_trunc("hour", timestamp) AS hour,
AVG (
DOUBLE (
details:cluster_resources:num_task_slots
)
) AS num_task_slots,
AVG (
DOUBLE (
details:cluster_resources:avg_num_task_slots
)
) AS avg_num_task_slots,
AVG (
DOUBLE (
details:cluster_resources:num_executors
)
) AS num_executors,
AVG (
DOUBLE (
details:cluster_resources:avg_task_slot_utilization
)
) AS avg_utilization,
AVG (
DOUBLE (
details:cluster_resources:avg_num_queued_tasks
)
) AS queue_size
FROM
event_log
WHERE
details : cluster_resources : avg_num_queued_tasks IS NOT NULL
AND origin.update_id = '${latest_update_id}'
GROUP BY
1;
Jobs
Vous pouvez surveiller les requêtes de streaming dans les travaux à l’aide de l’écouteur de requêtes de streaming.
Attachez un écouteur à la session Spark pour activer l’écouteur de requête de streaming dansAzure Databricks. Cet écouteur surveille la progression et les métriques de vos requêtes de streaming. Il peut être utilisé pour envoyer (push) des métriques vers des outils de supervision externes ou les journaliser pour une analyse plus approfondie.
Exemple : Exporter des métriques vers des outils de supervision externes
Note
Ceci est disponible dans Databricks Runtime 11.3 LTS et versions ultérieures pour Python et Scala.
Vous pouvez exporter des métriques de diffusion en continu vers des services externes pour l’alerte ou le tableau de bord à l’aide de l’interface StreamingQueryListener .
Voici un exemple de base de l’implémentation d’un écouteur :
from pyspark.sql.streaming import StreamingQueryListener
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
print("Query started: ", event.id)
def onQueryProgress(self, event):
print("Query made progress: ", event.progress)
def onQueryTerminated(self, event):
print("Query terminated: ", event.id)
spark.streams.addListener(MyListener())
Exemple : Utiliser l’écouteur de requête dans Azure Databricks
Voici un exemple de log des événements du StreamingQueryListener pour une requête de streaming de Kafka vers Delta Lake.
{
"id": "210f4746-7caa-4a51-bd08-87cabb45bdbe",
"runId": "42a2f990-c463-4a9c-9aae-95d6990e63f4",
"timestamp": "2024-05-15T21:57:50.782Z",
"batchId": 0,
"batchDuration": 3601,
"numInputRows": 20,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 5.55401277422938,
"durationMs": {
"addBatch": 1544,
"commitBatch": 686,
"commitOffsets": 27,
"getBatch": 12,
"latestOffset": 577,
"queryPlanning": 105,
"triggerExecution": 3600,
"walCommit": 34
},
"stateOperators": [
{
"operatorName": "symmetricHashJoin",
"numRowsTotal": 20,
"numRowsUpdated": 20,
"allUpdatesTimeMs": 473,
"numRowsRemoved": 0,
"allRemovalsTimeMs": 0,
"commitTimeMs": 277,
"memoryUsedBytes": 13120,
"numRowsDroppedByWatermark": 0,
"numShufflePartitions": 5,
"numStateStoreInstances": 20,
"customMetrics": {
"loadedMapCacheHitCount": 0,
"loadedMapCacheMissCount": 0,
"stateOnCurrentVersionSizeBytes": 5280
}
}
],
"sources": [
{
"description": "KafkaV2[Subscribe[topic-1]]",
"numInputRows": 10,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 2.77700638711469,
"metrics": {
"avgOffsetsBehindLatest": "0.0",
"estimatedTotalBytesBehindLatest": "0.0",
"maxOffsetsBehindLatest": "0",
"minOffsetsBehindLatest": "0"
}
},
{
"description": "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
"numInputRows": 10,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 2.77700638711469,
"metrics": {
"numBytesOutstanding": "0",
"numFilesOutstanding": "0"
}
}
]
}
Pour plus d’exemples, consultez : Exemples.
Métriques de progression des requêtes
Les métriques de progression des requêtes sont essentielles pour surveiller les performances et l’intégrité de vos requêtes de diffusion en continu. Ces métriques incluent le nombre de lignes d’entrée, les taux de traitement et différentes durées liées à l’exécution de la requête. Vous pouvez observer ces métriques en attachant une StreamingQueryListener à la session Spark. L’écouteur émet des événements contenant ces métriques à la fin de chaque époque de diffusion en continu.
Par exemple, vous pouvez accéder aux métriques à l’aide de la carte StreamingQueryProgress.observedMetrics dans la méthode onQueryProgress de l’écouteur. Cela vous permet de suivre et d’analyser les performances de vos requêtes de streaming en temps réel.
class MyListener(StreamingQueryListener):
def onQueryProgress(self, event):
print("Query made progress: ", event.progress.observedMetrics)