教程第 4 部分:执行批量评分并将预测保存到湖屋

本教程介绍如何使用 Microsoft Fabric MLflow 模型注册表导入第 3 部分中经过训练和注册的 LightGBMRegressor 模型,并针对从湖屋加载的测试数据集执行批量预测。

Microsoft Fabric 允许用户使用名为 PREDICT 的可缩放函数操作机器学习模型,该函数支持在任何计算引擎中进行批量评分。 可以直接从 Microsoft Fabric 笔记本或给定模型的项页生成批量预测。 了解 PREDICT

若要基于测试数据集生成批量预测,将使用经过训练的 LightGBM 模型的版本 1,该版本模型在所有已训练的机器学习模型中性能最佳。 将测试数据集加载到 Spark DataFrame 中,并创建 MLFlowTransformer 对象以生成批量预测。 然后,可通过以下三种方式之一调用 PREDICT 函数:

  • SynapseML 中的转换器 API
  • Spark SQL API
  • PySpark 用户定义函数 (UDF)

先决条件

这是系列教程的第 4 部分(共 5 部分)。 若要完成本教程,请先完成:

在笔记本中继续操作

4-predict.ipynb 是本教程随附的笔记本。

若要打开本教程随附的笔记本,请按照让系统为数据科学做好准备教程中的说明操作,将该笔记本导入到工作区。

如果要从此页面复制并粘贴代码,则可以创建新的笔记本

在开始运行代码之前,请务必将湖屋连接到笔记本

重要

随附在此系列的其他部分中使用的同一个湖屋。

加载测试数据

加载第 3 部分中保存的测试数据。

df_test = spark.read.format("delta").load("Tables/df_test")
display(df_test)

使用转换器 API 的 PREDICT

若要使用 SynapseML 中的转换器 API,首先需要创建 MLFlowTransformer 对象。

实例化 MLFlowTransformer 对象

MLFlowTransformer 对象是在第 3 部分中注册的 MLFlow 模型周围的包装器。 它允许基于给定数据帧生成批量预测。 若要实例化 MLFlowTransformer 对象,需要提供以下参数:

  • 测试数据帧中需要作为模型输入的列(在本例中,需要所有列)。
  • 新输出列的名称(本例中为“预测”)。
  • 用于生成预测的正确模型名称和模型版本(本例中为 lgbm_sm 和版本 1)。
from synapse.ml.predict import MLFlowTransformer

model = MLFlowTransformer(
    inputCols=list(df_test.columns),
    outputCol='predictions',
    modelName='lgbm_sm',
    modelVersion=1
)

有了 MLFlowTransformer 对象后,可以使用它来生成批量预测。

import pandas

predictions = model.transform(df_test)
display(predictions)

使用 Spark SQL API 的 PREDICT

以下代码使用 Spark SQL API 调用 PREDICT 函数。

from pyspark.ml.feature import SQLTransformer 

# Substitute "model_name", "model_version", and "features" below with values for your own model name, model version, and feature columns
model_name = 'lgbm_sm'
model_version = 1
features = df_test.columns

sqlt = SQLTransformer().setStatement( 
    f"SELECT PREDICT('{model_name}/{model_version}', {','.join(features)}) as predictions FROM __THIS__")

# Substitute "X_test" below with your own test dataset
display(sqlt.transform(df_test))

使用用户定义函数 (UDF) 的 PREDICT

以下代码使用 PySpark UDF 调用 PREDICT 函数。

from pyspark.sql.functions import col, pandas_udf, udf, lit

# Substitute "model" and "features" below with values for your own model name and feature columns
my_udf = model.to_udf()
features = df_test.columns

display(df_test.withColumn("predictions", my_udf(*[col(f) for f in features])))

请注意,也可以从模型的项页生成 PREDICT 代码。 了解 PREDICT

将模型预测结果写入湖屋

生成批量预测后,立即将模型预测结果写回到湖屋。

# Save predictions to lakehouse to be used for generating a Power BI report
table_name = "customer_churn_test_predictions"
predictions.write.format('delta').mode("overwrite").save(f"Tables/{table_name}")
print(f"Spark DataFrame saved to delta table: {table_name}")

下一步

继续学习: