Compartir a través de


Inferencia de ONNX en Spark

En este ejemplo, entrenará un modelo de LightGBM y convertirá el modelo en formato ONNX . Una vez convertido, use el modelo para inferir algunos datos de prueba en Spark.

En este ejemplo se usan los siguientes paquetes y versiones de Python:

  • onnxmltools==1.7.0
  • lightgbm==3.2.1

Requisitos previos

  • Adjunte el bloc de notas a una casa de lago. En el lado izquierdo, seleccione Añadir para añadir un almacén de lago existente o crear uno.
  • Es posible que tenga que instalar onnxmltools agregando !pip install onnxmltools==1.7.0 una celda de código y, a continuación, ejecutando la celda.

Carga de los datos de ejemplo

Para cargar los datos de ejemplo, agregue los ejemplos de código siguientes a las celdas del cuaderno y, a continuación, ejecute las celdas:

from pyspark.sql import SparkSession

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

from synapse.ml.core.platform import *
df = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load(
        "wasbs://publicwasb@mmlspark.blob.core.windows.net/company_bankruptcy_prediction_data.csv"
    )
)

display(df)

La salida debe ser similar a la tabla siguiente, aunque los valores y el número de filas pueden diferir:

Relación de cobertura de interés Marca de ingresos netos Equidad en responsabilidad
0.5641 1.0 0.0165
0.5702 1.0 0.0208
0.5673 1.0 0.0165

Uso de LightGBM para entrenar un modelo

from pyspark.ml.feature import VectorAssembler
from synapse.ml.lightgbm import LightGBMClassifier

feature_cols = df.columns[1:]
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")

train_data = featurizer.transform(df)["Bankrupt?", "features"]

model = (
    LightGBMClassifier(featuresCol="features", labelCol="Bankrupt?", dataTransferMode="bulk")
    .setEarlyStoppingRound(300)
    .setLambdaL1(0.5)
    .setNumIterations(1000)
    .setNumThreads(-1)
    .setMaxDeltaStep(0.5)
    .setNumLeaves(31)
    .setMaxDepth(-1)
    .setBaggingFraction(0.7)
    .setFeatureFraction(0.7)
    .setBaggingFreq(2)
    .setObjective("binary")
    .setIsUnbalance(True)
    .setMinSumHessianInLeaf(20)
    .setMinGainToSplit(0.01)
)

model = model.fit(train_data)

Conversión del modelo al formato ONNX

El siguiente código exporta el modelo entrenado a un amplificador de LightGBM y después lo convierte a formato ONNX:

import lightgbm as lgb
from lightgbm import Booster, LGBMClassifier


def convertModel(lgbm_model: LGBMClassifier or Booster, input_size: int) -> bytes:
    from onnxmltools.convert import convert_lightgbm
    from onnxconverter_common.data_types import FloatTensorType

    initial_types = [("input", FloatTensorType([-1, input_size]))]
    onnx_model = convert_lightgbm(
        lgbm_model, initial_types=initial_types, target_opset=9
    )
    return onnx_model.SerializeToString()


booster_model_str = model.getLightGBMBooster().modelStr().get()
booster = lgb.Booster(model_str=booster_model_str)
model_payload_ml = convertModel(booster, len(feature_cols))

Tras la conversión, cargue la carga útil de ONNX en un ONNXModel e inspeccione las entradas y salidas del modelo:

from synapse.ml.onnx import ONNXModel

onnx_ml = ONNXModel().setModelPayload(model_payload_ml)

print("Model inputs:" + str(onnx_ml.getModelInputs()))
print("Model outputs:" + str(onnx_ml.getModelOutputs()))

Asigne la entrada del modelo al nombre de columna del dataframe de entrada (FeedDict), y asigne los nombres de columna del dataframe de salida a las salidas del modelo (FetchDict).

onnx_ml = (
    onnx_ml.setDeviceType("CPU")
    .setFeedDict({"input": "features"})
    .setFetchDict({"probability": "probabilities", "prediction": "label"})
    .setMiniBatchSize(5000)
)

Uso del modelo para la inferencia

Para realizar inferencias con el modelo, el código siguiente crea datos de prueba y transforma los datos a través del modelo ONNX.

from pyspark.ml.feature import VectorAssembler
import pandas as pd
import numpy as np

n = 1000 * 1000
m = 95
test = np.random.rand(n, m)
testPdf = pd.DataFrame(test)
cols = list(map(str, testPdf.columns))
testDf = spark.createDataFrame(testPdf)
testDf = testDf.union(testDf).repartition(200)
testDf = (
    VectorAssembler()
    .setInputCols(cols)
    .setOutputCol("features")
    .transform(testDf)
    .drop(*cols)
    .cache()
)

display(onnx_ml.transform(testDf))

La salida debe ser similar a la tabla siguiente, aunque los valores y el número de filas pueden diferir:

Índice Características Predicción Probabilidad
1 "{"type":1,"values":[0.105... 0 "{"0":0.835...
2 "{"type":1,"values":[0.814... 0 "{"0":0.658...