Интеграция MLflow и Ray
MLflow — это платформа открытый код для управления рабочими нагрузками машинного обучения и искусственного интеллекта. Объединение Ray с MLflow позволяет распределять рабочие нагрузки с моделями Ray и отслеживать модели, метрики, parametersи метаданные, созданные во время обучения с помощью MLflow.
В этой статье описывается интеграция MLflow со следующими компонентами Ray:
Ray Core: распределенные приложения общего назначения, которые не охватываются Ray Tune и Ray Train
Рэй Обучение: обучение распределенной модели
Ray Tune: настройка распределенного гиперпараметра
Обслуживание моделей: развертывание моделей для вывода в режиме реального времени
Интеграция Ray Core и MLflow
Ray Core предоставляет базовые стандартные блоки для распределенных приложений общего назначения. Он позволяет масштабировать функции и классы Python на нескольких узлах.
В этом разделе описаны следующие шаблоны интеграции Ray Core и MLflow:
- Журнал моделей MLflow из процесса драйвера Ray
- Журнал моделей MLflow из дочерних запусков
Журнал MLflow из процесса драйвера Ray
Обычно рекомендуется регистрировать модели MLflow из процесса драйвера, а не из рабочих узлов. Это связано с добавленной сложностью передачи ссылок на удаленные работники с отслеживанием состояния.
Например, следующий код завершается ошибкой, так как сервер отслеживания MLflow не инициализирован с помощью MLflow Client
рабочих узлов.
import mlflow
@ray.remote
def example_logging_task(x):
# ...
# This method will fail
mlflow.log_metric("x", x)
return x
with mlflow.start_run() as run:
ray.get([example_logging_task.remote(x) for x in range(10)])
Вместо этого верните метрики на узел драйвера. Метрики и метаданные, как правило, достаточно малы, чтобы вернуться в драйвер, не вызывая проблем с памятью.
Выполните приведенный выше пример и update его для регистрации возвращаемых метрик из задачи Ray:
import mlflow
@ray.remote
def example_logging_task(x):
# ...
return x
with mlflow.start_run() as run:
results = ray.get([example_logging_task.remote(x) for x in range(10)])
for x in results:
mlflow.log_metric("x", x)
Для задач, требующих сохранения больших артефактов, таких как крупные tablePandas, изображения, графики или модели, Databricks рекомендует сохранить артефакт в виде файла. Затем перезагрузите артефакт в контексте драйвера или непосредственно зарегигируйте объект с помощью MLflow, указав путь к сохраненном файлу.
import mlflow
@ray.remote
def example_logging_task(x):
# ...
# Create a large object that needs to be stored
with open("/dbfs/myLargeFilePath.txt", "w") as f:
f.write(myLargeObject)
return x
with mlflow.start_run() as run:
results = ray.get([example_logging_task.remote(x) for x in range(10)])
for x in results:
mlflow.log_metric("x", x)
# Directly log the saved file by specifying the path
mlflow.log_artifact("/dbfs/myLargeFilePath.txt")
Задачи Log Ray в качестве дочерних запусков MLflow
Вы можете интегрировать Ray Core с MLflow с помощью дочерних запусков. Для этого необходимо выполнить следующие шаги.
- Создайте родительский запуск: инициализация родительского запуска в процессе драйвера. Этот запуск выступает в качестве иерархического контейнера для всех последующих дочерних запусков.
- Создание дочерних запусков: в каждой задаче Ray инициируйте дочерний запуск под родительским запуском. Каждый дочерний запуск может самостоятельно регистрировать собственные метрики.
Для реализации этого подхода убедитесь, что каждая задача Ray получает необходимые credentials от клиента и run_id
от родителя. Эта настройка устанавливает иерархическую связь между выполнением родительского и дочернего элемента. В следующем фрагменте кода показано, как получить credentials и передать родительские run_id
:
from mlflow.utils.databricks_utils import get_databricks_env_vars
mlflow_db_creds = get_databricks_env_vars("databricks")
username = "" # Username path
experiment_name = f"/Users/{username}/mlflow_test"
mlflow.set_experiment(experiment_name)
@ray.remote
def ray_task(x, run_id):
import os
# Set the MLflow credentials within the Ray task
os.environ.update(mlflow_db_creds)
# Set the active MLflow experiment within each Ray task
mlflow.set_experiment(experiment_name)
# Create nested child runs associated with the parent run_id
with mlflow.start_run(run_id=run_id, nested=True):
# Log metrics to the child run within the Ray task
mlflow.log_metric("x", x)
return x
# Start parent run on the main driver process
with mlflow.start_run() as run:
# Pass the parent run's run_id to each Ray task
results = ray.get([ray_task.remote(x, run.info.run_id) for x in range(10)])
Рэй Поезд и MLflow
Самый простой способ регистрации моделей Ray Train в MLflow — использовать контрольную точку, созданную в ходе обучения. После завершения обучения перезагрузите модель в собственной платформе глубокого обучения (например, PyTorch или TensorFlow), а затем зайдите в журнал с соответствующим кодом MLflow.
Такой подход гарантирует, что модель хранится правильно и готова к оценке или развертыванию.
Следующий код перезагрузит модель из контрольной точки Ray Train и записывает ее в MLflow:
result = trainer.fit()
checkpoint = result.checkpoint
with checkpoint.as_directory() as checkpoint_dir:
# Change as needed for different DL frameworks
checkpoint_path = f"{checkpoint_dir}/checkpoint.ckpt"
# Load the model from the checkpoint
model = MyModel.load_from_checkpoint(checkpoint_path)
with mlflow.start_run() as run:
# Change the MLflow flavor as needed
mlflow.pytorch.log_model(model, "model")
Хотя обычно рекомендуется отправлять объекты обратно на узел драйвера, с помощью Ray Train, сохранение окончательных результатов проще, чем вся история обучения от рабочего процесса.
Чтобы сохранить несколько моделей из обучающего запуска, укажите количество контрольных точек, которые нужно сохранить в ray.train.CheckpointConfig
. Затем модели можно считывать и записывать в журнал так же, как и хранение одной модели.
Примечание.
MLflow не несет ответственности за обработку отказоустойчивости во время обучения модели, а не за отслеживание жизненного цикла модели. Отказоустойчивость вместо этого управляется самой Рэй Тренелом.
Чтобы сохранить метрики обучения, указанные Ray Train, извлеките их из объекта результата и сохраните их с помощью MLflow.
result = trainer.fit()
with mlflow.start_run() as run:
mlflow.log_metrics(result.metrics_dataframe.to_dict(orient='dict'))
# Change the MLflow flavor as needed
mlflow.pytorch.log_model(model, "model")
Чтобы правильно настроить кластеры Spark и Ray и предотвратить проблемы с выделением resources_per_worker
ресурсов, необходимо настроить этот параметр. В частности, set количество ЦП для каждой рабочей роли Ray должно быть меньше общего числа ЦП, доступных на рабочем узле Ray. Эта корректировка имеет решающее значение, так как если тренер резервирует все доступные ядра для субъектов Ray, это может привести к ошибкам в возникновении проблем с ресурсом.
Ray Tune и MLflow
Интеграция Ray Tune с MLflow позволяет эффективно отслеживать и регистрировать эксперименты по настройке гиперпараметров в Databricks. Эта интеграция использует возможности отслеживания экспериментов MLflow для записи метрик и результатов непосредственно из задач Ray.
Подход дочернего запуска для ведения журнала
Как и ведение журнала из задач Ray Core, приложения Ray Tune могут использовать дочерний подход к метрикам журналов из каждой пробной версии или настройки итерации. Выполните следующие действия для реализации дочернего подхода:
- Создайте родительский запуск: инициализация родительского запуска в процессе драйвера. Этот запуск служит основным контейнером для всех последующих дочерних запусков.
- Дочерние запуски журнала: каждая задача "Настройка луча" создает дочерний запуск под родительским запуском, поддерживая четкую иерархию результатов эксперимента.
В следующем примере показано, как выполнять проверку подлинности и журнал из задач Ray Tune с помощью MLflow.
import os
import tempfile
import time
import mlflow
from mlflow.utils.databricks_utils import get_databricks_env_vars
from ray import train, tune
from ray.air.integrations.mlflow import MLflowLoggerCallback, setup_mlflow
mlflow_db_creds = get_databricks_env_vars("databricks")
EXPERIMENT_NAME = "/Users/<WORKSPACE_USERNAME>/setup_mlflow_example"
mlflow.set_experiment(EXPERIMENT_NAME)
def evaluation_fn(step, width, height):
return (0.1 + width * step / 100) ** (-1) + height * 0.1
def train_function_mlflow(config, run_id):
os.environ.update(mlflow_db_creds)
mlflow.set_experiment(EXPERIMENT_NAME)
# Hyperparameters
width = config["width"]
height = config["height"]
with mlflow.start_run(run_id=run_id, nested=True):
for step in range(config.get("steps", 100)):
# Iterative training function - can be any arbitrary training procedure
intermediate_score = evaluation_fn(step, width, height)
# Log the metrics to MLflow
mlflow.log_metrics({"iterations": step, "mean_loss": intermediate_score})
# Feed the score back to Tune.
train.report({"iterations": step, "mean_loss": intermediate_score})
time.sleep(0.1)
def tune_with_setup(run_id, finish_fast=True):
os.environ.update(mlflow_db_creds)
# Set the experiment or create a new one if it does not exist.
mlflow.set_experiment(experiment_name=EXPERIMENT_NAME)
tuner = tune.Tuner(
tune.with_parameter(train_function_mlflow, run_id),
tune_config=tune.TuneConfig(num_samples=5),
run_config=train.RunConfig(
name="mlflow",
),
param_space={
"width": tune.randint(10, 100),
"height": tune.randint(0, 100),
"steps": 20 if finish_fast else 100,
},
)
results = tuner.fit()
with mlflow.start_run() as run:
mlflow_tracking_uri = mlflow.get_tracking_uri()
tune_with_setup(run.info.run_id)
Обслуживание моделей
Использование ray Service в кластерах Databricks для вывода в режиме реального времени вызывает проблемы из-за ограничений безопасности сети и подключений при взаимодействии с внешними приложениями.
Databricks рекомендует использовать модель обслуживания для развертывания моделей машинного обучения в рабочей среде в конечной точке REST API. Дополнительные сведения см. в разделе "Развертывание пользовательских моделей".