Partager via


Optimiser et surveiller les performances des requêtes en mode temps réel

Cette page traite du réglage du calcul, des techniques de réduction de la latence de bout en bout et des approches pour mesurer les performances des requêtes en mode temps réel.

Optimisation du calcul

Lorsque vous configurez votre calcul, tenez compte des éléments suivants :

  • Contrairement au mode micro-batch, les tâches en temps réel peuvent rester inactives lors de l’attente des données, de sorte que le dimensionnement approprié est essentiel pour éviter les ressources perdues.
  • Visez un niveau d’utilisation du cluster cible, tel que 50%, en paramétrant :
    • maxPartitions (pour Kafka)
    • spark.sql.shuffle.partitions (pour les phases de lecture aléatoire)
  • Databricks recommande de définir maxPartitions afin que chaque tâche gère plusieurs partitions Kafka pour réduire la surcharge.
  • Ajustez les emplacements de tâche par travailleur pour qu'ils correspondent à la charge de travail des tâches simples à une étape.
  • Pour les travaux à forte charge de lecture aléatoire, essayez de trouver le nombre minimal de partitions de lecture aléatoire qui évitent les retards, et ajustez-les en fonction. Le système de calcul ne planifiera pas la tâche s'il n'a pas suffisamment de capacités.

Note

À partir de Databricks Runtime 16.4 LTS et des versions ultérieures, tous les pipelines en temps réel utilisent le point de contrôle v2 pour permettre des transitions transparentes entre les modes en temps réel et en micro-lots.

Optimisation de la latence

Le mode streaming structuré en temps réel présente des techniques facultatives pour réduire la latence de bout en bout. Aucun des deux n’est activé par défaut. Vous devez les activer séparément.

  • Suivi de progression asynchrone : déplace les écritures vers un thread asynchrone dans les journaux d'offset et de commit, ce qui réduit le temps d'attente entre les lots pour les requêtes sans état.
  • Point de contrôle d’état asynchrone : commence à traiter le micro-lot suivant dès que le calcul se termine, sans attendre le point de contrôle d’état, ce qui réduit la latence des requêtes avec état.

Surveillance et observabilité

En mode temps réel, les métriques de durée de traitement par lots traditionnelles ne reflètent pas la latence réelle de bout en bout. Utilisez les approches ci-dessous pour mesurer la latence avec précision et identifier les goulots d’étranglement dans vos requêtes.

La latence de bout en bout est spécifique à la charge de travail et peut parfois être mesurée avec précision avec la logique métier. Par exemple, si l’horodatage de la source est émis dans Kafka, vous pouvez calculer la latence comme la différence entre l’horodatage de sortie de Kafka et l’horodatage de la source.

Métriques intégrées avec StreamingQueryProgress

L’événement StreamingQueryProgress est automatiquement journalisé dans les journaux du pilote et accessible via la StreamingQueryListener fonction de rappel onQueryProgress(). Cela vous permet de réagir aux événements de progression par programmation, par exemple si vous souhaitez publier des métriques sur un système de supervision externe. QueryProgressEvent.json() ou toString() incluez ces métriques en mode temps réel :

  1. Latence de traitement (processingLatencyMs). Temps écoulé entre le moment où la requête en mode temps réel lit un enregistrement et lorsque la requête l’écrit à la phase suivante ou en aval. Pour les requêtes à étape unique, cela mesure la même durée que la latence de bout en bout. Le système signale cette métrique par tâche.
  2. Latence de file d’attente de la source (sourceQueuingLatencyMs). Durée écoulée entre le moment où le système écrit un enregistrement dans un bus de messages( par exemple, le temps d’ajout du journal dans Kafka) et lorsque la requête en mode temps réel lit d’abord l’enregistrement. Le système signale cette métrique par tâche.
  3. Latence de bout en bout (e2eLatencyMs). Temps entre le moment où le système écrit l’enregistrement dans un bus de messages et lorsque la requête en mode temps réel écrit l’enregistrement en aval. Le système agrège cette métrique par lot sur tous les enregistrements traités par toutes les tâches.

Par exemple:

"rtmMetrics" : {
    "processingLatencyMs" : {
      "P0" : 0,
      "P50" : 0,
      "P90" : 0,
      "P95" : 0,
      "P99" : 0
    },
    "sourceQueuingLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 3
    },
    "e2eLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 4
    }
}

Mesure de latence personnalisée avec l’API Observer

L’API Observer vous permet de mesurer la latence inline sans lancer un travail distinct. Si vous disposez d’un horodatage source qui correspond approximativement à l’heure d’arrivée des données sources, il est possible d’estimer la latence par lot en enregistrant un horodatage avant le puits et en calculant la différence. Les résultats apparaissent dans les rapports de progression et sont disponibles pour les écouteurs.

Python

from datetime import datetime

from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType

@udf(returnType=TimestampType())
def current_timestamp():
  return datetime.now()

# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
  "latency",
  unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  "observedLatency",
  avg(col("latency")).alias("avg"),
  max(col("latency")).alias("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.

Scala

import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}

val currentTimestampUDF = udf(() => System.currentTimeMillis())

// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
  "latency",
  col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  name = "observedLatency",
  avg(col("latency")).as("avg"),
  max(col("latency")).as("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.

Exemple de sortie :

"observedMetrics" : {
  "observedLatency" : {
    "avg" : 63.8369765176552,
    "max" : 219,
    "p99" : 154,
    "p50" : 49
  }
}

Ressources additionnelles