Inferenza ONNX in Spark

In questo esempio si esegue il training di un modello LightGBM e si converte il modello in formato ONNX . Dopo la conversione, si usa il modello per dedurre alcuni dati di test in Spark.

Questo esempio usa i pacchetti e le versioni Python seguenti:

  • onnxmltools==1.7.0
  • lightgbm==3.2.1

Prerequisiti

  • Collegare il notebook a una lakehouse. Sul lato sinistro selezionare Aggiungi per aggiungere una lakehouse esistente o creare una lakehouse.
  • Potrebbe essere necessario eseguire l'installazione onnxmltools aggiungendo !pip install onnxmltools==1.7.0 in una cella di codice e quindi eseguendo la cella.

Caricare i dati di esempio

Per caricare i dati di esempio, aggiungere gli esempi di codice seguenti alle celle del notebook e quindi eseguire le celle:

from pyspark.sql import SparkSession

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

from synapse.ml.core.platform import *
df = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load(
        "wasbs://publicwasb@mmlspark.blob.core.windows.net/company_bankruptcy_prediction_data.csv"
    )
)

display(df)

L'output dovrebbe essere simile alla tabella seguente, anche se i valori e il numero di righe possono differire:

Rapporto copertura interessi Contrassegno del reddito netto Capitale azionario in responsabilità
0.5641 1.0 0.0165
0.5702 1.0 0.0208
0.5673 1.0 0.0165

Usare LightGBM per eseguire il training di un modello

from pyspark.ml.feature import VectorAssembler
from synapse.ml.lightgbm import LightGBMClassifier

feature_cols = df.columns[1:]
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")

train_data = featurizer.transform(df)["Bankrupt?", "features"]

model = (
    LightGBMClassifier(featuresCol="features", labelCol="Bankrupt?")
    .setEarlyStoppingRound(300)
    .setLambdaL1(0.5)
    .setNumIterations(1000)
    .setNumThreads(-1)
    .setMaxDeltaStep(0.5)
    .setNumLeaves(31)
    .setMaxDepth(-1)
    .setBaggingFraction(0.7)
    .setFeatureFraction(0.7)
    .setBaggingFreq(2)
    .setObjective("binary")
    .setIsUnbalance(True)
    .setMinSumHessianInLeaf(20)
    .setMinGainToSplit(0.01)
)

model = model.fit(train_data)

Convertire il modello in formato ONNX

Il codice seguente esporta il modello sottoposto a training in un booster LightGBM e quindi lo converte in formato ONNX:

import lightgbm as lgb
from lightgbm import Booster, LGBMClassifier


def convertModel(lgbm_model: LGBMClassifier or Booster, input_size: int) -> bytes:
    from onnxmltools.convert import convert_lightgbm
    from onnxconverter_common.data_types import FloatTensorType

    initial_types = [("input", FloatTensorType([-1, input_size]))]
    onnx_model = convert_lightgbm(
        lgbm_model, initial_types=initial_types, target_opset=9
    )
    return onnx_model.SerializeToString()


booster_model_str = model.getLightGBMBooster().modelStr().get()
booster = lgb.Booster(model_str=booster_model_str)
model_payload_ml = convertModel(booster, len(feature_cols))

Dopo la conversione, caricare il payload ONNX in un oggetto ONNXModel ed esaminare gli input e gli output del modello:

from synapse.ml.onnx import ONNXModel

onnx_ml = ONNXModel().setModelPayload(model_payload_ml)

print("Model inputs:" + str(onnx_ml.getModelInputs()))
print("Model outputs:" + str(onnx_ml.getModelOutputs()))

Eseguire il mapping dell'input del modello al nome della colonna del dataframe di input (FeedDict) ed eseguire il mapping dei nomi delle colonne del dataframe di output agli output del modello (FetchDict).

onnx_ml = (
    onnx_ml.setDeviceType("CPU")
    .setFeedDict({"input": "features"})
    .setFetchDict({"probability": "probabilities", "prediction": "label"})
    .setMiniBatchSize(5000)
)

Usare il modello per l'inferenza

Per eseguire l'inferenza con il modello, il codice seguente crea dati di test e trasforma i dati tramite il modello ONNX.

from pyspark.ml.feature import VectorAssembler
import pandas as pd
import numpy as np

n = 1000 * 1000
m = 95
test = np.random.rand(n, m)
testPdf = pd.DataFrame(test)
cols = list(map(str, testPdf.columns))
testDf = spark.createDataFrame(testPdf)
testDf = testDf.union(testDf).repartition(200)
testDf = (
    VectorAssembler()
    .setInputCols(cols)
    .setOutputCol("features")
    .transform(testDf)
    .drop(*cols)
    .cache()
)

display(onnx_ml.transform(testDf))

L'output dovrebbe essere simile alla tabella seguente, anche se i valori e il numero di righe possono differire:

Indice Funzionalità Previsione Probability
1 "{"type":1,"values":[0.105... 0 "{"0":0.835...
2 "{"type":1,"values":[0.814... 0 "{"0":0.658...