Inferenza ONNX in Spark
In questo esempio si esegue il training di un modello LightGBM e si converte il modello in formato ONNX. Dopo la conversione, si usa il modello per dedurre alcuni dati di test in Spark.
Questo esempio usa i pacchetti e le versioni Python seguenti:
onnxmltools==1.7.0
lightgbm==3.2.1
Prerequisiti
- Collegare il notebook a un lakehouse. Sul lato sinistro, selezionare Aggiungi per aggiungere un lakehouse esistente o creare un lakehouse.
- Potrebbe essere necessario eseguire l'installazione di
onnxmltools
aggiungendo!pip install onnxmltools==1.7.0
in una cella di codice e quindi eseguendo la cella.
Caricare i dati di esempio
Per caricare i dati di esempio, aggiungere gli esempi di codice seguenti alle celle del notebook e quindi eseguire le celle:
from pyspark.sql import SparkSession
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
from synapse.ml.core.platform import *
df = (
spark.read.format("csv")
.option("header", True)
.option("inferSchema", True)
.load(
"wasbs://publicwasb@mmlspark.blob.core.windows.net/company_bankruptcy_prediction_data.csv"
)
)
display(df)
L'output dovrebbe essere simile alla tabella seguente, anche se i valori e il numero di righe possono differire:
Rapporto copertura interessi | Contrassegno del reddito netto | Capitale azionario in responsabilità |
---|---|---|
0,5641 | 1.0 | 0,0165 |
0,5702 | 1.0 | 0,0208 |
0,5673 | 1.0 | 0,0165 |
Usare LightGBM per eseguire il training di un modello
from pyspark.ml.feature import VectorAssembler
from synapse.ml.lightgbm import LightGBMClassifier
feature_cols = df.columns[1:]
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")
train_data = featurizer.transform(df)["Bankrupt?", "features"]
model = (
LightGBMClassifier(featuresCol="features", labelCol="Bankrupt?", dataTransferMode="bulk")
.setEarlyStoppingRound(300)
.setLambdaL1(0.5)
.setNumIterations(1000)
.setNumThreads(-1)
.setMaxDeltaStep(0.5)
.setNumLeaves(31)
.setMaxDepth(-1)
.setBaggingFraction(0.7)
.setFeatureFraction(0.7)
.setBaggingFreq(2)
.setObjective("binary")
.setIsUnbalance(True)
.setMinSumHessianInLeaf(20)
.setMinGainToSplit(0.01)
)
model = model.fit(train_data)
Convertire il modello in formato ONNX
Il codice seguente esporta il modello sottoposto a training in un booster LightGBM e quindi lo converte in formato ONNX:
import lightgbm as lgb
from lightgbm import Booster, LGBMClassifier
def convertModel(lgbm_model: LGBMClassifier or Booster, input_size: int) -> bytes:
from onnxmltools.convert import convert_lightgbm
from onnxconverter_common.data_types import FloatTensorType
initial_types = [("input", FloatTensorType([-1, input_size]))]
onnx_model = convert_lightgbm(
lgbm_model, initial_types=initial_types, target_opset=9
)
return onnx_model.SerializeToString()
booster_model_str = model.getLightGBMBooster().modelStr().get()
booster = lgb.Booster(model_str=booster_model_str)
model_payload_ml = convertModel(booster, len(feature_cols))
Dopo la conversione, caricare il payload ONNX in un oggetto ONNXModel
ed esaminare gli input e gli output del modello:
from synapse.ml.onnx import ONNXModel
onnx_ml = ONNXModel().setModelPayload(model_payload_ml)
print("Model inputs:" + str(onnx_ml.getModelInputs()))
print("Model outputs:" + str(onnx_ml.getModelOutputs()))
Eseguire il mapping dell'input del modello al nome della colonna del dataframe di input (FeedDict) ed eseguire il mapping dei nomi delle colonne del dataframe di output agli output del modello (FetchDict).
onnx_ml = (
onnx_ml.setDeviceType("CPU")
.setFeedDict({"input": "features"})
.setFetchDict({"probability": "probabilities", "prediction": "label"})
.setMiniBatchSize(5000)
)
Uso del modello per l'inferenza
Per eseguire l'inferenza con il modello, il codice seguente crea dati di test e trasforma i dati tramite il modello ONNX.
from pyspark.ml.feature import VectorAssembler
import pandas as pd
import numpy as np
n = 1000 * 1000
m = 95
test = np.random.rand(n, m)
testPdf = pd.DataFrame(test)
cols = list(map(str, testPdf.columns))
testDf = spark.createDataFrame(testPdf)
testDf = testDf.union(testDf).repartition(200)
testDf = (
VectorAssembler()
.setInputCols(cols)
.setOutputCol("features")
.transform(testDf)
.drop(*cols)
.cache()
)
display(onnx_ml.transform(testDf))
L'output dovrebbe essere simile alla tabella seguente, anche se i valori e il numero di righe possono differire:
Indice | Funzionalità | Previsione | Probability |
---|---|---|---|
1 | "{"type":1,"values":[0.105... |
0 | "{"0":0.835... |
2 | "{"type":1,"values":[0.814... |
0 | "{"0":0.658... |