MLflow en Ray integreren
MLflow is een opensource-platform voor het beheren van machine learning- en AI-workloads. Door Ray te combineren met MLflow kunt u workloads distribueren met Ray en modellen, metrische gegevens, parameters en metagegevens bijhouden die tijdens de training met MLflow worden gegenereerd.
In dit artikel wordt beschreven hoe u MLflow integreert met de volgende Ray-onderdelen:
Ray Core: Gedistribueerde toepassingen voor algemeen gebruik die niet worden gedekt door Ray Tune en Ray Train
Ray Train: Training voor gedistribueerd model
Ray Tune: Gedistribueerde hyperparameterafstemming
Modelverdiening: Modellen implementeren voor realtime deductie
Ray Core en MLflow integreren
Ray Core biedt de basisbouwstenen voor gedistribueerde toepassingen voor algemeen gebruik. Hiermee kunt u Python-functies en -klassen schalen op meerdere knooppunten.
In deze sectie worden de volgende patronen beschreven om Ray Core en MLflow te integreren:
- MLflow-modellen vastleggen vanuit het Ray-stuurprogrammaproces
- MLflow-modellen vastleggen van onderliggende uitvoeringen
Log MLflow van het Ray-stuurprogrammaproces
Het is over het algemeen het beste om MLflow-modellen te registreren vanuit het stuurprogrammaproces in plaats van van werkknooppunten. Dit komt door de extra complexiteit van het doorgeven van stateful verwijzingen naar de externe werknemers.
De volgende code mislukt bijvoorbeeld omdat de MLflow-traceringsserver niet wordt geïnitialiseerd met behulp van de MLflow Client
server vanuit werkknooppunten.
import mlflow
@ray.remote
def example_logging_task(x):
# ...
# This method will fail
mlflow.log_metric("x", x)
return x
with mlflow.start_run() as run:
ray.get([example_logging_task.remote(x) for x in range(10)])
Retourneer in plaats daarvan de metrische gegevens naar het stuurprogrammaknooppunt. De metrische gegevens en metagegevens zijn over het algemeen klein genoeg om terug te zetten naar het stuurprogramma zonder geheugenproblemen te veroorzaken.
Neem het bovenstaande voorbeeld en werk het bij om de geretourneerde metrische gegevens van een Ray-taak te registreren:
import mlflow
@ray.remote
def example_logging_task(x):
# ...
return x
with mlflow.start_run() as run:
results = ray.get([example_logging_task.remote(x) for x in range(10)])
for x in results:
mlflow.log_metric("x", x)
Voor taken waarvoor grote artefacten moeten worden opgeslagen, zoals een grote Pandas-tabel, afbeeldingen, plots of modellen, raadt Databricks aan om het artefact als een bestand te behouden. Laad vervolgens het artefact opnieuw in de stuurprogrammacontext of meld het object rechtstreeks aan met MLflow door het pad naar het opgeslagen bestand op te geven.
import mlflow
@ray.remote
def example_logging_task(x):
# ...
# Create a large object that needs to be stored
with open("/dbfs/myLargeFilePath.txt", "w") as f:
f.write(myLargeObject)
return x
with mlflow.start_run() as run:
results = ray.get([example_logging_task.remote(x) for x in range(10)])
for x in results:
mlflow.log_metric("x", x)
# Directly log the saved file by specifying the path
mlflow.log_artifact("/dbfs/myLargeFilePath.txt")
Log Ray-taken als onderliggende MLflow-uitvoeringen
U kunt Ray Core integreren met MLflow met behulp van onderliggende uitvoeringen. Dit omvat de volgende stappen:
- Een bovenliggende uitvoering maken: initialiseer een bovenliggende uitvoering in het stuurprogrammaproces. Deze uitvoering fungeert als een hiërarchische container voor alle volgende onderliggende uitvoeringen.
- Onderliggende uitvoeringen maken: in elke Ray-taak start u een onderliggende uitvoering onder de bovenliggende uitvoering. Elke onderliggende uitvoering kan onafhankelijk zijn eigen metrische gegevens vastleggen.
Als u deze aanpak wilt implementeren, moet u ervoor zorgen dat elke Ray-taak de benodigde klantgegevens en de bovenliggende entiteit run_id
ontvangt. Met deze instelling wordt de hiërarchische relatie tussen bovenliggende en onderliggende items tussen uitvoeringen ingesteld. Het volgende codefragment laat zien hoe u de referenties ophaalt en het bovenliggende element run_id
doorgeeft.
from mlflow.utils.databricks_utils import get_databricks_env_vars
mlflow_db_creds = get_databricks_env_vars("databricks")
username = "" # Username path
experiment_name = f"/Users/{username}/mlflow_test"
mlflow.set_experiment(experiment_name)
@ray.remote
def ray_task(x, run_id):
import os
# Set the MLflow credentials within the Ray task
os.environ.update(mlflow_db_creds)
# Set the active MLflow experiment within each Ray task
mlflow.set_experiment(experiment_name)
# Create nested child runs associated with the parent run_id
with mlflow.start_run(run_id=run_id, nested=True):
# Log metrics to the child run within the Ray task
mlflow.log_metric("x", x)
return x
# Start parent run on the main driver process
with mlflow.start_run() as run:
# Pass the parent run's run_id to each Ray task
results = ray.get([ray_task.remote(x, run.info.run_id) for x in range(10)])
Ray Train en MLflow
De eenvoudigste manier om de Ray Train-modellen te registreren bij MLflow, is door het controlepunt te gebruiken dat is gegenereerd door de trainingsuitvoering. Nadat de training is voltooid, laadt u het model opnieuw in het systeemeigen Deep Learning-framework (zoals PyTorch of TensorFlow) en meldt u het vervolgens aan met de bijbehorende MLflow-code.
Deze aanpak zorgt ervoor dat het model correct wordt opgeslagen en gereed is voor evaluatie of implementatie.
Met de volgende code wordt een model opnieuw geladen vanuit een Ray Train-controlepunt en wordt dit vastgelegd in MLflow:
result = trainer.fit()
checkpoint = result.checkpoint
with checkpoint.as_directory() as checkpoint_dir:
# Change as needed for different DL frameworks
checkpoint_path = f"{checkpoint_dir}/checkpoint.ckpt"
# Load the model from the checkpoint
model = MyModel.load_from_checkpoint(checkpoint_path)
with mlflow.start_run() as run:
# Change the MLflow flavor as needed
mlflow.pytorch.log_model(model, "model")
Hoewel het over het algemeen een best practice is om objecten terug te sturen naar het stuurprogrammaknooppunt, is het opslaan van de uiteindelijke resultaten eenvoudiger dan de hele trainingsgeschiedenis van het werkproces.
Als u meerdere modellen wilt opslaan vanuit een trainingsuitvoering, geeft u het aantal controlepunten op dat moet worden bewaard in de ray.train.CheckpointConfig
. De modellen kunnen vervolgens op dezelfde manier worden gelezen en geregistreerd als het opslaan van één model.
Notitie
MLflow is niet verantwoordelijk voor het afhandelen van fouttolerantie tijdens het trainen van modellen, maar voor het bijhouden van de levenscyclus van het model. Fouttolerantie wordt in plaats daarvan beheerd door Ray Train zelf.
Als u de metrische trainingsgegevens wilt opslaan die zijn opgegeven door Ray Train, haalt u deze op uit het resultaatobject en slaat u ze op met behulp van MLflow.
result = trainer.fit()
with mlflow.start_run() as run:
mlflow.log_metrics(result.metrics_dataframe.to_dict(orient='dict'))
# Change the MLflow flavor as needed
mlflow.pytorch.log_model(model, "model")
Als u uw Spark- en Ray-clusters correct wilt configureren en problemen met resourcetoewijzing wilt voorkomen, moet u de resources_per_worker
instelling aanpassen. Stel specifiek het aantal CPU's voor elke Ray-worker in op één minder dan het totale aantal CPU's dat beschikbaar is op een Ray-werkknooppunt. Deze aanpassing is van cruciaal belang omdat als de trainer alle beschikbare kernen voor Ray-acteurs reserveert, dit kan leiden tot conflicten tussen resources.
Ray Tune en MLflow
Door Ray Tune te integreren met MLflow, kunt u efficiënt experimenten voor het afstemmen van hyperparameters in Databricks bijhouden en vastleggen. Deze integratie maakt gebruik van de mogelijkheden voor het bijhouden van experimenten van MLflow om metrische gegevens en resultaten rechtstreeks vanuit Ray-taken vast te leggen.
Benadering van onderliggende uitvoering voor logboekregistratie
Net als bij logboekregistratie van Ray Core-taken kunnen Ray Tune-toepassingen gebruikmaken van een benadering die wordt uitgevoerd om metrische gegevens te registreren vanuit elke proefversie of afstemmingsiteratie. Gebruik de volgende stappen om een onderliggende benadering te implementeren:
- Een bovenliggende uitvoering maken: initialiseer een bovenliggende uitvoering in het stuurprogrammaproces. Deze uitvoering fungeert als de hoofdcontainer voor alle volgende onderliggende uitvoeringen.
- Onderliggende logboekuitvoeringen: elke Ray Tune-taak maakt een onderliggende uitvoering onder de bovenliggende uitvoering, met behoud van een duidelijke hiërarchie van experimentresultaten.
In het volgende voorbeeld ziet u hoe u met MLflow verifieert en aanmeldt bij Ray Tune-taken.
import os
import tempfile
import time
import mlflow
from mlflow.utils.databricks_utils import get_databricks_env_vars
from ray import train, tune
from ray.air.integrations.mlflow import MLflowLoggerCallback, setup_mlflow
mlflow_db_creds = get_databricks_env_vars("databricks")
EXPERIMENT_NAME = "/Users/<WORKSPACE_USERNAME>/setup_mlflow_example"
mlflow.set_experiment(EXPERIMENT_NAME)
def evaluation_fn(step, width, height):
return (0.1 + width * step / 100) ** (-1) + height * 0.1
def train_function_mlflow(config, run_id):
os.environ.update(mlflow_db_creds)
mlflow.set_experiment(EXPERIMENT_NAME)
# Hyperparameters
width = config["width"]
height = config["height"]
with mlflow.start_run(run_id=run_id, nested=True):
for step in range(config.get("steps", 100)):
# Iterative training function - can be any arbitrary training procedure
intermediate_score = evaluation_fn(step, width, height)
# Log the metrics to MLflow
mlflow.log_metrics({"iterations": step, "mean_loss": intermediate_score})
# Feed the score back to Tune.
train.report({"iterations": step, "mean_loss": intermediate_score})
time.sleep(0.1)
def tune_with_setup(run_id, finish_fast=True):
os.environ.update(mlflow_db_creds)
# Set the experiment or create a new one if it does not exist.
mlflow.set_experiment(experiment_name=EXPERIMENT_NAME)
tuner = tune.Tuner(
tune.with_parameter(train_function_mlflow, run_id),
tune_config=tune.TuneConfig(num_samples=5),
run_config=train.RunConfig(
name="mlflow",
),
param_space={
"width": tune.randint(10, 100),
"height": tune.randint(0, 100),
"steps": 20 if finish_fast else 100,
},
)
results = tuner.fit()
with mlflow.start_run() as run:
mlflow_tracking_uri = mlflow.get_tracking_uri()
tune_with_setup(run.info.run_id)
Model serveren
Het gebruik van Ray Serve op Databricks-clusters voor realtime deductie brengt uitdagingen met zich mee als gevolg van netwerkbeveiligings- en connectiviteitsbeperkingen bij interactie met externe toepassingen.
Databricks raadt het gebruik van Model Serving aan om machine learning-modellen in productie te implementeren naar een REST API-eindpunt. Zie Aangepaste modellen implementeren voor meer informatie.