集成 MLflow 和 Ray
MLflow 是用于管理机器学习和 AI 工作负载的开源平台。 通过将 Ray 与 MLflow 结合使用,可以使用 Ray 分配工作负载,并在使用 MLflow 训练的过程中,跟踪生成的模型、指标、参数和元数据。
本文介绍了如何将 MLflow 与以下 Ray 组件集成:
集成 Ray Core 和 MLflow
Ray Core 为通用分布式应用程序提供了基础构建基块。 使用它可以跨多个节点缩放 Python 函数和类。
本部分介绍用于集成 Ray Core 和 MLflow 的以下模式:
- 记录 Ray 驱动程序进程的 MLflow 模型
- 记录子运行的 MLflow 模型
记录 Ray 驱动程序进程的 MLflow
通常最好从驱动程序进程记录 MLflow 模型,而不是从工作器节点记录。 这是因为向远程工作器传递有状态引用会增加复杂性。
例如,由于 MLflow 跟踪服务器未使用工作器节点内的 MLflow Client
进行初始化,以下代码失败。
import mlflow
@ray.remote
def example_logging_task(x):
# ...
# This method will fail
mlflow.log_metric("x", x)
return x
with mlflow.start_run() as run:
ray.get([example_logging_task.remote(x) for x in range(10)])
相反,应将指标返回到驱动程序节点。 指标和元数据通常足够小,可以传输回到驱动程序,而不会造成内存问题。
以上述示例为例,更新它以记录来自 Ray 任务的返回指标:
import mlflow
@ray.remote
def example_logging_task(x):
# ...
return x
with mlflow.start_run() as run:
results = ray.get([example_logging_task.remote(x) for x in range(10)])
for x in results:
mlflow.log_metric("x", x)
对于需要保存大型项目的任务,例如大型 Pandas 表、图像、绘图或模型,Databricks 建议将项目保留为文件。 然后,通过指定已保存文件的路径,重新加载驱动程序上下文中的项目,或者直接使用 MLflow 来记录对象。
import mlflow
@ray.remote
def example_logging_task(x):
# ...
# Create a large object that needs to be stored
with open("/dbfs/myLargeFilePath.txt", "w") as f:
f.write(myLargeObject)
return x
with mlflow.start_run() as run:
results = ray.get([example_logging_task.remote(x) for x in range(10)])
for x in results:
mlflow.log_metric("x", x)
# Directly log the saved file by specifying the path
mlflow.log_artifact("/dbfs/myLargeFilePath.txt")
将 Ray 任务记录为 MLflow 子运行
可以使用子运行将 Ray Core 与 MLflow 集成。 这包括以下步骤:
- 创建父运行:在驱动程序进程中初始化父运行。 此运行充当所有后续子运行的分层容器。
- 创建子运行:在每个 Ray 任务中,启动父运行下的子运行。 每个子运行可以独立记录自身的指标。
若要实现此方法,请确保每个 Ray 任务都会收到必需的客户端凭据和父级 run_id
。 此设置可在运行之间建立分层父子关系。 以下代码片段演示如何检索凭据并传递至父级 run_id
:
from mlflow.utils.databricks_utils import get_databricks_env_vars
mlflow_db_creds = get_databricks_env_vars("databricks")
username = "" # Username path
experiment_name = f"/Users/{username}/mlflow_test"
mlflow.set_experiment(experiment_name)
@ray.remote
def ray_task(x, run_id):
import os
# Set the MLflow credentials within the Ray task
os.environ.update(mlflow_db_creds)
# Set the active MLflow experiment within each Ray task
mlflow.set_experiment(experiment_name)
# Create nested child runs associated with the parent run_id
with mlflow.start_run(run_id=run_id, nested=True):
# Log metrics to the child run within the Ray task
mlflow.log_metric("x", x)
return x
# Start parent run on the main driver process
with mlflow.start_run() as run:
# Pass the parent run's run_id to each Ray task
results = ray.get([ray_task.remote(x, run.info.run_id) for x in range(10)])
Ray Train 和 MLflow
将 Ray 训练模型记录到 MLflow 的最简单方式是使用训练运行生成的检查点。 训练运行完成后,在本机深度学习框架(如 PyTorch 或 TensorFlow)中重新加载模型,然后使用相应的 MLflow 代码,对其进行记录。
此方法可确保模型正确存储,并为评估或部署做好准备。
以下代码可从 Ray Train 检查点重新加载模型并将其记录到 MLflow:
result = trainer.fit()
checkpoint = result.checkpoint
with checkpoint.as_directory() as checkpoint_dir:
# Change as needed for different DL frameworks
checkpoint_path = f"{checkpoint_dir}/checkpoint.ckpt"
# Load the model from the checkpoint
model = MyModel.load_from_checkpoint(checkpoint_path)
with mlflow.start_run() as run:
# Change the MLflow flavor as needed
mlflow.pytorch.log_model(model, "model")
尽管通常最佳做法是将对象发送回驱动程序节点,但在使用 Ray Train 的情况下,保存最终结果比从工作进程获取整个训练历史记录更容易。
若要存储来自训练运行的多个模型,请指定要保留在 ray.train.CheckpointConfig
中的检查点数量。 然后,可以采用与存储单个模型相同的方式,读取和记录这些模型。
注意
MLflow 不负责在模型训练过程中处理容错,而是负责跟踪模型的生命周期。 容错由 Ray Train 本身管理。
若要存储 Ray Train 指定的训练指标,请从结果对象中检索它们,并使用 MLflow 存储。
result = trainer.fit()
with mlflow.start_run() as run:
mlflow.log_metrics(result.metrics_dataframe.to_dict(orient='dict'))
# Change the MLflow flavor as needed
mlflow.pytorch.log_model(model, "model")
若要正确配置 Spark 和 Ray 群集并防止资源分配问题,应调节 resources_per_worker
设置。 具体而言,应将每个 Ray 工作器的 CPU 数设置为比 Ray 工作器节点上可用的 CPU 总数小一。 这种调节至关重要,因为如果训练器保留了 Ray 执行组件的所有可用内核,则可能会导致资源争用错误。
Ray Tune 和 MLflow
将 Ray Tune 与 MLflow 集成,可以有效地跟踪和记录 Databricks 中的超参数优化试验。 此集成利用 MLflow 的试验跟踪功能,直接从 Ray 任务记录指标和结果。
用于日志记录的子运行方法
与从 Ray Core 任务进行日志记录类似,Ray Tune 应用程序可以使用子运行方法,来记录每次试验或优化迭代的指标。 使用以下步骤实现子运行方法:
- 创建父运行:在驱动程序进程中初始化父运行。 此运行用作所有后续子运行的主容器。
- 日志子运行:每个 Ray Tune 任务都会在父运行下创建子运行,维持试验结果的清晰层次结构。
以下示例演示如何使用 MLflow 对 Ray Tune 任务进行身份验证和日志记录。
import os
import tempfile
import time
import mlflow
from mlflow.utils.databricks_utils import get_databricks_env_vars
from ray import train, tune
from ray.air.integrations.mlflow import MLflowLoggerCallback, setup_mlflow
mlflow_db_creds = get_databricks_env_vars("databricks")
EXPERIMENT_NAME = "/Users/<WORKSPACE_USERNAME>/setup_mlflow_example"
mlflow.set_experiment(EXPERIMENT_NAME)
def evaluation_fn(step, width, height):
return (0.1 + width * step / 100) ** (-1) + height * 0.1
def train_function_mlflow(config, run_id):
os.environ.update(mlflow_db_creds)
mlflow.set_experiment(EXPERIMENT_NAME)
# Hyperparameters
width = config["width"]
height = config["height"]
with mlflow.start_run(run_id=run_id, nested=True):
for step in range(config.get("steps", 100)):
# Iterative training function - can be any arbitrary training procedure
intermediate_score = evaluation_fn(step, width, height)
# Log the metrics to MLflow
mlflow.log_metrics({"iterations": step, "mean_loss": intermediate_score})
# Feed the score back to Tune.
train.report({"iterations": step, "mean_loss": intermediate_score})
time.sleep(0.1)
def tune_with_setup(run_id, finish_fast=True):
os.environ.update(mlflow_db_creds)
# Set the experiment or create a new one if it does not exist.
mlflow.set_experiment(experiment_name=EXPERIMENT_NAME)
tuner = tune.Tuner(
tune.with_parameter(train_function_mlflow, run_id),
tune_config=tune.TuneConfig(num_samples=5),
run_config=train.RunConfig(
name="mlflow",
),
param_space={
"width": tune.randint(10, 100),
"height": tune.randint(0, 100),
"steps": 20 if finish_fast else 100,
},
)
results = tuner.fit()
with mlflow.start_run() as run:
mlflow_tracking_uri = mlflow.get_tracking_uri()
tune_with_setup(run.info.run_id)
模型服务
在 Databricks 群集上使用 Ray Serve 进行实时推理会带来挑战,因为与外部应用程序交互时受到网络安全和连接限制。
Databricks 建议使用模型服务,将生产中的机器学习模型部署到 REST API 终结点。 有关详细信息,请参阅部署自定义模型。