Deploy and run MLflow models in Spark jobs

In this article, learn how to deploy and run your MLflow model in Spark jobs to perform inference over large amounts of data or as part of data wrangling jobs.

About this example

This example shows how you can deploy an MLflow model registered in Azure Machine Learning to Spark jobs running in managed Spark clusters (preview), Azure Databricks, or Azure Synapse Analytics, to perform inference over large amounts of data.

The model is based on the UCI Heart Disease Data Set. The database contains 76 attributes, but we are using a subset of 14 of them. The model tries to predict the presence of heart disease in a patient. The prediction is an integer value: 0 (no presence) or 1 (presence). It has been trained using an XGBoost classifier, and all the required preprocessing has been packaged as a scikit-learn pipeline, making this model an end-to-end pipeline that goes from raw data to predictions.

The information in this article is based on code samples contained in the azureml-examples repository. To run the commands locally without having to copy/paste files, clone the repo, and then change directories to sdk/python/using-mlflow/deploy.

git clone https://github.com/Azure/azureml-examples --depth 1
cd azureml-examples/sdk/python/using-mlflow/deploy

Prerequisites

Before following the steps in this article, make sure you have the following prerequisites:

  • Install the MLflow SDK mlflow package and the Azure Machine Learning azureml-mlflow plugin for MLflow as follows:

    pip install mlflow azureml-mlflow
    

    Tip

    You can use the mlflow-skinny package, which is a lightweight MLflow package without SQL storage, server, UI, or data science dependencies. This package is recommended for users who primarily need the MLflow tracking and logging capabilities without importing the full suite of features, including deployments.

  • Create an Azure Machine Learning workspace. To create a workspace, see Create resources you need to get started. Review the access permissions you need to perform your MLflow operations in your workspace.

  • To do remote tracking, or track experiments running outside Azure Machine Learning, configure MLflow to point to the tracking URI of your Azure Machine Learning workspace. For more information on how to connect MLflow to your workspace, see Configure MLflow for Azure Machine Learning.

  • You must have an MLflow model registered in your workspace. In particular, this example registers a model trained on the UCI Heart Disease data set.

Connect to your workspace

First, let's connect to the Azure Machine Learning workspace where your model is registered.

Tracking is already configured for you. Your default credentials will also be used when working with MLflow.

Registering the model

We need a model registered in the Azure Machine Learning registry to perform inference. In this case, we already have a local copy of the model in the repository, so we only need to publish the model to the registry in the workspace. You can skip this step if the model you are trying to deploy is already registered.

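import mlflow

# Client for the tracking/registry server that MLflow is configured to use
mlflow_client = mlflow.tracking.MlflowClient()

model_name = 'heart-classifier'
model_local_path = "model"

# Register the local model folder as a new version of the model
registered_model = mlflow_client.create_model_version(
    name=model_name, source=f"file://{model_local_path}"
)
version = registered_model.version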

Alternatively, if your model was logged inside of a run, you can register it directly.

Tip

To register the model, you need to know the location where the model is stored. If you use the autolog feature of MLflow, the path depends on the type and framework of the model being used. We recommend checking the job's output to identify the name of this folder. You can look for the folder that contains a file named MLmodel. If you log your models manually using log_model, the path is the argument you pass to that method. For example, if you log the model using mlflow.sklearn.log_model(my_model, "classifier"), the path where the model is stored is classifier.

model_name = 'heart-classifier'

registered_model = mlflow_client.create_model_version(
    name=model_name, source=f"runs:/{RUN_ID}/{MODEL_PATH}"
)
version = registered_model.version

Note

The path MODEL_PATH is the location where the model has been stored in the run.
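
If you have downloaded a run's artifacts to a local folder, the search for the model folder described in the tip can be sketched with the standard library alone. This is a minimal sketch, not part of MLflow: `find_model_paths` is a hypothetical helper name, and it assumes MLflow's convention of writing a file named MLmodel at the root of each logged model folder.

```python
from pathlib import Path


def find_model_paths(artifacts_root):
    """Return folders (relative to artifacts_root) that contain an MLmodel file."""
    root = Path(artifacts_root)
    return sorted(
        p.parent.relative_to(root).as_posix() for p in root.rglob("MLmodel")
    )
```

Each returned folder name is a candidate value for MODEL_PATH. For a model logged with the artifact path classifier, the result would contain "classifier".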


Get input data to score

We need some input data to run our jobs on. In this example, we download sample data from the internet and place it in shared storage used by the Spark cluster.

import urllib.request

urllib.request.urlretrieve("https://azuremlexampledata.blob.core.windows.net/data/heart-disease-uci/data/heart.csv", "/tmp/data")

Move the data to a mounted storage account available to the entire cluster.

dbutils.fs.mv("file:/tmp/data", "dbfs:/")

Important

The previous code uses dbutils, which is a tool available in Azure Databricks clusters. Use the appropriate tool depending on the platform you are using.

The input data is then placed in the following folder:

input_data_path = "dbfs:/data"
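
As an optional sanity check on the downloaded file (run against the local copy, before it is moved to shared storage), you can confirm that it parses as CSV and has the columns you expect. The following is a minimal sketch using only the standard library. `check_csv_header` is a hypothetical helper, and the only column name it assumes is target, which the scoring steps later drop.

```python
import csv


def check_csv_header(path, required=("target",)):
    """Return the header row of a CSV file, raising if a required column is missing."""
    with open(path, newline="") as f:
        header = next(csv.reader(f), [])
    missing = [c for c in required if c not in header]
    if missing:
        raise ValueError(f"Missing columns: {missing}")
    return header
```

For example, check_csv_header("/tmp/data") would return the list of column names in the downloaded file, or raise an error if target is absent.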

Run the model in Spark clusters

The following section explains how to run MLflow models registered in Azure Machine Learning in Spark jobs.

  1. Ensure the following libraries are installed in the cluster:

    - mlflow<3,>=2.1
    - cloudpickle==2.2.0
    - scikit-learn==1.2.0
    - xgboost==1.7.2
    
  2. We'll use a notebook to demonstrate how to create a scoring routine with an MLflow model registered in Azure Machine Learning. Create a notebook and use PySpark as the default language.

  3. Import the required namespaces:

    import mlflow
    import pyspark.sql.functions as f
    
  4. Configure the model URI. The following URI retrieves the latest version of a model named heart-classifier.

    model_uri = "models:/heart-classifier/latest"
    
  5. Load the model as a UDF. A user-defined function (UDF) is a function defined by a user, allowing custom logic to be reused in the user environment.

    predict_function = mlflow.pyfunc.spark_udf(spark, model_uri, result_type='double') 
    

    Tip

    Use the argument result_type to control the type returned by the predict() function.

  6. Read the data you want to score:

    df = spark.read.option("header", "true").option("inferSchema", "true").csv(input_data_path).drop("target")
    

    In our case, the input data is in CSV format and placed in the folder dbfs:/data/. We also drop the column target, because this dataset contains the target variable to predict. In production scenarios, your data won't have this column.

  7. Run the function predict_function and place the predictions on a new column. In this case, we're placing the predictions in the column predictions.

    scored_data = df.withColumn("predictions", predict_function(*df.columns))
    

    Tip

    The predict_function receives the required columns as arguments. In our case, the model expects all the columns of the data frame, and hence df.columns is used. If your model requires a subset of the columns, you can pass them explicitly. If your model has a signature, the input types need to be compatible with the types the signature expects.

  8. You can write your predictions back to storage:

    scored_data_path = "dbfs:/scored-data"
    scored_data.write.option("header", "true").csv(scored_data_path)
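
The tip in step 7 notes that a model may need only a subset of the columns. As a hedged sketch, assuming the model's MLmodel file stores the signature inputs as a JSON list of objects with name and type keys (the usual layout for column-based signatures), a helper like the hypothetical `columns_from_signature` below could pick the columns to pass to the UDF. The sample signature string and column names are illustrative only.

```python
import json


def columns_from_signature(signature_inputs_json, available_columns):
    """Return the column names the signature expects, in signature order.

    Raises ValueError if any expected column is not available in the data.
    """
    expected = [spec["name"] for spec in json.loads(signature_inputs_json)]
    missing = [name for name in expected if name not in available_columns]
    if missing:
        raise ValueError(f"Data is missing columns required by the model: {missing}")
    return expected


# Illustrative signature string; a real one comes from the model's MLmodel file.
sample_inputs = '[{"name": "age", "type": "long"}, {"name": "chol", "type": "long"}]'
selected = columns_from_signature(sample_inputs, ["age", "sex", "chol"])
print(selected)
```

The resulting list could then be unpacked into the UDF call in place of df.columns.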
    

Run the model in a standalone Spark job in Azure Machine Learning

Azure Machine Learning supports creation of a standalone Spark job, and creation of a reusable Spark component that can be used in Azure Machine Learning pipelines. In this example, we deploy a scoring job that runs as an Azure Machine Learning standalone Spark job and uses an MLflow model to perform inference.

Note

To learn more about Spark jobs in Azure Machine Learning, see Submit Spark jobs in Azure Machine Learning (preview).

  1. A Spark job requires a Python script that takes arguments. Create a scoring script:

    score.py

    import argparse
    
    import mlflow
    from pyspark.sql import SparkSession
    
    parser = argparse.ArgumentParser()
    parser.add_argument("--model")
    parser.add_argument("--input_data")
    parser.add_argument("--scored_data")
    
    args = parser.parse_args()
    print(args.model)
    print(args.input_data)
    
    # Get the Spark session for this job (reuses the active one if it exists)
    spark = SparkSession.builder.getOrCreate()
    
    # Load the model as a UDF
    predict_function = mlflow.pyfunc.spark_udf(spark, args.model, env_manager="conda")
    
    # Read the data you want to score
    df = spark.read.option("header", "true").option("inferSchema", "true").csv(args.input_data).drop("target")
    
    # Run the function `predict_function` and place the predictions on a new column
    scored_data = df.withColumn("predictions", predict_function(*df.columns))
    
    # Save the predictions
    scored_data.write.option("header", "true").csv(args.scored_data)
    

    The above script takes three arguments: --model, --input_data, and --scored_data. The first two are inputs and represent the model we want to run and the input data. The last one is an output: the folder where predictions will be placed.

    Tip

    Installation of Python packages: The previous scoring script loads the MLflow model into a UDF, and it sets the parameter env_manager="conda". When this parameter is set, MLflow restores the required packages, as specified in the model definition, in an isolated environment where only the UDF runs. For more details, see the mlflow.pyfunc.spark_udf documentation.

  2. Create a job definition:

    mlflow-score-spark-job.yml

    $schema: http://azureml/sdk-2-0/SparkJob.json
    type: spark
    
    code: ./src
    entry:
      file: score.py
    
    conf:
      spark.driver.cores: 1
      spark.driver.memory: 2g
      spark.executor.cores: 2
      spark.executor.memory: 2g
      spark.executor.instances: 2
    
    inputs:
      model:
        type: mlflow_model
        path: azureml:heart-classifier@latest
      input_data:
        type: uri_file
        path: https://azuremlexampledata.blob.core.windows.net/data/heart-disease-uci/data/heart.csv
        mode: direct
    
    outputs:
      scored_data:
        type: uri_folder
    
    args: >-
      --model ${{inputs.model}}
      --input_data ${{inputs.input_data}}
      --scored_data ${{outputs.scored_data}}
    
    identity:
      type: user_identity
    
    resources:
      instance_type: standard_e4s_v3
      runtime_version: "3.2"
    

    Tip

    To use an attached Synapse Spark pool, define the compute property in the sample YAML specification file shown above instead of the resources property.

  3. The YAML file shown above can be used in the az ml job create command, with the --file parameter, to create a standalone Spark job as shown:

    az ml job create -f mlflow-score-spark-job.yml
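
To see how the args string in the job definition maps onto the scoring script, the following sketch parses a sample command line with the same three arguments. The paths are made-up placeholders. At run time, Azure Machine Learning replaces the ${{inputs...}} and ${{outputs...}} expressions with the resolved locations.

```python
import argparse
import shlex


def build_parser():
    """Argument parser matching the three arguments score.py expects."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--model")
    parser.add_argument("--input_data")
    parser.add_argument("--scored_data")
    return parser


# Placeholder values standing in for the resolved job inputs and outputs.
sample_args = "--model /tmp/model --input_data /tmp/heart.csv --scored_data /tmp/scored"
args = build_parser().parse_args(shlex.split(sample_args))
print(args.model, args.input_data, args.scored_data)
```

Running the parser locally like this is a quick way to check the argument names before submitting the job.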
    
