Megosztás a következőn keresztül:


A RocksDB állapottároló konfigurálása az Azure Databricksben

A RocksDB-alapú állapotkezelés engedélyezéséhez állítsa be a következő konfigurációt a SparkSessionban, mielőtt elindítja a streamelési lekérdezést.

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")

A RocksDB-t a Delta Live Tables-folyamatokon engedélyezheti. Lásd: Folyamatkonfiguráció optimalizálása állapotalapú feldolgozáshoz.

Changelog checkpointing engedélyezése

A Databricks Runtime 13.3 LTS-ben és újabb verziókban engedélyezheti a változásnapló-ellenőrzőpont-ellenőrzést, hogy csökkentse az ellenőrzőpontok időtartamát és a végpontok közötti késést a strukturált streamelési számítási feladatok esetében. A Databricks azt javasolja, hogy engedélyezze a változásnapló-ellenőrzőpontozást az összes strukturált streamelési állapotalapú lekérdezéshez.

A RocksDB State Store hagyományosan pillanatképeket ad meg, és adatfájlokat tölt fel az ellenőrzés során. A költség elkerülése érdekében a changelog checkpointing csak azokat a rekordokat írja, amelyek az utolsó ellenőrzőpont óta tartós tárolóra változtak."

A changelog checkpointing alapértelmezés szerint le van tiltva. A változásnapló-ellenőrzőpontozást a SparkSession szinten az alábbi szintaxissal engedélyezheti:

spark.conf.set(
  "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")

Engedélyezheti a changelog checkpointingot egy meglévő streamen, és fenntarthatja az ellenőrzőpontban tárolt állapotinformációkat.

Fontos

A változásnapló-ellenőrzőpontozást engedélyező lekérdezések csak a Databricks Runtime 13.3 LTS-en és újabb verziókon futtathatók. Letilthatja a changelog checkpointing funkciót, hogy visszaváltson az örökölt ellenőrzőpont-megjelenítési viselkedésre, de ezeket a lekérdezéseket továbbra is futtatnia kell a Databricks Runtime 13.3 LTS-en vagy újabb verzióban. A módosítások elvégzéséhez újra kell indítania a feladatot.

A RocksDB állapottároló metrikái

Minden állapotkezelő összegyűjti a RocksDB-példányán végrehajtott állapotkezelési műveletekhez kapcsolódó metrikákat, hogy megfigyelje az állapottárolót, és segítsen a feladatok lassúságának hibakeresésében. Ezek a metrikák állapotoperátoronként összesítve (összegként) jelennek meg a feladatokban az összes olyan tevékenység esetében, ahol az állapotkezelő fut. Ezek a metrikák a customMetrics térkép részét képezik a mezőkben.stateOperators StreamingQueryProgress Az alábbiakban egy példa StreamingQueryProgress látható JSON-formában (amelyet a következővel StreamingQueryProgress.json()szerezünk be).

{
  "id" : "6774075e-8869-454b-ad51-513be86cfd43",
  "runId" : "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
  "batchId" : 7,
  "stateOperators" : [ {
    "numRowsTotal" : 20000000,
    "numRowsUpdated" : 20000000,
    "memoryUsedBytes" : 31005397,
    "numRowsDroppedByWatermark" : 0,
    "customMetrics" : {
      "rocksdbBytesCopied" : 141037747,
      "rocksdbCommitCheckpointLatency" : 2,
      "rocksdbCommitCompactLatency" : 22061,
      "rocksdbCommitFileSyncLatencyMs" : 1710,
      "rocksdbCommitFlushLatency" : 19032,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 56155,
      "rocksdbFilesCopied" : 2,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 40000000,
      "rocksdbGetLatency" : 21834,
      "rocksdbPutCount" : 1,
      "rocksdbPutLatency" : 56155599000,
      "rocksdbReadBlockCacheHitCount" : 1988,
      "rocksdbReadBlockCacheMissCount" : 40341617,
      "rocksdbSstFileSize" : 141037747,
      "rocksdbTotalBytesReadByCompaction" : 336853375,
      "rocksdbTotalBytesReadByGet" : 680000000,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 141037747,
      "rocksdbTotalBytesWrittenByPut" : 740000012,
      "rocksdbTotalCompactionLatencyMs" : 21949695000,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 7038
    }
  } ],
  "sources" : [ {
  } ],
  "sink" : {
  }
}

A metrikák részletes leírása a következő:

Metrika neve Leírás
rocksdbCommitWriteBatchLatency Az idő (ezredmásodpercben) a memóriabeli strukturált szakaszos írások (WriteBatch) natív RocksDB-re való alkalmazásához szükséges idő.
rocksdbCommitFlushLatency A RocksDB memóriabeli változásainak helyi lemezre való kiürítéséhez szükséges idő (ezredmásodpercben).
rocksdbCommitCompactLatency Az ellenőrzőpont véglegesítése során (ezredmásodpercben) a tömörítés ideje (nem kötelező).
rocksdbCommitPauseLatency Az ellenőrzőpont-véglegesítés részeként (ezredmásodpercben) időt vett igénybe a háttérmunkaszálak leállítása (tömörítéshez stb.).
rocksdbCommitCheckpointLatency Az idő (ezredmásodpercben) pillanatképet készített a natív RocksDB-ről, és egy helyi könyvtárba írta.
rocksdbCommitFileSyncLatencyMs A natív RocksDB-pillanatképhez kapcsolódó fájlok külső tárolóba (ellenőrzőpont helyére) való szinkronizálásához szükséges idő (ezredmásodpercben).
rocksdbGetLatency Az alapul szolgáló natív RocksDB::Get hívás átlagos időtartama (nanoban).
rocksdbPutCount Az alapul szolgáló natív RocksDB::Put hívás átlagos időtartama (nanoban).
rocksdbGetCount Natív RocksDB::Get hívások száma (nem szerepel a Gets WriteBatchből – az előkészítési írásokhoz használt memória kötegben).
rocksdbPutCount Natív RocksDB::Put hívások száma (nem tartalmazza Puts a WriteBatchet – az előkészítési írásokhoz használt memória kötegben).
rocksdbTotalBytesReadByGet Natív RocksDB::Get hívásokon keresztül beolvasott tömörítetlen bájtok száma.
rocksdbTotalBytesWrittenByPut Natív RocksDB::Put hívásokon keresztül írt tömörítetlen bájtok száma.
rocksdbReadBlockCacheHitCount Hányszor használja a natív RocksDB blokkgyorsítótárat a helyi lemezről származó adatok olvasásának elkerülése érdekében.
rocksdbReadBlockCacheMissCount Hányszor hiányzott a natív RocksDB blokkgyorsítótár, és hány adatot kellett beolvasni a helyi lemezről.
rocksdbTotalBytesReadByCompaction A helyi lemezről a natív RocksDB tömörítési folyamat által beolvasott bájtok száma.
rocksdbTotalBytesWrittenByCompaction A natív RocksDB tömörítési folyamat által a helyi lemezre írt bájtok száma.
rocksdbTotalCompactionLatencyMs A RocksDB tömörítése (a háttérben és a véglegesítés során kezdeményezett opcionális tömörítésnél is) időt vett igénybe (ezredmásodpercben).
rocksdbWriterStallLatencyMs Az író (ezredmásodpercben) leállt a memtables háttértömörítése vagy a lemezre való kiürítése miatt.
rocksdbTotalBytesReadThroughIterator Az állapotalapú műveletek némelyikéhez (például az időtúllépés feldolgozásához flatMapGroupsWithState vagy az ablakos aggregációk vízjelezéséhez) a teljes adatkészlet iterátoron keresztüli olvasása szükséges. A tömörítetlen adatok teljes mérete az iterátor használatával.