Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
U kunt statusbeheer op basis van RocksDB inschakelen door de volgende configuratie in de SparkSession in te stellen voordat u de streamingquery start.
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"com.databricks.sql.streaming.state.RocksDBStateStoreProvider")
U kunt RocksDB inschakelen op declaratieve pijplijnen van Lakeflow. Zie Optimaliseer de pijplijnconfiguratie voor statusgevoelige verwerking.
Changelog-controlepunten inschakelen
In Databricks Runtime 13.3 LTS en hoger kunt u changelog-controlepunten inschakelen om de duur van controlepunten en de end-to-end-latentie voor workloads met Structured Streaming te verlagen. Databricks raadt aan controlepunten in het wijzigingenlogboek in te schakelen voor alle stateful queries voor gestructureerde streaming.
Traditioneel maakt RocksDB State Store momentopnamen en uploadt het gegevensbestanden tijdens het checkpuntproces. Om deze kosten te voorkomen, schrijven changelog-checkpoints alleen records die zijn gewijzigd sinds het laatste controlepunt naar duurzame opslag.
Changelog-controlepunten zijn standaard uitgeschakeld. U kunt controlepunten voor wijzigingenlogboeken inschakelen in het SparkSession-niveau met behulp van de volgende syntaxis:
spark.conf.set(
"spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
U kunt controlepunten voor wijzigingenlogboeken inschakelen voor een bestaande stroom en de statusgegevens onderhouden die zijn opgeslagen in het controlepunt.
Belangrijk
Query's waarvoor controlepunten voor wijzigingenlogboeken zijn ingeschakeld, kunnen alleen worden uitgevoerd op Databricks Runtime 13.3 LTS en hoger. U kunt changelog-controlepunten uitschakelen om terug te keren naar het verouderde controlepuntgedrag, maar u moet deze query's blijven uitvoeren op Databricks Runtime 13.3 LTS of hoger. U moet de taak opnieuw starten om deze wijzigingen te kunnen uitvoeren.
Metrische gegevens van de rocksDB-statusopslag
Elke toestandsoperator verzamelt statistieken met betrekking tot de toestandsbeheerbewerkingen die zijn uitgevoerd op zijn RocksDB-instantie om de toestandsopslag te observeren en mogelijk te helpen bij het analyseren van traagheid in de taakuitvoering.
In Databricks Runtime 16.4 LTS en hoger worden de metrieken voor een specifiek exemplaar van het statusarchief gelabeld met hun partitie-ID en opslagplaatsnaam, zodat ze gescheiden blijven. Alle andere metrische gegevens worden als de som van de geaggregeerde waarden voor elke toestandsoperator gerapporteerd, voor alle taken waarin de toestandsoperator wordt gebruikt.
Deze metrische gegevens maken deel uit van de customMetrics
kaart in de stateOperators
velden in StreamingQueryProgress
. Hier volgt een voorbeeld van StreamingQueryProgress
in JSON-vorm (verkregen met behulp van StreamingQueryProgress.json()
).
{
"id": "6774075e-8869-454b-ad51-513be86cfd43",
"runId": "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
"batchId": 7,
"stateOperators": [
{
"numRowsTotal": 20000000,
"numRowsUpdated": 20000000,
"memoryUsedBytes": 31005397,
"numRowsDroppedByWatermark": 0,
"customMetrics": {
"SnapshotLastUploaded.partition_0_default": 7,
"SnapshotLastUploaded.partition_1_default": 7,
"SnapshotLastUploaded.partition_2_default": 6,
"SnapshotLastUploaded.partition_3_default": 6,
"SnapshotLastUploaded.partition_4_default": -1,
"rocksdbBytesCopied": 141037747,
"rocksdbCommitCheckpointLatency": 2,
"rocksdbCommitCompactLatency": 22061,
"rocksdbCommitFileSyncLatencyMs": 1710,
"rocksdbCommitFlushLatency": 19032,
"rocksdbCommitPauseLatency": 0,
"rocksdbCommitWriteBatchLatency": 56155,
"rocksdbFilesCopied": 2,
"rocksdbFilesReused": 0,
"rocksdbGetCount": 40000000,
"rocksdbGetLatency": 21834,
"rocksdbPutCount": 1,
"rocksdbPutLatency": 56155599000,
"rocksdbReadBlockCacheHitCount": 1988,
"rocksdbReadBlockCacheMissCount": 40341617,
"rocksdbSstFileSize": 141037747,
"rocksdbTotalBytesReadByCompaction": 336853375,
"rocksdbTotalBytesReadByGet": 680000000,
"rocksdbTotalBytesReadThroughIterator": 0,
"rocksdbTotalBytesWrittenByCompaction": 141037747,
"rocksdbTotalBytesWrittenByPut": 740000012,
"rocksdbTotalCompactionLatencyMs": 21949695000,
"rocksdbWriterStallLatencyMs": 0,
"rocksdbZipFileBytesUncompressed": 7038
}
}
],
"sources": [{}],
"sink": {}
}
Gedetailleerde beschrijvingen van de metrische gegevens zijn als volgt:
Naam van meetwaarde | Beschrijving |
---|---|
rocksdbCommitWriteBatchLatency (latentie voor het committeren van schrijfbatch in RocksDB) | De tijd (in milliseconden) die benodigd was voor het toepassen van de gefaseerde schrijfbewerkingen in de in-geheugenstructuur (WriteBatch) op de native RocksDB. |
rocksdbCommitFlushLatency | Tijd (in milliseconden) die is verstreken voor het doorspoelen van de in-memory wijzigingen van RocksDB naar de lokale schijf. |
rocksdbCommitCompactLatency | Tijd (in milliseconden) die nodig is voor compactie (optioneel) tijdens het doorvoeren van het controlepunt. |
rocksdbCommitPauseLatency | De tijd (in milliseconden) die nodig is voor het stoppen van de achtergrondwerkdraden als onderdeel van het vastleggen van het controlepunt (voor compactie, enzovoort). |
rocksdbBevestigingControlepuntLatentie | Tijd (in milliseconden) die is genomen om een snapshot van de native RocksDB te maken en naar een lokale map te schrijven. |
rocksdbCommitFileSyncLatencyMs | Tijd (in millis) die het kostte om de native RocksDB-snapshot gerelateerde bestanden naar een externe opslag (controlepuntlocatie) te synchroniseren. |
rocksdbGetLatency | De gemiddelde tijd (in nanos) die nodig was voor de onderliggende systeemeigen RocksDB::Get aanroep. |
rocksdbPutCount | De gemiddelde tijd (in nanos) die nodig was voor de onderliggende systeemeigen RocksDB::Put aanroep. |
rocksdbGetCount | Aantal native RocksDB::Get aanroepen (dit omvat Gets niet van WriteBatch - een geheugenbatch die wordt gebruikt voor faseringsschrijfbewerkingen). |
rocksdbPutCount | Aantal systeemeigen RocksDB::Put aanroepen (omvat Puts niet naar WriteBatch- in de geheugenbatch die wordt gebruikt voor faseringsschrijfbewerkingen). |
rocksdbTotalBytesReadByGet | Aantal niet-gecomprimeerde bytes dat gelezen wordt via systeemeigen RocksDB::Get -aanroepen. |
rocksdbTotaalBytesGeschrevenDoorPlaats | Aantal niet-gecomprimeerde bytes geschreven door inheemse RocksDB::Put oproepen. |
rocksdbReadBlockCacheHitCount | Aantal keren dat de systeemeigen RocksDB-blokcache wordt gebruikt om te voorkomen dat gegevens van lokale schijf worden gelezen. |
AantalGemisteCacheBlokkenBijLezenInRocksdb | Aantal keren dat de systeemeigen RocksDB blokcache miste en het nodig was om gegevens van de lokale schijf te lezen. |
rocksdbTotaalBytesGelezenDoorCompactie | Het aantal bytes dat van de lokale schijf is gelezen door het systeemeigen RocksDB-compressieproces. |
rocksdbTotaalAantalBytesGeschrevenDoorCompactie | Het aantal bytes dat naar de lokale schijf is geschreven door het systeemeigen RocksDB-compressieproces. |
rocksdbTotalCompactionLatencyMs | Tijd (in milliseconden) genomen voor het uitvoeren van RocksDB-compacties, zowel in de achtergrond als de optionele compactie die tijdens de commit wordt geïnitieerd. |
rocksdbWriterStallLatencyMs | Tijd (in millis) dat de schrijver geblokkeerd is vanwege een achtergrondcompactie of het doorspoelen van de memtables naar schijf. |
rocksdbTotaalBytesGelezenViaIterator | Voor sommige stateful bewerkingen (zoals time-outverwerking in flatMapGroupsWithState of watermerken in vensteraggregaties) moet alle gegevens in de DB worden gelezen met een iterator. De totale grootte van niet-gecomprimeerde gegevens die worden gelezen met behulp van de iterator. |