Azure Databricks 透過 [串流] 索引卷標下的 Spark UI,為結構化串流 應用程式提供內建監視。
區分 Spark UI 中的結構化串流查詢
藉由在程式代碼中新增 .queryName(<query-name>)
,為您的數據流提供唯一的查詢名稱,以便在 Spark UI 中輕鬆地區分哪些度量屬於哪個數據流。
將結構化串流計量推送至外部服務
串流指標可以透過使用 Apache Spark 的串流查詢偵聽器介面推送至外部服務,供警報或儀表板使用情境使用。 在 Databricks Runtime 11.3 LTS 和更新版本中,StreamingQueryListener
可在 Python 和 Scala 中使用。
重要
下列限制適用於使用已啟用 Unity 目錄的計算存取模式的工作負載:
-
StreamingQueryListener
需要 Databricks Runtime 15.1 或更高版本才能使用憑證,或在專用存取模式下與 Unity Catalog 管理的物件互動。 -
StreamingQueryListener
針對以標準存取模式(先前稱為共用存取模式)設定的 Scala 工作負載需要 Databricks Runtime 16.1 或更新版本。
注意
使用接聽程式處理延遲可能會大幅影響查詢處理速度。 建議您限制這些接聽程式中的處理邏輯,並選擇寫入 Kafka 等快速響應系統以提高效率。
下列程式代碼提供實作接聽程式之語法的基本範例:
程式語言 Scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
val myListener = new StreamingQueryListener {
/**
* Called when a query is started.
* @note This is called synchronously with
* [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
* `onQueryStart` calls on all listeners before
* `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
* Do not block this method, as it blocks your query.
*/
def onQueryStarted(event: QueryStartedEvent): Unit = {}
/**
* Called when there is some status update (ingestion rate updated, etc.)
*
* @note This method is asynchronous. The status in [[StreamingQuery]] returns the
* latest status, regardless of when this method is called. The status of [[StreamingQuery]]
* may change before or when you process the event. For example, you may find [[StreamingQuery]]
* terminates when processing `QueryProgressEvent`.
*/
def onQueryProgress(event: QueryProgressEvent): Unit = {}
/**
* Called when the query is idle and waiting for new data to process.
*/
def onQueryIdle(event: QueryProgressEvent): Unit = {}
/**
* Called when a query is stopped, with or without error.
*/
def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}
Python
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
"""
Called when a query is started.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
The properties are available as the same as Scala API.
Notes
-----
This is called synchronously with
meth:`pyspark.sql.streaming.DataStreamWriter.start`,
that is, ``onQueryStart`` will be called on all listeners before
``DataStreamWriter.start()`` returns the corresponding
:class:`pyspark.sql.streaming.StreamingQuery`.
Do not block in this method as it will block your query.
"""
pass
def onQueryProgress(self, event):
"""
Called when there is some status update (ingestion rate updated, etc.)
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
The properties are available as the same as Scala API.
Notes
-----
This method is asynchronous. The status in
:class:`pyspark.sql.streaming.StreamingQuery` returns the
most recent status, regardless of when this method is called. The status
of :class:`pyspark.sql.streaming.StreamingQuery`.
may change before or when you process the event.
For example, you may find :class:`StreamingQuery`
terminates when processing `QueryProgressEvent`.
"""
pass
def onQueryIdle(self, event):
"""
Called when the query is idle and waiting for new data to process.
"""
pass
def onQueryTerminated(self, event):
"""
Called when a query is stopped, with or without error.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
The properties are available as the same as Scala API.
"""
pass
my_listener = MyListener()
在結構化串流中定義可觀察的計量
可觀察的計量會命名為可在查詢上定義的任意聚合函數(DataFrame)。 一旦 DataFrame 的執行到達一個完成點(例如完成一次批次查詢或達到一個串流 epoch),系統就會發出一個具名事件,其中包含自上次完成點以來所處理資料的相關指標。
您可以將監聽器附加至Spark工作階段,以觀察這些指標。 接聽程式取決於執行模式:
批次模式:使用
QueryExecutionListener
。QueryExecutionListener
會在查詢完成時呼叫。 使用QueryExecution.observedMetrics
地圖存取計量。串流或微批次:使用
StreamingQueryListener
。當串流查詢完成一個時間周期時,會呼叫
StreamingQueryListener
。 使用StreamingQueryProgress.observedMetrics
地圖存取計量。 Azure Databricks 不支援串流continuous
觸發模式。
例如:
程式語言 Scala
// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()
// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryProgress(event: QueryProgressEvent): Unit = {
event.progress.observedMetrics.get("my_event").foreach { row =>
// Trigger if the number of errors exceeds 5 percent
val num_rows = row.getAs[Long]("rc")
val num_error_rows = row.getAs[Long]("erc")
val ratio = num_error_rows.toDouble / num_rows
if (ratio > 0.05) {
// Trigger alert
}
}
}
})
Python
# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()
# Define my listener.
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
print(f"'{event.name}' [{event.id}] got started!")
def onQueryProgress(self, event):
row = event.progress.observedMetrics.get("metric")
if row is not None:
if row.malformed / row.cnt > 0.5:
print("ALERT! Ouch! there are too many malformed "
f"records {row.malformed} out of {row.cnt}!")
else:
print(f"{row.cnt} rows processed!")
def onQueryTerminated(self, event):
print(f"{event.id} got terminated!")
# Add my listener.
spark.streams.addListener(MyListener())
對應 Unity 目錄、Delta Lake 和結構化串流計量數據表標識碼
結構化串流計量會針對作為串流查詢來源之 Delta 數據表的唯一身分識別,使用 reservoirId
數個位置的欄位。
reservoirId
欄位會映射 Delta 交易日誌中 Delta 資料表所儲存的唯一識別碼。 此 ID 無法 對應至 Unity Catalog 所指派的 tableId
值,並顯示於 Catalog Explorer 中。
使用下列語法來檢閱 Delta 數據表的數據表標識碼。 這適用於 Unity Catalog 受控數據表、Unity Catalog 外部數據表,以及所有 Hive Metastore Delta 資料表:
DESCRIBE DETAIL <table-name>
結果中顯示的 id
字段是對應至串流計量中 reservoirId
的標識碼。
StreamingQueryListener 物件計量
領域 | 描述 |
---|---|
id |
在多次重新啟動中持續保存的唯一查詢識別碼。 |
runId |
查詢 ID 在每次啟動/重新啟動時都是唯一的。 請參閱 StreamingQuery.runId()。 |
name |
使用者指定的查詢名稱。 如果未指定名稱,則 Name 為 null。 |
timestamp |
microbatch 執行的時間戳。 |
batchId |
目前正在處理之數據批次的唯一標識符。 在失敗后重試的情況下,可能會多次執行指定的批次標識碼。 同樣地,如果沒有要處理的數據,批次標識符就不會遞增。 |
batchDuration |
批次作業的處理持續時間,以毫秒為單位。 |
numInputRows |
在觸發程式中處理的記錄總數(跨所有來源)。 |
inputRowsPerSecond |
抵達數據的匯總(跨所有來源)速率。 |
processedRowsPerSecond |
Spark 正在處理數據的匯總 (跨所有來源) 速率。 |
StreamingQueryListener
也會定義下列欄位,其中包含您可以檢查客戶計量和來源進度詳細資料的物件:
領域 | 描述 |
---|---|
durationMs |
類型:ju.Map[String, JLong] 。 請參閱 durationMs 物件。 |
eventTime |
類型:ju.Map[String, String] 。 請參閱 eventTime 物件。 |
stateOperators |
類型:Array[StateOperatorProgress] 。 請參閱 stateOperators 物件。 |
sources |
類型:Array[SourceProgress] 。 請參閱 sources 物件。 |
sink |
類型:SinkProgress 。 參見 匯入物件。 |
observedMetrics |
類型:ju.Map[String, Row] 。 可在 DataFrame/查詢上定義的具名任意聚合函數(例如 df.observe )。 |
durationMs 物件
物件類型: ju.Map[String, JLong]
完成微批次執行程式各個階段所花費時間的相關信息。
領域 | 描述 |
---|---|
durationMs.addBatch |
執行 microbatch 所需的時間。 這不包括Spark規劃 microbatch 所需的時間。 |
durationMs.getBatch |
從來源擷取有關位移的元數據所花費的時間。 |
durationMs.latestOffset |
microbatch 所耗用的最新位移。 此進度物件是指從來源擷取最新位移所花費的時間。 |
durationMs.queryPlanning |
產生執行計劃所花費的時間。 |
durationMs.triggerExecution |
規劃和執行 microbatch 所需的時間。 |
durationMs.walCommit |
提交新可用位移所需的時間。 |
durationMs.commitBatch |
在 addBatch 期間所需的時間用來提交寫入至匯入端的數據。 僅適用於支援提交的匯流器。 |
durationMs.commitOffsets |
提交批次至提交日誌所需的時間。 |
eventTime 物件
物件類型: ju.Map[String, String]
有關在 microbatch 中處理之數據內所見事件時間值的資訊。 水印會使用此資料來確定如何修剪狀態,從而處理結構化串流作業中定義的具狀態聚合。
領域 | 描述 |
---|---|
eventTime.avg |
在該觸發程式中看到的平均事件時間。 |
eventTime.max |
該觸發事件中觀察到的最大事件時間。 |
eventTime.min |
在該觸發器中看到的最小事件時間。 |
eventTime.watermark |
該觸發程式中使用的浮浮水印值。 |
stateOperators 物件
物件類型: Array[StateOperatorProgress]
物件 stateOperators
包含結構化串流作業中定義之具狀態作業的相關信息,以及從中產生的匯總。
如需串流狀態運算符的詳細資訊,請參閱 什麼是具狀態串流?。
領域 | 描述 |
---|---|
stateOperators.operatorName |
計量關聯之具狀態運算子的名稱,例如 symmetricHashJoin 、 dedupe 或 stateStoreSave 。 |
stateOperators.numRowsTotal |
狀態中的行總數,是由具狀態的運算符或匯總操作產生的。 |
stateOperators.numRowsUpdated |
狀態中因具狀態運算符或匯總而更新的數據列總數。 |
stateOperators.allUpdatesTimeMs |
此計量目前無法由Spark測量,並計劃在未來的更新中移除。 |
stateOperators.numRowsRemoved |
因狀態運算符或聚合而從狀態中移除的總行數。 |
stateOperators.allRemovalsTimeMs |
此計量目前無法由Spark測量,並計劃在未來的更新中移除。 |
stateOperators.commitTimeMs |
提交所有更新(新增和移除)所需的時間,並返回新版本。 |
stateOperators.memoryUsedBytes |
狀態存放區所使用的記憶體。 |
stateOperators.numRowsDroppedByWatermark |
被視為太晚而無法包含在具狀態匯總中的數據列數目。 僅串流匯總:匯總後被卸除的行數(而非原始輸入行數)。 這個數字不精確,但它提供指示顯示有延遲的數據被捨棄。 |
stateOperators.numShufflePartitions |
狀態運算子的洗牌分割區數量。 |
stateOperators.numStateStoreInstances |
運算子已初始化與維護的狀態存儲實例。 對於許多具狀態運算符,這與分割區數目相同。 不過,數據流串流聯結會為每個分割區初始化四個狀態存放區。 |
stateOperators.customMetrics |
如需詳細資訊,請參閱本主題中的 stateOperators.customMetrics 。 |
StateOperatorProgress.customMetrics 自定義度量物件
物件類型: ju.Map[String, JLong]
StateOperatorProgress
具有一個欄位 customMetrics
,其中包含你在收集這些計量時所使用功能的專屬計量。
特徵 / 功能 | 描述 |
---|---|
RocksDB 狀態存放區 | RocksDB 狀態存放區的計量。 |
HDFS 狀態存放區 | HDFS 狀態存放區的計量。 |
串流去重 | 列去重的指標。 |
串流匯總 | 行聚合的指標。 |
串流聯結運算符 | 流聯結運算子的度量指標。 |
transformWithState |
運算子的 transformWithState 指標。 |
RocksDB 狀態存放區自定義計量
從 RocksDB 收集的資訊中,捕捉其效能和操作的計量,這些計量與其為結構化串流工作維護的具狀態值相關。 如需詳細資訊,請參閱 在 Azure Databricks 上設定 RocksDB 狀態存放區。
領域 | 描述 |
---|---|
customMetrics.rocksdbBytesCopied |
RocksDB 檔案管理員所追蹤的位元組數量。 |
customMetrics.rocksdbCommitCheckpointLatency |
擷取原生 RocksDB 快照和寫入本機目錄的時間,以毫秒計算。 |
customMetrics.rocksdbCompactLatency |
在檢查點提交期間,合併的時間(以毫秒為單位,選擇性)。 |
customMetrics.rocksdbCommitCompactLatency |
提交期間的壓縮時間,以毫秒為單位。 |
customMetrics.rocksdbCommitFileSyncLatencyMs |
將原生 RocksDB 快照集同步至外部記憶體的時間以毫秒為單位(檢查點位置)。 |
customMetrics.rocksdbCommitFlushLatency |
排清 RocksDB 記憶體內部變更至本機磁碟的時間,以毫秒為單位。 |
customMetrics.rocksdbCommitPauseLatency |
停止背景工作線程作為檢查點提交一部分所需的時間(以毫秒計),例如用於壓縮的時間。 |
customMetrics.rocksdbCommitWriteBatchLatency |
將分段WriteBatch 寫入套用至原生 RocksDB 的毫秒數。 |
customMetrics.rocksdbFilesCopied |
RocksDB 檔案管理員所追蹤的複製的檔案數量。 |
customMetrics.rocksdbFilesReused |
RocksDB 檔案管理員所追蹤的重複使用檔案數目。 |
customMetrics.rocksdbGetCount |
呼叫的數量 get (不包括來自 gets 的 WriteBatch ,用於暫存寫入的記憶體內部批次)。 |
customMetrics.rocksdbGetLatency |
基礎原生 RocksDB::Get 呼叫之 nanoseconds 的平均時間。 |
customMetrics.rocksdbReadBlockCacheHitCount |
來自 RocksDB 中區塊快取的快取叫用計數。 |
customMetrics.rocksdbReadBlockCacheMissCount |
RocksDB 中的區塊快取遺漏計數。 |
customMetrics.rocksdbSstFileSize |
RocksDB 實體中所有靜態排序數據表 (SST) 檔案的大小。 |
customMetrics.rocksdbTotalBytesRead |
由 get 操作讀取的未壓縮位元組數目。 |
customMetrics.rocksdbTotalBytesWritten |
作業寫入 put 的未壓縮位元組總數。 |
customMetrics.rocksdbTotalBytesReadThroughIterator |
使用反覆運算器讀取未壓縮數據的位元組總數。 某些有狀態的操作(例如 FlatMapGroupsWithState 中的逾時處理和浮水印)的讀取需要通過迭代器將數據讀入 Azure Databricks。 |
customMetrics.rocksdbTotalBytesReadByCompaction |
壓縮程式從磁碟讀取的位元組數目。 |
customMetrics.rocksdbTotalBytesWrittenByCompaction |
壓縮程式寫入磁碟的位元組總數。 |
customMetrics.rocksdbTotalCompactionLatencyMs |
RocksDB 壓縮的時間以毫秒為單位,包括背景壓縮和認可期間起始的選擇性壓縮。 |
customMetrics.rocksdbTotalFlushLatencyMs |
排清時間總計,包括背景排清。 排清作業是將 MemTable 內儲存空間滿載時進行排清至儲存裝置的過程。
MemTables 是數據儲存在 RocksDB 的第一個層級。 |
customMetrics.rocksdbZipFileBytesUncompressed |
檔案管理員所報告的未壓縮 zip 檔案大小,以位元組為單位。 檔案管理員會管理實體 SST 檔案磁碟空間使用率和刪除。 |
customMetrics.SnapshotLastUploaded.partition_<partition-id>_<state-store-name> |
RocksDB 快照集的最新版本已儲存至檢查點位置。 值為 “-1” 表示從未儲存過快照集。 由於快照集是每個狀態存放區實例特有的,因此此度量會套用至特定的分割區 ID 和狀態存放區名稱。 |
customMetrics.rocksdbPutLatency |
總發送呼叫延遲。 |
customMetrics.rocksdbPutCount |
put 呼叫的數目。 |
customMetrics.rocksdbWriterStallLatencyMs |
寫入器等候壓縮或排清完成的時間。 |
customMetrics.rocksdbTotalBytesWrittenByFlush |
排清所寫入的總位元組數 |
customMetrics.rocksdbPinnedBlocksMemoryUsage |
已釘選區塊的記憶體使用量 |
customMetrics.rocksdbNumInternalColFamiliesKeys |
內部欄族的內部鍵數量 |
customMetrics.rocksdbNumExternalColumnFamilies |
外部欄族的數量 |
customMetrics.rocksdbNumInternalColumnFamilies |
內部欄族的數目 |
HDFS 狀態存儲自定義度量
有關 HDFS 狀態存放區提供者行為和作業的資訊。
領域 | 描述 |
---|---|
customMetrics.stateOnCurrentVersionSizeBytes |
僅限於目前版本中狀態的估計大小。 |
customMetrics.loadedMapCacheHitCount |
在提供者中快取的狀態的快取命中計數。 |
customMetrics.loadedMapCacheMissCount |
提供者中快取狀態的快取遺漏計數。 |
customMetrics.SnapshotLastUploaded.partition_<partition-id>_<state-store-name> |
特定狀態存放區實例上次上傳的快照集版本。 |
重複資料刪除自定義指標
收集重複資料刪除行為和作業的相關信息。
領域 | 描述 |
---|---|
customMetrics.numDroppedDuplicateRows |
刪除的重複數據列數目。 |
customMetrics.numRowsReadDuringEviction |
狀態淘汰期間讀取的狀態行數。 |
匯總自定義計量
收集匯總行為和作業的相關信息。
領域 | 描述 |
---|---|
customMetrics.numRowsReadDuringEviction |
狀態淘汰期間讀取的狀態行數。 |
串流連接自定義指標
有關串流聯結行為和作業的資訊。
領域 | 描述 |
---|---|
customMetrics.skippedNullValueCount |
設定null 為spark.sql.streaming.stateStore.skipNullsForStreamStreamJoins.enabled 時,略過的true 值數目。 |
transformWithState 自定義計量
收集有關 (TWS) 行為和作業的資訊 transformWithState
。 如需transformWithState
的詳細資訊,請參閱建置自定義具狀態應用程式。
領域 | 描述 |
---|---|
customMetrics.initialStateProcessingTimeMs |
處理所有初始狀態所花費的毫秒數。 |
customMetrics.numValueStateVars |
值狀態變數的數目。 也為 transformWithStateInPandas 提供 。 |
customMetrics.numListStateVars |
清單狀態變數的數目。 也為 transformWithStateInPandas 提供 。 |
customMetrics.numMapStateVars |
地圖狀態變數的數目。 也為 transformWithStateInPandas 提供 。 |
customMetrics.numDeletedStateVars |
已刪除的狀態變數數目。 也為 transformWithStateInPandas 提供 。 |
customMetrics.timerProcessingTimeMs |
處理所有定時器所花費的毫秒數 |
customMetrics.numRegisteredTimers |
已註冊的定時器數目。 也為 transformWithStateInPandas 提供 。 |
customMetrics.numDeletedTimers |
已刪除的定時器數目。 也為 transformWithStateInPandas 提供 。 |
customMetrics.numExpiredTimers |
過期定時器的數目。 也為 transformWithStateInPandas 提供 。 |
customMetrics.numValueStateWithTTLVars |
具有 TTL 的值狀態變數數目。 也為 transformWithStateInPandas 提供 。 |
customMetrics.numListStateWithTTLVars |
具有TTL的清單狀態變數數目。 也為 transformWithStateInPandas 提供 。 |
customMetrics.numMapStateWithTTLVars |
具有 TTL 的對應狀態變數數目。 也為 transformWithStateInPandas 提供 。 |
customMetrics.numValuesRemovedDueToTTLExpiry |
由於 TTL 到期而移除的值數目。 也為 transformWithStateInPandas 提供 。 |
customMetrics.numValuesIncrementallyRemovedDueToTTLExpiry |
因 TTL 到期而累加移除的值數目。 |
來源物件
物件類型: Array[SourceProgress]
物件 sources
包含串流數據源的信息和計量。
領域 | 描述 |
---|---|
description |
串流數據源數據表的詳細描述。 |
startOffset |
串流作業啟動所在的數據源數據表內的起始位移號碼。 |
endOffset |
microbatch 處理的最後一個偏移量。 |
latestOffset |
microbatch 所處理的最新位移。 |
numInputRows |
從這個來源處理的輸入數據列數目。 |
inputRowsPerSecond |
速率,以秒為單位,數據會從這個來源抵達以進行處理。 |
processedRowsPerSecond |
Spark 正在處理此來源數據的速率。 |
metrics |
類型:ju.Map[String, String] 。 包含特定數據源的自定義計量。 |
Azure Databricks 提供下列來源對象實作:
注意
請將表單 sources.<startOffset / endOffset / latestOffset>.*
中定義的欄位(或某些變形)解釋為最多 3 個可能的欄位,這些欄位都包含所指示的子欄位:
sources.startOffset.<child-field>
sources.endOffset.<child-field>
sources.latestOffset.<child-field>
Delta Lake sources 物件
用於 Delta 表串流數據來源的自定義指標的定義。
領域 | 描述 |
---|---|
sources.description |
串流查詢從中讀取的來源描述。 例如: “DeltaSource[table]” 。 |
sources.<startOffset / endOffset>.sourceVersion |
該位移所使用編碼的序列化版本。 |
sources.<startOffset / endOffset>.reservoirId |
正在讀取之數據表的標識碼。 這會用來偵測重新啟動查詢時設定錯誤。 請參閱 Unity Catalog、Delta Lake 和 Structured Streaming 的指標資料表標識符。 |
sources.<startOffset / endOffset>.reservoirVersion |
目前正在處理的數據表版本。 |
sources.<startOffset / endOffset>.index |
此版本中序列中的索引 AddFiles 。 這是用於將大型提交分成多個批次。 此索引是根據 modificationTimestamp 和 path 來排序建立。 |
sources.<startOffset / endOffset>.isStartingVersion |
識別當前位移是否代表新串流查詢的開始,而不是初始數據處理後變更的處理。 啟動新的查詢時,會先處理數據表中的所有數據,然後再處理任何抵達的新數據。 |
sources.<startOffset / endOffset / latestOffset>.eventTimeMillis |
用來排列順序的事件時間被記錄下來。 待處理之初始快照資料的事件時間。 處理初始快照集時,按事件時間順序進行。 |
sources.latestOffset |
微批次查詢所處理的最新偏移量。 |
sources.numInputRows |
從這個來源處理的輸入數據列數目。 |
sources.inputRowsPerSecond |
數據從這個來源抵達用於處理的速率。 |
sources.processedRowsPerSecond |
Spark 正在處理此來源數據的速率。 |
sources.metrics.numBytesOutstanding |
待處理檔案的合併大小(由 RocksDB 追蹤的檔案)。 這是 Delta 和 Auto Loader 作為串流來源的積壓指標。 |
sources.metrics.numFilesOutstanding |
要處理的未處理檔案數目。 這是 Delta 和 Auto Loader 作為串流來源的積壓指標。 |
Apache Kafka 來源物件
用於 Apache Kafka 串流數據源之自定義計量的定義。
領域 | 描述 |
---|---|
sources.description |
Kafka 來源的詳細描述,指定要從中讀取的確切 Kafka 主題。 例如: “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]” 。 |
sources.startOffset |
Kafka 主題中啟動串流作業的起始位移號碼。 |
sources.endOffset |
microbatch 處理的最後一個偏移量。 這可能等於 latestOffset 進行中的微批次執行。 |
sources.latestOffset |
由 microbatch 所計算的最新位移。 有節流時,微批次程式可能不會處理所有位移,這會導致 endOffset 和 latestOffset 差異。 |
sources.numInputRows |
從這個來源處理的輸入數據列數目。 |
sources.inputRowsPerSecond |
數據從這個來源抵達用於處理的速率。 |
sources.processedRowsPerSecond |
Spark 正在處理此來源數據的速率。 |
sources.metrics.avgOffsetsBehindLatest |
串流查詢的平均位移數是所有已訂閱主題中最新的可用位移後方。 |
sources.metrics.estimatedTotalBytesBehindLatest |
查詢程序未從已訂閱的主題中取用的估計位元組數量。 |
sources.metrics.maxOffsetsBehindLatest |
串流查詢在所有已訂閱主題中相較於最新可用偏移量所落後的最大偏移量數目。 |
sources.metrics.minOffsetsBehindLatest |
串流查詢在所有訂閱主題中相較於最新可用位移的最小落後位移數。 |
自動載入器來源計量
自動載入器串流數據來源所使用的自訂計量定義。
領域 | 描述 |
---|---|
sources.<startOffset / endOffset / latestOffset>.seqNum |
正在依據發現檔案的順序,處理檔案時的當前位置。 |
sources.<startOffset / endOffset / latestOffset>.sourceVersion |
cloudFiles 來源的實作版本。 |
sources.<startOffset / endOffset / latestOffset>.lastBackfillStartTimeMs |
最近回填作業的開始時間。 |
sources.<startOffset / endOffset / latestOffset>.lastBackfillFinishTimeMs |
最近回填作業的結束時間。 |
sources.<startOffset / endOffset / latestOffset>.lastInputPath |
在重新啟動數據流之前,數據流的最後一個使用者提供輸入路徑。 |
sources.metrics.numFilesOutstanding |
待辦專案中的檔案數目 |
sources.metrics.numBytesOutstanding |
待處理項目中檔案的大小(位元元組) |
sources.metrics.approximateQueueSize |
訊息佇列的近似大小。 只有在已啟用 cloudFiles.useNotifications 選項時。 |
sources.numInputRows |
從這個來源處理的輸入數據列數目。 對於binaryFile 來源格式,numInputRows 等於檔案的數量。 |
PubSub 來源指標
用於 PubSub 串流數據來源之自訂計量的定義。 如需監視 PubSub 串流來源的詳細資訊,請參閱 監視串流計量。
領域 | 描述 |
---|---|
sources.<startOffset / endOffset / latestOffset>.sourceVersion |
此位移編碼的實作版本。 |
sources.<startOffset / endOffset / latestOffset>.seqNum |
正在處理的持久化序號。 |
sources.<startOffset / endOffset / latestOffset>.fetchEpoch |
正在處理的最大擷取時代。 |
sources.metrics.numRecordsReadyToProcess |
目前待辦專案中可用於處理的記錄數目。 |
sources.metrics.sizeOfRecordsReadyToProcess |
目前積壓未處理數據的位元組總大小。 |
sources.metrics.numDuplicatesSinceStreamStart |
串流自啟動後所處理之重複記錄的總計數。 |
Pulsar 來源度量指標
用於 Pulsar 串流數據源之自定義計量的定義。
領域 | 描述 |
---|---|
sources.metrics.numInputRows |
目前微批次中處理的行數量。 |
sources.metrics.numInputBytes |
目前微批次中處理的位元組總數。 |
接收器物件
物件類型: SinkProgress
領域 | 描述 |
---|---|
sink.description |
接收器的描述,詳細說明所使用的特定接收器實現。 |
sink.numOutputRows |
輸出數據列的數目。 不同的匯入點類型可能會對值有不同的行為或限制。 請參閱特定的支持類型 |
sink.metrics |
ju.Map[String, String] 的數據接收器計量。 |
目前,Azure Databricks 提供兩個特定的 sink
對象實作:
匯入器類型 | 詳細資訊 |
---|---|
Delta 表 | 請參閱 Delta sink 物件。 |
Apache Kafka 主題 | 請參閱 Kafka 的匯入物件。 |
欄位 sink.metrics
對於物件的這兩個變體 sink
的行為相同。
Delta Lake 匯入物件
領域 | 描述 |
---|---|
sink.description |
Delta 接收的描述,詳細說明所使用的特定 Delta 接收實作。 例如: “DeltaSink[table]” 。 |
sink.numOutputRows |
行的數量永遠都是-1 ,因為 Spark 無法推斷 DSv1 接收器的輸出行,而這是 Delta Lake 接收器的分類。 |
Apache Kafka 匯出器物件
領域 | 描述 |
---|---|
sink.description |
串流查詢正在寫入的 Kafka 匯入的說明,詳細說明所使用的特定 Kafka 匯入的實施。 例如: “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100” 。 |
sink.numOutputRows |
寫入輸出表或匯入作為微批次一部分的行數。 在某些情況下,這個值可以是 “-1”,通常可以解譯為「未知」。 |
範例
範例 Kafka-to-Kafka 的 StreamingQueryListener 事件
{
"id" : "3574feba-646d-4735-83c4-66f657e52517",
"runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
"name" : "STREAMING_QUERY_NAME_UNIQUE",
"timestamp" : "2022-10-31T20:09:30.455Z",
"batchId" : 1377,
"numInputRows" : 687,
"inputRowsPerSecond" : 32.13433743393049,
"processedRowsPerSecond" : 34.067241892293964,
"durationMs" : {
"addBatch" : 18352,
"getBatch" : 0,
"latestOffset" : 31,
"queryPlanning" : 977,
"triggerExecution" : 20165,
"walCommit" : 342
},
"eventTime" : {
"avg" : "2022-10-31T20:09:18.070Z",
"max" : "2022-10-31T20:09:30.125Z",
"min" : "2022-10-31T20:09:09.793Z",
"watermark" : "2022-10-31T20:08:46.355Z"
},
"stateOperators" : [ {
"operatorName" : "stateStoreSave",
"numRowsTotal" : 208,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 434,
"numRowsRemoved" : 76,
"allRemovalsTimeMs" : 515,
"commitTimeMs" : 0,
"memoryUsedBytes" : 167069743,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"SnapshotLastUploaded.partition_0_default" : 1370,
"SnapshotLastUploaded.partition_1_default" : 1370,
"SnapshotLastUploaded.partition_2_default" : 1362,
"SnapshotLastUploaded.partition_3_default" : 1370,
"SnapshotLastUploaded.partition_4_default" : 1356,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 222,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 165,
"rocksdbReadBlockCacheMissCount" : 41,
"rocksdbSstFileSize" : 232729,
"rocksdbTotalBytesRead" : 12844,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 161238,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "dedupe",
"numRowsTotal" : 2454744,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 4155,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 0,
"memoryUsedBytes" : 137765341,
"numRowsDroppedByWatermark" : 34,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"SnapshotLastUploaded.partition_0_default" : 1360,
"SnapshotLastUploaded.partition_1_default" : 1360,
"SnapshotLastUploaded.partition_2_default" : 1352,
"SnapshotLastUploaded.partition_3_default" : 1360,
"SnapshotLastUploaded.partition_4_default" : 1346,
"numDroppedDuplicateRows" : 193,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 146,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3,
"rocksdbReadBlockCacheMissCount" : 3,
"rocksdbSstFileSize" : 78959140,
"rocksdbTotalBytesRead" : 0,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 2583,
"numRowsUpdated" : 682,
"allUpdatesTimeMs" : 9645,
"numRowsRemoved" : 508,
"allRemovalsTimeMs" : 46,
"commitTimeMs" : 21,
"memoryUsedBytes" : 668544484,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 80,
"customMetrics" : {
"SnapshotLastUploaded.partition_0_left-keyToNumValues" : 1310,
"SnapshotLastUploaded.partition_1_left-keyWithIndexToValue" : 1318,
"SnapshotLastUploaded.partition_2_left-keyToNumValues" : 1305,
"SnapshotLastUploaded.partition_2_right-keyWithIndexToValue" : 1306,
"SnapshotLastUploaded.partition_4_left-keyWithIndexToValue" : 1310,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 4218,
"rocksdbGetLatency" : 3,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3425,
"rocksdbReadBlockCacheMissCount" : 149,
"rocksdbSstFileSize" : 742827,
"rocksdbTotalBytesRead" : 866864,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
"startOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706380
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"numInputRows" : 292,
"inputRowsPerSecond" : 13.65826278123392,
"processedRowsPerSecond" : 14.479817514628582,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
"startOffset" : {
KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
} ],
"sink" : {
"description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
"numOutputRows" : 76
}
}
範例 Delta Lake to-Delta Lake StreamingQueryListener 事件
{
"id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
"runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
"name" : "silverTransformFromBronze",
"timestamp" : "2022-11-01T18:21:29.500Z",
"batchId" : 4,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"latestOffset" : 62,
"triggerExecution" : 62
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
"startOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"latestOffset" : null,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
"numOutputRows" : -1
}
}
範例 Kinesis-to-Delta Lake StreamingQueryListener 事件
{
"id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
"runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
"name" : null,
"timestamp" : "2024-05-14T02:09:20.846Z",
"batchId" : 0,
"batchDuration" : 59322,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"durationMs" : {
"addBatch" : 5397,
"commitBatch" : 4429,
"commitOffsets" : 211,
"getBatch" : 5,
"latestOffset" : 21998,
"queryPlanning" : 12128,
"triggerExecution" : 59313,
"walCommit" : 220
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
"startOffset" : null,
"endOffset" : [ {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000000"
},
"firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
"lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
}, {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000001"
},
"firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
"lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
} ],
"latestOffset" : null,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"metrics" : {
"avgMsBehindLatest" : "0.0",
"maxMsBehindLatest" : "0",
"minMsBehindLatest" : "0",
"mode" : "efo",
"numClosedShards" : "0",
"numProcessedBytes" : "30",
"numProcessedRecords" : "18",
"numRegisteredConsumers" : "1",
"numStreams" : "1",
"numTotalShards" : "2",
"totalPrefetchedBytes" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/2024-05-14-01-58-14-76eb7e51/56b9426c-3492-4ac5-8fe8-3d00efe20be5/deltaTable]",
"numOutputRows" : -1
}
}
範例 Kafka+Delta Lake to-Delta Lake StreamingQueryListener 事件
{
"id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
"runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
"name" : null,
"timestamp" : "2024-05-15T21:57:50.782Z",
"batchId" : 0,
"batchDuration" : 3601,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 5.55401277422938,
"durationMs" : {
"addBatch" : 1544,
"commitBatch" : 686,
"commitOffsets" : 27,
"getBatch" : 12,
"latestOffset" : 577,
"queryPlanning" : 105,
"triggerExecution" : 3600,
"walCommit" : 34
},
"stateOperators" : [ {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 20,
"numRowsUpdated" : 20,
"allUpdatesTimeMs" : 473,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 277,
"memoryUsedBytes" : 13120,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 5,
"numStateStoreInstances" : 20,
"customMetrics" : {
"loadedMapCacheHitCount" : 0,
"loadedMapCacheMissCount" : 0,
"stateOnCurrentVersionSizeBytes" : 5280
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[topic-1]]",
"startOffset" : null,
"endOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"latestOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
"startOffset" : null,
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
"reservoirVersion" : 1,
"index" : -1,
"isStartingVersion" : false
},
"latestOffset" : null,
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
"numOutputRows" : -1
}
}
Delta Lake StreamingQueryListener 事件的範例速率來源
{
"id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
"runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
"name" : "dataGen",
"timestamp" : "2022-11-01T18:28:20.332Z",
"batchId" : 279,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884,
"durationMs" : {
"addBatch" : 1771,
"commitOffsets" : 54,
"getBatch" : 0,
"latestOffset" : 0,
"queryPlanning" : 4,
"triggerExecution" : 1887,
"walCommit" : 58
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
"startOffset" : 560,
"endOffset" : 563,
"latestOffset" : 563,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
"numOutputRows" : -1
}
}