共用方式為


監視 Azure Databricks 上的結構化串流查詢

Azure Databricks 透過 [串流] 索引卷標下的 Spark UI,為結構化串流 應用程式提供內建監視。

區分 Spark UI 中的結構化串流查詢

藉由在程式代碼中新增 .queryName(<query-name>),為您的數據流提供唯一的查詢名稱,以便在 Spark UI 中輕鬆地區分哪些度量屬於哪個數據流。

將結構化串流計量推送至外部服務

串流指標可以透過使用 Apache Spark 的串流查詢偵聽器介面推送至外部服務,供警報或儀表板使用情境使用。 在 Databricks Runtime 11.3 LTS 和更新版本中,StreamingQueryListener 可在 Python 和 Scala 中使用。

重要

下列限制適用於使用已啟用 Unity 目錄的計算存取模式的工作負載:

  • StreamingQueryListener 需要 Databricks Runtime 15.1 或更高版本才能使用憑證,或在專用存取模式下與 Unity Catalog 管理的物件互動。
  • StreamingQueryListener 針對以標準存取模式(先前稱為共用存取模式)設定的 Scala 工作負載需要 Databricks Runtime 16.1 或更新版本。

注意

使用接聽程式處理延遲可能會大幅影響查詢處理速度。 建議您限制這些接聽程式中的處理邏輯,並選擇寫入 Kafka 等快速響應系統以提高效率。

下列程式代碼提供實作接聽程式之語法的基本範例:

程式語言 Scala

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val myListener = new StreamingQueryListener {

  /**
    * Called when a query is started.
    * @note This is called synchronously with
    *       [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
    *       `onQueryStart` calls on all listeners before
    *       `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
    *        Do not block this method, as it blocks your query.
    */
  def onQueryStarted(event: QueryStartedEvent): Unit = {}

  /**
    * Called when there is some status update (ingestion rate updated, etc.)
    *
    * @note This method is asynchronous. The status in [[StreamingQuery]] returns the
    *       latest status, regardless of when this method is called. The status of [[StreamingQuery]]
    *       may change before or when you process the event. For example, you may find [[StreamingQuery]]
    *       terminates when processing `QueryProgressEvent`.
    */
  def onQueryProgress(event: QueryProgressEvent): Unit = {}

  /**
    * Called when the query is idle and waiting for new data to process.
    */
  def onQueryIdle(event: QueryProgressEvent): Unit = {}

  /**
    * Called when a query is stopped, with or without error.
    */
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}

Python

class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        """
        Called when a query is started.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This is called synchronously with
        meth:`pyspark.sql.streaming.DataStreamWriter.start`,
        that is, ``onQueryStart`` will be called on all listeners before
        ``DataStreamWriter.start()`` returns the corresponding
        :class:`pyspark.sql.streaming.StreamingQuery`.
        Do not block in this method as it will block your query.
        """
        pass

    def onQueryProgress(self, event):
        """
        Called when there is some status update (ingestion rate updated, etc.)

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This method is asynchronous. The status in
        :class:`pyspark.sql.streaming.StreamingQuery` returns the
        most recent status, regardless of when this method is called. The status
        of :class:`pyspark.sql.streaming.StreamingQuery`.
        may change before or when you process the event.
        For example, you may find :class:`StreamingQuery`
        terminates when processing `QueryProgressEvent`.
        """
        pass

    def onQueryIdle(self, event):
        """
        Called when the query is idle and waiting for new data to process.
        """
        pass

    def onQueryTerminated(self, event):
        """
        Called when a query is stopped, with or without error.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
            The properties are available as the same as Scala API.
        """
        pass

my_listener = MyListener()

在結構化串流中定義可觀察的計量

可觀察的計量會命名為可在查詢上定義的任意聚合函數(DataFrame)。 一旦 DataFrame 的執行到達一個完成點(例如完成一次批次查詢或達到一個串流 epoch),系統就會發出一個具名事件,其中包含自上次完成點以來所處理資料的相關指標。

您可以將監聽器附加至Spark工作階段,以觀察這些指標。 接聽程式取決於執行模式:

  • 批次模式:使用 QueryExecutionListener

    QueryExecutionListener 會在查詢完成時呼叫。 使用 QueryExecution.observedMetrics 地圖存取計量。

  • 串流或微批次:使用 StreamingQueryListener

    當串流查詢完成一個時間周期時,會呼叫 StreamingQueryListener。 使用 StreamingQueryProgress.observedMetrics 地圖存取計量。 Azure Databricks 不支援串流 continuous 觸發模式。

例如:

程式語言 Scala

// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.observedMetrics.get("my_event").foreach { row =>
      // Trigger if the number of errors exceeds 5 percent
      val num_rows = row.getAs[Long]("rc")
      val num_error_rows = row.getAs[Long]("erc")
      val ratio = num_error_rows.toDouble / num_rows
      if (ratio > 0.05) {
        // Trigger alert
      }
    }
  }
})

Python

# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()

# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"'{event.name}' [{event.id}] got started!")
    def onQueryProgress(self, event):
        row = event.progress.observedMetrics.get("metric")
        if row is not None:
            if row.malformed / row.cnt > 0.5:
                print("ALERT! Ouch! there are too many malformed "
                      f"records {row.malformed} out of {row.cnt}!")
            else:
                print(f"{row.cnt} rows processed!")
    def onQueryTerminated(self, event):
        print(f"{event.id} got terminated!")

# Add my listener.
spark.streams.addListener(MyListener())

對應 Unity 目錄、Delta Lake 和結構化串流計量數據表標識碼

結構化串流計量會針對作為串流查詢來源之 Delta 數據表的唯一身分識別,使用 reservoirId 數個位置的欄位。

reservoirId 欄位會映射 Delta 交易日誌中 Delta 資料表所儲存的唯一識別碼。 此 ID 無法 對應至 Unity Catalog 所指派的 tableId 值,並顯示於 Catalog Explorer 中。

使用下列語法來檢閱 Delta 數據表的數據表標識碼。 這適用於 Unity Catalog 受控數據表、Unity Catalog 外部數據表,以及所有 Hive Metastore Delta 資料表:

DESCRIBE DETAIL <table-name>

結果中顯示的 id 字段是對應至串流計量中 reservoirId 的標識碼。

StreamingQueryListener 物件計量

領域 描述
id 在多次重新啟動中持續保存的唯一查詢識別碼。
runId 查詢 ID 在每次啟動/重新啟動時都是唯一的。 請參閱 StreamingQuery.runId()。
name 使用者指定的查詢名稱。 如果未指定名稱,則 Name 為 null。
timestamp microbatch 執行的時間戳。
batchId 目前正在處理之數據批次的唯一標識符。 在失敗后重試的情況下,可能會多次執行指定的批次標識碼。 同樣地,如果沒有要處理的數據,批次標識符就不會遞增。
batchDuration 批次作業的處理持續時間,以毫秒為單位。
numInputRows 在觸發程式中處理的記錄總數(跨所有來源)。
inputRowsPerSecond 抵達數據的匯總(跨所有來源)速率。
processedRowsPerSecond Spark 正在處理數據的匯總 (跨所有來源) 速率。

StreamingQueryListener 也會定義下列欄位,其中包含您可以檢查客戶計量和來源進度詳細資料的物件:

領域 描述
durationMs 類型:ju.Map[String, JLong]。 請參閱 durationMs 物件
eventTime 類型:ju.Map[String, String]。 請參閱 eventTime 物件
stateOperators 類型:Array[StateOperatorProgress]。 請參閱 stateOperators 物件
sources 類型:Array[SourceProgress]。 請參閱 sources 物件
sink 類型:SinkProgress。 參見 匯入物件
observedMetrics 類型:ju.Map[String, Row]。 可在 DataFrame/查詢上定義的具名任意聚合函數(例如 df.observe)。

durationMs 物件

物件類型ju.Map[String, JLong]

完成微批次執行程式各個階段所花費時間的相關信息。

領域 描述
durationMs.addBatch 執行 microbatch 所需的時間。 這不包括Spark規劃 microbatch 所需的時間。
durationMs.getBatch 從來源擷取有關位移的元數據所花費的時間。
durationMs.latestOffset microbatch 所耗用的最新位移。 此進度物件是指從來源擷取最新位移所花費的時間。
durationMs.queryPlanning 產生執行計劃所花費的時間。
durationMs.triggerExecution 規劃和執行 microbatch 所需的時間。
durationMs.walCommit 提交新可用位移所需的時間。
durationMs.commitBatch addBatch 期間所需的時間用來提交寫入至匯入端的數據。 僅適用於支援提交的匯流器。
durationMs.commitOffsets 提交批次至提交日誌所需的時間。

eventTime 物件

物件類型ju.Map[String, String]

有關在 microbatch 中處理之數據內所見事件時間值的資訊。 水印會使用此資料來確定如何修剪狀態,從而處理結構化串流作業中定義的具狀態聚合。

領域 描述
eventTime.avg 在該觸發程式中看到的平均事件時間。
eventTime.max 該觸發事件中觀察到的最大事件時間。
eventTime.min 在該觸發器中看到的最小事件時間。
eventTime.watermark 該觸發程式中使用的浮浮水印值。

stateOperators 物件

物件類型Array[StateOperatorProgress] 物件 stateOperators 包含結構化串流作業中定義之具狀態作業的相關信息,以及從中產生的匯總。

如需串流狀態運算符的詳細資訊,請參閱 什麼是具狀態串流?

領域 描述
stateOperators.operatorName 計量關聯之具狀態運算子的名稱,例如 symmetricHashJoindedupestateStoreSave
stateOperators.numRowsTotal 狀態中的行總數,是由具狀態的運算符或匯總操作產生的。
stateOperators.numRowsUpdated 狀態中因具狀態運算符或匯總而更新的數據列總數。
stateOperators.allUpdatesTimeMs 此計量目前無法由Spark測量,並計劃在未來的更新中移除。
stateOperators.numRowsRemoved 因狀態運算符或聚合而從狀態中移除的總行數。
stateOperators.allRemovalsTimeMs 此計量目前無法由Spark測量,並計劃在未來的更新中移除。
stateOperators.commitTimeMs 提交所有更新(新增和移除)所需的時間,並返回新版本。
stateOperators.memoryUsedBytes 狀態存放區所使用的記憶體。
stateOperators.numRowsDroppedByWatermark 被視為太晚而無法包含在具狀態匯總中的數據列數目。 僅串流匯總:匯總後被卸除的行數(而非原始輸入行數)。 這個數字不精確,但它提供指示顯示有延遲的數據被捨棄。
stateOperators.numShufflePartitions 狀態運算子的洗牌分割區數量。
stateOperators.numStateStoreInstances 運算子已初始化與維護的狀態存儲實例。 對於許多具狀態運算符,這與分割區數目相同。 不過,數據流串流聯結會為每個分割區初始化四個狀態存放區。
stateOperators.customMetrics 如需詳細資訊,請參閱本主題中的 stateOperators.customMetrics

StateOperatorProgress.customMetrics 自定義度量物件

物件類型ju.Map[String, JLong]

StateOperatorProgress 具有一個欄位 customMetrics,其中包含你在收集這些計量時所使用功能的專屬計量。

特徵 / 功能 描述
RocksDB 狀態存放區 RocksDB 狀態存放區的計量
HDFS 狀態存放區 HDFS 狀態存放區的計量
串流去重 列去重的指標
串流匯總 行聚合的指標
串流聯結運算符 流聯結運算子的度量指標
transformWithState 運算子的 transformWithState 指標

RocksDB 狀態存放區自定義計量

從 RocksDB 收集的資訊中,捕捉其效能和操作的計量,這些計量與其為結構化串流工作維護的具狀態值相關。 如需詳細資訊,請參閱 在 Azure Databricks 上設定 RocksDB 狀態存放區。

領域 描述
customMetrics.rocksdbBytesCopied RocksDB 檔案管理員所追蹤的位元組數量。
customMetrics.rocksdbCommitCheckpointLatency 擷取原生 RocksDB 快照和寫入本機目錄的時間,以毫秒計算。
customMetrics.rocksdbCompactLatency 在檢查點提交期間,合併的時間(以毫秒為單位,選擇性)。
customMetrics.rocksdbCommitCompactLatency 提交期間的壓縮時間,以毫秒為單位。
customMetrics.rocksdbCommitFileSyncLatencyMs 將原生 RocksDB 快照集同步至外部記憶體的時間以毫秒為單位(檢查點位置)。
customMetrics.rocksdbCommitFlushLatency 排清 RocksDB 記憶體內部變更至本機磁碟的時間,以毫秒為單位。
customMetrics.rocksdbCommitPauseLatency 停止背景工作線程作為檢查點提交一部分所需的時間(以毫秒計),例如用於壓縮的時間。
customMetrics.rocksdbCommitWriteBatchLatency 將分段WriteBatch寫入套用至原生 RocksDB 的毫秒數。
customMetrics.rocksdbFilesCopied RocksDB 檔案管理員所追蹤的複製的檔案數量。
customMetrics.rocksdbFilesReused RocksDB 檔案管理員所追蹤的重複使用檔案數目。
customMetrics.rocksdbGetCount 呼叫的數量 get(不包括來自 getsWriteBatch,用於暫存寫入的記憶體內部批次)。
customMetrics.rocksdbGetLatency 基礎原生 RocksDB::Get 呼叫之 nanoseconds 的平均時間。
customMetrics.rocksdbReadBlockCacheHitCount 來自 RocksDB 中區塊快取的快取叫用計數。
customMetrics.rocksdbReadBlockCacheMissCount RocksDB 中的區塊快取遺漏計數。
customMetrics.rocksdbSstFileSize RocksDB 實體中所有靜態排序數據表 (SST) 檔案的大小。
customMetrics.rocksdbTotalBytesRead get 操作讀取的未壓縮位元組數目。
customMetrics.rocksdbTotalBytesWritten 作業寫入 put 的未壓縮位元組總數。
customMetrics.rocksdbTotalBytesReadThroughIterator 使用反覆運算器讀取未壓縮數據的位元組總數。 某些有狀態的操作(例如 FlatMapGroupsWithState 中的逾時處理和浮水印)的讀取需要通過迭代器將數據讀入 Azure Databricks。
customMetrics.rocksdbTotalBytesReadByCompaction 壓縮程式從磁碟讀取的位元組數目。
customMetrics.rocksdbTotalBytesWrittenByCompaction 壓縮程式寫入磁碟的位元組總數。
customMetrics.rocksdbTotalCompactionLatencyMs RocksDB 壓縮的時間以毫秒為單位,包括背景壓縮和認可期間起始的選擇性壓縮。
customMetrics.rocksdbTotalFlushLatencyMs 排清時間總計,包括背景排清。 排清作業是將 MemTable 內儲存空間滿載時進行排清至儲存裝置的過程。 MemTables 是數據儲存在 RocksDB 的第一個層級。
customMetrics.rocksdbZipFileBytesUncompressed 檔案管理員所報告的未壓縮 zip 檔案大小,以位元組為單位。 檔案管理員會管理實體 SST 檔案磁碟空間使用率和刪除。
customMetrics.SnapshotLastUploaded.partition_<partition-id>_<state-store-name> RocksDB 快照集的最新版本已儲存至檢查點位置。 值為 “-1” 表示從未儲存過快照集。 由於快照集是每個狀態存放區實例特有的,因此此度量會套用至特定的分割區 ID 和狀態存放區名稱。
customMetrics.rocksdbPutLatency 總發送呼叫延遲。
customMetrics.rocksdbPutCount put 呼叫的數目。
customMetrics.rocksdbWriterStallLatencyMs 寫入器等候壓縮或排清完成的時間。
customMetrics.rocksdbTotalBytesWrittenByFlush 排清所寫入的總位元組數
customMetrics.rocksdbPinnedBlocksMemoryUsage 已釘選區塊的記憶體使用量
customMetrics.rocksdbNumInternalColFamiliesKeys 內部欄族的內部鍵數量
customMetrics.rocksdbNumExternalColumnFamilies 外部欄族的數量
customMetrics.rocksdbNumInternalColumnFamilies 內部欄族的數目

HDFS 狀態存儲自定義度量

有關 HDFS 狀態存放區提供者行為和作業的資訊。

領域 描述
customMetrics.stateOnCurrentVersionSizeBytes 僅限於目前版本中狀態的估計大小。
customMetrics.loadedMapCacheHitCount 在提供者中快取的狀態的快取命中計數。
customMetrics.loadedMapCacheMissCount 提供者中快取狀態的快取遺漏計數。
customMetrics.SnapshotLastUploaded.partition_<partition-id>_<state-store-name> 特定狀態存放區實例上次上傳的快照集版本。

重複資料刪除自定義指標

收集重複資料刪除行為和作業的相關信息。

領域 描述
customMetrics.numDroppedDuplicateRows 刪除的重複數據列數目。
customMetrics.numRowsReadDuringEviction 狀態淘汰期間讀取的狀態行數。

匯總自定義計量

收集匯總行為和作業的相關信息。

領域 描述
customMetrics.numRowsReadDuringEviction 狀態淘汰期間讀取的狀態行數。

串流連接自定義指標

有關串流聯結行為和作業的資訊。

領域 描述
customMetrics.skippedNullValueCount 設定nullspark.sql.streaming.stateStore.skipNullsForStreamStreamJoins.enabled 時,略過的true值數目。

transformWithState 自定義計量

收集有關 (TWS) 行為和作業的資訊 transformWithState 。 如需transformWithState的詳細資訊,請參閱建置自定義具狀態應用程式

領域 描述
customMetrics.initialStateProcessingTimeMs 處理所有初始狀態所花費的毫秒數。
customMetrics.numValueStateVars 值狀態變數的數目。 也為 transformWithStateInPandas提供 。
customMetrics.numListStateVars 清單狀態變數的數目。 也為 transformWithStateInPandas提供 。
customMetrics.numMapStateVars 地圖狀態變數的數目。 也為 transformWithStateInPandas提供 。
customMetrics.numDeletedStateVars 已刪除的狀態變數數目。 也為 transformWithStateInPandas提供 。
customMetrics.timerProcessingTimeMs 處理所有定時器所花費的毫秒數
customMetrics.numRegisteredTimers 已註冊的定時器數目。 也為 transformWithStateInPandas提供 。
customMetrics.numDeletedTimers 已刪除的定時器數目。 也為 transformWithStateInPandas提供 。
customMetrics.numExpiredTimers 過期定時器的數目。 也為 transformWithStateInPandas提供 。
customMetrics.numValueStateWithTTLVars 具有 TTL 的值狀態變數數目。 也為 transformWithStateInPandas提供 。
customMetrics.numListStateWithTTLVars 具有TTL的清單狀態變數數目。 也為 transformWithStateInPandas提供 。
customMetrics.numMapStateWithTTLVars 具有 TTL 的對應狀態變數數目。 也為 transformWithStateInPandas提供 。
customMetrics.numValuesRemovedDueToTTLExpiry 由於 TTL 到期而移除的值數目。 也為 transformWithStateInPandas提供 。
customMetrics.numValuesIncrementallyRemovedDueToTTLExpiry 因 TTL 到期而累加移除的值數目。

來源物件

物件類型Array[SourceProgress]

物件 sources 包含串流數據源的信息和計量。

領域 描述
description 串流數據源數據表的詳細描述。
startOffset 串流作業啟動所在的數據源數據表內的起始位移號碼。
endOffset microbatch 處理的最後一個偏移量。
latestOffset microbatch 所處理的最新位移。
numInputRows 從這個來源處理的輸入數據列數目。
inputRowsPerSecond 速率,以秒為單位,數據會從這個來源抵達以進行處理。
processedRowsPerSecond Spark 正在處理此來源數據的速率。
metrics 類型:ju.Map[String, String]。 包含特定數據源的自定義計量。

Azure Databricks 提供下列來源對象實作:

注意

請將表單 sources.<startOffset / endOffset / latestOffset>.* 中定義的欄位(或某些變形)解釋為最多 3 個可能的欄位,這些欄位都包含所指示的子欄位:

  • sources.startOffset.<child-field>
  • sources.endOffset.<child-field>
  • sources.latestOffset.<child-field>

Delta Lake sources 物件

用於 Delta 表串流數據來源的自定義指標的定義。

領域 描述
sources.description 串流查詢從中讀取的來源描述。 例如: “DeltaSource[table]”
sources.<startOffset / endOffset>.sourceVersion 該位移所使用編碼的序列化版本。
sources.<startOffset / endOffset>.reservoirId 正在讀取之數據表的標識碼。 這會用來偵測重新啟動查詢時設定錯誤。 請參閱 Unity Catalog、Delta Lake 和 Structured Streaming 的指標資料表標識符
sources.<startOffset / endOffset>.reservoirVersion 目前正在處理的數據表版本。
sources.<startOffset / endOffset>.index 此版本中序列中的索引 AddFiles 。 這是用於將大型提交分成多個批次。 此索引是根據 modificationTimestamppath 來排序建立。
sources.<startOffset / endOffset>.isStartingVersion 識別當前位移是否代表新串流查詢的開始,而不是初始數據處理後變更的處理。 啟動新的查詢時,會先處理數據表中的所有數據,然後再處理任何抵達的新數據。
sources.<startOffset / endOffset / latestOffset>.eventTimeMillis 用來排列順序的事件時間被記錄下來。 待處理之初始快照資料的事件時間。 處理初始快照集時,按事件時間順序進行。
sources.latestOffset 微批次查詢所處理的最新偏移量。
sources.numInputRows 從這個來源處理的輸入數據列數目。
sources.inputRowsPerSecond 數據從這個來源抵達用於處理的速率。
sources.processedRowsPerSecond Spark 正在處理此來源數據的速率。
sources.metrics.numBytesOutstanding 待處理檔案的合併大小(由 RocksDB 追蹤的檔案)。 這是 Delta 和 Auto Loader 作為串流來源的積壓指標。
sources.metrics.numFilesOutstanding 要處理的未處理檔案數目。 這是 Delta 和 Auto Loader 作為串流來源的積壓指標。

Apache Kafka 來源物件

用於 Apache Kafka 串流數據源之自定義計量的定義。

領域 描述
sources.description Kafka 來源的詳細描述,指定要從中讀取的確切 Kafka 主題。 例如: “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]”
sources.startOffset Kafka 主題中啟動串流作業的起始位移號碼。
sources.endOffset microbatch 處理的最後一個偏移量。 這可能等於 latestOffset 進行中的微批次執行。
sources.latestOffset 由 microbatch 所計算的最新位移。 有節流時,微批次程式可能不會處理所有位移,這會導致 endOffsetlatestOffset 差異。
sources.numInputRows 從這個來源處理的輸入數據列數目。
sources.inputRowsPerSecond 數據從這個來源抵達用於處理的速率。
sources.processedRowsPerSecond Spark 正在處理此來源數據的速率。
sources.metrics.avgOffsetsBehindLatest 串流查詢的平均位移數是所有已訂閱主題中最新的可用位移後方。
sources.metrics.estimatedTotalBytesBehindLatest 查詢程序未從已訂閱的主題中取用的估計位元組數量。
sources.metrics.maxOffsetsBehindLatest 串流查詢在所有已訂閱主題中相較於最新可用偏移量所落後的最大偏移量數目。
sources.metrics.minOffsetsBehindLatest 串流查詢在所有訂閱主題中相較於最新可用位移的最小落後位移數。

自動載入器來源計量

自動載入器串流數據來源所使用的自訂計量定義。

領域 描述
sources.<startOffset / endOffset / latestOffset>.seqNum 正在依據發現檔案的順序,處理檔案時的當前位置。
sources.<startOffset / endOffset / latestOffset>.sourceVersion cloudFiles 來源的實作版本。
sources.<startOffset / endOffset / latestOffset>.lastBackfillStartTimeMs 最近回填作業的開始時間。
sources.<startOffset / endOffset / latestOffset>.lastBackfillFinishTimeMs 最近回填作業的結束時間。
sources.<startOffset / endOffset / latestOffset>.lastInputPath 在重新啟動數據流之前,數據流的最後一個使用者提供輸入路徑。
sources.metrics.numFilesOutstanding 待辦專案中的檔案數目
sources.metrics.numBytesOutstanding 待處理項目中檔案的大小(位元元組)
sources.metrics.approximateQueueSize 訊息佇列的近似大小。 只有在已啟用 cloudFiles.useNotifications 選項時。
sources.numInputRows 從這個來源處理的輸入數據列數目。 對於binaryFile來源格式,numInputRows等於檔案的數量。

PubSub 來源指標

用於 PubSub 串流數據來源之自訂計量的定義。 如需監視 PubSub 串流來源的詳細資訊,請參閱 監視串流計量

領域 描述
sources.<startOffset / endOffset / latestOffset>.sourceVersion 此位移編碼的實作版本。
sources.<startOffset / endOffset / latestOffset>.seqNum 正在處理的持久化序號。
sources.<startOffset / endOffset / latestOffset>.fetchEpoch 正在處理的最大擷取時代。
sources.metrics.numRecordsReadyToProcess 目前待辦專案中可用於處理的記錄數目。
sources.metrics.sizeOfRecordsReadyToProcess 目前積壓未處理數據的位元組總大小。
sources.metrics.numDuplicatesSinceStreamStart 串流自啟動後所處理之重複記錄的總計數。

Pulsar 來源度量指標

用於 Pulsar 串流數據源之自定義計量的定義。

領域 描述
sources.metrics.numInputRows 目前微批次中處理的行數量。
sources.metrics.numInputBytes 目前微批次中處理的位元組總數。

接收器物件

物件類型SinkProgress

領域 描述
sink.description 接收器的描述,詳細說明所使用的特定接收器實現。
sink.numOutputRows 輸出數據列的數目。 不同的匯入點類型可能會對值有不同的行為或限制。 請參閱特定的支持類型
sink.metrics ju.Map[String, String] 的數據接收器計量。

目前,Azure Databricks 提供兩個特定的 sink 對象實作:

匯入器類型 詳細資訊
Delta 表 請參閱 Delta sink 物件
Apache Kafka 主題 請參閱 Kafka 的匯入物件

欄位 sink.metrics 對於物件的這兩個變體 sink 的行為相同。

Delta Lake 匯入物件

領域 描述
sink.description Delta 接收的描述,詳細說明所使用的特定 Delta 接收實作。 例如: “DeltaSink[table]”
sink.numOutputRows 行的數量永遠都是-1,因為 Spark 無法推斷 DSv1 接收器的輸出行,而這是 Delta Lake 接收器的分類。

Apache Kafka 匯出器物件

領域 描述
sink.description 串流查詢正在寫入的 Kafka 匯入的說明,詳細說明所使用的特定 Kafka 匯入的實施。 例如: “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100”
sink.numOutputRows 寫入輸出表或匯入作為微批次一部分的行數。 在某些情況下,這個值可以是 “-1”,通常可以解譯為「未知」。

範例

範例 Kafka-to-Kafka 的 StreamingQueryListener 事件

{
  "id" : "3574feba-646d-4735-83c4-66f657e52517",
  "runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
  "name" : "STREAMING_QUERY_NAME_UNIQUE",
  "timestamp" : "2022-10-31T20:09:30.455Z",
  "batchId" : 1377,
  "numInputRows" : 687,
  "inputRowsPerSecond" : 32.13433743393049,
  "processedRowsPerSecond" : 34.067241892293964,
  "durationMs" : {
    "addBatch" : 18352,
    "getBatch" : 0,
    "latestOffset" : 31,
    "queryPlanning" : 977,
    "triggerExecution" : 20165,
    "walCommit" : 342
  },
  "eventTime" : {
    "avg" : "2022-10-31T20:09:18.070Z",
    "max" : "2022-10-31T20:09:30.125Z",
    "min" : "2022-10-31T20:09:09.793Z",
    "watermark" : "2022-10-31T20:08:46.355Z"
  },
  "stateOperators" : [ {
    "operatorName" : "stateStoreSave",
    "numRowsTotal" : 208,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 434,
    "numRowsRemoved" : 76,
    "allRemovalsTimeMs" : 515,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 167069743,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "SnapshotLastUploaded.partition_0_default" : 1370,
      "SnapshotLastUploaded.partition_1_default" : 1370,
      "SnapshotLastUploaded.partition_2_default" : 1362,
      "SnapshotLastUploaded.partition_3_default" : 1370,
      "SnapshotLastUploaded.partition_4_default" : 1356,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 222,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 165,
      "rocksdbReadBlockCacheMissCount" : 41,
      "rocksdbSstFileSize" : 232729,
      "rocksdbTotalBytesRead" : 12844,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 161238,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "dedupe",
    "numRowsTotal" : 2454744,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 4155,
    "numRowsRemoved" : 0,
    "allRemovalsTimeMs" : 0,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 137765341,
    "numRowsDroppedByWatermark" : 34,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "SnapshotLastUploaded.partition_0_default" : 1360,
      "SnapshotLastUploaded.partition_1_default" : 1360,
      "SnapshotLastUploaded.partition_2_default" : 1352,
      "SnapshotLastUploaded.partition_3_default" : 1360,
      "SnapshotLastUploaded.partition_4_default" : 1346,
      "numDroppedDuplicateRows" : 193,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 146,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3,
      "rocksdbReadBlockCacheMissCount" : 3,
      "rocksdbSstFileSize" : 78959140,
      "rocksdbTotalBytesRead" : 0,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "symmetricHashJoin",
    "numRowsTotal" : 2583,
    "numRowsUpdated" : 682,
    "allUpdatesTimeMs" : 9645,
    "numRowsRemoved" : 508,
    "allRemovalsTimeMs" : 46,
    "commitTimeMs" : 21,
    "memoryUsedBytes" : 668544484,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 80,
    "customMetrics" : {
      "SnapshotLastUploaded.partition_0_left-keyToNumValues" : 1310,
      "SnapshotLastUploaded.partition_1_left-keyWithIndexToValue" : 1318,
      "SnapshotLastUploaded.partition_2_left-keyToNumValues" : 1305,
      "SnapshotLastUploaded.partition_2_right-keyWithIndexToValue" : 1306,
      "SnapshotLastUploaded.partition_4_left-keyWithIndexToValue" : 1310,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 4218,
      "rocksdbGetLatency" : 3,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3425,
      "rocksdbReadBlockCacheMissCount" : 149,
      "rocksdbSstFileSize" : 742827,
      "rocksdbTotalBytesRead" : 866864,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  } ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706380
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "numInputRows" : 292,
    "inputRowsPerSecond" : 13.65826278123392,
    "processedRowsPerSecond" : 14.479817514628582,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "estimatedTotalBytesBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  }, {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
    "startOffset" : {
      KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
    "numOutputRows" : 76
  }
}

範例 Delta Lake to-Delta Lake StreamingQueryListener 事件

{
  "id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
  "runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
  "name" : "silverTransformFromBronze",
  "timestamp" : "2022-11-01T18:21:29.500Z",
  "batchId" : 4,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 62,
    "triggerExecution" : 62
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
    "numOutputRows" : -1
  }
}

範例 Kinesis-to-Delta Lake StreamingQueryListener 事件

{
  "id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
  "runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
  "name" : null,
  "timestamp" : "2024-05-14T02:09:20.846Z",
  "batchId" : 0,
  "batchDuration" : 59322,
  "numInputRows" : 20,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.33714304979602844,
  "durationMs" : {
    "addBatch" : 5397,
    "commitBatch" : 4429,
    "commitOffsets" : 211,
    "getBatch" : 5,
    "latestOffset" : 21998,
    "queryPlanning" : 12128,
    "triggerExecution" : 59313,
    "walCommit" : 220
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
    "startOffset" : null,
    "endOffset" : [ {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000000"
      },
      "firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
      "lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
    }, {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000001"
      },
      "firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
      "lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
    } ],
    "latestOffset" : null,
    "numInputRows" : 20,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.33714304979602844,
    "metrics" : {
      "avgMsBehindLatest" : "0.0",
      "maxMsBehindLatest" : "0",
      "minMsBehindLatest" : "0",
      "mode" : "efo",
      "numClosedShards" : "0",
      "numProcessedBytes" : "30",
      "numProcessedRecords" : "18",
      "numRegisteredConsumers" : "1",
      "numStreams" : "1",
      "numTotalShards" : "2",
      "totalPrefetchedBytes" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/2024-05-14-01-58-14-76eb7e51/56b9426c-3492-4ac5-8fe8-3d00efe20be5/deltaTable]",
    "numOutputRows" : -1
  }
}

範例 Kafka+Delta Lake to-Delta Lake StreamingQueryListener 事件

{
 "id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
 "runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
 "name" : null,
 "timestamp" : "2024-05-15T21:57:50.782Z",
 "batchId" : 0,
 "batchDuration" : 3601,
 "numInputRows" : 20,
 "inputRowsPerSecond" : 0.0,
 "processedRowsPerSecond" : 5.55401277422938,
 "durationMs" : {
  "addBatch" : 1544,
  "commitBatch" : 686,
  "commitOffsets" : 27,
  "getBatch" : 12,
  "latestOffset" : 577,
  "queryPlanning" : 105,
  "triggerExecution" : 3600,
  "walCommit" : 34
 },
 "stateOperators" : [ {
  "operatorName" : "symmetricHashJoin",
  "numRowsTotal" : 20,
  "numRowsUpdated" : 20,
  "allUpdatesTimeMs" : 473,
  "numRowsRemoved" : 0,
  "allRemovalsTimeMs" : 0,
  "commitTimeMs" : 277,
  "memoryUsedBytes" : 13120,
  "numRowsDroppedByWatermark" : 0,
  "numShufflePartitions" : 5,
  "numStateStoreInstances" : 20,
  "customMetrics" : {
   "loadedMapCacheHitCount" : 0,
   "loadedMapCacheMissCount" : 0,
   "stateOnCurrentVersionSizeBytes" : 5280
  }
 } ],
 "sources" : [ {
  "description" : "KafkaV2[Subscribe[topic-1]]",
  "startOffset" : null,
  "endOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "latestOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "avgOffsetsBehindLatest" : "0.0",
   "estimatedTotalBytesBehindLatest" : "0.0",
   "maxOffsetsBehindLatest" : "0",
   "minOffsetsBehindLatest" : "0"
  }
 }, {
  "description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
  "startOffset" : null,
  "endOffset" : {
   "sourceVersion" : 1,
   "reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
   "reservoirVersion" : 1,
   "index" : -1,
   "isStartingVersion" : false
  },
  "latestOffset" : null,
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "numBytesOutstanding" : "0",
   "numFilesOutstanding" : "0"
  }
 } ],
 "sink" : {
  "description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
  "numOutputRows" : -1
 }
}

Delta Lake StreamingQueryListener 事件的範例速率來源

{
  "id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
  "runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
  "name" : "dataGen",
  "timestamp" : "2022-11-01T18:28:20.332Z",
  "batchId" : 279,
  "numInputRows" : 300,
  "inputRowsPerSecond" : 114.15525114155251,
  "processedRowsPerSecond" : 158.9825119236884,
  "durationMs" : {
    "addBatch" : 1771,
    "commitOffsets" : 54,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 4,
    "triggerExecution" : 1887,
    "walCommit" : 58
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
    "startOffset" : 560,
    "endOffset" : 563,
    "latestOffset" : 563,
    "numInputRows" : 300,
    "inputRowsPerSecond" : 114.15525114155251,
    "processedRowsPerSecond" : 158.9825119236884
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
    "numOutputRows" : -1
  }
}