Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
RocksDB adalah penyedia penyimpanan status default di Databricks Runtime 17.3 ke atas. Untuk versi Databricks Runtime di bawah 17.3, Anda dapat mengaktifkan manajemen status berbasis RocksDB dengan mengatur konfigurasi berikut di SparkSession sebelum memulai kueri streaming.
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"com.databricks.sql.streaming.state.RocksDBStateStoreProvider")
Anda dapat mengaktifkan RocksDB pada Alur Deklaratif Lakeflow Spark. Lihat Mengoptimalkan konfigurasi alur untuk pemrosesan berstatus.
Mengaktifkan penandaan changelog
Di Databricks Runtime 13.3 LTS ke atas, Anda dapat mengaktifkan pemeriksaan changelog untuk mengurangi durasi checkpoint dan latensi end-to-end pada pekerjaan streaming terstruktur. Databricks merekomendasikan untuk mengaktifkan pemeriksaan poin perubahan (changelog checkpointing) untuk semua kueri Streaming Terstruktur yang bersifat stateful. Changelog checkpointing diaktifkan secara default di Databricks Runtime 17.3 ke atas.
Secara tradisional, RocksDB State Store mengambil cuplikan dan mengunggah file data selama proses checkpoint. Untuk menghindari biaya ini, titik pemeriksaan changelog hanya menulis rekaman yang telah berubah sejak titik pemeriksaan terakhir menjadi penyimpanan yang tahan lama."
Changelog checkpointing dinonaktifkan secara default pada versi Databricks Runtime sebelum 17.3. Anda dapat mengaktifkan titik pemeriksaan changelog di tingkat SparkSession menggunakan sintaks berikut:
spark.conf.set(
"spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
Anda dapat mengaktifkan titik pemeriksaan changelog pada aliran yang ada dan mempertahankan informasi status yang disimpan di titik pemeriksaan.
Penting
Kueri yang telah mengaktifkan titik pemeriksaan changelog hanya dapat dijalankan pada Databricks Runtime 13.3 LTS ke atas. Anda dapat menonaktifkan checkpointing changelog untuk kembali ke perilaku checkpointing lama, tetapi Anda harus terus menjalankan kueri ini pada Databricks Runtime 13.3 LTS atau versi yang lebih tinggi. Anda harus memulai ulang pekerjaan agar perubahan ini terjadi.
Metrik penyimpanan status RocksDB
Setiap operator status mengumpulkan metrik yang terkait dengan manajemen status yang dilakukan pada instans RocksDB-nya untuk memantau penyimpanan status dan berpotensi membantu dalam debugging kelambatan tugas.
Di Databricks Runtime 16.4 LTS ke atas, metrik untuk instans penyimpanan status tertentu diberi label dengan ID partisi dan nama penyimpanan mereka, memastikannya tetap terpisah. Semua metrik lainnya dilaporkan sebagai jumlah agregat untuk setiap operator status di semua tugas tempat operator status berjalan.
Metrik ini adalah bagian dari peta customMetrics dalam bidang stateOperators di StreamingQueryProgress. Berikut ini adalah contoh StreamingQueryProgress dalam bentuk JSON (diperoleh menggunakan StreamingQueryProgress.json()).
{
"id": "6774075e-8869-454b-ad51-513be86cfd43",
"runId": "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
"batchId": 7,
"stateOperators": [
{
"numRowsTotal": 20000000,
"numRowsUpdated": 20000000,
"memoryUsedBytes": 31005397,
"numRowsDroppedByWatermark": 0,
"customMetrics": {
"SnapshotLastUploaded.partition_0_default": 7,
"SnapshotLastUploaded.partition_1_default": 7,
"SnapshotLastUploaded.partition_2_default": 6,
"SnapshotLastUploaded.partition_3_default": 6,
"SnapshotLastUploaded.partition_4_default": -1,
"rocksdbBytesCopied": 141037747,
"rocksdbCommitCheckpointLatency": 2,
"rocksdbCommitCompactLatency": 22061,
"rocksdbCommitFileSyncLatencyMs": 1710,
"rocksdbCommitFlushLatency": 19032,
"rocksdbCommitPauseLatency": 0,
"rocksdbCommitWriteBatchLatency": 56155,
"rocksdbFilesCopied": 2,
"rocksdbFilesReused": 0,
"rocksdbGetCount": 40000000,
"rocksdbGetLatency": 21834,
"rocksdbPutCount": 1,
"rocksdbPutLatency": 56155599000,
"rocksdbReadBlockCacheHitCount": 1988,
"rocksdbReadBlockCacheMissCount": 40341617,
"rocksdbSstFileSize": 141037747,
"rocksdbTotalBytesReadByCompaction": 336853375,
"rocksdbTotalBytesReadByGet": 680000000,
"rocksdbTotalBytesReadThroughIterator": 0,
"rocksdbTotalBytesWrittenByCompaction": 141037747,
"rocksdbTotalBytesWrittenByPut": 740000012,
"rocksdbTotalCompactionLatencyMs": 21949695000,
"rocksdbWriterStallLatencyMs": 0,
"rocksdbZipFileBytesUncompressed": 7038
}
}
],
"sources": [{}],
"sink": {}
}
Deskripsi terperinci tentang metrik adalah sebagai berikut:
| Nama metrik | Deskripsi |
|---|---|
| LatensiKomitBerkasTulisRocksdb | Waktu (dalam milidetik) yang diperlukan untuk menerapkan penulisan bertahap pada struktur data dalam memori (WriteBatch) ke RocksDB asli. |
| rocksdbCommitFlushLatency | Waktu (dalam mili) yang diperlukan untuk mem-flush perubahan dalam memori RocksDB ke disk lokal. |
| rocksdbCommitCompactLatency | Waktu (dalam milidetik) yang diperlukan untuk pemadatan (opsional) selama komit titik pemeriksaan. |
| rocksdbCommitPauseLatency | Waktu (dalam milidetik) yang diperlukan untuk memberhentikan utas pekerja di latar belakang (untuk pemadatan, dll.) sebagai bagian dari komit titik pemeriksaan. |
| rocksdbCommitCheckpointLatency | Waktu (dalam mili) yang diperlukan untuk mengambil snapshot dari RocksDB asli dan menulisnya ke direktori lokal. |
| rocksdbCommitFileSyncLatencyMs (latensi sinkronisasi berkas komitmen rocksdb dalam ms) | Waktu (dalam mili) yang diperlukan untuk menyinkronkan file terkait snapshot RocksDB asli ke penyimpanan eksternal (lokasi titik pemeriksaan). |
| rocksdbGetLatency | Waktu rata-rata (dalam nanodetik) yang diperlukan untuk setiap panggilan RocksDB::Get asli yang mendasarinya. |
| rocksdbPutCount | Waktu rata-rata (dalam nanodetik) yang diperlukan untuk setiap panggilan RocksDB::Put asli yang mendasarinya. |
| rocksdbGetCount (fungsi untuk mendapatkan jumlah data dari RocksDB) | Jumlah panggilan asli RocksDB::Get (tidak termasuk Gets dari WriteBatch - batch memori yang digunakan untuk penulisan sementara). |
| rocksdbPutCount | Jumlah panggilan bawaan RocksDB::Put (tidak termasuk Puts ke WriteBatch - batch dalam memori yang digunakan untuk menampung tulisan sementara). |
| rocksdbTotalBytesReadByGet (total byte yang dibaca oleh operasi Get di rocksdb) | Jumlah byte yang tidak terkompresi yang dibaca melalui panggilan asli RocksDB::Get. |
| rocksdbTotalBytesDitulisOlehPut | Jumlah byte yang tidak terkompresi yang ditulis melalui pemanggilan fungsi RocksDB::Put bawaan. |
| rocksdbReadBlockCacheHitCount (jumlah hit cache blok baca di rocksdb) | Berapa kali cache blok RocksDB asli digunakan untuk menghindari membaca data dari disk lokal. |
| rocksdbJumlahKesalahanCacheBlokBaca | Berapa kali cache blok RocksDB asli hilang dan diperlukan pembacaan data dari disk lokal. |
| RocksdbTotalBytesDibacaOlehPemadatan | Jumlah byte yang dibaca dari disk lokal oleh proses pemadatan RocksDB asli. |
| rocksdbTotalBytesYangDitulisOlehPemadatan | Jumlah byte yang ditulis ke disk lokal oleh proses pemadatan RocksDB asli. |
| rocksdbTotalCompactionLatencyMs (Latensi Kompaksi Total rocksdb dalam Milidetik) | Waktu (dalam milidetik) yang dibutuhkan untuk kompaksi RocksDB (baik pada latar belakang maupun kompaksi opsional yang dimulai selama komit). |
| rocksdbWriterStallLatencyMs | Waktu (dalam milidetik) ketika penulis macet karena pemadatan latar belakang atau pengosongan tabel memori ke disk. |
| Total Byte yang Dibaca Melalui Iterator | Sejumlah operasi stateful (seperti pemrosesan batas waktu di flatMapGroupsWithState atau watermarking dalam agregasi berjendela) memerlukan pembacaan seluruh data di basis data melalui iterator. Ukuran total data yang tidak terkompresi yang dibaca menggunakan iterator. |