A strukturált streamelés lekérdezéseinek monitorozása az Azure Databricksben
Az Azure Databricks beépített monitorozást biztosít strukturált streamelési alkalmazásokhoz a Stream lap Spark felhasználói felületén keresztül.
Strukturált streamelési lekérdezések megkülönböztetése a Spark felhasználói felületén
Adjon meg egy egyedi lekérdezésnevet a streameknek, ha hozzáadja .queryName(<query-name>)
a kódhoz, writeStream
így könnyen megkülönböztetheti, hogy mely metrikák tartoznak a Spark felhasználói felületén lévő streamhez.
Strukturált streamelési metrikák leküldése külső szolgáltatásokba
A streamelési metrikák az Apache Spark Stream Query Listener felületének használatával küldhetők le külső szolgáltatásokba riasztási vagy irányítópult-használati esetekhez. A Databricks Runtime 11.3 LTS-ben és újabb verziókban a streamlekérdezés figyelője elérhető a Pythonban és a Scalában.
Fontos
A Unity Catalog által felügyelt hitelesítő adatok és objektumok nem használhatók a logikában StreamingQueryListener
.
Feljegyzés
A figyelőkkel társított feldolgozási késés hátrányosan befolyásolhatja a lekérdezések feldolgozását. A Databricks azt javasolja, hogy minimalizálja a feldolgozási logikát ezekben a figyelőkben, és alacsony késésű fogadókba, például a Kafkába írjon.
Az alábbi kód alapvető példákat tartalmaz a figyelők implementálására szolgáló szintaxisra:
Scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
val myListener = new StreamingQueryListener {
/**
* Called when a query is started.
* @note This is called synchronously with
* [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
* `onQueryStart` calls on all listeners before
* `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
* Do not block this method, as it blocks your query.
*/
def onQueryStarted(event: QueryStartedEvent): Unit = {}
/**
* Called when there is some status update (ingestion rate updated, etc.)
*
* @note This method is asynchronous. The status in [[StreamingQuery]] returns the
* latest status, regardless of when this method is called. The status of [[StreamingQuery]]
* may change before or when you process the event. For example, you may find [[StreamingQuery]]
* terminates when processing `QueryProgressEvent`.
*/
def onQueryProgress(event: QueryProgressEvent): Unit = {}
/**
* Called when a query is stopped, with or without error.
*/
def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}
Python
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
"""
Called when a query is started.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
The properties are available as the same as Scala API.
Notes
-----
This is called synchronously with
meth:`pyspark.sql.streaming.DataStreamWriter.start`,
that is, ``onQueryStart`` will be called on all listeners before
``DataStreamWriter.start()`` returns the corresponding
:class:`pyspark.sql.streaming.StreamingQuery`.
Do not block in this method as it will block your query.
"""
pass
def onQueryProgress(self, event):
"""
Called when there is some status update (ingestion rate updated, etc.)
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
The properties are available as the same as Scala API.
Notes
-----
This method is asynchronous. The status in
:class:`pyspark.sql.streaming.StreamingQuery` returns the
most recent status, regardless of when this method is called. The status
of :class:`pyspark.sql.streaming.StreamingQuery`.
may change before or when you process the event.
For example, you may find :class:`StreamingQuery`
terminates when processing `QueryProgressEvent`.
"""
pass
def onQueryTerminated(self, event):
"""
Called when a query is stopped, with or without error.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
The properties are available as the same as Scala API.
"""
pass
my_listener = MyListener()
Megfigyelhető metrikák definiálása a strukturált streamelésben
A megfigyelhető metrikák tetszőleges aggregátumfüggvényeket neveznek el, amelyek meghatározhatók egy lekérdezésen (DataFrame). Amint egy DataFrame végrehajtása eléri a befejezési pontot (azaz befejez egy kötegelt lekérdezést vagy elér egy streamelési időszakot), a rendszer egy elnevezett eseményt bocsát ki, amely az utolsó befejezési pont óta feldolgozott adatok metrikáit tartalmazza.
Ezeket a metrikákat úgy figyelheti meg, hogy egy figyelőt csatol a Spark-munkamenethez. A figyelő a végrehajtási módtól függ:
Batch mód: Használat
QueryExecutionListener
.QueryExecutionListener
a lekérdezés befejezésekor hívjuk meg. A metrikák elérése aQueryExecution.observedMetrics
térkép használatával.Streamelés vagy mikroköteg: Használja
StreamingQueryListener
.StreamingQueryListener
akkor lesz meghívva, amikor a streamelési lekérdezés befejez egy korszakot. A metrikák elérése aStreamingQueryProgress.observedMetrics
térkép használatával. Az Azure Databricks nem támogatja a folyamatos végrehajtási streamelést.
Példa:
Scala
// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()
// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryProgress(event: QueryProgressEvent): Unit = {
event.progress.observedMetrics.get("my_event").foreach { row =>
// Trigger if the number of errors exceeds 5 percent
val num_rows = row.getAs[Long]("rc")
val num_error_rows = row.getAs[Long]("erc")
val ratio = num_error_rows.toDouble / num_rows
if (ratio > 0.05) {
// Trigger alert
}
}
}
})
Python
# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()
# Define my listener.
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
print(f"'{event.name}' [{event.id}] got started!")
def onQueryProgress(self, event):
row = event.progress.observedMetrics.get("metric")
if row is not None:
if row.malformed / row.cnt > 0.5:
print("ALERT! Ouch! there are too many malformed "
f"records {row.malformed} out of {row.cnt}!")
else:
print(f"{row.cnt} rows processed!")
def onQueryTerminated(self, event):
print(f"{event.id} got terminated!")
# Add my listener.
spark.streams.addListener(MyListener())
StreamingQueryListener objektummetrikák
Metrika | Leírás |
---|---|
id |
Az újraindítások között megmaradó egyedi lekérdezésazonosító. Lásd: StreamingQuery.id(). |
runId |
Egyedi lekérdezésazonosító minden indításhoz vagy újraindításhoz. Lásd: StreamingQuery.runId(). |
name |
A lekérdezés felhasználó által megadott neve. Null érték, ha nincs megadva. |
timestamp |
A mikro köteg végrehajtásának időbélyege. |
batchId |
A feldolgozandó adatok aktuális kötegének egyedi azonosítója. Vegye figyelembe, hogy hiba utáni újrapróbálkozás esetén egy adott kötegazonosító többször is végrehajtható. Hasonlóképpen, ha nincs feldolgozandó adat, a kötegazonosító nem növekszik. |
numInputRows |
Az eseményindítókban feldolgozott rekordok összesítése (az összes forrásban). |
inputRowsPerSecond |
A beérkező adatok összesített (minden forrásra kiterjedő) aránya. |
processedRowsPerSecond |
Összesítési sebesség (az összes forrásban) az adatok feldolgozásának gyakorisága a Sparkban. |
durationMs objektum
Információ a mikroköteg-végrehajtási folyamat különböző szakaszainak elvégzéséhez szükséges időről.
Metrika | Leírás |
---|---|
durationMs.addBatch |
A mikrobatch végrehajtásához szükséges idő. Ez kizárja a Spark által a mikrobatch tervezéséhez szükséges időt. |
durationMs.getBatch |
Az eltolások metaadatainak lekérése a forrásból időbe telik. |
durationMs.latestOffset |
A mikrobatchhez felhasznált legújabb eltolás. Ez a folyamatobjektum arra az időre vonatkozik, amely a legújabb eltolás lekéréséhez szükséges a forrásokból. |
durationMs.queryPlanning |
A végrehajtási terv létrehozásához szükséges idő. |
durationMs.triggerExecution |
A mikrobatch megtervezéséhez és végrehajtásához szükséges idő. |
durationMs.walCommit |
Az új elérhető eltolások véglegesítéséhez szükséges idő. |
eventTime objektum
Információ a mikro kötegben feldolgozott adatokban látható eseményidő értékéről. Ezeket az adatokat a vízjel használja a strukturált streamelési feladatban definiált állapotalapú összesítések feldolgozásának állapotának vágására.
Metrika | Leírás |
---|---|
eventTime.avg |
Az eseményindítóban látható átlagos eseményidő. |
eventTime.max |
Az eseményindítóban látható maximális eseményidő. |
eventTime.min |
Az eseményindítóban látható minimális eseményidő. |
eventTime.watermark |
Az eseményindítóban használt vízjel értéke. |
stateOperators objektum
Információk a strukturált streamelési feladatban meghatározott állapotalapú műveletekről és az ezekből előállított összesítésekről.
Metrika | Leírás |
---|---|
stateOperators.operatorName |
Annak az állapotalapú operátornak a neve, amelyre a metrikák vonatkoznak. Például, symmetricHashJoin , dedupe . stateStoreSave |
stateOperators.numRowsTotal |
Az állapotban lévő sorok száma az állapotalapú operátor vagy -összesítés eredményeként. |
stateOperators.numRowsUpdated |
Az állapotalapú operátor vagy -összesítés eredményeként az állapotban frissített sorok száma. |
stateOperators.numRowsRemoved |
Az állapotkezelő operátor vagy -összesítés eredményeként eltávolított sorok száma. |
stateOperators.commitTimeMs |
Az összes frissítés véglegesítéséhez (az eltávolításokhoz) és az új verzió visszaadásához szükséges idő. |
stateOperators.memoryUsedBytes |
Az állapottároló által használt memória. |
stateOperators.numRowsDroppedByWatermark |
Az állapotalapú összesítésben túl későnek ítélt sorok száma. Csak streamelési aggregációk: Az összesítés után elvetett sorok száma, nyers bemeneti sorok helyett. A szám nem pontos, de azt jelezheti, hogy a késői adatok elvetése történik. |
stateOperators.numShufflePartitions |
Az állapotalapú operátor shuffle partícióinak száma. |
stateOperators.numStateStoreInstances |
Az operátor által inicializált és karbantartott állapottároló tényleges példánya. Sok állapotalapú operátor esetében ez megegyezik a partíciók számával, de a stream-stream illesztés partíciónként négy állapottárpéldányt inicializál. |
stateOperators.customMetrics objektum
A RocksDB-től gyűjtött információk, amelyek metrikákat rögzítenek a teljesítményéről és műveleteiről a strukturált streamelési feladathoz fenntartott állapotalapú értékek tekintetében. További információ: A RocksDB állapottároló konfigurálása az Azure Databricksben.
Metrika | Leírás |
---|---|
customMetrics.rocksdbBytesCopied |
A RocksDB Fájlkezelő által nyomon követett bájtok száma. |
customMetrics.rocksdbCommitCheckpointLatency |
Ezredmásodpercben idő a natív RocksDB pillanatképének készítésére és egy helyi könyvtárba való írására. |
customMetrics.rocksdbCompactLatency |
Az ellenőrzőpont véglegesítése során ezredmásodpercben megadott tömörítési idő (nem kötelező). |
customMetrics.rocksdbCommitFileSyncLatencyMs |
Ezredmásodpercben eltelt idő a natív RocksDB-pillanatképhez kapcsolódó fájlok külső tárterületre (ellenőrzőpont helyére) való szinkronizálására. |
customMetrics.rocksdbCommitFlushLatency |
Ezredmásodpercben eltelt idő a RocksDB memóriabeli változásainak kiürítéséhez a helyi lemezen. |
customMetrics.rocksdbCommitPauseLatency |
Ezredmásodpercben meg kell állítani a háttérmunkaszálakat (például tömörítés esetén) az ellenőrzőpont-véglegesítés részeként. |
customMetrics.rocksdbCommitWriteBatchLatency |
Ezredmásodpercben eltelt idő a memórián belüli strukturált szakaszos írások (WriteBatch ) natív RocksDB-ben való alkalmazásához. |
customMetrics.rocksdbFilesCopied |
A RocksDB Fájlkezelő által nyomon követett fájlok száma. |
customMetrics.rocksdbFilesReused |
A RocksDB Fájlkezelő által nyomon követett fájlok száma. |
customMetrics.rocksdbGetCount |
A get db-hez intézett hívások száma (Ez nem tartalmazza gets WriteBatch az előkészítési írásokhoz használt memóriabeli köteget). |
customMetrics.rocksdbGetLatency |
A mögöttes natív RocksDB::Get hívás átlagos időtartama nanoszekundumokban. |
customMetrics.rocksdbReadBlockCacheHitCount |
A RocksDB blokkgyorsítótárának nagy része hasznos vagy nem hasznos, és elkerüli a helyi lemezolvasásokat. |
customMetrics.rocksdbReadBlockCacheMissCount |
A RocksDB blokkgyorsítótárának nagy része hasznos vagy nem hasznos, és elkerüli a helyi lemezolvasásokat. |
customMetrics.rocksdbSstFileSize |
Az összes SST-fájl mérete. Az SST a statikus rendezésű táblázatot jelenti, amely a RocksDB által az adatok tárolására használt táblázatos struktúra. |
customMetrics.rocksdbTotalBytesRead |
Műveletek által get beolvasott tömörítetlen bájtok száma. |
customMetrics.rocksdbTotalBytesReadByCompaction |
A tömörítési folyamat által a lemezről beolvasott bájtok száma. |
customMetrics.rocksdbTotalBytesReadThroughIterator |
Az állapotalapú műveletek némelyike (például időtúllépési feldolgozás FlatMapGroupsWithState és vízjelezés) a DB-ben lévő adatok iterátoron keresztüli olvasását igényli. Ez a metrika az iterátor használatával beolvasott tömörítetlen adatok méretét jelöli. |
customMetrics.rocksdbTotalBytesWritten |
Műveletek által put írt tömörítetlen bájtok száma. |
customMetrics.rocksdbTotalBytesWrittenByCompaction |
A tömörítési folyamat által a lemezre írt bájtok száma. |
customMetrics.rocksdbTotalCompactionLatencyMs |
A RocksDB tömörítéseinek idő ezredmásodpercei, beleértve a háttértömörítéseket és a véglegesítés során kezdeményezett opcionális tömörítést. |
customMetrics.rocksdbTotalFlushLatencyMs |
Kiürítési idő, beleértve a háttér kiürítését is. A kiürítési műveletek olyan folyamatok, amelyekkel a MemTable ki lesz ürítve a tárolóba, miután megtelt. A MemTables az első szint, ahol az adatok a RocksDB-ben vannak tárolva. |
customMetrics.rocksdbZipFileBytesUncompressed |
A RocksDB File Manager kezeli a fizikai SST-fájl lemezterületének kihasználtságát és törlését. Ez a metrika a tömörítetlen zip-fájlokat jelöli bájtokban a Fájlkezelő által jelentett módon. |
forrásobjektum (Kafka)
Metrika | Leírás |
---|---|
sources.description |
Annak a forrásnak a neve, amelyből a streamlekérdezés olvas. Például: “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]” . |
sources.startOffset Objektum |
A streamelési feladat által elindított Kafka-témakör eltolásszámának kezdőszáma. |
sources.endOffset Objektum |
A mikrobatch által feldolgozott legújabb eltolás. Ez egyenlő latestOffset lehet egy folyamatban lévő mikrobatch-végrehajtással. |
sources.latestOffset Objektum |
A mikrobatch által kitalált legújabb eltolás. Szabályozás esetén előfordulhat, hogy a mikrokötegelési folyamat nem dolgozza fel az összes eltolást endOffset latestOffset , ami és eltérő lehet. |
sources.numInputRows |
A forrásból feldolgozott bemeneti sorok száma. |
sources.inputRowsPerSecond |
Az adatok forrásként való feldolgozásának sebessége. |
sources.processedRowsPerSecond |
A Spark által a forráshoz tartozó adatok feldolgozásának sebessége. |
sources.metrics object (Kafka)
Metrika | Leírás |
---|---|
sources.metrics.avgOffsetsBehindLatest |
A streamelési lekérdezés által az összes előfizetett témakör közül a legújabb elérhető eltolás mögött található eltolások átlagos száma. |
sources.metrics.estimatedTotalBytesBehindLatest |
Becsült bájtok száma, amelyeket a lekérdezési folyamat nem használt fel az előfizetett témakörökből. |
sources.metrics.maxOffsetsBehindLatest |
A streamelési lekérdezés által az összes előfizetett témakör legújabb elérhető eltolása mögött található eltolások maximális száma. |
sources.metrics.minOffsetsBehindLatest |
A streamelési lekérdezés által az összes előfizetett témakör közül a legújabb elérhető eltolás mögött található eltolások minimális száma. |
fogadó objektum (Kafka)
Metrika | Leírás |
---|---|
sink.description |
Annak a fogadónak a neve, amelybe a streamelési lekérdezés ír. Például: “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100” . |
sink.numOutputRows |
A kimeneti táblába vagy fogadóba a mikrobatch részeként írt sorok száma. Bizonyos helyzetekben ez az érték lehet "-1", és általában "ismeretlenként" értelmezhető. |
forrásobjektum (Delta Lake)
Metrika | Leírás |
---|---|
sources.description |
Annak a forrásnak a neve, amelyből a streamlekérdezés olvas. Például: “DeltaSource[table]” . |
sources.[startOffset/endOffset].sourceVersion |
A szerializálás azon verziója, amellyel ez az eltolás kódolva van. |
sources.[startOffset/endOffset].reservoirId |
Annak a táblázatnak az azonosítója, amelyből olvas. Ez a lekérdezés újraindítása során a helytelen konfiguráció észlelésére szolgál. |
sources.[startOffset/endOffset].reservoirVersion |
A jelenleg feldolgozni kívánt tábla verziója. |
sources.[startOffset/endOffset].index |
Indexelés az ebben a verzióban lévő sorrendben AddFiles . Ez arra szolgál, hogy a nagy véglegesítéseket több kötegre bontsa. Ez az index a rendezés és path a modificationTimestamp . |
sources.[startOffset/endOffset].isStartingVersion |
Azt jelzi, hogy ez az eltolás olyan lekérdezést jelöl-e, amely a módosítások feldolgozása helyett indul el. Új lekérdezés indításakor a rendszer feldolgoz minden olyan adatot, amely a tábla elején található, majd az új adatok is megérkeztek. |
sources.latestOffset |
A mikrobatch-lekérdezés által feldolgozott legújabb eltolás. |
sources.numInputRows |
A forrásból feldolgozott bemeneti sorok száma. |
sources.inputRowsPerSecond |
Az adatok forrásként való feldolgozásának sebessége. |
sources.processedRowsPerSecond |
A Spark által a forráshoz tartozó adatok feldolgozásának sebessége. |
sources.metrics.numBytesOutstanding |
A kiemelkedő fájlok (a RocksDB által nyomon követett fájlok) mérete kombinálva. Ez a Delta és az Automatikus betöltő szolgáltatás hátralékmetrikája streamforrásként. |
sources.metrics.numFilesOutstanding |
A feldolgozandó fájlok száma. Ez a Delta és az Automatikus betöltő szolgáltatás hátralékmetrikája streamforrásként. |
fogadó objektum (Delta Lake)
Metrika | Leírás |
---|---|
sink.description |
Annak a fogadónak a neve, amelybe a streamelési lekérdezés ír. Például: “DeltaSink[table]” . |
sink.numOutputRows |
A metrika sorainak száma "-1", mivel a Spark nem tud kimeneti sorokat kikövetkeztetni a DSv1-fogadókhoz, ami a Delta Lake-fogadó besorolása. |
Példák
Példa Kafka-to-Kafka StreamingQueryListener eseményre
{
"id" : "3574feba-646d-4735-83c4-66f657e52517",
"runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
"name" : "STREAMING_QUERY_NAME_UNIQUE",
"timestamp" : "2022-10-31T20:09:30.455Z",
"batchId" : 1377,
"numInputRows" : 687,
"inputRowsPerSecond" : 32.13433743393049,
"processedRowsPerSecond" : 34.067241892293964,
"durationMs" : {
"addBatch" : 18352,
"getBatch" : 0,
"latestOffset" : 31,
"queryPlanning" : 977,
"triggerExecution" : 20165,
"walCommit" : 342
},
"eventTime" : {
"avg" : "2022-10-31T20:09:18.070Z",
"max" : "2022-10-31T20:09:30.125Z",
"min" : "2022-10-31T20:09:09.793Z",
"watermark" : "2022-10-31T20:08:46.355Z"
},
"stateOperators" : [ {
"operatorName" : "stateStoreSave",
"numRowsTotal" : 208,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 434,
"numRowsRemoved" : 76,
"allRemovalsTimeMs" : 515,
"commitTimeMs" : 0,
"memoryUsedBytes" : 167069743,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 222,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 165,
"rocksdbReadBlockCacheMissCount" : 41,
"rocksdbSstFileSize" : 232729,
"rocksdbTotalBytesRead" : 12844,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 161238,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "dedupe",
"numRowsTotal" : 2454744,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 4155,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 0,
"memoryUsedBytes" : 137765341,
"numRowsDroppedByWatermark" : 34,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"numDroppedDuplicateRows" : 193,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 146,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3,
"rocksdbReadBlockCacheMissCount" : 3,
"rocksdbSstFileSize" : 78959140,
"rocksdbTotalBytesRead" : 0,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 2583,
"numRowsUpdated" : 682,
"allUpdatesTimeMs" : 9645,
"numRowsRemoved" : 508,
"allRemovalsTimeMs" : 46,
"commitTimeMs" : 21,
"memoryUsedBytes" : 668544484,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 80,
"customMetrics" : {
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 4218,
"rocksdbGetLatency" : 3,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3425,
"rocksdbReadBlockCacheMissCount" : 149,
"rocksdbSstFileSize" : 742827,
"rocksdbTotalBytesRead" : 866864,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
"startOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706380
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"numInputRows" : 292,
"inputRowsPerSecond" : 13.65826278123392,
"processedRowsPerSecond" : 14.479817514628582,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
"startOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
} ],
"sink" : {
"description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
"numOutputRows" : 76
}
}
Példa Delta Lake-to-Delta Lake StreamingQueryListener eseményre
{
"id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
"runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
"name" : "silverTransformFromBronze",
"timestamp" : "2022-11-01T18:21:29.500Z",
"batchId" : 4,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"latestOffset" : 62,
"triggerExecution" : 62
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
"startOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"latestOffset" : null,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
"numOutputRows" : -1
}
}
Példa a Delta Lake StreamingQueryListener esemény sebességének forrására
{
"id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
"runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
"name" : "dataGen",
"timestamp" : "2022-11-01T18:28:20.332Z",
"batchId" : 279,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884,
"durationMs" : {
"addBatch" : 1771,
"commitOffsets" : 54,
"getBatch" : 0,
"latestOffset" : 0,
"queryPlanning" : 4,
"triggerExecution" : 1887,
"walCommit" : 58
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
"startOffset" : 560,
"endOffset" : 563,
"latestOffset" : 563,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
"numOutputRows" : -1
}
}