Bagikan melalui


Mengoptimalkan kueri Streaming Terstruktur yang stateful

Mengelola informasi status menengah dari kueri Streaming Terstruktur stateful dapat membantu mencegah latensi dan masalah produksi yang tidak terduga.

Databricks merekomendasikan:

  • Gunakan instans yang dioptimalkan komputasi sebagai pekerja.
  • Atur jumlah partisi shuffle ke 1-2 kali jumlah inti dalam kluster.
  • Atur spark.sql.streaming.noDataMicroBatches.enabled konfigurasi ke false dalam SparkSession. Pengaturan ini mencegah mesin batch mikro streaming memproses batch mikro yang tidak berisi data. Perhatikan juga bahwa mengatur konfigurasi ini ke false dapat mengakibatkan operasi stateful, yang memanfaatkan tanda air atau batas waktu pemrosesan, akan tidak mendapatkan output data sampai data baru tiba, alih-alih mendapatkannya segera.

Databricks merekomendasikan penggunaan RocksDB dengan titik pemeriksaan changelog untuk mengelola status aliran stateful. Lihat Mengonfigurasi penyimpanan status RocksDB di Azure Databricks.

Catatan

Skema manajemen status tidak dapat diubah di antara restart kueri. Artinya, jika kueri telah dimulai dengan manajemen default, maka kueri tidak dapat berubah tanpa memulai kueri dari awal dengan lokasi titik pemeriksaan baru.

Bekerja dengan beberapa operator stateful di Streaming Terstruktur

Di Databricks Runtime 13.3 LTS ke atas, Azure Databricks menawarkan dukungan tingkat lanjut untuk operator stateful dalam beban kerja Streaming Terstruktur. Anda sekarang dapat menautkan beberapa operator stateful bersama-sama, yang berarti Anda dapat memberi umpan output operasi seperti agregasi berjendela ke operasi stateful lainnya seperti gabungan.

Contoh berikut menunjukkan beberapa pola yang dapat Anda gunakan.

Penting

Batasan berikut ada saat bekerja dengan beberapa operator stateful:

  • FlatMapGroupWithState tidak didukung.
  • Hanya mode output tambahan yang didukung.

Agregasi jendela waktu berantai

Python

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()

# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
    window(window_time(windowedCounts.window), "1 hour"),
    windowedCounts.word
).count()

Scala

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
  window($"window", "1 hour"),
  $"word"
).count()

Agregasi jendela waktu di dua aliran berbeda diikuti oleh gabungan jendela stream-stream

Python

clicksWindow = clicksWithWatermark.groupBy(
  clicksWithWatermark.clickAdId,
  window(clicksWithWatermark.clickTime, "1 hour")
).count()

impressionsWindow = impressionsWithWatermark.groupBy(
  impressionsWithWatermark.impressionAdId,
  window(impressionsWithWatermark.impressionTime, "1 hour")
).count()

clicksWindow.join(impressionsWindow, "window", "inner")

Scala

val clicksWindow = clicksWithWatermark
  .groupBy(window("clickTime", "1 hour"))
  .count()

val impressionsWindow = impressionsWithWatermark
  .groupBy(window("impressionTime", "1 hour"))
  .count()

clicksWindow.join(impressionsWindow, "window", "inner")

Gabungan interval waktu stream-stream diikuti oleh agregasi jendela waktu

Python

joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined.groupBy(
  joined.clickAdId,
  window(joined.clickTime, "1 hour")
).count()

Scala

val joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """),
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined
  .groupBy($"clickAdId", window($"clickTime", "1 hour"))
  .count()

Penyeimbangan ulang status untuk Streaming Terstruktur

Penyeimbangan ulang status diaktifkan secara default untuk semua beban kerja streaming di Tabel Langsung Delta. Di Databricks Runtime 11.3 LTS ke atas, Anda dapat mengatur opsi konfigurasi berikut dalam konfigurasi kluster Spark untuk mengaktifkan penyeimbangan ulang status:

spark.sql.streaming.statefulOperator.stateRebalancing.enabled true

Penyeimbangan ulang status menguntungkan alur Streaming Terstruktur stateful yang mengalami peristiwa perubahan ukuran kluster. Operasi streaming stateless tidak mendapat manfaat, terlepas dari perubahan ukuran kluster.

Catatan

Penskalaan otomatis komputasi memiliki batasan penskalaan ukuran kluster untuk beban kerja Streaming Terstruktur. Databricks merekomendasikan penggunaan Live Delta Tables dengan Penskalaan Otomatis yang Disempurnakan untuk beban kerja streaming. Lihat Mengoptimalkan pemanfaatan kluster alur Delta Live Tables dengan Penskalaan Otomatis yang Ditingkatkan.

Peristiwa pengurangan ukuran kluster menyebabkan penyeimbangan ulang status terpicu. Selama menyeimbangkan ulang peristiwa, batch mikro mungkin memiliki latensi yang lebih tinggi karena status dimuat dari penyimpanan cloud ke pelaksana baru.

Tentukan status awal untuk mapGroupsWithState

Anda dapat menentukan status awal yang ditentukan pengguna untuk pemrosesan stateful Streaming Terstruktur menggunakan flatMapGroupsWithStateatau mapGroupsWithState. Ini memungkinkan Anda untuk menghindari pemrosesan ulang data saat memulai aliran stateful tanpa titik pemeriksaan yang valid.

def mapGroupsWithState[S: Encoder, U: Encoder](
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]

def flatMapGroupsWithState[S: Encoder, U: Encoder](
    outputMode: OutputMode,
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => Iterator[U])

Contoh kasus penggunaan yang menentukan status awal ke operator flatMapGroupsWithState:

val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  Iterator((key, count.toString))
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

Contoh kasus penggunaan yang menentukan status awal ke operator mapGroupsWithState:

val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  (key, count.toString)
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

mapGroupsWithState Menguji fungsi pembaruan

API TestGroupState memungkinkan Anda untuk menguji fungsi pembaruan status yang digunakan untuk Dataset.groupByKey(...).mapGroupsWithState(...) dan Dataset.groupByKey(...).flatMapGroupsWithState(...).

Fungsi pembaruan status mengambil status sebelumnya sebagai input menggunakan objek berjenis GroupState. Lihat dokumentasi referensi GroupState dari Apache Spark. Contohnya:

import org.apache.spark.sql.streaming._
import org.apache.spark.api.java.Optional

test("flatMapGroupsWithState's state update function") {
  var prevState = TestGroupState.create[UserStatus](
    optionalState = Optional.empty[UserStatus],
    timeoutConf = GroupStateTimeout.EventTimeTimeout,
    batchProcessingTimeMs = 1L,
    eventTimeWatermarkMs = Optional.of(1L),
    hasTimedOut = false)

  val userId: String = ...
  val actions: Iterator[UserAction] = ...

  assert(!prevState.hasUpdated)

  updateState(userId, actions, prevState)

  assert(prevState.hasUpdated)
}