Оптимизация и мониторинг производительности запросов в режиме реального времени

На этой странице рассматриваются параметры вычислений, методы сокращения сквозной задержки и подходы к измерению производительности запросов в режиме реального времени.

Настройка вычислений

При настройке вычислений рассмотрите следующее:

  • В отличие от микробатчевого режима, задачи в режиме реального времени могут оставаться в состоянии простоя во время ожидания данных, поэтому важно правильно распределять ресурсы, чтобы избежать их потерь.
  • Достичь уровня использования кластера, например 50%, путем настройки:
    • maxPartitions (для Kafka)
    • spark.sql.shuffle.partitions (для этапов перетасовки)
  • Databricks рекомендует задать параметр maxPartitions , чтобы каждая задача обрабатывала несколько секций Kafka, чтобы снизить затраты.
  • Настройте слоты задач для каждого рабочего в соответствии с рабочей нагрузкой для простых одноэтапных задач.
  • Для задач с интенсивным использованием ресемплирования проведите эксперименты, чтобы определить минимальное количество секций ресемплирования, которое позволит избежать задержек, и корректируйте настройки оттуда. Вычислительный узел не запланирует задание, если у него недостаточно слотов.

Замечание

В Databricks Runtime 16.4 LTS и более поздних версиях все конвейеры в режиме реального времени используют контрольные точки v2 для простого перехода между режимами реального времени и микропакетного режима.

Оптимизация задержки

Структурированный режим потоковой передачи в режиме реального времени имеет необязательные методы снижения сквозной задержки. По умолчанию ни одна из этих функций не включена. Их необходимо включить отдельно.

Мониторинг и наблюдаемость

В режиме реального времени традиционные метрики продолжительности пакета не отражают фактическую сквозную задержку. Используйте приведенные ниже подходы, чтобы точно определить задержку и определить узкие места в запросах.

Сквозная задержка по времени является специфичной для конкретной рабочей нагрузки и её точное измерение иногда возможно только с учётом бизнес-логики. Например, если исходная метка времени выводится в Kafka, можно вычислить задержку в качестве разницы между меткой времени вывода Kafka и исходной меткой времени.

Встроенные метрики с StreamingQueryProgress

Событие StreamingQueryProgress автоматически регистрируется в журналах драйвера и доступно через функцию обратного вызова StreamingQueryListener и onQueryProgress(). Это позволяет реагировать на события хода выполнения программным способом, например, если вы хотите опубликовать метрики во внешней системе мониторинга. QueryProgressEvent.json() или toString() включите следующие метрики режима реального времени:

  1. Задержка обработки (processingLatencyMs). Время, затраченное между тем, когда запрос в режиме реального времени считывает запись и когда запрос записывает его на следующий этап или ниже. Для одноэтапных запросов это измеряет ту же длительность, что и сквозная задержка. Система сообщает эту метрику для каждой задачи.
  2. Задержка очереди источника (sourceQueuingLatencyMs). Время, прошедшее с момента записи системой записи в шину сообщений (например, время добавления журнала в Kafka) до момента, когда запрос в режиме реального времени впервые считывает запись. Система сообщает эту метрику для каждой задачи.
  3. Сквозная задержка (e2eLatencyMs). Время, когда система записывает запись в шину сообщений и когда запрос в режиме реального времени записывает запись вниз. Система агрегирует эту метрику на пакетной основе во всех записях, обработанных всеми задачами.

Рассмотрим пример.

"rtmMetrics" : {
    "processingLatencyMs" : {
      "P0" : 0,
      "P50" : 0,
      "P90" : 0,
      "P95" : 0,
      "P99" : 0
    },
    "sourceQueuingLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 3
    },
    "e2eLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 4
    }
}

Настраиваемое измерение задержки с помощью API наблюдения

API наблюдения позволяет измерять задержку встроенным образом, не запуская отдельное задание. Если у вас есть метка времени источника, которая приближает время прибытия исходных данных, вы можете оценить задержку на каждом пакете, записав метку времени перед узлом приема и вычислив разницу. Результаты отображаются в отчетах о ходе выполнения и доступны прослушивателям.

Python

from datetime import datetime

from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType

@udf(returnType=TimestampType())
def current_timestamp():
  return datetime.now()

# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
  "latency",
  unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  "observedLatency",
  avg(col("latency")).alias("avg"),
  max(col("latency")).alias("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.

Scala

import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}

val currentTimestampUDF = udf(() => System.currentTimeMillis())

// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
  "latency",
  col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  name = "observedLatency",
  avg(col("latency")).as("avg"),
  max(col("latency")).as("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.

Образец вывода:

"observedMetrics" : {
  "observedLatency" : {
    "avg" : 63.8369765176552,
    "max" : 219,
    "p99" : 154,
    "p50" : 49
  }
}

Дополнительные ресурсы