Share via


상태 저장 구조적 스트리밍 쿼리 최적화

상태 저장 구조적 스트리밍 쿼리의 중간 상태 정보를 관리하면 예기치 않은 대기 시간 및 프로덕션 문제를 방지할 수 있습니다.

Databricks는 다음을 권장합니다.

  • 컴퓨팅 최적화된 인스턴스를 작업자로 사용합니다.
  • 순서 섞기 파티션 수를 클러스터 코어 수의 1~2배로 설정합니다.
  • SparkSession에서 spark.sql.streaming.noDataMicroBatches.enabled 구성을 false로 설정합니다. 이렇게 하면 스트리밍 마이크로 일괄 처리 엔진이 데이터가 포함되지 않은 마이크로 일괄 처리를 처리할 수 없습니다. 또한 이 구성을 false로 설정하면 워터마크 또는 처리 시간 제한을 활용하는 상태 저장 작업이 데이터 출력을 즉시 가져오는 대신 새 데이터가 도착할 때까지 기다릴 수 있습니다.

Databricks는 상태 저장 스트림의 상태를 관리하기 위해 changelog 검사pointing과 함께 RocksDB를 사용하는 것이 좋습니다. Azure Databricks에서 RocksDB 상태 저장소 구성을 참조하세요.

참고 항목

쿼리를 다시 시작할 때 상태 관리 체계를 변경할 수 없습니다. 즉, 쿼리가 기본 관리로 시작된 경우 새 검사점 위치로 쿼리를 처음부터 시작하지 않는 한, 변경할 수 없습니다.

구조적 스트리밍에서 여러 상태 저장 연산자 작업

Databricks Runtime 13.3 LTS 이상에서 Azure Databricks는 구조적 스트리밍 워크로드의 상태 저장 연산자를 위한 고급 지원을 제공합니다. 이제 여러 상태 저장 연산자를 함께 연결할 수 있습니다. 즉, 창이 있는 집계와 같은 작업의 출력을 조인과 같은 다른 상태 저장 작업에 공급할 수 있습니다.

다음 예제에서는 사용할 수 있는 몇 가지 패턴을 보여 줍니다.

Important

여러 상태 저장 연산자를 사용하는 경우 다음과 같은 제한 사항이 있습니다.

  • FlatMapGroupWithState은 지원되지 않습니다.
  • 추가 출력 모드만 지원됩니다.

연결된 시간 창 집계

Python

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()

# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
    window(window_time(windowedCounts.window), "1 hour"),
    windowedCounts.word
).count()

Scala

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
  window($"window", "1 hour"),
  $"word"
).count()

서로 다른 두 스트림의 시간 창 집계와 스트림 스트림 창 조인

Python

clicksWindow = clicksWithWatermark.groupBy(
  clicksWithWatermark.clickAdId,
  window(clicksWithWatermark.clickTime, "1 hour")
).count()

impressionsWindow = impressionsWithWatermark.groupBy(
  impressionsWithWatermark.impressionAdId,
  window(impressionsWithWatermark.impressionTime, "1 hour")
).count()

clicksWindow.join(impressionsWindow, "window", "inner")

Scala

val clicksWindow = clicksWithWatermark
  .groupBy(window("clickTime", "1 hour"))
  .count()

val impressionsWindow = impressionsWithWatermark
  .groupBy(window("impressionTime", "1 hour"))
  .count()

clicksWindow.join(impressionsWindow, "window", "inner")

스트림 스트림 시간 간격 조인 후 시간 창 집계

Python

joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined.groupBy(
  joined.clickAdId,
  window(joined.clickTime, "1 hour")
).count()

Scala

val joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """),
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined
  .groupBy($"clickAdId", window($"clickTime", "1 hour"))
  .count()

구조적 스트리밍에 대한 상태 재조정

상태 재조정은 기본적으로 Delta Live Tables의 모든 스트리밍 워크로드에 대해 사용하도록 설정됩니다. Databricks Runtime 11.3 LTS 이상에서는 Spark 클러스터 구성에서 다음 구성 옵션을 설정하여 상태 재조정을 사용하도록 설정할 수 있습니다.

spark.sql.streaming.statefulOperator.stateRebalancing.enabled true

상태 리밸런싱은 클러스터 크기 조정 이벤트를 거치는 상태 저장 구조적 스트리밍 파이프라인의 이점을 제공합니다. 상태 비저장 스트리밍 작업은 클러스터 크기 변경에 관계없이 도움이 되지 않습니다.

참고 항목

컴퓨팅 자동 크기 조정에는 구조적 스트리밍 워크로드에 대한 클러스터 크기를 축소하는 데 제한이 있습니다. Databricks는 스트리밍 워크로드에 대해 향상된 자동 크기 조정과 함께 Delta Live Tables를 사용하는 것이 좋습니다. 향상된 자동 크기 조정을 사용하여 Delta Live Tables 파이프라인의 클러스터 사용률 최적화를 참조 하세요.

클러스터 크기 조정 이벤트로 인해 상태 재조정이 트리거됩니다. 이벤트를 재조정하는 동안 상태가 클라우드 스토리지에서 새 실행자로 로드될 때 마이크로 일괄 처리의 대기 시간이 더 높을 수 있습니다.

에 대한 초기 상태 지정 mapGroupsWithState

flatMapGroupsWithState 또는 mapGroupsWithState를 사용하여 구조적 스트리밍 상태 저장 처리에 대한 사용자 정의 초기 상태를 지정할 수 있습니다. 이렇게 하면 유효한 검사점 없이 상태 저장 스트림을 시작할 때 데이터를 다시 처리하지 않도록 할 수 있습니다.

def mapGroupsWithState[S: Encoder, U: Encoder](
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]

def flatMapGroupsWithState[S: Encoder, U: Encoder](
    outputMode: OutputMode,
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => Iterator[U])

flatMapGroupsWithState 연산자에 초기 상태를 지정하는 예제 사용 사례:

val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  Iterator((key, count.toString))
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

mapGroupsWithState 연산자에 초기 상태를 지정하는 예제 사용 사례:

val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  (key, count.toString)
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

mapGroupsWithState 업데이트 함수 테스트

TestGroupState API를 사용하면 Dataset.groupByKey(...).mapGroupsWithState(...)Dataset.groupByKey(...).flatMapGroupsWithState(...)에 사용된 상태 업데이트 함수를 테스트할 수 있습니다.

상태 업데이트 함수는 GroupState 형식의 개체를 통해 이전 상태를 입력으로 사용합니다. Apache Spark GroupState 참조 설명서를 참조하세요. 예시:

import org.apache.spark.sql.streaming._
import org.apache.spark.api.java.Optional

test("flatMapGroupsWithState's state update function") {
  var prevState = TestGroupState.create[UserStatus](
    optionalState = Optional.empty[UserStatus],
    timeoutConf = GroupStateTimeout.EventTimeTimeout,
    batchProcessingTimeMs = 1L,
    eventTimeWatermarkMs = Optional.of(1L),
    hasTimedOut = false)

  val userId: String = ...
  val actions: Iterator[UserAction] = ...

  assert(!prevState.hasUpdated)

  updateState(userId, actions, prevState)

  assert(prevState.hasUpdated)
}