Integrera MLflow och Ray
MLflow är en öppen källkod plattform för hantering av maskininlärning och AI-arbetsbelastningar. Genom att kombinera Ray med MLflow kan du distribuera arbetsbelastningar med Ray och spåra modeller, mått, parametrar och metadata som genererats under träning med MLflow.
Den här artikeln beskriver hur du integrerar MLflow med följande Ray-komponenter:
Ray Core: Allmänna distribuerade program som inte omfattas av Ray Tune och Ray Train
Ray Train: Distribuerad modellträning
Ray Tune: Distribuerad hyperparameterjustering
Modellservering: Distribuera modeller för slutsatsdragning i realtid
Integrera Ray Core och MLflow
Ray Core tillhandahåller de grundläggande byggstenarna för allmänna distribuerade program. Det gör att du kan skala Python-funktioner och klasser över flera noder.
I det här avsnittet beskrivs följande mönster för att integrera Ray Core och MLflow:
- Logga MLflow-modeller från Ray-drivrutinsprocessen
- Logga MLflow-modeller från underordnade körningar
Logga MLflow från Ray-drivrutinsprocessen
Det är vanligtvis bäst att logga MLflow-modeller från drivrutinsprocessen i stället för från arbetsnoder. Detta beror på den extra komplexiteten i att skicka tillståndskänsliga referenser till distansarbetarna.
Följande kod misslyckas till exempel eftersom MLflow Tracking Server inte initieras med hjälp av MLflow Client
inifrån arbetsnoderna.
import mlflow
@ray.remote
def example_logging_task(x):
# ...
# This method will fail
mlflow.log_metric("x", x)
return x
with mlflow.start_run() as run:
ray.get([example_logging_task.remote(x) for x in range(10)])
Returnera i stället måtten till drivrutinsnoden. Måtten och metadata är i allmänhet tillräckligt små för att överföras tillbaka till drivrutinen utan att orsaka minnesproblem.
Ta exemplet ovan och uppdatera det för att logga de returnerade måtten från en Ray-uppgift:
import mlflow
@ray.remote
def example_logging_task(x):
# ...
return x
with mlflow.start_run() as run:
results = ray.get([example_logging_task.remote(x) for x in range(10)])
for x in results:
mlflow.log_metric("x", x)
För uppgifter som kräver att stora artefakter sparas, till exempel en stor Pandas-tabell, bilder, diagram eller modeller, rekommenderar Databricks att artefakten bevaras som en fil. Läs sedan in artefakten i drivrutinskontexten igen eller logga objektet direkt med MLflow genom att ange sökvägen till den sparade filen.
import mlflow
@ray.remote
def example_logging_task(x):
# ...
# Create a large object that needs to be stored
with open("/dbfs/myLargeFilePath.txt", "w") as f:
f.write(myLargeObject)
return x
with mlflow.start_run() as run:
results = ray.get([example_logging_task.remote(x) for x in range(10)])
for x in results:
mlflow.log_metric("x", x)
# Directly log the saved file by specifying the path
mlflow.log_artifact("/dbfs/myLargeFilePath.txt")
Log Ray-uppgifter som underordnade MLflow-körningar
Du kan integrera Ray Core med MLflow med hjälp av underordnade körningar. Detta omfattar följande steg:
- Skapa en överordnad körning: Initiera en överordnad körning i drivrutinsprocessen. Den här körningen fungerar som en hierarkisk container för alla efterföljande underordnade körningar.
- Skapa underordnade körningar: Initiera en underordnad körning under den överordnade körningen inom varje Ray-uppgift. Varje underordnad körning kan separat logga sina egna mått.
Om du vill implementera den här metoden kontrollerar du att varje Ray-uppgift tar emot nödvändiga klientautentiseringsuppgifter och att den överordnade run_id
. Den här konfigurationen upprättar den hierarkiska överordnad-underordnade relationen mellan körningar. Följande kodfragment visar hur du hämtar autentiseringsuppgifterna och vidarebefordrar den överordnade run_id
:
from mlflow.utils.databricks_utils import get_databricks_env_vars
mlflow_db_creds = get_databricks_env_vars("databricks")
username = "" # Username path
experiment_name = f"/Users/{username}/mlflow_test"
mlflow.set_experiment(experiment_name)
@ray.remote
def ray_task(x, run_id):
import os
# Set the MLflow credentials within the Ray task
os.environ.update(mlflow_db_creds)
# Set the active MLflow experiment within each Ray task
mlflow.set_experiment(experiment_name)
# Create nested child runs associated with the parent run_id
with mlflow.start_run(run_id=run_id, nested=True):
# Log metrics to the child run within the Ray task
mlflow.log_metric("x", x)
return x
# Start parent run on the main driver process
with mlflow.start_run() as run:
# Pass the parent run's run_id to each Ray task
results = ray.get([ray_task.remote(x, run.info.run_id) for x in range(10)])
Ray Train och MLflow
Det enklaste sättet att logga Ray Train-modellerna till MLflow är att använda kontrollpunkten som genereras av träningskörningen. När träningskörningen är klar läser du in modellen igen i det interna djupinlärningsramverket (till exempel PyTorch eller TensorFlow) och loggar den sedan med motsvarande MLflow-kod.
Den här metoden säkerställer att modellen lagras korrekt och redo för utvärdering eller distribution.
Följande kod läser in en modell från en Kontrollpunkt för Ray Train och loggar den till MLflow:
result = trainer.fit()
checkpoint = result.checkpoint
with checkpoint.as_directory() as checkpoint_dir:
# Change as needed for different DL frameworks
checkpoint_path = f"{checkpoint_dir}/checkpoint.ckpt"
# Load the model from the checkpoint
model = MyModel.load_from_checkpoint(checkpoint_path)
with mlflow.start_run() as run:
# Change the MLflow flavor as needed
mlflow.pytorch.log_model(model, "model")
Även om det i allmänhet är en bra idé att skicka tillbaka objekt till drivrutinsnoden med Ray Train är det enklare att spara slutresultatet än hela träningshistoriken från arbetsprocessen.
Om du vill lagra flera modeller från en träningskörning anger du antalet kontrollpunkter som ska behållas i ray.train.CheckpointConfig
. Modellerna kan sedan läsas och loggas på samma sätt som när en enskild modell lagras.
Kommentar
MLflow ansvarar inte för att hantera feltolerans under modellträningen, utan för att spåra modellens livscykel. Feltolerans hanteras i stället av Själva Ray Train.
Om du vill lagra träningsmåtten som anges av Ray Train hämtar du dem från resultatobjektet och lagrar dem med hjälp av MLflow.
result = trainer.fit()
with mlflow.start_run() as run:
mlflow.log_metrics(result.metrics_dataframe.to_dict(orient='dict'))
# Change the MLflow flavor as needed
mlflow.pytorch.log_model(model, "model")
Om du vill konfigurera Spark- och Ray-kluster korrekt och förhindra resursallokeringsproblem bör du justera inställningen resources_per_worker
. Mer specifikt anger du att antalet processorer för varje Ray-arbetare är ett mindre än det totala antalet processorer som är tillgängliga på en Ray-arbetsnod. Den här justeringen är avgörande eftersom om tränaren reserverar alla tillgängliga kärnor för Ray-aktörer kan det leda till resurskonkurrensfel.
Ray Tune och MLflow
Genom att integrera Ray Tune med MLflow kan du effektivt spåra och logga hyperparameterjusteringsexperiment i Databricks. Den här integreringen utnyttjar MLflows funktioner för experimentspårning för att registrera mått och resultat direkt från Ray-uppgifter.
Underordnad körningsmetod för loggning
På samma sätt som med loggning från Ray Core-uppgifter kan Ray Tune-program använda en underordnad körningsmetod för att logga mått från varje utvärderingsversion eller justera iterationen. Använd följande steg för att implementera en underordnad körningsmetod:
- Skapa en överordnad körning: Initiera en överordnad körning i drivrutinsprocessen. Den här körningen fungerar som huvudcontainer för alla efterföljande underordnade körningar.
- Loggunderordnad körning: Varje Ray Tune-uppgift skapar en underordnad körning under den överordnade körningen och upprätthåller en tydlig hierarki med experimentresultat.
I följande exempel visas hur du autentiserar och loggar från Ray Tune-uppgifter med MLflow.
import os
import tempfile
import time
import mlflow
from mlflow.utils.databricks_utils import get_databricks_env_vars
from ray import train, tune
from ray.air.integrations.mlflow import MLflowLoggerCallback, setup_mlflow
mlflow_db_creds = get_databricks_env_vars("databricks")
EXPERIMENT_NAME = "/Users/<WORKSPACE_USERNAME>/setup_mlflow_example"
mlflow.set_experiment(EXPERIMENT_NAME)
def evaluation_fn(step, width, height):
return (0.1 + width * step / 100) ** (-1) + height * 0.1
def train_function_mlflow(config, run_id):
os.environ.update(mlflow_db_creds)
mlflow.set_experiment(EXPERIMENT_NAME)
# Hyperparameters
width = config["width"]
height = config["height"]
with mlflow.start_run(run_id=run_id, nested=True):
for step in range(config.get("steps", 100)):
# Iterative training function - can be any arbitrary training procedure
intermediate_score = evaluation_fn(step, width, height)
# Log the metrics to MLflow
mlflow.log_metrics({"iterations": step, "mean_loss": intermediate_score})
# Feed the score back to Tune.
train.report({"iterations": step, "mean_loss": intermediate_score})
time.sleep(0.1)
def tune_with_setup(run_id, finish_fast=True):
os.environ.update(mlflow_db_creds)
# Set the experiment or create a new one if it does not exist.
mlflow.set_experiment(experiment_name=EXPERIMENT_NAME)
tuner = tune.Tuner(
tune.with_parameter(train_function_mlflow, run_id),
tune_config=tune.TuneConfig(num_samples=5),
run_config=train.RunConfig(
name="mlflow",
),
param_space={
"width": tune.randint(10, 100),
"height": tune.randint(0, 100),
"steps": 20 if finish_fast else 100,
},
)
results = tuner.fit()
with mlflow.start_run() as run:
mlflow_tracking_uri = mlflow.get_tracking_uri()
tune_with_setup(run.info.run_id)
Modellservering
Att använda Ray Serve i Databricks-kluster för slutsatsdragning i realtid innebär utmaningar på grund av nätverkssäkerhet och anslutningsbegränsningar när du interagerar med externa program.
Databricks rekommenderar att du använder Modellservering för att distribuera maskininlärningsmodeller i produktion till en REST API-slutpunkt. Mer information finns i Distribuera anpassade modeller.