Delen via


RocksDB-statusopslag configureren in Azure Databricks

U kunt statusbeheer op basis van RocksDB inschakelen door de volgende configuratie in de SparkSession in te stellen voordat u de streamingquery start.

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")

U kunt RocksDB inschakelen op declaratieve pijplijnen van Lakeflow. Zie Optimaliseer de pijplijnconfiguratie voor statusgevoelige verwerking.

Changelog-controlepunten inschakelen

In Databricks Runtime 13.3 LTS en hoger kunt u changelog-controlepunten inschakelen om de duur van controlepunten en de end-to-end-latentie voor workloads met Structured Streaming te verlagen. Databricks raadt aan controlepunten in het wijzigingenlogboek in te schakelen voor alle stateful queries voor gestructureerde streaming.

Traditioneel maakt RocksDB State Store momentopnamen en uploadt het gegevensbestanden tijdens het checkpuntproces. Om deze kosten te voorkomen, schrijven changelog-checkpoints alleen records die zijn gewijzigd sinds het laatste controlepunt naar duurzame opslag.

Changelog-controlepunten zijn standaard uitgeschakeld. U kunt controlepunten voor wijzigingenlogboeken inschakelen in het SparkSession-niveau met behulp van de volgende syntaxis:

spark.conf.set(
  "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")

U kunt controlepunten voor wijzigingenlogboeken inschakelen voor een bestaande stroom en de statusgegevens onderhouden die zijn opgeslagen in het controlepunt.

Belangrijk

Query's waarvoor controlepunten voor wijzigingenlogboeken zijn ingeschakeld, kunnen alleen worden uitgevoerd op Databricks Runtime 13.3 LTS en hoger. U kunt changelog-controlepunten uitschakelen om terug te keren naar het verouderde controlepuntgedrag, maar u moet deze query's blijven uitvoeren op Databricks Runtime 13.3 LTS of hoger. U moet de taak opnieuw starten om deze wijzigingen te kunnen uitvoeren.

Metrische gegevens van de rocksDB-statusopslag

Elke toestandsoperator verzamelt statistieken met betrekking tot de toestandsbeheerbewerkingen die zijn uitgevoerd op zijn RocksDB-instantie om de toestandsopslag te observeren en mogelijk te helpen bij het analyseren van traagheid in de taakuitvoering.

In Databricks Runtime 16.4 LTS en hoger worden de metrieken voor een specifiek exemplaar van het statusarchief gelabeld met hun partitie-ID en opslagplaatsnaam, zodat ze gescheiden blijven. Alle andere metrische gegevens worden als de som van de geaggregeerde waarden voor elke toestandsoperator gerapporteerd, voor alle taken waarin de toestandsoperator wordt gebruikt.

Deze metrische gegevens maken deel uit van de customMetrics kaart in de stateOperators velden in StreamingQueryProgress. Hier volgt een voorbeeld van StreamingQueryProgress in JSON-vorm (verkregen met behulp van StreamingQueryProgress.json()).

{
  "id": "6774075e-8869-454b-ad51-513be86cfd43",
  "runId": "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
  "batchId": 7,
  "stateOperators": [
    {
      "numRowsTotal": 20000000,
      "numRowsUpdated": 20000000,
      "memoryUsedBytes": 31005397,
      "numRowsDroppedByWatermark": 0,
      "customMetrics": {
        "SnapshotLastUploaded.partition_0_default": 7,
        "SnapshotLastUploaded.partition_1_default": 7,
        "SnapshotLastUploaded.partition_2_default": 6,
        "SnapshotLastUploaded.partition_3_default": 6,
        "SnapshotLastUploaded.partition_4_default": -1,
        "rocksdbBytesCopied": 141037747,
        "rocksdbCommitCheckpointLatency": 2,
        "rocksdbCommitCompactLatency": 22061,
        "rocksdbCommitFileSyncLatencyMs": 1710,
        "rocksdbCommitFlushLatency": 19032,
        "rocksdbCommitPauseLatency": 0,
        "rocksdbCommitWriteBatchLatency": 56155,
        "rocksdbFilesCopied": 2,
        "rocksdbFilesReused": 0,
        "rocksdbGetCount": 40000000,
        "rocksdbGetLatency": 21834,
        "rocksdbPutCount": 1,
        "rocksdbPutLatency": 56155599000,
        "rocksdbReadBlockCacheHitCount": 1988,
        "rocksdbReadBlockCacheMissCount": 40341617,
        "rocksdbSstFileSize": 141037747,
        "rocksdbTotalBytesReadByCompaction": 336853375,
        "rocksdbTotalBytesReadByGet": 680000000,
        "rocksdbTotalBytesReadThroughIterator": 0,
        "rocksdbTotalBytesWrittenByCompaction": 141037747,
        "rocksdbTotalBytesWrittenByPut": 740000012,
        "rocksdbTotalCompactionLatencyMs": 21949695000,
        "rocksdbWriterStallLatencyMs": 0,
        "rocksdbZipFileBytesUncompressed": 7038
      }
    }
  ],
  "sources": [{}],
  "sink": {}
}

Gedetailleerde beschrijvingen van de metrische gegevens zijn als volgt:

Naam van meetwaarde Beschrijving
rocksdbCommitWriteBatchLatency (latentie voor het committeren van schrijfbatch in RocksDB) De tijd (in milliseconden) die benodigd was voor het toepassen van de gefaseerde schrijfbewerkingen in de in-geheugenstructuur (WriteBatch) op de native RocksDB.
rocksdbCommitFlushLatency Tijd (in milliseconden) die is verstreken voor het doorspoelen van de in-memory wijzigingen van RocksDB naar de lokale schijf.
rocksdbCommitCompactLatency Tijd (in milliseconden) die nodig is voor compactie (optioneel) tijdens het doorvoeren van het controlepunt.
rocksdbCommitPauseLatency De tijd (in milliseconden) die nodig is voor het stoppen van de achtergrondwerkdraden als onderdeel van het vastleggen van het controlepunt (voor compactie, enzovoort).
rocksdbBevestigingControlepuntLatentie Tijd (in milliseconden) die is genomen om een snapshot van de native RocksDB te maken en naar een lokale map te schrijven.
rocksdbCommitFileSyncLatencyMs Tijd (in millis) die het kostte om de native RocksDB-snapshot gerelateerde bestanden naar een externe opslag (controlepuntlocatie) te synchroniseren.
rocksdbGetLatency De gemiddelde tijd (in nanos) die nodig was voor de onderliggende systeemeigen RocksDB::Get aanroep.
rocksdbPutCount De gemiddelde tijd (in nanos) die nodig was voor de onderliggende systeemeigen RocksDB::Put aanroep.
rocksdbGetCount Aantal native RocksDB::Get aanroepen (dit omvat Gets niet van WriteBatch - een geheugenbatch die wordt gebruikt voor faseringsschrijfbewerkingen).
rocksdbPutCount Aantal systeemeigen RocksDB::Put aanroepen (omvat Puts niet naar WriteBatch- in de geheugenbatch die wordt gebruikt voor faseringsschrijfbewerkingen).
rocksdbTotalBytesReadByGet Aantal niet-gecomprimeerde bytes dat gelezen wordt via systeemeigen RocksDB::Get-aanroepen.
rocksdbTotaalBytesGeschrevenDoorPlaats Aantal niet-gecomprimeerde bytes geschreven door inheemse RocksDB::Put oproepen.
rocksdbReadBlockCacheHitCount Aantal keren dat de systeemeigen RocksDB-blokcache wordt gebruikt om te voorkomen dat gegevens van lokale schijf worden gelezen.
AantalGemisteCacheBlokkenBijLezenInRocksdb Aantal keren dat de systeemeigen RocksDB blokcache miste en het nodig was om gegevens van de lokale schijf te lezen.
rocksdbTotaalBytesGelezenDoorCompactie Het aantal bytes dat van de lokale schijf is gelezen door het systeemeigen RocksDB-compressieproces.
rocksdbTotaalAantalBytesGeschrevenDoorCompactie Het aantal bytes dat naar de lokale schijf is geschreven door het systeemeigen RocksDB-compressieproces.
rocksdbTotalCompactionLatencyMs Tijd (in milliseconden) genomen voor het uitvoeren van RocksDB-compacties, zowel in de achtergrond als de optionele compactie die tijdens de commit wordt geïnitieerd.
rocksdbWriterStallLatencyMs Tijd (in millis) dat de schrijver geblokkeerd is vanwege een achtergrondcompactie of het doorspoelen van de memtables naar schijf.
rocksdbTotaalBytesGelezenViaIterator Voor sommige stateful bewerkingen (zoals time-outverwerking in flatMapGroupsWithState of watermerken in vensteraggregaties) moet alle gegevens in de DB worden gelezen met een iterator. De totale grootte van niet-gecomprimeerde gegevens die worden gelezen met behulp van de iterator.