Sdílet prostřednictvím


Monitorování dotazů strukturovaného streamování v Azure Databricks

Azure Databricks poskytuje integrované monitorování pro aplikace strukturovaného streamování prostřednictvím uživatelského rozhraní Sparku na kartě Streamování .

Rozlišení dotazů strukturovaného streamování v uživatelském rozhraní Sparku

Zadejte jedinečný název dotazu přidáním .queryName(<query-name>) do writeStream kódu, abyste snadno rozlišili metriky, ke kterým streamům patří v uživatelském rozhraní Sparku.

Nabízení metrik strukturovaného streamování do externích služeb

Metriky streamování je možné odeslat do externích služeb pro upozorňování nebo případy použití řídicích panelů pomocí rozhraní naslouchacího procesu dotazů streamování Apache Sparku. Ve službě Databricks Runtime 11.3 LTS a novější je naslouchací proces streamování dotazů dostupný v Pythonu a Scala.

Důležité

Přihlašovací údaje a objekty spravované katalogem Unity nelze použít v StreamingQueryListener logice.

Poznámka:

Latence zpracování s naslouchacími procesy může výrazně ovlivnit rychlost zpracování dotazů. Doporučujeme omezit logiku zpracování v těchto naslouchacích procesech a rozhodnout se psát do systémů rychlé odezvy, jako je Kafka, kvůli efektivitě.

Následující kód obsahuje základní příklady syntaxe pro implementaci naslouchacího procesu:

Scala

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val myListener = new StreamingQueryListener {

  /**
    * Called when a query is started.
    * @note This is called synchronously with
    *       [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
    *       `onQueryStart` calls on all listeners before
    *       `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
    *        Do not block this method, as it blocks your query.
    */
  def onQueryStarted(event: QueryStartedEvent): Unit = {}

  /**
    * Called when there is some status update (ingestion rate updated, etc.)
    *
    * @note This method is asynchronous. The status in [[StreamingQuery]] returns the
    *       latest status, regardless of when this method is called. The status of [[StreamingQuery]]
    *       may change before or when you process the event. For example, you may find [[StreamingQuery]]
    *       terminates when processing `QueryProgressEvent`.
    */
  def onQueryProgress(event: QueryProgressEvent): Unit = {}

  /**
    * Called when the query is idle and waiting for new data to process.
    */
  def onQueryIdle(event: QueryProgressEvent): Unit = {}

  /**
    * Called when a query is stopped, with or without error.
    */
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}

Python

class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        """
        Called when a query is started.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This is called synchronously with
        meth:`pyspark.sql.streaming.DataStreamWriter.start`,
        that is, ``onQueryStart`` will be called on all listeners before
        ``DataStreamWriter.start()`` returns the corresponding
        :class:`pyspark.sql.streaming.StreamingQuery`.
        Do not block in this method as it will block your query.
        """
        pass

    def onQueryProgress(self, event):
        """
        Called when there is some status update (ingestion rate updated, etc.)

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This method is asynchronous. The status in
        :class:`pyspark.sql.streaming.StreamingQuery` returns the
        most recent status, regardless of when this method is called. The status
        of :class:`pyspark.sql.streaming.StreamingQuery`.
        may change before or when you process the event.
        For example, you may find :class:`StreamingQuery`
        terminates when processing `QueryProgressEvent`.
        """
        pass

    def onQueryIdle(self, event):
        """
        Called when the query is idle and waiting for new data to process.
        """
        pass

    def onQueryTerminated(self, event):
        """
        Called when a query is stopped, with or without error.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
            The properties are available as the same as Scala API.
        """
        pass

my_listener = MyListener()

Definování pozorovatelných metrik ve strukturovaném streamování

Pozorovatelné metriky se nazývají libovolné agregační funkce, které lze definovat v dotazu (DataFrame). Jakmile provádění datového rámce dosáhne bodu dokončení (tj. dokončí dávkové dotaz nebo dosáhne epochy streamování), vygeneruje se pojmenovaná událost obsahující metriky pro data zpracovávaná od posledního bodu dokončení.

Tyto metriky můžete sledovat připojením naslouchacího procesu k relaci Sparku. Naslouchací proces závisí na režimu spuštění:

  • Režim dávky: Použijte QueryExecutionListener.

    QueryExecutionListener je volána po dokončení dotazu. Získejte přístup k metrikám pomocí QueryExecution.observedMetrics mapy.

  • Streamování nebo mikrobatch: Použijte StreamingQueryListener.

    StreamingQueryListener se volá, když streamovací dotaz dokončí epochu. Získejte přístup k metrikám pomocí StreamingQueryProgress.observedMetrics mapy. Azure Databricks nepodporuje streamování průběžného spouštění.

Příklad:

Scala

// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.observedMetrics.get("my_event").foreach { row =>
      // Trigger if the number of errors exceeds 5 percent
      val num_rows = row.getAs[Long]("rc")
      val num_error_rows = row.getAs[Long]("erc")
      val ratio = num_error_rows.toDouble / num_rows
      if (ratio > 0.05) {
        // Trigger alert
      }
    }
  }
})

Python

# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()

# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"'{event.name}' [{event.id}] got started!")
    def onQueryProgress(self, event):
        row = event.progress.observedMetrics.get("metric")
        if row is not None:
            if row.malformed / row.cnt > 0.5:
                print("ALERT! Ouch! there are too many malformed "
                      f"records {row.malformed} out of {row.cnt}!")
            else:
                print(f"{row.cnt} rows processed!")
    def onQueryTerminated(self, event):
        print(f"{event.id} got terminated!")

# Add my listener.
spark.streams.addListener(MyListener())

Metriky objektů StreamingQueryListener

Metrický Popis
id Jedinečné ID dotazu, které se zachová při restartování.
runId ID dotazu, které je jedinečné pro každé spuštění/restartování. Viz StreamingQuery.runId().
name Uživatelem zadaný název dotazu. Název má hodnotu null, pokud není zadán žádný název.
timestamp Časové razítko pro spuštění mikrobatchu.
batchId Jedinečné ID pro aktuální dávku zpracovávaných dat. V případě opakování po selhání může být dané ID dávky provedeno vícekrát. Podobně platí, že pokud neexistují žádná data ke zpracování, ID dávky se nezvýší.
numInputRows Agregace (napříč všemi zdroji) počtu záznamů zpracovaných v triggeru
inputRowsPerSecond Míra příchozích dat (napříč všemi zdroji)
processedRowsPerSecond Agregace (napříč všemi zdroji) s jakou Spark zpracovává data.

DurationMs – objekt

Informace o době potřebnou k dokončení různých fází procesu provádění mikrobatchu.

Metrický Popis
durationMs.addBatch Doba potřebná ke spuštění mikrobatchu. Tím se vyloučí čas potřebný k naplánování mikrobatchu.
durationMs.getBatch Doba potřebnou k načtení metadat o posunech ze zdroje.
durationMs.latestOffset Nejnovější posun spotřebovaný pro mikrobatch. Tento objekt průběhu odkazuje na čas potřebný k načtení nejnovějšího posunu ze zdrojů.
durationMs.queryPlanning Doba potřebná ke generování plánu provádění.
durationMs.triggerExecution Doba, která trvá naplánování a spuštění mikrobatchu.
durationMs.walCommit Doba potřebná k potvrzení nových dostupných posunů.

eventTime – objekt

Informace o hodnotě času události, které se zobrazují v datech zpracovávaných v mikrobatchu. Tato data vodoznak používá k zjištění, jak oříznout stav stavových agregací definovaných v úloze strukturovaného streamování.

Metrický Popis
eventTime.avg Průměrná doba události zobrazená v této aktivační události.
eventTime.max Maximální doba události zobrazená v daném triggeru.
eventTime.min Minimální doba události zobrazená v této aktivační události.
eventTime.watermark Hodnota vodoznaku použitého v této aktivační události.

stateOperators – objekt

Informace o stavových operacích, které jsou definovány v úloze strukturovaného streamování a agregace vytvořené z nich.

Metrický Popis
stateOperators.operatorName Název stavového operátoru, ke kterému se metriky vztahují, například symmetricHashJoin, dedupe. stateStoreSave
stateOperators.numRowsTotal Celkový počet řádků ve stavu v důsledku stavového operátoru nebo agregace.
stateOperators.numRowsUpdated Celkový počet řádků aktualizovaných ve stavu v důsledku stavového operátoru nebo agregace.
stateOperators.allUpdatesTimeMs Tato metrika v současné době není měřitelná sparkem a plánuje se odebrat v budoucích aktualizacích.
stateOperators.numRowsRemoved Celkový počet řádků odebraných ze stavu v důsledku stavového operátoru nebo agregace.
stateOperators.allRemovalsTimeMs Tato metrika v současné době není měřitelná sparkem a plánuje se odebrat v budoucích aktualizacích.
stateOperators.commitTimeMs Doba potřebná k potvrzení všech aktualizací (vložení a odebrání) a vrácení nové verze.
stateOperators.memoryUsedBytes Paměť používaná úložištěm stavů.
stateOperators.numRowsDroppedByWatermark Počet řádků, které jsou považovány za příliš pozdě, aby se zahrnuly do stavové agregace. Pouze agregace streamování: Počet vynechaných řádků po agregaci (ne nezpracovaných vstupních řádků). Toto číslo není přesné, ale poskytuje indikaci, že dochází k pozdnímu vyřazení dat.
stateOperators.numShufflePartitions Počet oddílů pro náhodné prohazování tohoto stavového operátoru.
stateOperators.numStateStoreInstances Skutečná instance úložiště stavu, kterou operátor inicializoval a zachoval. U mnoha stavových operátorů je to stejné jako počet oddílů. Spojení stream-stream však inicializují čtyři instance úložiště stavu na oddíl.

stateOperators.customMetrics – objekt

Informace shromážděné ze služby RocksDB zachycující metriky týkající se výkonu a operací s ohledem na stavové hodnoty, které udržuje pro úlohu strukturovaného streamování. Další informace najdete v tématu Konfigurace úložiště stavů RocksDB v Azure Databricks.

Metrický Popis
customMetrics.rocksdbBytesCopied Počet bajtů zkopírovaných podle sledování správcem souborů RocksDB.
customMetrics.rocksdbCommitCheckpointLatency Čas v milisekundách pořídí snímek nativní databáze RocksDB a zapíše ho do místního adresáře.
customMetrics.rocksdbCompactLatency Doba komprimace v milisekundách (volitelné) během potvrzení kontrolního bodu.
customMetrics.rocksdbCommitFileSyncLatencyMs Čas synchronizace nativního snímku RocksDB s externím úložištěm (umístění kontrolního bodu) v milisekundách
customMetrics.rocksdbCommitFlushLatency Čas v milisekundách vyprázdnění paměti RocksDB se změní na místní disk.
customMetrics.rocksdbCommitPauseLatency Doba v milisekundách zastaví pracovní vlákna na pozadí jako součást potvrzení kontrolního bodu, například pro komprimace.
customMetrics.rocksdbCommitWriteBatchLatency Čas v milisekundách, který používá fázované zápisy ve struktuře v paměti (WriteBatch) na nativní rocksDB.
customMetrics.rocksdbFilesCopied Počet souborů zkopírovaných podle sledování správcem souborů RocksDB.
customMetrics.rocksdbFilesReused Početsouborůch
customMetrics.rocksdbGetCount Počet get volání do databáze (nezahrnuje gets dávku WriteBatch v paměti použitou pro přípravné zápisy).
customMetrics.rocksdbGetLatency Průměrná doba v nanosekundách základního nativního RocksDB::Get volání.
customMetrics.rocksdbReadBlockCacheHitCount Počet přístupů do mezipaměti bloků ve službě RocksDB, které jsou užitečné při zabránění čtení místních disků.
customMetrics.rocksdbReadBlockCacheMissCount Počet mezipaměti bloků ve službě RocksDB není užitečný při zabránění čtení místních disků.
customMetrics.rocksdbSstFileSize Velikost všech souborů se statickou seřazenou tabulkou (SST) – tabulková struktura RocksDB používá k ukládání dat.
customMetrics.rocksdbTotalBytesRead Počet nekomprimovaných bajtů přečtených operacemi get .
customMetrics.rocksdbTotalBytesReadByCompaction Počet bajtů, které proces komprimace načítá z disku.
customMetrics.rocksdbTotalBytesReadThroughIterator Celkový počet bajtů nekomprimovaných dat přečtených pomocí iterátoru. Některé stavové operace (například zpracování FlatMapGroupsWithState časového limitu a vodoznaku) vyžadují čtení dat v databázi prostřednictvím iterátoru.
customMetrics.rocksdbTotalBytesWritten Celkový počet nekomprimovaných bajtů zapsaných operacemi put .
customMetrics.rocksdbTotalBytesWrittenByCompaction Celkový počet bajtů, které proces komprimace zapisuje na disk.
customMetrics.rocksdbTotalCompactionLatencyMs Čas v milisekundách pro komprimace RocksDB, včetně komprimací pozadí a volitelné komprimace zahájené během potvrzení.
customMetrics.rocksdbTotalFlushLatencyMs Celkový čas vyprázdnění, včetně vyprazdňování na pozadí. Operace vyprázdnění jsou procesy, kterými MemTable se vyprázdní do úložiště, jakmile je zaplněno. MemTables jsou první úroveň, ve které jsou data uložená ve službě RocksDB.
customMetrics.rocksdbZipFileBytesUncompressed Velikost v bajtech nekomprimovaných souborů ZIP hlášených správcem souborů. Správce souborů spravuje využití a odstranění fyzického místa na disku se soubory SST.

source – objekt (Kafka)

Metrický Popis
sources.description Podrobný popis zdroje Kafka, který určuje přesné téma Kafka, ze kterého se čte. Například: “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]”.
sources.startOffset objekt Počáteční číslo posunu v tématu Kafka, ve kterém byla spuštěna úloha streamování.
sources.endOffset objekt Poslední posun zpracovaný mikrobatchem. To se může rovnat latestOffset probíhajícímu provádění mikrobatchu.
sources.latestOffset objekt Poslední posun z obrázku microbatch. Proces mikrobatchingu nemusí zpracovat všechny posuny v případě omezování, což vede k endOffset rozdílu a latestOffset rozdílu.
sources.numInputRows Počet vstupních řádků zpracovaných z tohoto zdroje
sources.inputRowsPerSecond Rychlost, s jakou data přicházejí ke zpracování z tohoto zdroje.
sources.processedRowsPerSecond Rychlost, s jakou Spark zpracovává data z tohoto zdroje.

sources.metrics – objekt (Kafka)

Metrický Popis
sources.metrics.avgOffsetsBehindLatest Průměrný počet posunů, které streamovací dotaz stojí za nejnovějším dostupným posunem mezi všemi předplatným tématy.
sources.metrics.estimatedTotalBytesBehindLatest Odhadovaný početch
sources.metrics.maxOffsetsBehindLatest Maximální počet posunů, které streamovací dotaz obsahuje, je za nejnovějším dostupným posunem mezi všemi předplatným tématy.
sources.metrics.minOffsetsBehindLatest Minimální počet posunů, které streamovací dotaz stojí za nejnovějším dostupným posunem mezi všemi předplatným tématy.

objekt jímky (Kafka)

Metrický Popis
sink.description Popis jímky Kafka, do které dotaz streamování píše, podrobně popisuje konkrétní použitou implementaci jímky Kafka. Například: “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100”.
sink.numOutputRows Počet řádků, které byly zapsány do výstupní tabulky nebo jímky jako součást mikrobatchu. V některých situacích může být tato hodnota -1 a obecně ji lze interpretovat jako "neznámá".

sources – objekt (Delta Lake)

Metrický Popis
sources.description Popis zdroje, ze kterého streamovací dotaz čte. Například: “DeltaSource[table]”.
sources.[startOffset/endOffset].sourceVersion Verze serializace, se kterou je tento posun kódován.
sources.[startOffset/endOffset].reservoirId ID přečtené tabulky. Používá se k detekci chybné konfigurace při restartování dotazu.
sources.[startOffset/endOffset].reservoirVersion Verze tabulky, která se právě zpracovává
sources.[startOffset/endOffset].index Index v posloupnosti AddFiles této verze. Slouží k přerušení velkých potvrzení do více dávek. Tento index je vytvořen řazením modificationTimestamp a path.
sources.[startOffset/endOffset].isStartingVersion Určuje, zda aktuální posun označuje začátek nového streamovacího dotazu místo zpracování změn, ke kterým došlo po počátečním zpracování dat. Při spuštění nového dotazu se nejprve zpracuje všechna data, která jsou v tabulce na začátku, a pak všechna nová data, která dorazí.
sources.latestOffset Nejnovější posun zpracovaný dotazem microbatch.
sources.numInputRows Počet vstupních řádků zpracovaných z tohoto zdroje
sources.inputRowsPerSecond Rychlost, s jakou data přicházejí ke zpracování z tohoto zdroje.
sources.processedRowsPerSecond Rychlost, s jakou Spark zpracovává data z tohoto zdroje.
sources.metrics.numBytesOutstanding Kombinovaná velikost nevyřízených souborů (soubory sledované rocksDB) Jedná se o metriku backlogu pro Rozdílový a automatický zavaděč jako zdroj streamování.
sources.metrics.numFilesOutstanding Počet nevyřízených souborů, které se mají zpracovat. Jedná se o metriku backlogu pro Rozdílový a automatický zavaděč jako zdroj streamování.

objekt jímky (Delta Lake)

Metrický Popis
sink.description Popis jímky Delta podrobně popisuje konkrétní použitou implementaci jímky Delta. Například: “DeltaSink[table]”.
sink.numOutputRows Počet řádků je vždy -1, protože Spark nemůže odvodit výstupní řádky pro jímky DSv1, což je klasifikace jímky Delta Lake.

Příklady

Příklad události Kafka-to-Kafka StreamingQueryListener

{
  "id" : "3574feba-646d-4735-83c4-66f657e52517",
  "runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
  "name" : "STREAMING_QUERY_NAME_UNIQUE",
  "timestamp" : "2022-10-31T20:09:30.455Z",
  "batchId" : 1377,
  "numInputRows" : 687,
  "inputRowsPerSecond" : 32.13433743393049,
  "processedRowsPerSecond" : 34.067241892293964,
  "durationMs" : {
    "addBatch" : 18352,
    "getBatch" : 0,
    "latestOffset" : 31,
    "queryPlanning" : 977,
    "triggerExecution" : 20165,
    "walCommit" : 342
  },
  "eventTime" : {
    "avg" : "2022-10-31T20:09:18.070Z",
    "max" : "2022-10-31T20:09:30.125Z",
    "min" : "2022-10-31T20:09:09.793Z",
    "watermark" : "2022-10-31T20:08:46.355Z"
  },
  "stateOperators" : [ {
    "operatorName" : "stateStoreSave",
    "numRowsTotal" : 208,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 434,
    "numRowsRemoved" : 76,
    "allRemovalsTimeMs" : 515,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 167069743,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 222,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 165,
      "rocksdbReadBlockCacheMissCount" : 41,
      "rocksdbSstFileSize" : 232729,
      "rocksdbTotalBytesRead" : 12844,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 161238,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "dedupe",
    "numRowsTotal" : 2454744,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 4155,
    "numRowsRemoved" : 0,
    "allRemovalsTimeMs" : 0,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 137765341,
    "numRowsDroppedByWatermark" : 34,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "numDroppedDuplicateRows" : 193,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 146,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3,
      "rocksdbReadBlockCacheMissCount" : 3,
      "rocksdbSstFileSize" : 78959140,
      "rocksdbTotalBytesRead" : 0,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "symmetricHashJoin",
    "numRowsTotal" : 2583,
    "numRowsUpdated" : 682,
    "allUpdatesTimeMs" : 9645,
    "numRowsRemoved" : 508,
    "allRemovalsTimeMs" : 46,
    "commitTimeMs" : 21,
    "memoryUsedBytes" : 668544484,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 80,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 4218,
      "rocksdbGetLatency" : 3,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3425,
      "rocksdbReadBlockCacheMissCount" : 149,
      "rocksdbSstFileSize" : 742827,
      "rocksdbTotalBytesRead" : 866864,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  } ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706380
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "numInputRows" : 292,
    "inputRowsPerSecond" : 13.65826278123392,
    "processedRowsPerSecond" : 14.479817514628582,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "estimatedTotalBytesBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  }, {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
    "startOffset" : {
      KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
    "numOutputRows" : 76
  }
}

Příklad události Delta Lake-to-Delta Lake StreamingQueryListener

{
  "id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
  "runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
  "name" : "silverTransformFromBronze",
  "timestamp" : "2022-11-01T18:21:29.500Z",
  "batchId" : 4,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 62,
    "triggerExecution" : 62
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
    "numOutputRows" : -1
  }
}

Příklad události Kinesis-to-Delta Lake StreamingQueryListener

{
  "id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
  "runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
  "name" : null,
  "timestamp" : "2024-05-14T02:09:20.846Z",
  "batchId" : 0,
  "batchDuration" : 59322,
  "numInputRows" : 20,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.33714304979602844,
  "durationMs" : {
    "addBatch" : 5397,
    "commitBatch" : 4429,
    "commitOffsets" : 211,
    "getBatch" : 5,
    "latestOffset" : 21998,
    "queryPlanning" : 12128,
    "triggerExecution" : 59313,
    "walCommit" : 220
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
    "startOffset" : null,
    "endOffset" : [ {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000000"
      },
      "firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
      "lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
    }, {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000001"
      },
      "firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
      "lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
    } ],
    "latestOffset" : null,
    "numInputRows" : 20,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.33714304979602844,
    "metrics" : {
      "avgMsBehindLatest" : "0.0",
      "maxMsBehindLatest" : "0",
      "minMsBehindLatest" : "0",
      "mode" : "efo",
      "numClosedShards" : "0",
      "numProcessedBytes" : "30",
      "numProcessedRecords" : "18",
      "numRegisteredConsumers" : "1",
      "numStreams" : "1",
      "numTotalShards" : "2",
      "totalPrefetchedBytes" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/2024-05-14-01-58-14-76eb7e51/56b9426c-3492-4ac5-8fe8-3d00efe20be5/deltaTable]",
    "numOutputRows" : -1
  }
}

Příklad události Kafka+Delta Lake-to-Delta Lake StreamingQueryListener

{
 "id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
 "runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
 "name" : null,
 "timestamp" : "2024-05-15T21:57:50.782Z",
 "batchId" : 0,
 "batchDuration" : 3601,
 "numInputRows" : 20,
 "inputRowsPerSecond" : 0.0,
 "processedRowsPerSecond" : 5.55401277422938,
 "durationMs" : {
  "addBatch" : 1544,
  "commitBatch" : 686,
  "commitOffsets" : 27,
  "getBatch" : 12,
  "latestOffset" : 577,
  "queryPlanning" : 105,
  "triggerExecution" : 3600,
  "walCommit" : 34
 },
 "stateOperators" : [ {
  "operatorName" : "symmetricHashJoin",
  "numRowsTotal" : 20,
  "numRowsUpdated" : 20,
  "allUpdatesTimeMs" : 473,
  "numRowsRemoved" : 0,
  "allRemovalsTimeMs" : 0,
  "commitTimeMs" : 277,
  "memoryUsedBytes" : 13120,
  "numRowsDroppedByWatermark" : 0,
  "numShufflePartitions" : 5,
  "numStateStoreInstances" : 20,
  "customMetrics" : {
   "loadedMapCacheHitCount" : 0,
   "loadedMapCacheMissCount" : 0,
   "stateOnCurrentVersionSizeBytes" : 5280
  }
 } ],
 "sources" : [ {
  "description" : "KafkaV2[Subscribe[topic-1]]",
  "startOffset" : null,
  "endOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "latestOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "avgOffsetsBehindLatest" : "0.0",
   "estimatedTotalBytesBehindLatest" : "0.0",
   "maxOffsetsBehindLatest" : "0",
   "minOffsetsBehindLatest" : "0"
  }
 }, {
  "description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
  "startOffset" : null,
  "endOffset" : {
   "sourceVersion" : 1,
   "reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
   "reservoirVersion" : 1,
   "index" : -1,
   "isStartingVersion" : false
  },
  "latestOffset" : null,
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "numBytesOutstanding" : "0",
   "numFilesOutstanding" : "0"
  }
 } ],
 "sink" : {
  "description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
  "numOutputRows" : -1
 }
}

Příklad zdroje rychlosti do události Delta Lake StreamingQueryListener

{
  "id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
  "runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
  "name" : "dataGen",
  "timestamp" : "2022-11-01T18:28:20.332Z",
  "batchId" : 279,
  "numInputRows" : 300,
  "inputRowsPerSecond" : 114.15525114155251,
  "processedRowsPerSecond" : 158.9825119236884,
  "durationMs" : {
    "addBatch" : 1771,
    "commitOffsets" : 54,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 4,
    "triggerExecution" : 1887,
    "walCommit" : 58
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
    "startOffset" : 560,
    "endOffset" : 563,
    "latestOffset" : 563,
    "numInputRows" : 300,
    "inputRowsPerSecond" : 114.15525114155251,
    "processedRowsPerSecond" : 158.9825119236884
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
    "numOutputRows" : -1
  }
}