Integración de MLflow y Ray
MLflow es una plataforma de código abierto para administrar cargas de trabajo de aprendizaje automático e IA. La combinación de Ray con MLflow permite distribuir cargas de trabajo con Ray y realizar un seguimiento de modelos, métricas, parámetros y metadatos generados durante el entrenamiento con MLflow.
En este artículo se explica cómo integrar MLflow con los siguientes componentes de Ray:
Ray Core: aplicaciones distribuidas de uso general que no están cubiertas por Ray Tune y Ray Train
Ray Train: entrenamiento de modelos distribuidos
Ray Tune: ajuste distribuido de hiperparámetros
Model Serving: implementación de modelos para la inferencia en tiempo real
Integración de Ray Core y MLflow
Ray Core proporciona los bloques de creación fundamentales para aplicaciones distribuidas de uso general. Permite escalar las funciones y clases de Python entre varios nodos.
En esta sección se describen los siguientes patrones para integrar Ray Core y MLflow:
- Registrar modelos de MLflow desde el proceso del controlador Ray
- Registrar modelos de MLflow desde ejecuciones secundarias
Registro de MLflow desde el proceso del controlador Ray
Por lo general, es mejor registrar modelos de MLflow desde el proceso de controlador en lugar de desde nodos de trabajo. Esto se debe a la complejidad adicional de pasar referencias con estado a los trabajadores remotos.
Por ejemplo, se produce un error en el código siguiente porque el servidor de seguimiento de MLflow no se inicializa mediante MLflow Client
desde dentro de los nodos de trabajo.
import mlflow
@ray.remote
def example_logging_task(x):
# ...
# This method will fail
mlflow.log_metric("x", x)
return x
with mlflow.start_run() as run:
ray.get([example_logging_task.remote(x) for x in range(10)])
En su lugar, devuelva las métricas al nodo del controlador. Las métricas y los metadatos suelen ser lo suficientemente pequeñas como para volver al controlador sin causar problemas de memoria.
Tome el ejemplo anterior y actualícelo para registrar las métricas devueltas desde una tarea de Ray:
import mlflow
@ray.remote
def example_logging_task(x):
# ...
return x
with mlflow.start_run() as run:
results = ray.get([example_logging_task.remote(x) for x in range(10)])
for x in results:
mlflow.log_metric("x", x)
En el caso de las tareas que requieren guardar artefactos grandes, como una tabla de Pandas grande, imágenes, trazados o modelos, Databricks recomienda conservar el artefacto como un archivo. A continuación, vuelva a cargar el artefacto dentro del contexto del controlador o registre directamente el objeto con MLflow especificando la ruta de acceso al archivo guardado.
import mlflow
@ray.remote
def example_logging_task(x):
# ...
# Create a large object that needs to be stored
with open("/dbfs/myLargeFilePath.txt", "w") as f:
f.write(myLargeObject)
return x
with mlflow.start_run() as run:
results = ray.get([example_logging_task.remote(x) for x in range(10)])
for x in results:
mlflow.log_metric("x", x)
# Directly log the saved file by specifying the path
mlflow.log_artifact("/dbfs/myLargeFilePath.txt")
Tareas de Log Ray como ejecuciones secundarias de MLflow
Puede integrar Ray Core con MLflow mediante ejecuciones secundarias. Esto implica los pasos siguientes:
- Crear una ejecución primaria: inicialice una ejecución primaria en el proceso del controlador. Esta ejecución actúa como un contenedor jerárquico para todas las ejecuciones secundarias posteriores.
- Crear ejecuciones secundarias: en cada tarea de Ray, inicie una ejecución secundaria bajo la ejecución primaria. Cada ejecución secundaria puede registrar sus propias métricas de forma independiente.
Para implementar este enfoque, asegúrese de que cada tarea de Ray recibe las credenciales de cliente necesarias y el elemento primario run_id
. Esta configuración establece la relación jerárquica de elementos primarios y secundarios entre ejecuciones. En el fragmento de código siguiente se muestra cómo recuperar las credenciales y pasar a lo largo del elemento primario run_id
:
from mlflow.utils.databricks_utils import get_databricks_env_vars
mlflow_db_creds = get_databricks_env_vars("databricks")
username = "" # Username path
experiment_name = f"/Users/{username}/mlflow_test"
mlflow.set_experiment(experiment_name)
@ray.remote
def ray_task(x, run_id):
import os
# Set the MLflow credentials within the Ray task
os.environ.update(mlflow_db_creds)
# Set the active MLflow experiment within each Ray task
mlflow.set_experiment(experiment_name)
# Create nested child runs associated with the parent run_id
with mlflow.start_run(run_id=run_id, nested=True):
# Log metrics to the child run within the Ray task
mlflow.log_metric("x", x)
return x
# Start parent run on the main driver process
with mlflow.start_run() as run:
# Pass the parent run's run_id to each Ray task
results = ray.get([ray_task.remote(x, run.info.run_id) for x in range(10)])
Entrenamiento de Ray y MLflow
La manera más sencilla de registrar los modelos de Ray Train en MLflow es usar el punto de control generado por la ejecución de entrenamiento. Una vez completada la ejecución de entrenamiento, vuelva a cargar el modelo en su marco de aprendizaje profundo nativo (por ejemplo, PyTorch o TensorFlow), después regístrelo con el código de MLflow correspondiente.
Este enfoque garantiza que el modelo se almacene correctamente y esté listo para la evaluación o la implementación.
El código siguiente vuelve a cargar un modelo desde un punto de control de Ray Train y lo registra en MLflow:
result = trainer.fit()
checkpoint = result.checkpoint
with checkpoint.as_directory() as checkpoint_dir:
# Change as needed for different DL frameworks
checkpoint_path = f"{checkpoint_dir}/checkpoint.ckpt"
# Load the model from the checkpoint
model = MyModel.load_from_checkpoint(checkpoint_path)
with mlflow.start_run() as run:
# Change the MLflow flavor as needed
mlflow.pytorch.log_model(model, "model")
Aunque suele ser un procedimiento recomendado enviar objetos al nodo de controlador, con Ray Train, guardar los resultados finales es más fácil que todo el historial de entrenamiento del proceso de trabajo.
Para almacenar varios modelos desde una ejecución de entrenamiento, especifique el número de puntos de control que se mantendrán en ray.train.CheckpointConfig
. Los modelos se pueden leer y registrar de la misma manera que almacenar un único modelo.
Nota:
MLflow no es responsable de controlar la tolerancia frente a errores durante el entrenamiento del modelo, sino para realizar el seguimiento del ciclo de vida del modelo. En su lugar, Ray Train administra la tolerancia frente a errores.
Para almacenar las métricas de entrenamiento especificadas por Ray Train, recuperarlas del objeto de resultado y almacenarlas mediante MLflow.
result = trainer.fit()
with mlflow.start_run() as run:
mlflow.log_metrics(result.metrics_dataframe.to_dict(orient='dict'))
# Change the MLflow flavor as needed
mlflow.pytorch.log_model(model, "model")
Para configurar correctamente los clústeres de Spark y Ray y evitar problemas de asignación de recursos, debe ajustar la configuración resources_per_worker
. En concreto, establezca el número de CPU para cada trabajador de Ray como uno menor que el número total de CPU disponibles en un nodo de trabajo de Ray. Este ajuste es fundamental porque si el instructor reserva todos los núcleos disponibles para los actores de Ray, puede provocar errores de contención de recursos.
Ray Tune y MLflow
La integración de Ray Tune con MLflow permite realizar un seguimiento eficaz y registrar experimentos de ajuste de hiperparámetros en Databricks. Esta integración aprovecha las funcionalidades de seguimiento de experimentos de MLflow para registrar métricas y resultados directamente desde las tareas de Ray.
Enfoque de ejecución secundaria para el registro
De forma similar al registro de tareas de Ray Core, las aplicaciones de Ray Tune pueden usar un enfoque de ejecución secundaria para registrar métricas de cada prueba o ajustar la iteración. Siga estos pasos para implementar un enfoque de ejecución secundaria:
- Crear una ejecución primaria: inicialice una ejecución primaria en el proceso del controlador. Esta ejecución sirve como contenedor principal para todas las ejecuciones secundarias posteriores.
- Ejecuciones secundarias de registro: cada tarea Ray Tune crea una ejecución secundaria en la ejecución primaria, manteniendo una jerarquía clara de resultados del experimento.
En el ejemplo siguiente se muestra cómo autenticar y registrar las tareas de Ray Tune mediante MLflow.
import os
import tempfile
import time
import mlflow
from mlflow.utils.databricks_utils import get_databricks_env_vars
from ray import train, tune
from ray.air.integrations.mlflow import MLflowLoggerCallback, setup_mlflow
mlflow_db_creds = get_databricks_env_vars("databricks")
EXPERIMENT_NAME = "/Users/<WORKSPACE_USERNAME>/setup_mlflow_example"
mlflow.set_experiment(EXPERIMENT_NAME)
def evaluation_fn(step, width, height):
return (0.1 + width * step / 100) ** (-1) + height * 0.1
def train_function_mlflow(config, run_id):
os.environ.update(mlflow_db_creds)
mlflow.set_experiment(EXPERIMENT_NAME)
# Hyperparameters
width = config["width"]
height = config["height"]
with mlflow.start_run(run_id=run_id, nested=True):
for step in range(config.get("steps", 100)):
# Iterative training function - can be any arbitrary training procedure
intermediate_score = evaluation_fn(step, width, height)
# Log the metrics to MLflow
mlflow.log_metrics({"iterations": step, "mean_loss": intermediate_score})
# Feed the score back to Tune.
train.report({"iterations": step, "mean_loss": intermediate_score})
time.sleep(0.1)
def tune_with_setup(run_id, finish_fast=True):
os.environ.update(mlflow_db_creds)
# Set the experiment or create a new one if it does not exist.
mlflow.set_experiment(experiment_name=EXPERIMENT_NAME)
tuner = tune.Tuner(
tune.with_parameter(train_function_mlflow, run_id),
tune_config=tune.TuneConfig(num_samples=5),
run_config=train.RunConfig(
name="mlflow",
),
param_space={
"width": tune.randint(10, 100),
"height": tune.randint(0, 100),
"steps": 20 if finish_fast else 100,
},
)
results = tuner.fit()
with mlflow.start_run() as run:
mlflow_tracking_uri = mlflow.get_tracking_uri()
tune_with_setup(run.info.run_id)
Servicio de modelos
El uso de Ray Serve en clústeres de Databricks para la inferencia en tiempo real plantea desafíos debido a las limitaciones de conectividad y seguridad de red al interactuar con aplicaciones externas.
Databricks recomienda usar Model Serving para implementar modelos de aprendizaje automático en producción en un punto de conexión de la API de REST. Para obtener más información, consulte Implementación de modelos personalizados.