다음을 통해 공유


상태 저장 스트리밍이란?

상태 저장 구조화된 스트리밍 쿼리에는 중간 상태 정보에 대한 점진적 업데이트가 필요한 반면, 상태 비저장 구조화된 스트리밍 쿼리는 원본에서 싱크로 처리된 행에 대한 정보만 추적합니다.

상태 저장 작업에는 스트림 집계, 스트림 스트림 조인, 스트리밍 dropDuplicates, 그리고 사용자 정의 상태 저장 애플리케이션이 포함됩니다.

상태 저장 구조적 스트리밍 쿼리에 필요한 중간 상태 정보는 잘못 구성된 경우 예기치 않은 대기 시간 및 프로덕션 문제를 초래할 수 있습니다.

Databricks Runtime 13.3 LTS 이상에서는 RocksDB로 변경 로그 체크포인트를 활성화하여 구조화된 스트리밍 워크로드의 체크포인트 기간과 엔드투엔드 지연 시간을 줄일 수 있습니다. Databricks는 모든 구조적 스트리밍 상태 저장 쿼리에 대해 changelog 검사포인트를 사용하도록 설정할 것을 권합니다. 변경 로그 검사점 활성화를 참조하세요.

상태 저장이 가능한 구조적 스트리밍 쿼리 최적화

상태 저장 구조적 스트리밍 쿼리의 중간 상태 정보를 관리하면 예기치 않은 대기 시간 및 프로덕션 문제를 방지할 수 있습니다.

Databricks는 다음을 권장합니다.

  • 컴퓨팅 최적화된 인스턴스를 작업자로 사용합니다.
  • 순서 섞기 파티션 수를 클러스터의 코어 수의 1~2배로 설정합니다.
  • SparkSession에서 spark.sql.streaming.noDataMicroBatches.enabled 구성을 false로 설정합니다. 이렇게 하면 스트리밍 마이크로 일괄 처리 엔진이 데이터가 포함되지 않은 마이크로 일괄 처리를 처리할 수 없습니다. 이 구성을 false로 설정할 경우, 워터마크를 사용하는 상태 저장 작업이나 처리 시간 초과 설정으로 인해 데이터 출력이 새 데이터가 도착할 때까지 즉시 발생하지 않을 수 있습니다.

Databricks는 상태 저장 스트림의 상태를 관리하기 위해 변경 로그 검사점과 함께 RocksDB를 사용하는 것이 좋습니다. Azure Databricks에서 RocksDB 상태 저장소 구성을 참조하세요.

참고

쿼리를 다시 시작할 때 상태 관리 체계를 변경할 수 없습니다. 쿼리가 기본 관리로 시작된 경우 상태 저장소를 변경하려면 새 검사점 위치로 처음부터 다시 시작해야 합니다.

구조적 스트리밍에서 여러 상태 저장 연산자 작업

Databricks Runtime 13.3 LTS 이상에서 Azure Databricks는 구조적 스트리밍 워크로드의 상태 저장 연산자를 위한 고급 지원을 제공합니다. 이제 여러 상태 저장 연산자를 함께 연결할 수 있습니다. 즉, 창이 있는 집계와 같은 작업의 출력을 조인과 같은 다른 상태 저장 작업에 공급할 수 있습니다.

Databricks Runtime 16.2 이상에서는 여러 상태 저장 연산자가 있는 워크로드에서 transformWithState 사용할 수 있습니다. 사용자 지정 상태 저장 애플리케이션빌드를 참조하세요.

다음 예제에서는 사용할 수 있는 몇 가지 패턴을 보여 줍니다.

중요

여러 상태 저장 연산자를 사용하는 경우 다음과 같은 제한 사항이 있습니다.

  • 레거시 사용자 지정 상태 저장 연산자(FlatMapGroupWithStateapplyInPandasWithState 지원되지 않습니다.
  • 추가 출력 모드만 지원됩니다.

연속된 시간 창 집계

파이썬

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()

# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
    window(window_time(windowedCounts.window), "1 hour"),
    windowedCounts.word
).count()

스칼라

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
  window($"window", "1 hour"),
  $"word"
).count()

서로 다른 두 스트림의 시간 창 집계 후 스트림-스트림 창 조인

파이썬

clicksWindow = clicksWithWatermark.groupBy(
  clicksWithWatermark.clickAdId,
  window(clicksWithWatermark.clickTime, "1 hour")
).count()

impressionsWindow = impressionsWithWatermark.groupBy(
  impressionsWithWatermark.impressionAdId,
  window(impressionsWithWatermark.impressionTime, "1 hour")
).count()

clicksWindow.join(impressionsWindow, "window", "inner")

스칼라

val clicksWindow = clicksWithWatermark
  .groupBy(window("clickTime", "1 hour"))
  .count()

val impressionsWindow = impressionsWithWatermark
  .groupBy(window("impressionTime", "1 hour"))
  .count()

clicksWindow.join(impressionsWindow, "window", "inner")

스트림 간 시간 간격 조인 후 시간 창 집계

파이썬

joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined.groupBy(
  joined.clickAdId,
  window(joined.clickTime, "1 hour")
).count()

스칼라

val joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """),
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined
  .groupBy($"clickAdId", window($"clickTime", "1 hour"))
  .count()

구조적 스트리밍에 대한 상태 재조정

상태 리밸런싱은 기본적으로 Lakeflow 선언적 파이프라인의 모든 스트리밍 워크로드에 대해 사용하도록 설정됩니다. Databricks Runtime 11.3 LTS 이상에서는 Spark 클러스터 구성에서 다음 구성 옵션을 설정하여 상태 재조정을 사용하도록 설정할 수 있습니다.

spark.sql.streaming.statefulOperator.stateRebalancing.enabled true

상태 리밸런싱은 클러스터 크기 조정 이벤트를 거치는 상태 저장 구조적 스트리밍 파이프라인에 이점을 가져옵니다. 상태가 없는 스트리밍 작업은 클러스터 크기의 변경과 상관없이 이점이 없습니다.

참고

구조화된 스트리밍 워크로드의 경우 컴퓨팅 자동 크기 조정에는 클러스터 크기를 스케일 다운하는 데 제한이 있습니다. Databricks는 스트리밍 워크로드에 대해 향상된 자동 크기 조정과 함께 Lakeflow 선언적 파이프라인을 사용하는 것이 좋습니다. 자동 크기 조정을 사용하여 Lakeflow 선언적 파이프라인의 클러스터 사용률 최적화를 참조하세요.

클러스터 크기 조정 이벤트는 상태 재조정을 트리거합니다. 상태가 클라우드 스토리지에서 새 실행자로 로드될 때 마이크로 일괄 처리는 이벤트의 균형을 조정하는 동안 대기 시간이 더 높을 수 있습니다.