Compartir vía


Optimización y supervisión del rendimiento de las consultas en modo en tiempo real

En esta página se tratan el ajuste de proceso, las técnicas para reducir la latencia de un extremo a otro y los enfoques para medir el rendimiento de las consultas en modo en tiempo real.

Ajuste del cálculo

Al configurar su computación, tenga en cuenta lo siguiente:

  • A diferencia del modo microlote, las tareas en tiempo real pueden permanecer inactivas mientras esperan datos, por lo que el ajuste de tamaño correcto es esencial para evitar los recursos desperdiciados.
  • El objetivo es alcanzar un nivel de utilización del clúster objetivo del 50%, ajustando:
    • maxPartitions (para Kafka)
    • spark.sql.shuffle.partitions (para etapas aleatorias)
  • Databricks recomienda establecer maxPartitions para que cada tarea controle varias particiones de Kafka para reducir la sobrecarga.
  • Ajuste las ranuras de tareas por trabajador para que se ajusten a la carga de trabajo en trabajos sencillos de una sola fase.
  • Para trabajos con mucha aleatoriedad, pruebe hasta encontrar el número mínimo de particiones aleatorias que evite los retrasos y ajuste a partir de ahí. La unidad de cómputo no programará la tarea si no tiene suficientes espacios.

Nota:

Desde Databricks Runtime 16.4 LTS y versiones posteriores, todas las canalizaciones en tiempo real utilizan checkpoint v2 para permitir transiciones sin problemas entre los modos en tiempo real y de micro-lotes.

Optimización de latencia

El modo en tiempo real de Structured Streaming tiene técnicas opcionales para reducir la latencia de un extremo a otro. Ninguno está habilitado de forma predeterminada. Debe habilitarlos por separado.

  • Seguimiento de progreso asincrónico: mueve las escrituras a los registros de desplazamiento y confirmación en un subproceso asincrónico, lo que reduce el tiempo entre lotes para las consultas sin estado.
  • Punto de comprobación de estado asincrónico: comienza a procesar el siguiente microproceso tan pronto como se complete el cálculo, sin esperar a que se realicen puntos de control de estado, lo que reduce la latencia de las consultas con estado.

Supervisión y observabilidad

En el modo en tiempo real, las métricas tradicionales de duración por lotes no reflejan la latencia real de un extremo a otro. Use los enfoques siguientes para medir la latencia con precisión e identificar cuellos de botella en las consultas.

La latencia de un extremo a otro es específica de la carga de trabajo y a veces solo se puede medir con precisión con la lógica de negocios. Por ejemplo, si la marca de tiempo de origen se genera en Kafka, puede calcular la latencia como la diferencia entre la marca de tiempo generada por Kafka y la marca de tiempo de origen.

Métricas integradas con StreamingQueryProgress

El StreamingQueryProgress evento se registra automáticamente en los registros del controlador y se puede acceder a él a través de la función de callback de onQueryProgress(). Esto le permite reaccionar a los eventos de progreso mediante programación, por ejemplo, si desea publicar métricas en un sistema de supervisión externo. QueryProgressEvent.json() o toString() incluyen estas métricas en modo de tiempo real:

  1. Latencia de procesamiento (processingLatencyMs). Tiempo transcurrido entre el momento en que la consulta de modo en tiempo real lee un registro y cuando la consulta lo escribe en la siguiente fase o descendente. Para las consultas de una sola fase, esto mide la misma duración que la latencia de un extremo a otro. El sistema notifica esta métrica por tarea.
  2. Latencia de puesta en cola de origen (sourceQueuingLatencyMs). La cantidad de tiempo transcurrido entre cuando el sistema escribe un registro en un bus de mensajes (por ejemplo, el tiempo de anexión del registro en Kafka) y cuando la consulta en modo en tiempo real lee primero el registro. El sistema notifica esta métrica por tarea.
  3. Latencia de un extremo a otro (e2eLatencyMs). Tiempo entre el momento en que el sistema escribe el registro en un bus de mensajes y el momento en que la consulta en modo en tiempo real escribe el registro aguas abajo. El sistema agrega esta métrica por lote en todos los registros procesados por todas las tareas.

Por ejemplo:

"rtmMetrics" : {
    "processingLatencyMs" : {
      "P0" : 0,
      "P50" : 0,
      "P90" : 0,
      "P95" : 0,
      "P99" : 0
    },
    "sourceQueuingLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 3
    },
    "e2eLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 4
    }
}

Medición de latencia personalizada con observe API

La API de Observación le permite medir la latencia en línea sin iniciar un trabajo independiente. Si tiene una marca de tiempo de origen que aproxima la hora de llegada de datos de origen, puede estimar la latencia por lote registrando una marca de tiempo antes del sumidero y luego calculando la diferencia. Los resultados aparecen en los informes de progreso y están disponibles para los oyentes.

Python

from datetime import datetime

from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType

@udf(returnType=TimestampType())
def current_timestamp():
  return datetime.now()

# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
  "latency",
  unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  "observedLatency",
  avg(col("latency")).alias("avg"),
  max(col("latency")).alias("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.

Scala

import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}

val currentTimestampUDF = udf(() => System.currentTimeMillis())

// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
  "latency",
  col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  name = "observedLatency",
  avg(col("latency")).as("avg"),
  max(col("latency")).as("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.

Resultados del ejemplo:

"observedMetrics" : {
  "observedLatency" : {
    "avg" : 63.8369765176552,
    "max" : 219,
    "p99" : 154,
    "p50" : 49
  }
}