Integrar o MLflow e o Ray
O MLflow é uma plataforma de código aberto para gerenciar o ciclo de vida de aprendizado de máquina e cargas de trabalho de IA. A combinação do Ray com o MLflow permite distribuir cargas de trabalho com o Ray e rastrear modelos, métricas, parâmetros e metadados gerados durante o treinamento com o MLflow.
Este artigo aborda como integrar o MLflow com os seguintes componentes do Ray:
Ray Core: aplicativos distribuídos de uso geral que não são cobertos pelo Ray Tune e pelo Ray Train
Ray Train: treinamento de modelo distribuído
Ray Tune: ajuste de hiperparâmetro distribuído
Serviço de Modelo: implantação de modelos para inferência em tempo real
Integre o Ray Core e o MLflow
O Ray Core fornece os blocos de construção fundamentais para aplicativos distribuídos de uso geral. Ele permite que você dimensione funções e classes do Python em vários nós.
Esta seção descreve os seguintes padrões para integrar o Ray Core e o MLflow:
- Registrar modelos do MLflow a partir do processo do driver Ray
- Registrar modelos do MLflow a partir de execuções filhas
Registrar o MLflow a partir do processo do driver Ray
Geralmente, é melhor registrar modelos de MLflow a partir do processo de driver em vez de a partir dos nós de trabalho. Isso se deve à complexidade adicional de passar referências com estado para os trabalhadores remotos.
Por exemplo, o código a seguir falha porque o Servidor de Acompanhamento do MLflow não é inicializado usando o MLflow Client
de dentro dos nós de trabalho.
import mlflow
@ray.remote
def example_logging_task(x):
# ...
# This method will fail
mlflow.log_metric("x", x)
return x
with mlflow.start_run() as run:
ray.get([example_logging_task.remote(x) for x in range(10)])
Em vez disso, retorne as métricas para o nó do driver. As métricas e os metadados geralmente são pequenos o suficiente para serem transferidos de volta para o driver sem causar problemas de memória.
Pegue o exemplo mostrado acima e atualize-o para registrar as métricas retornadas a partir de uma tarefa do Ray:
import mlflow
@ray.remote
def example_logging_task(x):
# ...
return x
with mlflow.start_run() as run:
results = ray.get([example_logging_task.remote(x) for x in range(10)])
for x in results:
mlflow.log_metric("x", x)
Para tarefas que exigem salvar artefatos grandes, como uma tabela do Pandas, imagens, gráficos ou modelos grandes, o Databricks recomenda manter o artefato como um arquivo. Em seguida, recarregue o artefato no contexto do driver ou registre diretamente o objeto com o MLflow especificando o caminho para o arquivo salvo.
import mlflow
@ray.remote
def example_logging_task(x):
# ...
# Create a large object that needs to be stored
with open("/dbfs/myLargeFilePath.txt", "w") as f:
f.write(myLargeObject)
return x
with mlflow.start_run() as run:
results = ray.get([example_logging_task.remote(x) for x in range(10)])
for x in results:
mlflow.log_metric("x", x)
# Directly log the saved file by specifying the path
mlflow.log_artifact("/dbfs/myLargeFilePath.txt")
Registrar tarefas do Ray à medida que a execução filha do MLflow ocorre
Você pode integrar o Ray Core ao MLflow usando execuções filhas. Isso envolve as seguintes etapas:
- Criar uma execução pai: inicialize uma execução pai no processo do driver. Essa execução atua como um contêiner hierárquico para todas as execuções filhas subsequentes.
- Criar execuções filhas: em cada tarefa do Ray, inicie uma execução filha na execução pai. Cada execução filha pode registrar suas próprias métricas de forma independente.
Para implementar essa abordagem, certifique-se de que cada tarefa do Ray receba as credenciais de cliente necessárias e o run_id
pai. Essa configuração estabelece a relação hierárquica pai-filha entre as execuções. O trecho de código a seguir demonstra como recuperar as credenciais e passar adiante o run_id
pai:
from mlflow.utils.databricks_utils import get_databricks_env_vars
mlflow_db_creds = get_databricks_env_vars("databricks")
username = "" # Username path
experiment_name = f"/Users/{username}/mlflow_test"
mlflow.set_experiment(experiment_name)
@ray.remote
def ray_task(x, run_id):
import os
# Set the MLflow credentials within the Ray task
os.environ.update(mlflow_db_creds)
# Set the active MLflow experiment within each Ray task
mlflow.set_experiment(experiment_name)
# Create nested child runs associated with the parent run_id
with mlflow.start_run(run_id=run_id, nested=True):
# Log metrics to the child run within the Ray task
mlflow.log_metric("x", x)
return x
# Start parent run on the main driver process
with mlflow.start_run() as run:
# Pass the parent run's run_id to each Ray task
results = ray.get([ray_task.remote(x, run.info.run_id) for x in range(10)])
Ray Train e MLflow
A maneira mais simples de registrar os modelos do Ray Train no MLflow é usar o ponto de verificação gerado pela execução de treinamento. Após a conclusão da execução do treinamento, recarregue o modelo em sua estrutura nativa de aprendizado profundo (como PyTorch ou TensorFlow) e registre-o com o código MLflow correspondente.
Essa abordagem garante que o modelo seja armazenado corretamente e pronto para avaliação ou implantação.
O código a seguir recarrega um modelo de um ponto de verificação do Ray Train e o registra no MLflow:
result = trainer.fit()
checkpoint = result.checkpoint
with checkpoint.as_directory() as checkpoint_dir:
# Change as needed for different DL frameworks
checkpoint_path = f"{checkpoint_dir}/checkpoint.ckpt"
# Load the model from the checkpoint
model = MyModel.load_from_checkpoint(checkpoint_path)
with mlflow.start_run() as run:
# Change the MLflow flavor as needed
mlflow.pytorch.log_model(model, "model")
Embora geralmente seja uma prática recomendada enviar objetos de volta ao nó do driver, com o Ray Train, salvar os resultados finais é mais fácil do que todo o histórico de treinamento do processo de trabalho.
Para armazenar vários modelos de uma execução de treinamento, especifique o número de pontos de verificação a serem mantidos no ray.train.CheckpointConfig
. Os modelos podem ser lidos e registrados da mesma forma que o armazenamento de um único modelo.
Observação
O MLflow não é responsável por lidar com a tolerância a falhas durante o treinamento do modelo, mas sim por acompanhar o ciclo de vida do modelo. A tolerância a falhas é gerenciada pelo próprio Ray Train.
Para armazenar as métricas de treinamento especificadas pelo Ray Train, recupere-as do objeto de resultado e armazene-as usando o MLflow.
result = trainer.fit()
with mlflow.start_run() as run:
mlflow.log_metrics(result.metrics_dataframe.to_dict(orient='dict'))
# Change the MLflow flavor as needed
mlflow.pytorch.log_model(model, "model")
Para configurar corretamente os clusters do Spark e do Ray e evitar problemas de alocação de recursos, você deve ajustar a configuração resources_per_worker
. Especificamente, defina o número de CPUs para cada trabalho do Ray como um a menos do que o número total de CPUs disponíveis em um nó de trabalho do Ray. Esse ajuste é crucial porque, se o treinador reservar todos os núcleos disponíveis para os atores do Ray, isso pode levar a erros de contenção de recursos.
Ray Tune e MLflow
A integração do Ray Tune com o MLflow permite que você acompanhe e registre com eficiência experimentos de ajuste de hiperparâmetros no Databricks. Essa integração aproveita os recursos de rastreamento de experimentos do MLflow para registrar métricas e resultados diretamente a partir das tarefas do Ray.
Abordagem de execução filha para registro em log
Semelhante ao registro em log de tarefas do Ray Core, as aplicações do Ray Tune podem usar uma abordagem de execução filha para registrar métricas de cada iteração de avaliação ou ajuste. Use as seguintes etapas para implementar uma abordagem de execução filha:
- Criar uma execução pai: inicialize uma execução pai no processo do driver. Essa execução serve como o contêiner principal para todas as execuções filhas subsequentes.
- Registrar execuções filhas: cada tarefa do Ray Tune cria uma execução filha na execução pai, mantendo uma hierarquia clara dos resultados do experimento.
O exemplo a seguir demonstra como autenticar e registrar tarefas do Ray Tune usando o MLflow.
import os
import tempfile
import time
import mlflow
from mlflow.utils.databricks_utils import get_databricks_env_vars
from ray import train, tune
from ray.air.integrations.mlflow import MLflowLoggerCallback, setup_mlflow
mlflow_db_creds = get_databricks_env_vars("databricks")
EXPERIMENT_NAME = "/Users/<WORKSPACE_USERNAME>/setup_mlflow_example"
mlflow.set_experiment(EXPERIMENT_NAME)
def evaluation_fn(step, width, height):
return (0.1 + width * step / 100) ** (-1) + height * 0.1
def train_function_mlflow(config, run_id):
os.environ.update(mlflow_db_creds)
mlflow.set_experiment(EXPERIMENT_NAME)
# Hyperparameters
width = config["width"]
height = config["height"]
with mlflow.start_run(run_id=run_id, nested=True):
for step in range(config.get("steps", 100)):
# Iterative training function - can be any arbitrary training procedure
intermediate_score = evaluation_fn(step, width, height)
# Log the metrics to MLflow
mlflow.log_metrics({"iterations": step, "mean_loss": intermediate_score})
# Feed the score back to Tune.
train.report({"iterations": step, "mean_loss": intermediate_score})
time.sleep(0.1)
def tune_with_setup(run_id, finish_fast=True):
os.environ.update(mlflow_db_creds)
# Set the experiment or create a new one if it does not exist.
mlflow.set_experiment(experiment_name=EXPERIMENT_NAME)
tuner = tune.Tuner(
tune.with_parameter(train_function_mlflow, run_id),
tune_config=tune.TuneConfig(num_samples=5),
run_config=train.RunConfig(
name="mlflow",
),
param_space={
"width": tune.randint(10, 100),
"height": tune.randint(0, 100),
"steps": 20 if finish_fast else 100,
},
)
results = tuner.fit()
with mlflow.start_run() as run:
mlflow_tracking_uri = mlflow.get_tracking_uri()
tune_with_setup(run.info.run_id)
Serviço de Modelo
O uso do Ray Serve em clusters do Databricks para inferência em tempo real apresenta desafios devido à segurança de rede e limitações de conectividade ao interagir com aplicações externas.
O Databricks recomenda usar o Serviço de Modelo para implantar modelos de aprendizado de máquina em produção em um ponto de extremidade da API REST. Para obter mais informações, consulte Implantar modelos personalizados.