Udostępnij za pośrednictwem


Integrowanie biblioteki MLflow i Ray

MLflow to platforma typu open source do zarządzania obciążeniami uczenia maszynowego i sztucznej inteligencji. Połączenie raya z MLflow umożliwia dystrybucję obciążeń za pomocą raya i śledzenia modeli, metryk, parametrów i metadanych generowanych podczas trenowania za pomocą platformy MLflow.

W tym artykule opisano sposób integrowania biblioteki MLflow z następującymi składnikami Ray:

  • Ray Core: aplikacje rozproszone ogólnego przeznaczenia, które nie są objęte Ray Tune i Ray Train

  • Ray Train: Trenowanie modelu rozproszonego

  • Ray Tune: Dostrajanie hiperparametrów rozproszonych

  • Obsługa modeli: wdrażanie modeli na potrzeby wnioskowania w czasie rzeczywistym

Integrowanie technologii Ray Core i MLflow

Ray Core udostępnia podstawowe bloki konstrukcyjne dla aplikacji rozproszonych ogólnego przeznaczenia. Umożliwia skalowanie funkcji i klas języka Python w wielu węzłach.

W tej sekcji opisano następujące wzorce integracji technologii Ray Core i MLflow:

  • Rejestrowanie modeli MLflow z procesu sterownika Ray
  • Rejestrowanie modeli MLflow z przebiegów podrzędnych

Rejestrowanie biblioteki MLflow z procesu sterownika Ray

Zazwyczaj najlepiej rejestrować modele MLflow z procesu sterownika, a nie z węzłów roboczych. Jest to spowodowane dodatkową złożonością przekazywania odwołań stanowych do pracowników zdalnych.

Na przykład następujący kod kończy się niepowodzeniem, ponieważ serwer śledzenia MLflow nie jest inicjowany przy użyciu elementu MLflow Client z węzłów procesu roboczego.

import mlflow

@ray.remote
def example_logging_task(x):
# ...

 # This method will fail
 mlflow.log_metric("x", x)
 return x

with mlflow.start_run() as run:
 ray.get([example_logging_task.remote(x) for x in range(10)])

Zamiast tego zwróć metryki do węzła sterownika. Metryki i metadane są na ogół wystarczająco małe, aby przenieść z powrotem do sterownika bez powodowania problemów z pamięcią.

Weź przykład przedstawiony powyżej i zaktualizuj go, aby zarejestrować zwrócone metryki z zadania Ray:

import mlflow

@ray.remote
def example_logging_task(x):
 # ...
 return x

with mlflow.start_run() as run:
  results = ray.get([example_logging_task.remote(x) for x in range(10)])
 for x in results:
   mlflow.log_metric("x", x)

W przypadku zadań wymagających zapisywania dużych artefaktów, takich jak duża tabela Pandas, obrazy, wykresy lub modele, usługa Databricks zaleca utrwalanie artefaktu jako pliku. Następnie załaduj ponownie artefakt w kontekście sterownika lub bezpośrednio zarejestruj obiekt za pomocą biblioteki MLflow, określając ścieżkę do zapisanego pliku.

import mlflow

@ray.remote
def example_logging_task(x):
# ...
# Create a large object that needs to be stored
with open("/dbfs/myLargeFilePath.txt", "w") as f:
  f.write(myLargeObject)
return x

with mlflow.start_run() as run:
 results = ray.get([example_logging_task.remote(x) for x in range(10)])
for x in results:
  mlflow.log_metric("x", x)
  # Directly log the saved file by specifying the path
  mlflow.log_artifact("/dbfs/myLargeFilePath.txt")

Zadania rejestrowania raya podczas uruchamiania podrzędnego MLflow

Program Ray Core można zintegrować z platformą MLflow przy użyciu przebiegów podrzędnych. Obejmuje to następujące kroki:

  1. Utwórz uruchomienie nadrzędne: zainicjuj uruchomienie nadrzędne w procesie sterownika. Ten przebieg działa jako kontener hierarchiczny dla wszystkich kolejnych przebiegów podrzędnych.
  2. Tworzenie przebiegów podrzędnych: w ramach każdego zadania Ray zainicjuj uruchomienie podrzędne w ramach uruchomienia nadrzędnego. Każde uruchomienie podrzędne może niezależnie rejestrować własne metryki.

Aby zaimplementować to podejście, upewnij się, że każde zadanie Ray otrzymuje niezbędne poświadczenia klienta i element nadrzędny run_id. Ta konfiguracja ustanawia hierarchiczną relację nadrzędny-podrzędny między przebiegami. Poniższy fragment kodu pokazuje, jak pobrać poświadczenia i przekazać je razem z elementem nadrzędnym run_id:

from mlflow.utils.databricks_utils import get_databricks_env_vars
mlflow_db_creds = get_databricks_env_vars("databricks")

username = "" # Username path
experiment_name = f"/Users/{username}/mlflow_test"

mlflow.set_experiment(experiment_name)

@ray.remote
def ray_task(x, run_id):
   import os
  # Set the MLflow credentials within the Ray task
   os.environ.update(mlflow_db_creds)
  # Set the active MLflow experiment within each Ray task
   mlflow.set_experiment(experiment_name)
  # Create nested child runs associated with the parent run_id
   with mlflow.start_run(run_id=run_id, nested=True):
    # Log metrics to the child run within the Ray task
       mlflow.log_metric("x", x)

  return x

# Start parent run on the main driver process
with mlflow.start_run() as run:
  # Pass the parent run's run_id to each Ray task
   results = ray.get([ray_task.remote(x, run.info.run_id) for x in range(10)])

Ray Train i MLflow

Najprostszym sposobem rejestrowania modeli Ray Train do MLflow jest użycie punktu kontrolnego wygenerowanego przez przebieg trenowania. Po zakończeniu przebiegu trenowania załaduj ponownie model w natywnej strukturze uczenia głębokiego (np. PyTorch lub TensorFlow), a następnie zarejestruj go przy użyciu odpowiedniego kodu MLflow.

Takie podejście gwarantuje, że model jest prawidłowo przechowywany i gotowy do oceny lub wdrożenia.

Poniższy kod ponownie ładuje model z punktu kontrolnego Ray Train i rejestruje go w usłudze MLflow:

result = trainer.fit()

checkpoint = result.checkpoint
with checkpoint.as_directory() as checkpoint_dir:
     # Change as needed for different DL frameworks
    checkpoint_path = f"{checkpoint_dir}/checkpoint.ckpt"
    # Load the model from the checkpoint
    model = MyModel.load_from_checkpoint(checkpoint_path)

with mlflow.start_run() as run:
    # Change the MLflow flavor as needed
    mlflow.pytorch.log_model(model, "model")

Chociaż zazwyczaj najlepszym rozwiązaniem jest wysyłanie obiektów z powrotem do węzła sterownika, zapisywanie wyników końcowych jest łatwiejsze niż cała historia trenowania z procesu roboczego.

Aby przechowywać wiele modeli z przebiegu trenowania, określ liczbę punktów kontrolnych, które mają być przechowywane w obiekcie ray.train.CheckpointConfig. Modele można następnie odczytywać i rejestrować w taki sam sposób, jak przechowywanie pojedynczego modelu.

Uwaga

Platforma MLflow nie jest odpowiedzialna za obsługę odporności na uszkodzenia podczas trenowania modelu, a nie do śledzenia cyklu życia modelu. Odporność na uszkodzenia jest zamiast tego zarządzana przez samą firmę Ray Train.

Aby przechowywać metryki trenowania określone przez ray train, pobierz je z obiektu wyników i zapisz je przy użyciu biblioteki MLflow.

result = trainer.fit()

with mlflow.start_run() as run:
    mlflow.log_metrics(result.metrics_dataframe.to_dict(orient='dict'))

  # Change the MLflow flavor as needed
    mlflow.pytorch.log_model(model, "model")

Aby prawidłowo skonfigurować klastry Spark i Ray i zapobiec problemom z alokacją resources_per_worker zasobów, należy dostosować to ustawienie. W szczególności ustaw liczbę procesorów CPU dla każdego procesu roboczego raya na mniejszą niż łączna liczba procesorów CPU dostępnych w węźle procesu roboczego Ray. Ta korekta ma kluczowe znaczenie, ponieważ jeśli trener rezerwuje wszystkie dostępne rdzenie dla aktorów Raya, może prowadzić do błędów rywalizacji o zasoby.

Ray Tune i MLflow

Zintegrowanie aplikacji Ray Tune z platformą MLflow umożliwia efektywne śledzenie i rejestrowanie eksperymentów dostrajania hiperparametrów w usłudze Databricks. Ta integracja wykorzystuje możliwości śledzenia eksperymentów platformy MLflow do rejestrowania metryk i wyników bezpośrednio z zadań raya.

Podejście podrzędne do rejestrowania

Podobnie jak w przypadku rejestrowania z zadań Ray Core, aplikacje Ray Tune mogą używać podejścia podrzędnego do rejestrowania metryk z każdej wersji próbnej lub dostrajania iteracji. Aby zaimplementować podejście podrzędne, wykonaj następujące czynności:

  1. Utwórz uruchomienie nadrzędne: zainicjuj uruchomienie nadrzędne w procesie sterownika. To uruchomienie służy jako główny kontener dla wszystkich kolejnych przebiegów podrzędnych.
  2. Przebiegi podrzędne dziennika: każde zadanie Ray Tune tworzy podrzędne uruchomienie w ramach przebiegu nadrzędnego, zachowując wyraźną hierarchię wyników eksperymentu.

W poniższym przykładzie pokazano, jak uwierzytelniać się i rejestrować zadania ray tune przy użyciu biblioteki MLflow.

import os
import tempfile
import time

import mlflow
from mlflow.utils.databricks_utils import get_databricks_env_vars

from ray import train, tune
from ray.air.integrations.mlflow import MLflowLoggerCallback, setup_mlflow

mlflow_db_creds = get_databricks_env_vars("databricks")

EXPERIMENT_NAME = "/Users/<WORKSPACE_USERNAME>/setup_mlflow_example"
mlflow.set_experiment(EXPERIMENT_NAME)

def evaluation_fn(step, width, height):
   return (0.1 + width * step / 100) ** (-1) + height * 0.1

def train_function_mlflow(config, run_id):
   os.environ.update(mlflow_db_creds)
   mlflow.set_experiment(EXPERIMENT_NAME)

   # Hyperparameters
   width = config["width"]
   height = config["height"]

   with mlflow.start_run(run_id=run_id, nested=True):
       for step in range(config.get("steps", 100)):
           # Iterative training function - can be any arbitrary training procedure
           intermediate_score = evaluation_fn(step, width, height)
           # Log the metrics to MLflow
           mlflow.log_metrics({"iterations": step, "mean_loss": intermediate_score})
           # Feed the score back to Tune.
           train.report({"iterations": step, "mean_loss": intermediate_score})
           time.sleep(0.1)

def tune_with_setup(run_id, finish_fast=True):
   os.environ.update(mlflow_db_creds)
   # Set the experiment or create a new one if it does not exist.
   mlflow.set_experiment(experiment_name=EXPERIMENT_NAME)

   tuner = tune.Tuner(
       tune.with_parameter(train_function_mlflow, run_id),
       tune_config=tune.TuneConfig(num_samples=5),
       run_config=train.RunConfig(
           name="mlflow",
       ),
       param_space={
           "width": tune.randint(10, 100),
           "height": tune.randint(0, 100),
           "steps": 20 if finish_fast else 100,
       },
   )
   results = tuner.fit()

with mlflow.start_run() as run:
   mlflow_tracking_uri = mlflow.get_tracking_uri()
   tune_with_setup(run.info.run_id)

Obsługa modelu

Korzystanie z usługi Ray Serve w klastrach usługi Databricks na potrzeby wnioskowania w czasie rzeczywistym stanowi wyzwanie ze względu na ograniczenia zabezpieczeń sieci i łączności podczas interakcji z aplikacjami zewnętrznymi.

Usługa Databricks zaleca używanie usługi Model Serving do wdrażania modeli uczenia maszynowego w środowisku produkcyjnym w punkcie końcowym interfejsu API REST. Aby uzyskać więcej informacji, zobacz Wdrażanie modeli niestandardowych.