共用方式為


整合 MLflow 和 Ray

MLflow 是開放原始碼平台,可管理機器學習和 AI 工作負載。 結合 Ray 與 MLflow,可讓您使用 Ray 散發工作負載,並使用 MLflow 來追蹤在訓練期間產生的模型、計量、參數和中繼資料。

本文會說明如何將 MLflow 與下列 Ray 元件整合:

  • Ray Core:Ray Tune 和 Ray Train 未涵蓋的一般用途分散式應用程式

  • Ray 訓練:分散式模型訓練

  • Ray 微調:分散式超參數微調

  • 模型服務:部署模型以進行即時推斷

整合 Ray Core 和 MLflow

Ray Core 提供一般用途分散式應用程式的基礎建構元素。 其可讓您跨多個節點縮放 Python 函式和類別。

本節說明整合 Ray Core 和 MLflow 的下列模式:

  • 記錄來自 Ray 驅動程式程序的 MLflow 模型
  • 從子執行記錄 MLflow 模型

從 Ray 驅動程式程序記錄 MLflow

通常建議您從驅動程式程序記錄 MLflow 模型,而不是從背景工作角色節點記錄。 這是因為將具狀態參考傳遞至遠端工作者的作業額外複雜。

例如,因為 MLflow 追蹤伺服器不會使用背景工作節點內的 MLflow Client 初始化,所以下列程式碼會失敗。

import mlflow

@ray.remote
def example_logging_task(x):
# ...

 # This method will fail
 mlflow.log_metric("x", x)
 return x

with mlflow.start_run() as run:
 ray.get([example_logging_task.remote(x) for x in range(10)])

相反地,請將計量傳回至驅動程式節點。 計量和中繼資料通常夠小,能夠在不造成記憶體問題的情況下傳輸回驅動程式。

請以上述範例為例,並加以更新,以記錄來自 Ray 工作的傳回計量:

import mlflow

@ray.remote
def example_logging_task(x):
 # ...
 return x

with mlflow.start_run() as run:
  results = ray.get([example_logging_task.remote(x) for x in range(10)])
 for x in results:
   mlflow.log_metric("x", x)

對於需要儲存大型成品的工作 (例如大型 Pandas 資料表、影像、繪圖或模型),Databricks 建議您將成品保存為檔案。 然後,藉由指定儲存檔案的路徑,重新載入驅動程式內容中的成品,或使用 MLflow 直接記錄物件。

import mlflow

@ray.remote
def example_logging_task(x):
# ...
# Create a large object that needs to be stored
with open("/dbfs/myLargeFilePath.txt", "w") as f:
  f.write(myLargeObject)
return x

with mlflow.start_run() as run:
 results = ray.get([example_logging_task.remote(x) for x in range(10)])
for x in results:
  mlflow.log_metric("x", x)
  # Directly log the saved file by specifying the path
  mlflow.log_artifact("/dbfs/myLargeFilePath.txt")

將 Ray 工作記錄為 MLflow 子執行

您可以使用子執行來整合 Ray Core 與 MLflow。 這與下列步驟有關:

  1. 建立父執行:初始化驅動程式程序中的父執行。 此執行可作為所有後續子執行的階層式容器。
  2. 建立子執行:在每個 Ray 工作中,起始父執行下的子執行。 每個子執行都可以獨立記錄自身的計量。

若要實作此方法,請確定每個 Ray 工作都會收到必要的用戶端認證和父系 run_id。 此設定會建立執行之間的階層式父子關聯性。 下列程式碼片段會示範如何擷取認證並傳遞父系 run_id

from mlflow.utils.databricks_utils import get_databricks_env_vars
mlflow_db_creds = get_databricks_env_vars("databricks")

username = "" # Username path
experiment_name = f"/Users/{username}/mlflow_test"

mlflow.set_experiment(experiment_name)

@ray.remote
def ray_task(x, run_id):
   import os
  # Set the MLflow credentials within the Ray task
   os.environ.update(mlflow_db_creds)
  # Set the active MLflow experiment within each Ray task
   mlflow.set_experiment(experiment_name)
  # Create nested child runs associated with the parent run_id
   with mlflow.start_run(run_id=run_id, nested=True):
    # Log metrics to the child run within the Ray task
       mlflow.log_metric("x", x)

  return x

# Start parent run on the main driver process
with mlflow.start_run() as run:
  # Pass the parent run's run_id to each Ray task
   results = ray.get([ray_task.remote(x, run.info.run_id) for x in range(10)])

Ray Train 和 MLflow

將 Ray 訓練模型記錄至 MLflow 的最簡單方式,就是使用訓練執行所產生的檢查點。 訓練執行完成之後,請在其原生深度學習架構 (例如 PyTorch 或 TensorFlow) 中重新載入模型,然後使用對應的 MLflow 程式碼加以記錄。

此方法可確保模型經過正確儲存,並準備好進行評估或部署。

下列程式碼會從 Ray Train 檢查點重新載入模型,並將其記錄至 MLflow:

result = trainer.fit()

checkpoint = result.checkpoint
with checkpoint.as_directory() as checkpoint_dir:
     # Change as needed for different DL frameworks
    checkpoint_path = f"{checkpoint_dir}/checkpoint.ckpt"
    # Load the model from the checkpoint
    model = MyModel.load_from_checkpoint(checkpoint_path)

with mlflow.start_run() as run:
    # Change the MLflow flavor as needed
    mlflow.pytorch.log_model(model, "model")

雖然最佳做法通常是將物件傳回驅動程式節點,但有了 Ray Train 後,儲存最終結果比從背景工作處理序儲存完整訓練歷程記錄更容易。

若要從訓練執行儲存多個模型,請指定要保留在 ray.train.CheckpointConfig 中的檢查點數目。 然後,您就可以像儲存單一模型一樣地讀取和記錄模型。

注意

MLflow 不負責在模型訓練期間處理容錯,而是負責追蹤模型的生命週期。 容錯是由 Ray Train 本身所管理。

若要儲存 Ray Train 所指定的訓練計量,請從結果物件加以擷取,並使用 MLflow 儲存。

result = trainer.fit()

with mlflow.start_run() as run:
    mlflow.log_metrics(result.metrics_dataframe.to_dict(orient='dict'))

  # Change the MLflow flavor as needed
    mlflow.pytorch.log_model(model, "model")

若要正確設定 Spark 和 Ray 叢集並防止資源分派問題,您應該調整 resources_per_worker 設定。 具體來說,將每個 Ray 背景工作角色的 CPU 數目設定為 Ray 背景工作角色節點上可用 CPU 總數減一即可。 這項調整非常重要,因為如果訓練工具為 Ray 動作項目保留所有可用的核心,可能會導致資源爭用錯誤。

Ray Tune 和 MLflow

整合 Ray Tune 與 MLflow,可讓您有效率地追蹤和記錄 Databricks 內的超參數微調實驗。 此整合會利用 MLflow 的實驗追蹤功能,直接從 Ray 工作記錄計量和結果。

用於記錄的子執行方法

與從 Ray Core 工作記錄類似,Ray Tune 應用程式可以使用子執行方法來記錄每個試用或微調反覆項目的計量。 使用下列步驟來實作子執行方法:

  1. 建立父執行:初始化驅動程式程序中的父執行。 此執行可作為所有後續子執行的主要容器。
  2. 記錄子執行:每項 Ray Tune 工作都會在父執行下建立子執行,並維護實驗結果的明確階層圖。

下列範例示範如何使用 MLflow,從 Ray Tune 工作進行驗證和記錄。

import os
import tempfile
import time

import mlflow
from mlflow.utils.databricks_utils import get_databricks_env_vars

from ray import train, tune
from ray.air.integrations.mlflow import MLflowLoggerCallback, setup_mlflow

mlflow_db_creds = get_databricks_env_vars("databricks")

EXPERIMENT_NAME = "/Users/<WORKSPACE_USERNAME>/setup_mlflow_example"
mlflow.set_experiment(EXPERIMENT_NAME)

def evaluation_fn(step, width, height):
   return (0.1 + width * step / 100) ** (-1) + height * 0.1

def train_function_mlflow(config, run_id):
   os.environ.update(mlflow_db_creds)
   mlflow.set_experiment(EXPERIMENT_NAME)

   # Hyperparameters
   width = config["width"]
   height = config["height"]

   with mlflow.start_run(run_id=run_id, nested=True):
       for step in range(config.get("steps", 100)):
           # Iterative training function - can be any arbitrary training procedure
           intermediate_score = evaluation_fn(step, width, height)
           # Log the metrics to MLflow
           mlflow.log_metrics({"iterations": step, "mean_loss": intermediate_score})
           # Feed the score back to Tune.
           train.report({"iterations": step, "mean_loss": intermediate_score})
           time.sleep(0.1)

def tune_with_setup(run_id, finish_fast=True):
   os.environ.update(mlflow_db_creds)
   # Set the experiment or create a new one if it does not exist.
   mlflow.set_experiment(experiment_name=EXPERIMENT_NAME)

   tuner = tune.Tuner(
       tune.with_parameter(train_function_mlflow, run_id),
       tune_config=tune.TuneConfig(num_samples=5),
       run_config=train.RunConfig(
           name="mlflow",
       ),
       param_space={
           "width": tune.randint(10, 100),
           "height": tune.randint(0, 100),
           "steps": 20 if finish_fast else 100,
       },
   )
   results = tuner.fit()

with mlflow.start_run() as run:
   mlflow_tracking_uri = mlflow.get_tracking_uri()
   tune_with_setup(run.info.run_id)

模型服務

在 Databricks 叢集上使用 Ray Serve 進行即時推斷時,會因為與外部應用程式互動時的網路安全性和連線限制而帶來挑戰。

Databricks 建議您使用模型服務,將生產環境中的機器學習模型部署到 REST API 端點。 如需詳細資訊,請參閱部署自訂模型 (英文)。