教程第 4 部分:执行批量评分并将预测保存到湖屋
本教程介绍如何使用 Microsoft Fabric MLflow 模型注册表导入第 3 部分中经过训练和注册的 LightGBMRegressor 模型,并针对从湖屋加载的测试数据集执行批量预测。
Microsoft Fabric 允许用户使用名为 PREDICT 的可缩放函数操作机器学习模型,该函数支持在任何计算引擎中进行批量评分。 可以直接从 Microsoft Fabric 笔记本或给定模型的项页生成批量预测。 了解 PREDICT。
若要基于测试数据集生成批量预测,将使用经过训练的 LightGBM 模型的版本 1,该版本模型在所有已训练的机器学习模型中性能最佳。 将测试数据集加载到 Spark DataFrame 中,并创建 MLFlowTransformer 对象以生成批量预测。 然后,可通过以下三种方式之一调用 PREDICT 函数:
- SynapseML 中的转换器 API
- Spark SQL API
- PySpark 用户定义函数 (UDF)
先决条件
获取 Microsoft Fabric 订阅。 或者注册免费的 Microsoft Fabric 试用版。
登录 Microsoft Fabric。
使用主页左侧的体验切换器切换到 Synapse 数据科学体验。
这是系列教程的第 4 部分(共 5 部分)。 若要完成本教程,请先完成:
- 第 1 部分:使用 Apache Spark 将数据引入 Microsoft Fabric 湖屋。
- 第 2 部分:使用 Microsoft Fabric 笔记本浏览并可视化数据,以详细了解数据。
- 第 3 部分:训练和注册机器学习模型。
在笔记本中继续操作
4-predict.ipynb 是本教程随附的笔记本。
若要打开本教程随附的笔记本,请按照让系统为数据科学做好准备教程中的说明操作,将该笔记本导入到工作区。
如果要从此页面复制并粘贴代码,则可以创建新的笔记本。
在开始运行代码之前,请务必将湖屋连接到笔记本。
重要
随附在此系列的其他部分中使用的同一个湖屋。
加载测试数据
加载第 3 部分中保存的测试数据。
df_test = spark.read.format("delta").load("Tables/df_test")
display(df_test)
使用转换器 API 的 PREDICT
若要使用 SynapseML 中的转换器 API,首先需要创建 MLFlowTransformer 对象。
实例化 MLFlowTransformer 对象
MLFlowTransformer 对象是在第 3 部分中注册的 MLFlow 模型周围的包装器。 它允许基于给定数据帧生成批量预测。 若要实例化 MLFlowTransformer 对象,需要提供以下参数:
- 测试数据帧中需要作为模型输入的列(在本例中,需要所有列)。
- 新输出列的名称(本例中为“预测”)。
- 用于生成预测的正确模型名称和模型版本(本例中为
lgbm_sm
和版本 1)。
from synapse.ml.predict import MLFlowTransformer
model = MLFlowTransformer(
inputCols=list(df_test.columns),
outputCol='predictions',
modelName='lgbm_sm',
modelVersion=1
)
有了 MLFlowTransformer 对象后,可以使用它来生成批量预测。
import pandas
predictions = model.transform(df_test)
display(predictions)
使用 Spark SQL API 的 PREDICT
以下代码使用 Spark SQL API 调用 PREDICT 函数。
from pyspark.ml.feature import SQLTransformer
# Substitute "model_name", "model_version", and "features" below with values for your own model name, model version, and feature columns
model_name = 'lgbm_sm'
model_version = 1
features = df_test.columns
sqlt = SQLTransformer().setStatement(
f"SELECT PREDICT('{model_name}/{model_version}', {','.join(features)}) as predictions FROM __THIS__")
# Substitute "X_test" below with your own test dataset
display(sqlt.transform(df_test))
使用用户定义函数 (UDF) 的 PREDICT
以下代码使用 PySpark UDF 调用 PREDICT 函数。
from pyspark.sql.functions import col, pandas_udf, udf, lit
# Substitute "model" and "features" below with values for your own model name and feature columns
my_udf = model.to_udf()
features = df_test.columns
display(df_test.withColumn("predictions", my_udf(*[col(f) for f in features])))
请注意,也可以从模型的项页生成 PREDICT 代码。 了解 PREDICT。
将模型预测结果写入湖屋
生成批量预测后,立即将模型预测结果写回到湖屋。
# Save predictions to lakehouse to be used for generating a Power BI report
table_name = "customer_churn_test_predictions"
predictions.write.format('delta').mode("overwrite").save(f"Tables/{table_name}")
print(f"Spark DataFrame saved to delta table: {table_name}")
下一步
继续学习: