Condividi tramite


Ottimizzare e monitorare le prestazioni delle query in modalità in tempo reale

Questa pagina illustra l'ottimizzazione del calcolo, le tecniche per ridurre la latenza end-to-end e gli approcci per misurare le prestazioni delle query in modalità in tempo reale.

Ottimizzazione del calcolo

Quando si configura il calcolo, tenere presente quanto segue:

  • A differenza della modalità micro batch, le attività in tempo reale possono rimanere inattive durante l'attesa dei dati, quindi il ridimensionamento corretto è essenziale per evitare sprechi di risorse.
  • Puntare a un livello di utilizzo del cluster di destinazione, ad esempio 50%, ottimizzando:
    • maxPartitions (per Kafka)
    • spark.sql.shuffle.partitions (per fasi di rimescolamento)
  • Databricks consiglia di impostare maxPartitions in modo che ogni attività gestisca più partizioni Kafka per ridurre il sovraccarico.
  • Modificare le fasi delle attività per ogni lavoratore in modo che corrispondano al carico di lavoro per processi a una fase.
  • Per i lavori con carichi intensivi di shuffle, provare a trovare il numero minimo di partizioni di shuffle che evitano i backlog e regolare di conseguenza. Il calcolo non pianifica il processo se non dispone di slot sufficienti.

Annotazioni

Da Databricks Runtime 16.4 LTS e versioni successive, tutte le pipeline in tempo reale utilizzano checkpoint v2 per facilitare una commutazione senza problemi tra modalità in tempo reale e micro-batch.

Ottimizzazione della latenza

La modalità in tempo reale di Structured Streaming include tecniche facoltative per ridurre la latenza end-to-end. Nessuno dei due è abilitato per impostazione predefinita. È necessario abilitarli separatamente.

  • Monitoraggio asincrono del progresso: sposta le scritture nei log di offset e commit in un thread asincrono, riducendo il tempo tra batch per le query senza stato.
  • Checkpoint dello stato asincrono: inizia l'elaborazione del micro batch successivo non appena viene completato il calcolo, senza attendere il checkpoint dello stato, riducendo la latenza per le query con stato.

Monitoraggio e osservabilità

In modalità in tempo reale, le metriche della durata del batch tradizionale non riflettono la latenza end-to-end effettiva. Utilizzare gli approcci seguenti per misurare la latenza in modo accurato e identificare i colli di bottiglia nelle query.

La latenza end-to-end è specifica del carico di lavoro e talvolta può essere misurata con precisione solo con la logica di business. Ad esempio, se il timestamp di origine viene restituito in Kafka, è possibile calcolare la latenza come differenza tra il timestamp di output di Kafka e il timestamp di origine.

Metriche predefinite con StreamingQueryProgress

L'evento StreamingQueryProgress viene registrato automaticamente nei log del driver ed è accessibile tramite la funzione di callback di StreamingQueryListeneronQueryProgress(). Ciò consente di reagire agli eventi di stato a livello di codice, ad esempio se si desidera pubblicare metriche in un sistema di monitoraggio esterno. QueryProgressEvent.json() o toString() includono queste metriche in modalità in tempo reale:

  1. Latenza di elaborazione (processingLatencyMs). Tempo trascorso tra quando la query in modalità in tempo reale legge un record e quando la query lo scrive nella fase successiva o downstream. Per le query a fase singola, questa misura del tempo corrisponde alla durata della latenza end-to-end. Il sistema segnala questa metrica per ogni attività.
  2. Latenza di accodamento della sorgente (sourceQueuingLatencyMs). La quantità di tempo trascorsa tra quando il sistema scrive un record in un bus di messaggi, ad esempio il tempo di accodamento del log in Kafka, e quando la query in modalità in tempo reale legge il record per la prima volta. Il sistema segnala questa metrica per ogni attività.
  3. Latenza end-to-end (e2eLatencyMs). Tempo compreso tra il momento in cui il sistema scrive il record in un bus di messaggi e quando la query in modalità in tempo reale scrive il record downstream. Il sistema aggrega questa metrica per batch in tutti i record elaborati da tutte le attività.

Per esempio:

"rtmMetrics" : {
    "processingLatencyMs" : {
      "P0" : 0,
      "P50" : 0,
      "P90" : 0,
      "P95" : 0,
      "P99" : 0
    },
    "sourceQueuingLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 3
    },
    "e2eLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 4
    }
}

Misurazione della latenza personalizzata con l'API Observe

L'API Observe consente di misurare la latenza inline senza avviare un processo separato. Se si dispone di un timestamp di origine che approssima l'ora di arrivo dei dati di origine, è possibile stimare la latenza per batch registrando un timestamp prima del sink e calcolando la differenza. I risultati vengono visualizzati nei report in corso e sono disponibili per i listener.

Python

from datetime import datetime

from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType

@udf(returnType=TimestampType())
def current_timestamp():
  return datetime.now()

# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
  "latency",
  unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  "observedLatency",
  avg(col("latency")).alias("avg"),
  max(col("latency")).alias("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.

Scala

import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}

val currentTimestampUDF = udf(() => System.currentTimeMillis())

// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
  "latency",
  col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  name = "observedLatency",
  avg(col("latency")).as("avg"),
  max(col("latency")).as("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.

Output di esempio:

"observedMetrics" : {
  "observedLatency" : {
    "avg" : 63.8369765176552,
    "max" : 219,
    "p99" : 154,
    "p50" : 49
  }
}