Integrieren von MLflow und Ray
MLflow ist eine Open-Source-Plattform zum Verwalten von Machine Learning und KI-Workloads. Durch die Kombination von Ray mit MLflow können Sie Workloads mit Ray und Track-Modellen, Metriken, Parametern und Metadaten verteilen, die während der Schulung mit MLflow generiert werden.
In diesem Artikel wird beschrieben, wie MLflow mit den folgenden Ray-Komponenten integriert wird:
Ray Core: Allgemeine verteilte Anwendungen, die nicht von Ray Tune und Ray Train abgedeckt werden
Ray Train: Verteilte Modellschulung
Ray Tune: Verteilte Hyperparameteroptimierung
Modellbereitstellung: Bereitstellen von Modellen für echtzeitbasierten Rückschluss
Integrieren von Ray Core und MLflow
Ray Core bietet die grundlegenden Bausteine für allgemeine verteilte Anwendungen. Sie können Python-Funktionen und -Klassen auf mehrere Knoten skalieren.
In diesem Abschnitt werden die folgenden Muster zur Integration von Ray Core und MLflow beschrieben:
- Protokollieren von MLflow-Modellen aus dem Ray-Treiberprozess
- Protokollieren von MLflow-Modellen aus untergeordneten Ausführungen
Protokollieren von MLflow aus dem Ray-Treiberprozess
Es ist im Allgemeinen am besten, MLflow-Modelle aus dem Treiberprozess statt von Arbeitsknoten zu protokollieren. Dies liegt an der zusätzlichen Komplexität der Übergabe zustandsbehafteter Verweise an Remotearbeitskräfte.
Der folgende Code schlägt z. B. fehl, da der MLflow-Tracking-Server nicht mithilfe vonMLflow Client
von den Arbeitsknoten initialisiert wird.
import mlflow
@ray.remote
def example_logging_task(x):
# ...
# This method will fail
mlflow.log_metric("x", x)
return x
with mlflow.start_run() as run:
ray.get([example_logging_task.remote(x) for x in range(10)])
Geben Sie stattdessen die Metriken an den Treiberknoten zurück. Die Metriken und Metadaten sind im Allgemeinen klein genug, um ohne Arbeitsspeicherprobleme zurück an den Treiber zu übertragen.
Nehmen Sie sich das oben gezeigte Beispiel an, und aktualisieren Sie es, um die zurückgegebenen Metriken aus einer Ray-Aufgabe zu protokollieren:
import mlflow
@ray.remote
def example_logging_task(x):
# ...
return x
with mlflow.start_run() as run:
results = ray.get([example_logging_task.remote(x) for x in range(10)])
for x in results:
mlflow.log_metric("x", x)
Für Aufgaben, für die große Artefakte gespeichert werden müssen, z. B. eine große Pandas-Tabelle, Bilder, Plots oder Modelle, empfiehlt Databricks, das Artefakt als Datei beizubehalten. Laden Sie dann entweder das Artefakt im Treiberkontext neu oder protokollieren Sie das Objekt direkt mit MLflow, indem Sie den Pfad zur gespeicherten Datei angeben.
import mlflow
@ray.remote
def example_logging_task(x):
# ...
# Create a large object that needs to be stored
with open("/dbfs/myLargeFilePath.txt", "w") as f:
f.write(myLargeObject)
return x
with mlflow.start_run() as run:
results = ray.get([example_logging_task.remote(x) for x in range(10)])
for x in results:
mlflow.log_metric("x", x)
# Directly log the saved file by specifying the path
mlflow.log_artifact("/dbfs/myLargeFilePath.txt")
Log Ray-Aufgaben als untergeordnete MLflow-Ausführung
Sie können Ray Core mit MLflow integrieren, indem Sie untergeordnete Ausführungen verwenden. Dieser Vorgang umfasst die folgenden Schritte:
- Erstellen Sie eine übergeordnete Ausführung: Initialisieren sie eine übergeordnete Ausführung im Treiberprozess. Diese Ausführung fungiert als hierarchischer Container für alle nachfolgenden untergeordneten Ausführungen.
- Untergeordnete Ausführung erstellen: Initiieren Sie innerhalb jeder Ray-Aufgabe eine untergeordnete Ausführung unter der übergeordneten Ausführung. Jede untergeordnete Ausführung kann unabhängig voneinander eigene Metriken protokollieren.
Um diesen Ansatz zu implementieren, stellen Sie sicher, dass jede Ray-Aufgabe die erforderlichen Client-Anmeldedaten und das übergeordnete run_id
empfängt. Mit diesem Setup wird die hierarchische Beziehung zwischen ausgeführten übergeordneten und untergeordneten Elementen hergestellt. Der folgende Codeausschnitt veranschaulicht, wie die Anmelddaten abgerufen und das übergeordnete run_id
weitergegeben wird:
from mlflow.utils.databricks_utils import get_databricks_env_vars
mlflow_db_creds = get_databricks_env_vars("databricks")
username = "" # Username path
experiment_name = f"/Users/{username}/mlflow_test"
mlflow.set_experiment(experiment_name)
@ray.remote
def ray_task(x, run_id):
import os
# Set the MLflow credentials within the Ray task
os.environ.update(mlflow_db_creds)
# Set the active MLflow experiment within each Ray task
mlflow.set_experiment(experiment_name)
# Create nested child runs associated with the parent run_id
with mlflow.start_run(run_id=run_id, nested=True):
# Log metrics to the child run within the Ray task
mlflow.log_metric("x", x)
return x
# Start parent run on the main driver process
with mlflow.start_run() as run:
# Pass the parent run's run_id to each Ray task
results = ray.get([ray_task.remote(x, run.info.run_id) for x in range(10)])
Ray Train und MLflow
Die einfachste Möglichkeit zum Protokollieren der Ray Train-Modelle bei MLflow besteht darin, den Prüfpunkt zu verwenden, der durch den Trainingslauf generiert wird. Laden Sie nach Abschluss des Trainings das Modell in seinem systemeigenen Deep Learning-Framework neu (z. B. PyTorch oder TensorFlow) und protokollieren Sie es dann mit dem entsprechenden MLflow-Code.
Mit diesem Ansatz wird sichergestellt, dass das Modell korrekt gespeichert und für die Auswertung oder Bereitstellung bereit ist.
Der folgende Code lädt ein Modell aus einem Ray Train-Prüfpunkt neu und protokolliert es in MLflow:
result = trainer.fit()
checkpoint = result.checkpoint
with checkpoint.as_directory() as checkpoint_dir:
# Change as needed for different DL frameworks
checkpoint_path = f"{checkpoint_dir}/checkpoint.ckpt"
# Load the model from the checkpoint
model = MyModel.load_from_checkpoint(checkpoint_path)
with mlflow.start_run() as run:
# Change the MLflow flavor as needed
mlflow.pytorch.log_model(model, "model")
Obwohl es im Allgemeinen eine bewährte Methode ist, Objekte zurück an den Treiberknoten zu senden, ist mit Ray Train das Speichern der endgültigen Ergebnisse einfacher als die gesamte Trainingshistorie aus dem Workerprozess.
Um mehrere Modelle aus einer Trainingsausführung zu speichern, geben Sie die Anzahl der Prüfpunkte an, die in ray.train.CheckpointConfig
beibehalten werden sollen. Die Modelle können dann genauso gelesen und protokolliert werden wie das Speichern eines einzelnen Modells.
Hinweis
MLflow ist nicht verantwortlich für die Behandlung der Fehlertoleranz während der Modellschulung, sondern für die Nachverfolgung des Lebenszyklus des Modells. Fehlertoleranz wird stattdessen von Ray Train selbst verwaltet.
Um die von Ray Train angegebenen Trainingsmetriken zu speichern, rufen Sie sie aus dem Ergebnisobjekt ab und speichern sie mithilfe von MLflow.
result = trainer.fit()
with mlflow.start_run() as run:
mlflow.log_metrics(result.metrics_dataframe.to_dict(orient='dict'))
# Change the MLflow flavor as needed
mlflow.pytorch.log_model(model, "model")
Um Ihre Spark- und Ray-Cluster ordnungsgemäß zu konfigurieren und Ressourcenzuordnungsprobleme zu verhindern, sollten Sie die resources_per_worker
-Einstellung anpassen. Legen Sie insbesondere die Anzahl der CPUs für jeden Ray-Worker auf einen kleiner als die Gesamtanzahl der CPUs fest, die auf einem Ray-Workerknoten verfügbar sind. Diese Anpassung ist entscheidend, denn wenn der Trainer alle verfügbaren Kerne für Ray-Akteure reserviert, kann es zu Ressourcenkonfliktfehlern führen.
Ray Tune und MLflow
Durch die Integration von Ray Tune in MLflow können Sie Hyperparameter-Optimierungsexperimente innerhalb von Databricks effizient nachverfolgen und protokollieren. Diese Integration nutzt die Experimentverfolgungsfunktionen von MLflow, um Metriken und Ergebnisse direkt aus Ray-Aufgaben aufzuzeichnen.
Ansatz für die untergeordnete Ausführung für die Protokollierung
Ähnlich wie bei der Protokollierung von Ray Core-Aufgaben können Ray Tune-Anwendungen einen untergeordneten Ansatz verwenden, um Metriken aus jeder Testversion oder Optimierungsiteration zu protokollieren. Führen Sie die folgenden Schritte aus, um einen Ansatz für die untergeordnete Ausführung zu implementieren:
- Erstellen Sie eine übergeordnete Ausführung: Initialisieren sie eine übergeordnete Ausführung im Treiberprozess. Diese Ausführung dient als Hauptcontainer für alle nachfolgenden untergeordneten Ausführungen.
- Untergeordnete Protokollausführung: Jede Ray Tune-Aufgabe erstellt eine untergeordnete Ausführung unter der übergeordneten Ausführung, wobei eine klare Hierarchie von Experimentergebnissen beibehalten wird.
Im folgenden Beispiel wird veranschaulicht, wie Sie sich mithilfe von MLflow bei Ray Tune-Aufgaben authentifizieren und protokollieren.
import os
import tempfile
import time
import mlflow
from mlflow.utils.databricks_utils import get_databricks_env_vars
from ray import train, tune
from ray.air.integrations.mlflow import MLflowLoggerCallback, setup_mlflow
mlflow_db_creds = get_databricks_env_vars("databricks")
EXPERIMENT_NAME = "/Users/<WORKSPACE_USERNAME>/setup_mlflow_example"
mlflow.set_experiment(EXPERIMENT_NAME)
def evaluation_fn(step, width, height):
return (0.1 + width * step / 100) ** (-1) + height * 0.1
def train_function_mlflow(config, run_id):
os.environ.update(mlflow_db_creds)
mlflow.set_experiment(EXPERIMENT_NAME)
# Hyperparameters
width = config["width"]
height = config["height"]
with mlflow.start_run(run_id=run_id, nested=True):
for step in range(config.get("steps", 100)):
# Iterative training function - can be any arbitrary training procedure
intermediate_score = evaluation_fn(step, width, height)
# Log the metrics to MLflow
mlflow.log_metrics({"iterations": step, "mean_loss": intermediate_score})
# Feed the score back to Tune.
train.report({"iterations": step, "mean_loss": intermediate_score})
time.sleep(0.1)
def tune_with_setup(run_id, finish_fast=True):
os.environ.update(mlflow_db_creds)
# Set the experiment or create a new one if it does not exist.
mlflow.set_experiment(experiment_name=EXPERIMENT_NAME)
tuner = tune.Tuner(
tune.with_parameter(train_function_mlflow, run_id),
tune_config=tune.TuneConfig(num_samples=5),
run_config=train.RunConfig(
name="mlflow",
),
param_space={
"width": tune.randint(10, 100),
"height": tune.randint(0, 100),
"steps": 20 if finish_fast else 100,
},
)
results = tuner.fit()
with mlflow.start_run() as run:
mlflow_tracking_uri = mlflow.get_tracking_uri()
tune_with_setup(run.info.run_id)
Modellbereitstellung
Die Verwendung von Ray Serve auf Databricks-Clustern in Echtzeit stellt Herausforderungen aufgrund von Netzwerksicherheits- und Konnektivitätseinschränkungen bei der Interaktion mit externen Anwendungen dar.
Databricks empfiehlt die Verwendung von Model Serving zum Bereitstellen von Machine Learning-Modellen in der Produktion auf einem REST-API-Endpunkt. Weitere Informationen finden Sie unter Bereitstellen von angepassten Modellen.