分享方式:


搭配 Databricks Lakehouse 監視使用自定義計量

重要

這項功能處於公開預覽狀態

此頁面說明如何在 Databricks Lakehouse 監視中建立自定義計量。 除了自動計算的分析與漂移統計數據之外,您還可以建立自定義計量。 例如,您可能想要追蹤加權平均值,以擷取商業規則的某些層面或使用自定義模型品質分數。 您也可以建立自定義漂移計量,以追蹤主要數據表中值的變更(相較於基準或上一個時間範圍)。

如需如何使用 MonitorMetric API 的詳細資訊,請參閱 API 參考

自訂計量的類型

Databricks Lakehouse 監視包含下列類型的自定義計量:

  • 匯總計量,這是根據主數據表中的數據行計算。 匯總計量會儲存在配置檔計量數據表中。
  • 衍生的計量,這是根據先前計算的匯總計量所計算,而且不會直接使用來自主數據表的數據。 衍生的計量會儲存在配置檔計量數據表中。
  • 漂移計量,其會比較先前計算的匯總或衍生計量來自兩個不同的時間範圍,或主要數據表與基準數據表之間的計量。 漂移計量會儲存在漂移計量數據表中。

盡可能使用衍生和漂移計量,將完整主數據表的重新計算降至最低。 僅匯總計量會從主數據表存取數據。 然後可以直接從匯總計量值計算衍生和漂移計量。

自定義計量參數

若要定義自定義計量,您可以建立 SQL 資料行表達式的 Jinja 範本 。 本節中的數據表描述定義計量的參數,以及 Jinja 範本中使用的參數。

參數 描述
type MonitorMetricType.CUSTOM_METRIC_TYPE_AGGREGATEMonitorMetricType.CUSTOM_METRIC_TYPE_DERIVEDMonitorMetricType.CUSTOM_METRIC_TYPE_DRIFT 的其中之一。
name 計量數據表中自定義計量的數據行名稱。
input_columns 輸入數據表中應該計算計量的數據行名稱清單。 若要指出計算中使用了一個以上的資料列,請使用 :table。 請參閱本文中的範例。
definition 指定如何計算計量之 SQL 表達式的 Jinja 範本。 請參閱 建立定義
output_data_type JSON 字串格式之計量輸出的 Spark 資料類型。

創造 definition

參數 definition 必須是以 Jinja 範本形式的單一字串表達式。 它不能包含聯結或子查詢。 若要建構複雜的定義,您可以使用 Python 協助程式函式。

下表列出可用來建立 SQL Jinja 範本的參數,以指定如何計算計量。

參數 描述
{{input_column}} 用來計算自定義計量的數據行。
{{prediction_col}} 包含 ML 模型預測的數據行。 與分析搭配 InferenceLog 使用。
{{label_col}} 數據行持有 ML 模型地面真相標籤。 與分析搭配 InferenceLog 使用。
{{current_df}} 相對於前一個時間範圍,漂移。 上一個時間範圍的數據。
{{base_df}} 相對於基準數據表的漂移。 基準數據。

匯總計量範例

下列範例會計算資料列中值的平方平均值,並套用至 f1 資料行和 f2。 輸出會儲存為配置檔計量數據表中的新數據行,並顯示在對應至 f1 數據行和 f2的分析數據列中。 適用的數據行名稱會取代為 Jinja 參數 {{input_column}}

from databricks.sdk.service.catalog import MonitorMetric, MonitorMetricType
from pyspark.sql import types as T

MonitorMetric(
    type=MonitorMetricType.CUSTOM_METRIC_TYPE_AGGREGATE,
    name="squared_avg",
    input_columns=["f1", "f2"],
    definition="avg(`{{input_column}}`*`{{input_column}}`)",
    output_data_type=T.StructField("output", T.DoubleType()).json(),
)

下列程式代碼會定義自定義計量,以計算數據行與 f2之間f1差異的平均值。 這個範例示範 在 [":table"] 參數中使用 input_columns ,指出計算中使用了數據表中的一個以上的數據行。

from databricks.sdk.service.catalog import MonitorMetric, MonitorMetricType
from pyspark.sql import types as T

MonitorMetric(
    type=MonitorMetricType.CUSTOM_METRIC_TYPE_AGGREGATE,
    name="avg_diff_f1_f2",
    input_columns=[":table"],
    definition="avg(f1 - f2)",
    output_data_type=T.StructField("output", T.DoubleType()).json(),
)

此範例會計算加權模型品質分數。 對於數據行為 Truecritical觀察,當該數據列的預測值不符合地面真相時,會指派較重的懲罰。 因為其定義於原始數據行 (predictionlabel),所以會定義為匯總計量。 數據 :table 行表示此計量是從多個數據行計算而來。 Jinja 參數 {{prediction_col}}{{label_col}} 會取代為監視器的預測和地面真相卷標數據行名稱。

from databricks.sdk.service.catalog import MonitorMetric, MonitorMetricType
from pyspark.sql import types as T

MonitorMetric(
    type=MonitorMetricType.CUSTOM_METRIC_TYPE_AGGREGATE,
    name="weighted_error",
    input_columns=[":table"],
    definition="""avg(CASE
      WHEN {{prediction_col}} = {{label_col}} THEN 0
      WHEN {{prediction_col}} != {{label_col}} AND critical=TRUE THEN 2
      ELSE 1 END)""",
    output_data_type=T.StructField("output", T.DoubleType()).json(),
)

衍生的計量範例

下列程式代碼會定義自定義計量,以計算本節稍早定義之計量的 squared_avg 平方根。 因為這是衍生的計量,因此不會參考主要數據表數據,而是以匯總計量來 squared_avg 定義。 輸出會儲存為配置檔計量數據表中的新數據行。

from databricks.sdk.service.catalog import MonitorMetric, MonitorMetricType
from pyspark.sql import types as T

MonitorMetric(
    type=MonitorMetricType.CUSTOM_METRIC_TYPE_DERIVED,
    name="root_mean_square",
    input_columns=["f1", "f2"],
    definition="sqrt(squared_avg)",
    output_data_type=T.StructField("output", T.DoubleType()).json(),
)

漂移計量範例

下列程式代碼會定義漂移計量,以追蹤本節稍早定義的計量變更 weighted_error 。 和 {{current_df}}{{base_df}} 參數允許計量從目前視窗和比較窗口參考 weighted_error 值。 比較視窗可以是基準數據或上一個時間範圍中的資料。 漂移計量會儲存在漂移計量數據表中。

from databricks.sdk.service.catalog import MonitorMetric, MonitorMetricType
from pyspark.sql import types as T

MonitorMetric(
    type=MonitorMetricType.CUSTOM_METRIC_TYPE_DRIFT,
    name="error_rate_delta",
    input_columns=[":table"],
    definition="{{current_df}}.weighted_error - {{base_df}}.weighted_error",
    output_data_type=T.StructField("output", T.DoubleType()).json(),
)