A RocksDB állapottároló konfigurálása az Azure Databricksben
A RocksDB-alapú állapotkezelés engedélyezéséhez állítsa be a következő konfigurációt a SparkSessionban, mielőtt elindítja a streamelési lekérdezést.
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"com.databricks.sql.streaming.state.RocksDBStateStoreProvider")
A RocksDB-t a Delta Live Tables-folyamatokon engedélyezheti. Lásd: Folyamatkonfiguráció optimalizálása állapotalapú feldolgozáshoz.
Changelog checkpointing engedélyezése
A Databricks Runtime 13.3 LTS-ben és újabb verziókban engedélyezheti a változásnapló-ellenőrzőpont-ellenőrzést, hogy csökkentse az ellenőrzőpontok időtartamát és a végpontok közötti késést a strukturált streamelési számítási feladatok esetében. A Databricks azt javasolja, hogy engedélyezze a változásnapló-ellenőrzőpontozást az összes strukturált streamelési állapotalapú lekérdezéshez.
A RocksDB State Store hagyományosan pillanatképeket ad meg, és adatfájlokat tölt fel az ellenőrzés során. A költség elkerülése érdekében a changelog checkpointing csak azokat a rekordokat írja, amelyek az utolsó ellenőrzőpont óta tartós tárolóra változtak."
A changelog checkpointing alapértelmezés szerint le van tiltva. A változásnapló-ellenőrzőpontozást a SparkSession szinten az alábbi szintaxissal engedélyezheti:
spark.conf.set(
"spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
Engedélyezheti a changelog checkpointingot egy meglévő streamen, és fenntarthatja az ellenőrzőpontban tárolt állapotinformációkat.
Fontos
A változásnapló-ellenőrzőpontozást engedélyező lekérdezések csak a Databricks Runtime 13.3 LTS-en és újabb verziókon futtathatók. Letilthatja a changelog checkpointing funkciót, hogy visszaváltson az örökölt ellenőrzőpont-megjelenítési viselkedésre, de ezeket a lekérdezéseket továbbra is futtatnia kell a Databricks Runtime 13.3 LTS-en vagy újabb verzióban. A módosítások elvégzéséhez újra kell indítania a feladatot.
A RocksDB állapottároló metrikái
Minden állapotkezelő összegyűjti a RocksDB-példányán végrehajtott állapotkezelési műveletekhez kapcsolódó metrikákat, hogy megfigyelje az állapottárolót, és segítsen a feladatok lassúságának hibakeresésében. Ezek a metrikák állapotoperátoronként összesítve (összegként) jelennek meg a feladatokban az összes olyan tevékenység esetében, ahol az állapotkezelő fut. Ezek a metrikák a customMetrics
térkép részét képezik a mezőkben.stateOperators
StreamingQueryProgress
Az alábbiakban egy példa StreamingQueryProgress
látható JSON-formában (amelyet a következővel StreamingQueryProgress.json()
szerezünk be).
{
"id" : "6774075e-8869-454b-ad51-513be86cfd43",
"runId" : "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
"batchId" : 7,
"stateOperators" : [ {
"numRowsTotal" : 20000000,
"numRowsUpdated" : 20000000,
"memoryUsedBytes" : 31005397,
"numRowsDroppedByWatermark" : 0,
"customMetrics" : {
"rocksdbBytesCopied" : 141037747,
"rocksdbCommitCheckpointLatency" : 2,
"rocksdbCommitCompactLatency" : 22061,
"rocksdbCommitFileSyncLatencyMs" : 1710,
"rocksdbCommitFlushLatency" : 19032,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 56155,
"rocksdbFilesCopied" : 2,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 40000000,
"rocksdbGetLatency" : 21834,
"rocksdbPutCount" : 1,
"rocksdbPutLatency" : 56155599000,
"rocksdbReadBlockCacheHitCount" : 1988,
"rocksdbReadBlockCacheMissCount" : 40341617,
"rocksdbSstFileSize" : 141037747,
"rocksdbTotalBytesReadByCompaction" : 336853375,
"rocksdbTotalBytesReadByGet" : 680000000,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 141037747,
"rocksdbTotalBytesWrittenByPut" : 740000012,
"rocksdbTotalCompactionLatencyMs" : 21949695000,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 7038
}
} ],
"sources" : [ {
} ],
"sink" : {
}
}
A metrikák részletes leírása a következő:
Metrika neve | Leírás |
---|---|
rocksdbCommitWriteBatchLatency | Az idő (ezredmásodpercben) a memóriabeli strukturált szakaszos írások (WriteBatch) natív RocksDB-re való alkalmazásához szükséges idő. |
rocksdbCommitFlushLatency | A RocksDB memóriabeli változásainak helyi lemezre való kiürítéséhez szükséges idő (ezredmásodpercben). |
rocksdbCommitCompactLatency | Az ellenőrzőpont véglegesítése során (ezredmásodpercben) a tömörítés ideje (nem kötelező). |
rocksdbCommitPauseLatency | Az ellenőrzőpont-véglegesítés részeként (ezredmásodpercben) időt vett igénybe a háttérmunkaszálak leállítása (tömörítéshez stb.). |
rocksdbCommitCheckpointLatency | Az idő (ezredmásodpercben) pillanatképet készített a natív RocksDB-ről, és egy helyi könyvtárba írta. |
rocksdbCommitFileSyncLatencyMs | A natív RocksDB-pillanatképhez kapcsolódó fájlok külső tárolóba (ellenőrzőpont helyére) való szinkronizálásához szükséges idő (ezredmásodpercben). |
rocksdbGetLatency | Az alapul szolgáló natív RocksDB::Get hívás átlagos időtartama (nanoban). |
rocksdbPutCount | Az alapul szolgáló natív RocksDB::Put hívás átlagos időtartama (nanoban). |
rocksdbGetCount | Natív RocksDB::Get hívások száma (nem szerepel a Gets WriteBatchből – az előkészítési írásokhoz használt memória kötegben). |
rocksdbPutCount | Natív RocksDB::Put hívások száma (nem tartalmazza Puts a WriteBatchet – az előkészítési írásokhoz használt memória kötegben). |
rocksdbTotalBytesReadByGet | Natív RocksDB::Get hívásokon keresztül beolvasott tömörítetlen bájtok száma. |
rocksdbTotalBytesWrittenByPut | Natív RocksDB::Put hívásokon keresztül írt tömörítetlen bájtok száma. |
rocksdbReadBlockCacheHitCount | Hányszor használja a natív RocksDB blokkgyorsítótárat a helyi lemezről származó adatok olvasásának elkerülése érdekében. |
rocksdbReadBlockCacheMissCount | Hányszor hiányzott a natív RocksDB blokkgyorsítótár, és hány adatot kellett beolvasni a helyi lemezről. |
rocksdbTotalBytesReadByCompaction | A helyi lemezről a natív RocksDB tömörítési folyamat által beolvasott bájtok száma. |
rocksdbTotalBytesWrittenByCompaction | A natív RocksDB tömörítési folyamat által a helyi lemezre írt bájtok száma. |
rocksdbTotalCompactionLatencyMs | A RocksDB tömörítése (a háttérben és a véglegesítés során kezdeményezett opcionális tömörítésnél is) időt vett igénybe (ezredmásodpercben). |
rocksdbWriterStallLatencyMs | Az író (ezredmásodpercben) leállt a memtables háttértömörítése vagy a lemezre való kiürítése miatt. |
rocksdbTotalBytesReadThroughIterator | Az állapotalapú műveletek némelyikéhez (például az időtúllépés feldolgozásához flatMapGroupsWithState vagy az ablakos aggregációk vízjelezéséhez) a teljes adatkészlet iterátoron keresztüli olvasása szükséges. A tömörítetlen adatok teljes mérete az iterátor használatával. |