Zdieľať cez


Inferencia ONNX k Spark

V tomto príklade trénujete model LightGBM a skonvertujete tento model do formátu ONNX . Po konvertovaní môžete pomocou modelu odvodiť niektoré testovacie údaje v službe Spark.

V tomto príklade sa používajú tieto balíky a verzie jazyka Python:

  • onnxmltools==1.7.0
  • lightgbm==3.2.1

Požiadavky

  • Pripojte svoj notebook k jazeru. Na ľavej strane vyberte položku Pridať a pridajte existujúce jazero alebo vytvorte jazero.
  • Možno bude potrebné nainštalovať .onnxmltools Ak to chcete urobiť, pridajte do !pip install onnxmltools==1.7.0 bunky kódu poznámkového bloku a potom túto bunku spustite.
  • Možno bude potrebné nainštalovať .lightgbm Ak to chcete urobiť, pridajte do !pip install lightgbm==3.2.1 bunky kódu poznámkového bloku a potom túto bunku spustite.

Načítanie vzorového údajov

Ak chcete načítať vzorové údaje, pridajte tieto príklady kódu do buniek v poznámkovom bloku a potom spustite tieto bunky:

from pyspark.sql import SparkSession

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

from synapse.ml.core.platform import *
df = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load(
        "wasbs://publicwasb@mmlspark.blob.core.windows.net/company_bankruptcy_prediction_data.csv"
    )
)

display(df)

Výstup by mal vyzerať podobne ako v nasledujúcej tabuľke. Zobrazené konkrétne stĺpce, počet riadkov a skutočné hodnoty v tabuľke sa môžu líšiť:

Pomer pokrytia úrokov Príznak Čistý príjem Vlastné imanie na zodpovednosť
0.5641 1.0 0.0165
0.5702 1.0 0.0208
0.5673 1.0 0.0165

Trénovať model pomocou lightGBM

from pyspark.ml.feature import VectorAssembler
from synapse.ml.lightgbm import LightGBMClassifier

feature_cols = df.columns[1:]
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")

train_data = featurizer.transform(df)["Bankrupt?", "features"]

model = (
    LightGBMClassifier(featuresCol="features", labelCol="Bankrupt?", dataTransferMode="bulk")
    .setEarlyStoppingRound(300)
    .setLambdaL1(0.5)
    .setNumIterations(1000)
    .setNumThreads(-1)
    .setMaxDeltaStep(0.5)
    .setNumLeaves(31)
    .setMaxDepth(-1)
    .setBaggingFraction(0.7)
    .setFeatureFraction(0.7)
    .setBaggingFreq(2)
    .setObjective("binary")
    .setIsUnbalance(True)
    .setMinSumHessianInLeaf(20)
    .setMinGainToSplit(0.01)
)

model = model.fit(train_data)

Konverzia modelu do formátu ONNX

Nasledujúci kód exportuje trénovaný model do posilňovača LightGBM a potom skonvertuje model do formátu ONNX:

import lightgbm as lgb
from lightgbm import Booster, LGBMClassifier


def convertModel(lgbm_model: LGBMClassifier or Booster, input_size: int) -> bytes:
    from onnxmltools.convert import convert_lightgbm
    from onnxconverter_common.data_types import FloatTensorType

    initial_types = [("input", FloatTensorType([-1, input_size]))]
    onnx_model = convert_lightgbm(
        lgbm_model, initial_types=initial_types, target_opset=9
    )
    return onnx_model.SerializeToString()


booster_model_str = model.getLightGBMBooster().modelStr().get()
booster = lgb.Booster(model_str=booster_model_str)
model_payload_ml = convertModel(booster, len(feature_cols))

Po konverzii načítajte údajovú ONNXModelčasť ONNX do a skontrolujte vstupy a výstupy modelu:

from synapse.ml.onnx import ONNXModel

onnx_ml = ONNXModel().setModelPayload(model_payload_ml)

print("Model inputs:" + str(onnx_ml.getModelInputs()))
print("Model outputs:" + str(onnx_ml.getModelOutputs()))

Priraďte vstup modelu k názvu stĺpca (FeedDict) vstupného údajového rámca a priraďte názvy stĺpcov výstupného údajového rámca k výstupom modelu (FetchDict):

onnx_ml = (
    onnx_ml.setDeviceType("CPU")
    .setFeedDict({"input": "features"})
    .setFetchDict({"probability": "probabilities", "prediction": "label"})
    .setMiniBatchSize(5000)
)

Použitie modelu na odvodenie

Nasledujúci kód vytvorí testovacie údaje a transformuje údaje prostredníctvom modelu ONNX na vykonanie záverov v modeli ONNX:

from pyspark.ml.feature import VectorAssembler
import pandas as pd
import numpy as np

n = 1000 * 1000
m = 95
test = np.random.rand(n, m)
testPdf = pd.DataFrame(test)
cols = list(map(str, testPdf.columns))
testDf = spark.createDataFrame(testPdf)
testDf = testDf.union(testDf).repartition(200)
testDf = (
    VectorAssembler()
    .setInputCols(cols)
    .setOutputCol("features")
    .transform(testDf)
    .drop(*cols)
    .cache()
)

display(onnx_ml.transform(testDf))

Výstup by mal vyzerať podobne ako v nasledujúcej tabuľke, aj keď hodnoty a počet riadkov sa môžu líšiť:

Index Súčasti Predpoveď Probability
1 "{"type":1,"values":[0.105... 0 "{"0":0.835...
2 "{"type":1,"values":[0.814... 0 "{"0":0.658...