优化有状态结构化流式处理查询

管理有状态结构化流式处理查询的中间状态信息有助于防止意外的延迟和生产问题。

Databricks 建议:

  • 使用计算优化的实例作为工作器。
  • 将无序分区的数量设置为群集中的核心数的 1-2 倍。
  • 在 SparkSession 中,将 spark.sql.streaming.noDataMicroBatches.enabled 配置设置为 false。 这可阻止流式微批处理引擎处理不包含数据的微批处理。 另请注意,将此配置设置为 false 可能会导致有状态操作,该操作利用水印或处理时间超时,直到新数据到达时才获取数据输出,而不是立即获取数据输出。

Databricks 建议将 RocksDB 与 changelog 检查点配合使用来管理有状态流的状态。 请参阅在 Azure Databricks 上配置 RocksDB 状态存储

注意

无法在两次查询重启之间更改状态管理方案。 也就是说,如果使用默认管理启动了某个查询,但不使用新的检查点位置从头启动查询,则无法更改该查询。

在结构化流式处理中使用多个有状态运算符

在 Databricks Runtime 13.3 LTS 及更高版本中,Azure Databricks 为结构化流式处理工作负载中的有状态运算符提供高级支持。 现在可以将多个有状态运算符链接在一起,这意味着可以将操作的输出(如开窗聚合)馈送到另一个有状态操作(如联接)。

以下示例演示了可以使用的几种模式。

重要

使用多个有状态运算符时存在以下限制:

  • 不支持 FlatMapGroupWithState
  • 仅支持追加输出模式。

链接的时间窗口聚合

Python

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()

# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
    window(window_time(windowedCounts.window), "1 hour"),
    windowedCounts.word
).count()

Scala

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
  window($"window", "1 hour"),
  $"word"
).count()

两个不同流中的时间窗口聚合,后跟流间窗口联接

Python

clicksWindow = clicksWithWatermark.groupBy(
  clicksWithWatermark.clickAdId,
  window(clicksWithWatermark.clickTime, "1 hour")
).count()

impressionsWindow = impressionsWithWatermark.groupBy(
  impressionsWithWatermark.impressionAdId,
  window(impressionsWithWatermark.impressionTime, "1 hour")
).count()

clicksWindow.join(impressionsWindow, "window", "inner")

Scala

val clicksWindow = clicksWithWatermark
  .groupBy(window("clickTime", "1 hour"))
  .count()

val impressionsWindow = impressionsWithWatermark
  .groupBy(window("impressionTime", "1 hour"))
  .count()

clicksWindow.join(impressionsWindow, "window", "inner")

流间时间间隔联接,后跟时间窗口聚合

Python

joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined.groupBy(
  joined.clickAdId,
  window(joined.clickTime, "1 hour")
).count()

Scala

val joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """),
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined
  .groupBy($"clickAdId", window($"clickTime", "1 hour"))
  .count()

结构化流的状态重新平衡

默认情况下,为 Delta Live Tables 中的所有流式处理工作负载启用状态重新平衡。 在 Databricks Runtime 11.3 LTS 及更高版本中,可以在 Spark 群集配置中设置以下配置选项,以启用状态再平衡:

spark.sql.streaming.statefulOperator.stateRebalancing.enabled true

状态再平衡有利于经历群集大小重设事件的有状态结构化流式处理管道。 无论是否更改群集大小,无状态流式处理操作都不会受益。

注意

计算自动缩放在缩减结构化流式处理工作负载的群集大小方面存在限制。 Databricks 建议将增量实时表与增强式自动缩放用于流式处理工作负载。 请参阅使用增强型自动缩放来优化增量实时表管道的群集利用率

群集大小调整事件会导致触发状态重新平衡。 在重新平衡事件期间,当状态从云存储加载到新执行程序时,微批处理可能具有较高的延迟。

mapGroupsWithState 指定初始状态

可以使用 flatMapGroupsWithStatemapGroupsWithState 为结构化流式处理有状态处理指定用户定义的初始状态。 当在不使用有效检查点的情况下启动有状态流时,这样做就可避免重新处理数据。

def mapGroupsWithState[S: Encoder, U: Encoder](
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]

def flatMapGroupsWithState[S: Encoder, U: Encoder](
    outputMode: OutputMode,
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => Iterator[U])

以下示例用例指定 flatMapGroupsWithState 运算符的初始状态:

val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  Iterator((key, count.toString))
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

以下示例用例指定 mapGroupsWithState 运算符的初始状态:

val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  (key, count.toString)
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

测试 mapGroupsWithState 更新函数

利用 TestGroupState API,你可以测试用于 Dataset.groupByKey(...).mapGroupsWithState(...)Dataset.groupByKey(...).flatMapGroupsWithState(...) 的状态更新函数。

状态更新函数使用 GroupState 类型的对象获取先前的状态作为输入。 请参阅 Apache Spark GroupState 参考文档。 例如:

import org.apache.spark.sql.streaming._
import org.apache.spark.api.java.Optional

test("flatMapGroupsWithState's state update function") {
  var prevState = TestGroupState.create[UserStatus](
    optionalState = Optional.empty[UserStatus],
    timeoutConf = GroupStateTimeout.EventTimeTimeout,
    batchProcessingTimeMs = 1L,
    eventTimeWatermarkMs = Optional.of(1L),
    hasTimedOut = false)

  val userId: String = ...
  val actions: Iterator[UserAction] = ...

  assert(!prevState.hasUpdated)

  updateState(userId, actions, prevState)

  assert(prevState.hasUpdated)
}