Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Halaman ini menjelaskan kueri Streaming Terstruktur stateful, termasuk operasi stateful, rekomendasi pengoptimalan, penautan beberapa operator stateful, dan penyeimbangan ulang status.
Kueri Streaming Terstruktur yang berstatus memerlukan pembaruan bertahap untuk informasi status menengah, sedangkan kueri Streaming Terstruktur tanpa status hanya melacak informasi tentang baris mana yang telah diproses dari sumber ke tujuan. Untuk fitur pengoptimalan yang tersedia untuk kueri stateless, lihat Mengoptimalkan kueri streaming stateless.
Operasi stateful
Operasi stateful mencakup agregasi streaming, distinct, dropDuplicates, gabungan stream-stream, dan aplikasi stateful kustom.
Informasi status perantara yang diperlukan untuk kueri Streaming Terstruktur yang berstatus dapat menyebabkan latensi dan masalah produksi yang tidak terduga jika salah dikonfigurasi.
Di Databricks Runtime 13.3 LTS atau yang lebih baru, Anda dapat mengaktifkan titik pemeriksaan changelog dengan RocksDB untuk menurunkan durasi titik pemeriksaan dan latensi end-to-end untuk beban kerja Streaming Terstruktur. Databricks merekomendasikan untuk mengaktifkan titik pemeriksaan changelog untuk semua kueri stateful pada Streaming Terstruktur. Lihat Aktifkan penandaan changelog.
Mengoptimalkan kueri Streaming Terstruktur bertstatus tetap
Databricks merekomendasikan rekomendasi berikut untuk kueri Streaming Terstruktur yang bersifat stateful:
- Gunakan instans yang dioptimalkan komputasi sebagai pekerja.
- Atur jumlah partisi acak menjadi 1-2 kali jumlah inti dalam kluster.
Penting
Jumlah partisi shuffle ditetapkan pada saat titik pemeriksaan dibuat. Mengubah spark.sql.shuffle.partitions tidak berpengaruh pada kueri streaming yang sudah memiliki titik pemeriksaan — kueri terus menggunakan jumlah partisi asli. Untuk menerapkan jumlah partisi baru, Anda harus memulai kueri dengan lokasi titik pemeriksaan baru.
Di Databricks Runtime 18.0 ke atas, kueri streaming stateless mendukung perubahan partisi acak dinamis tanpa memerlukan titik pemeriksaan baru.
- Atur
spark.sql.streaming.noDataMicroBatches.enabledkonfigurasi kefalsedalam SparkSession. Ini mencegah mesin mikro-batch streaming memproses batch mikro yang tidak berisi data. Mengatur konfigurasi ini kefalsemungkin juga mengakibatkan operasi negara yang menggunakan tanda air atau batas waktu pemrosesan tidak akan mendapatkan output data sampai data baru tiba, bukan segera.
Databricks merekomendasikan penggunaan RocksDB dengan titik pemeriksaan changelog untuk mengelola status aliran stateful. Lihat Mengonfigurasi penyimpanan status RocksDB di Azure Databricks.
Catatan
Skema manajemen status tidak dapat diubah di antara mulai ulang kueri. Jika kueri telah dimulai dengan manajemen default, Anda harus memulai ulang dari awal dengan lokasi titik pemeriksaan baru untuk mengubah penyimpanan status.
Bekerja dengan beberapa operator stateful di Streaming Terstruktur
Di Databricks Runtime 13.3 LTS atau yang lebih baru, Azure Databricks menawarkan dukungan lanjutan untuk operator stateful dalam beban kerja Streaming Terstruktur. Anda dapat menghubungkan beberapa operator stateful bersama-sama, yang artinya Anda dapat memasukkan output dari suatu operasi, seperti agregasi berjendela, ke operasi stateful lainnya, seperti join.
Di Databricks Runtime 16.2 atau yang lebih baru, Anda dapat menggunakan transformWithState dalam beban kerja yang memiliki beberapa operator stateful. Lihat Membangun aplikasi stateful kustom.
Contoh berikut menunjukkan beberapa pola yang dapat Anda gunakan.
Penting
Batasan berikut ini ada saat menangani beberapa operator yang bersifat stateful:
- Operator stateful kustom warisan (
FlatMapGroupWithStatedanapplyInPandasWithState) tidak didukung. - Hanya mode output tambahan yang didukung.
Agregasi jendela waktu berantai
Phyton
words = ... # streaming DataFrame of schema { timestamp: Timestamp, word: String }
# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
window(words.timestamp, "10 minutes", "5 minutes"),
words.word
).count()
# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
window(window_time(windowedCounts.window), "1 hour"),
windowedCounts.word
).count()
Scala
import spark.implicits._
val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word"
).count()
// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
window($"window", "1 hour"),
$"word"
).count()
Penggabungan jendela waktu pada dua aliran berbeda diikuti oleh penyatuan jendela antar aliran
Phyton
clicksWindow = clicksWithWatermark.groupBy(
clicksWithWatermark.clickAdId,
window(clicksWithWatermark.clickTime, "1 hour")
).count()
impressionsWindow = impressionsWithWatermark.groupBy(
impressionsWithWatermark.impressionAdId,
window(impressionsWithWatermark.impressionTime, "1 hour")
).count()
clicksWindow.join(impressionsWindow, "window", "inner")
Scala
val clicksWindow = clicksWithWatermark
.groupBy(window("clickTime", "1 hour"))
.count()
val impressionsWindow = impressionsWithWatermark
.groupBy(window("impressionTime", "1 hour"))
.count()
clicksWindow.join(impressionsWindow, "window", "inner")
Penggabungan interval waktu antar-stream diikuti oleh agregasi jendela waktu
Phyton
joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
"leftOuter" # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined.groupBy(
joined.clickAdId,
window(joined.clickTime, "1 hour")
).count()
Scala
val joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
joinType = "leftOuter" // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined
.groupBy($"clickAdId", window($"clickTime", "1 hour"))
.count()
Penyeimbangan ulang status untuk Streaming Terstruktur
Penyeimbangan ulang status diaktifkan secara default untuk semua beban kerja streaming di Alur Deklaratif Lakeflow Spark. Di Databricks Runtime 11.3 LTS atau yang lebih baru, Anda dapat mengatur opsi konfigurasi berikut dalam konfigurasi kluster Spark untuk mengaktifkan penyeimbangan ulang status:
spark.sql.streaming.statefulOperator.stateRebalancing.enabled true
Penyeimbangan ulang status menguntungkan alur Streaming Terstruktur stateful yang mengalami peristiwa perubahan ukuran kluster. Operasi streaming tanpa status tidak menunjukkan keuntungan, terlepas dari perubahan ukuran kluster.
Catatan
Penskalaan otomatis komputasi memiliki batasan dalam mengurangi ukuran kluster untuk beban kerja Streaming Terstruktur. Databricks merekomendasikan penggunaan Alur Deklaratif Lakeflow Spark dengan penskalaan otomatis yang disempurnakan untuk beban kerja streaming. Lihat Mengoptimalkan utilisasi kluster Alur Deklaratif Lakeflow Spark dengan Autoscaling.
Peristiwa pengubahan ukuran kluster memicu penyeimbangan ulang kondisi. Mikro-batch mungkin memiliki latensi yang lebih tinggi selama peristiwa penyeimbangan ulang karena status dimuat ulang dari penyimpanan cloud ke pelaksana baru.