Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
На этой странице рассматриваются параметры вычислений, методы сокращения сквозной задержки и подходы к измерению производительности запросов в режиме реального времени.
Настройка вычислений
При настройке вычислений рассмотрите следующее:
- В отличие от микробатчевого режима, задачи в режиме реального времени могут оставаться в состоянии простоя во время ожидания данных, поэтому важно правильно распределять ресурсы, чтобы избежать их потерь.
- Достичь уровня использования кластера, например 50%, путем настройки:
-
maxPartitions(для Kafka) -
spark.sql.shuffle.partitions(для этапов перетасовки)
-
- Databricks рекомендует задать параметр
maxPartitions, чтобы каждая задача обрабатывала несколько секций Kafka, чтобы снизить затраты. - Настройте слоты задач для каждого рабочего в соответствии с рабочей нагрузкой для простых одноэтапных задач.
- Для задач с интенсивным использованием ресемплирования проведите эксперименты, чтобы определить минимальное количество секций ресемплирования, которое позволит избежать задержек, и корректируйте настройки оттуда. Вычислительный узел не запланирует задание, если у него недостаточно слотов.
Замечание
В Databricks Runtime 16.4 LTS и более поздних версиях все конвейеры в режиме реального времени используют контрольные точки v2 для простого перехода между режимами реального времени и микропакетного режима.
Оптимизация задержки
Структурированный режим потоковой передачи в режиме реального времени имеет необязательные методы снижения сквозной задержки. По умолчанию ни одна из этих функций не включена. Их необходимо включить отдельно.
- Асинхронное отслеживание хода выполнения. Перемещает записи в журналы смещения и фиксации в асинхронный поток, уменьшая меж пакетное время для запросов без отслеживания состояния.
- Асинхронная контрольная точка состояния: начинает обработку следующего микропакета сразу после завершения вычисления, не ожидая контрольных точек состояния, уменьшая задержку для запросов с отслеживанием состояния.
Мониторинг и наблюдаемость
В режиме реального времени традиционные метрики продолжительности пакета не отражают фактическую сквозную задержку. Используйте приведенные ниже подходы, чтобы точно определить задержку и определить узкие места в запросах.
Сквозная задержка по времени является специфичной для конкретной рабочей нагрузки и её точное измерение иногда возможно только с учётом бизнес-логики. Например, если исходная метка времени выводится в Kafka, можно вычислить задержку в качестве разницы между меткой времени вывода Kafka и исходной меткой времени.
Встроенные метрики с StreamingQueryProgress
Событие StreamingQueryProgress автоматически регистрируется в журналах драйвера и доступно через функцию обратного вызова StreamingQueryListener и onQueryProgress(). Это позволяет реагировать на события хода выполнения программным способом, например, если вы хотите опубликовать метрики во внешней системе мониторинга.
QueryProgressEvent.json() или toString() включите следующие метрики режима реального времени:
-
Задержка обработки (
processingLatencyMs). Время, затраченное между тем, когда запрос в режиме реального времени считывает запись и когда запрос записывает его на следующий этап или ниже. Для одноэтапных запросов это измеряет ту же длительность, что и сквозная задержка. Система сообщает эту метрику для каждой задачи. -
Задержка очереди источника (
sourceQueuingLatencyMs). Время, прошедшее с момента записи системой записи в шину сообщений (например, время добавления журнала в Kafka) до момента, когда запрос в режиме реального времени впервые считывает запись. Система сообщает эту метрику для каждой задачи. -
Сквозная задержка (
e2eLatencyMs). Время, когда система записывает запись в шину сообщений и когда запрос в режиме реального времени записывает запись вниз. Система агрегирует эту метрику на пакетной основе во всех записях, обработанных всеми задачами.
Рассмотрим пример.
"rtmMetrics" : {
"processingLatencyMs" : {
"P0" : 0,
"P50" : 0,
"P90" : 0,
"P95" : 0,
"P99" : 0
},
"sourceQueuingLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 3
},
"e2eLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 4
}
}
Настраиваемое измерение задержки с помощью API наблюдения
API наблюдения позволяет измерять задержку встроенным образом, не запуская отдельное задание. Если у вас есть метка времени источника, которая приближает время прибытия исходных данных, вы можете оценить задержку на каждом пакете, записав метку времени перед узлом приема и вычислив разницу. Результаты отображаются в отчетах о ходе выполнения и доступны прослушивателям.
Python
from datetime import datetime
from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType
@udf(returnType=TimestampType())
def current_timestamp():
return datetime.now()
# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
"latency",
unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
"observedLatency",
avg(col("latency")).alias("avg"),
max(col("latency")).alias("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.
Scala
import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}
val currentTimestampUDF = udf(() => System.currentTimeMillis())
// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
"latency",
col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
name = "observedLatency",
avg(col("latency")).as("avg"),
max(col("latency")).as("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.
Образец вывода:
"observedMetrics" : {
"observedLatency" : {
"avg" : 63.8369765176552,
"max" : 219,
"p99" : 154,
"p50" : 49
}
}