Spark에서 ONNX 유추
이 예제에서는 LightGBM 모델을 학습시키고 모델을 ONNX 형식으로 변환합니다. 변환된 후에는 모델을 사용하여 Spark에서 일부 테스트 데이터를 유추합니다.
이 예제에서는 다음 Python 패키지 및 버전을 사용합니다.
onnxmltools==1.7.0
lightgbm==3.2.1
필수 조건
- 레이크하우스에 전자 필기장을 첨부합니다. 왼쪽에서 추가를 선택하여 기존 레이크하우스를 추가하거나 레이크하우스를 만듭니다.
- 코드 셀에 추가
!pip install onnxmltools==1.7.0
한 다음 셀을 실행하여 설치onnxmltools
해야 할 수 있습니다.
예제 데이터 로드
예제 데이터를 로드하려면 Notebook의 셀에 다음 코드 예제를 추가한 다음 셀을 실행합니다.
from pyspark.sql import SparkSession
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
from synapse.ml.core.platform import *
df = (
spark.read.format("csv")
.option("header", True)
.option("inferSchema", True)
.load(
"wasbs://publicwasb@mmlspark.blob.core.windows.net/company_bankruptcy_prediction_data.csv"
)
)
display(df)
값과 행 수가 다를 수 있지만 출력은 다음 표와 유사하게 표시됩니다.
이자 적용 비율 | 당기순이익 플래그 | 책임에 대한 지분 |
---|---|---|
0.5641 | 1.0 | 0.0165 |
0.5702 | 1.0 | 0.0208 |
0.5673 | 1.0 | 0.0165 |
LightGBM을 사용하여 모델 학습
from pyspark.ml.feature import VectorAssembler
from synapse.ml.lightgbm import LightGBMClassifier
feature_cols = df.columns[1:]
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")
train_data = featurizer.transform(df)["Bankrupt?", "features"]
model = (
LightGBMClassifier(featuresCol="features", labelCol="Bankrupt?")
.setEarlyStoppingRound(300)
.setLambdaL1(0.5)
.setNumIterations(1000)
.setNumThreads(-1)
.setMaxDeltaStep(0.5)
.setNumLeaves(31)
.setMaxDepth(-1)
.setBaggingFraction(0.7)
.setFeatureFraction(0.7)
.setBaggingFreq(2)
.setObjective("binary")
.setIsUnbalance(True)
.setMinSumHessianInLeaf(20)
.setMinGainToSplit(0.01)
)
model = model.fit(train_data)
모델을 ONNX 형식으로 변환
다음 코드는 학습된 모델을 LightGBM 부스터로 내보낸 다음 ONNX 형식으로 변환합니다.
import lightgbm as lgb
from lightgbm import Booster, LGBMClassifier
def convertModel(lgbm_model: LGBMClassifier or Booster, input_size: int) -> bytes:
from onnxmltools.convert import convert_lightgbm
from onnxconverter_common.data_types import FloatTensorType
initial_types = [("input", FloatTensorType([-1, input_size]))]
onnx_model = convert_lightgbm(
lgbm_model, initial_types=initial_types, target_opset=9
)
return onnx_model.SerializeToString()
booster_model_str = model.getLightGBMBooster().modelStr().get()
booster = lgb.Booster(model_str=booster_model_str)
model_payload_ml = convertModel(booster, len(feature_cols))
변환 후 ONNX 페이로드를 ONNXModel
로드하고 모델 입력 및 출력을 검사합니다.
from synapse.ml.onnx import ONNXModel
onnx_ml = ONNXModel().setModelPayload(model_payload_ml)
print("Model inputs:" + str(onnx_ml.getModelInputs()))
print("Model outputs:" + str(onnx_ml.getModelOutputs()))
모델 입력을 입력 데이터 프레임의 열 이름(FeedDict)에 매핑하고 출력 데이터 프레임의 열 이름을 모델 출력(FetchDict)에 매핑합니다.
onnx_ml = (
onnx_ml.setDeviceType("CPU")
.setFeedDict({"input": "features"})
.setFetchDict({"probability": "probabilities", "prediction": "label"})
.setMiniBatchSize(5000)
)
유추를 위해 모델 사용
모델을 사용하여 유추를 수행하기 위해 다음 코드는 테스트 데이터를 만들고 ONNX 모델을 통해 데이터를 변환합니다.
from pyspark.ml.feature import VectorAssembler
import pandas as pd
import numpy as np
n = 1000 * 1000
m = 95
test = np.random.rand(n, m)
testPdf = pd.DataFrame(test)
cols = list(map(str, testPdf.columns))
testDf = spark.createDataFrame(testPdf)
testDf = testDf.union(testDf).repartition(200)
testDf = (
VectorAssembler()
.setInputCols(cols)
.setOutputCol("features")
.transform(testDf)
.drop(*cols)
.cache()
)
display(onnx_ml.transform(testDf))
값과 행 수가 다를 수 있지만 출력은 다음 표와 유사하게 표시됩니다.
인덱스 | 기능 | 예측 | 확률 |
---|---|---|---|
1 | "{"type":1,"values":[0.105... |
0 | "{"0":0.835... |
2 | "{"type":1,"values":[0.814... |
0 | "{"0":0.658... |
관련 콘텐츠
피드백
https://aka.ms/ContentUserFeedback
출시 예정: 2024년 내내 콘텐츠에 대한 피드백 메커니즘으로 GitHub 문제를 단계적으로 폐지하고 이를 새로운 피드백 시스템으로 바꿀 예정입니다. 자세한 내용은 다음을 참조하세요.다음에 대한 사용자 의견 제출 및 보기