這個教學展示了如何匯入你在 第三部分建立的註冊 LightGBMClassifier 模型。 那個教學使用 Microsoft Fabric MLflow 模型登錄檔來訓練模型,然後對從 lakehouse 載入的測試資料集進行批次預測。
使用 Microsoft Fabric,你可以利用可擴展的 PREDICT 函數,將機器學習模型實際化。 這個函式在任何運算引擎中都支援批次評分。 你可以直接從 Microsoft Fabric 筆記本產生批次預測,或從特定模型的項目頁面產生。 欲了解更多關於 PREDICT 函數的資訊,請造訪 此 資源。
要在測試資料集上產生批次預測,請使用訓練好的 LightGBM 模型版本 1。 該版本在所有訓練過的機器學習模型中表現最佳。 你將測試資料集載入 spark DataFrame,並建立 MLFlowTransformer 物件來產生批次預測。 接著你可以使用以下其中一種技術來呼叫 PREDICT 函數:
- SynapseML 的轉換器 API
- Spark SQL API
- PySpark 使用者定義函數 (UDF)
必要條件
取得 Microsoft Fabric 訂用帳戶。 或註冊免費的 Microsoft Fabric 試用版。
登入 Microsoft Fabric。
使用首頁左下角的體驗切換器切換到 Fabric。
這是五部分教學系列的第四部分。 若要完成本教學課程,請先完成:
- 第 1 部分:使用 Apache Spark 將資料內嵌至 Microsoft Fabric Lakehouse。
- 第 2 部分:使用 Microsoft Fabric 筆記本探索和視覺化資料,以深入了解資料。
- 第 3 部分:訓練和註冊機器學習模型。
遵循筆記本中的指示
4-predict.ipynb 是本教學課程隨附的筆記本。
若要開啟本教學課程隨附的筆記本,請遵循 準備系統以進行數據科學教學課程中的指示, 將筆記本匯入工作區。
如果您想要複製並貼上此頁面中的程式碼,則可以建立新的筆記本。
開始執行程式碼之前,請務必將 Lakehouse 連結至筆記本。
重要
連結您在此系列的其他部分中所使用的相同 Lakehouse。
載入測試資料
在以下程式碼片段中,載入你在第三部分儲存的測試資料:
df_test = spark.read.format("delta").load("Tables/df_test")
display(df_test)
具有轉換器 API 的 PREDICT
要使用 SynapseML 的 Transformer API,必須先建立一個 MLFlowTransformer 物件。
具現化 MLFlowTransformer 物件
MLFlowTransformer 物件作為封裝器,將你在第 3 部分註冊的 MLFlow 模型封裝起來。 它可讓您在指定的 DataFrame 上產生批次預測。 要實例化 MLFlowTransformer 物件,您必須提供以下參數:
- 模型需要作為輸入的測試 DataFrame 欄位(在此情況下,模型需要全部欄位)
- 新輸出欄位的名稱(此處為 預測)
- 正確的模型名稱與模型版本以產生預測(此例
lgbm_sm為版本 1)
以下程式碼片段處理這些步驟:
from synapse.ml.predict import MLFlowTransformer
model = MLFlowTransformer(
inputCols=list(df_test.columns),
outputCol='predictions',
modelName='lgbm_sm',
modelVersion=1
)
現在你有了 MLFlowTransformer 物件,可以用它來產生批次預測,如下程式碼片段所示:
import pandas
predictions = model.transform(df_test)
display(predictions)
具有 Spark SQL API 的 PREDICT
以下程式碼片段使用 Spark SQL API 來呼叫 PREDICT 函式:
from pyspark.ml.feature import SQLTransformer
# Substitute "model_name", "model_version", and "features" below with values for your own model name, model version, and feature columns
model_name = 'lgbm_sm'
model_version = 1
features = df_test.columns
sqlt = SQLTransformer().setStatement(
f"SELECT PREDICT('{model_name}/{model_version}', {','.join(features)}) as predictions FROM __THIS__")
# Substitute "X_test" below with your own test dataset
display(sqlt.transform(df_test))
具有使用者定義函數 (UDF) 的 PREDICT
以下程式碼片段使用 PySpark UDF 來呼叫 PREDICT 函式:
from pyspark.sql.functions import col, pandas_udf, udf, lit
# Substitute "model" and "features" below with values for your own model name and feature columns
my_udf = model.to_udf()
features = df_test.columns
display(df_test.withColumn("predictions", my_udf(*[col(f) for f in features])))
你也可以從模型的項目頁面產生 PREDICT 程式碼。 欲了解更多關於 PREDICT 函數的資訊,請造訪 此 資源。
將模型預測結果寫入 Lakehouse
在產生批次預測後,將模型預測結果寫回湖屋,如以下程式碼片段所示:
# Save predictions to lakehouse to be used for generating a Power BI report
table_name = "customer_churn_test_predictions"
predictions.write.format('delta').mode("overwrite").save(f"Tables/{table_name}")
print(f"Spark DataFrame saved to delta table: {table_name}")
後續步驟
繼續執行: