Sdílet prostřednictvím


Integrace MLflow a Ray

MLflow je opensourcová platforma pro správu úloh strojového učení a umělé inteligence. Kombinace Rayu s MLflow umožňuje distribuovat úlohy pomocí Ray a sledovat modely, metriky, parametersa metadata vygenerovaná během trénování pomocí MLflow.

Tento článek popisuje, jak integrovat MLflow s následujícími komponentami Ray:

  • Ray Core: Distribuované aplikace pro obecné účely, které nejsou pokryty Ray Tune a Ray Train

  • Ray Train: Trénování distribuovaného modelu

  • Ray Tune: Distribuované ladění hyperparametrů

  • Obsluha modelů: Nasazování modelů pro odvozování v reálném čase

Integrace Ray Core a MLflow

Ray Core poskytuje základní stavební bloky pro distribuované aplikace pro obecné účely. Umožňuje škálovat funkce a třídy Pythonu napříč několika uzly.

Tato část popisuje následující vzory integrace Ray Core a MLflow:

  • Protokolování modelů MLflow z procesu ovladače Ray
  • Protokolování modelů MLflow z podřízených spuštění

Protokolování MLflow z procesu ovladače Ray

Obecně je nejlepší protokolovat modely MLflow z procesu ovladače místo z pracovních uzlů. Důvodem je větší složitost předávání stavových odkazů vzdáleným pracovníkům.

Například následující kód selže, protože server pro sledování MLflow není inicializován pomocí MLflow Client z pracovních uzlů.

import mlflow

@ray.remote
def example_logging_task(x):
# ...

 # This method will fail
 mlflow.log_metric("x", x)
 return x

with mlflow.start_run() as run:
 ray.get([example_logging_task.remote(x) for x in range(10)])

Místo toho vraťte metriky do uzlu ovladače. Metriky a metadata jsou obecně dostatečně malé, aby se přenesly zpět na ovladač, aniž by to způsobilo problémy s pamětí.

Podívejte se na výše uvedený příklad a update protokolovat vrácené metriky z úlohy Ray:

import mlflow

@ray.remote
def example_logging_task(x):
 # ...
 return x

with mlflow.start_run() as run:
  results = ray.get([example_logging_task.remote(x) for x in range(10)])
 for x in results:
   mlflow.log_metric("x", x)

U úloh, které vyžadují ukládání velkých artefaktů, jako jsou velké tablePandas, obrázky, grafy nebo modely, doporučuje Databricks zachovat artefakt jako soubor. Pak buď znovu načtěte artefakt v kontextu ovladače, nebo přímo protokolujte objekt pomocí MLflow zadáním cesty k uloženému souboru.

import mlflow

@ray.remote
def example_logging_task(x):
# ...
# Create a large object that needs to be stored
with open("/dbfs/myLargeFilePath.txt", "w") as f:
  f.write(myLargeObject)
return x

with mlflow.start_run() as run:
 results = ray.get([example_logging_task.remote(x) for x in range(10)])
for x in results:
  mlflow.log_metric("x", x)
  # Directly log the saved file by specifying the path
  mlflow.log_artifact("/dbfs/myLargeFilePath.txt")

Úlohy Log Ray jako podřízená spuštění MLflow

Ray Core můžete integrovat s MLflow pomocí podřízených spuštění. To zahrnuje následující kroky:

  1. Vytvoření nadřazeného spuštění: Inicializace nadřazeného spuštění v procesu ovladače Toto spuštění funguje jako hierarchický kontejner pro všechna následná podřízená spuštění.
  2. Vytvoření podřízených spuštění: V rámci každé úlohy Ray zahajte podřízené spuštění pod nadřazeným spuštěním. Každé podřízené spuštění může nezávisle protokolovat vlastní metriky.

Chcete-li tento přístup implementovat, ujistěte se, že každá úloha v Ray obdrží nezbytného klienta credentials a nadřazeného run_id. Toto nastavení vytvoří hierarchický vztah nadřazený-podřízený mezi spuštěními. Následující fragment kódu ukazuje, jak načíst credentials a předat nadřazenou run_id:

from mlflow.utils.databricks_utils import get_databricks_env_vars
mlflow_db_creds = get_databricks_env_vars("databricks")

username = "" # Username path
experiment_name = f"/Users/{username}/mlflow_test"

mlflow.set_experiment(experiment_name)

@ray.remote
def ray_task(x, run_id):
   import os
  # Set the MLflow credentials within the Ray task
   os.environ.update(mlflow_db_creds)
  # Set the active MLflow experiment within each Ray task
   mlflow.set_experiment(experiment_name)
  # Create nested child runs associated with the parent run_id
   with mlflow.start_run(run_id=run_id, nested=True):
    # Log metrics to the child run within the Ray task
       mlflow.log_metric("x", x)

  return x

# Start parent run on the main driver process
with mlflow.start_run() as run:
  # Pass the parent run's run_id to each Ray task
   results = ray.get([ray_task.remote(x, run.info.run_id) for x in range(10)])

Ray Train a MLflow

Nejjednodušší způsob, jak protokolovat modely Ray Train do MLflow, je použít kontrolní bod vygenerovaný spuštěním trénování. Po dokončení trénování znovu načtěte model do jeho nativní architektury hlubokého učení (například PyTorch nebo TensorFlow) a pak ho protokolujte s odpovídajícím kódem MLflow.

Tento přístup zajišťuje, že se model uloží správně a je připravený k vyhodnocení nebo nasazení.

Následující kód znovu načte model z kontrolního bodu Ray Train a zaznamená ho do MLflow:

result = trainer.fit()

checkpoint = result.checkpoint
with checkpoint.as_directory() as checkpoint_dir:
     # Change as needed for different DL frameworks
    checkpoint_path = f"{checkpoint_dir}/checkpoint.ckpt"
    # Load the model from the checkpoint
    model = MyModel.load_from_checkpoint(checkpoint_path)

with mlflow.start_run() as run:
    # Change the MLflow flavor as needed
    mlflow.pytorch.log_model(model, "model")

I když je obecně osvědčeným postupem odesílat objekty zpět do uzlu ovladače, s Ray Train je uložení konečných výsledků jednodušší než celá historie trénování z pracovního procesu.

Chcete-li uložit více modelů z trénovacího spuštění, zadejte počet kontrolních bodů, které se mají v objektu ray.train.CheckpointConfig. Modely je pak možné číst a protokolovat stejným způsobem jako ukládání jednoho modelu.

Poznámka:

MLflow nezodpovědí za zpracování odolnosti proti chybám během trénování modelu, ale za sledování životního cyklu modelu. Odolnost proti chybám je místo toho spravována samotným Ray Trainem.

Pokud chcete uložit trénovací metriky určené Ray Trainem, načtěte je z výsledného objektu a uložte je pomocí MLflow.

result = trainer.fit()

with mlflow.start_run() as run:
    mlflow.log_metrics(result.metrics_dataframe.to_dict(orient='dict'))

  # Change the MLflow flavor as needed
    mlflow.pytorch.log_model(model, "model")

Pokud chcete správně nakonfigurovat clustery Spark a Ray a zabránit problémům s přidělováním prostředků, měli byste nastavení upravit resources_per_worker . Konkrétně set počet procesorů pro každého pracovníka Ray, aby byl jeden menší než celkový počet procesorů dostupných na pracovním uzlu Ray. Tato úprava je zásadní, protože pokud trenér rezervuje všechna dostupná jádra pro ray herce, může vést k chybám kolize prostředků.

Ray Tune a MLflow

Integrace Ray Tune s MLflow umožňuje efektivně sledovat a protokolovat experimenty ladění hyperparametrů v Databricks. Tato integrace využívá funkce MLflow pro sledování experimentů k zaznamenávání metrik a výsledků přímo z úloh Ray.

Přístup s podřízeným spuštěním pro protokolování

Podobně jako protokolování z úloh Ray Core můžou aplikace Ray Tune používat přístup pro spouštění dětí k protokolování metrik z každé zkušební verze nebo ladění iterace. K implementaci přístupu s podřízeným spuštěním použijte následující kroky:

  1. Vytvoření nadřazeného spuštění: Inicializace nadřazeného spuštění v procesu ovladače Toto spuštění slouží jako hlavní kontejner pro všechna následná podřízená spuštění.
  2. Podřízená spuštění protokolu: Každá úloha Ray Tune vytvoří podřízené spuštění pod nadřazeným spuštěním a udržuje jasnou hierarchii výsledků experimentu.

Následující příklad ukazuje, jak ověřovat a protokolovat z úloh Ray Tune pomocí MLflow.

import os
import tempfile
import time

import mlflow
from mlflow.utils.databricks_utils import get_databricks_env_vars

from ray import train, tune
from ray.air.integrations.mlflow import MLflowLoggerCallback, setup_mlflow

mlflow_db_creds = get_databricks_env_vars("databricks")

EXPERIMENT_NAME = "/Users/<WORKSPACE_USERNAME>/setup_mlflow_example"
mlflow.set_experiment(EXPERIMENT_NAME)

def evaluation_fn(step, width, height):
   return (0.1 + width * step / 100) ** (-1) + height * 0.1

def train_function_mlflow(config, run_id):
   os.environ.update(mlflow_db_creds)
   mlflow.set_experiment(EXPERIMENT_NAME)

   # Hyperparameters
   width = config["width"]
   height = config["height"]

   with mlflow.start_run(run_id=run_id, nested=True):
       for step in range(config.get("steps", 100)):
           # Iterative training function - can be any arbitrary training procedure
           intermediate_score = evaluation_fn(step, width, height)
           # Log the metrics to MLflow
           mlflow.log_metrics({"iterations": step, "mean_loss": intermediate_score})
           # Feed the score back to Tune.
           train.report({"iterations": step, "mean_loss": intermediate_score})
           time.sleep(0.1)

def tune_with_setup(run_id, finish_fast=True):
   os.environ.update(mlflow_db_creds)
   # Set the experiment or create a new one if it does not exist.
   mlflow.set_experiment(experiment_name=EXPERIMENT_NAME)

   tuner = tune.Tuner(
       tune.with_parameter(train_function_mlflow, run_id),
       tune_config=tune.TuneConfig(num_samples=5),
       run_config=train.RunConfig(
           name="mlflow",
       ),
       param_space={
           "width": tune.randint(10, 100),
           "height": tune.randint(0, 100),
           "steps": 20 if finish_fast else 100,
       },
   )
   results = tuner.fit()

with mlflow.start_run() as run:
   mlflow_tracking_uri = mlflow.get_tracking_uri()
   tune_with_setup(run.info.run_id)

Obsluha modelu

Použití Ray Serve v clusterech Databricks pro odvozování v reálném čase představuje problémy způsobené zabezpečením sítě a omezeními připojení při interakci s externími aplikacemi.

Databricks doporučuje používat službu Model Serving k nasazení modelů strojového učení v produkčním prostředí do koncového bodu rozhraní REST API. Další informace najdete v tématu Nasazení vlastních modelů.