Configurar o repositório de estado do RocksDB no Azure Databricks
Você pode habilitar o gerenciamento de estado baseado no RockDB definindo a configuração a seguir na SparkSession antes de iniciar a consulta de streaming.
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"com.databricks.sql.streaming.state.RocksDBStateStoreProvider")
Você pode habilitar o RocksDB nos pipelines do Delta Live Tables. Consulte Otimizar a configuração do pipeline para processamento com estado.
Habilitar ponto de verificação do log de alterações
No Databricks Runtime 13.3 LTS e superior, você pode habilitar o ponto de verificação do log de alterações para reduzir a duração do ponto de verificação e a latência de ponta a ponta para cargas de trabalho de Fluxo Estruturado. O Databricks recomenda habilitar o ponto de verificação do log de alterações para todas as consultas com estado de Streaming Estruturado.
Tradicionalmente, o RocksDB State Store faz instantâneos e upload de arquivos de dados durante o ponto de verificação. Para evitar esse custo, o ponto de verificação do log de alterações grava no armazenamento durável apenas os registros que foram alterados desde o último ponto de verificação.”
O ponto de verificação do registro de alterações está desabilitado por padrão. É possível utilizar a seguinte sintaxe para habilitar o ponto de verificação do log de alterações no nível do SparkSession:
spark.conf.set(
"spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
É possível habilitar o ponto de verificação do log de alterações em um fluxo existente e manter as informações de estado armazenadas no ponto de verificação.
Importante
As consultas que habilitaram o ponto de verificação do log de alterações só podem ser executadas no Databricks Runtime 13.3 LTS e superior. Você pode desabilitar o ponto de verificação do log de alterações para reverter para o comportamento do ponto de verificação herdado, mas precisa continuar executando essas consultas no Databricks Runtime 13.3 LTS ou superior. Você deve reiniciar o trabalho para que essas alterações ocorram.
Métricas de armazenamento de estado RocksDB
Cada operador de estado coleta métricas relacionadas às operações de gerenciamento de estado executadas em sua instância do RocksDB para observar o armazenamento de estado e potencialmente ajudar na depuração da lentidão do trabalho. Essas métricas são agregadas (soma) por operador de estado no trabalho em todas as tarefas em que o operador de estado está em execução. Essas métricas fazem parte do mapa customMetrics
dentro dos campos stateOperators
em StreamingQueryProgress
. Veja a seguir um exemplo de StreamingQueryProgress
no formato JSON (obtido usando StreamingQueryProgress.json()
).
{
"id" : "6774075e-8869-454b-ad51-513be86cfd43",
"runId" : "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
"batchId" : 7,
"stateOperators" : [ {
"numRowsTotal" : 20000000,
"numRowsUpdated" : 20000000,
"memoryUsedBytes" : 31005397,
"numRowsDroppedByWatermark" : 0,
"customMetrics" : {
"rocksdbBytesCopied" : 141037747,
"rocksdbCommitCheckpointLatency" : 2,
"rocksdbCommitCompactLatency" : 22061,
"rocksdbCommitFileSyncLatencyMs" : 1710,
"rocksdbCommitFlushLatency" : 19032,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 56155,
"rocksdbFilesCopied" : 2,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 40000000,
"rocksdbGetLatency" : 21834,
"rocksdbPutCount" : 1,
"rocksdbPutLatency" : 56155599000,
"rocksdbReadBlockCacheHitCount" : 1988,
"rocksdbReadBlockCacheMissCount" : 40341617,
"rocksdbSstFileSize" : 141037747,
"rocksdbTotalBytesReadByCompaction" : 336853375,
"rocksdbTotalBytesReadByGet" : 680000000,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 141037747,
"rocksdbTotalBytesWrittenByPut" : 740000012,
"rocksdbTotalCompactionLatencyMs" : 21949695000,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 7038
}
} ],
"sources" : [ {
} ],
"sink" : {
}
}
Descrições detalhadas das métricas são as seguinte:
Nome da métrica | Descrição |
---|---|
rocksdbCommitWriteBatchLatency | Tempo (mili-segundos) necessário para aplicar as gravações em fases na estrutura na memória (WriteBatch) ao RocksDB nativo. |
rocksdbCommitFlushLatency | Tempo (mili-segundos) necessário para liberar as alterações na memória do RocksDB para o disco local. |
rocksdbCommitCompactLatency | O tempo (mili-segundos) levou para compactação (opcional) durante a confirmação do ponto de verificação. |
rocksdbCommitPauseLatency | Tempo (em milissegundos) necessário para interromper os threads de trabalho em segundo plano (para compactação etc.) como parte da confirmação de ponto de verificação. |
rocksdbCommitCheckpointLatency | Tempo (mili-segundos) levou para tirar um instantâneo do RocksDB nativo e gravar em um diretório local. |
rocksdbCommitFileSyncLatencyMs | Tempo (mili-segundos) necessário para sincronizar os arquivos nativos relacionados ao instantâneo do RocksDB para um armazenamento externo (local do ponto de verificação). |
rocksdbGetLatency | O tempo médio (em nanos) levou de acordo com a chamada RocksDB::Get nativa subjacente. |
rocksdbPutCount | O tempo médio (em nanos) levou de acordo com a chamada RocksDB::Put nativa subjacente. |
rocksdbGetCount | Número de chamadas RocksDB::Get nativas (não inclui de Gets WriteBatch – no lote de memória usado para gravações de preparação). |
rocksdbPutCount | Número de chamadas RocksDB::Put nativas (não inclui Puts para WriteBatch – no lote de memória usado para gravações de preparação). |
rocksdbTotalBytesReadByGet | Número de bytes descompactados lidos por chamadas RocksDB::Get nativas. |
rocksdbTotalBytesWrittenByPut | Número de bytes descompactados gravados por chamadas RocksDB::Put nativas. |
rocksdbReadBlockCacheHitCount | Número de vezes que o cache de blocos do RocksDB nativo é usado para evitar a leitura de dados do disco local. |
rocksdbReadBlockCacheMissCount | Número de vezes que o cache de blocos do RocksDB nativo perdeu e exigiu a leitura de dados do disco local. |
rocksdbTotalBytesReadByCompaction | Número de bytes lidos do disco local pelo processo de compactação do RocksDB nativo. |
rocksdbTotalBytesWrittenByCompaction | Número de bytes gravados no disco local pelo processo de compactação do RocksDB nativo. |
rocksdbTotalCompactionLatencyMs | O tempo (mili-segundos) levou para as compactações do RocksDB (tanto em segundo plano quanto para a compactação opcional iniciada durante a confirmação). |
rocksdbWriterStallLatencyMs | Tempo (mili-segundos) o autor foi paralisado devido a uma compactação em segundo plano ou liberação das memtables para o disco. |
rocksdbTotalBytesReadThroughIterator | Algumas das operações com estado (como processamento de tempo máximo em flatMapGroupsWithState ou marca d'água em agregação em janelas) exigem a leitura de dados inteiros no BD por meio do iterador. O tamanho total dos dados descompactados lidos usando o iterador. |