ONNX-inferens på Spark

I dette eksempel oplærer du en LightGBM-model og konverterer modellen til ONNX-format . Når den er konverteret, kan du bruge modellen til at udlede nogle testdata på Spark.

I dette eksempel bruges følgende Python-pakker og -versioner:

  • onnxmltools==1.7.0
  • lightgbm==3.2.1

Forudsætninger

  • Vedhæft din notesbog til et lakehouse. I venstre side skal du vælge Tilføj for at tilføje et eksisterende lakehouse eller oprette et lakehouse.
  • Du skal muligvis installere onnxmltools ved at tilføje !pip install onnxmltools==1.7.0 en kodecelle og derefter køre cellen.

Indlæs eksempeldataene

Hvis du vil indlæse eksempeldataene, skal du føje følgende kodeeksempler til cellerne i notesbogen og derefter køre cellerne:

from pyspark.sql import SparkSession

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

from synapse.ml.core.platform import *
df = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load(
        "wasbs://publicwasb@mmlspark.blob.core.windows.net/company_bankruptcy_prediction_data.csv"
    )
)

display(df)

Outputtet skal ligne følgende tabel, selvom værdierne og antallet af rækker kan variere:

Rentedækningsforhold Flag for nettoindkomst Egenkapital til ansvar
0.5641 1.0 0.0165
0.5702 1.0 0.0208
0.5673 1.0 0.0165

Brug LightGBM til at oplære en model

from pyspark.ml.feature import VectorAssembler
from synapse.ml.lightgbm import LightGBMClassifier

feature_cols = df.columns[1:]
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")

train_data = featurizer.transform(df)["Bankrupt?", "features"]

model = (
    LightGBMClassifier(featuresCol="features", labelCol="Bankrupt?")
    .setEarlyStoppingRound(300)
    .setLambdaL1(0.5)
    .setNumIterations(1000)
    .setNumThreads(-1)
    .setMaxDeltaStep(0.5)
    .setNumLeaves(31)
    .setMaxDepth(-1)
    .setBaggingFraction(0.7)
    .setFeatureFraction(0.7)
    .setBaggingFreq(2)
    .setObjective("binary")
    .setIsUnbalance(True)
    .setMinSumHessianInLeaf(20)
    .setMinGainToSplit(0.01)
)

model = model.fit(train_data)

Konvertér modellen til ONNX-format

Følgende kode eksporterer den oplærte model til en LightGBM-booster og konverterer den derefter til ONNX-format:

import lightgbm as lgb
from lightgbm import Booster, LGBMClassifier


def convertModel(lgbm_model: LGBMClassifier or Booster, input_size: int) -> bytes:
    from onnxmltools.convert import convert_lightgbm
    from onnxconverter_common.data_types import FloatTensorType

    initial_types = [("input", FloatTensorType([-1, input_size]))]
    onnx_model = convert_lightgbm(
        lgbm_model, initial_types=initial_types, target_opset=9
    )
    return onnx_model.SerializeToString()


booster_model_str = model.getLightGBMBooster().modelStr().get()
booster = lgb.Booster(model_str=booster_model_str)
model_payload_ml = convertModel(booster, len(feature_cols))

Efter konvertering skal du indlæse ONNX-nyttedataene i en ONNXModel og undersøge modelinput og -output:

from synapse.ml.onnx import ONNXModel

onnx_ml = ONNXModel().setModelPayload(model_payload_ml)

print("Model inputs:" + str(onnx_ml.getModelInputs()))
print("Model outputs:" + str(onnx_ml.getModelOutputs()))

Knyt modelinputtet til inputdatarammens kolonnenavn (FeedDict), og knyt outputdatarammens kolonnenavne til modeloutputtet (FetchDict).

onnx_ml = (
    onnx_ml.setDeviceType("CPU")
    .setFeedDict({"input": "features"})
    .setFetchDict({"probability": "probabilities", "prediction": "label"})
    .setMiniBatchSize(5000)
)

Brug modellen til at udlede

For at kunne udlede modellen opretter følgende kode testdata og transformerer dataene via ONNX-modellen.

from pyspark.ml.feature import VectorAssembler
import pandas as pd
import numpy as np

n = 1000 * 1000
m = 95
test = np.random.rand(n, m)
testPdf = pd.DataFrame(test)
cols = list(map(str, testPdf.columns))
testDf = spark.createDataFrame(testPdf)
testDf = testDf.union(testDf).repartition(200)
testDf = (
    VectorAssembler()
    .setInputCols(cols)
    .setOutputCol("features")
    .transform(testDf)
    .drop(*cols)
    .cache()
)

display(onnx_ml.transform(testDf))

Outputtet skal ligne følgende tabel, selvom værdierne og antallet af rækker kan variere:

Indeks Funktioner Forudsigelse Probability
0 "{"type":1,"values":[0.105... 0 "{"0":0.835...
2 "{"type":1,"values":[0.814... 0 "{"0":0.658...