Tutorial Part 4: Perform batch scoring and save predictions to a lakehouse

In this tutorial, you'll learn how to use the Microsoft Fabric MLflow model registry to import the registered LightGBMClassifier model that you trained in part 3, and then use it to perform batch predictions on a test dataset loaded from a lakehouse.

Microsoft Fabric allows you to operationalize machine learning models with a scalable function called PREDICT, which supports batch scoring in any compute engine. You can generate batch predictions directly from a Microsoft Fabric notebook or from a given model's item page. Learn about PREDICT.

To generate batch predictions on the test dataset, you'll use version 1 of the trained LightGBM model, which demonstrated the best performance among all the trained machine learning models. You'll load the test dataset into a Spark DataFrame and create an MLFlowTransformer object to generate batch predictions. You can then invoke the PREDICT function in one of the following three ways:

  • Transformer API from SynapseML
  • Spark SQL API
  • PySpark user-defined function (UDF)

Prerequisites

This is part 4 of 5 in the tutorial series. To complete this tutorial, first complete the earlier parts of the series, in particular part 3, where you trained and registered the machine learning models.

Follow along in the notebook

4-predict.ipynb is the notebook that accompanies this tutorial.

To open the accompanying notebook for this tutorial, follow the instructions in Prepare your system for data science tutorials to import the notebook to your workspace.

If you'd rather copy and paste the code from this page, you can create a new notebook.

Be sure to attach a lakehouse to the notebook before you start running code.

Important

Attach the same lakehouse you used in the other parts of this series.

Load the test data

Load the test data that you saved in Part 3.

df_test = spark.read.format("delta").load("Tables/df_test")
display(df_test)
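Optionally, before scoring, you can confirm that the test data loaded as expected by checking the row count and schema. This quick check isn't required for the rest of the tutorial:

# Optional check: confirm the test DataFrame loaded from the lakehouse
print(f"Number of test rows: {df_test.count()}")
df_test.printSchema()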

PREDICT with the Transformer API

To use the Transformer API from SynapseML, you'll need to first create an MLFlowTransformer object.

Instantiate MLFlowTransformer object

The MLFlowTransformer object is a wrapper around the MLflow model that you registered in Part 3. It allows you to generate batch predictions on a given DataFrame. To instantiate the MLFlowTransformer object, you'll need to provide the following parameters:

  • The columns from the test DataFrame that you need as input to the model (in this case, you would need all of them).
  • A name for the new output column (in this case, predictions).
  • The correct model name and model version to generate the predictions (in this case, lgbm_sm and version 1).

from synapse.ml.predict import MLFlowTransformer

model = MLFlowTransformer(
    inputCols=list(df_test.columns),
    outputCol='predictions',
    modelName='lgbm_sm',
    modelVersion=1
)

Now that you have the MLFlowTransformer object, you can use it to generate batch predictions.

predictions = model.transform(df_test)
display(predictions)
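To get a quick sense of the scored output, you can summarize the new column; this optional step assumes the output column name predictions that you configured on the MLFlowTransformer:

# Optional: count how many rows fall into each predicted class
display(predictions.groupBy("predictions").count())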

PREDICT with the Spark SQL API

The following code invokes the PREDICT function with the Spark SQL API.

from pyspark.ml.feature import SQLTransformer 

# Substitute "model_name", "model_version", and "features" below with values for your own model name, model version, and feature columns
model_name = 'lgbm_sm'
model_version = 1
features = df_test.columns

sqlt = SQLTransformer().setStatement( 
    f"SELECT PREDICT('{model_name}/{model_version}', {','.join(features)}) as predictions FROM __THIS__")

# Substitute "X_test" below with your own test dataset
display(sqlt.transform(df_test))
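The statement above returns only the predictions column. If you also want the input columns in the result, the following sketch selects them alongside the score; it assumes that PREDICT can be combined with other selected columns like any other Spark SQL expression:

# Sketch: keep the original columns next to the score (assumes PREDICT composes with other selected columns)
sqlt_with_features = SQLTransformer().setStatement(
    f"SELECT *, PREDICT('{model_name}/{model_version}', {','.join(features)}) AS predictions FROM __THIS__")
display(sqlt_with_features.transform(df_test))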

PREDICT with a user-defined function (UDF)

The following code invokes the PREDICT function with a PySpark UDF.

from pyspark.sql.functions import col

# Substitute "model" and "features" below with your own MLFlowTransformer object and feature columns
my_udf = model.to_udf()
features = df_test.columns

display(df_test.withColumn("predictions", my_udf(*[col(f) for f in features])))

Note that you can also generate PREDICT code from a model's item page. Learn about PREDICT.

Write model prediction results to the lakehouse

Once you have generated batch predictions, write the model prediction results back to the lakehouse.

# Save predictions to lakehouse to be used for generating a Power BI report
table_name = "customer_churn_test_predictions"
predictions.write.format('delta').mode("overwrite").save(f"Tables/{table_name}")
print(f"Spark DataFrame saved to delta table: {table_name}")

Next step

Continue on to part 5 of this series, where you'll use the saved predictions to create a Power BI report.