搭配 Databricks Lakehouse 監視使用自定義計量
重要
這項功能處於公開預覽狀態。
此頁面說明如何在 Databricks Lakehouse 監視中建立自定義計量。 除了自動計算的分析與漂移統計數據之外,您還可以建立自定義計量。 例如,您可能想要追蹤加權平均值,以擷取商業規則的某些層面或使用自定義模型品質分數。 您也可以建立自定義漂移計量,以追蹤主要數據表中值的變更(相較於基準或上一個時間範圍)。
如需如何使用 MonitorMetric
API 的詳細資訊,請參閱 API 參考。
自訂計量的類型
Databricks Lakehouse 監視包含下列類型的自定義計量:
- 匯總計量,這是根據主數據表中的數據行計算。 匯總計量會儲存在配置檔計量數據表中。
- 衍生的計量,這是根據先前計算的匯總計量所計算,而且不會直接使用來自主數據表的數據。 衍生的計量會儲存在配置檔計量數據表中。
- 漂移計量,其會比較先前計算的匯總或衍生計量來自兩個不同的時間範圍,或主要數據表與基準數據表之間的計量。 漂移計量會儲存在漂移計量數據表中。
盡可能使用衍生和漂移計量,將完整主數據表的重新計算降至最低。 僅匯總計量會從主數據表存取數據。 然後可以直接從匯總計量值計算衍生和漂移計量。
自定義計量參數
若要定義自定義計量,您可以建立 SQL 資料行表達式的 Jinja 範本 。 本節中的數據表描述定義計量的參數,以及 Jinja 範本中使用的參數。
參數 | 描述 |
---|---|
type |
MonitorMetricType.CUSTOM_METRIC_TYPE_AGGREGATE 、MonitorMetricType.CUSTOM_METRIC_TYPE_DERIVED 或 MonitorMetricType.CUSTOM_METRIC_TYPE_DRIFT 的其中之一。 |
name |
計量數據表中自定義計量的數據行名稱。 |
input_columns |
輸入數據表中應該計算計量的數據行名稱清單。 若要指出計算中使用了一個以上的資料列,請使用 :table 。 請參閱本文中的範例。 |
definition |
指定如何計算計量之 SQL 表達式的 Jinja 範本。 請參閱 建立定義。 |
output_data_type |
JSON 字串格式之計量輸出的 Spark 資料類型。 |
創造 definition
參數 definition
必須是以 Jinja 範本形式的單一字串表達式。 它不能包含聯結或子查詢。 若要建構複雜的定義,您可以使用 Python 協助程式函式。
下表列出可用來建立 SQL Jinja 範本的參數,以指定如何計算計量。
參數 | 描述 |
---|---|
{{input_column}} |
用來計算自定義計量的數據行。 |
{{prediction_col}} |
包含 ML 模型預測的數據行。 與分析搭配 InferenceLog 使用。 |
{{label_col}} |
數據行持有 ML 模型地面真相標籤。 與分析搭配 InferenceLog 使用。 |
{{current_df}} |
相對於前一個時間範圍,漂移。 上一個時間範圍的數據。 |
{{base_df}} |
相對於基準數據表的漂移。 基準數據。 |
匯總計量範例
下列範例會計算資料列中值的平方平均值,並套用至 f1
資料行和 f2
。 輸出會儲存為配置檔計量數據表中的新數據行,並顯示在對應至 f1
數據行和 f2
的分析數據列中。 適用的數據行名稱會取代為 Jinja 參數 {{input_column}}
。
from databricks.sdk.service.catalog import MonitorMetric, MonitorMetricType
from pyspark.sql import types as T
MonitorMetric(
type=MonitorMetricType.CUSTOM_METRIC_TYPE_AGGREGATE,
name="squared_avg",
input_columns=["f1", "f2"],
definition="avg(`{{input_column}}`*`{{input_column}}`)",
output_data_type=T.StructField("output", T.DoubleType()).json(),
)
下列程式代碼會定義自定義計量,以計算數據行與 f2
之間f1
差異的平均值。 這個範例示範 在 [":table"]
參數中使用 input_columns
,指出計算中使用了數據表中的一個以上的數據行。
from databricks.sdk.service.catalog import MonitorMetric, MonitorMetricType
from pyspark.sql import types as T
MonitorMetric(
type=MonitorMetricType.CUSTOM_METRIC_TYPE_AGGREGATE,
name="avg_diff_f1_f2",
input_columns=[":table"],
definition="avg(f1 - f2)",
output_data_type=T.StructField("output", T.DoubleType()).json(),
)
此範例會計算加權模型品質分數。 對於數據行為 True
的critical
觀察,當該數據列的預測值不符合地面真相時,會指派較重的懲罰。 因為其定義於原始數據行 (prediction
和 label
),所以會定義為匯總計量。 數據 :table
行表示此計量是從多個數據行計算而來。 Jinja 參數 {{prediction_col}}
和 {{label_col}}
會取代為監視器的預測和地面真相卷標數據行名稱。
from databricks.sdk.service.catalog import MonitorMetric, MonitorMetricType
from pyspark.sql import types as T
MonitorMetric(
type=MonitorMetricType.CUSTOM_METRIC_TYPE_AGGREGATE,
name="weighted_error",
input_columns=[":table"],
definition="""avg(CASE
WHEN {{prediction_col}} = {{label_col}} THEN 0
WHEN {{prediction_col}} != {{label_col}} AND critical=TRUE THEN 2
ELSE 1 END)""",
output_data_type=T.StructField("output", T.DoubleType()).json(),
)
衍生的計量範例
下列程式代碼會定義自定義計量,以計算本節稍早定義之計量的 squared_avg
平方根。 因為這是衍生的計量,因此不會參考主要數據表數據,而是以匯總計量來 squared_avg
定義。 輸出會儲存為配置檔計量數據表中的新數據行。
from databricks.sdk.service.catalog import MonitorMetric, MonitorMetricType
from pyspark.sql import types as T
MonitorMetric(
type=MonitorMetricType.CUSTOM_METRIC_TYPE_DERIVED,
name="root_mean_square",
input_columns=["f1", "f2"],
definition="sqrt(squared_avg)",
output_data_type=T.StructField("output", T.DoubleType()).json(),
)
漂移計量範例
下列程式代碼會定義漂移計量,以追蹤本節稍早定義的計量變更 weighted_error
。 和 {{current_df}}
{{base_df}}
參數允許計量從目前視窗和比較窗口參考 weighted_error
值。 比較視窗可以是基準數據或上一個時間範圍中的資料。 漂移計量會儲存在漂移計量數據表中。
from databricks.sdk.service.catalog import MonitorMetric, MonitorMetricType
from pyspark.sql import types as T
MonitorMetric(
type=MonitorMetricType.CUSTOM_METRIC_TYPE_DRIFT,
name="error_rate_delta",
input_columns=[":table"],
definition="{{current_df}}.weighted_error - {{base_df}}.weighted_error",
output_data_type=T.StructField("output", T.DoubleType()).json(),
)
意見反應
https://aka.ms/ContentUserFeedback。
即將登場:在 2024 年,我們將逐步淘汰 GitHub 問題作為內容的意見反應機制,並將它取代為新的意見反應系統。 如需詳細資訊,請參閱:提交並檢視相關的意見反應