共用方式為


在 Spark 作業中部署及執行 MLflow 模型

在本文中,了解如何在 Spark 作業中部署及執行 MLflow 模型,以對大量資料或資料整頓作業執行推斷。

關於此範例

此範例說明如何將在 Azure Machine Learning 中註冊的 MLflow 模型部署到在受控 Spark 叢集 (預覽) 中執行的 Spark 作業、Azure Databricks 或 Azure Synapse Analytics,以對大量資料執行推斷。

模型是以 UCI 心臟疾病資料集為基礎。 資料庫包含 76 個屬性,但我們使用其中 14 個屬性。 此模型會嘗試預測病患是否有心臟疾病。 值為 0 (沒有) 到 1 (有) 的整數值。 模型已使用 XGBBoost 分類器進行訓練,且所有必要的前置處理都已封裝為 scikit-learn 管線,讓此模型成為從原始資料到預測的端對端管線。

本文中的資訊是以 azureml-examples 存放庫中包含的程式碼範例為基礎。 若要在本機執行命令,而不需要複製/貼上檔案,請複製存放庫,然後將目錄變更為 sdk/using-mlflow/deploy

git clone https://github.com/Azure/azureml-examples --depth 1
cd sdk/python/using-mlflow/deploy

必要條件

遵循本文中的步驟之前,請確定您已滿足下列必要條件:

  • 安裝 MLflow SDK mlflow 套件和適用於 MLflow 的 Azure 機器學習 azureml-mlflow 外掛程式,如下所示:

    pip install mlflow azureml-mlflow
    

    提示

    您可使用 mlflow-skinny 套件,這是輕量型 MLflow 套件,沒有 SQL 儲存體、伺服器、UI 或資料科學相依性。 對於主要需要 MLflow 追蹤和記錄功能的使用者,而不需匯入完整的功能套件,包括部署,建議使用此套件。

  • 建立 Azure Machine Learning 工作區。 若要建立工作區,請參閱 建立您需要開始使用的資源。 檢閱您在工作區中執行 MLflow 作業所需的存取權限

  • 若要執行遠程追蹤,或追蹤在 Azure 機器學習 外部執行的實驗,請將 MLflow 設定為指向 Azure 機器學習 工作區的追蹤 URI。 如需如何將 MLflow 連線至工作區的詳細資訊,請參閱設定適用於 Azure Machine Learning 的 MLflow

  • 您必須在工作區中註冊 MLflow 模型。 特別是,此範例會註冊針對糖尿病資料集定型的模型。

連線到您的工作區

首先,讓我們連線到您的模型在其中註冊的 Azure Machine Learning 工作區。

已為您設定追蹤。 使用 MLflow 時,也會使用您的預設認證。

註冊模型

我們需要在 Azure Machine Learning 登錄中註冊的模型來執行推斷。 在此案例中,存放庫中已具有模型的本機複本,因此我們只需要將模型發佈至工作區中的登錄。 如果您已註冊您嘗試部署的模型,則可以略過此步驟。

model_name = 'heart-classifier'
model_local_path = "model"

registered_model = mlflow_client.create_model_version(
    name=model_name, source=f"file://{model_local_path}"
)
version = registered_model.version

或者,如果您的模型是在執行內進行記錄,您可以直接註冊模型。

提示

若要登錄模型,您必須知道模型儲存的位置。 如果您使用 autolog MLflow 的功能,路徑將取決於所使用的模型類型和架構。 我們建議檢查作業輸出,以識別此資料夾的名稱。 您可以尋找包含名為 MLModel 檔案的資料夾。 如果您要使用 log_model 手動記錄模型,則路徑是您傳遞至這類方法的引數。 例如,如果您使用 mlflow.sklearn.log_model(my_model, "classifier") 記錄模型,則儲存模型的路徑為 classifier

model_name = 'heart-classifier'

registered_model = mlflow_client.create_model_version(
    name=model_name, source=f"runs://{RUN_ID}/{MODEL_PATH}"
)
version = registered_model.version

注意

路徑 MODEL_PATH 是模型儲存在執行中的位置。


取得要評分的輸入資料

我們需要一些輸入資料來執行或作業。 在此範例中,我們將從網際網路下載範例資料,並將其放在 Spark 叢集所使用的共用記憶體中。

import urllib

urllib.request.urlretrieve("https://azuremlexampledata.blob.core.windows.net/data/heart-disease-uci/data/heart.csv", "/tmp/data")

將資料移至可供整個叢集使用的掛接儲存體帳戶。

dbutils.fs.mv("file:/tmp/data", "dbfs:/")

重要

先前的程式碼會使用 dbutils,這是 Azure Databricks 叢集中可用的工具。 視您使用的平台而定,請使用適當的工具。

接著,輸入資料會放在下列資料夾中:

input_data_path = "dbfs:/data"

在 Spark 叢集中執行模型

下一節說明如何在 Spark 作業中執行在 Azure Machine Learning 註冊的 MLflow 模型。

  1. 請確定叢集中已安裝下列程式庫:

    - mlflow<3,>=2.1
    - cloudpickle==2.2.0
    - scikit-learn==1.2.0
    - xgboost==1.7.2
    
  2. 我們將使用筆記本來示範如何使用 Azure Machine Learning 中註冊的 MLflow 模型來建立評分常式。 建立筆記本,並使用 PySpark 做為預設語言。

  3. 匯入必要的命名空間:

    import mlflow
    import pyspark.sql.functions as f
    
  4. 設定模型 URI。 下列 URI 會在其最新版本中引進名為 heart-classifier 的模型。

    model_uri = "models:/heart-classifier/latest"
    
  5. 將模型載入成 UDF 函式。 使用者定義函式 (UDF) 是由使用者定義的函式,可讓使用者在使用者環境中重複使用自訂邏輯。

    predict_function = mlflow.pyfunc.spark_udf(spark, model_uri, result_type='double') 
    

    提示

    使用引數 result_type 來控制 predict() 函式所傳回的類型。

  6. 讀取您要評分的資料:

    df = spark.read.option("header", "true").option("inferSchema", "true").csv(input_data_path).drop("target")
    

    在我們的案例中,輸入資料為 CSV 格式,並放在 dbfs:/data/ 資料夾中。 我們也會卸除資料行 target,因為此資料集包含要預測的目標變數。 在生產案例中,您的資料不會有此資料行。

  7. 執行函式 predict_function,並將預測放在新的資料行上。 在此情況下,我們會在資料行 predictions 中放置預測。

    df.withColumn("predictions", score_function(*df.columns))
    

    提示

    predict_function 會接收做為必要資料行的引數。 在我們的案例中,模型會預期資料框架的所有資料行,因此會使用 df.columns。 如果您的模型需要資料行的子集,您可以手動引進。 如果您的模型具有簽章,則輸入類型與預期類型之間必須相容。

  8. 您可以將預測寫回儲存體:

    scored_data_path = "dbfs:/scored-data"
    scored_data.to_csv(scored_data_path)
    

在 Azure Machine Learning 的獨立 Spark 作業中執行模型

Azure Machine Learning 支援建立獨立 Spark 作業,以及建立可在 Azure Machine Learning 管線中使用的可重複使用 Spark 元件。 在此範例中,我們將部署在 Azure Machine Learning 獨立 Spark 作業中執行的評分作業,並執行 MLflow 模型來執行推斷。

注意

若要深入了解 Azure Machine Learning 中的 Spark 作業,請參閱在 Azure Machine Learning 中提交 Spark 作業 (預覽)

  1. Spark 作業需要採用引數的 Python 指令碼。 建立評分指令碼:

    score.py

    import argparse
    
    parser = argparse.ArgumentParser()
    parser.add_argument("--model")
    parser.add_argument("--input_data")
    parser.add_argument("--scored_data")
    
    args = parser.parse_args()
    print(args.model)
    print(args.input_data)
    
    # Load the model as an UDF function
    predict_function = mlflow.pyfunc.spark_udf(spark, args.model, env_manager="conda")
    
    # Read the data you want to score
    df = spark.read.option("header", "true").option("inferSchema", "true").csv(input_data).drop("target")
    
    # Run the function `predict_function` and place the predictions on a new column
    scored_data = df.withColumn("predictions", score_function(*df.columns))
    
    # Save the predictions
    scored_data.to_csv(args.scored_data)
    

    上述指令碼接受三個引數︰--model--input_data--scored_data。 前兩個為輸入,代表我們需要執行的模型和輸入資料,最後一個則是輸出,其為將放置預測的輸出資料夾。

    提示

    安裝 Python 套件:先前的評分指令碼會將 MLflow 模型載入 UDF 函式,但會指出參數 env_manager="conda"。 設定此參數時,MLflow 會還原隔離環境中只有 UDF 函式執行時,模型定義中所指定的必要套件。 如需詳細資訊,請參閱mlflow.pyfunc.spark_udf文件。

  2. 建立工作定義:

    mlflow-score-spark-job.yml

    $schema: http://azureml/sdk-2-0/SparkJob.json
    type: spark
    
    code: ./src
    entry:
      file: score.py
    
    conf:
      spark.driver.cores: 1
      spark.driver.memory: 2g
      spark.executor.cores: 2
      spark.executor.memory: 2g
      spark.executor.instances: 2
    
    inputs:
      model:
        type: mlflow_model
        path: azureml:heart-classifier@latest
      input_data:
        type: uri_file
        path: https://azuremlexampledata.blob.core.windows.net/data/heart-disease-uci/data/heart.csv
        mode: direct
    
    outputs:
      scored_data:
        type: uri_folder
    
    args: >-
      --model ${{inputs.model}}
      --input_data ${{inputs.input_data}}
      --scored_data ${{outputs.scored_data}}
    
    identity:
      type: user_identity
    
    resources:
      instance_type: standard_e4s_v3
      runtime_version: "3.2"
    

    提示

    若要使用附加的 Synapse Spark 集區,請定義上述範例 YAML 規格檔案中的 compute 屬性,而不是 resources 屬性。

  3. 上述 YAML 檔案可以搭配 --file 參數在 az ml job create 令中使用,以建立獨立 Spark 作業,如下所示:

    az ml job create -f mlflow-score-spark-job.yml
    

下一步