Udostępnij za pośrednictwem


Wnioskowanie ONNX na platformie Spark

W tym przykładzie wytrenujesz model LightGBM i przekonwertujesz model na format ONNX . Po przekonwertowaniu modelu użyjesz modelu, aby wywnioskować niektóre dane testowe na platformie Spark.

W tym przykładzie użyto następujących pakietów i wersji języka Python:

  • onnxmltools==1.7.0
  • lightgbm==3.2.1

Wymagania wstępne

  • Dołącz notes do magazynu lakehouse. Po lewej stronie wybierz pozycję Dodaj , aby dodać istniejący obiekt lakehouse lub utworzyć jezioro.
  • Może być konieczne zainstalowanie onnxmltools , dodając !pip install onnxmltools==1.7.0 w komórce kodu, a następnie uruchamiając komórkę.

Ładowanie przykładowych danych

Aby załadować przykładowe dane, dodaj następujące przykłady kodu do komórek w notesie, a następnie uruchom komórki:

from pyspark.sql import SparkSession

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

from synapse.ml.core.platform import *
df = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load(
        "wasbs://publicwasb@mmlspark.blob.core.windows.net/company_bankruptcy_prediction_data.csv"
    )
)

display(df)

Dane wyjściowe powinny wyglądać podobnie do poniższej tabeli, ale wartości i liczba wierszy mogą się różnić:

Współczynnik pokrycia odsetek Flaga dochodu netto Kapitał własny do odpowiedzialności
0.5641 1.0 0.0165
0.5702 1.0 0.0208
0.5673 1.0 0.0165

Trenowanie modelu przy użyciu programu LightGBM

from pyspark.ml.feature import VectorAssembler
from synapse.ml.lightgbm import LightGBMClassifier

feature_cols = df.columns[1:]
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")

train_data = featurizer.transform(df)["Bankrupt?", "features"]

model = (
    LightGBMClassifier(featuresCol="features", labelCol="Bankrupt?", dataTransferMode="bulk")
    .setEarlyStoppingRound(300)
    .setLambdaL1(0.5)
    .setNumIterations(1000)
    .setNumThreads(-1)
    .setMaxDeltaStep(0.5)
    .setNumLeaves(31)
    .setMaxDepth(-1)
    .setBaggingFraction(0.7)
    .setFeatureFraction(0.7)
    .setBaggingFreq(2)
    .setObjective("binary")
    .setIsUnbalance(True)
    .setMinSumHessianInLeaf(20)
    .setMinGainToSplit(0.01)
)

model = model.fit(train_data)

Konwertowanie modelu na format ONNX

Poniższy kod eksportuje wytrenowany model do wzmacniacza LightGBM, a następnie konwertuje go na format ONNX:

import lightgbm as lgb
from lightgbm import Booster, LGBMClassifier


def convertModel(lgbm_model: LGBMClassifier or Booster, input_size: int) -> bytes:
    from onnxmltools.convert import convert_lightgbm
    from onnxconverter_common.data_types import FloatTensorType

    initial_types = [("input", FloatTensorType([-1, input_size]))]
    onnx_model = convert_lightgbm(
        lgbm_model, initial_types=initial_types, target_opset=9
    )
    return onnx_model.SerializeToString()


booster_model_str = model.getLightGBMBooster().modelStr().get()
booster = lgb.Booster(model_str=booster_model_str)
model_payload_ml = convertModel(booster, len(feature_cols))

Po konwersji załaduj ładunek ONNX do elementu ONNXModel i sprawdź dane wejściowe i wyjściowe modelu:

from synapse.ml.onnx import ONNXModel

onnx_ml = ONNXModel().setModelPayload(model_payload_ml)

print("Model inputs:" + str(onnx_ml.getModelInputs()))
print("Model outputs:" + str(onnx_ml.getModelOutputs()))

Zamapuj dane wejściowe modelu na nazwę kolumny ramki danych wejściowych (FeedDict) i zamapuj nazwy kolumn wyjściowej ramki danych na dane wyjściowe modelu (FetchDict).

onnx_ml = (
    onnx_ml.setDeviceType("CPU")
    .setFeedDict({"input": "features"})
    .setFetchDict({"probability": "probabilities", "prediction": "label"})
    .setMiniBatchSize(5000)
)

Używanie modelu do wnioskowania

Aby przeprowadzić wnioskowanie za pomocą modelu, poniższy kod tworzy dane testowe i przekształca dane za pomocą modelu ONNX.

from pyspark.ml.feature import VectorAssembler
import pandas as pd
import numpy as np

n = 1000 * 1000
m = 95
test = np.random.rand(n, m)
testPdf = pd.DataFrame(test)
cols = list(map(str, testPdf.columns))
testDf = spark.createDataFrame(testPdf)
testDf = testDf.union(testDf).repartition(200)
testDf = (
    VectorAssembler()
    .setInputCols(cols)
    .setOutputCol("features")
    .transform(testDf)
    .drop(*cols)
    .cache()
)

display(onnx_ml.transform(testDf))

Dane wyjściowe powinny wyglądać podobnie do poniższej tabeli, ale wartości i liczba wierszy mogą się różnić:

Indeks Funkcje Przewidywanie Prawdopodobieństwo
1 "{"type":1,"values":[0.105... 0 "{"0":0.835...
2 "{"type":1,"values":[0.814... 0 "{"0":0.658...