Monitoring Structured Streaming queries on Azure Databricks

Azure Databricks provides built-in monitoring for Structured Streaming applications through the Spark UI under the Streaming tab.

Distinguish Structured Streaming queries in the Spark UI

Give each stream a unique query name by adding .queryName(<query-name>) to your writeStream code. The name makes it easy to tell which metrics belong to which stream in the Spark UI.
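As a minimal sketch of the naming step, the helper below sets a query name before starting a stream. The function name `start_named_stream`, its parameters, and the choice of a table sink with a checkpoint location are illustrative assumptions; only `queryName` comes from the guidance above.

```python
def start_named_stream(df, query_name, checkpoint_path, table_name):
    """Start a streaming write whose metrics are labeled with query_name.

    df is assumed to be a streaming DataFrame. The checkpoint path and
    target table are placeholders supplied by the caller.
    """
    return (
        df.writeStream
        .queryName(query_name)  # label shown in the Spark UI Streaming tab
        .option("checkpointLocation", checkpoint_path)
        .toTable(table_name)
    )
```

The same name is reported in the `name` field of each progress event, so it can also be used to route metrics in a listener.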

Push Structured Streaming metrics to external services

Streaming metrics can be pushed to external services for alerting or dashboarding use cases by using Apache Spark’s Streaming Query Listener interface. In Databricks Runtime 11.3 LTS and above, the Streaming Query Listener is available in Python and Scala.

Important

Credentials and objects managed by Unity Catalog cannot be used in StreamingQueryListener logic.

Note

Processing latency associated with listeners can adversely impact query processing. Databricks recommends minimizing processing logic in these listeners and writing to low-latency sinks such as Kafka.

The following code provides basic examples of the syntax for implementing a listener:

Scala

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val myListener = new StreamingQueryListener {

  /**
    * Called when a query is started.
    * @note This is called synchronously with
    *       [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
    *       `onQueryStarted` is called on all listeners before
    *       `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
    *       Do not block this method, as it blocks your query.
    */
  def onQueryStarted(event: QueryStartedEvent): Unit = {}

  /**
    * Called when there is some status update (ingestion rate updated, etc.)
    *
    * @note This method is asynchronous. The status in [[StreamingQuery]] returns the
    *       latest status, regardless of when this method is called. The status of [[StreamingQuery]]
    *       may change before or when you process the event. For example, you may find [[StreamingQuery]]
    *       terminates when processing `QueryProgressEvent`.
    */
  def onQueryProgress(event: QueryProgressEvent): Unit = {}

  /**
    * Called when the query is idle and waiting for new data to process.
    */
  override def onQueryIdle(event: QueryIdleEvent): Unit = {}

  /**
    * Called when a query is stopped, with or without error.
    */
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}

Python

from pyspark.sql.streaming import StreamingQueryListener

class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        """
        Called when a query is started.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
            The properties are the same as in the Scala API.

        Notes
        -----
        This is called synchronously with
        :meth:`pyspark.sql.streaming.DataStreamWriter.start`,
        that is, ``onQueryStarted`` will be called on all listeners before
        ``DataStreamWriter.start()`` returns the corresponding
        :class:`pyspark.sql.streaming.StreamingQuery`.
        Do not block in this method as it will block your query.
        """
        pass

    def onQueryProgress(self, event):
        """
        Called when there is some status update (ingestion rate updated, etc.)

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
            The properties are the same as in the Scala API.

        Notes
        -----
        This method is asynchronous. The status in
        :class:`pyspark.sql.streaming.StreamingQuery` returns the
        most recent status, regardless of when this method is called. The status
        of :class:`pyspark.sql.streaming.StreamingQuery`
        may change before or when you process the event.
        For example, you may find :class:`StreamingQuery`
        terminates when processing `QueryProgressEvent`.
        """
        pass

    def onQueryIdle(self, event):
        """
        Called when the query is idle and waiting for new data to process.
        """
        pass

    def onQueryTerminated(self, event):
        """
        Called when a query is stopped, with or without error.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
            The properties are the same as in the Scala API.
        """
        pass

my_listener = MyListener()
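To keep listener callbacks short, one option is to hand each event to a background thread and return immediately. The sketch below is a hypothetical helper, not part of the Spark or Databricks API: `ProgressForwarder`, its `send` callable (for example, a thin wrapper around a Kafka producer), and the `max_pending` limit are all assumptions made for illustration.

```python
import queue
import threading


class ProgressForwarder:
    """Buffers progress JSON so that listener callbacks return quickly."""

    def __init__(self, send, max_pending=1000):
        self._send = send  # hypothetical callable that ships one JSON string
        self._pending = queue.Queue(maxsize=max_pending)
        self.dropped = 0   # events discarded because the buffer was full
        self.failed = 0    # events the send callable raised on
        self._worker = threading.Thread(target=self._drain, daemon=True)
        self._worker.start()

    def submit(self, progress_json):
        """Queue one event without blocking the caller."""
        try:
            self._pending.put_nowait(progress_json)
        except queue.Full:
            self.dropped += 1

    def _drain(self):
        while True:
            item = self._pending.get()
            if item is None:  # sentinel pushed by close()
                break
            try:
                self._send(item)
            except Exception:
                self.failed += 1  # a failing sink must not stop the worker

    def close(self):
        """Send everything still queued, then stop the worker."""
        self._pending.put(None)
        self._worker.join()
```

With a helper like this, `onQueryProgress` could be reduced to `forwarder.submit(event.progress.json)`. Dropping events when the buffer is full is a deliberate trade-off here: it favors query latency over complete metric delivery.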

Defining observable metrics in Structured Streaming

Observable metrics are named arbitrary aggregate functions that can be defined on a query (DataFrame). As soon as the execution of a DataFrame reaches a completion point (that is, finishes a batch query or reaches a streaming epoch), a named event is emitted that contains the metrics for the data processed since the last completion point.

You can observe these metrics by attaching a listener to the Spark session. The listener depends on the execution mode:

  • Batch mode: Use QueryExecutionListener.

    QueryExecutionListener is called when the query completes. Access the metrics using the QueryExecution.observedMetrics map.

  • Streaming, or micro-batch: Use StreamingQueryListener.

    StreamingQueryListener is called when the streaming query completes an epoch. Access the metrics using the StreamingQueryProgress.observedMetrics map. Azure Databricks does not support continuous execution streaming.

For example:

Scala

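import org.apache.spark.sql.functions.{count, lit}
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
  // These two methods are abstract, so they need (empty) implementations
  override def onQueryStarted(event: QueryStartedEvent): Unit = {}
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // observedMetrics is a java.util.Map, so wrap the lookup in Option
    Option(event.progress.observedMetrics.get("my_event")).foreach { row =>
      // Trigger if the number of errors exceeds 5 percent
      val num_rows = row.getAs[Long]("rc")
      val num_error_rows = row.getAs[Long]("erc")
      val ratio = num_error_rows.toDouble / num_rows
      if (ratio > 0.05) {
        // Trigger alert
      }
    }
  }
})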

Python

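from pyspark.sql.functions import col, count, lit
from pyspark.sql.streaming import StreamingQueryListener

# Observe metric
observed_df = df.observe("metric", count(lit(1)).alias("cnt"), count(col("error")).alias("malformed"))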
observed_df.writeStream.format("...").start()

# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"'{event.name}' [{event.id}] got started!")
    def onQueryProgress(self, event):
        row = event.progress.observedMetrics.get("metric")
        if row is not None:
            if row.cnt > 0 and row.malformed / row.cnt > 0.5:
                print("ALERT! Ouch! there are too many malformed "
                      f"records {row.malformed} out of {row.cnt}!")
            else:
                print(f"{row.cnt} rows processed!")
    def onQueryTerminated(self, event):
        print(f"{event.id} got terminated!")

# Add my listener.
spark.streams.addListener(MyListener())

StreamingQueryListener object metrics

Metric Description
id Unique query ID that persists across restarts. See StreamingQuery.id().
runId Unique query ID for every start or restart. See StreamingQuery.runId().
name User-specified name of the query. Null if not specified.
timestamp Timestamp for the execution of the micro-batch.
batchId Unique ID for the current batch of data being processed. Note that in the case of retries after a failure, a given batch ID can be executed more than once. Similarly, when there is no data to be processed, the batch ID is not incremented.
numInputRows Aggregate (across all sources) number of records processed in a trigger.
inputRowsPerSecond Aggregate (across all sources) rate of arriving data.
processedRowsPerSecond Aggregate (across all sources) rate at which Spark is processing data.
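The two rate metrics can be compared to get a rough signal of whether a query keeps up with its input. The following sketch works on a progress event parsed into a dict; the function name `is_falling_behind` and the `tolerance` multiplier are assumptions for illustration, and the sample values are taken from the Kafka-to-Kafka example event later in this article.

```python
import json


def is_falling_behind(progress, tolerance=1.0):
    """Return True when rows arrive faster than Spark processes them.

    progress is a dict parsed from the progress JSON. tolerance is a
    hypothetical multiplier applied to the processing rate.
    """
    arriving = progress.get("inputRowsPerSecond") or 0.0
    processed = progress.get("processedRowsPerSecond") or 0.0
    return arriving > processed * tolerance


sample = json.loads(
    '{"batchId": 1377, "numInputRows": 687, '
    '"inputRowsPerSecond": 32.13433743393049, '
    '"processedRowsPerSecond": 34.067241892293964}'
)
print(is_falling_behind(sample))  # False
```

A single micro-batch can be noisy, so a check like this is probably more useful when evaluated over several consecutive events.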

durationMs object

Information about the time it takes to complete various stages of the micro-batch execution process.

Metric Description
durationMs.addBatch Time taken to execute the microbatch. This excludes the time Spark takes to plan the microbatch.
durationMs.getBatch Time it takes to retrieve the metadata about the offsets from the source.
durationMs.latestOffset Time taken to retrieve the latest offset from the sources.
durationMs.queryPlanning Time taken to generate the execution plan.
durationMs.triggerExecution Time taken to plan and execute the microbatch.
durationMs.walCommit Time taken to commit the new available offsets.
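To see which phase dominates a micro-batch, each duration can be expressed as a share of triggerExecution. This is a small sketch under the assumption that the durationMs object has been parsed into a dict; `duration_shares` is a hypothetical helper name, and the sample values come from the Kafka-to-Kafka example event later in this article.

```python
def duration_shares(duration_ms):
    """Return (phase, share of triggerExecution) pairs, largest first."""
    total = duration_ms.get("triggerExecution", 0)
    if total <= 0:
        return []
    shares = [
        (phase, ms / total)
        for phase, ms in duration_ms.items()
        if phase != "triggerExecution"
    ]
    return sorted(shares, key=lambda item: item[1], reverse=True)


sample = {
    "addBatch": 18352,
    "getBatch": 0,
    "latestOffset": 31,
    "queryPlanning": 977,
    "triggerExecution": 20165,
    "walCommit": 342,
}
for phase, share in duration_shares(sample):
    print(f"{phase}: {share:.1%}")
```

The shares do not necessarily add up to 100 percent, because triggerExecution can include time that no individual phase reports.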

eventTime object

Information about the event time value seen within the data being processed in the micro-batch. This data is used by the watermark to figure out how to trim the state for processing stateful aggregations defined in the Structured Streaming job.

Metric Description
eventTime.avg Average event time seen in the trigger.
eventTime.max Maximum event time seen in the trigger.
eventTime.min Minimum event time seen in the trigger.
eventTime.watermark Value of the watermark used in the trigger.
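As an illustration of how these values relate, the sketch below computes how far the watermark trails the newest event time in a trigger. The helper names `parse_ts` and `watermark_gap_seconds` are assumptions, and the sample values come from the Kafka-to-Kafka example event later in this article.

```python
from datetime import datetime


def parse_ts(value):
    """Parse a UTC timestamp such as 2022-10-31T20:09:30.125Z."""
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def watermark_gap_seconds(event_time):
    """Seconds between the maximum event time seen and the watermark."""
    newest = parse_ts(event_time["max"])
    watermark = parse_ts(event_time["watermark"])
    return (newest - watermark).total_seconds()


sample = {
    "avg": "2022-10-31T20:09:18.070Z",
    "max": "2022-10-31T20:09:30.125Z",
    "min": "2022-10-31T20:09:09.793Z",
    "watermark": "2022-10-31T20:08:46.355Z",
}
print(watermark_gap_seconds(sample))  # 43.77
```

Events without a watermark, such as the Delta Lake example later in this article, have no eventTime object, so callers should check for the key before using a helper like this.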

stateOperators object

Information about the stateful operations that are defined in the Structured Streaming job and the aggregations that are produced from them.

Metric Description
stateOperators.operatorName Name of the stateful operator that the metrics relate to. For example, symmetricHashJoin, dedupe, stateStoreSave.
stateOperators.numRowsTotal Number of rows in the state as a result of the stateful operator or aggregation.
stateOperators.numRowsUpdated Number of rows updated in the state as a result of the stateful operator or aggregation.
stateOperators.numRowsRemoved Number of rows removed from the state as a result of the stateful operator or aggregation.
stateOperators.commitTimeMs Time taken to commit all updates (puts and removes) and return a new version.
stateOperators.memoryUsedBytes Memory used by the state store.
stateOperators.numRowsDroppedByWatermark Number of rows that are considered too late to be included in the stateful aggregation. For streaming aggregations, this counts rows dropped post-aggregation, not raw input rows. The number is not precise, but it can indicate that late data is being dropped.
stateOperators.numShufflePartitions Number of shuffle partitions for this stateful operator.
stateOperators.numStateStoreInstances Actual number of state store instances that the operator has initialized and maintained. In many stateful operators, this is the same as the number of partitions, but a stream-stream join initializes four state store instances per partition.
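The stateOperators array has one entry per stateful operator, so a compact per-operator summary can make state growth easier to track. The sketch below assumes the array has been parsed into a list of dicts; `summarize_state` and its output keys are hypothetical names, and the sample is trimmed from the Kafka-to-Kafka example event later in this article.

```python
def summarize_state(state_operators):
    """Return one summary dict per stateful operator."""
    summary = []
    for op in state_operators:
        partitions = op.get("numShufflePartitions") or 0
        instances = op.get("numStateStoreInstances") or 0
        summary.append({
            "operator": op.get("operatorName"),
            "rows": op.get("numRowsTotal", 0),
            "memory_mib": op.get("memoryUsedBytes", 0) / (1024 * 1024),
            # 4.0 is expected for a stream-stream join, 1.0 for most others
            "stores_per_partition": instances / partitions if partitions else None,
            "late_rows_dropped": op.get("numRowsDroppedByWatermark", 0),
        })
    return summary


sample = [
    {"operatorName": "stateStoreSave", "numRowsTotal": 208,
     "memoryUsedBytes": 167069743, "numRowsDroppedByWatermark": 0,
     "numShufflePartitions": 20, "numStateStoreInstances": 20},
    {"operatorName": "symmetricHashJoin", "numRowsTotal": 2583,
     "memoryUsedBytes": 668544484, "numRowsDroppedByWatermark": 0,
     "numShufflePartitions": 20, "numStateStoreInstances": 80},
]
for entry in summarize_state(sample):
    print(entry)
```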

stateOperators.customMetrics object

Information collected from RocksDB that captures metrics about its performance and operations with respect to the stateful values it maintains for the Structured Streaming job. For more information, see Configure RocksDB state store on Azure Databricks.

Metric Description
customMetrics.rocksdbBytesCopied Number of bytes copied as tracked by the RocksDB File Manager.
customMetrics.rocksdbCommitCheckpointLatency Time in milliseconds to take a snapshot of native RocksDB and write it to a local directory.
customMetrics.rocksdbCommitCompactLatency Time in milliseconds for compaction (optional) during the checkpoint commit.
customMetrics.rocksdbCommitFileSyncLatencyMs Time in milliseconds to sync the native RocksDB snapshot related files to an external storage (checkpoint location).
customMetrics.rocksdbCommitFlushLatency Time in milliseconds to flush the RocksDB in-memory changes to your local disk.
customMetrics.rocksdbCommitPauseLatency Time in milliseconds to stop the background worker threads (for example, for compaction) as part of the checkpoint commit.
customMetrics.rocksdbCommitWriteBatchLatency Time in milliseconds to apply the staged writes in in-memory structure (WriteBatch) to native RocksDB.
customMetrics.rocksdbFilesCopied Number of files copied as tracked by the RocksDB File Manager.
customMetrics.rocksdbFilesReused Number of files reused as tracked by the RocksDB File Manager.
customMetrics.rocksdbGetCount Number of get calls to the DB. This doesn’t include gets from WriteBatch, the in-memory batch used for staging writes.
customMetrics.rocksdbGetLatency Average time in nanoseconds for the underlying native RocksDB::Get call.
customMetrics.rocksdbReadBlockCacheHitCount Number of reads served from the RocksDB block cache, avoiding local disk reads. Together with rocksdbReadBlockCacheMissCount, it indicates how useful the block cache is.
customMetrics.rocksdbReadBlockCacheMissCount Number of reads not found in the RocksDB block cache, which require local disk reads.
customMetrics.rocksdbSstFileSize Size of all SST files. SST stands for Static Sorted Table, which is the tabular structure RocksDB uses to store data.
customMetrics.rocksdbTotalBytesRead Number of uncompressed bytes read by get operations.
customMetrics.rocksdbTotalBytesReadByCompaction Number of bytes that the compaction process reads from the disk.
customMetrics.rocksdbTotalBytesReadThroughIterator Some of the stateful operations (for example, timeout processing in FlatMapGroupsWithState and watermarking) require reading data in DB through an iterator. This metric represents the size of uncompressed data read using the iterator.
customMetrics.rocksdbTotalBytesWritten Number of uncompressed bytes written by put operations.
customMetrics.rocksdbTotalBytesWrittenByCompaction Number of bytes the compaction process writes to the disk.
customMetrics.rocksdbTotalCompactionLatencyMs Time in milliseconds for RocksDB compactions, including background compactions and the optional compaction initiated during the commit.
customMetrics.rocksdbTotalFlushLatencyMs Flush time, including background flushing. Flush operations are processes by which the MemTable is flushed to storage once it’s full. MemTables are the first level where data is stored in RocksDB.
customMetrics.rocksdbZipFileBytesUncompressed RocksDB File Manager manages the physical SST file disk space utilization and deletion. This metric represents the uncompressed zip files in bytes as reported by the File Manager.
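The block cache hit and miss counts are easier to interpret as a ratio. The following is a minimal sketch, assuming the customMetrics object of one state operator has been parsed into a dict; `block_cache_hit_ratio` is a hypothetical helper name, and the sample counts come from the stateStoreSave operator in the Kafka-to-Kafka example event later in this article.

```python
def block_cache_hit_ratio(custom_metrics):
    """Fraction of block cache lookups that were hits, or None if there were none."""
    hits = custom_metrics.get("rocksdbReadBlockCacheHitCount", 0)
    misses = custom_metrics.get("rocksdbReadBlockCacheMissCount", 0)
    lookups = hits + misses
    return hits / lookups if lookups else None


sample = {
    "rocksdbReadBlockCacheHitCount": 165,
    "rocksdbReadBlockCacheMissCount": 41,
}
print(f"{block_cache_hit_ratio(sample):.1%}")  # 80.1%
```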

sources object (Kafka)

Metric Description
sources.description Name of the source the streaming query is reading from. For example, “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]”.
sources.startOffset object Starting offset number within the Kafka topic that the streaming job started at.
sources.endOffset object Latest offset processed by the microbatch. This could be equal to latestOffset for an ongoing microbatch execution.
sources.latestOffset object Latest offset available in the source, as determined by the microbatch. When there is throttling, the microbatch might not process all offsets, causing endOffset and latestOffset to differ.
sources.numInputRows Number of input rows processed from this source.
sources.inputRowsPerSecond Rate at which data is arriving for processing for this source.
sources.processedRowsPerSecond Rate at which Spark is processing data for this source.

sources.metrics object (Kafka)

Metric Description
sources.metrics.avgOffsetsBehindLatest Average number of offsets that the streaming query is behind the latest available offset among all the subscribed topics.
sources.metrics.estimatedTotalBytesBehindLatest Estimated number of bytes that the query process has not consumed from the subscribed topics.
sources.metrics.maxOffsetsBehindLatest Maximum number of offsets that the streaming query is behind the latest available offset among all the subscribed topics.
sources.metrics.minOffsetsBehindLatest Minimum number of offsets that the streaming query is behind the latest available offset among all the subscribed topics.
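The offset objects and the lag metrics can be combined to see how far behind each partition is. The sketch below assumes one entry of the sources array has been parsed into a dict; the helper names `kafka_partition_lag` and `max_offsets_behind` are assumptions. Note that the values in the metrics object are encoded as strings in the JSON, so they need converting before comparison. The sample is taken from the Kafka-to-Kafka example event later in this article.

```python
def kafka_partition_lag(source):
    """Offsets not yet processed, keyed by (topic, partition)."""
    lag = {}
    latest = source.get("latestOffset") or {}
    end = source.get("endOffset") or {}
    for topic, partitions in latest.items():
        for partition, latest_offset in partitions.items():
            # A partition missing from endOffset is treated as fully caught up
            processed = end.get(topic, {}).get(partition, latest_offset)
            lag[(topic, partition)] = latest_offset - processed
    return lag


def max_offsets_behind(source):
    """maxOffsetsBehindLatest as an int, or None when the metric is absent."""
    value = (source.get("metrics") or {}).get("maxOffsetsBehindLatest")
    return int(value) if value is not None else None


sample = {
    "description": "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
    "endOffset": {"KAFKA_TOPIC_NAME_INPUT_A": {"0": 349706672}},
    "latestOffset": {"KAFKA_TOPIC_NAME_INPUT_A": {"0": 349706672}},
    "metrics": {"avgOffsetsBehindLatest": "0.0", "maxOffsetsBehindLatest": "0"},
}
print(kafka_partition_lag(sample))
print(max_offsets_behind(sample))
```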

sink object (Kafka)

Metric Description
sink.description Name of the sink the streaming query is writing to. For example, “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100”.
sink.numOutputRows Number of rows that were written to the output table or sink as part of the microbatch. For some situations, this value can be “-1” and generally can be interpreted as “unknown”.

sources object (Delta Lake)

Metric Description
sources.description Name of the source the streaming query is reading from. For example, “DeltaSource[table]”.
sources.[startOffset/endOffset].sourceVersion Version of serialization that this offset is encoded with.
sources.[startOffset/endOffset].reservoirId ID of the table you are reading from. This is used to detect misconfiguration when restarting a query.
sources.[startOffset/endOffset].reservoirVersion Version of the table that you are currently processing.
sources.[startOffset/endOffset].index Index in the sequence of AddFiles in this version. This is used to break large commits into multiple batches. This index is created by sorting on modificationTimestamp and path.
sources.[startOffset/endOffset].isStartingVersion Whether this offset denotes a query that is starting rather than processing changes. When starting a new query, all data present in the table at the start is processed, and then new data that has arrived.
sources.latestOffset Latest offset processed by the microbatch query.
sources.numInputRows Number of input rows processed from this source.
sources.inputRowsPerSecond Rate at which data is arriving for processing for this source.
sources.processedRowsPerSecond Rate at which Spark is processing data for this source.
sources.metrics.numBytesOutstanding Size of the outstanding files (files tracked by RocksDB) combined. This is the backlog metric for Delta and Auto Loader as the streaming source.
sources.metrics.numFilesOutstanding Number of outstanding files to be processed. This is the backlog metric for Delta and Auto Loader as the streaming source.

sink object (Delta Lake)

Metric Description
sink.description Name of the sink that the streaming query writes to. For example, “DeltaSink[table]”.
sink.numOutputRows Always “-1”, because Spark can’t infer output rows for DSv1 sinks, which is the classification for the Delta Lake sink.
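When consuming Delta Lake events programmatically, two details are worth handling explicitly: the backlog metrics arrive as strings, and a numOutputRows of -1 means the count is unknown rather than negative. The sketch below assumes the source and sink objects have been parsed into dicts; `delta_backlog` and `output_rows` are hypothetical helper names, and the samples come from the Delta Lake-to-Delta Lake example event later in this article.

```python
def delta_backlog(source):
    """Outstanding files and bytes for a Delta Lake (or Auto Loader) source."""
    metrics = source.get("metrics") or {}
    return {
        "files": int(metrics.get("numFilesOutstanding", 0)),
        "bytes": int(metrics.get("numBytesOutstanding", 0)),
    }


def output_rows(sink):
    """numOutputRows, or None when the sink reports -1 (unknown)."""
    rows = sink.get("numOutputRows", -1)
    return None if rows < 0 else rows


source = {"metrics": {"numBytesOutstanding": "0", "numFilesOutstanding": "0"}}
sink = {"numOutputRows": -1}
print(delta_backlog(source))  # {'files': 0, 'bytes': 0}
print(output_rows(sink))      # None
```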

Examples

Example Kafka-to-Kafka StreamingQueryListener event

{
  "id" : "3574feba-646d-4735-83c4-66f657e52517",
  "runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
  "name" : "STREAMING_QUERY_NAME_UNIQUE",
  "timestamp" : "2022-10-31T20:09:30.455Z",
  "batchId" : 1377,
  "numInputRows" : 687,
  "inputRowsPerSecond" : 32.13433743393049,
  "processedRowsPerSecond" : 34.067241892293964,
  "durationMs" : {
    "addBatch" : 18352,
    "getBatch" : 0,
    "latestOffset" : 31,
    "queryPlanning" : 977,
    "triggerExecution" : 20165,
    "walCommit" : 342
  },
  "eventTime" : {
    "avg" : "2022-10-31T20:09:18.070Z",
    "max" : "2022-10-31T20:09:30.125Z",
    "min" : "2022-10-31T20:09:09.793Z",
    "watermark" : "2022-10-31T20:08:46.355Z"
  },
  "stateOperators" : [ {
    "operatorName" : "stateStoreSave",
    "numRowsTotal" : 208,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 434,
    "numRowsRemoved" : 76,
    "allRemovalsTimeMs" : 515,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 167069743,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 222,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 165,
      "rocksdbReadBlockCacheMissCount" : 41,
      "rocksdbSstFileSize" : 232729,
      "rocksdbTotalBytesRead" : 12844,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 161238,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "dedupe",
    "numRowsTotal" : 2454744,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 4155,
    "numRowsRemoved" : 0,
    "allRemovalsTimeMs" : 0,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 137765341,
    "numRowsDroppedByWatermark" : 34,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "numDroppedDuplicateRows" : 193,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 146,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3,
      "rocksdbReadBlockCacheMissCount" : 3,
      "rocksdbSstFileSize" : 78959140,
      "rocksdbTotalBytesRead" : 0,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "symmetricHashJoin",
    "numRowsTotal" : 2583,
    "numRowsUpdated" : 682,
    "allUpdatesTimeMs" : 9645,
    "numRowsRemoved" : 508,
    "allRemovalsTimeMs" : 46,
    "commitTimeMs" : 21,
    "memoryUsedBytes" : 668544484,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 80,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 4218,
      "rocksdbGetLatency" : 3,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3425,
      "rocksdbReadBlockCacheMissCount" : 149,
      "rocksdbSstFileSize" : 742827,
      "rocksdbTotalBytesRead" : 866864,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  } ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706380
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "numInputRows" : 292,
    "inputRowsPerSecond" : 13.65826278123392,
    "processedRowsPerSecond" : 14.479817514628582,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "estimatedTotalBytesBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  }, {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
    "numOutputRows" : 76
  }
}

Example Delta Lake-to-Delta Lake StreamingQueryListener event

{
  "id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
  "runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
  "name" : "silverTransformFromBronze",
  "timestamp" : "2022-11-01T18:21:29.500Z",
  "batchId" : 4,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 62,
    "triggerExecution" : 62
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
    "numOutputRows" : -1
  }
}

Example rate source to Delta Lake StreamingQueryListener event

{
  "id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
  "runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
  "name" : "dataGen",
  "timestamp" : "2022-11-01T18:28:20.332Z",
  "batchId" : 279,
  "numInputRows" : 300,
  "inputRowsPerSecond" : 114.15525114155251,
  "processedRowsPerSecond" : 158.9825119236884,
  "durationMs" : {
    "addBatch" : 1771,
    "commitOffsets" : 54,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 4,
    "triggerExecution" : 1887,
    "walCommit" : 58
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
    "startOffset" : 560,
    "endOffset" : 563,
    "latestOffset" : 563,
    "numInputRows" : 300,
    "inputRowsPerSecond" : 114.15525114155251,
    "processedRowsPerSecond" : 158.9825119236884
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
    "numOutputRows" : -1
  }
}