教學課程第 4 部分:執行批次評分並將預測儲存至 Lakehouse
在本教學課程中,您將了解如何匯入使用 Microsoft Fabric MLflow 模型登錄在第 3 部分中進行訓練的已註冊 LightGBMClassifier 模型,以及如何在從 Lakehouse 載入的測試資料集上執行批次預測。
Microsoft Fabric 可讓您使用稱為 PREDICT 的可調整函數來運作機器學習模型,該函數支援任何計算引擎中的批次評分。 您可以直接從 Microsoft Fabric 筆記本或指定的模型項目頁面產生批次預測。 了解 PREDICT。
若要在測試資料集上產生批次預測,您將使用版本 1 已訓練的 LightGBM 模型,以展示所有已訓練機器學習模型的最佳效能。 您會將測試資料集載入 Spark DataFrame,並建立 MLFlowTransformer 物件來產生批次預測。 然後,您可以使用下列三種方式之一叫用 PREDICT 函數:
- SynapseML 的轉換器 API
- Spark SQL API
- PySpark 使用者定義函數 (UDF)
必要條件
取得 Microsoft Fabric 訂用帳戶。 或註冊免費的 Microsoft Fabric 試用版。
登入 Microsoft Fabric。
使用首頁左下方的體驗切換器,切換至 Fabric。
這是教學課程系列 5 部分中的第 4 部分。 若要完成本教學課程,請先完成:
- 第 1 部分:使用 Apache Spark 將資料內嵌至 Microsoft Fabric Lakehouse。
- 第 2 部分:使用 Microsoft Fabric 筆記本探索和視覺化資料,以深入了解資料。
- 第 3 部分:訓練和註冊機器學習模型。
遵循筆記本中的指示
4-predict.ipynb 是本教學課程隨附的筆記本。
若要開啟本教學課程隨附的筆記本,請遵循 準備系統以進行數據科學教學課程中的指示, 將筆記本匯入工作區。
如果您想要複製並貼上此頁面中的程式碼,則可以建立新的筆記本。
開始執行程式碼之前,請務必將 Lakehouse 連結至筆記本。
重要
連結您在此系列的其他部分中所使用的相同 Lakehouse。
載入測試資料
載入您在第 3 部分中儲存的測試資料。
df_test = spark.read.format("delta").load("Tables/df_test")
display(df_test)
具有轉換器 API 的 PREDICT
若要使用來自 SynapseML 的 Transformer API,您需要先建立 MLFlowTransformer 物件。
具現化 MLFlowTransformer 物件
MLFlowTransformer 物件是您在第 3 部分註冊之 MLFlow 模型周圍的包裝函式。 它可讓您在指定的 DataFrame 上產生批次預測。 若要具現化 MLFlowTransformer 物件,您需要提供下列參數:
- 測試 DataFrame 中需要作為模型輸入的資料行 (在此案例中,您需要全部資料行)。
- 新輸出資料行的名稱 (在此案例中為預測)。
- 產生預測的正確模型名稱和模型版本 (在此案例中,
lgbm_sm
和版本 1)。
from synapse.ml.predict import MLFlowTransformer
model = MLFlowTransformer(
inputCols=list(df_test.columns),
outputCol='predictions',
modelName='lgbm_sm',
modelVersion=1
)
現在您已擁有 MLFlowTransformer 物件,可以使用此物件來產生批次預測。
import pandas
predictions = model.transform(df_test)
display(predictions)
具有 Spark SQL API 的 PREDICT
下列程式碼會使用 Spark SQL API 叫用 PREDICT 函數。
from pyspark.ml.feature import SQLTransformer
# Substitute "model_name", "model_version", and "features" below with values for your own model name, model version, and feature columns
model_name = 'lgbm_sm'
model_version = 1
features = df_test.columns
sqlt = SQLTransformer().setStatement(
f"SELECT PREDICT('{model_name}/{model_version}', {','.join(features)}) as predictions FROM __THIS__")
# Substitute "X_test" below with your own test dataset
display(sqlt.transform(df_test))
具有使用者定義函數 (UDF) 的 PREDICT
下列程式碼會使用 PySpark UDF 叫用 PREDICT 函數。
from pyspark.sql.functions import col, pandas_udf, udf, lit
# Substitute "model" and "features" below with values for your own model name and feature columns
my_udf = model.to_udf()
features = df_test.columns
display(df_test.withColumn("predictions", my_udf(*[col(f) for f in features])))
請注意,您也可以從模型的項目頁面產生 PREDICT 程式碼。 了解 PREDICT。
將模型預測結果寫入 Lakehouse
產生批次預測之後,請將模型預測結果寫回至 Lakehouse。
# Save predictions to lakehouse to be used for generating a Power BI report
table_name = "customer_churn_test_predictions"
predictions.write.format('delta').mode("overwrite").save(f"Tables/{table_name}")
print(f"Spark DataFrame saved to delta table: {table_name}")
後續步驟
繼續執行: