RocksDB-statusopslag configureren in Azure Databricks

U kunt statusbeheer op basis van RocksDB inschakelen door de volgende configuratie in de SparkSession in te stellen voordat u de streamingquery start.

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")

U kunt RocksDB inschakelen in Delta Live Tables-pijplijnen. Zie RocksDB-statusopslag inschakelen voor Delta Live Tables.

Controlepunten voor wijzigingenlogboek inschakelen

In Databricks Runtime 13.3 LTS en hoger kunt u changelog-controlepunten inschakelen om de duur van het controlepunt en de end-to-end-latentie voor structured streaming-workloads te verlagen. Databricks raadt aan controlepunten in het wijzigingenlogboek in te schakelen voor alle stateful query's voor gestructureerd streamen.

Traditionele RocksDB State Store-momentopnamen en uploadt gegevensbestanden tijdens controlepunten. Om deze kosten te voorkomen, schrijft changelog-controlepunten alleen records die zijn gewijzigd sinds het laatste controlepunt naar duurzame opslag.

Changelog-controlepunten zijn standaard uitgeschakeld. U kunt controlepunten voor wijzigingenlogboeken inschakelen in het SparkSession-niveau met behulp van de volgende syntaxis:

spark.conf.set(
  "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")

U kunt controlepunten voor wijzigingenlogboeken inschakelen voor een bestaande stroom en de statusgegevens onderhouden die zijn opgeslagen in het controlepunt.

Belangrijk

Query's waarvoor controlepunten voor wijzigingenlogboeken zijn ingeschakeld, kunnen alleen worden uitgevoerd op Databricks Runtime 13.3 LTS en hoger. U kunt changelog-controlepunten uitschakelen om terug te keren naar het verouderde controlepuntgedrag, maar u moet deze query's blijven uitvoeren op Databricks Runtime 13.3 LTS of hoger. U moet de taak opnieuw starten om deze wijzigingen te kunnen uitvoeren.

Metrische gegevens van de rocksDB-statusopslag

Elke statusoperator verzamelt metrische gegevens met betrekking tot de statusbeheerbewerkingen die zijn uitgevoerd op het RocksDB-exemplaar om het statusarchief te observeren en mogelijk te helpen bij het opsporen van fouten in de taak traagheid. Deze metrische gegevens worden geaggregeerd (som) per statusoperator in de taak voor alle taken waarin de statusoperator wordt uitgevoerd. Deze metrische gegevens maken deel uit van de customMetrics kaart in de stateOperators velden in StreamingQueryProgress. Hier volgt een voorbeeld van in JSON-formulier (verkregen met behulp van StreamingQueryProgressStreamingQueryProgress.json()).

{
  "id" : "6774075e-8869-454b-ad51-513be86cfd43",
  "runId" : "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
  "batchId" : 7,
  "stateOperators" : [ {
    "numRowsTotal" : 20000000,
    "numRowsUpdated" : 20000000,
    "memoryUsedBytes" : 31005397,
    "numRowsDroppedByWatermark" : 0,
    "customMetrics" : {
      "rocksdbBytesCopied" : 141037747,
      "rocksdbCommitCheckpointLatency" : 2,
      "rocksdbCommitCompactLatency" : 22061,
      "rocksdbCommitFileSyncLatencyMs" : 1710,
      "rocksdbCommitFlushLatency" : 19032,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 56155,
      "rocksdbFilesCopied" : 2,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 40000000,
      "rocksdbGetLatency" : 21834,
      "rocksdbPutCount" : 1,
      "rocksdbPutLatency" : 56155599000,
      "rocksdbReadBlockCacheHitCount" : 1988,
      "rocksdbReadBlockCacheMissCount" : 40341617,
      "rocksdbSstFileSize" : 141037747,
      "rocksdbTotalBytesReadByCompaction" : 336853375,
      "rocksdbTotalBytesReadByGet" : 680000000,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 141037747,
      "rocksdbTotalBytesWrittenByPut" : 740000012,
      "rocksdbTotalCompactionLatencyMs" : 21949695000,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 7038
    }
  } ],
  "sources" : [ {
  } ],
  "sink" : {
  }
}

Gedetailleerde beschrijvingen van de metrische gegevens zijn als volgt:

Naam van meetwaarde Beschrijving
rocksdbCommitWriteBatchLatency De tijd (in millis) duurde voor het toepassen van de gefaseerde schrijfbewerkingen in de geheugenstructuur (WriteBatch) op systeemeigen RocksDB.
rocksdbCommitFlushLatency Tijd (in millis) voor het leegmaken van de RocksDB-in-memory wijzigingen in de lokale schijf.
rocksdbCommitCompactLatency Tijd (in millis) duurde voor compressie (optioneel) tijdens het doorvoeren van het controlepunt.
rocksdbCommitPauseLatency Tijd (in millis) voor het stoppen van de achtergrondwerkrolthreads (voor compressie, enzovoort) als onderdeel van de controlepuntdoorvoering.
rocksdbCommitCheckpointLatency Tijd (in millis) voor het maken van een momentopname van systeemeigen RocksDB en schrijven naar een lokale map.
rocksdbCommitFileSyncLatencyMs Tijd (in millis) voor het synchroniseren van de systeemeigen RocksDB-momentopname gerelateerde bestanden aan een externe opslag (controlepuntlocatie).
rocksdbGetLatency De gemiddelde tijd (in nanos) duurde per onderliggende systeemeigen RocksDB::Get aanroep.
rocksdbPutCount De gemiddelde tijd (in nanos) duurde per onderliggende systeemeigen RocksDB::Put aanroep.
rocksdbGetCount Aantal systeemeigen RocksDB::Get aanroepen (omvat Gets niet van WriteBatch- in de geheugenbatch die wordt gebruikt voor faseringsschrijfbewerkingen).
rocksdbPutCount Aantal systeemeigen RocksDB::Put aanroepen (omvat Puts niet naar WriteBatch- in de geheugenbatch die wordt gebruikt voor faseringsschrijfbewerkingen).
rocksdbTotalBytesReadByGet Aantal niet-gecomprimeerde bytes dat door systeemeigen RocksDB::Get aanroepen wordt gelezen.
rocksdbTotalBytesWrittenByPut Aantal niet-gecomprimeerde bytes geschreven via systeemeigen RocksDB::Put aanroepen.
rocksdbReadBlockCacheHitCount Aantal keren dat de systeemeigen RocksDB-blokcache wordt gebruikt om te voorkomen dat gegevens van lokale schijf worden gelezen.
rocksdbReadBlockCacheMissCount Aantal keren dat de systeemeigen RocksDB cache gemist en vereiste leesgegevens van de lokale schijf.
rocksdbTotalBytesReadByCompaction Het aantal bytes dat van de lokale schijf is gelezen door het systeemeigen RocksDB-compressieproces.
rocksdbTotalBytesWrittenByCompaction Het aantal bytes dat naar de lokale schijf is geschreven door het systeemeigen RocksDB-compressieproces.
rocksdbTotalCompactionLatencyMs Tijd (in millis) nam voor RocksDB-compressies (zowel achtergrond als de optionele compressie geïnitieerd tijdens de doorvoer).
rocksdbWriterStallLatencyMs Tijd (in millis) is de schrijver vastgelopen vanwege een achtergrondcompressie of het leegmaken van de memtables op schijf.
rocksdbTotalBytesReadThroughIterator Voor sommige stateful bewerkingen (zoals time-outverwerking in flatMapGroupsWithState of watermerken in gevensterde aggregaties) moeten volledige gegevens in DB worden gelezen via iterator. De totale grootte van niet-gecomprimeerde gegevens die worden gelezen met behulp van de iterator.