Spark에서 ONNX 유추

이 예제에서는 LightGBM 모델을 학습시키고 모델을 ONNX 형식으로 변환합니다. 변환된 후에는 모델을 사용하여 Spark에서 일부 테스트 데이터를 유추합니다.

이 예제에서는 다음 Python 패키지 및 버전을 사용합니다.

  • onnxmltools==1.7.0
  • lightgbm==3.2.1

필수 조건

  • 레이크하우스에 전자 필기장을 첨부합니다. 왼쪽에서 추가를 선택하여 기존 레이크하우스를 추가하거나 레이크하우스를 만듭니다.
  • 코드 셀에 추가 !pip install onnxmltools==1.7.0 한 다음 셀을 실행하여 설치 onnxmltools 해야 할 수 있습니다.

예제 데이터 로드

예제 데이터를 로드하려면 Notebook의 셀에 다음 코드 예제를 추가한 다음 셀을 실행합니다.

from pyspark.sql import SparkSession

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

from synapse.ml.core.platform import *
df = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load(
        "wasbs://publicwasb@mmlspark.blob.core.windows.net/company_bankruptcy_prediction_data.csv"
    )
)

display(df)

값과 행 수가 다를 수 있지만 출력은 다음 표와 유사하게 표시됩니다.

이자 적용 비율 당기순이익 플래그 책임에 대한 지분
0.5641 1.0 0.0165
0.5702 1.0 0.0208
0.5673 1.0 0.0165

LightGBM을 사용하여 모델 학습

from pyspark.ml.feature import VectorAssembler
from synapse.ml.lightgbm import LightGBMClassifier

feature_cols = df.columns[1:]
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")

train_data = featurizer.transform(df)["Bankrupt?", "features"]

model = (
    LightGBMClassifier(featuresCol="features", labelCol="Bankrupt?")
    .setEarlyStoppingRound(300)
    .setLambdaL1(0.5)
    .setNumIterations(1000)
    .setNumThreads(-1)
    .setMaxDeltaStep(0.5)
    .setNumLeaves(31)
    .setMaxDepth(-1)
    .setBaggingFraction(0.7)
    .setFeatureFraction(0.7)
    .setBaggingFreq(2)
    .setObjective("binary")
    .setIsUnbalance(True)
    .setMinSumHessianInLeaf(20)
    .setMinGainToSplit(0.01)
)

model = model.fit(train_data)

모델을 ONNX 형식으로 변환

다음 코드는 학습된 모델을 LightGBM 부스터로 내보낸 다음 ONNX 형식으로 변환합니다.

import lightgbm as lgb
from lightgbm import Booster, LGBMClassifier


def convertModel(lgbm_model: LGBMClassifier or Booster, input_size: int) -> bytes:
    from onnxmltools.convert import convert_lightgbm
    from onnxconverter_common.data_types import FloatTensorType

    initial_types = [("input", FloatTensorType([-1, input_size]))]
    onnx_model = convert_lightgbm(
        lgbm_model, initial_types=initial_types, target_opset=9
    )
    return onnx_model.SerializeToString()


booster_model_str = model.getLightGBMBooster().modelStr().get()
booster = lgb.Booster(model_str=booster_model_str)
model_payload_ml = convertModel(booster, len(feature_cols))

변환 후 ONNX 페이로드를 ONNXModel 로드하고 모델 입력 및 출력을 검사합니다.

from synapse.ml.onnx import ONNXModel

onnx_ml = ONNXModel().setModelPayload(model_payload_ml)

print("Model inputs:" + str(onnx_ml.getModelInputs()))
print("Model outputs:" + str(onnx_ml.getModelOutputs()))

모델 입력을 입력 데이터 프레임의 열 이름(FeedDict)에 매핑하고 출력 데이터 프레임의 열 이름을 모델 출력(FetchDict)에 매핑합니다.

onnx_ml = (
    onnx_ml.setDeviceType("CPU")
    .setFeedDict({"input": "features"})
    .setFetchDict({"probability": "probabilities", "prediction": "label"})
    .setMiniBatchSize(5000)
)

유추를 위해 모델 사용

모델을 사용하여 유추를 수행하기 위해 다음 코드는 테스트 데이터를 만들고 ONNX 모델을 통해 데이터를 변환합니다.

from pyspark.ml.feature import VectorAssembler
import pandas as pd
import numpy as np

n = 1000 * 1000
m = 95
test = np.random.rand(n, m)
testPdf = pd.DataFrame(test)
cols = list(map(str, testPdf.columns))
testDf = spark.createDataFrame(testPdf)
testDf = testDf.union(testDf).repartition(200)
testDf = (
    VectorAssembler()
    .setInputCols(cols)
    .setOutputCol("features")
    .transform(testDf)
    .drop(*cols)
    .cache()
)

display(onnx_ml.transform(testDf))

값과 행 수가 다를 수 있지만 출력은 다음 표와 유사하게 표시됩니다.

인덱스 기능 예측 확률
1 "{"type":1,"values":[0.105... 0 "{"0":0.835...
2 "{"type":1,"values":[0.814... 0 "{"0":0.658...