Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
RocksDB — это поставщик хранилища состояний по умолчанию в Databricks Runtime 17.3 и выше. Для databricks Runtime версии ниже 17.3 можно включить управление состоянием на основе RocksDB, задав следующую конфигурацию в SparkSession перед началом потокового запроса.
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"com.databricks.sql.streaming.state.RocksDBStateStoreProvider")
Вы можете включить RocksDB в декларативных конвейерах Spark Lakeflow. См. раздел Оптимизация конфигурации конвейера для обработки с сохранением состояния.
Включение контрольных точек журнала изменений
В Databricks Runtime 13.3 LTS и более поздних версиях вы можете включить контрольную точку отслеживания изменений, чтобы сократить время контрольной точки и уменьшить сквозную задержку для рабочих нагрузок структурированной потоковой передачи. Databricks рекомендует включать контрольные точки журнала изменений для всех запросов структурированных потоков с отслеживанием состояния. Контрольные точки для журналов изменений по умолчанию включены в Databricks Runtime версии 17.3 и выше.
Традиционно в хранилище состояний RocksDB создаются моментальные снимки и загружаются файлы данных во время создания контрольной точки. Чтобы избежать этих затрат, контрольные точки журнала изменений записывают на надежное хранилище только те записи, которые изменились с момента последней контрольной точки.
Контрольная точка журнальных изменений отключена по умолчанию в версиях Databricks Runtime, предшествующих 17.3. Вы можете включить контрольные точки журнала изменений на уровне SparkSession с помощью следующего синтаксиса:
spark.conf.set(
"spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
Вы можете включить контрольную точку журнала изменений в существующем потоке и сохранить сведения о состоянии, хранящиеся в контрольной точке.
Внимание
Запросы, в которых включена фиксация контрольных точек журнала изменений, могут выполняться только в Databricks Runtime 13.3 LTS или более поздних версиях. Вы можете отключить контрольную точку журнала изменений, чтобы вернуться к устаревшему поведению контрольных точек, но вы должны продолжать выполнять соответствующие запросы на Databricks Runtime 13.3 LTS или более поздней версии. Для выполнения этих изменений необходимо перезапустить задание.
Метрики хранилища состояний RocksDB
Каждый оператор состояния собирает метрики, связанные с операциями управления состоянием, выполненными на экземпляре RocksDB, чтобы следить за хранилищем состояний и, возможно, помочь в отладке медленной работы задач.
В Databricks Runtime 16.4 LTS и более поздних версиях метрики для определенного экземпляра хранилища состояний помечены их идентификатором раздела и именем хранилища, гарантируя, что они остаются раздельными. Все остальные метрики отчитываются как совокупная сумма для каждого оператора состояния во всех задачах, в которых выполняется оператор состояния.
Эти метрики являются частью карты customMetrics, находящейся внутри полей stateOperators в StreamingQueryProgress. Ниже приведен пример StreamingQueryProgress в форме JSON (получен с помощью StreamingQueryProgress.json()).
{
"id": "6774075e-8869-454b-ad51-513be86cfd43",
"runId": "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
"batchId": 7,
"stateOperators": [
{
"numRowsTotal": 20000000,
"numRowsUpdated": 20000000,
"memoryUsedBytes": 31005397,
"numRowsDroppedByWatermark": 0,
"customMetrics": {
"SnapshotLastUploaded.partition_0_default": 7,
"SnapshotLastUploaded.partition_1_default": 7,
"SnapshotLastUploaded.partition_2_default": 6,
"SnapshotLastUploaded.partition_3_default": 6,
"SnapshotLastUploaded.partition_4_default": -1,
"rocksdbBytesCopied": 141037747,
"rocksdbCommitCheckpointLatency": 2,
"rocksdbCommitCompactLatency": 22061,
"rocksdbCommitFileSyncLatencyMs": 1710,
"rocksdbCommitFlushLatency": 19032,
"rocksdbCommitPauseLatency": 0,
"rocksdbCommitWriteBatchLatency": 56155,
"rocksdbFilesCopied": 2,
"rocksdbFilesReused": 0,
"rocksdbGetCount": 40000000,
"rocksdbGetLatency": 21834,
"rocksdbPutCount": 1,
"rocksdbPutLatency": 56155599000,
"rocksdbReadBlockCacheHitCount": 1988,
"rocksdbReadBlockCacheMissCount": 40341617,
"rocksdbSstFileSize": 141037747,
"rocksdbTotalBytesReadByCompaction": 336853375,
"rocksdbTotalBytesReadByGet": 680000000,
"rocksdbTotalBytesReadThroughIterator": 0,
"rocksdbTotalBytesWrittenByCompaction": 141037747,
"rocksdbTotalBytesWrittenByPut": 740000012,
"rocksdbTotalCompactionLatencyMs": 21949695000,
"rocksdbWriterStallLatencyMs": 0,
"rocksdbZipFileBytesUncompressed": 7038
}
}
],
"sources": [{}],
"sink": {}
}
Ниже приведены подробные описания метрик.
| Имя метрики | Описание |
|---|---|
| Задержка фиксации пакета записей rocksdb (rocksdbCommitWriteBatchLatency) | Время (в миллисекундах), затраченное на применение стадированных операций записи в структуре в памяти (WriteBatch) к родной базе данных RocksDB. |
| rocksdbCommitFlushЗадержка | Время (в миллисекундах), которое понадобилось для очистки изменений в памяти RocksDB на локальном диске. |
| rocksdbCommitCompactLatency | Время (в миллисекундах), которое понадобилось для сжатия (необязательного) во время фиксации контрольной точки. |
| rocksdbCommitPauseLatency | Время (в миллисекундах), затраченное на остановку фоновых рабочих потоков (для компактизации и т. д.) в процессе фиксации контрольной точки. |
| rocksdbCommitCheckpointLatency | Время (в миллисекундах), которое понадобилось для создания моментального снимка встроенного RocksDB и его записи в локальный каталог. |
| rocksdbCommitFileSyncLatencyMs | Время (в миллисекундах), которое понадобилось на синхронизацию файлов, связанных с собственным моментальным снимком RocksDB, со внешним хранилищем в месте контрольной точки. |
| rocksdbGetLatency | Среднее время (в наносекундах), затраченное на один базовый нативный вызов RocksDB::Get. |
| Количество операций записи в RocksDB | Среднее время (в наносекундах), затраченное на один базовый нативный вызов RocksDB::Put. |
| rocksdbGetCount | Количество собственных RocksDB::Get вызовов (не включает Gets, так как WriteBatch — это пакет в памяти, используемый для промежуточной записи). |
| Количество операций записи в RocksDB | Количество собственных RocksDB::Put вызовов (не включает Puts в себя WriteBatch — в пакет памяти, используемый для промежуточной записи). |
| rocksdbОбщееКоличествоБайтовЧтенияЧерезGet | Число несжатых байтов, прочитанных с помощью собственных вызовов RocksDB::Get. |
| rocksdbОбщееЧислоБайтЗаписанныхСПомощьюPut | Число несжатых байтов, записанных с помощью нативных вызовов RocksDB::Put. |
| rocksdbReadBlockCacheHitCount (Количество попаданий в кэш блоков чтения) | Количество использований собственного кэша блоков RocksDB, чтобы избежать считывания данных с локального диска. |
| КоличествоПропусковКэшаЧтенияБлоковRocksDB | Количество промахов нативного кэша блоков RocksDB, требующего считывания данных с локального диска. |
| Общее количество байт, прочитанных при сжатии rocksdb | Число байтов, считанных с локального диска с помощью собственного процесса сжатия RocksDB. |
| rocksdbОбщееКоличествоБайтЗаписанныхКомпакцией | Число байтов, записанных на локальный диск с помощью собственного процесса сжатия RocksDB. |
| rocksdbTotalCompactionLatencyMs | Время (в миллисекундах), потраченное на сжатие RocksDB (как фоновое, так и дополнительное сжатие, инициированное во время фиксации). |
| rocksdbWriterStallLatencyMs (задержка остановки записи в RocksDB в миллисекундах) | Время (в миллисекундах), которое модуль записи простаивал из-за фоновой компакции или сброса мемтаблиц на диск. |
| "суммарное количество байт, прочитанных через итератор в базе данных RocksDB" | Некоторые из операций с отслеживанием состояния (например, обработка времени ожидания в flatMapGroupsWithState или установка водяных знаков в окнах агрегирования) требует чтения всех данных в базе данных с помощью итератора. Общий размер несжатых данных, считанных с помощью итератора. |