共用方式為


教學課程第 4 部分:執行批次評分,並將預測儲存至 Lakehouse

在本教學課程中,您將瞭解如何匯入使用 Microsoft Fabric MLflow 模型登錄在第 3 部分中定型的已註冊 LightGBMClassifier 模型,並在從 Lakehouse 載入的測試數據集上執行批次預測。

Microsoft Fabric 可讓您使用稱為 PREDICT 的可調整函式來運作機器學習模型,該函式支援任何計算引擎中的批次評分。 您可以直接從 Microsoft Fabric 筆記本或指定的模型專案頁面產生批次預測。 瞭解 PREDICT

若要在測試數據集上產生批次預測,您將使用第 1 版定型的 LightGBM 模型,以示範所有定型機器學習模型的最佳效能。 您將將測試數據集載入 Spark DataFrame,並建立 MLFlowTransformer 物件來產生批次預測。 然後,您可以使用下列三種方式之一叫用 PREDICT 函式:

  • SynapseML 的轉換器 API
  • Spark SQL API
  • PySpark 使用者定義函式 (UDF)

必要條件

本教學課程系列的第 4 部分為 5。 若要完成本教學課程,請先完成:

在筆記本中跟著

4-predict.ipynb 是本教學課程隨附的筆記本。

若要開啟本教學課程隨附的筆記本,請遵循準備系統以進行數據科學教學課程中的指示,將筆記本匯入您的工作區。

如果您想要複製並貼上此頁面中的程式碼,您可以 建立新的筆記本

開始執行程序代碼之前,請務必將 Lakehouse 附加至筆記本

重要

附加您在此系列的其他部分所使用的相同 Lakehouse。

載入測試數據

載入您在第 3 部分中儲存的測試數據。

df_test = spark.read.format("delta").load("Tables/df_test")
display(df_test)

使用轉換器 API 預測

若要從 SynapseML 使用 Transformer API,您必須先建立 MLFlowTransformer 物件。

具現化 MLFlowTransformer 物件

MLFlowTransformer 對像是您在第 3 部分註冊之 MLFlow 模型周圍的包裝函式。 它可讓您在指定的 DataFrame 上產生批次預測。 若要具現化 MLFlowTransformer 物件,您必須提供下列參數:

  • 測試 DataFrame 中需要作為模型輸入的數據行(在此案例中,您需要全部數據行)。
  • 新輸出數據行的名稱(在此案例中為預測)。
  • 產生預測的正確模型名稱和模型版本(在此案例中為第 lgbm_sm 1 版)。
from synapse.ml.predict import MLFlowTransformer

model = MLFlowTransformer(
    inputCols=list(df_test.columns),
    outputCol='predictions',
    modelName='lgbm_sm',
    modelVersion=1
)

現在您已擁有 MLFlowTransformer 物件,您可以使用它來產生批次預測。

import pandas

predictions = model.transform(df_test)
display(predictions)

使用 Spark SQL API 預測

下列程式代碼會使用 Spark SQL API 叫用 PREDICT 函式。

from pyspark.ml.feature import SQLTransformer 

# Substitute "model_name", "model_version", and "features" below with values for your own model name, model version, and feature columns
model_name = 'lgbm_sm'
model_version = 1
features = df_test.columns

sqlt = SQLTransformer().setStatement( 
    f"SELECT PREDICT('{model_name}/{model_version}', {','.join(features)}) as predictions FROM __THIS__")

# Substitute "X_test" below with your own test dataset
display(sqlt.transform(df_test))

使用使用者定義函數的 PREDICT (UDF)

下列程式代碼會使用 PySpark UDF 叫用 PREDICT 函式。

from pyspark.sql.functions import col, pandas_udf, udf, lit

# Substitute "model" and "features" below with values for your own model name and feature columns
my_udf = model.to_udf()
features = df_test.columns

display(df_test.withColumn("predictions", my_udf(*[col(f) for f in features])))

請注意,您也可以從模型的項目頁面產生 PREDICT 程式代碼。 瞭解 PREDICT

將模型預測結果寫入 Lakehouse

產生批次預測之後,請將模型預測結果寫回 Lakehouse。

# Save predictions to lakehouse to be used for generating a Power BI report
table_name = "customer_churn_test_predictions"
predictions.write.format('delta').mode("overwrite").save(f"Tables/{table_name}")
print(f"Spark DataFrame saved to delta table: {table_name}")

後續步驟

繼續執行到: