Apa itu streaming berbasis status?

Halaman ini menjelaskan kueri Streaming Terstruktur stateful, termasuk operasi stateful, rekomendasi pengoptimalan, penautan beberapa operator stateful, dan penyeimbangan ulang status.

Kueri Streaming Terstruktur yang berstatus memerlukan pembaruan bertahap untuk informasi status menengah, sedangkan kueri Streaming Terstruktur tanpa status hanya melacak informasi tentang baris mana yang telah diproses dari sumber ke tujuan. Untuk fitur pengoptimalan yang tersedia untuk kueri stateless, lihat Mengoptimalkan kueri streaming stateless.

Operasi stateful

Operasi stateful mencakup agregasi streaming, distinct, dropDuplicates, gabungan stream-stream, dan aplikasi stateful kustom.

Informasi status perantara yang diperlukan untuk kueri Streaming Terstruktur yang berstatus dapat menyebabkan latensi dan masalah produksi yang tidak terduga jika salah dikonfigurasi.

Di Databricks Runtime 13.3 LTS atau yang lebih baru, Anda dapat mengaktifkan titik pemeriksaan changelog dengan RocksDB untuk menurunkan durasi titik pemeriksaan dan latensi end-to-end untuk beban kerja Streaming Terstruktur. Databricks merekomendasikan untuk mengaktifkan titik pemeriksaan changelog untuk semua kueri stateful pada Streaming Terstruktur. Lihat Aktifkan penandaan changelog.

Mengoptimalkan kueri Streaming Terstruktur bertstatus tetap

Databricks merekomendasikan rekomendasi berikut untuk kueri Streaming Terstruktur yang bersifat stateful:

  • Gunakan instans yang dioptimalkan komputasi sebagai pekerja.
  • Atur jumlah partisi acak menjadi 1-2 kali jumlah inti dalam kluster.

Penting

Jumlah partisi shuffle ditetapkan pada saat titik pemeriksaan dibuat. Mengubah spark.sql.shuffle.partitions tidak berpengaruh pada kueri streaming yang sudah memiliki titik pemeriksaan — kueri terus menggunakan jumlah partisi asli. Untuk menerapkan jumlah partisi baru, Anda harus memulai kueri dengan lokasi titik pemeriksaan baru.

Di Databricks Runtime 18.0 ke atas, kueri streaming stateless mendukung perubahan partisi acak dinamis tanpa memerlukan titik pemeriksaan baru.

  • Atur spark.sql.streaming.noDataMicroBatches.enabled konfigurasi ke false dalam SparkSession. Ini mencegah mesin mikro-batch streaming memproses batch mikro yang tidak berisi data. Mengatur konfigurasi ini ke false mungkin juga mengakibatkan operasi negara yang menggunakan tanda air atau batas waktu pemrosesan tidak akan mendapatkan output data sampai data baru tiba, bukan segera.

Databricks merekomendasikan penggunaan RocksDB dengan titik pemeriksaan changelog untuk mengelola status aliran stateful. Lihat Mengonfigurasi penyimpanan status RocksDB di Azure Databricks.

Catatan

Skema manajemen status tidak dapat diubah di antara mulai ulang kueri. Jika kueri telah dimulai dengan manajemen default, Anda harus memulai ulang dari awal dengan lokasi titik pemeriksaan baru untuk mengubah penyimpanan status.

Bekerja dengan beberapa operator stateful di Streaming Terstruktur

Di Databricks Runtime 13.3 LTS atau yang lebih baru, Azure Databricks menawarkan dukungan lanjutan untuk operator stateful dalam beban kerja Streaming Terstruktur. Anda dapat menghubungkan beberapa operator stateful bersama-sama, yang artinya Anda dapat memasukkan output dari suatu operasi, seperti agregasi berjendela, ke operasi stateful lainnya, seperti join.

Di Databricks Runtime 16.2 atau yang lebih baru, Anda dapat menggunakan transformWithState dalam beban kerja yang memiliki beberapa operator stateful. Lihat Membangun aplikasi stateful kustom.

Contoh berikut menunjukkan beberapa pola yang dapat Anda gunakan.

Penting

Batasan berikut ini ada saat menangani beberapa operator yang bersifat stateful:

  • Operator stateful kustom warisan (FlatMapGroupWithState dan applyInPandasWithState) tidak didukung.
  • Hanya mode output tambahan yang didukung.

Agregasi jendela waktu berantai

Phyton

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()

# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
    window(window_time(windowedCounts.window), "1 hour"),
    windowedCounts.word
).count()

Scala

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
  window($"window", "1 hour"),
  $"word"
).count()

Penggabungan jendela waktu pada dua aliran berbeda diikuti oleh penyatuan jendela antar aliran

Phyton

clicksWindow = clicksWithWatermark.groupBy(
  clicksWithWatermark.clickAdId,
  window(clicksWithWatermark.clickTime, "1 hour")
).count()

impressionsWindow = impressionsWithWatermark.groupBy(
  impressionsWithWatermark.impressionAdId,
  window(impressionsWithWatermark.impressionTime, "1 hour")
).count()

clicksWindow.join(impressionsWindow, "window", "inner")

Scala

val clicksWindow = clicksWithWatermark
  .groupBy(window("clickTime", "1 hour"))
  .count()

val impressionsWindow = impressionsWithWatermark
  .groupBy(window("impressionTime", "1 hour"))
  .count()

clicksWindow.join(impressionsWindow, "window", "inner")

Penggabungan interval waktu antar-stream diikuti oleh agregasi jendela waktu

Phyton

joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined.groupBy(
  joined.clickAdId,
  window(joined.clickTime, "1 hour")
).count()

Scala

val joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """),
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined
  .groupBy($"clickAdId", window($"clickTime", "1 hour"))
  .count()

Penyeimbangan ulang status untuk Streaming Terstruktur

Penyeimbangan ulang status diaktifkan secara default untuk semua beban kerja streaming di Alur Deklaratif Lakeflow Spark. Di Databricks Runtime 11.3 LTS atau yang lebih baru, Anda dapat mengatur opsi konfigurasi berikut dalam konfigurasi kluster Spark untuk mengaktifkan penyeimbangan ulang status:

spark.sql.streaming.statefulOperator.stateRebalancing.enabled true

Penyeimbangan ulang status menguntungkan alur Streaming Terstruktur stateful yang mengalami peristiwa perubahan ukuran kluster. Operasi streaming tanpa status tidak menunjukkan keuntungan, terlepas dari perubahan ukuran kluster.

Catatan

Penskalaan otomatis komputasi memiliki batasan dalam mengurangi ukuran kluster untuk beban kerja Streaming Terstruktur. Databricks merekomendasikan penggunaan Alur Deklaratif Lakeflow Spark dengan penskalaan otomatis yang disempurnakan untuk beban kerja streaming. Lihat Mengoptimalkan utilisasi kluster Alur Deklaratif Lakeflow Spark dengan Autoscaling.

Peristiwa pengubahan ukuran kluster memicu penyeimbangan ulang kondisi. Mikro-batch mungkin memiliki latensi yang lebih tinggi selama peristiwa penyeimbangan ulang karena status dimuat ulang dari penyimpanan cloud ke pelaksana baru.