监视 Azure Databricks 上的结构化流式处理查询

Azure Databricks 通过“流式处理”选项卡下的 Spark UI 提供对结构化流式处理应用程序的内置监视。

区分 Spark UI 中的结构化流式处理查询

通过将 .queryName(<query-name>) 添加到 writeStream 代码为流提供唯一的查询名称,以便在 Spark UI 中轻松区分哪些指标属于哪个流。

将结构化流式处理指标推送到外部服务

可以使用 Apache Spark 的“流式处理查询侦听器”界面将流式处理指标推送到外部服务,以用于警报或仪表板用例。 在 Databricks Runtime 11.3 LTS 及更高版本中,流式处理查询侦听器在 Python 和 Scala 中可用。

重要

不能在 StreamingQueryListener 逻辑中使用 Unity Catalog 管理的凭据和对象。

注意

与侦听器关联的处理延迟可能会对查询处理产生负面影响。 Databricks 建议尽量减少这些侦听器中的处理逻辑,并将内容写入到 Kafka 之类的低延迟接收器。

以下代码提供了用于实现侦听器的语法的基本示例:

Scala

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val myListener = new StreamingQueryListener {

  /**
    * Called when a query is started.
    * @note This is called synchronously with
    *       [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
    *       `onQueryStart` calls on all listeners before
    *       `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
    *        Do not block this method, as it blocks your query.
    */
  def onQueryStarted(event: QueryStartedEvent): Unit = {}

  /**
    * Called when there is some status update (ingestion rate updated, etc.)
    *
    * @note This method is asynchronous. The status in [[StreamingQuery]] returns the
    *       latest status, regardless of when this method is called. The status of [[StreamingQuery]]
    *       may change before or when you process the event. For example, you may find [[StreamingQuery]]
    *       terminates when processing `QueryProgressEvent`.
    */
  def onQueryProgress(event: QueryProgressEvent): Unit = {}

  /**
    * Called when a query is stopped, with or without error.
    */
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}

Python

class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        """
        Called when a query is started.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This is called synchronously with
        meth:`pyspark.sql.streaming.DataStreamWriter.start`,
        that is, ``onQueryStart`` will be called on all listeners before
        ``DataStreamWriter.start()`` returns the corresponding
        :class:`pyspark.sql.streaming.StreamingQuery`.
        Do not block in this method as it will block your query.
        """
        pass

    def onQueryProgress(self, event):
        """
        Called when there is some status update (ingestion rate updated, etc.)

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This method is asynchronous. The status in
        :class:`pyspark.sql.streaming.StreamingQuery` returns the
        most recent status, regardless of when this method is called. The status
        of :class:`pyspark.sql.streaming.StreamingQuery`.
        may change before or when you process the event.
        For example, you may find :class:`StreamingQuery`
        terminates when processing `QueryProgressEvent`.
        """
        pass

    def onQueryTerminated(self, event):
        """
        Called when a query is stopped, with or without error.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
            The properties are available as the same as Scala API.
        """
        pass

my_listener = MyListener()

在结构化流式处理中定义可观察指标

可观察指标是可以在查询(数据帧)中定义的命名任意聚合函数。 在数据帧的执行达到完成点(即,完成批处理查询或达到流式处理循环)后,会发出一个命名事件,其中包含自上一个完成点以来处理的数据的指标。

可以通过将侦听器附加到 Spark 会话来观察这些指标。 侦听器取决于执行模式:

  • 批处理模式:使用 QueryExecutionListener

    查询完成时调用 QueryExecutionListener。 使用 QueryExecution.observedMetrics 映射访问指标。

  • 流式处理或微批处理:使用 StreamingQueryListener

    流式处理查询完成某个循环时调用 StreamingQueryListener。 使用 StreamingQueryProgress.observedMetrics 映射访问指标。 Azure Databricks 不支持连续执行流式处理。

例如:

Scala

// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.observedMetrics.get("my_event").foreach { row =>
      // Trigger if the number of errors exceeds 5 percent
      val num_rows = row.getAs[Long]("rc")
      val num_error_rows = row.getAs[Long]("erc")
      val ratio = num_error_rows.toDouble / num_rows
      if (ratio > 0.05) {
        // Trigger alert
      }
    }
  }
})

Python

# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()

# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"'{event.name}' [{event.id}] got started!")
    def onQueryProgress(self, event):
        row = event.progress.observedMetrics.get("metric")
        if row is not None:
            if row.malformed / row.cnt > 0.5:
                print("ALERT! Ouch! there are too many malformed "
                      f"records {row.malformed} out of {row.cnt}!")
            else:
                print(f"{row.cnt} rows processed!")
    def onQueryTerminated(self, event):
        print(f"{event.id} got terminated!")

# Add my listener.
spark.streams.addListener(MyListener())

StreamingQueryListener 对象指标

指标 说明
id 在重启时保留的唯一查询 ID。 请参阅 StreamingQuery.id()
runId 每次启动或重启的唯一查询 ID。 请参阅 StreamingQuery.runId()
name 用户指定的查询名称。 如果未指定,则为 null。
timestamp 执行微批的时间戳。
batchId 当前正在处理的批数据的唯一 ID。 请注意,如果失败后重试,可多次执行给定批 ID。 同样,如果没有要处理的数据,则批 ID 不会递增。
numInputRows 触发器中处理的记录的(跨所有源)聚合数。
inputRowsPerSecond 到达数据的(跨所有源)聚合速率。
processedRowsPerSecond Spark 处理数据时的(跨所有源)聚合速率。

durationMs 对象

有关完成微批执行过程的各个阶段所所用的时间的信息。

指标 说明
durationMs.addBatch 执行微批所用的时间。 此时间不包括 Spark 规划微批所用的时间。
durationMs.getBatch 检索与源偏移量有关的元数据所用的时间。
durationMs.latestOffset 微批消耗的最新偏移量。 此进度对象是指从源检索最新偏移量所花费的时间。
durationMs.queryPlanning 生成执行计划所用的时间。
durationMs.triggerExecution 计划和执行微批所用的时间。
durationMs.walCommit 提交新的可用偏移量所用的时间。

eventTime 对象

有关在微批中处理的数据中看到的事件时间值的信息。 水印使用此数据来确定如何剪裁状态以处理结构化流式处理作业中定义的有状态聚合。

指标 说明
eventTime.avg 触发器中看到的事件平均时间。
eventTime.max 触发器中看到的事件最长时间。
eventTime.min 触发器中看到的事件最短时间。
eventTime.watermark 触发器中使用的水印值。

stateOperators 对象

有关结构化流式处理作业中定义的有状态操作及其生成的聚合的信息。

指标 说明
stateOperators.operatorName 指标与之相关的有状态运算符的名称。 例如:symmetricHashJoindedupestateStoreSave
stateOperators.numRowsTotal 有状态运算符或聚合的结果状态中的行数。
stateOperators.numRowsUpdated 有状态运算符或聚合的结果状态中更新的行数。
stateOperators.numRowsRemoved 从有状态运算符或聚合的结果状态中删除的行数。
stateOperators.commitTimeMs 提交所有更新(放置和删除)并返回新版本所用的时间。
stateOperators.memoryUsedBytes 状态存储使用的内存。
stateOperators.numRowsDroppedByWatermark 被视为太晚而无法包含在有状态聚合中的行数。 仅流式处理聚合:在聚合后删除的行数,而不是原始输入行数。 该数字并不精确,但它可以指示正在删除延迟数据。
stateOperators.numShufflePartitions 此有状态运算符的随机分区数。
stateOperators.numStateStoreInstances 运算符已初始化和维护的实际状态存储实例。 在许多有状态运算符中,此数据与分区数相同,但流-流联接将初始化每个分区的四个状态存储实例。

stateOperators.customMetrics 对象

从 RocksDB 收集的信息,该源捕获的指标与它为结构化流式处理作业维护的状态值的性能和操作有关。 有关详细信息,请参阅在 Azure Databricks 上配置 RocksDB 状态存储

指标 说明
customMetrics.rocksdbBytesCopied RocksDB 文件管理器所跟踪的复制字节数。
customMetrics.rocksdbCommitCheckpointLatency 创建本机 RocksDB 快照并将其写入本地目录所花费的时间(以毫秒为单位)。
customMetrics.rocksdbCompactLatency 在检查点提交期间进行压缩(可选)所花费的时间(以毫秒为单位)。
customMetrics.rocksdbCommitFileSyncLatencyMs 将本机 RocksDB 快照相关的文件同步到外部存储(检查点位置)所花费的时间(以毫秒为单位)。
customMetrics.rocksdbCommitFlushLatency 将 RocksDB 内存中更改刷新到本地磁盘所花费的时间(以毫秒为单位)。
customMetrics.rocksdbCommitPauseLatency 在检查点提交过程中停止后台工作线程(例如,实现压缩等目的)所花费的时间(以毫秒为单位)。
customMetrics.rocksdbCommitWriteBatchLatency 将内存中结构中的分阶段写入 (WriteBatch) 应用于本机 RocksDB 所花费的时间(以毫秒为单位)。
customMetrics.rocksdbFilesCopied RocksDB 文件管理器所跟踪的复制文件数。
customMetrics.rocksdbFilesReused RocksDB 文件管理器所跟踪的重用文件数。
customMetrics.rocksdbGetCount DB get 调用的数量(不包括从 WriteBatch 执行的 gets:用于分阶段写入的内存中批)。
customMetrics.rocksdbGetLatency 基础本机 RocksDB::Get 调用平均花费的时间(以纳秒为单位)。
customMetrics.rocksdbReadBlockCacheHitCount RocksDB 中的块缓存有多少是有用的或无用的,以及如果避免本地磁盘读取。
customMetrics.rocksdbReadBlockCacheMissCount RocksDB 中的块缓存有多少是有用的或无用的,以及如果避免本地磁盘读取。
customMetrics.rocksdbSstFileSize 所有 SST 文件的大小。 SST 代表静态排序表,它是 RocksDB 用于存储数据的表格结构。
customMetrics.rocksdbTotalBytesRead 通过 get 操作读取的未压缩字节数。
customMetrics.rocksdbTotalBytesReadByCompaction 压缩进程从磁盘读取的字节数。
customMetrics.rocksdbTotalBytesReadThroughIterator 某些有状态操作(例如 FlatMapGroupsWithState 中的超时处理和水印)需要通过迭代器读取 DB 中的数据。 此指标表示使用迭代器读取的未压缩数据的大小。
customMetrics.rocksdbTotalBytesWritten 通过 put 操作写入的未压缩字节数。
customMetrics.rocksdbTotalBytesWrittenByCompaction 压缩进程写入磁盘的字节数。
customMetrics.rocksdbTotalCompactionLatencyMs RocksDB 压缩(包括后台压缩和提交期间启动的可选压缩)所花费的时间(以毫秒为单位)。
customMetrics.rocksdbTotalFlushLatencyMs 刷新时间,包括后台刷新时间。 刷新操作是指 MemTable 在满后刷新到存储的过程。 MemTable 是 RocksDB 中存储数据的第一级。
customMetrics.rocksdbZipFileBytesUncompressed RocksDB 文件管理器用于管理物理 SST 文件磁盘空间的利用率和删除。 此指标表示文件管理器报告的未压缩 zip 文件(以字节为单位)。

源对象 (Kafka)

指标 说明
sources.description 流式处理查询从中读取的源的名称。 例如 “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]”
(属于sources.startOffset 对象)的父级。 启动流式处理作业的 Kafka 主题内的起始偏移量。
(属于sources.endOffset 对象)的父级。 微批处理的最新偏移量。 对于正在进行的微批执行,这可能等于 latestOffset
(属于sources.latestOffset 对象)的父级。 微批计算出的最新偏移量。 如果存在限制,微批处理进程可能不会处理所有偏移量,这会导致 endOffsetlatestOffset 不相同。
sources.numInputRows 从此源处理的输入行数。
sources.inputRowsPerSecond 处理此源数据所达到的速率。
sources.processedRowsPerSecond Spark 处理此源数据的速率。

sources.metrics 对象 (Kafka)

指标 说明
sources.metrics.avgOffsetsBehindLatest 流式处理查询在所有订阅主题中最新可用偏移量之后使用的平均偏移量。
sources.metrics.estimatedTotalBytesBehindLatest 查询进程尚未从订阅主题使用的估计字节数。
sources.metrics.maxOffsetsBehindLatest 流式处理查询在所有订阅主题中最新可用偏移量之后使用的最大偏移量。
sources.metrics.minOffsetsBehindLatest 流式处理查询在所有订阅主题中最新可用偏移量之后使用的最小偏移量。

接收器对象 (Kafka)

指标 说明
sink.description 流式处理查询正在写入的接收器的名称。 例如 “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100”
sink.numOutputRows 作为微批的一部分写入到输出表或接收器的行数。 在某些情况下,此值可以为“-1”,通常可解释为“未知”。

源对象 (Delta Lake)

指标 说明
sources.description 流式处理查询从中读取的源的名称。 例如 “DeltaSource[table]”
sources.[startOffset/endOffset].sourceVersion 对此偏移量进行编码所使用的序列化版本。
sources.[startOffset/endOffset].reservoirId 要从中读取的表的 ID。 此值用于在重启查询时检测错误配置。
sources.[startOffset/endOffset].reservoirVersion 当前正在处理的表的版本。
sources.[startOffset/endOffset].index 此版本中 AddFiles 序列中的索引。 此值用于将大型提交分解成多个批。 可通过对 modificationTimestamppath 进行排序来创建此索引。
sources.[startOffset/endOffset].isStartingVersion 此偏移量是否表示正在启动而不是正在处理更改的查询。 启动新查询时,将先处理表中显示的所有数据,然后再处理到达的新数据。
sources.latestOffset 微批查询处理的最新偏移量。
sources.numInputRows 从此源处理的输入行数。
sources.inputRowsPerSecond 处理此源数据所达到的速率。
sources.processedRowsPerSecond Spark 处理此源数据的速率。
sources.metrics.numBytesOutstanding 未完成文件(RocksDB 跟踪的文件)的总大小。 这是 Delta 和自动加载程序作为流式处理源的积压工作 (backlog) 指标。
sources.metrics.numFilesOutstanding 要处理的未完成文件数。 这是 Delta 和自动加载程序作为流式处理源的积压工作 (backlog) 指标。

接收器对象 (Delta Lake)

指标 说明
sink.description 流式处理查询写入的接收器的名称。 例如 “DeltaSink[table]”
sink.numOutputRows 该指标中的行数为“-1”,因为 Spark 无法推断 DSv1 接收器的输出行,这是 Delta Lake 接收器的分类。

示例

Kafka-to-Kafka StreamingQueryListener 事件示例

{
  "id" : "3574feba-646d-4735-83c4-66f657e52517",
  "runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
  "name" : "STREAMING_QUERY_NAME_UNIQUE",
  "timestamp" : "2022-10-31T20:09:30.455Z",
  "batchId" : 1377,
  "numInputRows" : 687,
  "inputRowsPerSecond" : 32.13433743393049,
  "processedRowsPerSecond" : 34.067241892293964,
  "durationMs" : {
    "addBatch" : 18352,
    "getBatch" : 0,
    "latestOffset" : 31,
    "queryPlanning" : 977,
    "triggerExecution" : 20165,
    "walCommit" : 342
  },
  "eventTime" : {
    "avg" : "2022-10-31T20:09:18.070Z",
    "max" : "2022-10-31T20:09:30.125Z",
    "min" : "2022-10-31T20:09:09.793Z",
    "watermark" : "2022-10-31T20:08:46.355Z"
  },
  "stateOperators" : [ {
    "operatorName" : "stateStoreSave",
    "numRowsTotal" : 208,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 434,
    "numRowsRemoved" : 76,
    "allRemovalsTimeMs" : 515,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 167069743,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 222,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 165,
      "rocksdbReadBlockCacheMissCount" : 41,
      "rocksdbSstFileSize" : 232729,
      "rocksdbTotalBytesRead" : 12844,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 161238,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "dedupe",
    "numRowsTotal" : 2454744,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 4155,
    "numRowsRemoved" : 0,
    "allRemovalsTimeMs" : 0,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 137765341,
    "numRowsDroppedByWatermark" : 34,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "numDroppedDuplicateRows" : 193,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 146,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3,
      "rocksdbReadBlockCacheMissCount" : 3,
      "rocksdbSstFileSize" : 78959140,
      "rocksdbTotalBytesRead" : 0,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "symmetricHashJoin",
    "numRowsTotal" : 2583,
    "numRowsUpdated" : 682,
    "allUpdatesTimeMs" : 9645,
    "numRowsRemoved" : 508,
    "allRemovalsTimeMs" : 46,
    "commitTimeMs" : 21,
    "memoryUsedBytes" : 668544484,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 80,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 4218,
      "rocksdbGetLatency" : 3,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3425,
      "rocksdbReadBlockCacheMissCount" : 149,
      "rocksdbSstFileSize" : 742827,
      "rocksdbTotalBytesRead" : 866864,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  } ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706380
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "numInputRows" : 292,
    "inputRowsPerSecond" : 13.65826278123392,
    "processedRowsPerSecond" : 14.479817514628582,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "estimatedTotalBytesBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  }, {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
    "numOutputRows" : 76
  }
}

Delta Lake-to-Delta Lake StreamingQueryListener 事件示例

{
  "id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
  "runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
  "name" : "silverTransformFromBronze",
  "timestamp" : "2022-11-01T18:21:29.500Z",
  "batchId" : 4,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 62,
    "triggerExecution" : 62
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
    "numOutputRows" : -1
  }
}

Delta Lake StreamingQueryListener 事件的速率源示例

{
  "id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
  "runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
  "name" : "dataGen",
  "timestamp" : "2022-11-01T18:28:20.332Z",
  "batchId" : 279,
  "numInputRows" : 300,
  "inputRowsPerSecond" : 114.15525114155251,
  "processedRowsPerSecond" : 158.9825119236884,
  "durationMs" : {
    "addBatch" : 1771,
    "commitOffsets" : 54,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 4,
    "triggerExecution" : 1887,
    "walCommit" : 58
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
    "startOffset" : 560,
    "endOffset" : 563,
    "latestOffset" : 563,
    "numInputRows" : 300,
    "inputRowsPerSecond" : 114.15525114155251,
    "processedRowsPerSecond" : 158.9825119236884
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
    "numOutputRows" : -1
  }
}