Självstudie del 4: Utföra batchbedömning och spara förutsägelser till ett lakehouse

I denna handledning får du lära dig att importera den redan registrerade modellen LightGBMClassifier, som tränades i del 3, med hjälp av modellregistret för Microsoft Fabric MLflow och utföra batchförutsägelser på en testdatamängd som har lästs in från ett datahus.

Med Microsoft Fabric kan du operationalisera maskininlärningsmodeller med en skalbar funktion som heter PREDICT, som stöder batchbedömning i alla beräkningsmotorer. Du kan generera batchförutsägelser direkt från en Microsoft Fabric-anteckningsbok eller från en viss modells objektsida. Läs mer om PREDICT.

För att generera batchförutsägelser på testdatauppsättningen använder du version 1 av den tränade LightGBM-modellen som visade bästa prestanda bland alla tränade maskininlärningsmodeller. Du läser in testdatauppsättningen i en spark DataFrame och skapar ett MLFlowTransformer-objekt för att generera batchförutsägelser. Du kan sedan anropa funktionen PREDICT på något av följande tre sätt:

  • Transformera API från SynapseML
  • Spark SQL API
  • Användardefinierad pySpark-funktion (UDF)

Förutsättningar

Den här delen 4 av 5 i självstudieserien. Slutför den här självstudien genom att först slutföra:

Följ med i anteckningsboken

4-predict.ipynb är anteckningsboken som medföljer den här handledningen.

Viktig

Bifoga samma sjöhus som du använde i de andra delarna av den här serien.

Läs in testdata

Läs in testdata som du sparade i del 3.

df_test = spark.read.format("delta").load("Tables/df_test")
display(df_test)

FÖRUTSÄGA med transformerings-API:et

Om du vill använda Transformer-API:et från SynapseML måste du först skapa ett MLFlowTransformer-objekt.

Instansiera MLFlowTransformer-objekt

MLFlowTransformer-objektet är en omslutning runt MLFlow-modellen som du registrerade i del 3. Det gör att du kan generera batchförutsägelser på en viss DataFrame. Om du vill instansiera MLFlowTransformer-objektet måste du ange följande parametrar:

  • Kolumnerna från testdataramen som du behöver som indata till modellen (i det här fallet behöver du alla).
  • Ett namn på den nya utdatakolumnen (i det här fallet förutsägelser).
  • Rätt modellnamn och modellversion för att generera förutsägelserna (i det här fallet lgbm_sm och version 1).
from synapse.ml.predict import MLFlowTransformer

model = MLFlowTransformer(
    inputCols=list(df_test.columns),
    outputCol='predictions',
    modelName='lgbm_sm',
    modelVersion=1
)

Nu när du har MLFlowTransformer-objektet kan du använda det för att generera batchförutsägelser.

import pandas

predictions = model.transform(df_test)
display(predictions)

FÖRUTSÄGA med Spark SQL API

Följande kod anropar funktionen PREDICT med Spark SQL API.

from pyspark.ml.feature import SQLTransformer 

# Substitute "model_name", "model_version", and "features" below with values for your own model name, model version, and feature columns
model_name = 'lgbm_sm'
model_version = 1
features = df_test.columns

sqlt = SQLTransformer().setStatement( 
    f"SELECT PREDICT('{model_name}/{model_version}', {','.join(features)}) as predictions FROM __THIS__")

# Substitute "X_test" below with your own test dataset
display(sqlt.transform(df_test))

PREDICT med en användardefinierad funktion (UDF)

Följande kod anropar funktionen PREDICT med en PySpark UDF.

from pyspark.sql.functions import col, pandas_udf, udf, lit

# Substitute "model" and "features" below with values for your own model name and feature columns
my_udf = model.to_udf()
features = df_test.columns

display(df_test.withColumn("predictions", my_udf(*[col(f) for f in features])))

Observera att du också kan generera PREDICT-kod från en modells objektsida. Läs mer om PREDICT.

Skriva modellförutsägelseresultat till lakehouse

När du har genererat batchförutsägelser skriver du modellförutsägelseresultaten tillbaka till lakehouse.

# Save predictions to lakehouse to be used for generating a Power BI report
table_name = "customer_churn_test_predictions"
predictions.write.format('delta').mode("overwrite").save(f"Tables/{table_name}")
print(f"Spark DataFrame saved to delta table: {table_name}")

Nästa steg

Fortsätt till: