Query's voor gestructureerd streamen bewaken in Azure Databricks

Azure Databricks biedt ingebouwde bewaking voor structured streaming-toepassingen via de Spark-gebruikersinterface op het tabblad Streaming .

Structured Streaming-query's onderscheiden in de Spark-gebruikersinterface

Geef uw streams een unieke querynaam op door toe te voegen .queryName(<query-name>) aan uw writeStream code om eenvoudig te onderscheiden welke metrische gegevens tot welke stream behoren in de Spark-gebruikersinterface.

Metrische gegevens over gestructureerd streamen naar externe services pushen

Metrische streaminggegevens kunnen worden gepusht naar externe services voor waarschuwingen of gebruiksvoorbeelden voor dashboards met behulp van de interface streamingquerylistener van Apache Spark. In Databricks Runtime 11.3 LTS en hoger is de Streaming Query Listener beschikbaar in Python en Scala.

Belangrijk

Referenties en objecten die worden beheerd door Unity Catalog kunnen niet worden gebruikt in StreamingQueryListener logica.

Notitie

De verwerkingslatentie die is gekoppeld aan listeners, kan de verwerking van query's nadelig beïnvloeden. Databricks raadt aan om verwerkingslogica in deze listeners te minimaliseren en te schrijven naar sinks met lage latentie, zoals Kafka.

De volgende code bevat basisvoorbeelden van de syntaxis voor het implementeren van een listener:

Scala

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val myListener = new StreamingQueryListener {

  /**
    * Called when a query is started.
    * @note This is called synchronously with
    *       [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
    *       `onQueryStart` calls on all listeners before
    *       `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
    *        Do not block this method, as it blocks your query.
    */
  def onQueryStarted(event: QueryStartedEvent): Unit = {}

  /**
    * Called when there is some status update (ingestion rate updated, etc.)
    *
    * @note This method is asynchronous. The status in [[StreamingQuery]] returns the
    *       latest status, regardless of when this method is called. The status of [[StreamingQuery]]
    *       may change before or when you process the event. For example, you may find [[StreamingQuery]]
    *       terminates when processing `QueryProgressEvent`.
    */
  def onQueryProgress(event: QueryProgressEvent): Unit = {}

  /**
    * Called when a query is stopped, with or without error.
    */
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}

Python

class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        """
        Called when a query is started.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This is called synchronously with
        meth:`pyspark.sql.streaming.DataStreamWriter.start`,
        that is, ``onQueryStart`` will be called on all listeners before
        ``DataStreamWriter.start()`` returns the corresponding
        :class:`pyspark.sql.streaming.StreamingQuery`.
        Do not block in this method as it will block your query.
        """
        pass

    def onQueryProgress(self, event):
        """
        Called when there is some status update (ingestion rate updated, etc.)

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This method is asynchronous. The status in
        :class:`pyspark.sql.streaming.StreamingQuery` returns the
        most recent status, regardless of when this method is called. The status
        of :class:`pyspark.sql.streaming.StreamingQuery`.
        may change before or when you process the event.
        For example, you may find :class:`StreamingQuery`
        terminates when processing `QueryProgressEvent`.
        """
        pass

    def onQueryTerminated(self, event):
        """
        Called when a query is stopped, with or without error.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
            The properties are available as the same as Scala API.
        """
        pass

my_listener = MyListener()

Waarneembare metrische gegevens definiëren in Structured Streaming

Waarneembare metrische gegevens worden willekeurige statistische functies genoemd die kunnen worden gedefinieerd voor een query (DataFrame). Zodra de uitvoering van een DataFrame een voltooiingspunt bereikt (dat wil gezegd, voltooit u een batchquery of bereikt u een streaming-epoch), wordt een benoemde gebeurtenis verzonden die de metrische gegevens bevat voor de gegevens die zijn verwerkt sinds het laatste voltooiingspunt.

U kunt deze metrische gegevens bekijken door een listener toe te voegen aan de Spark-sessie. De listener is afhankelijk van de uitvoeringsmodus:

  • Batchmodus: Gebruiken QueryExecutionListener.

    QueryExecutionListener wordt aangeroepen wanneer de query is voltooid. Toegang tot de metrische gegevens met behulp van de QueryExecution.observedMetrics kaart.

  • Streaming of microbatch: Gebruik StreamingQueryListener.

    StreamingQueryListener wordt aangeroepen wanneer de streamingquery een periode voltooit. Toegang tot de metrische gegevens met behulp van de StreamingQueryProgress.observedMetrics kaart. Azure Databricks biedt geen ondersteuning voor continue uitvoering van streaming.

Voorbeeld:

Scala

// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.observedMetrics.get("my_event").foreach { row =>
      // Trigger if the number of errors exceeds 5 percent
      val num_rows = row.getAs[Long]("rc")
      val num_error_rows = row.getAs[Long]("erc")
      val ratio = num_error_rows.toDouble / num_rows
      if (ratio > 0.05) {
        // Trigger alert
      }
    }
  }
})

Python

# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()

# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"'{event.name}' [{event.id}] got started!")
    def onQueryProgress(self, event):
        row = event.progress.observedMetrics.get("metric")
        if row is not None:
            if row.malformed / row.cnt > 0.5:
                print("ALERT! Ouch! there are too many malformed "
                      f"records {row.malformed} out of {row.cnt}!")
            else:
                print(f"{row.cnt} rows processed!")
    def onQueryTerminated(self, event):
        print(f"{event.id} got terminated!")

# Add my listener.
spark.streams.addListener(MyListener())

Metrische gegevens van streamingQueryListener-objecten

Metrisch Beschrijving
id Unieke query-id die zich blijft voordoen bij opnieuw opstarten. Zie StreamingQuery.id().
runId Unieke query-id voor elke start of herstart. Zie StreamingQuery.runId().
name Door de gebruiker opgegeven naam van de query. Null indien niet opgegeven.
timestamp Tijdstempel voor de uitvoering van de microbatch.
batchId Unieke id voor de huidige batch met gegevens die worden verwerkt. Houd er rekening mee dat in het geval van nieuwe pogingen na een fout een bepaalde batch-id meerdere keren kan worden uitgevoerd. Op dezelfde manier wordt de batch-id niet verhoogd wanneer er geen gegevens moeten worden verwerkt.
numInputRows Aggregeren (in alle bronnen) het aantal records dat in een trigger is verwerkt.
inputRowsPerSecond Aggregeren (in alle bronnen) het aantal binnenkomende gegevens.
processedRowsPerSecond Aggregeringssnelheid (voor alle bronnen) waarbij Spark gegevens verwerkt.

durationMs-object

Informatie over de tijd die nodig is om verschillende fasen van het microbatchuitvoeringsproces te voltooien.

Metrisch Beschrijving
durationMs.addBatch De tijd die nodig is om de microbatch uit te voeren. Dit sluit de tijd die Spark nodig heeft om de microbatch te plannen.
durationMs.getBatch De tijd die nodig is om de metagegevens over de offsets van de bron op te halen.
durationMs.latestOffset Meest recente offset verbruikt voor de microbatch. Dit voortgangsobject verwijst naar de tijd die nodig is om de meest recente offset van bronnen op te halen.
durationMs.queryPlanning De tijd die nodig is om het uitvoeringsplan te genereren.
durationMs.triggerExecution De tijd die nodig is om de microbatch te plannen en uit te voeren.
durationMs.walCommit De tijd die nodig is om de nieuwe beschikbare offsets vast te leggen.

eventTime-object

Informatie over de waarde van de gebeurtenistijd die wordt weergegeven in de gegevens die in de microbatch worden verwerkt. Deze gegevens worden door het watermerk gebruikt om erachter te komen hoe u de status kunt bijsnijden voor het verwerken van stateful aggregaties die zijn gedefinieerd in de structured streaming-taak.

Metrisch Beschrijving
eventTime.avg De gemiddelde gebeurtenistijd die in de trigger wordt weergegeven.
eventTime.max Maximale gebeurtenistijd die wordt weergegeven in de trigger.
eventTime.min Minimale gebeurtenistijd die wordt weergegeven in de trigger.
eventTime.watermark Waarde van het watermerk dat in de trigger wordt gebruikt.

stateOperators-object

Informatie over de stateful bewerkingen die zijn gedefinieerd in de Structured Streaming-taak en de aggregaties die daaruit worden geproduceerd.

Metrisch Beschrijving
stateOperators.operatorName Naam van de stateful operator waarmee de metrische gegevens betrekking hebben. Bijvoorbeeld, symmetricHashJoin, dedupe. stateStoreSave
stateOperators.numRowsTotal Het aantal rijen in de status als gevolg van de stateful operator of aggregatie.
stateOperators.numRowsUpdated Het aantal rijen dat in de status is bijgewerkt als gevolg van de stateful operator of aggregatie.
stateOperators.numRowsRemoved Het aantal rijen dat uit de status is verwijderd als gevolg van de stateful operator of aggregatie.
stateOperators.commitTimeMs De tijd die nodig is om alle updates door te voeren (plaatst en verwijdert) en een nieuwe versie retourneert.
stateOperators.memoryUsedBytes Geheugen dat wordt gebruikt door het statusarchief.
stateOperators.numRowsDroppedByWatermark Het aantal rijen dat te laat wordt beschouwd om te worden opgenomen in de stateful aggregatie. Alleen streamingaggregaties: het aantal rijen dat na de aggregatie is verwijderd en geen onbewerkte invoerrijen. Het getal is niet nauwkeurig, maar kan erop wijzen dat late gegevens worden verwijderd.
stateOperators.numShufflePartitions Aantal willekeurige partities voor deze stateful operator.
stateOperators.numStateStoreInstances Het werkelijke exemplaar van het statusarchief dat de operator heeft geïnitialiseerd en onderhouden. In veel stateful operators is dit hetzelfde als het aantal partities, maar stream-stream join initialiseert vier statusopslagexemplaren per partitie.

stateOperators.customMetrics-object

Informatie verzameld uit RocksDB die metrische gegevens vastlegt over de prestaties en bewerkingen met betrekking tot de stateful waarden die worden onderhouden voor de Structured Streaming-taak. Zie RocksDB-statusopslag configureren in Azure Databricks voor meer informatie.

Metrisch Beschrijving
customMetrics.rocksdbBytesCopied Het aantal bytes dat is gekopieerd als bijgehouden door RocksDB File Manager.
customMetrics.rocksdbCommitCheckpointLatency Tijd in milliseconden om een momentopname van systeemeigen RocksDB te maken en naar een lokale map te schrijven.
customMetrics.rocksdbCompactLatency Tijd in milliseconden voor compressie (optioneel) tijdens het doorvoeren van het controlepunt.
customMetrics.rocksdbCommitFileSyncLatencyMs Tijd in milliseconden om de systeemeigen RocksDB-momentopname gerelateerde bestanden te synchroniseren met een externe opslag (controlepuntlocatie).
customMetrics.rocksdbCommitFlushLatency Tijd in milliseconden om de wijzigingen in het geheugen van RocksDB in het geheugen op uw lokale schijf leeg te maken.
customMetrics.rocksdbCommitPauseLatency Tijd in milliseconden om de achtergrondwerkrolthreads (bijvoorbeeld voor compressie) te stoppen als onderdeel van de controlepuntdoorvoering.
customMetrics.rocksdbCommitWriteBatchLatency Tijd in milliseconden om de gefaseerde schrijfbewerkingen in de geheugenstructuur (WriteBatch) toe te passen op systeemeigen RocksDB.
customMetrics.rocksdbFilesCopied Het aantal bestanden dat is gekopieerd als bijgehouden door RocksDB File Manager.
customMetrics.rocksdbFilesReused Het aantal bestanden dat opnieuw wordt gebruikt als bijgehouden door RocksDB File Manager.
customMetrics.rocksdbGetCount get Aantal aanroepen naar de database (dit omvat gets niet vanWriteBatch: In-memory batch die wordt gebruikt voor faseringsschrijfbewerkingen).
customMetrics.rocksdbGetLatency Gemiddelde tijd in nanoseconden voor de onderliggende systeemeigen RocksDB::Get aanroep.
customMetrics.rocksdbReadBlockCacheHitCount Hoeveel van de blokcache in RocksDB is nuttig of niet en vermijdt leesbewerkingen van lokale schijven.
customMetrics.rocksdbReadBlockCacheMissCount Hoeveel van de blokcache in RocksDB is nuttig of niet en vermijdt leesbewerkingen van lokale schijven.
customMetrics.rocksdbSstFileSize Grootte van alle SST-bestanden. SST staat voor Statische gesorteerde tabel. Dit is de tabellaire structuur die RocksDB gebruikt om gegevens op te slaan.
customMetrics.rocksdbTotalBytesRead Aantal niet-gecomprimeerde bytes gelezen door get bewerkingen.
customMetrics.rocksdbTotalBytesReadByCompaction Het aantal bytes dat door het compressieproces van de schijf wordt gelezen.
customMetrics.rocksdbTotalBytesReadThroughIterator Voor sommige stateful bewerkingen (bijvoorbeeld time-outverwerking in FlatMapGroupsWithState en watermerken) moeten gegevens in DB worden gelezen via een iterator. Deze metrische waarde vertegenwoordigt de grootte van niet-gecomprimeerde gegevens die worden gelezen met behulp van de iterator.
customMetrics.rocksdbTotalBytesWritten Aantal niet-gecomprimeerde bytes geschreven door put bewerkingen.
customMetrics.rocksdbTotalBytesWrittenByCompaction Het aantal bytes dat het compressieproces naar de schijf schrijft.
customMetrics.rocksdbTotalCompactionLatencyMs Tijd milliseconden voor RocksDB-compressies, inclusief achtergrondcompressies en de optionele compressie die tijdens de doorvoer is geïnitieerd.
customMetrics.rocksdbTotalFlushLatencyMs Spoeltijd, inclusief het leegmaken van de achtergrond. Flush-bewerkingen zijn processen waarmee de MemTable wordt leeggemaakt naar de opslag zodra deze vol is. MemTables zijn het eerste niveau waarop gegevens worden opgeslagen in RocksDB.
customMetrics.rocksdbZipFileBytesUncompressed RocksDB File Manager beheert het fysieke SST-bestandsruimtegebruik en -verwijdering. Deze metrische waarde vertegenwoordigt de niet-gecomprimeerde zip-bestanden in bytes, zoals gerapporteerd door Bestandsbeheer.

bronobject (Kafka)

Metrisch Beschrijving
sources.description De naam van de bron waaruit de streamingquery leest. Bijvoorbeeld: “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]”.
sources.startOffset Object Begin van het offsetnummer in het Kafka-onderwerp waarmee de streamingtaak is gestart.
sources.endOffset Object Meest recente offset verwerkt door de microbatch. Dit kan gelijk zijn aan latestOffset een doorlopende uitvoering van microbatch.
sources.latestOffset Object De meest recente offset die wordt bepaald door de microbatch. Wanneer er sprake is van beperking, verwerkt het microbatchproces mogelijk niet alle offsets, waardoor endOffset en latestOffset verschilt.
sources.numInputRows Het aantal invoerrijen dat uit deze bron is verwerkt.
sources.inputRowsPerSecond Snelheid waarmee gegevens binnenkomen voor verwerking voor deze bron.
sources.processedRowsPerSecond Snelheid waarmee Spark gegevens verwerkt voor deze bron.

sources.metrics-object (Kafka)

Metrisch Beschrijving
sources.metrics.avgOffsetsBehindLatest Het gemiddelde aantal offsets dat de streamingquery zich achter de meest recente beschikbare offset bevindt voor alle geabonneerde onderwerpen.
sources.metrics.estimatedTotalBytesBehindLatest Het geschatte aantal bytes dat het queryproces niet heeft verbruikt vanuit de geabonneerde onderwerpen.
sources.metrics.maxOffsetsBehindLatest Maximum aantal offsets dat de streamingquery zich achter de meest recente beschikbare offset bevindt voor alle geabonneerde onderwerpen.
sources.metrics.minOffsetsBehindLatest Minimaal aantal offsets dat de streamingquery zich achter de meest recente beschikbare offset bevindt voor alle geabonneerde onderwerpen.

sink-object (Kafka)

Metrisch Beschrijving
sink.description De naam van de sink waar de streamingquery naar schrijft. Bijvoorbeeld: “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100”.
sink.numOutputRows Het aantal rijen dat als onderdeel van de microbatch naar de uitvoertabel of sink is geschreven. In sommige situaties kan deze waarde '-1' zijn en over het algemeen worden geïnterpreteerd als 'onbekend'.

bronobject (Delta Lake)

Metrisch Beschrijving
sources.description De naam van de bron waaruit de streamingquery leest. Bijvoorbeeld: “DeltaSource[table]”.
sources.[startOffset/endOffset].sourceVersion Versie van serialisatie waarmee deze offset is gecodeerd.
sources.[startOffset/endOffset].reservoirId Id van de tabel waaruit u leest. Dit wordt gebruikt om onjuiste configuratie te detecteren bij het opnieuw opstarten van een query.
sources.[startOffset/endOffset].reservoirVersion Versie van de tabel die u momenteel verwerkt.
sources.[startOffset/endOffset].index Indexeren in de volgorde van AddFiles in deze versie. Dit wordt gebruikt om grote doorvoeringen in meerdere batches op te splitsen. Deze index wordt gemaakt door te sorteren op modificationTimestamp en path.
sources.[startOffset/endOffset].isStartingVersion Of deze offset een query aangeeft die wordt gestart in plaats van wijzigingen te verwerken. Wanneer u een nieuwe query start, worden alle gegevens die aanwezig zijn in de tabel aan het begin verwerkt en vervolgens nieuwe gegevens die zijn aangekomen.
sources.latestOffset Meest recente offset verwerkt door de microbatch-query.
sources.numInputRows Het aantal invoerrijen dat uit deze bron is verwerkt.
sources.inputRowsPerSecond Snelheid waarmee gegevens binnenkomen voor verwerking voor deze bron.
sources.processedRowsPerSecond Snelheid waarmee Spark gegevens verwerkt voor deze bron.
sources.metrics.numBytesOutstanding Grootte van de openstaande bestanden (bestanden bijgehouden door RocksDB) gecombineerd. Dit is de metrische achterstand voor Delta en Auto Loader als streamingbron.
sources.metrics.numFilesOutstanding Aantal openstaande bestanden dat moet worden verwerkt. Dit is de metrische achterstand voor Delta en Auto Loader als streamingbron.

sink-object (Delta Lake)

Metrisch Beschrijving
sink.description Naam van de sink waarnaar de streamingquery schrijft. Bijvoorbeeld: “DeltaSink[table]”.
sink.numOutputRows Het aantal rijen in deze metrische waarde is -1 omdat Spark geen uitvoerrijen voor DSv1-sinks kan afleiden. Dit is de classificatie voor de Delta Lake-sink.

Voorbeelden

Voorbeeld van kafka-naar-Kafka StreamingQueryListener-gebeurtenis

{
  "id" : "3574feba-646d-4735-83c4-66f657e52517",
  "runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
  "name" : "STREAMING_QUERY_NAME_UNIQUE",
  "timestamp" : "2022-10-31T20:09:30.455Z",
  "batchId" : 1377,
  "numInputRows" : 687,
  "inputRowsPerSecond" : 32.13433743393049,
  "processedRowsPerSecond" : 34.067241892293964,
  "durationMs" : {
    "addBatch" : 18352,
    "getBatch" : 0,
    "latestOffset" : 31,
    "queryPlanning" : 977,
    "triggerExecution" : 20165,
    "walCommit" : 342
  },
  "eventTime" : {
    "avg" : "2022-10-31T20:09:18.070Z",
    "max" : "2022-10-31T20:09:30.125Z",
    "min" : "2022-10-31T20:09:09.793Z",
    "watermark" : "2022-10-31T20:08:46.355Z"
  },
  "stateOperators" : [ {
    "operatorName" : "stateStoreSave",
    "numRowsTotal" : 208,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 434,
    "numRowsRemoved" : 76,
    "allRemovalsTimeMs" : 515,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 167069743,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 222,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 165,
      "rocksdbReadBlockCacheMissCount" : 41,
      "rocksdbSstFileSize" : 232729,
      "rocksdbTotalBytesRead" : 12844,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 161238,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "dedupe",
    "numRowsTotal" : 2454744,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 4155,
    "numRowsRemoved" : 0,
    "allRemovalsTimeMs" : 0,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 137765341,
    "numRowsDroppedByWatermark" : 34,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "numDroppedDuplicateRows" : 193,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 146,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3,
      "rocksdbReadBlockCacheMissCount" : 3,
      "rocksdbSstFileSize" : 78959140,
      "rocksdbTotalBytesRead" : 0,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "symmetricHashJoin",
    "numRowsTotal" : 2583,
    "numRowsUpdated" : 682,
    "allUpdatesTimeMs" : 9645,
    "numRowsRemoved" : 508,
    "allRemovalsTimeMs" : 46,
    "commitTimeMs" : 21,
    "memoryUsedBytes" : 668544484,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 80,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 4218,
      "rocksdbGetLatency" : 3,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3425,
      "rocksdbReadBlockCacheMissCount" : 149,
      "rocksdbSstFileSize" : 742827,
      "rocksdbTotalBytesRead" : 866864,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  } ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706380
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "numInputRows" : 292,
    "inputRowsPerSecond" : 13.65826278123392,
    "processedRowsPerSecond" : 14.479817514628582,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "estimatedTotalBytesBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  }, {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
    "numOutputRows" : 76
  }
}

Voorbeeld van een Delta Lake-naar-Delta Lake StreamingQueryListener-gebeurtenis

{
  "id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
  "runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
  "name" : "silverTransformFromBronze",
  "timestamp" : "2022-11-01T18:21:29.500Z",
  "batchId" : 4,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 62,
    "triggerExecution" : 62
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
    "numOutputRows" : -1
  }
}

Voorbeeld van frequentiebron naar Delta Lake StreamingQueryListener-gebeurtenis

{
  "id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
  "runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
  "name" : "dataGen",
  "timestamp" : "2022-11-01T18:28:20.332Z",
  "batchId" : 279,
  "numInputRows" : 300,
  "inputRowsPerSecond" : 114.15525114155251,
  "processedRowsPerSecond" : 158.9825119236884,
  "durationMs" : {
    "addBatch" : 1771,
    "commitOffsets" : 54,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 4,
    "triggerExecution" : 1887,
    "walCommit" : 58
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
    "startOffset" : 560,
    "endOffset" : 563,
    "latestOffset" : 563,
    "numInputRows" : 300,
    "inputRowsPerSecond" : 114.15525114155251,
    "processedRowsPerSecond" : 158.9825119236884
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
    "numOutputRows" : -1
  }
}