Partager via


Intégrer MLflow et Ray

MLflow est une plateforme open source pour la gestion de charges de travail d’apprentissage automatique et d’IA. La combinaison de Ray avec MLflow vous permet de distribuer des charges de travail avec Ray et de suivre des modèles, des indicateurs, des paramètres et des métadonnées générés pendant l’entraînement avec MLflow.

Cet article explique comment intégrer MLflow aux composants Ray suivants :

  • Ray Core : applications distribuées à usage général qui ne sont pas couvertes par Ray Tune et Ray Train

  • Ray Train : entraînement de modèle distribué

  • Ray Tune : réglage distribué des hyperparamètres

  • Service de modèle : déploiement de modèles pour l’inférence en temps réel

Intégrer Ray Core et MLflow

Ray Core fournit les blocs élémentaires fondamentaux pour les applications distribuées à usage général. Il vous permet de mettre à l’échelle des fonctions et des classes Python sur plusieurs nœuds.

Cette section décrit les modèles suivants pour intégrer Ray Core et MLflow :

  • Journaliser les modèles MLflow à partir du processus de pilote Ray
  • Journaliser des modèles MLflow à partir d’exécutions enfants

Journaliser MLflow à partir du processus de pilote Ray

Il est généralement préférable de journaliser les modèles MLflow à partir du processus de pilote plutôt qu’à partir de nœuds Worker. Cela s’explique par la complexité supplémentaire liée à la transmission de références d’état aux télétravailleurs.

Par exemple, le code suivant échoue, car le serveur de suivi MLflow n’est pas initialisé à l’aide de MLflow Client à partir des nœuds Worker.

import mlflow

@ray.remote
def example_logging_task(x):
# ...

 # This method will fail
 mlflow.log_metric("x", x)
 return x

with mlflow.start_run() as run:
 ray.get([example_logging_task.remote(x) for x in range(10)])

Au lieu de cela, retournez les indicateurs au nœud du pilote. Les indicateurs et les métadonnées sont généralement suffisamment petits pour le transfert vers le pilote sans provoquer de problèmes de mémoire.

Prenons l’exemple ci-dessus et modifions-le pour enregistrer les indicateurs renvoyés par une tâche Ray :

import mlflow

@ray.remote
def example_logging_task(x):
 # ...
 return x

with mlflow.start_run() as run:
  results = ray.get([example_logging_task.remote(x) for x in range(10)])
 for x in results:
   mlflow.log_metric("x", x)

Pour les tâches qui nécessitent l’enregistrement d’artefacts volumineux, tels qu’une table Pandas, des images, des tracés ou des modèles, Databricks recommande de conserver l’artefact sous la forme d’un fichier. Ensuite, il faut soit recharger l’artefact dans le contexte du pilote, soit enregistrer directement l’objet avec MLflow en spécifiant le chemin d’accès au fichier sauvegardé.

import mlflow

@ray.remote
def example_logging_task(x):
# ...
# Create a large object that needs to be stored
with open("/dbfs/myLargeFilePath.txt", "w") as f:
  f.write(myLargeObject)
return x

with mlflow.start_run() as run:
 results = ray.get([example_logging_task.remote(x) for x in range(10)])
for x in results:
  mlflow.log_metric("x", x)
  # Directly log the saved file by specifying the path
  mlflow.log_artifact("/dbfs/myLargeFilePath.txt")

Enregistrer les tâches de Ray en tant qu’exécution d’un enfant MLflow

Vous pouvez intégrer Ray Core à MLflow à l’aide d’exécutions enfants. Cela implique les étapes suivantes :

  1. Créer une exécution parente : initialisez une exécution parente dans le processus du pilote. Cette exécution agit comme un conteneur hiérarchique pour toutes les exécutions enfants suivantes.
  2. Créer des exécutions enfants : dans chaque tâche Ray, lancez une exécution enfant sous l’exécution parente. Chaque exécution enfant peut journaliser indépendamment ses propres indicateurs.

Pour implémenter cette approche, assurez-vous que chaque tâche Ray reçoit les informations d’identification du client nécessaires et le parent run_id. Cette configuration établit la relation parent-enfant hiérarchique entre les exécutions. L’extrait de code suivant montre comment récupérer les informations d’identification et transmettre run_id parent :

from mlflow.utils.databricks_utils import get_databricks_env_vars
mlflow_db_creds = get_databricks_env_vars("databricks")

username = "" # Username path
experiment_name = f"/Users/{username}/mlflow_test"

mlflow.set_experiment(experiment_name)

@ray.remote
def ray_task(x, run_id):
   import os
  # Set the MLflow credentials within the Ray task
   os.environ.update(mlflow_db_creds)
  # Set the active MLflow experiment within each Ray task
   mlflow.set_experiment(experiment_name)
  # Create nested child runs associated with the parent run_id
   with mlflow.start_run(run_id=run_id, nested=True):
    # Log metrics to the child run within the Ray task
       mlflow.log_metric("x", x)

  return x

# Start parent run on the main driver process
with mlflow.start_run() as run:
  # Pass the parent run's run_id to each Ray task
   results = ray.get([ray_task.remote(x, run.info.run_id) for x in range(10)])

Ray Train et MLflow

La façon la plus simple d’enregistrer les modèles Ray Train sur MLflow consiste à utiliser le point de contrôle généré par l’exécution de l’entraînement. Une fois l’exécution de l’entraînement terminée, rechargez le modèle dans son cadre de deep learning natif (par exemple, PyTorch ou TensorFlow), puis connectez-le avec le code MLflow correspondant.

Cette approche garantit que le modèle est stocké correctement et prêt pour l’évaluation ou le déploiement.

Le code suivant recharge un modèle à partir d’un point de contrôle Ray Train et l’enregistre dans MLflow :

result = trainer.fit()

checkpoint = result.checkpoint
with checkpoint.as_directory() as checkpoint_dir:
     # Change as needed for different DL frameworks
    checkpoint_path = f"{checkpoint_dir}/checkpoint.ckpt"
    # Load the model from the checkpoint
    model = MyModel.load_from_checkpoint(checkpoint_path)

with mlflow.start_run() as run:
    # Change the MLflow flavor as needed
    mlflow.pytorch.log_model(model, "model")

Bien qu’il soit généralement préférable de renvoyer les objets au nœud pilote, avec Ray Train, il est plus facile de sauvegarder les résultats finaux que l’ensemble de l’historique de formation du processus Worker.

Pour stocker plusieurs modèles à partir d’une exécution d’entraînement, spécifiez le nombre de points de contrôle à conserver dans le ray.train.CheckpointConfig. Les modèles peuvent ensuite être lus et enregistrés de la même façon que le stockage d’un modèle unique.

Remarque

MLflow n’est pas responsable de la gestion de la tolérance de panne pendant l’entraînement du modèle, mais plutôt pour le suivi du cycle de vie du modèle. La tolérance de panne est gérée par Ray Train lui-même.

Pour stocker les indicateurs d’entraînement spécifiés par Ray Train, récupérez-les à partir de l’objet de résultat et stockez-les à l’aide de MLflow.

result = trainer.fit()

with mlflow.start_run() as run:
    mlflow.log_metrics(result.metrics_dataframe.to_dict(orient='dict'))

  # Change the MLflow flavor as needed
    mlflow.pytorch.log_model(model, "model")

Pour configurer correctement vos clusters Spark et Ray et empêcher les problèmes d’affectation des ressources, vous devez ajuster le paramètre resources_per_worker. Plus précisément, le nombre d’UC de chaque Worker Ray doit être inférieur d’une unité au nombre total d’UC disponibles sur un nœud Worker Ray. Cet ajustement est crucial, car si l’entraîneur réserve tous les cœurs disponibles pour les acteurs Ray, il peut entraîner des erreurs de contention des ressources.

Ray Tune et MLflow

L’intégration de Ray Tune à MLflow vous permet de suivre et de journaliser efficacement les expériences d’optimisation des hyperparamètres dans Databricks. Cette intégration s’appuie sur les capacités de suivi des expériences de MLflow pour enregistrer les indicateurs et les résultats directement à partir des tâches de Ray.

Approche d’exécution enfant pour la journalisation

Comme pour la journalisation des tâches Ray Core, les applications Ray Tune peuvent utiliser une approche d’exécution enfant pour journaliser les indicateurs de chaque essai ou itération de réglage. Procédez comme suit pour implémenter une approche d’exécution enfant :

  1. Créer une exécution parente : initialisez une exécution parente dans le processus du pilote. Cette exécution sert de conteneur principal pour toutes les exécutions enfants suivantes.
  2. Journaliser les exécutions enfants  : chaque tâche Ray Tune crée une exécution enfant sous l’exécution parente, conservant une hiérarchie claire des résultats de l’expérience.

L’exemple suivant montre comment authentifier et journaliser à partir de tâches Ray Tune à l’aide de MLflow.

import os
import tempfile
import time

import mlflow
from mlflow.utils.databricks_utils import get_databricks_env_vars

from ray import train, tune
from ray.air.integrations.mlflow import MLflowLoggerCallback, setup_mlflow

mlflow_db_creds = get_databricks_env_vars("databricks")

EXPERIMENT_NAME = "/Users/<WORKSPACE_USERNAME>/setup_mlflow_example"
mlflow.set_experiment(EXPERIMENT_NAME)

def evaluation_fn(step, width, height):
   return (0.1 + width * step / 100) ** (-1) + height * 0.1

def train_function_mlflow(config, run_id):
   os.environ.update(mlflow_db_creds)
   mlflow.set_experiment(EXPERIMENT_NAME)

   # Hyperparameters
   width = config["width"]
   height = config["height"]

   with mlflow.start_run(run_id=run_id, nested=True):
       for step in range(config.get("steps", 100)):
           # Iterative training function - can be any arbitrary training procedure
           intermediate_score = evaluation_fn(step, width, height)
           # Log the metrics to MLflow
           mlflow.log_metrics({"iterations": step, "mean_loss": intermediate_score})
           # Feed the score back to Tune.
           train.report({"iterations": step, "mean_loss": intermediate_score})
           time.sleep(0.1)

def tune_with_setup(run_id, finish_fast=True):
   os.environ.update(mlflow_db_creds)
   # Set the experiment or create a new one if it does not exist.
   mlflow.set_experiment(experiment_name=EXPERIMENT_NAME)

   tuner = tune.Tuner(
       tune.with_parameter(train_function_mlflow, run_id),
       tune_config=tune.TuneConfig(num_samples=5),
       run_config=train.RunConfig(
           name="mlflow",
       ),
       param_space={
           "width": tune.randint(10, 100),
           "height": tune.randint(0, 100),
           "steps": 20 if finish_fast else 100,
       },
   )
   results = tuner.fit()

with mlflow.start_run() as run:
   mlflow_tracking_uri = mlflow.get_tracking_uri()
   tune_with_setup(run.info.run_id)

Mise en service de modèles

L’utilisation de Ray Serve sur les groupements Databricks pour l’inférence en temps réel pose des problèmes en raison de la sécurité du réseau et des limitations de connectivité lors de l’interaction avec des applications externes.

Databricks recommande d’utiliser Service de modèles pour déployer des modèles Machine Learning en production sur un point de terminaison d’API REST. Pour plus d’informations, consultez Déployer des modèles personnalisés.