通过


优化和监视实时模式查询性能

本页介绍计算优化、减少端到端延迟的技术,以及实时模式下测量查询性能的方法。

计算优化

配置计算时,请考虑以下事项:

  • 与微批处理模式不同,实时任务在等待数据时可以保持空闲状态,因此正确调整大小对于避免浪费的资源至关重要。
  • 目标是通过调整实现群集利用率达到特定级别,比如50%:
    • maxPartitions (对于卡夫卡)
    • spark.sql.shuffle.partitions(对于 shuffle 阶段)
  • Databricks 建议设置 maxPartitions ,以便每个任务处理多个 Kafka 分区以减少开销。
  • 调整每个工作节点的任务位置,以匹配简单单阶段作业的工作负荷。
  • 对于 shuffle 密集型作业,请尝试找到避免积压的最小 shuffle 分区数,并据此进行调整。 如果计算节点没有足够的槽,作业将不会被调度。

注释

从 Databricks Runtime 16.4 LTS 及更高版本开始,所有实时流水线都使用检查点 v2,以允许在实时模式和微批处理模式之间无缝切换。

延迟优化

结构化流式处理实时模式具有用于减少端到端延迟的可选技术。 默认情况下两者均未启用。 必须单独启用它们。

  • 异步进度跟踪:将写入偏移量和提交日志移入异步线程中,从而减少无状态查询的批处理间隔时间。
  • 异步状态检查点:在计算完成后立即开始处理下一个微批处理,而无需等待状态检查点,从而减少有状态查询的延迟。

监视和可观测性

在实时模式下,传统的批处理持续时间指标不会反映实际的端到端延迟。 使用以下方法准确测量延迟,并确定查询中的瓶颈。

端到端延迟特定于工作负荷,有时只能使用业务逻辑准确测量。 例如,如果源时间戳在 Kafka 中输出,则可以计算延迟,因为 Kafka 的输出时间戳与源时间戳之间的差异。

包含的内置指标 StreamingQueryProgress

StreamingQueryProgress 事件会自动记录在驱动程序日志中,并通过 StreamingQueryListeneronQueryProgress() 回调函数”进行访问。 这样,便可以以编程方式对进度事件做出反应,例如,如果要将指标发布到外部监视系统。 QueryProgressEvent.json()toString() 包括以下实时模式指标:

  1. 处理延迟processingLatencyMs)。 从实时模式查询读取记录到查询将其写入到下一个阶段或下游所经过的时间。 对于单阶段查询,这将测量与端到端延迟相同的持续时间。 系统按任务报告此指标。
  2. 源队列延迟sourceQueuingLatencyMs)。 当系统将记录写入消息总线(例如,Kafka 中的日志追加时间)以及实时模式查询首次读取记录时所经过的时间。 系统按任务报告此指标。
  3. 端到端延迟e2eLatencyMs)。 系统将记录写入消息总线和实时模式查询写入下游记录之间的时间。 系统跨所有任务处理的所有记录按批次聚合此指标。

例如:

"rtmMetrics" : {
    "processingLatencyMs" : {
      "P0" : 0,
      "P50" : 0,
      "P90" : 0,
      "P95" : 0,
      "P99" : 0
    },
    "sourceQueuingLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 3
    },
    "e2eLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 4
    }
}

使用观察 API 进行自定义延迟度量

使用观察 API 可以内联测量延迟,而无需启动单独的作业。 如果源时间戳近似于源数据到达时间,可以通过在接收器之前记录时间戳并计算差异来估计每批延迟。 结果显示在进度报告中,并供听众查看。

Python

from datetime import datetime

from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType

@udf(returnType=TimestampType())
def current_timestamp():
  return datetime.now()

# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
  "latency",
  unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  "observedLatency",
  avg(col("latency")).alias("avg"),
  max(col("latency")).alias("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.

Scala

import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}

val currentTimestampUDF = udf(() => System.currentTimeMillis())

// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
  "latency",
  col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  name = "observedLatency",
  avg(col("latency")).as("avg"),
  max(col("latency")).as("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.

示例输出:

"observedMetrics" : {
  "observedLatency" : {
    "avg" : 63.8369765176552,
    "max" : 219,
    "p99" : 154,
    "p50" : 49
  }
}

其他资源