Compartir a través de


Integración de MLflow y Ray

MLflow es una plataforma de código abierto para administrar cargas de trabajo de aprendizaje automático e IA. La combinación de Ray con MLflow permite distribuir cargas de trabajo con Ray y realizar un seguimiento de modelos, métricas, parámetros y metadatos generados durante el entrenamiento con MLflow.

En este artículo se explica cómo integrar MLflow con los siguientes componentes de Ray:

  • Ray Core: aplicaciones distribuidas de uso general que no están cubiertas por Ray Tune y Ray Train

  • Ray Train: entrenamiento de modelos distribuidos

  • Ray Tune: ajuste distribuido de hiperparámetros

  • Model Serving: implementación de modelos para la inferencia en tiempo real

Integración de Ray Core y MLflow

Ray Core proporciona los bloques de creación fundamentales para aplicaciones distribuidas de uso general. Permite escalar las funciones y clases de Python entre varios nodos.

En esta sección se describen los siguientes patrones para integrar Ray Core y MLflow:

  • Registrar modelos de MLflow desde el proceso del controlador Ray
  • Registrar modelos de MLflow desde ejecuciones secundarias

Registro de MLflow desde el proceso del controlador Ray

Por lo general, es mejor registrar modelos de MLflow desde el proceso de controlador en lugar de desde nodos de trabajo. Esto se debe a la complejidad adicional de pasar referencias con estado a los trabajadores remotos.

Por ejemplo, se produce un error en el código siguiente porque el servidor de seguimiento de MLflow no se inicializa mediante MLflow Client desde dentro de los nodos de trabajo.

import mlflow

@ray.remote
def example_logging_task(x):
# ...

 # This method will fail
 mlflow.log_metric("x", x)
 return x

with mlflow.start_run() as run:
 ray.get([example_logging_task.remote(x) for x in range(10)])

En su lugar, devuelva las métricas al nodo del controlador. Las métricas y los metadatos suelen ser lo suficientemente pequeñas como para volver al controlador sin causar problemas de memoria.

Tome el ejemplo anterior y actualícelo para registrar las métricas devueltas desde una tarea de Ray:

import mlflow

@ray.remote
def example_logging_task(x):
 # ...
 return x

with mlflow.start_run() as run:
  results = ray.get([example_logging_task.remote(x) for x in range(10)])
 for x in results:
   mlflow.log_metric("x", x)

En el caso de las tareas que requieren guardar artefactos grandes, como una tabla de Pandas grande, imágenes, trazados o modelos, Databricks recomienda conservar el artefacto como un archivo. A continuación, vuelva a cargar el artefacto dentro del contexto del controlador o registre directamente el objeto con MLflow especificando la ruta de acceso al archivo guardado.

import mlflow

@ray.remote
def example_logging_task(x):
# ...
# Create a large object that needs to be stored
with open("/dbfs/myLargeFilePath.txt", "w") as f:
  f.write(myLargeObject)
return x

with mlflow.start_run() as run:
 results = ray.get([example_logging_task.remote(x) for x in range(10)])
for x in results:
  mlflow.log_metric("x", x)
  # Directly log the saved file by specifying the path
  mlflow.log_artifact("/dbfs/myLargeFilePath.txt")

Tareas de Log Ray como ejecuciones secundarias de MLflow

Puede integrar Ray Core con MLflow mediante ejecuciones secundarias. Esto implica los pasos siguientes:

  1. Crear una ejecución primaria: inicialice una ejecución primaria en el proceso del controlador. Esta ejecución actúa como un contenedor jerárquico para todas las ejecuciones secundarias posteriores.
  2. Crear ejecuciones secundarias: en cada tarea de Ray, inicie una ejecución secundaria bajo la ejecución primaria. Cada ejecución secundaria puede registrar sus propias métricas de forma independiente.

Para implementar este enfoque, asegúrese de que cada tarea de Ray recibe las credenciales de cliente necesarias y el elemento primario run_id. Esta configuración establece la relación jerárquica de elementos primarios y secundarios entre ejecuciones. En el fragmento de código siguiente se muestra cómo recuperar las credenciales y pasar a lo largo del elemento primario run_id:

from mlflow.utils.databricks_utils import get_databricks_env_vars
mlflow_db_creds = get_databricks_env_vars("databricks")

username = "" # Username path
experiment_name = f"/Users/{username}/mlflow_test"

mlflow.set_experiment(experiment_name)

@ray.remote
def ray_task(x, run_id):
   import os
  # Set the MLflow credentials within the Ray task
   os.environ.update(mlflow_db_creds)
  # Set the active MLflow experiment within each Ray task
   mlflow.set_experiment(experiment_name)
  # Create nested child runs associated with the parent run_id
   with mlflow.start_run(run_id=run_id, nested=True):
    # Log metrics to the child run within the Ray task
       mlflow.log_metric("x", x)

  return x

# Start parent run on the main driver process
with mlflow.start_run() as run:
  # Pass the parent run's run_id to each Ray task
   results = ray.get([ray_task.remote(x, run.info.run_id) for x in range(10)])

Entrenamiento de Ray y MLflow

La manera más sencilla de registrar los modelos de Ray Train en MLflow es usar el punto de control generado por la ejecución de entrenamiento. Una vez completada la ejecución de entrenamiento, vuelva a cargar el modelo en su marco de aprendizaje profundo nativo (por ejemplo, PyTorch o TensorFlow), después regístrelo con el código de MLflow correspondiente.

Este enfoque garantiza que el modelo se almacene correctamente y esté listo para la evaluación o la implementación.

El código siguiente vuelve a cargar un modelo desde un punto de control de Ray Train y lo registra en MLflow:

result = trainer.fit()

checkpoint = result.checkpoint
with checkpoint.as_directory() as checkpoint_dir:
     # Change as needed for different DL frameworks
    checkpoint_path = f"{checkpoint_dir}/checkpoint.ckpt"
    # Load the model from the checkpoint
    model = MyModel.load_from_checkpoint(checkpoint_path)

with mlflow.start_run() as run:
    # Change the MLflow flavor as needed
    mlflow.pytorch.log_model(model, "model")

Aunque suele ser un procedimiento recomendado enviar objetos al nodo de controlador, con Ray Train, guardar los resultados finales es más fácil que todo el historial de entrenamiento del proceso de trabajo.

Para almacenar varios modelos desde una ejecución de entrenamiento, especifique el número de puntos de control que se mantendrán en ray.train.CheckpointConfig. Los modelos se pueden leer y registrar de la misma manera que almacenar un único modelo.

Nota:

MLflow no es responsable de controlar la tolerancia frente a errores durante el entrenamiento del modelo, sino para realizar el seguimiento del ciclo de vida del modelo. En su lugar, Ray Train administra la tolerancia frente a errores.

Para almacenar las métricas de entrenamiento especificadas por Ray Train, recuperarlas del objeto de resultado y almacenarlas mediante MLflow.

result = trainer.fit()

with mlflow.start_run() as run:
    mlflow.log_metrics(result.metrics_dataframe.to_dict(orient='dict'))

  # Change the MLflow flavor as needed
    mlflow.pytorch.log_model(model, "model")

Para configurar correctamente los clústeres de Spark y Ray y evitar problemas de asignación de recursos, debe ajustar la configuración resources_per_worker. En concreto, establezca el número de CPU para cada trabajador de Ray como uno menor que el número total de CPU disponibles en un nodo de trabajo de Ray. Este ajuste es fundamental porque si el instructor reserva todos los núcleos disponibles para los actores de Ray, puede provocar errores de contención de recursos.

Ray Tune y MLflow

La integración de Ray Tune con MLflow permite realizar un seguimiento eficaz y registrar experimentos de ajuste de hiperparámetros en Databricks. Esta integración aprovecha las funcionalidades de seguimiento de experimentos de MLflow para registrar métricas y resultados directamente desde las tareas de Ray.

Enfoque de ejecución secundaria para el registro

De forma similar al registro de tareas de Ray Core, las aplicaciones de Ray Tune pueden usar un enfoque de ejecución secundaria para registrar métricas de cada prueba o ajustar la iteración. Siga estos pasos para implementar un enfoque de ejecución secundaria:

  1. Crear una ejecución primaria: inicialice una ejecución primaria en el proceso del controlador. Esta ejecución sirve como contenedor principal para todas las ejecuciones secundarias posteriores.
  2. Ejecuciones secundarias de registro: cada tarea Ray Tune crea una ejecución secundaria en la ejecución primaria, manteniendo una jerarquía clara de resultados del experimento.

En el ejemplo siguiente se muestra cómo autenticar y registrar las tareas de Ray Tune mediante MLflow.

import os
import tempfile
import time

import mlflow
from mlflow.utils.databricks_utils import get_databricks_env_vars

from ray import train, tune
from ray.air.integrations.mlflow import MLflowLoggerCallback, setup_mlflow

mlflow_db_creds = get_databricks_env_vars("databricks")

EXPERIMENT_NAME = "/Users/<WORKSPACE_USERNAME>/setup_mlflow_example"
mlflow.set_experiment(EXPERIMENT_NAME)

def evaluation_fn(step, width, height):
   return (0.1 + width * step / 100) ** (-1) + height * 0.1

def train_function_mlflow(config, run_id):
   os.environ.update(mlflow_db_creds)
   mlflow.set_experiment(EXPERIMENT_NAME)

   # Hyperparameters
   width = config["width"]
   height = config["height"]

   with mlflow.start_run(run_id=run_id, nested=True):
       for step in range(config.get("steps", 100)):
           # Iterative training function - can be any arbitrary training procedure
           intermediate_score = evaluation_fn(step, width, height)
           # Log the metrics to MLflow
           mlflow.log_metrics({"iterations": step, "mean_loss": intermediate_score})
           # Feed the score back to Tune.
           train.report({"iterations": step, "mean_loss": intermediate_score})
           time.sleep(0.1)

def tune_with_setup(run_id, finish_fast=True):
   os.environ.update(mlflow_db_creds)
   # Set the experiment or create a new one if it does not exist.
   mlflow.set_experiment(experiment_name=EXPERIMENT_NAME)

   tuner = tune.Tuner(
       tune.with_parameter(train_function_mlflow, run_id),
       tune_config=tune.TuneConfig(num_samples=5),
       run_config=train.RunConfig(
           name="mlflow",
       ),
       param_space={
           "width": tune.randint(10, 100),
           "height": tune.randint(0, 100),
           "steps": 20 if finish_fast else 100,
       },
   )
   results = tuner.fit()

with mlflow.start_run() as run:
   mlflow_tracking_uri = mlflow.get_tracking_uri()
   tune_with_setup(run.info.run_id)

Servicio de modelos

El uso de Ray Serve en clústeres de Databricks para la inferencia en tiempo real plantea desafíos debido a las limitaciones de conectividad y seguridad de red al interactuar con aplicaciones externas.

Databricks recomienda usar Model Serving para implementar modelos de aprendizaje automático en producción en un punto de conexión de la API de REST. Para obtener más información, consulte Implementación de modelos personalizados.