A strukturált streamelés lekérdezéseinek monitorozása az Azure Databricksben

Az Azure Databricks beépített monitorozást biztosít strukturált streamelési alkalmazásokhoz a Stream lap Spark felhasználói felületén keresztül.

Strukturált streamelési lekérdezések megkülönböztetése a Spark felhasználói felületén

Adjon meg egy egyedi lekérdezésnevet a streameknek, ha hozzáadja .queryName(<query-name>) a kódhoz, writeStream így könnyen megkülönböztetheti, hogy mely metrikák tartoznak a Spark felhasználói felületén lévő streamhez.

Strukturált streamelési metrikák leküldése külső szolgáltatásokba

A streamelési metrikák az Apache Spark Stream Query Listener felületének használatával küldhetők le külső szolgáltatásokba riasztási vagy irányítópult-használati esetekhez. A Databricks Runtime 11.3 LTS-ben és újabb verziókban a streamlekérdezés figyelője elérhető a Pythonban és a Scalában.

Fontos

A Unity Catalog által felügyelt hitelesítő adatok és objektumok nem használhatók a logikában StreamingQueryListener .

Feljegyzés

A figyelőkkel társított feldolgozási késés hátrányosan befolyásolhatja a lekérdezések feldolgozását. A Databricks azt javasolja, hogy minimalizálja a feldolgozási logikát ezekben a figyelőkben, és alacsony késésű fogadókba, például a Kafkába írjon.

Az alábbi kód alapvető példákat tartalmaz a figyelők implementálására szolgáló szintaxisra:

Scala

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val myListener = new StreamingQueryListener {

  /**
    * Called when a query is started.
    * @note This is called synchronously with
    *       [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
    *       `onQueryStart` calls on all listeners before
    *       `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
    *        Do not block this method, as it blocks your query.
    */
  def onQueryStarted(event: QueryStartedEvent): Unit = {}

  /**
    * Called when there is some status update (ingestion rate updated, etc.)
    *
    * @note This method is asynchronous. The status in [[StreamingQuery]] returns the
    *       latest status, regardless of when this method is called. The status of [[StreamingQuery]]
    *       may change before or when you process the event. For example, you may find [[StreamingQuery]]
    *       terminates when processing `QueryProgressEvent`.
    */
  def onQueryProgress(event: QueryProgressEvent): Unit = {}

  /**
    * Called when a query is stopped, with or without error.
    */
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}

Python

class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        """
        Called when a query is started.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This is called synchronously with
        meth:`pyspark.sql.streaming.DataStreamWriter.start`,
        that is, ``onQueryStart`` will be called on all listeners before
        ``DataStreamWriter.start()`` returns the corresponding
        :class:`pyspark.sql.streaming.StreamingQuery`.
        Do not block in this method as it will block your query.
        """
        pass

    def onQueryProgress(self, event):
        """
        Called when there is some status update (ingestion rate updated, etc.)

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This method is asynchronous. The status in
        :class:`pyspark.sql.streaming.StreamingQuery` returns the
        most recent status, regardless of when this method is called. The status
        of :class:`pyspark.sql.streaming.StreamingQuery`.
        may change before or when you process the event.
        For example, you may find :class:`StreamingQuery`
        terminates when processing `QueryProgressEvent`.
        """
        pass

    def onQueryTerminated(self, event):
        """
        Called when a query is stopped, with or without error.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
            The properties are available as the same as Scala API.
        """
        pass

my_listener = MyListener()

Megfigyelhető metrikák definiálása a strukturált streamelésben

A megfigyelhető metrikák tetszőleges aggregátumfüggvényeket neveznek el, amelyek meghatározhatók egy lekérdezésen (DataFrame). Amint egy DataFrame végrehajtása eléri a befejezési pontot (azaz befejez egy kötegelt lekérdezést vagy elér egy streamelési időszakot), a rendszer egy elnevezett eseményt bocsát ki, amely az utolsó befejezési pont óta feldolgozott adatok metrikáit tartalmazza.

Ezeket a metrikákat úgy figyelheti meg, hogy egy figyelőt csatol a Spark-munkamenethez. A figyelő a végrehajtási módtól függ:

  • Batch mód: Használat QueryExecutionListener.

    QueryExecutionListener a lekérdezés befejezésekor hívjuk meg. A metrikák elérése a QueryExecution.observedMetrics térkép használatával.

  • Streamelés vagy mikroköteg: Használja StreamingQueryListener.

    StreamingQueryListener akkor lesz meghívva, amikor a streamelési lekérdezés befejez egy korszakot. A metrikák elérése a StreamingQueryProgress.observedMetrics térkép használatával. Az Azure Databricks nem támogatja a folyamatos végrehajtási streamelést.

Példa:

Scala

// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.observedMetrics.get("my_event").foreach { row =>
      // Trigger if the number of errors exceeds 5 percent
      val num_rows = row.getAs[Long]("rc")
      val num_error_rows = row.getAs[Long]("erc")
      val ratio = num_error_rows.toDouble / num_rows
      if (ratio > 0.05) {
        // Trigger alert
      }
    }
  }
})

Python

# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()

# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"'{event.name}' [{event.id}] got started!")
    def onQueryProgress(self, event):
        row = event.progress.observedMetrics.get("metric")
        if row is not None:
            if row.malformed / row.cnt > 0.5:
                print("ALERT! Ouch! there are too many malformed "
                      f"records {row.malformed} out of {row.cnt}!")
            else:
                print(f"{row.cnt} rows processed!")
    def onQueryTerminated(self, event):
        print(f"{event.id} got terminated!")

# Add my listener.
spark.streams.addListener(MyListener())

StreamingQueryListener objektummetrikák

Metrika Leírás
id Az újraindítások között megmaradó egyedi lekérdezésazonosító. Lásd: StreamingQuery.id().
runId Egyedi lekérdezésazonosító minden indításhoz vagy újraindításhoz. Lásd: StreamingQuery.runId().
name A lekérdezés felhasználó által megadott neve. Null érték, ha nincs megadva.
timestamp A mikro köteg végrehajtásának időbélyege.
batchId A feldolgozandó adatok aktuális kötegének egyedi azonosítója. Vegye figyelembe, hogy hiba utáni újrapróbálkozás esetén egy adott kötegazonosító többször is végrehajtható. Hasonlóképpen, ha nincs feldolgozandó adat, a kötegazonosító nem növekszik.
numInputRows Az eseményindítókban feldolgozott rekordok összesítése (az összes forrásban).
inputRowsPerSecond A beérkező adatok összesített (minden forrásra kiterjedő) aránya.
processedRowsPerSecond Összesítési sebesség (az összes forrásban) az adatok feldolgozásának gyakorisága a Sparkban.

durationMs objektum

Információ a mikroköteg-végrehajtási folyamat különböző szakaszainak elvégzéséhez szükséges időről.

Metrika Leírás
durationMs.addBatch A mikrobatch végrehajtásához szükséges idő. Ez kizárja a Spark által a mikrobatch tervezéséhez szükséges időt.
durationMs.getBatch Az eltolások metaadatainak lekérése a forrásból időbe telik.
durationMs.latestOffset A mikrobatchhez felhasznált legújabb eltolás. Ez a folyamatobjektum arra az időre vonatkozik, amely a legújabb eltolás lekéréséhez szükséges a forrásokból.
durationMs.queryPlanning A végrehajtási terv létrehozásához szükséges idő.
durationMs.triggerExecution A mikrobatch megtervezéséhez és végrehajtásához szükséges idő.
durationMs.walCommit Az új elérhető eltolások véglegesítéséhez szükséges idő.

eventTime objektum

Információ a mikro kötegben feldolgozott adatokban látható eseményidő értékéről. Ezeket az adatokat a vízjel használja a strukturált streamelési feladatban definiált állapotalapú összesítések feldolgozásának állapotának vágására.

Metrika Leírás
eventTime.avg Az eseményindítóban látható átlagos eseményidő.
eventTime.max Az eseményindítóban látható maximális eseményidő.
eventTime.min Az eseményindítóban látható minimális eseményidő.
eventTime.watermark Az eseményindítóban használt vízjel értéke.

stateOperators objektum

Információk a strukturált streamelési feladatban meghatározott állapotalapú műveletekről és az ezekből előállított összesítésekről.

Metrika Leírás
stateOperators.operatorName Annak az állapotalapú operátornak a neve, amelyre a metrikák vonatkoznak. Például, symmetricHashJoin, dedupe. stateStoreSave
stateOperators.numRowsTotal Az állapotban lévő sorok száma az állapotalapú operátor vagy -összesítés eredményeként.
stateOperators.numRowsUpdated Az állapotalapú operátor vagy -összesítés eredményeként az állapotban frissített sorok száma.
stateOperators.numRowsRemoved Az állapotkezelő operátor vagy -összesítés eredményeként eltávolított sorok száma.
stateOperators.commitTimeMs Az összes frissítés véglegesítéséhez (az eltávolításokhoz) és az új verzió visszaadásához szükséges idő.
stateOperators.memoryUsedBytes Az állapottároló által használt memória.
stateOperators.numRowsDroppedByWatermark Az állapotalapú összesítésben túl későnek ítélt sorok száma. Csak streamelési aggregációk: Az összesítés után elvetett sorok száma, nyers bemeneti sorok helyett. A szám nem pontos, de azt jelezheti, hogy a késői adatok elvetése történik.
stateOperators.numShufflePartitions Az állapotalapú operátor shuffle partícióinak száma.
stateOperators.numStateStoreInstances Az operátor által inicializált és karbantartott állapottároló tényleges példánya. Sok állapotalapú operátor esetében ez megegyezik a partíciók számával, de a stream-stream illesztés partíciónként négy állapottárpéldányt inicializál.

stateOperators.customMetrics objektum

A RocksDB-től gyűjtött információk, amelyek metrikákat rögzítenek a teljesítményéről és műveleteiről a strukturált streamelési feladathoz fenntartott állapotalapú értékek tekintetében. További információ: A RocksDB állapottároló konfigurálása az Azure Databricksben.

Metrika Leírás
customMetrics.rocksdbBytesCopied A RocksDB Fájlkezelő által nyomon követett bájtok száma.
customMetrics.rocksdbCommitCheckpointLatency Ezredmásodpercben idő a natív RocksDB pillanatképének készítésére és egy helyi könyvtárba való írására.
customMetrics.rocksdbCompactLatency Az ellenőrzőpont véglegesítése során ezredmásodpercben megadott tömörítési idő (nem kötelező).
customMetrics.rocksdbCommitFileSyncLatencyMs Ezredmásodpercben eltelt idő a natív RocksDB-pillanatképhez kapcsolódó fájlok külső tárterületre (ellenőrzőpont helyére) való szinkronizálására.
customMetrics.rocksdbCommitFlushLatency Ezredmásodpercben eltelt idő a RocksDB memóriabeli változásainak kiürítéséhez a helyi lemezen.
customMetrics.rocksdbCommitPauseLatency Ezredmásodpercben meg kell állítani a háttérmunkaszálakat (például tömörítés esetén) az ellenőrzőpont-véglegesítés részeként.
customMetrics.rocksdbCommitWriteBatchLatency Ezredmásodpercben eltelt idő a memórián belüli strukturált szakaszos írások (WriteBatch) natív RocksDB-ben való alkalmazásához.
customMetrics.rocksdbFilesCopied A RocksDB Fájlkezelő által nyomon követett fájlok száma.
customMetrics.rocksdbFilesReused A RocksDB Fájlkezelő által nyomon követett fájlok száma.
customMetrics.rocksdbGetCount A get db-hez intézett hívások száma (Ez nem tartalmazza getsWriteBatchaz előkészítési írásokhoz használt memóriabeli köteget).
customMetrics.rocksdbGetLatency A mögöttes natív RocksDB::Get hívás átlagos időtartama nanoszekundumokban.
customMetrics.rocksdbReadBlockCacheHitCount A RocksDB blokkgyorsítótárának nagy része hasznos vagy nem hasznos, és elkerüli a helyi lemezolvasásokat.
customMetrics.rocksdbReadBlockCacheMissCount A RocksDB blokkgyorsítótárának nagy része hasznos vagy nem hasznos, és elkerüli a helyi lemezolvasásokat.
customMetrics.rocksdbSstFileSize Az összes SST-fájl mérete. Az SST a statikus rendezésű táblázatot jelenti, amely a RocksDB által az adatok tárolására használt táblázatos struktúra.
customMetrics.rocksdbTotalBytesRead Műveletek által get beolvasott tömörítetlen bájtok száma.
customMetrics.rocksdbTotalBytesReadByCompaction A tömörítési folyamat által a lemezről beolvasott bájtok száma.
customMetrics.rocksdbTotalBytesReadThroughIterator Az állapotalapú műveletek némelyike (például időtúllépési feldolgozás FlatMapGroupsWithState és vízjelezés) a DB-ben lévő adatok iterátoron keresztüli olvasását igényli. Ez a metrika az iterátor használatával beolvasott tömörítetlen adatok méretét jelöli.
customMetrics.rocksdbTotalBytesWritten Műveletek által put írt tömörítetlen bájtok száma.
customMetrics.rocksdbTotalBytesWrittenByCompaction A tömörítési folyamat által a lemezre írt bájtok száma.
customMetrics.rocksdbTotalCompactionLatencyMs A RocksDB tömörítéseinek idő ezredmásodpercei, beleértve a háttértömörítéseket és a véglegesítés során kezdeményezett opcionális tömörítést.
customMetrics.rocksdbTotalFlushLatencyMs Kiürítési idő, beleértve a háttér kiürítését is. A kiürítési műveletek olyan folyamatok, amelyekkel a MemTable ki lesz ürítve a tárolóba, miután megtelt. A MemTables az első szint, ahol az adatok a RocksDB-ben vannak tárolva.
customMetrics.rocksdbZipFileBytesUncompressed A RocksDB File Manager kezeli a fizikai SST-fájl lemezterületének kihasználtságát és törlését. Ez a metrika a tömörítetlen zip-fájlokat jelöli bájtokban a Fájlkezelő által jelentett módon.

forrásobjektum (Kafka)

Metrika Leírás
sources.description Annak a forrásnak a neve, amelyből a streamlekérdezés olvas. Például: “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]”.
sources.startOffset Objektum A streamelési feladat által elindított Kafka-témakör eltolásszámának kezdőszáma.
sources.endOffset Objektum A mikrobatch által feldolgozott legújabb eltolás. Ez egyenlő latestOffset lehet egy folyamatban lévő mikrobatch-végrehajtással.
sources.latestOffset Objektum A mikrobatch által kitalált legújabb eltolás. Szabályozás esetén előfordulhat, hogy a mikrokötegelési folyamat nem dolgozza fel az összes eltolást endOffsetlatestOffset , ami és eltérő lehet.
sources.numInputRows A forrásból feldolgozott bemeneti sorok száma.
sources.inputRowsPerSecond Az adatok forrásként való feldolgozásának sebessége.
sources.processedRowsPerSecond A Spark által a forráshoz tartozó adatok feldolgozásának sebessége.

sources.metrics object (Kafka)

Metrika Leírás
sources.metrics.avgOffsetsBehindLatest A streamelési lekérdezés által az összes előfizetett témakör közül a legújabb elérhető eltolás mögött található eltolások átlagos száma.
sources.metrics.estimatedTotalBytesBehindLatest Becsült bájtok száma, amelyeket a lekérdezési folyamat nem használt fel az előfizetett témakörökből.
sources.metrics.maxOffsetsBehindLatest A streamelési lekérdezés által az összes előfizetett témakör legújabb elérhető eltolása mögött található eltolások maximális száma.
sources.metrics.minOffsetsBehindLatest A streamelési lekérdezés által az összes előfizetett témakör közül a legújabb elérhető eltolás mögött található eltolások minimális száma.

fogadó objektum (Kafka)

Metrika Leírás
sink.description Annak a fogadónak a neve, amelybe a streamelési lekérdezés ír. Például: “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100”.
sink.numOutputRows A kimeneti táblába vagy fogadóba a mikrobatch részeként írt sorok száma. Bizonyos helyzetekben ez az érték lehet "-1", és általában "ismeretlenként" értelmezhető.

forrásobjektum (Delta Lake)

Metrika Leírás
sources.description Annak a forrásnak a neve, amelyből a streamlekérdezés olvas. Például: “DeltaSource[table]”.
sources.[startOffset/endOffset].sourceVersion A szerializálás azon verziója, amellyel ez az eltolás kódolva van.
sources.[startOffset/endOffset].reservoirId Annak a táblázatnak az azonosítója, amelyből olvas. Ez a lekérdezés újraindítása során a helytelen konfiguráció észlelésére szolgál.
sources.[startOffset/endOffset].reservoirVersion A jelenleg feldolgozni kívánt tábla verziója.
sources.[startOffset/endOffset].index Indexelés az ebben a verzióban lévő sorrendben AddFiles . Ez arra szolgál, hogy a nagy véglegesítéseket több kötegre bontsa. Ez az index a rendezés és patha modificationTimestamp .
sources.[startOffset/endOffset].isStartingVersion Azt jelzi, hogy ez az eltolás olyan lekérdezést jelöl-e, amely a módosítások feldolgozása helyett indul el. Új lekérdezés indításakor a rendszer feldolgoz minden olyan adatot, amely a tábla elején található, majd az új adatok is megérkeztek.
sources.latestOffset A mikrobatch-lekérdezés által feldolgozott legújabb eltolás.
sources.numInputRows A forrásból feldolgozott bemeneti sorok száma.
sources.inputRowsPerSecond Az adatok forrásként való feldolgozásának sebessége.
sources.processedRowsPerSecond A Spark által a forráshoz tartozó adatok feldolgozásának sebessége.
sources.metrics.numBytesOutstanding A kiemelkedő fájlok (a RocksDB által nyomon követett fájlok) mérete kombinálva. Ez a Delta és az Automatikus betöltő szolgáltatás hátralékmetrikája streamforrásként.
sources.metrics.numFilesOutstanding A feldolgozandó fájlok száma. Ez a Delta és az Automatikus betöltő szolgáltatás hátralékmetrikája streamforrásként.

fogadó objektum (Delta Lake)

Metrika Leírás
sink.description Annak a fogadónak a neve, amelybe a streamelési lekérdezés ír. Például: “DeltaSink[table]”.
sink.numOutputRows A metrika sorainak száma "-1", mivel a Spark nem tud kimeneti sorokat kikövetkeztetni a DSv1-fogadókhoz, ami a Delta Lake-fogadó besorolása.

Példák

Példa Kafka-to-Kafka StreamingQueryListener eseményre

{
  "id" : "3574feba-646d-4735-83c4-66f657e52517",
  "runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
  "name" : "STREAMING_QUERY_NAME_UNIQUE",
  "timestamp" : "2022-10-31T20:09:30.455Z",
  "batchId" : 1377,
  "numInputRows" : 687,
  "inputRowsPerSecond" : 32.13433743393049,
  "processedRowsPerSecond" : 34.067241892293964,
  "durationMs" : {
    "addBatch" : 18352,
    "getBatch" : 0,
    "latestOffset" : 31,
    "queryPlanning" : 977,
    "triggerExecution" : 20165,
    "walCommit" : 342
  },
  "eventTime" : {
    "avg" : "2022-10-31T20:09:18.070Z",
    "max" : "2022-10-31T20:09:30.125Z",
    "min" : "2022-10-31T20:09:09.793Z",
    "watermark" : "2022-10-31T20:08:46.355Z"
  },
  "stateOperators" : [ {
    "operatorName" : "stateStoreSave",
    "numRowsTotal" : 208,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 434,
    "numRowsRemoved" : 76,
    "allRemovalsTimeMs" : 515,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 167069743,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 222,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 165,
      "rocksdbReadBlockCacheMissCount" : 41,
      "rocksdbSstFileSize" : 232729,
      "rocksdbTotalBytesRead" : 12844,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 161238,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "dedupe",
    "numRowsTotal" : 2454744,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 4155,
    "numRowsRemoved" : 0,
    "allRemovalsTimeMs" : 0,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 137765341,
    "numRowsDroppedByWatermark" : 34,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "numDroppedDuplicateRows" : 193,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 146,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3,
      "rocksdbReadBlockCacheMissCount" : 3,
      "rocksdbSstFileSize" : 78959140,
      "rocksdbTotalBytesRead" : 0,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "symmetricHashJoin",
    "numRowsTotal" : 2583,
    "numRowsUpdated" : 682,
    "allUpdatesTimeMs" : 9645,
    "numRowsRemoved" : 508,
    "allRemovalsTimeMs" : 46,
    "commitTimeMs" : 21,
    "memoryUsedBytes" : 668544484,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 80,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 4218,
      "rocksdbGetLatency" : 3,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3425,
      "rocksdbReadBlockCacheMissCount" : 149,
      "rocksdbSstFileSize" : 742827,
      "rocksdbTotalBytesRead" : 866864,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  } ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706380
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "numInputRows" : 292,
    "inputRowsPerSecond" : 13.65826278123392,
    "processedRowsPerSecond" : 14.479817514628582,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "estimatedTotalBytesBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  }, {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
    "numOutputRows" : 76
  }
}

Példa Delta Lake-to-Delta Lake StreamingQueryListener eseményre

{
  "id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
  "runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
  "name" : "silverTransformFromBronze",
  "timestamp" : "2022-11-01T18:21:29.500Z",
  "batchId" : 4,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 62,
    "triggerExecution" : 62
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
    "numOutputRows" : -1
  }
}

Példa a Delta Lake StreamingQueryListener esemény sebességének forrására

{
  "id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
  "runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
  "name" : "dataGen",
  "timestamp" : "2022-11-01T18:28:20.332Z",
  "batchId" : 279,
  "numInputRows" : 300,
  "inputRowsPerSecond" : 114.15525114155251,
  "processedRowsPerSecond" : 158.9825119236884,
  "durationMs" : {
    "addBatch" : 1771,
    "commitOffsets" : 54,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 4,
    "triggerExecution" : 1887,
    "walCommit" : 58
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
    "startOffset" : 560,
    "endOffset" : 563,
    "latestOffset" : 563,
    "numInputRows" : 300,
    "inputRowsPerSecond" : 114.15525114155251,
    "processedRowsPerSecond" : 158.9825119236884
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
    "numOutputRows" : -1
  }
}