Compartir a través de


Implementación y ejecución de modelos de MLflow en trabajos de Spark

En este artículo, aprenderá a implementar y ejecutar el modelo de MLflow en trabajos de Spark para realizar inferencias en grandes cantidades de datos o como parte de trabajos de limpieza y transformación de datos.

Acerca de este ejemplo

En este ejemplo se muestra cómo puede implementar un modelo de MLflow registrado en Azure Machine Learning en trabajos de Spark que se ejecutan en clústeres de Spark administrados (versión preliminar), Azure Databricks o Azure Synapse Analytics, para realizar inferencias en grandes cantidades de datos.

El modelo se basa en el conjunto de datos sobre enfermedades cardiacas de la UCI. La base de datos contiene 76 atributos, pero se usa un subconjunto de 14 de ellos. El modelo intenta predecir la presencia de enfermedades cardíacas en un paciente. Es un entero cuyo valor es 0 (sin presencia) o 1 (presencia). Se ha entrenado mediante un clasificador de XGBBoost y todo el preprocesamiento necesarios se ha empaquetado en forma de canalización de scikit-learn, lo que convierte a este modelo en una canalización de un extremo a otro que pasa de los datos sin procesar a las predicciones.

La información de este artículo se basa en ejemplos de código que se encuentran en el repositorio azureml-examples. Para ejecutar los comandos localmente sin tener que copiar y pegar los archivos, clone el repositorio y luego cambie los directorios a sdk/using-mlflow/deploy.

git clone https://github.com/Azure/azureml-examples --depth 1
cd sdk/python/using-mlflow/deploy

Requisitos previos

Antes de seguir los pasos de este artículo, asegúrese de que tiene los siguientes requisitos previos:

  • Instale el paquete mlflow del SDK de MLflow y el complemento azureml-mlflow de Azure Machine Learning para MLflow:

    pip install mlflow azureml-mlflow
    

    Sugerencia

    Puede usar el paquete de mlflow-skinny, que es un paquete MLflow ligero sin dependencias de ciencia de datos, interfaz de usuario, servidor o almacenamiento de SQL. mlflow-skinny se recomienda para los usuarios que necesitan principalmente las funcionalidades de seguimiento y registro de MLflow sin importar el conjunto completo de características, incluidas las implementaciones.

  • Un área de trabajo de Azure Machine Learning. Para crear un área de trabajo, consulte el tutorial Creación de recursos de aprendizaje automático. Revise los permisos de acceso que necesita para realizar las operaciones de MLflow en el área de trabajo.

  • Si realiza el seguimiento remoto (es decir, realiza un seguimiento de los experimentos que se ejecutan fuera de Azure Machine Learning), configure MLflow para que apunte al URI de seguimiento del área de trabajo de Azure Machine Learning. Para más información sobre cómo conectar MLflow al área de trabajo, consulte Configuración de MLflow para Azure Machine Learning.

  • Debe tener un modelo de MLflow registrado en el área de trabajo. En particular, en este ejemplo se registrará un modelo entrenado para el conjunto de datos Diabetes.

Conexión con su área de trabajo

En primer lugar, vamos a conectarnos al área de trabajo de Azure Machine Learning donde está registrado el modelo.

El seguimiento ya está configurado. Las credenciales predeterminadas también se usarán al trabajar con MLflow.

Registro del modelo

Necesitamos un modelo registrado en el registro de Azure Machine Learning para realizar la inferencia. En este caso, ya tenemos una copia local del modelo en el repositorio, por lo que solo es necesario publicar el modelo en el registro en el área de trabajo. Puede omitir este paso si el modelo que está intentando implementar ya está registrado.

model_name = 'heart-classifier'
model_local_path = "model"

registered_model = mlflow_client.create_model_version(
    name=model_name, source=f"file://{model_local_path}"
)
version = registered_model.version

Como alternativa, si el modelo se registró dentro de una ejecución, puede registrarlo directamente.

Sugerencia

Para registrar el modelo, deberá conocer la ubicación donde se ha almacenado el modelo. Si usa la característica autolog de MLflow, la ruta de acceso dependerá del tipo y el marco del modelo que se va a usar. Se recomienda comprobar la salida de los trabajos para identificar cuál es el nombre de esta carpeta. Localice una carpeta que contiene un archivo denominado MLModel. Si va a registrar los modelos manualmente mediante log_model, la ruta de acceso es el argumento que pasa a este método. Como ejemplo, si registra el modelo mediante mlflow.sklearn.log_model(my_model, "classifier"), la ruta de acceso donde se almacena el modelo es classifier.

model_name = 'heart-classifier'

registered_model = mlflow_client.create_model_version(
    name=model_name, source=f"runs://{RUN_ID}/{MODEL_PATH}"
)
version = registered_model.version

Nota

La ruta de acceso MODEL_PATH es la ubicación donde se ha almacenado el modelo en la ejecución.


Obtención de datos de entrada para puntuar

Necesitaremos algunos datos de entrada sobre los que ejecutar los trabajos. En este ejemplo, descargaremos datos de ejemplo de Internet y los colocaremos en un almacenamiento compartido que usa el clúster de Spark.

import urllib

urllib.request.urlretrieve("https://azuremlexampledata.blob.core.windows.net/data/heart-disease-uci/data/heart.csv", "/tmp/data")

Mueva los datos a una cuenta de almacenamiento montada disponible para todo el clúster.

dbutils.fs.mv("file:/tmp/data", "dbfs:/")

Importante

El código anterior usa dbutils, que es una herramienta disponible en el clúster de Azure Databricks. Use la herramienta adecuada en función de la plataforma que use.

A continuación, los datos de entrada se colocan en la siguiente carpeta:

input_data_path = "dbfs:/data"

Ejecución del modelo en clústeres de Spark

En la sección siguiente se explica cómo ejecutar modelos de MLflow registrados en Azure Machine Learning en trabajos de Spark.

  1. Asegúrese de que las bibliotecas siguientes están instaladas en el clúster:

    - mlflow<3,>=2.1
    - cloudpickle==2.2.0
    - scikit-learn==1.2.0
    - xgboost==1.7.2
    
  2. Usaremos un cuaderno para demostrar cómo crear una rutina de puntuación con un modelo de MLflow registrado en Azure Machine Learning. Cree un cuaderno y use PySpark como idioma predeterminado.

  3. Importe los espacios de nombres necesarios:

    import mlflow
    import pyspark.sql.functions as f
    
  4. Configure el URI del modelo. El siguiente URI trae un modelo denominado heart-classifier en su versión más reciente.

    model_uri = "models:/heart-classifier/latest"
    
  5. Cargue el modelo como una función UDF. Una función definida por el usuario (UDF) es una función definida por un usuario, lo que permite reutilizar la lógica personalizada en el entorno de usuario.

    predict_function = mlflow.pyfunc.spark_udf(spark, model_uri, result_type='double') 
    

    Sugerencia

    Use el argumento result_type para controlar el tipo devuelto por la función predict().

  6. Lea los datos que desea puntuar:

    df = spark.read.option("header", "true").option("inferSchema", "true").csv(input_data_path).drop("target")
    

    En nuestro caso, los datos de entrada están en formato CSV y se colocan en la carpeta dbfs:/data/. También vamos a quitar la columna target, ya que este conjunto de datos contiene la variable de destino que se va a predecir. En escenarios de producción, los datos no tendrán esta columna.

  7. Ejecute la función predict_function y coloque las predicciones en una nueva columna. En este caso, vamos a colocar las predicciones en la columna predictions.

    df.withColumn("predictions", score_function(*df.columns))
    

    Sugerencia

    La función predict_function recibe como argumentos las columnas necesarias. En nuestro caso, el modelo espera todas las columnas de la trama de datos y, por tanto se usa df.columns. Si el modelo requiere un subconjunto de las columnas, puede introducirlas manualmente. Si el modelo tiene una firma, los tipos deben ser compatibles entre las entradas y los tipos esperados.

  8. Puede volver a escribir las predicciones en el almacenamiento:

    scored_data_path = "dbfs:/scored-data"
    scored_data.to_csv(scored_data_path)
    

Ejecución del modelo en un trabajo de Spark independiente en Azure Machine Learning

Azure Machine Learning admite la creación de un trabajo de Spark independiente y la creación de un componente de Spark reutilizable que se puede usar en canalizaciones de Azure Machine Learning. En este ejemplo, implementaremos un trabajo de puntuación que se ejecuta en un trabajo de Spark independiente de Azure Machine Learning y ejecuta un modelo de MLflow para realizar la inferencia.

Nota:

Para más información sobre los trabajos de Spark en Azure Machine Learning, consulte Envío de trabajos de Spark en Azure Machine Learning (versión preliminar).

  1. Un trabajo de Spark requiere un script de Python que tome argumentos. Creación del script de puntuación:

    score.py

    import argparse
    
    parser = argparse.ArgumentParser()
    parser.add_argument("--model")
    parser.add_argument("--input_data")
    parser.add_argument("--scored_data")
    
    args = parser.parse_args()
    print(args.model)
    print(args.input_data)
    
    # Load the model as an UDF function
    predict_function = mlflow.pyfunc.spark_udf(spark, args.model, env_manager="conda")
    
    # Read the data you want to score
    df = spark.read.option("header", "true").option("inferSchema", "true").csv(input_data).drop("target")
    
    # Run the function `predict_function` and place the predictions on a new column
    scored_data = df.withColumn("predictions", score_function(*df.columns))
    
    # Save the predictions
    scored_data.to_csv(args.scored_data)
    

    El script anterior toma tres argumentos --model, --input_data y --scored_data. Las dos primeras son entradas y representan el modelo que queremos ejecutar y los datos de entrada, el último es una salida y es la carpeta de salida donde se colocarán las predicciones.

    Sugerencia

    Instalación de paquetes de Python: El script de puntuación anterior carga el modelo de MLflow en una función UDF, pero indica el parámetro env_manager="conda". Cuando se establece este parámetro, MLflow restaurará los paquetes necesarios como se especifica en la definición del modelo en un entorno aislado donde solo se ejecuta la función UDF. Para más información, consulte la documentación mlflow.pyfunc.spark_udf.

  2. Creación de una definición de trabajo:

    mlflow-score-spark-job.yml

    $schema: http://azureml/sdk-2-0/SparkJob.json
    type: spark
    
    code: ./src
    entry:
      file: score.py
    
    conf:
      spark.driver.cores: 1
      spark.driver.memory: 2g
      spark.executor.cores: 2
      spark.executor.memory: 2g
      spark.executor.instances: 2
    
    inputs:
      model:
        type: mlflow_model
        path: azureml:heart-classifier@latest
      input_data:
        type: uri_file
        path: https://azuremlexampledata.blob.core.windows.net/data/heart-disease-uci/data/heart.csv
        mode: direct
    
    outputs:
      scored_data:
        type: uri_folder
    
    args: >-
      --model ${{inputs.model}}
      --input_data ${{inputs.input_data}}
      --scored_data ${{outputs.scored_data}}
    
    identity:
      type: user_identity
    
    resources:
      instance_type: standard_e4s_v3
      runtime_version: "3.2"
    

    Sugerencia

    Para usar un grupo de Synapse Spark adjunto, defina la propiedad compute en el archivo de especificación YAML de ejemplo mostrado anteriormente en lugar de la propiedad resources.

  3. Los archivos YAML mostrados anteriormente se pueden usar en el comando az ml job create, con el parámetro --file, para crear un trabajo de Spark independiente como se muestra:

    az ml job create -f mlflow-score-spark-job.yml
    

Pasos siguientes