Spark에서 ONNX 유추
이 예시에서는 LightGBM 모델을 학습시키고 모델을 ONNX 형식으로 변환합니다. 변환된 후에는 모델을 사용하여 Spark에서 일부 테스트 데이터를 유추합니다.
이 예시에서는 다음 Python 패키지 및 버전을 사용합니다.
onnxmltools==1.7.0
lightgbm==3.2.1
필수 조건
- 레이크하우스에 Notebook을 첨부합니다. 왼쪽에서 추가를 선택하여 기존 레이크하우스를 추가하거나 레이크하우스를 만듭니다.
- 코드 셀에
!pip install onnxmltools==1.7.0
(을)를 추가하고 셀을 실행하여onnxmltools
(을)를 설치해야 할 수 있습니다.
예시 데이터 로드
예시 데이터를 로드하려면 Notebook의 셀에 다음 코드 예제를 추가한 다음 셀을 실행합니다.
from pyspark.sql import SparkSession
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
from synapse.ml.core.platform import *
df = (
spark.read.format("csv")
.option("header", True)
.option("inferSchema", True)
.load(
"wasbs://publicwasb@mmlspark.blob.core.windows.net/company_bankruptcy_prediction_data.csv"
)
)
display(df)
값과 행 수가 다를 수 있지만 출력은 다음 표와 유사하게 표시됩니다.
이자 적용 비율 | 당기순이익 플래그 | 책임에 대한 지분 |
---|---|---|
0.5641 | 1.0 | 0.0165 |
0.5702 | 1.0 | 0.0208 |
0.5673 | 1.0 | 0.0165 |
LightGBM을 사용하여 모델 학습
from pyspark.ml.feature import VectorAssembler
from synapse.ml.lightgbm import LightGBMClassifier
feature_cols = df.columns[1:]
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")
train_data = featurizer.transform(df)["Bankrupt?", "features"]
model = (
LightGBMClassifier(featuresCol="features", labelCol="Bankrupt?", dataTransferMode="bulk")
.setEarlyStoppingRound(300)
.setLambdaL1(0.5)
.setNumIterations(1000)
.setNumThreads(-1)
.setMaxDeltaStep(0.5)
.setNumLeaves(31)
.setMaxDepth(-1)
.setBaggingFraction(0.7)
.setFeatureFraction(0.7)
.setBaggingFreq(2)
.setObjective("binary")
.setIsUnbalance(True)
.setMinSumHessianInLeaf(20)
.setMinGainToSplit(0.01)
)
model = model.fit(train_data)
모델을 ONNX로 변환
다음 코드는 학습된 모델을 LightGBM 부스터로 내보낸 다음 ONNX 형식으로 변환합니다.
import lightgbm as lgb
from lightgbm import Booster, LGBMClassifier
def convertModel(lgbm_model: LGBMClassifier or Booster, input_size: int) -> bytes:
from onnxmltools.convert import convert_lightgbm
from onnxconverter_common.data_types import FloatTensorType
initial_types = [("input", FloatTensorType([-1, input_size]))]
onnx_model = convert_lightgbm(
lgbm_model, initial_types=initial_types, target_opset=9
)
return onnx_model.SerializeToString()
booster_model_str = model.getLightGBMBooster().modelStr().get()
booster = lgb.Booster(model_str=booster_model_str)
model_payload_ml = convertModel(booster, len(feature_cols))
변환 후 ONNX 페이로드를 ONNXModel
(으)로 로드하고 모델 입력 및 출력을 검사합니다.
from synapse.ml.onnx import ONNXModel
onnx_ml = ONNXModel().setModelPayload(model_payload_ml)
print("Model inputs:" + str(onnx_ml.getModelInputs()))
print("Model outputs:" + str(onnx_ml.getModelOutputs()))
모델 입력을 입력 데이터 프레임의 열 이름(FeedDict)에 매핑하고 출력 데이터 프레임의 열 이름을 모델 출력(FetchDict)에 매핑합니다.
onnx_ml = (
onnx_ml.setDeviceType("CPU")
.setFeedDict({"input": "features"})
.setFetchDict({"probability": "probabilities", "prediction": "label"})
.setMiniBatchSize(5000)
)
유추에 모델 사용하기
모델을 사용하여 유추를 수행하기 위해 다음 코드는 테스트 데이터를 만들고 ONNX 모델을 통해 데이터를 변환합니다.
from pyspark.ml.feature import VectorAssembler
import pandas as pd
import numpy as np
n = 1000 * 1000
m = 95
test = np.random.rand(n, m)
testPdf = pd.DataFrame(test)
cols = list(map(str, testPdf.columns))
testDf = spark.createDataFrame(testPdf)
testDf = testDf.union(testDf).repartition(200)
testDf = (
VectorAssembler()
.setInputCols(cols)
.setOutputCol("features")
.transform(testDf)
.drop(*cols)
.cache()
)
display(onnx_ml.transform(testDf))
값과 행 수가 다를 수 있지만 출력은 다음 표와 유사하게 표시됩니다.
인덱스 | 기능 | 예측 | 확률 |
---|---|---|---|
1 | "{"type":1,"values":[0.105... |
0 | "{"0":0.835... |
2 | "{"type":1,"values":[0.814... |
0 | "{"0":0.658... |