ONNX-Rückschluss in Spark
In diesem Beispiel trainieren Sie ein LightGBM-Modell und konvertieren es in das ONNX-Format. Nach dem Konvertieren verwenden Sie das Modell, um einige Testdaten in Spark abzuleiten.
In diesem Beispiel werden die folgenden Python-Pakete und -Versionen verwendet:
onnxmltools==1.7.0
lightgbm==3.2.1
Voraussetzungen
- Fügen Sie Ihr Notebook an ein Lakehouse an. Wählen Sie auf der linken Seite Hinzufügen aus, um ein vorhandenes Lakehouse hinzuzufügen oder ein Lakehouse zu erstellen.
- Möglicherweise müssen Sie
onnxmltools
installieren, indem Sie!pip install onnxmltools==1.7.0
einer Codezelle hinzufügen und dann die Zelle ausführen.
Laden der Beispieldaten
Um die Beispieldaten zu laden, fügen Sie den Zellen in Ihrem Notebook die folgenden Codebeispiele hinzu, und führen Sie dann die Zellen aus:
from pyspark.sql import SparkSession
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
from synapse.ml.core.platform import *
df = (
spark.read.format("csv")
.option("header", True)
.option("inferSchema", True)
.load(
"wasbs://publicwasb@mmlspark.blob.core.windows.net/company_bankruptcy_prediction_data.csv"
)
)
display(df)
Die Ausgabe sollte der folgenden Tabelle ähnlich sein, obwohl sich die Werte und die Anzahl der Zeilen unterscheiden können:
Kreditzinsdeckungsquote | Nettoeinkommensflag | Eigenkapital in Passiva |
---|---|---|
0,5641 | 1.0 | 0,0165 |
0,5702 | 1.0 | 0,0208 |
0,5673 | 1.0 | 0,0165 |
Verwenden von LightGBM zum Trainieren eines Modells
from pyspark.ml.feature import VectorAssembler
from synapse.ml.lightgbm import LightGBMClassifier
feature_cols = df.columns[1:]
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")
train_data = featurizer.transform(df)["Bankrupt?", "features"]
model = (
LightGBMClassifier(featuresCol="features", labelCol="Bankrupt?", dataTransferMode="bulk")
.setEarlyStoppingRound(300)
.setLambdaL1(0.5)
.setNumIterations(1000)
.setNumThreads(-1)
.setMaxDeltaStep(0.5)
.setNumLeaves(31)
.setMaxDepth(-1)
.setBaggingFraction(0.7)
.setFeatureFraction(0.7)
.setBaggingFreq(2)
.setObjective("binary")
.setIsUnbalance(True)
.setMinSumHessianInLeaf(20)
.setMinGainToSplit(0.01)
)
model = model.fit(train_data)
Konvertieren des Modells in das ONNX-Format
Mit dem folgenden Code wird das trainierte Modell in einen LightGBM-Booster exportiert und dann in das ONNX-Format konvertiert:
import lightgbm as lgb
from lightgbm import Booster, LGBMClassifier
def convertModel(lgbm_model: LGBMClassifier or Booster, input_size: int) -> bytes:
from onnxmltools.convert import convert_lightgbm
from onnxconverter_common.data_types import FloatTensorType
initial_types = [("input", FloatTensorType([-1, input_size]))]
onnx_model = convert_lightgbm(
lgbm_model, initial_types=initial_types, target_opset=9
)
return onnx_model.SerializeToString()
booster_model_str = model.getLightGBMBooster().modelStr().get()
booster = lgb.Booster(model_str=booster_model_str)
model_payload_ml = convertModel(booster, len(feature_cols))
Laden Sie nach dem Konvertieren die ONNX-Payload in ein ONNXModel
, und überprüfen Sie die Modelleingaben und -ausgaben:
from synapse.ml.onnx import ONNXModel
onnx_ml = ONNXModel().setModelPayload(model_payload_ml)
print("Model inputs:" + str(onnx_ml.getModelInputs()))
print("Model outputs:" + str(onnx_ml.getModelOutputs()))
Ordnen Sie die Modelleingabe dem Spaltennamen (FeedDict) des Eingabedataframes zu und die Spaltennamen des Ausgabedataframes den Modellausgaben (FetchDict).
onnx_ml = (
onnx_ml.setDeviceType("CPU")
.setFeedDict({"input": "features"})
.setFetchDict({"probability": "probabilities", "prediction": "label"})
.setMiniBatchSize(5000)
)
Verwenden des Modells für Rückschlüsse
Um Rückschlüsse mit dem Modell festzustellen, werden mit dem folgenden Code Testdaten erstellt und über das ONNX-Modell transformiert.
from pyspark.ml.feature import VectorAssembler
import pandas as pd
import numpy as np
n = 1000 * 1000
m = 95
test = np.random.rand(n, m)
testPdf = pd.DataFrame(test)
cols = list(map(str, testPdf.columns))
testDf = spark.createDataFrame(testPdf)
testDf = testDf.union(testDf).repartition(200)
testDf = (
VectorAssembler()
.setInputCols(cols)
.setOutputCol("features")
.transform(testDf)
.drop(*cols)
.cache()
)
display(onnx_ml.transform(testDf))
Die Ausgabe sollte der folgenden Tabelle ähnlich sein, obwohl sich die Werte und die Anzahl der Zeilen unterscheiden können:
Index | Features | Vorhersage | Probability |
---|---|---|---|
1 | "{"type":1,"values":[0.105... |
0 | "{"0":0.835... |
2 | "{"type":1,"values":[0.814... |
0 | "{"0":0.658... |