Bagikan melalui


Mengonfigurasi penyimpanan status RocksDB di Azure Databricks

Anda dapat mengaktifkan manajemen status berbasis RocksDB dengan mengatur konfigurasi berikut di SparkSession sebelum memulai kueri streaming.

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")

Anda dapat mengaktifkan RocksDB pada alur Delta Live Tables. Lihat Mengoptimalkan konfigurasi alur untuk pemrosesan stateful.

Mengaktifkan titik pemeriksaan changelog

Di Databricks Runtime 13.3 LTS ke atas, Anda dapat mengaktifkan titik pemeriksaan changelog ke durasi titik pemeriksaan yang lebih rendah dan latensi end-to-end untuk beban kerja Streaming Terstruktur. Databrick merekomendasikan untuk mengaktifkan titik pemeriksaan changelog untuk semua kueri stateful Streaming Terstruktur.

Secara tradisional rekam jepret RocksDB State Store dan mengunggah file data selama titik pemeriksaan. Untuk menghindari biaya ini, titik pemeriksaan changelog hanya menulis rekaman yang telah berubah sejak titik pemeriksaan terakhir menjadi penyimpanan yang tahan lama."

Changelog checkpointing dinonaktifkan secara default. Anda dapat mengaktifkan titik pemeriksaan changelog di tingkat SparkSession menggunakan sintaks berikut:

spark.conf.set(
  "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")

Anda dapat mengaktifkan titik pemeriksaan changelog pada aliran yang ada dan mempertahankan informasi status yang disimpan di titik pemeriksaan.

Penting

Kueri yang telah mengaktifkan titik pemeriksaan changelog hanya dapat dijalankan pada Databricks Runtime 13.3 LTS ke atas. Anda dapat menonaktifkan titik pemeriksaan changelog untuk kembali ke perilaku titik pemeriksaan warisan, tetapi Anda harus terus menjalankan kueri ini pada Databricks Runtime 13.3 LTS atau yang lebih tinggi. Anda harus memulai ulang pekerjaan agar perubahan ini terjadi.

Metrik penyimpanan status RocksDB

Setiap operator status mengumpulkan metrik yang terkait dengan operasi manajemen status, dilakukan pada instans RocksDB-nya untuk mengamati penyimpanan status dan berpotensi membantu dalam debugging kelambanan pekerjaan. Metrik ini dikumpulkan (jumlah) per operator status dalam pekerjaan di seluruh tugas tempat operator status sedang berjalan. Metrik ini adalah bagian dari peta customMetrics dalam bidang stateOperators di StreamingQueryProgress. Berikut ini adalah contoh StreamingQueryProgress dalam bentuk JSON (diperoleh menggunakan StreamingQueryProgress.json()).

{
  "id" : "6774075e-8869-454b-ad51-513be86cfd43",
  "runId" : "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
  "batchId" : 7,
  "stateOperators" : [ {
    "numRowsTotal" : 20000000,
    "numRowsUpdated" : 20000000,
    "memoryUsedBytes" : 31005397,
    "numRowsDroppedByWatermark" : 0,
    "customMetrics" : {
      "rocksdbBytesCopied" : 141037747,
      "rocksdbCommitCheckpointLatency" : 2,
      "rocksdbCommitCompactLatency" : 22061,
      "rocksdbCommitFileSyncLatencyMs" : 1710,
      "rocksdbCommitFlushLatency" : 19032,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 56155,
      "rocksdbFilesCopied" : 2,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 40000000,
      "rocksdbGetLatency" : 21834,
      "rocksdbPutCount" : 1,
      "rocksdbPutLatency" : 56155599000,
      "rocksdbReadBlockCacheHitCount" : 1988,
      "rocksdbReadBlockCacheMissCount" : 40341617,
      "rocksdbSstFileSize" : 141037747,
      "rocksdbTotalBytesReadByCompaction" : 336853375,
      "rocksdbTotalBytesReadByGet" : 680000000,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 141037747,
      "rocksdbTotalBytesWrittenByPut" : 740000012,
      "rocksdbTotalCompactionLatencyMs" : 21949695000,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 7038
    }
  } ],
  "sources" : [ {
  } ],
  "sink" : {
  }
}

Deskripsi terperinci tentang metrik adalah sebagai berikut:

Nama metrik Deskripsi
rocksdbCommitWriteBatchLatency Waktu (dalam mili) yang diperlukan untuk menerapkan penulisan bertahap pada struktur dalam memori (WriteBatch) ke RocksDB asli.
rocksdbCommitFlushLatency Waktu (dalam mili) yang diperlukan untuk mem-flush perubahan dalam memori RocksDB ke disk lokal.
rocksdbCommitCompactLatency Waktu (dalam mili) yang diperlukan untuk pemadatan (opsional) selama penerapan titik pemeriksaan.
rocksdbCommitPauseLatency Waktu (dalam mili) yang diperlukan untuk menghentikan utas pekerja di latar belakang (untuk pemadatan, dll.) sebagai bagian dari penerapan titik pemeriksaan.
rocksdbCommitCheckpointLatency Waktu (dalam mili) yang diperlukan untuk mengambil snapshot dari RocksDB asli dan menulisnya ke direktori lokal.
rocksdbCommitFileSyncLatencyMs Waktu (dalam mili) yang diperlukan untuk menyinkronkan file terkait snapshot RocksDB asli ke penyimpanan eksternal (lokasi titik pemeriksaan).
rocksdbGetLatency Waktu rata-rata (dalam nano) yang dibutuhkan per panggilan RocksDB::Get asli yang mendasarinya.
rocksdbPutCount Waktu rata-rata (dalam nano) yang dibutuhkan per panggilan RocksDB::Put asli yang mendasarinya.
rocksdbGetCount Jumlah panggilan RocksDB::Get asli (tidak termasuk Gets dari WriteBatch - dalam batch memori yang digunakan untuk penulisan bertahap).
rocksdbPutCount Jumlah panggilan RocksDB::Put asli (tidak termasuk Puts ke WriteBatch - dalam batch memori yang digunakan untuk penulisan bertahap).
rocksdbTotalBytesReadByGet Jumlah byte yang tidak terkompresi yang dibaca melalui panggilan RocksDB::Get asli.
rocksdbTotalBytesWrittenByPut Jumlah byte yang tidak terkompresi yang ditulis melalui panggilan RocksDB::Put asli.
rocksdbReadBlockCacheHitCount Berapa kali cache blok RocksDB asli digunakan untuk menghindari membaca data dari disk lokal.
rocksdbReadBlockCacheMissCount Berapa kali cache blok RocksDB asli hilang dan diperlukan pembacaan data dari disk lokal.
rocksdbTotalBytesReadByCompaction Jumlah byte yang dibaca dari disk lokal oleh proses pemadatan RocksDB asli.
rocksdbTotalBytesWrittenByCompaction Jumlah byte yang ditulis ke disk lokal oleh proses pemadatan RocksDB asli.
rocksdbTotalCompactionLatencyMs Waktu (dalam mili) yang dibutuhkan untuk pemadatan RocksDB (baik di latar belakang maupun pemadatan opsional yang dimulai selama penerapan).
rocksdbWriterStallLatencyMs Waktu (dalam mili) ketika penulis terhenti karena pemadatan latar belakang atau flushing memtable ke disk.
rocksdbTotalBytesReadThroughIterator Beberapa operasi stateful (seperti pemrosesan batas waktu di flatMapGroupsWithState atau watermarking dalam agregasi berjendela) memerlukan pembacaan seluruh data di DB melalui iterator. Ukuran total data yang tidak terkompresi yang dibaca menggunakan iterator.