Configuración de un almacén de estados de RocksDB en Azure Databricks

Para habilitar la administración de estado basada en RocksDB, establezca la siguiente configuración en SparkSession antes de iniciar la consulta de streaming.

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")

Puede habilitar RocksDB en canalizaciones de Delta Live Tables. Consulte Habilitación del almacén de estado de RocksDB para Delta Live Tables.

Habilitar puntos de control del registro de cambios

En Databricks Runtime 13.3 LTS y versiones posteriores, puede habilitar los puntos de control del registro de cambios para reducir la duración del punto de control y la latencia de un extremo a otro para cargas de trabajo de Structured Streaming. Databricks recomienda habilitar los puntos de control del registro de cambios para todas las consultas con estado de Structured Streaming.

Tradicionalmente, RocksDB State Store instantáneas y carga archivos de datos durante el punto de comprobación. Para evitar este costo, los puntos de comprobación del registro de cambios solo escriben registros que han cambiado desde el último punto de control en un almacenamiento duradero".

El control de puntos de comprobación del registro de cambios está deshabilitado de forma predeterminada. Puede habilitar los puntos de comprobación del registro de cambios en el nivel SparkSession mediante la sintaxis siguiente:

spark.conf.set(
  "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")

Puede habilitar los puntos de control del registro de cambios en una secuencia existente y mantener la información de estado almacenada en el punto de control.

Importante

Las consultas que han habilitado los puntos de comprobación del registro de cambios solo se pueden ejecutar en Databricks Runtime 13.3 LTS y versiones posteriores. Puede deshabilitar los puntos de comprobación del registro de cambios para revertir al comportamiento de puntos de comprobación heredados, pero debe seguir ejecutando estas consultas en Databricks Runtime 13.3 LTS o superior. Debe reiniciar el trabajo para que se produzcan estos cambios.

Métricas del almacén de estado de RocksDB

Cada operador de estado recopila las métricas relacionadas con las operaciones de administración de estado realizadas en su instancia de RocksDB para observar el almacén de estado y ayudar potencialmente en la depuración de la lentitud del trabajo. Estas métricas se agregan (suma) por operador de estado al trabajo en todas las tareas en las que se ejecuta el operador de estado. Estas métricas forman parte del mapa customMetrics dentro de los campos de stateOperators en StreamingQueryProgress. A continuación se muestra un ejemplo de StreamingQueryProgress en formato JSON (obtenido mediante StreamingQueryProgress.json()).

{
  "id" : "6774075e-8869-454b-ad51-513be86cfd43",
  "runId" : "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
  "batchId" : 7,
  "stateOperators" : [ {
    "numRowsTotal" : 20000000,
    "numRowsUpdated" : 20000000,
    "memoryUsedBytes" : 31005397,
    "numRowsDroppedByWatermark" : 0,
    "customMetrics" : {
      "rocksdbBytesCopied" : 141037747,
      "rocksdbCommitCheckpointLatency" : 2,
      "rocksdbCommitCompactLatency" : 22061,
      "rocksdbCommitFileSyncLatencyMs" : 1710,
      "rocksdbCommitFlushLatency" : 19032,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 56155,
      "rocksdbFilesCopied" : 2,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 40000000,
      "rocksdbGetLatency" : 21834,
      "rocksdbPutCount" : 1,
      "rocksdbPutLatency" : 56155599000,
      "rocksdbReadBlockCacheHitCount" : 1988,
      "rocksdbReadBlockCacheMissCount" : 40341617,
      "rocksdbSstFileSize" : 141037747,
      "rocksdbTotalBytesReadByCompaction" : 336853375,
      "rocksdbTotalBytesReadByGet" : 680000000,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 141037747,
      "rocksdbTotalBytesWrittenByPut" : 740000012,
      "rocksdbTotalCompactionLatencyMs" : 21949695000,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 7038
    }
  } ],
  "sources" : [ {
  } ],
  "sink" : {
  }
}

Estas son las descripciones detalladas de las métricas:

Nombre de métrica Descripción
rocksdbCommitWriteBatchLatency El tiempo (en milisegundos) que se tardó en aplicar las escrituras de almacenamiento provisional en la estructura en memoria (WriteBatch) a RocksDB nativo.
rocksdbCommitFlushLatency El tiempo (en milisegundos) que se tardó en vaciar los cambios en memoria de RocksDB en el disco local.
rocksdbCommitCompactLatency El tiempo (en milisegundos) que se tardó en realizar la compactación (opcional) durante la confirmación del punto de comprobación.
rocksdbCommitPauseLatency Esta métrica indica el tiempo (en milisegundos) que se tardó en detener los subprocesos de trabajo en segundo plano (para compactación y otros) como parte de la confirmación del punto de comprobación.
rocksdbCommitCheckpointLatency El tiempo (en milisegundos) que se tardó en tomar una instantánea de una instancia de RocksDB nativa y escribirla en un directorio local.
rocksdbCommitFileSyncLatencyMs El tiempo (en milisegundos) que se tardó en sincronizar los archivos nativos relacionados con una instantánea de RocksDB con un almacenamiento externo (ubicación del punto de comprobación).
rocksdbGetLatency Promedio de tiempo (en nanosegundos) que se tardó por la llamada de RocksDB::Get nativa subyacente.
rocksdbPutCount Promedio de tiempo (en nanosegundos) que se tardó por la llamada de RocksDB::Put nativa subyacente.
rocksdbGetCount Número de llamadas de RocksDB::Get nativas (no incluye Gets desde WriteBatch: en el lote de memoria usado para escrituras de almacenamiento provisional).
rocksdbPutCount Número de llamadas de RocksDB::Put nativas (no incluye Puts en WriteBatch: en el lote de memoria usado para escrituras de almacenamiento provisional).
rocksdbTotalBytesReadByGet Número de bytes sin comprimir leídos a través de llamadas de RocksDB::Get nativas.
rocksdbTotalBytesWrittenByPut Número de bytes sin comprimir escritos a través de llamadas de RocksDB::Put nativas.
rocksdbReadBlockCacheHitCount Número de veces que se usa la caché de bloques de RocksDB nativa para evitar leer datos del disco local.
rocksdbReadBlockCacheMissCount Número de veces que se usa la caché de bloques de RocksDB nativa para evitar leer datos del disco local.
rocksdbTotalBytesReadByCompaction Número de bytes que el proceso de compactación de RocksDB nativa lee del disco local.
rocksdbTotalBytesWrittenByCompaction Número de bytes que el proceso de compactación de RocksDB nativa escribe en el disco local.
rocksdbTotalCompactionLatencyMs El tiempo (en milisegundos) se tardó en realizar las compactaciones de RocksDB (tanto en segundo plano como en la compactación opcional iniciada durante la confirmación).
rocksdbWriterStallLatencyMs El tiempo (en milésimos) que el escritor se ha detenido debido a una compactación en segundo plano o a un vaciado de las memtables en el disco.
rocksdbTotalBytesReadThroughIterator Algunas de las operaciones con estado (como el procesamiento del tiempo de espera en flatMapGroupsWithState o la marca de agua en agregaciones por ventanas) requieren que se lean todos los datos de la base de datos a través del iterador. Tamaño total de los datos sin comprimir leídos mediante el iterador.