Configuración de un almacén de estados de RocksDB en Azure Databricks
Para habilitar la administración de estado basada en RocksDB, establezca la siguiente configuración en SparkSession antes de iniciar la consulta de streaming.
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"com.databricks.sql.streaming.state.RocksDBStateStoreProvider")
Puede habilitar RocksDB en canalizaciones de Delta Live Tables. Consulte Optimización de la configuración de canalización para el procesamiento con estado.
Habilitar puntos de control del registro de cambios
En Databricks Runtime 13.3 LTS y versiones posteriores, puede habilitar los puntos de control del registro de cambios para reducir la duración del punto de control y la latencia de un extremo a otro para cargas de trabajo de Structured Streaming. Databricks recomienda habilitar los puntos de control del registro de cambios para todas las consultas con estado de Structured Streaming.
Tradicionalmente, RocksDB State Store instantáneas y carga archivos de datos durante el punto de comprobación. Para evitar este costo, los puntos de comprobación del registro de cambios solo escriben registros que han cambiado desde el último punto de control en un almacenamiento duradero".
El control de puntos de comprobación del registro de cambios está deshabilitado de forma predeterminada. Puede habilitar los puntos de comprobación del registro de cambios en el nivel SparkSession mediante la sintaxis siguiente:
spark.conf.set(
"spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
Puede habilitar los puntos de control del registro de cambios en una secuencia existente y mantener la información de estado almacenada en el punto de control.
Importante
Las consultas que han habilitado los puntos de comprobación del registro de cambios solo se pueden ejecutar en Databricks Runtime 13.3 LTS y versiones posteriores. Puede deshabilitar los puntos de comprobación del registro de cambios para revertir al comportamiento de puntos de comprobación heredados, pero debe seguir ejecutando estas consultas en Databricks Runtime 13.3 LTS o superior. Debe reiniciar el trabajo para que se produzcan estos cambios.
Métricas del almacén de estado de RocksDB
Cada operador de estado recopila las métricas relacionadas con las operaciones de administración de estado realizadas en su instancia de RocksDB para observar el almacén de estado y ayudar potencialmente en la depuración de la lentitud del trabajo. Estas métricas se agregan (suma) por operador de estado al trabajo en todas las tareas en las que se ejecuta el operador de estado. Estas métricas forman parte del mapa customMetrics
dentro de los campos de stateOperators
en StreamingQueryProgress
. A continuación se muestra un ejemplo de StreamingQueryProgress
en formato JSON (obtenido mediante StreamingQueryProgress.json()
).
{
"id" : "6774075e-8869-454b-ad51-513be86cfd43",
"runId" : "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
"batchId" : 7,
"stateOperators" : [ {
"numRowsTotal" : 20000000,
"numRowsUpdated" : 20000000,
"memoryUsedBytes" : 31005397,
"numRowsDroppedByWatermark" : 0,
"customMetrics" : {
"rocksdbBytesCopied" : 141037747,
"rocksdbCommitCheckpointLatency" : 2,
"rocksdbCommitCompactLatency" : 22061,
"rocksdbCommitFileSyncLatencyMs" : 1710,
"rocksdbCommitFlushLatency" : 19032,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 56155,
"rocksdbFilesCopied" : 2,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 40000000,
"rocksdbGetLatency" : 21834,
"rocksdbPutCount" : 1,
"rocksdbPutLatency" : 56155599000,
"rocksdbReadBlockCacheHitCount" : 1988,
"rocksdbReadBlockCacheMissCount" : 40341617,
"rocksdbSstFileSize" : 141037747,
"rocksdbTotalBytesReadByCompaction" : 336853375,
"rocksdbTotalBytesReadByGet" : 680000000,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 141037747,
"rocksdbTotalBytesWrittenByPut" : 740000012,
"rocksdbTotalCompactionLatencyMs" : 21949695000,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 7038
}
} ],
"sources" : [ {
} ],
"sink" : {
}
}
Estas son las descripciones detalladas de las métricas:
Nombre de métrica | Descripción |
---|---|
rocksdbCommitWriteBatchLatency | El tiempo (en milisegundos) que se tardó en aplicar las escrituras de almacenamiento provisional en la estructura en memoria (WriteBatch) a RocksDB nativo. |
rocksdbCommitFlushLatency | El tiempo (en milisegundos) que se tardó en vaciar los cambios en memoria de RocksDB en el disco local. |
rocksdbCommitCompactLatency | El tiempo (en milisegundos) que se tardó en realizar la compactación (opcional) durante la confirmación del punto de comprobación. |
rocksdbCommitPauseLatency | Esta métrica indica el tiempo (en milisegundos) que se tardó en detener los subprocesos de trabajo en segundo plano (para compactación y otros) como parte de la confirmación del punto de comprobación. |
rocksdbCommitCheckpointLatency | El tiempo (en milisegundos) que se tardó en tomar una instantánea de una instancia de RocksDB nativa y escribirla en un directorio local. |
rocksdbCommitFileSyncLatencyMs | El tiempo (en milisegundos) que se tardó en sincronizar los archivos nativos relacionados con una instantánea de RocksDB con un almacenamiento externo (ubicación del punto de comprobación). |
rocksdbGetLatency | Promedio de tiempo (en nanosegundos) que se tardó por la llamada de RocksDB::Get nativa subyacente. |
rocksdbPutCount | Promedio de tiempo (en nanosegundos) que se tardó por la llamada de RocksDB::Put nativa subyacente. |
rocksdbGetCount | Número de llamadas de RocksDB::Get nativas (no incluye Gets desde WriteBatch: en el lote de memoria usado para escrituras de almacenamiento provisional). |
rocksdbPutCount | Número de llamadas de RocksDB::Put nativas (no incluye Puts en WriteBatch: en el lote de memoria usado para escrituras de almacenamiento provisional). |
rocksdbTotalBytesReadByGet | Número de bytes sin comprimir leídos a través de llamadas de RocksDB::Get nativas. |
rocksdbTotalBytesWrittenByPut | Número de bytes sin comprimir escritos a través de llamadas de RocksDB::Put nativas. |
rocksdbReadBlockCacheHitCount | Número de veces que se usa la caché de bloques de RocksDB nativa para evitar leer datos del disco local. |
rocksdbReadBlockCacheMissCount | Número de veces que se usa la caché de bloques de RocksDB nativa para evitar leer datos del disco local. |
rocksdbTotalBytesReadByCompaction | Número de bytes que el proceso de compactación de RocksDB nativa lee del disco local. |
rocksdbTotalBytesWrittenByCompaction | Número de bytes que el proceso de compactación de RocksDB nativa escribe en el disco local. |
rocksdbTotalCompactionLatencyMs | El tiempo (en milisegundos) se tardó en realizar las compactaciones de RocksDB (tanto en segundo plano como en la compactación opcional iniciada durante la confirmación). |
rocksdbWriterStallLatencyMs | El tiempo (en milésimos) que el escritor se ha detenido debido a una compactación en segundo plano o a un vaciado de las memtables en el disco. |
rocksdbTotalBytesReadThroughIterator | Algunas de las operaciones con estado (como el procesamiento del tiempo de espera en flatMapGroupsWithState o la marca de agua en agregaciones por ventanas) requieren que se lean todos los datos de la base de datos a través del iterador. Tamaño total de los datos sin comprimir leídos mediante el iterador. |