Nota
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Para habilitar la administración de estado basada en RocksDB, establezca la siguiente configuración en SparkSession antes de iniciar la consulta de streaming.
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"com.databricks.sql.streaming.state.RocksDBStateStoreProvider")
Puede habilitar RocksDB en las canalizaciones declarativas de Lakeflow. Consulte Optimización de la configuración de canalización para el procesamiento con estado.
Habilitar puntos de control del registro de cambios
En Databricks Runtime 13.3 LTS y versiones posteriores, puede habilitar los puntos de control del registro de cambios para reducir la duración del punto de control y la latencia de un extremo a otro para cargas de trabajo de Structured Streaming. Databricks recomienda habilitar los puntos de control del registro de cambios para todas las consultas con estado de Structured Streaming.
Tradicionalmente, RocksDB State Store instantáneas y carga archivos de datos durante el punto de comprobación. Para evitar este costo, los puntos de comprobación del registro de cambios solo escriben registros que han cambiado desde el último punto de control en un almacenamiento duradero".
El control de puntos de comprobación del registro de cambios está deshabilitado de forma predeterminada. Puede habilitar los puntos de comprobación del registro de cambios en el nivel SparkSession mediante la sintaxis siguiente:
spark.conf.set(
"spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
Puede habilitar los puntos de control del registro de cambios en una secuencia existente y mantener la información de estado almacenada en el punto de control.
Importante
Las consultas que han habilitado los puntos de comprobación del registro de cambios solo se pueden ejecutar en Databricks Runtime 13.3 LTS y versiones posteriores. Puede deshabilitar los puntos de comprobación del registro de cambios para revertir al comportamiento de puntos de comprobación heredados, pero debe seguir ejecutando estas consultas en Databricks Runtime 13.3 LTS o superior. Debe reiniciar el trabajo para que se produzcan estos cambios.
Métricas del almacén de estado de RocksDB
Cada operador de estado recopila las métricas relacionadas con las operaciones de administración de estado realizadas en su instancia de RocksDB para observar el almacén de estado y ayudar potencialmente en la depuración de la lentitud del trabajo.
En Databricks Runtime 16.4 LTS y versiones posteriores, las métricas de una instancia de almacén de estado específica se etiquetan con su identificador de partición y el nombre del almacén, lo que garantiza que permanecen independientes. Todas las demás métricas se notifican como la suma agregada de cada operador de estado en todas las tareas en las que se ejecuta el operador de estado.
Estas métricas forman parte del mapa customMetrics
dentro de los campos de stateOperators
en StreamingQueryProgress
. A continuación se muestra un ejemplo de StreamingQueryProgress
en formato JSON (obtenido mediante StreamingQueryProgress.json()
).
{
"id": "6774075e-8869-454b-ad51-513be86cfd43",
"runId": "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
"batchId": 7,
"stateOperators": [
{
"numRowsTotal": 20000000,
"numRowsUpdated": 20000000,
"memoryUsedBytes": 31005397,
"numRowsDroppedByWatermark": 0,
"customMetrics": {
"SnapshotLastUploaded.partition_0_default": 7,
"SnapshotLastUploaded.partition_1_default": 7,
"SnapshotLastUploaded.partition_2_default": 6,
"SnapshotLastUploaded.partition_3_default": 6,
"SnapshotLastUploaded.partition_4_default": -1,
"rocksdbBytesCopied": 141037747,
"rocksdbCommitCheckpointLatency": 2,
"rocksdbCommitCompactLatency": 22061,
"rocksdbCommitFileSyncLatencyMs": 1710,
"rocksdbCommitFlushLatency": 19032,
"rocksdbCommitPauseLatency": 0,
"rocksdbCommitWriteBatchLatency": 56155,
"rocksdbFilesCopied": 2,
"rocksdbFilesReused": 0,
"rocksdbGetCount": 40000000,
"rocksdbGetLatency": 21834,
"rocksdbPutCount": 1,
"rocksdbPutLatency": 56155599000,
"rocksdbReadBlockCacheHitCount": 1988,
"rocksdbReadBlockCacheMissCount": 40341617,
"rocksdbSstFileSize": 141037747,
"rocksdbTotalBytesReadByCompaction": 336853375,
"rocksdbTotalBytesReadByGet": 680000000,
"rocksdbTotalBytesReadThroughIterator": 0,
"rocksdbTotalBytesWrittenByCompaction": 141037747,
"rocksdbTotalBytesWrittenByPut": 740000012,
"rocksdbTotalCompactionLatencyMs": 21949695000,
"rocksdbWriterStallLatencyMs": 0,
"rocksdbZipFileBytesUncompressed": 7038
}
}
],
"sources": [{}],
"sink": {}
}
Estas son las descripciones detalladas de las métricas:
Nombre de métrica | Descripción |
---|---|
rocksdbCommitWriteBatchLatency | El tiempo (en milisegundos) que se tardó en aplicar las escrituras de almacenamiento provisional en la estructura en memoria (WriteBatch) a RocksDB nativo. |
rocksdbCommitFlushLatency | El tiempo (en milisegundos) que se tardó en vaciar los cambios en memoria de RocksDB en el disco local. |
rocksdbCommitCompactLatency | El tiempo (en milisegundos) que se tardó en realizar la compactación (opcional) durante la confirmación del punto de comprobación. |
Latencia de pausa en confirmación de RocksDB | Esta métrica indica el tiempo (en milisegundos) que se tardó en detener los subprocesos de trabajo en segundo plano (para compactación y otros) como parte de la confirmación del punto de comprobación. |
rocksdb Latencia de Compromiso del Punto de Control | El tiempo (en milisegundos) que se tardó en tomar una instantánea de una instancia de RocksDB nativa y escribirla en un directorio local. |
rocksdbCommitFileSyncLatencyMs | El tiempo (en milisegundos) que se tardó en sincronizar los archivos nativos relacionados con una instantánea de RocksDB con un almacenamiento externo (ubicación del punto de comprobación). |
rocksdbGetLatency | Promedio de tiempo (en nanosegundos) que se tardó por la llamada de RocksDB::Get nativa subyacente. |
rocksdbPutCount | Promedio de tiempo (en nanosegundos) que se tardó por la llamada de RocksDB::Put nativa subyacente. |
rocksdbGetCount | Número de llamadas nativas RocksDB::Get (no incluye Gets de WriteBatch: un lote en memoria utilizado para preparar escrituras). |
rocksdbPutCount | Número de llamadas nativas RocksDB::Put (no incluye llamadas a Puts WriteBatch, que es un lote de memoria usado para escrituras provisionales). |
BytesTotalesLeídosPorGetEnRocksdb | Número de bytes sin comprimir leídos a través de llamadas de RocksDB::Get nativas. |
rocksdbTotalBytesWrittenByPut | Número de bytes sin comprimir escritos a través de llamadas de RocksDB::Put nativas. |
rocksdbReadBlockCacheHitCount | Número de veces que se usa la caché de bloques de RocksDB nativa para evitar leer datos del disco local. |
rocksdbReadBlockCacheMissCount | Número de veces que se usa la caché de bloques de RocksDB nativa para evitar leer datos del disco local. |
Total de bytes leídos por compactación de RocksDB | Número de bytes que el proceso de compactación de RocksDB nativa lee del disco local. |
BytesTotalesEscritosPorCompactionEnRocksdb | Número de bytes que el proceso de compactación de RocksDB nativa escribe en el disco local. |
rocksdbTotalCompactionLatencyMs | El tiempo (en milisegundos) se tardó en realizar las compactaciones de RocksDB (tanto en segundo plano como en la compactación opcional iniciada durante la confirmación). |
rocksdbWriterStallLatencyMs | El tiempo (en milésimos) que el escritor se ha detenido debido a una compactación en segundo plano o a un vaciado de las memtables en el disco. |
rocksdbTotalBytesReadThroughIterator | Algunas de las operaciones con estado (como el procesamiento del tiempo de espera en flatMapGroupsWithState o la marca de agua en agregaciones por ventanas) requieren que se lean todos los datos de la base de datos a través del iterador. Tamaño total de los datos sin comprimir leídos mediante el iterador. |