Delen via


Query's voor gestructureerd streamen bewaken in Azure Databricks

Azure Databricks biedt ingebouwde bewaking voor structured streaming-toepassingen via de Spark-gebruikersinterface op het tabblad Streaming .

Structured Streaming-query's onderscheiden in de Spark-gebruikersinterface

Geef uw streams een unieke querynaam op door toe te voegen .queryName(<query-name>) aan uw writeStream code om eenvoudig te onderscheiden welke metrische gegevens tot welke stream behoren in de Spark-gebruikersinterface.

Metrische gegevens over gestructureerd streamen naar externe services pushen

Metrische streaminggegevens kunnen worden gepusht naar externe services voor waarschuwingen of gebruiksvoorbeelden voor dashboards met behulp van de interface streamingquerylistener van Apache Spark. In Databricks Runtime 11.3 LTS en hoger is de Streaming Query Listener beschikbaar in Python en Scala.

Belangrijk

Referenties en objecten die worden beheerd door Unity Catalog kunnen niet worden gebruikt in StreamingQueryListener logica.

Notitie

De verwerkingslatentie met listeners kan aanzienlijk van invloed zijn op de verwerkingssnelheden van query's. Het wordt aangeraden om de verwerkingslogica in deze listeners te beperken en ervoor te kiezen om te schrijven naar snelle responssystemen zoals Kafka voor efficiëntie.

De volgende code bevat basisvoorbeelden van de syntaxis voor het implementeren van een listener:

Scala

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val myListener = new StreamingQueryListener {

  /**
    * Called when a query is started.
    * @note This is called synchronously with
    *       [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
    *       `onQueryStart` calls on all listeners before
    *       `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
    *        Do not block this method, as it blocks your query.
    */
  def onQueryStarted(event: QueryStartedEvent): Unit = {}

  /**
    * Called when there is some status update (ingestion rate updated, etc.)
    *
    * @note This method is asynchronous. The status in [[StreamingQuery]] returns the
    *       latest status, regardless of when this method is called. The status of [[StreamingQuery]]
    *       may change before or when you process the event. For example, you may find [[StreamingQuery]]
    *       terminates when processing `QueryProgressEvent`.
    */
  def onQueryProgress(event: QueryProgressEvent): Unit = {}

  /**
    * Called when the query is idle and waiting for new data to process.
    */
  def onQueryIdle(event: QueryProgressEvent): Unit = {}

  /**
    * Called when a query is stopped, with or without error.
    */
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}

Python

class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        """
        Called when a query is started.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This is called synchronously with
        meth:`pyspark.sql.streaming.DataStreamWriter.start`,
        that is, ``onQueryStart`` will be called on all listeners before
        ``DataStreamWriter.start()`` returns the corresponding
        :class:`pyspark.sql.streaming.StreamingQuery`.
        Do not block in this method as it will block your query.
        """
        pass

    def onQueryProgress(self, event):
        """
        Called when there is some status update (ingestion rate updated, etc.)

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This method is asynchronous. The status in
        :class:`pyspark.sql.streaming.StreamingQuery` returns the
        most recent status, regardless of when this method is called. The status
        of :class:`pyspark.sql.streaming.StreamingQuery`.
        may change before or when you process the event.
        For example, you may find :class:`StreamingQuery`
        terminates when processing `QueryProgressEvent`.
        """
        pass

    def onQueryIdle(self, event):
        """
        Called when the query is idle and waiting for new data to process.
        """
        pass

    def onQueryTerminated(self, event):
        """
        Called when a query is stopped, with or without error.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
            The properties are available as the same as Scala API.
        """
        pass

my_listener = MyListener()

Waarneembare metrische gegevens definiëren in Structured Streaming

Waarneembare metrische gegevens worden willekeurige statistische functies genoemd die kunnen worden gedefinieerd voor een query (DataFrame). Zodra de uitvoering van een DataFrame een voltooiingspunt bereikt (dat wil gezegd, voltooit u een batchquery of bereikt u een streaming-epoch), wordt een benoemde gebeurtenis verzonden die de metrische gegevens bevat voor de gegevens die zijn verwerkt sinds het laatste voltooiingspunt.

U kunt deze metrische gegevens bekijken door een listener toe te voegen aan de Spark-sessie. De listener is afhankelijk van de uitvoeringsmodus:

  • Batchmodus: Gebruiken QueryExecutionListener.

    QueryExecutionListener wordt aangeroepen wanneer de query is voltooid. Toegang tot de metrische gegevens met behulp van de QueryExecution.observedMetrics kaart.

  • Streaming of microbatch: Gebruik StreamingQueryListener.

    StreamingQueryListener wordt aangeroepen wanneer de streamingquery een periode voltooit. Toegang tot de metrische gegevens met behulp van de StreamingQueryProgress.observedMetrics kaart. Azure Databricks biedt geen ondersteuning voor continue uitvoering van streaming.

Voorbeeld:

Scala

// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.observedMetrics.get("my_event").foreach { row =>
      // Trigger if the number of errors exceeds 5 percent
      val num_rows = row.getAs[Long]("rc")
      val num_error_rows = row.getAs[Long]("erc")
      val ratio = num_error_rows.toDouble / num_rows
      if (ratio > 0.05) {
        // Trigger alert
      }
    }
  }
})

Python

# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()

# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"'{event.name}' [{event.id}] got started!")
    def onQueryProgress(self, event):
        row = event.progress.observedMetrics.get("metric")
        if row is not None:
            if row.malformed / row.cnt > 0.5:
                print("ALERT! Ouch! there are too many malformed "
                      f"records {row.malformed} out of {row.cnt}!")
            else:
                print(f"{row.cnt} rows processed!")
    def onQueryTerminated(self, event):
        print(f"{event.id} got terminated!")

# Add my listener.
spark.streams.addListener(MyListener())

Metrische gegevens van streamingQueryListener-objecten

Metrisch Beschrijving
id Een unieke query-id die blijft bestaan tijdens het opnieuw opstarten.
runId Een query-id die uniek is voor elke start/herstart. Zie StreamingQuery.runId().
name De door de gebruiker opgegeven naam van de query. De naam is null als er geen naam is opgegeven.
timestamp De tijdstempel voor de uitvoering van de microbatch.
batchId Een unieke id voor de huidige batch met gegevens die worden verwerkt. In het geval van nieuwe pogingen na een fout, kan een bepaalde batch-id meerdere keren worden uitgevoerd. Op dezelfde manier wordt de batch-id niet verhoogd wanneer er geen gegevens moeten worden verwerkt.
numInputRows Het geaggregeerde aantal records (in alle bronnen) dat in een trigger is verwerkt.
inputRowsPerSecond De cumulatieve snelheid (voor alle bronnen) van binnenkomende gegevens.
processedRowsPerSecond De cumulatieve snelheid (voor alle bronnen) waarmee Spark gegevens verwerkt.

durationMs-object

Informatie over de tijd die nodig is om verschillende fasen van het microbatch-uitvoeringsproces te voltooien.

Metrisch Beschrijving
durationMs.addBatch De tijd die nodig was om de microbatch uit te voeren. Dit sluit de tijd die Spark nodig heeft om de microbatch te plannen.
durationMs.getBatch De tijd die nodig is om de metagegevens over de offsets van de bron op te halen.
durationMs.latestOffset De meest recente offset die wordt verbruikt voor de microbatch. Dit voortgangsobject verwijst naar de tijd die nodig is om de meest recente offset van bronnen op te halen.
durationMs.queryPlanning De tijd die nodig is om het uitvoeringsplan te genereren.
durationMs.triggerExecution De tijd die nodig is om de microbatch te plannen en uit te voeren.
durationMs.walCommit De tijd die nodig is om de nieuwe beschikbare offsets vast te leggen.

eventTime-object

Informatie over de waarde van de gebeurtenistijd die wordt gezien in de gegevens die in de microbatch worden verwerkt. Deze gegevens worden door het watermerk gebruikt om erachter te komen hoe u de status kunt bijsnijden voor het verwerken van stateful aggregaties die zijn gedefinieerd in de structured streaming-taak.

Metrisch Beschrijving
eventTime.avg De gemiddelde gebeurtenistijd die in die trigger wordt gezien.
eventTime.max De maximale gebeurtenistijd die in die trigger wordt weergegeven.
eventTime.min De minimale gebeurtenistijd die in die trigger wordt weergegeven.
eventTime.watermark De waarde van het watermerk dat in die trigger wordt gebruikt.

stateOperators-object

Informatie over de stateful bewerkingen die zijn gedefinieerd in de Structured Streaming-taak en de aggregaties die daaruit worden geproduceerd.

Metrisch Beschrijving
stateOperators.operatorName De naam van de stateful operator waarop de metrische gegevens betrekking hebben, zoals symmetricHashJoin, dedupestateStoreSave.
stateOperators.numRowsTotal Het totale aantal rijen met status als gevolg van een stateful operator of aggregatie.
stateOperators.numRowsUpdated Het totale aantal rijen dat in de status is bijgewerkt als gevolg van een stateful operator of aggregatie.
stateOperators.allUpdatesTimeMs Deze metrische waarde is momenteel niet meetbaar door Spark en is gepland om te worden verwijderd in toekomstige updates.
stateOperators.numRowsRemoved Het totale aantal rijen dat uit de status is verwijderd als gevolg van een stateful operator of aggregatie.
stateOperators.allRemovalsTimeMs Deze metrische waarde is momenteel niet meetbaar door Spark en is gepland om te worden verwijderd in toekomstige updates.
stateOperators.commitTimeMs De tijd die nodig is om alle updates door te voeren (plaatst en verwijdert) en een nieuwe versie retourneert.
stateOperators.memoryUsedBytes Geheugen dat wordt gebruikt door het statusarchief.
stateOperators.numRowsDroppedByWatermark Het aantal rijen dat te laat wordt beschouwd om te worden opgenomen in een stateful aggregatie. Alleen streamingaggregaties: het aantal rijen dat na aggregatie is verwijderd (geen onbewerkte invoerrijen). Dit getal is niet nauwkeurig, maar geeft een indicatie dat er late gegevens worden verwijderd.
stateOperators.numShufflePartitions Het aantal willekeurige partities voor deze stateful operator.
stateOperators.numStateStoreInstances Het werkelijke exemplaar van het statusarchief dat de operator heeft geïnitialiseerd en onderhouden. Voor veel stateful operators is dit hetzelfde als het aantal partities. Stream-stream-joins initialiseren echter vier exemplaren van het statusarchief per partitie.

stateOperators.customMetrics-object

Gegevens die worden verzameld uit RocksDB die metrische gegevens vastleggen over de prestaties en bewerkingen met betrekking tot de stateful waarden die worden onderhouden voor de structured streaming-taak. Zie RocksDB-statusopslag configureren in Azure Databricks voor meer informatie.

Metrisch Beschrijving
customMetrics.rocksdbBytesCopied Het aantal bytes dat is gekopieerd als bijgehouden door RocksDB File Manager.
customMetrics.rocksdbCommitCheckpointLatency De tijd in milliseconden die een momentopname maken van systeemeigen RocksDB en deze naar een lokale map schrijven.
customMetrics.rocksdbCompactLatency De tijd in milliseconden comprimeren (optioneel) tijdens het doorvoeren van het controlepunt.
customMetrics.rocksdbCommitFileSyncLatencyMs De tijd in milliseconden die de systeemeigen RocksDB-momentopname synchroniseert met externe opslag (de locatie van het controlepunt).
customMetrics.rocksdbCommitFlushLatency De tijd in milliseconden die de RocksDB-in-memory wijzigingen in het geheugen leegmaken aan de lokale schijf.
customMetrics.rocksdbCommitPauseLatency De tijd in milliseconden stopt de achtergrondwerkrolthreads als onderdeel van de controlepuntdoorvoering, zoals voor compressie.
customMetrics.rocksdbCommitWriteBatchLatency De tijd in milliseconden die de gefaseerde schrijfbewerkingen in de geheugenstructuur (WriteBatch) toepassen op systeemeigen RocksDB.
customMetrics.rocksdbFilesCopied Het aantal bestanden dat is gekopieerd als bijgehouden door RocksDB File Manager.
customMetrics.rocksdbFilesReused Het aantal bestanden dat opnieuw wordt gebruikt als bijgehouden door RocksDB File Manager.
customMetrics.rocksdbGetCount Het aantal get aanroepen naar de database (bevat gets WriteBatch geen batch in het geheugen die wordt gebruikt voor faseringsschrijfbewerkingen).
customMetrics.rocksdbGetLatency De gemiddelde tijd in nanoseconden voor de onderliggende systeemeigen RocksDB::Get aanroep.
customMetrics.rocksdbReadBlockCacheHitCount Het aantal cachetreffers uit de blokcache in RocksDB die handig zijn bij het voorkomen van leesbewerkingen van lokale schijven.
customMetrics.rocksdbReadBlockCacheMissCount Het aantal blokcaches in RocksDB is niet handig bij het voorkomen van leesbewerkingen van lokale schijven.
customMetrics.rocksdbSstFileSize De grootte van alle SST-bestanden (Static Sorted Table) - de tabellaire structuur RocksDB gebruikt om gegevens op te slaan.
customMetrics.rocksdbTotalBytesRead Het aantal niet-gecomprimeerde bytes dat door get bewerkingen wordt gelezen.
customMetrics.rocksdbTotalBytesReadByCompaction Het aantal bytes dat door het compressieproces van de schijf wordt gelezen.
customMetrics.rocksdbTotalBytesReadThroughIterator Het totale aantal bytes aan niet-gecomprimeerde gegevens dat wordt gelezen met behulp van een iterator. Voor sommige stateful bewerkingen (bijvoorbeeld time-outverwerking in FlatMapGroupsWithState en watermerking) moeten gegevens in DB worden gelezen via een iterator.
customMetrics.rocksdbTotalBytesWritten Het totale aantal niet-gecomprimeerde bytes dat is geschreven door put bewerkingen.
customMetrics.rocksdbTotalBytesWrittenByCompaction Het totale aantal bytes dat door het compressieproces naar de schijf wordt geschreven.
customMetrics.rocksdbTotalCompactionLatencyMs De tijd in milliseconden voor RocksDB-compressies, inclusief achtergrondcompressies en de optionele compressie die tijdens het doorvoeren is geïnitieerd.
customMetrics.rocksdbTotalFlushLatencyMs De totale spoeltijd, inclusief het leegmaken van de achtergrond. Flush-bewerkingen zijn processen waarmee de MemTable leeggemaakte bewerkingen naar de opslag worden gespoeld zodra deze vol is. MemTables zijn het eerste niveau waar gegevens worden opgeslagen in RocksDB.
customMetrics.rocksdbZipFileBytesUncompressed De grootte in bytes van de niet-gecomprimeerde zip-bestanden, zoals gerapporteerd door Bestandsbeheer. Bestandsbeheer beheert het gebruik en de verwijdering van de fysieke SST-bestandsschijfruimte.

bronobject (Kafka)

Metrisch Beschrijving
sources.description Een gedetailleerde beschrijving van de Kafka-bron, waarin het exacte Kafka-onderwerp wordt opgegeven waaruit wordt gelezen. Voorbeeld: “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]”.
sources.startOffset object Het beginverschilnummer in het Kafka-onderwerp waarop de streamingtaak is gestart.
sources.endOffset object De laatste offset verwerkt door de microbatch. Dit kan gelijk zijn aan latestOffset een doorlopende uitvoering van microbatch.
sources.latestOffset object De meest recente offset van de microbatch. Het microbatchingproces verwerkt mogelijk niet alle offsets wanneer er beperking is, wat resulteert in endOffset en latestOffset differentiëren.
sources.numInputRows Het aantal invoerrijen dat uit deze bron is verwerkt.
sources.inputRowsPerSecond De snelheid waarmee gegevens binnenkomen voor verwerking vanuit deze bron.
sources.processedRowsPerSecond De snelheid waarmee Spark gegevens van deze bron verwerkt.

sources.metrics-object (Kafka)

Metrisch Beschrijving
sources.metrics.avgOffsetsBehindLatest Het gemiddelde aantal offsets dat de streamingquery achter de meest recente beschikbare offset ligt voor alle geabonneerde onderwerpen.
sources.metrics.estimatedTotalBytesBehindLatest Het geschatte aantal bytes dat het queryproces niet heeft verbruikt uit de geabonneerde onderwerpen.
sources.metrics.maxOffsetsBehindLatest Het maximum aantal offsets dat de streamingquery zich achter de meest recente beschikbare offset bevindt voor alle geabonneerde onderwerpen.
sources.metrics.minOffsetsBehindLatest Het minimale aantal offsets dat de streamingquery zich achter de meest recente beschikbare offset bevindt voor alle geabonneerde onderwerpen.

sink-object (Kafka)

Metrisch Beschrijving
sink.description De beschrijving van de Kafka-sink waarnaar de streamingquery schrijft, waarin de specifieke Kafka-sink-implementatie wordt beschreven die wordt gebruikt. Voorbeeld: “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100”.
sink.numOutputRows Het aantal rijen dat als onderdeel van de microbatch naar de uitvoertabel of sink is geschreven. In sommige situaties kan deze waarde '-1' zijn en over het algemeen worden geïnterpreteerd als 'onbekend'.

bronobject (Delta Lake)

Metrisch Beschrijving
sources.description De beschrijving van de bron waaruit de streamingquery leest. Voorbeeld: “DeltaSource[table]”.
sources.[startOffset/endOffset].sourceVersion De versie van serialisatie waarmee deze offset wordt gecodeerd.
sources.[startOffset/endOffset].reservoirId De id van de tabel die wordt gelezen. Dit wordt gebruikt om onjuiste configuratie te detecteren bij het opnieuw opstarten van een query.
sources.[startOffset/endOffset].reservoirVersion De versie van de tabel die momenteel wordt verwerkt.
sources.[startOffset/endOffset].index De index in de volgorde van AddFiles in deze versie. Dit wordt gebruikt om grote doorvoeringen in meerdere batches op te splitsen. Deze index wordt gemaakt door te sorteren op modificationTimestamp en path.
sources.[startOffset/endOffset].isStartingVersion Hiermee wordt aangegeven of de huidige offset het begin van een nieuwe streamingquery markeert in plaats van de verwerking van wijzigingen die zijn opgetreden nadat de initiële gegevens zijn verwerkt. Wanneer u een nieuwe query start, worden alle gegevens die aanwezig zijn in de tabel aan het begin eerst verwerkt en vervolgens alle nieuwe gegevens die binnenkomen.
sources.latestOffset De meest recente offset die wordt verwerkt door de microbatch-query.
sources.numInputRows Het aantal invoerrijen dat uit deze bron is verwerkt.
sources.inputRowsPerSecond De snelheid waarmee gegevens binnenkomen voor verwerking vanuit deze bron.
sources.processedRowsPerSecond De snelheid waarmee Spark gegevens van deze bron verwerkt.
sources.metrics.numBytesOutstanding De gecombineerde grootte van de openstaande bestanden (bestanden bijgehouden door RocksDB). Dit is de metrische achterstand voor Delta en Auto Loader als streamingbron.
sources.metrics.numFilesOutstanding Het aantal openstaande bestanden dat moet worden verwerkt. Dit is de metrische achterstand voor Delta en Auto Loader als streamingbron.

sink-object (Delta Lake)

Metrisch Beschrijving
sink.description De beschrijving van de Delta-sink, met informatie over de specifieke Implementatie van de Delta-sink die wordt gebruikt. Voorbeeld: “DeltaSink[table]”.
sink.numOutputRows Het aantal rijen is altijd -1, omdat Spark geen uitvoerrijen voor DSv1-sinks kan afleiden. Dit is de classificatie voor de Delta Lake-sink.

Voorbeelden

Voorbeeld van kafka-naar-Kafka StreamingQueryListener-gebeurtenis

{
  "id" : "3574feba-646d-4735-83c4-66f657e52517",
  "runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
  "name" : "STREAMING_QUERY_NAME_UNIQUE",
  "timestamp" : "2022-10-31T20:09:30.455Z",
  "batchId" : 1377,
  "numInputRows" : 687,
  "inputRowsPerSecond" : 32.13433743393049,
  "processedRowsPerSecond" : 34.067241892293964,
  "durationMs" : {
    "addBatch" : 18352,
    "getBatch" : 0,
    "latestOffset" : 31,
    "queryPlanning" : 977,
    "triggerExecution" : 20165,
    "walCommit" : 342
  },
  "eventTime" : {
    "avg" : "2022-10-31T20:09:18.070Z",
    "max" : "2022-10-31T20:09:30.125Z",
    "min" : "2022-10-31T20:09:09.793Z",
    "watermark" : "2022-10-31T20:08:46.355Z"
  },
  "stateOperators" : [ {
    "operatorName" : "stateStoreSave",
    "numRowsTotal" : 208,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 434,
    "numRowsRemoved" : 76,
    "allRemovalsTimeMs" : 515,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 167069743,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 222,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 165,
      "rocksdbReadBlockCacheMissCount" : 41,
      "rocksdbSstFileSize" : 232729,
      "rocksdbTotalBytesRead" : 12844,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 161238,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "dedupe",
    "numRowsTotal" : 2454744,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 4155,
    "numRowsRemoved" : 0,
    "allRemovalsTimeMs" : 0,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 137765341,
    "numRowsDroppedByWatermark" : 34,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "numDroppedDuplicateRows" : 193,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 146,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3,
      "rocksdbReadBlockCacheMissCount" : 3,
      "rocksdbSstFileSize" : 78959140,
      "rocksdbTotalBytesRead" : 0,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "symmetricHashJoin",
    "numRowsTotal" : 2583,
    "numRowsUpdated" : 682,
    "allUpdatesTimeMs" : 9645,
    "numRowsRemoved" : 508,
    "allRemovalsTimeMs" : 46,
    "commitTimeMs" : 21,
    "memoryUsedBytes" : 668544484,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 80,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 4218,
      "rocksdbGetLatency" : 3,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3425,
      "rocksdbReadBlockCacheMissCount" : 149,
      "rocksdbSstFileSize" : 742827,
      "rocksdbTotalBytesRead" : 866864,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  } ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706380
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "numInputRows" : 292,
    "inputRowsPerSecond" : 13.65826278123392,
    "processedRowsPerSecond" : 14.479817514628582,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "estimatedTotalBytesBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  }, {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
    "startOffset" : {
      KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
    "numOutputRows" : 76
  }
}

Voorbeeld van een Delta Lake-naar-Delta Lake StreamingQueryListener-gebeurtenis

{
  "id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
  "runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
  "name" : "silverTransformFromBronze",
  "timestamp" : "2022-11-01T18:21:29.500Z",
  "batchId" : 4,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 62,
    "triggerExecution" : 62
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
    "numOutputRows" : -1
  }
}

Voorbeeld van de gebeurtenis Kinesis-to-Delta Lake StreamingQueryListener

{
  "id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
  "runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
  "name" : null,
  "timestamp" : "2024-05-14T02:09:20.846Z",
  "batchId" : 0,
  "batchDuration" : 59322,
  "numInputRows" : 20,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.33714304979602844,
  "durationMs" : {
    "addBatch" : 5397,
    "commitBatch" : 4429,
    "commitOffsets" : 211,
    "getBatch" : 5,
    "latestOffset" : 21998,
    "queryPlanning" : 12128,
    "triggerExecution" : 59313,
    "walCommit" : 220
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
    "startOffset" : null,
    "endOffset" : [ {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000000"
      },
      "firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
      "lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
    }, {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000001"
      },
      "firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
      "lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
    } ],
    "latestOffset" : null,
    "numInputRows" : 20,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.33714304979602844,
    "metrics" : {
      "avgMsBehindLatest" : "0.0",
      "maxMsBehindLatest" : "0",
      "minMsBehindLatest" : "0",
      "mode" : "efo",
      "numClosedShards" : "0",
      "numProcessedBytes" : "30",
      "numProcessedRecords" : "18",
      "numRegisteredConsumers" : "1",
      "numStreams" : "1",
      "numTotalShards" : "2",
      "totalPrefetchedBytes" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/2024-05-14-01-58-14-76eb7e51/56b9426c-3492-4ac5-8fe8-3d00efe20be5/deltaTable]",
    "numOutputRows" : -1
  }
}

Voorbeeld van kafka+Delta Lake-to-Delta Lake StreamingQueryListener-gebeurtenis

{
 "id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
 "runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
 "name" : null,
 "timestamp" : "2024-05-15T21:57:50.782Z",
 "batchId" : 0,
 "batchDuration" : 3601,
 "numInputRows" : 20,
 "inputRowsPerSecond" : 0.0,
 "processedRowsPerSecond" : 5.55401277422938,
 "durationMs" : {
  "addBatch" : 1544,
  "commitBatch" : 686,
  "commitOffsets" : 27,
  "getBatch" : 12,
  "latestOffset" : 577,
  "queryPlanning" : 105,
  "triggerExecution" : 3600,
  "walCommit" : 34
 },
 "stateOperators" : [ {
  "operatorName" : "symmetricHashJoin",
  "numRowsTotal" : 20,
  "numRowsUpdated" : 20,
  "allUpdatesTimeMs" : 473,
  "numRowsRemoved" : 0,
  "allRemovalsTimeMs" : 0,
  "commitTimeMs" : 277,
  "memoryUsedBytes" : 13120,
  "numRowsDroppedByWatermark" : 0,
  "numShufflePartitions" : 5,
  "numStateStoreInstances" : 20,
  "customMetrics" : {
   "loadedMapCacheHitCount" : 0,
   "loadedMapCacheMissCount" : 0,
   "stateOnCurrentVersionSizeBytes" : 5280
  }
 } ],
 "sources" : [ {
  "description" : "KafkaV2[Subscribe[topic-1]]",
  "startOffset" : null,
  "endOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "latestOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "avgOffsetsBehindLatest" : "0.0",
   "estimatedTotalBytesBehindLatest" : "0.0",
   "maxOffsetsBehindLatest" : "0",
   "minOffsetsBehindLatest" : "0"
  }
 }, {
  "description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
  "startOffset" : null,
  "endOffset" : {
   "sourceVersion" : 1,
   "reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
   "reservoirVersion" : 1,
   "index" : -1,
   "isStartingVersion" : false
  },
  "latestOffset" : null,
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "numBytesOutstanding" : "0",
   "numFilesOutstanding" : "0"
  }
 } ],
 "sink" : {
  "description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
  "numOutputRows" : -1
 }
}

Voorbeeld van frequentiebron naar Delta Lake StreamingQueryListener-gebeurtenis

{
  "id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
  "runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
  "name" : "dataGen",
  "timestamp" : "2022-11-01T18:28:20.332Z",
  "batchId" : 279,
  "numInputRows" : 300,
  "inputRowsPerSecond" : 114.15525114155251,
  "processedRowsPerSecond" : 158.9825119236884,
  "durationMs" : {
    "addBatch" : 1771,
    "commitOffsets" : 54,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 4,
    "triggerExecution" : 1887,
    "walCommit" : 58
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
    "startOffset" : 560,
    "endOffset" : 563,
    "latestOffset" : 563,
    "numInputRows" : 300,
    "inputRowsPerSecond" : 114.15525114155251,
    "processedRowsPerSecond" : 158.9825119236884
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
    "numOutputRows" : -1
  }
}