Tutorial Part 4: Perform batch scoring and save predictions to a lakehouse
In this tutorial, you'll learn to import the registered LightGBMClassifier model that was trained in part 3 using the Microsoft Fabric MLflow model registry, and perform batch predictions on a test dataset loaded from a lakehouse.
Microsoft Fabric allows you to operationalize machine learning models with a scalable function called PREDICT, which supports batch scoring in any compute engine. You can generate batch predictions directly from a Microsoft Fabric notebook or from a given model's item page. Learn about PREDICT.
To generate batch predictions on the test dataset, you'll use version 1 of the trained LightGBM model, which demonstrated the best performance among all trained machine learning models. You'll load the test dataset into a Spark DataFrame and create an MLFlowTransformer object to generate batch predictions. You can then invoke the PREDICT function in one of the following three ways:
- Transformer API from SynapseML
- Spark SQL API
- PySpark user-defined function (UDF)
Prerequisites
Get a Microsoft Fabric subscription. Or, sign up for a free Microsoft Fabric trial.
Sign in to Microsoft Fabric.
Use the experience switcher on the bottom left side of your home page to switch to Fabric.
This is part 4 of 5 in the tutorial series. To complete this tutorial, first complete:
- Part 1: Ingest data into a Microsoft Fabric lakehouse using Apache Spark.
- Part 2: Explore and visualize data using Microsoft Fabric notebooks to learn more about the data.
- Part 3: Train and register machine learning models.
Follow along in notebook
4-predict.ipynb is the notebook that accompanies this tutorial.
To open the accompanying notebook for this tutorial, follow the instructions in Prepare your system for data science tutorials to import the notebook to your workspace.
If you'd rather copy and paste the code from this page, you can create a new notebook.
Be sure to attach a lakehouse to the notebook before you start running code.
Important
Attach the same lakehouse you used in the other parts of this series.
Load the test data
Load the test data that you saved in Part 3.
df_test = spark.read.format("delta").load("Tables/df_test")
display(df_test)
PREDICT with the Transformer API
To use the Transformer API from SynapseML, you'll need to first create an MLFlowTransformer object.
Instantiate MLFlowTransformer object
The MLFlowTransformer object is a wrapper around the MLflow model that you registered in Part 3. It allows you to generate batch predictions on a given DataFrame. To instantiate the MLFlowTransformer object, you'll need to provide the following parameters:
- The columns from the test DataFrame that you need as input to the model (in this case, you would need all of them).
- A name for the new output column (in this case, predictions).
- The correct model name and model version to generate the predictions (in this case, lgbm_sm and version 1).
from synapse.ml.predict import MLFlowTransformer
model = MLFlowTransformer(
    inputCols=list(df_test.columns),
    outputCol='predictions',
    modelName='lgbm_sm',
    modelVersion=1
)
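Because all three PREDICT approaches in this tutorial refer to the same model name and version, it can help to keep those values in one place. The following is a minimal sketch, not part of the accompanying notebook: transformer_params is a hypothetical helper, and the column names in the example are placeholders rather than the actual columns of df_test.

```python
def transformer_params(columns, model_name="lgbm_sm", model_version=1,
                       output_col="predictions"):
    """Collect the four MLFlowTransformer arguments described above in one dict.

    Hypothetical helper for illustration; the defaults mirror the values
    used in this tutorial.
    """
    return {
        "inputCols": list(columns),
        "outputCol": output_col,
        "modelName": model_name,
        "modelVersion": model_version,
    }


# Placeholder column names, used only to show the resulting dictionary.
params = transformer_params(["CreditScore", "Age", "Balance"])
print(params)

# In the notebook, you could then unpack the dictionary as keyword arguments:
# model = MLFlowTransformer(**transformer_params(df_test.columns))
```

Unpacking the dictionary passes the same keyword arguments as the explicit call shown above, so the resulting transformer should be equivalent.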
Now that you have the MLFlowTransformer object, you can use it to generate batch predictions.
import pandas
predictions = model.transform(df_test)
display(predictions)
PREDICT with the Spark SQL API
The following code invokes the PREDICT function with the Spark SQL API.
from pyspark.ml.feature import SQLTransformer
# Substitute "model_name", "model_version", and "features" below with values for your own model name, model version, and feature columns
model_name = 'lgbm_sm'
model_version = 1
features = df_test.columns
sqlt = SQLTransformer().setStatement(
    f"SELECT PREDICT('{model_name}/{model_version}', {','.join(features)}) as predictions FROM __THIS__"
)
# Substitute "df_test" below with your own test dataset
display(sqlt.transform(df_test))
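To see what the SQLTransformer actually receives, you can expand the f-string on its own, without Spark. The following sketch reproduces the statement construction from the code above. build_predict_statement is a hypothetical helper name, and the three feature names are placeholders rather than the real columns of df_test.

```python
def build_predict_statement(model_name, model_version, features):
    """Return the Spark SQL statement built by the f-string in this section."""
    column_list = ",".join(features)
    return (
        f"SELECT PREDICT('{model_name}/{model_version}', {column_list}) "
        "as predictions FROM __THIS__"
    )


# Placeholder feature names, used only to show the expanded statement.
statement = build_predict_statement("lgbm_sm", 1, ["CreditScore", "Age", "Balance"])
print(statement)
# SELECT PREDICT('lgbm_sm/1', CreditScore,Age,Balance) as predictions FROM __THIS__
```

The first argument to PREDICT combines the model name and version, and the remaining arguments are the input columns. This sketch doesn't quote column names. A column name that contains spaces or other special characters would likely need backtick quoting in Spark SQL.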
PREDICT with a user-defined function (UDF)
The following code invokes the PREDICT function with a PySpark UDF.
from pyspark.sql.functions import col
# Substitute "model" and "features" below with your own MLFlowTransformer object and feature columns
my_udf = model.to_udf()
features = df_test.columns
display(df_test.withColumn("predictions", my_udf(*[col(f) for f in features])))
Note that you can also generate PREDICT code from a model's item page. Learn about PREDICT.
Write model prediction results to the lakehouse
Once you have generated batch predictions, write the model prediction results back to the lakehouse.
# Save predictions to lakehouse to be used for generating a Power BI report
table_name = "customer_churn_test_predictions"
predictions.write.format('delta').mode("overwrite").save(f"Tables/{table_name}")
print(f"Spark DataFrame saved to delta table: {table_name}")
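After the write completes, you might want to confirm that the table can be read back and contains the expected number of rows. The following is a minimal sketch, not part of the accompanying notebook: verify_saved_table is a hypothetical helper that takes the notebook's spark session as an argument and assumes the same Tables/ path used in the save step.

```python
def verify_saved_table(spark, table_name, expected_rows):
    """Reload a Delta table from the attached lakehouse and check its row count.

    Hypothetical helper: `spark` is the session that the notebook provides,
    and the table is assumed to live under the relative "Tables/" path.
    """
    saved = spark.read.format("delta").load(f"Tables/{table_name}")
    actual_rows = saved.count()
    if actual_rows != expected_rows:
        raise ValueError(
            f"{table_name}: expected {expected_rows} rows, found {actual_rows}"
        )
    return actual_rows


# In the notebook, after the save step:
# verify_saved_table(spark, table_name, predictions.count())
```

Passing the session in as an argument keeps the helper free of notebook globals, so you can reuse it for other tables in the same lakehouse.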
Next step
Continue on to: