Integre MLflow e Ray
O MLflow é uma plataforma de código aberto para gerenciar cargas de trabalho de aprendizado de máquina e IA. A combinação do Ray com o MLflow permite distribuir cargas de trabalho com o Ray e rastrear modelos, métricas, parameterse metadados gerados durante o treinamento com o MLflow.
Este artigo aborda como integrar o MLflow com os seguintes componentes do Ray:
Ray Core: aplicativos distribuídos de uso geral que não são cobertos pelo Ray Tune e pelo Ray Train
Ray Train: Treinamento de modelo distribuído
Ray Tune: Ajuste de hiperparâmetros distribuídos
Serviço de modelos: Implantando modelos para inferência em tempo real
Integre o Ray Core e o MLflow
O Ray Core fornece os blocos de construção fundamentais para aplicações distribuídas de uso geral. Ele permite que você dimensione funções e classes Python em vários nós.
Esta seção descreve os seguintes padrões para integrar o Ray Core e o MLflow:
- Registrar modelos MLflow a partir do processo do driver Ray
- Registrar modelos MLflow de execuções filho
Registrar MLflow do processo do driver Ray
Geralmente, é melhor registrar modelos MLflow do processo do driver em vez de nós de trabalho. Isso se deve à complexidade adicional de passar referências stateful para os trabalhadores remotos.
Por exemplo, o código a seguir falha porque o MLflow Tracking Server não é inicializado usando os MLflow Client
nós de trabalho de dentro.
import mlflow
@ray.remote
def example_logging_task(x):
# ...
# This method will fail
mlflow.log_metric("x", x)
return x
with mlflow.start_run() as run:
ray.get([example_logging_task.remote(x) for x in range(10)])
Em vez disso, retorne as métricas para o nó do driver. As métricas e metadados são geralmente pequenos o suficiente para serem transferidos de volta para o driver sem causar problemas de memória.
Pegue o exemplo mostrado acima e update-lo para registrar as métricas retornadas de uma tarefa Ray:
import mlflow
@ray.remote
def example_logging_task(x):
# ...
return x
with mlflow.start_run() as run:
results = ray.get([example_logging_task.remote(x) for x in range(10)])
for x in results:
mlflow.log_metric("x", x)
Para tarefas que exigem salvar artefatos grandes, como um grande Pandas table, imagens, gráficos ou modelos, o Databricks recomenda guardar o artefato como um arquivo. Em seguida, recarregue o artefato dentro do contexto do driver ou registre diretamente o objeto com MLflow especificando o caminho para o arquivo salvo.
import mlflow
@ray.remote
def example_logging_task(x):
# ...
# Create a large object that needs to be stored
with open("/dbfs/myLargeFilePath.txt", "w") as f:
f.write(myLargeObject)
return x
with mlflow.start_run() as run:
results = ray.get([example_logging_task.remote(x) for x in range(10)])
for x in results:
mlflow.log_metric("x", x)
# Directly log the saved file by specifying the path
mlflow.log_artifact("/dbfs/myLargeFilePath.txt")
Tarefas do Log Ray enquanto o filho MLflow é executado
Você pode integrar o Ray Core com o MLflow usando execuções filhas. Isso envolve as seguintes etapas:
- Criar uma execução pai: inicialize uma execução pai no processo de driver. Essa execução atua como um contêiner hierárquico para todas as execuções filho subsequentes.
- Criar execuções filhas: Dentro de cada tarefa Ray, inicie uma execução filho sob a execução pai. Cada execução filho pode registrar de forma independente suas próprias métricas.
Para implementar esta abordagem, certifique-se de que cada tarefa Ray receba o necessário do cliente credentials e o pai run_id
. Esta configuração estabelece a relação hierárquica pai-filho entre execuções. O trecho de código a seguir demonstra como recuperar o credentials e passar ao run_id
pai.
from mlflow.utils.databricks_utils import get_databricks_env_vars
mlflow_db_creds = get_databricks_env_vars("databricks")
username = "" # Username path
experiment_name = f"/Users/{username}/mlflow_test"
mlflow.set_experiment(experiment_name)
@ray.remote
def ray_task(x, run_id):
import os
# Set the MLflow credentials within the Ray task
os.environ.update(mlflow_db_creds)
# Set the active MLflow experiment within each Ray task
mlflow.set_experiment(experiment_name)
# Create nested child runs associated with the parent run_id
with mlflow.start_run(run_id=run_id, nested=True):
# Log metrics to the child run within the Ray task
mlflow.log_metric("x", x)
return x
# Start parent run on the main driver process
with mlflow.start_run() as run:
# Pass the parent run's run_id to each Ray task
results = ray.get([ray_task.remote(x, run.info.run_id) for x in range(10)])
Ray Train e MLflow
A maneira mais simples de registrar os modelos do Ray Train no MLflow é usar o ponto de verificação gerado pela corrida de treinamento. Após a conclusão da execução de treinamento, recarregue o modelo em sua estrutura nativa de aprendizado profundo (como PyTorch ou TensorFlow) e registre-o com o código MLflow correspondente.
Essa abordagem garante que o modelo seja armazenado corretamente e esteja pronto para avaliação ou implantação.
O código a seguir recarrega um modelo de um ponto de verificação do Ray Train e o registra no MLflow:
result = trainer.fit()
checkpoint = result.checkpoint
with checkpoint.as_directory() as checkpoint_dir:
# Change as needed for different DL frameworks
checkpoint_path = f"{checkpoint_dir}/checkpoint.ckpt"
# Load the model from the checkpoint
model = MyModel.load_from_checkpoint(checkpoint_path)
with mlflow.start_run() as run:
# Change the MLflow flavor as needed
mlflow.pytorch.log_model(model, "model")
Embora geralmente seja uma prática recomendada enviar objetos de volta para o nó do motorista, com o Ray Train, salvar os resultados finais é mais fácil do que todo o histórico de treinamento do processo de trabalho.
Para armazenar vários modelos de uma execução de treinamento, especifique o número de pontos de verificação a serem mantidos no ray.train.CheckpointConfig
. Os modelos podem então ser lidos e registados da mesma forma que armazenam um único modelo.
Nota
O MLflow não é responsável por lidar com a tolerância a falhas durante o treinamento do modelo, mas sim por rastrear o ciclo de vida do modelo. Em vez disso, a tolerância a falhas é gerenciada pelo próprio Ray Train.
Para armazenar as métricas de treinamento especificadas pelo Ray Train, recupere-as do objeto de resultado e armazene-as usando MLflow.
result = trainer.fit()
with mlflow.start_run() as run:
mlflow.log_metrics(result.metrics_dataframe.to_dict(orient='dict'))
# Change the MLflow flavor as needed
mlflow.pytorch.log_model(model, "model")
Para configurar corretamente seus clusters Spark e Ray e evitar problemas de alocação de recursos, você deve ajustar a resources_per_worker
configuração. Especificamente, set o número de CPUs disponíveis para cada trabalhador Ray seja menos um que o número total de CPUs disponíveis em um nó de trabalhador Ray. Esse ajuste é crucial porque se o treinador reservar todos os núcleos disponíveis para os atores de Ray, isso pode levar a erros de contenção de recursos.
Ray Tune e MLflow
A integração do Ray Tune com o MLflow permite rastrear e registrar com eficiência experimentos de ajuste de hiperparâmetros no Databricks. Essa integração aproveita os recursos de rastreamento de experimentos do MLflow para registrar métricas e resultados diretamente das tarefas do Ray.
Abordagem de execução infantil para registro em log
Semelhante ao registro de tarefas do Ray Core, os aplicativos Ray Tune podem usar uma abordagem de execução infantil para registrar métricas de cada iteração de avaliação ou ajuste. Use as seguintes etapas para implementar uma abordagem administrada por crianças:
- Criar uma execução pai: inicialize uma execução pai no processo de driver. Essa execução serve como o contêiner principal para todas as execuções filho subsequentes.
- Log filho executa: Cada tarefa do Ray Tune cria uma execução filho sob a execução pai, mantendo uma hierarquia clara dos resultados do experimento.
O exemplo a seguir demonstra como autenticar e registrar tarefas do Ray Tune usando MLflow.
import os
import tempfile
import time
import mlflow
from mlflow.utils.databricks_utils import get_databricks_env_vars
from ray import train, tune
from ray.air.integrations.mlflow import MLflowLoggerCallback, setup_mlflow
mlflow_db_creds = get_databricks_env_vars("databricks")
EXPERIMENT_NAME = "/Users/<WORKSPACE_USERNAME>/setup_mlflow_example"
mlflow.set_experiment(EXPERIMENT_NAME)
def evaluation_fn(step, width, height):
return (0.1 + width * step / 100) ** (-1) + height * 0.1
def train_function_mlflow(config, run_id):
os.environ.update(mlflow_db_creds)
mlflow.set_experiment(EXPERIMENT_NAME)
# Hyperparameters
width = config["width"]
height = config["height"]
with mlflow.start_run(run_id=run_id, nested=True):
for step in range(config.get("steps", 100)):
# Iterative training function - can be any arbitrary training procedure
intermediate_score = evaluation_fn(step, width, height)
# Log the metrics to MLflow
mlflow.log_metrics({"iterations": step, "mean_loss": intermediate_score})
# Feed the score back to Tune.
train.report({"iterations": step, "mean_loss": intermediate_score})
time.sleep(0.1)
def tune_with_setup(run_id, finish_fast=True):
os.environ.update(mlflow_db_creds)
# Set the experiment or create a new one if it does not exist.
mlflow.set_experiment(experiment_name=EXPERIMENT_NAME)
tuner = tune.Tuner(
tune.with_parameter(train_function_mlflow, run_id),
tune_config=tune.TuneConfig(num_samples=5),
run_config=train.RunConfig(
name="mlflow",
),
param_space={
"width": tune.randint(10, 100),
"height": tune.randint(0, 100),
"steps": 20 if finish_fast else 100,
},
)
results = tuner.fit()
with mlflow.start_run() as run:
mlflow_tracking_uri = mlflow.get_tracking_uri()
tune_with_setup(run.info.run_id)
Modelo de Servir
O uso do Ray Serve em clusters Databricks para inferência em tempo real representa desafios devido à segurança da rede e às limitações de conectividade ao interagir com aplicativos externos.
A Databricks recomenda o uso do Model Serving para implantar modelos de aprendizado de máquina em produção em um ponto de extremidade da API REST. Para obter mais informações, consulte Implantar modelos personalizados.