Partilhar via


Integre MLflow e Ray

O MLflow é uma plataforma de código aberto para gerenciar cargas de trabalho de aprendizado de máquina e IA. A combinação do Ray com o MLflow permite distribuir cargas de trabalho com o Ray e rastrear modelos, métricas, parameterse metadados gerados durante o treinamento com o MLflow.

Este artigo aborda como integrar o MLflow com os seguintes componentes do Ray:

  • Ray Core: aplicativos distribuídos de uso geral que não são cobertos pelo Ray Tune e pelo Ray Train

  • Ray Train: Treinamento de modelo distribuído

  • Ray Tune: Ajuste de hiperparâmetros distribuídos

  • Serviço de modelos: Implantando modelos para inferência em tempo real

Integre o Ray Core e o MLflow

O Ray Core fornece os blocos de construção fundamentais para aplicações distribuídas de uso geral. Ele permite que você dimensione funções e classes Python em vários nós.

Esta seção descreve os seguintes padrões para integrar o Ray Core e o MLflow:

  • Registrar modelos MLflow a partir do processo do driver Ray
  • Registrar modelos MLflow de execuções filho

Registrar MLflow do processo do driver Ray

Geralmente, é melhor registrar modelos MLflow do processo do driver em vez de nós de trabalho. Isso se deve à complexidade adicional de passar referências stateful para os trabalhadores remotos.

Por exemplo, o código a seguir falha porque o MLflow Tracking Server não é inicializado usando os MLflow Client nós de trabalho de dentro.

import mlflow

@ray.remote
def example_logging_task(x):
# ...

 # This method will fail
 mlflow.log_metric("x", x)
 return x

with mlflow.start_run() as run:
 ray.get([example_logging_task.remote(x) for x in range(10)])

Em vez disso, retorne as métricas para o nó do driver. As métricas e metadados são geralmente pequenos o suficiente para serem transferidos de volta para o driver sem causar problemas de memória.

Pegue o exemplo mostrado acima e update-lo para registrar as métricas retornadas de uma tarefa Ray:

import mlflow

@ray.remote
def example_logging_task(x):
 # ...
 return x

with mlflow.start_run() as run:
  results = ray.get([example_logging_task.remote(x) for x in range(10)])
 for x in results:
   mlflow.log_metric("x", x)

Para tarefas que exigem salvar artefatos grandes, como um grande Pandas table, imagens, gráficos ou modelos, o Databricks recomenda guardar o artefato como um arquivo. Em seguida, recarregue o artefato dentro do contexto do driver ou registre diretamente o objeto com MLflow especificando o caminho para o arquivo salvo.

import mlflow

@ray.remote
def example_logging_task(x):
# ...
# Create a large object that needs to be stored
with open("/dbfs/myLargeFilePath.txt", "w") as f:
  f.write(myLargeObject)
return x

with mlflow.start_run() as run:
 results = ray.get([example_logging_task.remote(x) for x in range(10)])
for x in results:
  mlflow.log_metric("x", x)
  # Directly log the saved file by specifying the path
  mlflow.log_artifact("/dbfs/myLargeFilePath.txt")

Tarefas do Log Ray enquanto o filho MLflow é executado

Você pode integrar o Ray Core com o MLflow usando execuções filhas. Isso envolve as seguintes etapas:

  1. Criar uma execução pai: inicialize uma execução pai no processo de driver. Essa execução atua como um contêiner hierárquico para todas as execuções filho subsequentes.
  2. Criar execuções filhas: Dentro de cada tarefa Ray, inicie uma execução filho sob a execução pai. Cada execução filho pode registrar de forma independente suas próprias métricas.

Para implementar esta abordagem, certifique-se de que cada tarefa Ray receba o necessário do cliente credentials e o pai run_id. Esta configuração estabelece a relação hierárquica pai-filho entre execuções. O trecho de código a seguir demonstra como recuperar o credentials e passar ao run_idpai.

from mlflow.utils.databricks_utils import get_databricks_env_vars
mlflow_db_creds = get_databricks_env_vars("databricks")

username = "" # Username path
experiment_name = f"/Users/{username}/mlflow_test"

mlflow.set_experiment(experiment_name)

@ray.remote
def ray_task(x, run_id):
   import os
  # Set the MLflow credentials within the Ray task
   os.environ.update(mlflow_db_creds)
  # Set the active MLflow experiment within each Ray task
   mlflow.set_experiment(experiment_name)
  # Create nested child runs associated with the parent run_id
   with mlflow.start_run(run_id=run_id, nested=True):
    # Log metrics to the child run within the Ray task
       mlflow.log_metric("x", x)

  return x

# Start parent run on the main driver process
with mlflow.start_run() as run:
  # Pass the parent run's run_id to each Ray task
   results = ray.get([ray_task.remote(x, run.info.run_id) for x in range(10)])

Ray Train e MLflow

A maneira mais simples de registrar os modelos do Ray Train no MLflow é usar o ponto de verificação gerado pela corrida de treinamento. Após a conclusão da execução de treinamento, recarregue o modelo em sua estrutura nativa de aprendizado profundo (como PyTorch ou TensorFlow) e registre-o com o código MLflow correspondente.

Essa abordagem garante que o modelo seja armazenado corretamente e esteja pronto para avaliação ou implantação.

O código a seguir recarrega um modelo de um ponto de verificação do Ray Train e o registra no MLflow:

result = trainer.fit()

checkpoint = result.checkpoint
with checkpoint.as_directory() as checkpoint_dir:
     # Change as needed for different DL frameworks
    checkpoint_path = f"{checkpoint_dir}/checkpoint.ckpt"
    # Load the model from the checkpoint
    model = MyModel.load_from_checkpoint(checkpoint_path)

with mlflow.start_run() as run:
    # Change the MLflow flavor as needed
    mlflow.pytorch.log_model(model, "model")

Embora geralmente seja uma prática recomendada enviar objetos de volta para o nó do motorista, com o Ray Train, salvar os resultados finais é mais fácil do que todo o histórico de treinamento do processo de trabalho.

Para armazenar vários modelos de uma execução de treinamento, especifique o número de pontos de verificação a serem mantidos no ray.train.CheckpointConfig. Os modelos podem então ser lidos e registados da mesma forma que armazenam um único modelo.

Nota

O MLflow não é responsável por lidar com a tolerância a falhas durante o treinamento do modelo, mas sim por rastrear o ciclo de vida do modelo. Em vez disso, a tolerância a falhas é gerenciada pelo próprio Ray Train.

Para armazenar as métricas de treinamento especificadas pelo Ray Train, recupere-as do objeto de resultado e armazene-as usando MLflow.

result = trainer.fit()

with mlflow.start_run() as run:
    mlflow.log_metrics(result.metrics_dataframe.to_dict(orient='dict'))

  # Change the MLflow flavor as needed
    mlflow.pytorch.log_model(model, "model")

Para configurar corretamente seus clusters Spark e Ray e evitar problemas de alocação de recursos, você deve ajustar a resources_per_worker configuração. Especificamente, set o número de CPUs disponíveis para cada trabalhador Ray seja menos um que o número total de CPUs disponíveis em um nó de trabalhador Ray. Esse ajuste é crucial porque se o treinador reservar todos os núcleos disponíveis para os atores de Ray, isso pode levar a erros de contenção de recursos.

Ray Tune e MLflow

A integração do Ray Tune com o MLflow permite rastrear e registrar com eficiência experimentos de ajuste de hiperparâmetros no Databricks. Essa integração aproveita os recursos de rastreamento de experimentos do MLflow para registrar métricas e resultados diretamente das tarefas do Ray.

Abordagem de execução infantil para registro em log

Semelhante ao registro de tarefas do Ray Core, os aplicativos Ray Tune podem usar uma abordagem de execução infantil para registrar métricas de cada iteração de avaliação ou ajuste. Use as seguintes etapas para implementar uma abordagem administrada por crianças:

  1. Criar uma execução pai: inicialize uma execução pai no processo de driver. Essa execução serve como o contêiner principal para todas as execuções filho subsequentes.
  2. Log filho executa: Cada tarefa do Ray Tune cria uma execução filho sob a execução pai, mantendo uma hierarquia clara dos resultados do experimento.

O exemplo a seguir demonstra como autenticar e registrar tarefas do Ray Tune usando MLflow.

import os
import tempfile
import time

import mlflow
from mlflow.utils.databricks_utils import get_databricks_env_vars

from ray import train, tune
from ray.air.integrations.mlflow import MLflowLoggerCallback, setup_mlflow

mlflow_db_creds = get_databricks_env_vars("databricks")

EXPERIMENT_NAME = "/Users/<WORKSPACE_USERNAME>/setup_mlflow_example"
mlflow.set_experiment(EXPERIMENT_NAME)

def evaluation_fn(step, width, height):
   return (0.1 + width * step / 100) ** (-1) + height * 0.1

def train_function_mlflow(config, run_id):
   os.environ.update(mlflow_db_creds)
   mlflow.set_experiment(EXPERIMENT_NAME)

   # Hyperparameters
   width = config["width"]
   height = config["height"]

   with mlflow.start_run(run_id=run_id, nested=True):
       for step in range(config.get("steps", 100)):
           # Iterative training function - can be any arbitrary training procedure
           intermediate_score = evaluation_fn(step, width, height)
           # Log the metrics to MLflow
           mlflow.log_metrics({"iterations": step, "mean_loss": intermediate_score})
           # Feed the score back to Tune.
           train.report({"iterations": step, "mean_loss": intermediate_score})
           time.sleep(0.1)

def tune_with_setup(run_id, finish_fast=True):
   os.environ.update(mlflow_db_creds)
   # Set the experiment or create a new one if it does not exist.
   mlflow.set_experiment(experiment_name=EXPERIMENT_NAME)

   tuner = tune.Tuner(
       tune.with_parameter(train_function_mlflow, run_id),
       tune_config=tune.TuneConfig(num_samples=5),
       run_config=train.RunConfig(
           name="mlflow",
       ),
       param_space={
           "width": tune.randint(10, 100),
           "height": tune.randint(0, 100),
           "steps": 20 if finish_fast else 100,
       },
   )
   results = tuner.fit()

with mlflow.start_run() as run:
   mlflow_tracking_uri = mlflow.get_tracking_uri()
   tune_with_setup(run.info.run_id)

Modelo de Servir

O uso do Ray Serve em clusters Databricks para inferência em tempo real representa desafios devido à segurança da rede e às limitações de conectividade ao interagir com aplicativos externos.

A Databricks recomenda o uso do Model Serving para implantar modelos de aprendizado de máquina em produção em um ponto de extremidade da API REST. Para obter mais informações, consulte Implantar modelos personalizados.