共用方式為


教學課程第 4 部分:執行批次評分並將預測儲存至 Lakehouse

在本教學課程中,您將了解如何匯入使用 Microsoft Fabric MLflow 模型登錄在第 3 部分中進行訓練的已註冊 LightGBMClassifier 模型,以及如何在從 Lakehouse 載入的測試資料集上執行批次預測。

Microsoft Fabric 可讓您使用稱為 PREDICT 的可調整函數來運作機器學習模型,該函數支援任何計算引擎中的批次評分。 您可以直接從 Microsoft Fabric 筆記本或指定的模型項目頁面產生批次預測。 了解 PREDICT

若要在測試資料集上產生批次預測,您將使用版本 1 已訓練的 LightGBM 模型,以展示所有已訓練機器學習模型的最佳效能。 您會將測試資料集載入 Spark DataFrame,並建立 MLFlowTransformer 物件來產生批次預測。 然後,您可以使用下列三種方式之一叫用 PREDICT 函數:

  • SynapseML 的轉換器 API
  • Spark SQL API
  • PySpark 使用者定義函數 (UDF)

必要條件

這是教學課程系列 5 部分中的第 4 部分。 若要完成本教學課程,請先完成:

遵循筆記本中的指示

4-predict.ipynb 是本教學課程隨附的筆記本。

重要

連結您在此系列的其他部分中所使用的相同 Lakehouse。

載入測試資料

載入您在第 3 部分中儲存的測試資料。

df_test = spark.read.format("delta").load("Tables/df_test")
display(df_test)

具有轉換器 API 的 PREDICT

若要使用來自 SynapseML 的 Transformer API,您需要先建立 MLFlowTransformer 物件。

具現化 MLFlowTransformer 物件

MLFlowTransformer 物件是您在第 3 部分註冊之 MLFlow 模型周圍的包裝函式。 它可讓您在指定的 DataFrame 上產生批次預測。 若要具現化 MLFlowTransformer 物件,您需要提供下列參數:

  • 測試 DataFrame 中需要作為模型輸入的資料行 (在此案例中,您需要全部資料行)。
  • 新輸出資料行的名稱 (在此案例中為預測)。
  • 產生預測的正確模型名稱和模型版本 (在此案例中,lgbm_sm 和版本 1)。
from synapse.ml.predict import MLFlowTransformer

model = MLFlowTransformer(
    inputCols=list(df_test.columns),
    outputCol='predictions',
    modelName='lgbm_sm',
    modelVersion=1
)

現在您已擁有 MLFlowTransformer 物件,可以使用此物件來產生批次預測。

import pandas

predictions = model.transform(df_test)
display(predictions)

具有 Spark SQL API 的 PREDICT

下列程式碼會使用 Spark SQL API 叫用 PREDICT 函數。

from pyspark.ml.feature import SQLTransformer 

# Substitute "model_name", "model_version", and "features" below with values for your own model name, model version, and feature columns
model_name = 'lgbm_sm'
model_version = 1
features = df_test.columns

sqlt = SQLTransformer().setStatement( 
    f"SELECT PREDICT('{model_name}/{model_version}', {','.join(features)}) as predictions FROM __THIS__")

# Substitute "X_test" below with your own test dataset
display(sqlt.transform(df_test))

具有使用者定義函數 (UDF) 的 PREDICT

下列程式碼會使用 PySpark UDF 叫用 PREDICT 函數。

from pyspark.sql.functions import col, pandas_udf, udf, lit

# Substitute "model" and "features" below with values for your own model name and feature columns
my_udf = model.to_udf()
features = df_test.columns

display(df_test.withColumn("predictions", my_udf(*[col(f) for f in features])))

請注意,您也可以從模型的項目頁面產生 PREDICT 程式碼。 了解 PREDICT

將模型預測結果寫入 Lakehouse

產生批次預測之後,請將模型預測結果寫回至 Lakehouse。

# Save predictions to lakehouse to be used for generating a Power BI report
table_name = "customer_churn_test_predictions"
predictions.write.format('delta').mode("overwrite").save(f"Tables/{table_name}")
print(f"Spark DataFrame saved to delta table: {table_name}")

後續步驟

繼續執行: