Integrace MLflow a Ray
MLflow je opensourcová platforma pro správu úloh strojového učení a umělé inteligence. Kombinace Rayu s MLflow umožňuje distribuovat úlohy pomocí Ray a sledovat modely, metriky, parametersa metadata vygenerovaná během trénování pomocí MLflow.
Tento článek popisuje, jak integrovat MLflow s následujícími komponentami Ray:
Ray Core: Distribuované aplikace pro obecné účely, které nejsou pokryty Ray Tune a Ray Train
Ray Train: Trénování distribuovaného modelu
Ray Tune: Distribuované ladění hyperparametrů
Obsluha modelů: Nasazování modelů pro odvozování v reálném čase
Integrace Ray Core a MLflow
Ray Core poskytuje základní stavební bloky pro distribuované aplikace pro obecné účely. Umožňuje škálovat funkce a třídy Pythonu napříč několika uzly.
Tato část popisuje následující vzory integrace Ray Core a MLflow:
- Protokolování modelů MLflow z procesu ovladače Ray
- Protokolování modelů MLflow z podřízených spuštění
Protokolování MLflow z procesu ovladače Ray
Obecně je nejlepší protokolovat modely MLflow z procesu ovladače místo z pracovních uzlů. Důvodem je větší složitost předávání stavových odkazů vzdáleným pracovníkům.
Například následující kód selže, protože server pro sledování MLflow není inicializován pomocí MLflow Client
z pracovních uzlů.
import mlflow
@ray.remote
def example_logging_task(x):
# ...
# This method will fail
mlflow.log_metric("x", x)
return x
with mlflow.start_run() as run:
ray.get([example_logging_task.remote(x) for x in range(10)])
Místo toho vraťte metriky do uzlu ovladače. Metriky a metadata jsou obecně dostatečně malé, aby se přenesly zpět na ovladač, aniž by to způsobilo problémy s pamětí.
Podívejte se na výše uvedený příklad a update protokolovat vrácené metriky z úlohy Ray:
import mlflow
@ray.remote
def example_logging_task(x):
# ...
return x
with mlflow.start_run() as run:
results = ray.get([example_logging_task.remote(x) for x in range(10)])
for x in results:
mlflow.log_metric("x", x)
U úloh, které vyžadují ukládání velkých artefaktů, jako jsou velké tablePandas, obrázky, grafy nebo modely, doporučuje Databricks zachovat artefakt jako soubor. Pak buď znovu načtěte artefakt v kontextu ovladače, nebo přímo protokolujte objekt pomocí MLflow zadáním cesty k uloženému souboru.
import mlflow
@ray.remote
def example_logging_task(x):
# ...
# Create a large object that needs to be stored
with open("/dbfs/myLargeFilePath.txt", "w") as f:
f.write(myLargeObject)
return x
with mlflow.start_run() as run:
results = ray.get([example_logging_task.remote(x) for x in range(10)])
for x in results:
mlflow.log_metric("x", x)
# Directly log the saved file by specifying the path
mlflow.log_artifact("/dbfs/myLargeFilePath.txt")
Úlohy Log Ray jako podřízená spuštění MLflow
Ray Core můžete integrovat s MLflow pomocí podřízených spuštění. To zahrnuje následující kroky:
- Vytvoření nadřazeného spuštění: Inicializace nadřazeného spuštění v procesu ovladače Toto spuštění funguje jako hierarchický kontejner pro všechna následná podřízená spuštění.
- Vytvoření podřízených spuštění: V rámci každé úlohy Ray zahajte podřízené spuštění pod nadřazeným spuštěním. Každé podřízené spuštění může nezávisle protokolovat vlastní metriky.
Chcete-li tento přístup implementovat, ujistěte se, že každá úloha v Ray obdrží nezbytného klienta credentials a nadřazeného run_id
. Toto nastavení vytvoří hierarchický vztah nadřazený-podřízený mezi spuštěními. Následující fragment kódu ukazuje, jak načíst credentials a předat nadřazenou run_id
:
from mlflow.utils.databricks_utils import get_databricks_env_vars
mlflow_db_creds = get_databricks_env_vars("databricks")
username = "" # Username path
experiment_name = f"/Users/{username}/mlflow_test"
mlflow.set_experiment(experiment_name)
@ray.remote
def ray_task(x, run_id):
import os
# Set the MLflow credentials within the Ray task
os.environ.update(mlflow_db_creds)
# Set the active MLflow experiment within each Ray task
mlflow.set_experiment(experiment_name)
# Create nested child runs associated with the parent run_id
with mlflow.start_run(run_id=run_id, nested=True):
# Log metrics to the child run within the Ray task
mlflow.log_metric("x", x)
return x
# Start parent run on the main driver process
with mlflow.start_run() as run:
# Pass the parent run's run_id to each Ray task
results = ray.get([ray_task.remote(x, run.info.run_id) for x in range(10)])
Ray Train a MLflow
Nejjednodušší způsob, jak protokolovat modely Ray Train do MLflow, je použít kontrolní bod vygenerovaný spuštěním trénování. Po dokončení trénování znovu načtěte model do jeho nativní architektury hlubokého učení (například PyTorch nebo TensorFlow) a pak ho protokolujte s odpovídajícím kódem MLflow.
Tento přístup zajišťuje, že se model uloží správně a je připravený k vyhodnocení nebo nasazení.
Následující kód znovu načte model z kontrolního bodu Ray Train a zaznamená ho do MLflow:
result = trainer.fit()
checkpoint = result.checkpoint
with checkpoint.as_directory() as checkpoint_dir:
# Change as needed for different DL frameworks
checkpoint_path = f"{checkpoint_dir}/checkpoint.ckpt"
# Load the model from the checkpoint
model = MyModel.load_from_checkpoint(checkpoint_path)
with mlflow.start_run() as run:
# Change the MLflow flavor as needed
mlflow.pytorch.log_model(model, "model")
I když je obecně osvědčeným postupem odesílat objekty zpět do uzlu ovladače, s Ray Train je uložení konečných výsledků jednodušší než celá historie trénování z pracovního procesu.
Chcete-li uložit více modelů z trénovacího spuštění, zadejte počet kontrolních bodů, které se mají v objektu ray.train.CheckpointConfig
. Modely je pak možné číst a protokolovat stejným způsobem jako ukládání jednoho modelu.
Poznámka:
MLflow nezodpovědí za zpracování odolnosti proti chybám během trénování modelu, ale za sledování životního cyklu modelu. Odolnost proti chybám je místo toho spravována samotným Ray Trainem.
Pokud chcete uložit trénovací metriky určené Ray Trainem, načtěte je z výsledného objektu a uložte je pomocí MLflow.
result = trainer.fit()
with mlflow.start_run() as run:
mlflow.log_metrics(result.metrics_dataframe.to_dict(orient='dict'))
# Change the MLflow flavor as needed
mlflow.pytorch.log_model(model, "model")
Pokud chcete správně nakonfigurovat clustery Spark a Ray a zabránit problémům s přidělováním prostředků, měli byste nastavení upravit resources_per_worker
. Konkrétně set počet procesorů pro každého pracovníka Ray, aby byl jeden menší než celkový počet procesorů dostupných na pracovním uzlu Ray. Tato úprava je zásadní, protože pokud trenér rezervuje všechna dostupná jádra pro ray herce, může vést k chybám kolize prostředků.
Ray Tune a MLflow
Integrace Ray Tune s MLflow umožňuje efektivně sledovat a protokolovat experimenty ladění hyperparametrů v Databricks. Tato integrace využívá funkce MLflow pro sledování experimentů k zaznamenávání metrik a výsledků přímo z úloh Ray.
Přístup s podřízeným spuštěním pro protokolování
Podobně jako protokolování z úloh Ray Core můžou aplikace Ray Tune používat přístup pro spouštění dětí k protokolování metrik z každé zkušební verze nebo ladění iterace. K implementaci přístupu s podřízeným spuštěním použijte následující kroky:
- Vytvoření nadřazeného spuštění: Inicializace nadřazeného spuštění v procesu ovladače Toto spuštění slouží jako hlavní kontejner pro všechna následná podřízená spuštění.
- Podřízená spuštění protokolu: Každá úloha Ray Tune vytvoří podřízené spuštění pod nadřazeným spuštěním a udržuje jasnou hierarchii výsledků experimentu.
Následující příklad ukazuje, jak ověřovat a protokolovat z úloh Ray Tune pomocí MLflow.
import os
import tempfile
import time
import mlflow
from mlflow.utils.databricks_utils import get_databricks_env_vars
from ray import train, tune
from ray.air.integrations.mlflow import MLflowLoggerCallback, setup_mlflow
mlflow_db_creds = get_databricks_env_vars("databricks")
EXPERIMENT_NAME = "/Users/<WORKSPACE_USERNAME>/setup_mlflow_example"
mlflow.set_experiment(EXPERIMENT_NAME)
def evaluation_fn(step, width, height):
return (0.1 + width * step / 100) ** (-1) + height * 0.1
def train_function_mlflow(config, run_id):
os.environ.update(mlflow_db_creds)
mlflow.set_experiment(EXPERIMENT_NAME)
# Hyperparameters
width = config["width"]
height = config["height"]
with mlflow.start_run(run_id=run_id, nested=True):
for step in range(config.get("steps", 100)):
# Iterative training function - can be any arbitrary training procedure
intermediate_score = evaluation_fn(step, width, height)
# Log the metrics to MLflow
mlflow.log_metrics({"iterations": step, "mean_loss": intermediate_score})
# Feed the score back to Tune.
train.report({"iterations": step, "mean_loss": intermediate_score})
time.sleep(0.1)
def tune_with_setup(run_id, finish_fast=True):
os.environ.update(mlflow_db_creds)
# Set the experiment or create a new one if it does not exist.
mlflow.set_experiment(experiment_name=EXPERIMENT_NAME)
tuner = tune.Tuner(
tune.with_parameter(train_function_mlflow, run_id),
tune_config=tune.TuneConfig(num_samples=5),
run_config=train.RunConfig(
name="mlflow",
),
param_space={
"width": tune.randint(10, 100),
"height": tune.randint(0, 100),
"steps": 20 if finish_fast else 100,
},
)
results = tuner.fit()
with mlflow.start_run() as run:
mlflow_tracking_uri = mlflow.get_tracking_uri()
tune_with_setup(run.info.run_id)
Obsluha modelu
Použití Ray Serve v clusterech Databricks pro odvozování v reálném čase představuje problémy způsobené zabezpečením sítě a omezeními připojení při interakci s externími aplikacemi.
Databricks doporučuje používat službu Model Serving k nasazení modelů strojového učení v produkčním prostředí do koncového bodu rozhraní REST API. Další informace najdete v tématu Nasazení vlastních modelů.