在 Spark 作業中部署及執行 MLflow 模型
在本文中,了解如何在 Spark 作業中部署及執行 MLflow 模型,以對大量資料或資料整頓作業執行推斷。
關於此範例
此範例說明如何將在 Azure Machine Learning 中註冊的 MLflow 模型部署到在受控 Spark 叢集 (預覽) 中執行的 Spark 作業、Azure Databricks 或 Azure Synapse Analytics,以對大量資料執行推斷。
模型是以 UCI 心臟疾病資料集為基礎。 資料庫包含 76 個屬性,但我們使用其中 14 個屬性。 此模型會嘗試預測病患是否有心臟疾病。 值為 0 (沒有) 到 1 (有) 的整數值。 模型已使用 XGBBoost
分類器進行訓練,且所有必要的前置處理都已封裝為 scikit-learn
管線,讓此模型成為從原始資料到預測的端對端管線。
本文中的資訊是以 azureml-examples 存放庫中包含的程式碼範例為基礎。 若要在本機執行命令,而不需要複製/貼上檔案,請複製存放庫,然後將目錄變更為 sdk/using-mlflow/deploy
。
git clone https://github.com/Azure/azureml-examples --depth 1
cd sdk/python/using-mlflow/deploy
必要條件
遵循本文中的步驟之前,請確定您已滿足下列必要條件:
安裝 MLflow SDK
mlflow
套件和適用於 MLflow 的 Azure 機器學習azureml-mlflow
外掛程式,如下所示:pip install mlflow azureml-mlflow
提示
您可使用
mlflow-skinny
套件,這是輕量型 MLflow 套件,沒有 SQL 儲存體、伺服器、UI 或資料科學相依性。 對於主要需要 MLflow 追蹤和記錄功能的使用者,而不需匯入完整的功能套件,包括部署,建議使用此套件。建立 Azure Machine Learning 工作區。 若要建立工作區,請參閱 建立您需要開始使用的資源。 檢閱您在工作區中執行 MLflow 作業所需的存取權限。
若要執行遠程追蹤,或追蹤在 Azure 機器學習 外部執行的實驗,請將 MLflow 設定為指向 Azure 機器學習 工作區的追蹤 URI。 如需如何將 MLflow 連線至工作區的詳細資訊,請參閱設定適用於 Azure Machine Learning 的 MLflow。
- 您必須在工作區中註冊 MLflow 模型。 特別是,此範例會註冊針對糖尿病資料集定型的模型。
連線到您的工作區
首先,讓我們連線到您的模型在其中註冊的 Azure Machine Learning 工作區。
已為您設定追蹤。 使用 MLflow 時,也會使用您的預設認證。
註冊模型
我們需要在 Azure Machine Learning 登錄中註冊的模型來執行推斷。 在此案例中,存放庫中已具有模型的本機複本,因此我們只需要將模型發佈至工作區中的登錄。 如果您已註冊您嘗試部署的模型,則可以略過此步驟。
model_name = 'heart-classifier'
model_local_path = "model"
registered_model = mlflow_client.create_model_version(
name=model_name, source=f"file://{model_local_path}"
)
version = registered_model.version
或者,如果您的模型是在執行內進行記錄,您可以直接註冊模型。
提示
若要登錄模型,您必須知道模型儲存的位置。 如果您使用 autolog
MLflow 的功能,路徑將取決於所使用的模型類型和架構。 我們建議檢查作業輸出,以識別此資料夾的名稱。 您可以尋找包含名為 MLModel
檔案的資料夾。 如果您要使用 log_model
手動記錄模型,則路徑是您傳遞至這類方法的引數。 例如,如果您使用 mlflow.sklearn.log_model(my_model, "classifier")
記錄模型,則儲存模型的路徑為 classifier
。
model_name = 'heart-classifier'
registered_model = mlflow_client.create_model_version(
name=model_name, source=f"runs://{RUN_ID}/{MODEL_PATH}"
)
version = registered_model.version
注意
路徑 MODEL_PATH
是模型儲存在執行中的位置。
取得要評分的輸入資料
我們需要一些輸入資料來執行或作業。 在此範例中,我們將從網際網路下載範例資料,並將其放在 Spark 叢集所使用的共用記憶體中。
import urllib
urllib.request.urlretrieve("https://azuremlexampledata.blob.core.windows.net/data/heart-disease-uci/data/heart.csv", "/tmp/data")
將資料移至可供整個叢集使用的掛接儲存體帳戶。
dbutils.fs.mv("file:/tmp/data", "dbfs:/")
重要
先前的程式碼會使用 dbutils
,這是 Azure Databricks 叢集中可用的工具。 視您使用的平台而定,請使用適當的工具。
接著,輸入資料會放在下列資料夾中:
input_data_path = "dbfs:/data"
在 Spark 叢集中執行模型
下一節說明如何在 Spark 作業中執行在 Azure Machine Learning 註冊的 MLflow 模型。
請確定叢集中已安裝下列程式庫:
- mlflow<3,>=2.1 - cloudpickle==2.2.0 - scikit-learn==1.2.0 - xgboost==1.7.2
我們將使用筆記本來示範如何使用 Azure Machine Learning 中註冊的 MLflow 模型來建立評分常式。 建立筆記本,並使用 PySpark 做為預設語言。
匯入必要的命名空間:
import mlflow import pyspark.sql.functions as f
設定模型 URI。 下列 URI 會在其最新版本中引進名為
heart-classifier
的模型。model_uri = "models:/heart-classifier/latest"
將模型載入成 UDF 函式。 使用者定義函式 (UDF) 是由使用者定義的函式,可讓使用者在使用者環境中重複使用自訂邏輯。
predict_function = mlflow.pyfunc.spark_udf(spark, model_uri, result_type='double')
提示
使用引數
result_type
來控制predict()
函式所傳回的類型。讀取您要評分的資料:
df = spark.read.option("header", "true").option("inferSchema", "true").csv(input_data_path).drop("target")
在我們的案例中,輸入資料為
CSV
格式,並放在dbfs:/data/
資料夾中。 我們也會卸除資料行target
,因為此資料集包含要預測的目標變數。 在生產案例中,您的資料不會有此資料行。執行函式
predict_function
,並將預測放在新的資料行上。 在此情況下,我們會在資料行predictions
中放置預測。df.withColumn("predictions", score_function(*df.columns))
提示
predict_function
會接收做為必要資料行的引數。 在我們的案例中,模型會預期資料框架的所有資料行,因此會使用df.columns
。 如果您的模型需要資料行的子集,您可以手動引進。 如果您的模型具有簽章,則輸入類型與預期類型之間必須相容。您可以將預測寫回儲存體:
scored_data_path = "dbfs:/scored-data" scored_data.to_csv(scored_data_path)
在 Azure Machine Learning 的獨立 Spark 作業中執行模型
Azure Machine Learning 支援建立獨立 Spark 作業,以及建立可在 Azure Machine Learning 管線中使用的可重複使用 Spark 元件。 在此範例中,我們將部署在 Azure Machine Learning 獨立 Spark 作業中執行的評分作業,並執行 MLflow 模型來執行推斷。
注意
若要深入了解 Azure Machine Learning 中的 Spark 作業,請參閱在 Azure Machine Learning 中提交 Spark 作業 (預覽)。
Spark 作業需要採用引數的 Python 指令碼。 建立評分指令碼:
score.py
import argparse parser = argparse.ArgumentParser() parser.add_argument("--model") parser.add_argument("--input_data") parser.add_argument("--scored_data") args = parser.parse_args() print(args.model) print(args.input_data) # Load the model as an UDF function predict_function = mlflow.pyfunc.spark_udf(spark, args.model, env_manager="conda") # Read the data you want to score df = spark.read.option("header", "true").option("inferSchema", "true").csv(input_data).drop("target") # Run the function `predict_function` and place the predictions on a new column scored_data = df.withColumn("predictions", score_function(*df.columns)) # Save the predictions scored_data.to_csv(args.scored_data)
上述指令碼接受三個引數︰
--model
、--input_data
和--scored_data
。 前兩個為輸入,代表我們需要執行的模型和輸入資料,最後一個則是輸出,其為將放置預測的輸出資料夾。提示
安裝 Python 套件:先前的評分指令碼會將 MLflow 模型載入 UDF 函式,但會指出參數
env_manager="conda"
。 設定此參數時,MLflow 會還原隔離環境中只有 UDF 函式執行時,模型定義中所指定的必要套件。 如需詳細資訊,請參閱mlflow.pyfunc.spark_udf
文件。建立工作定義:
mlflow-score-spark-job.yml
$schema: http://azureml/sdk-2-0/SparkJob.json type: spark code: ./src entry: file: score.py conf: spark.driver.cores: 1 spark.driver.memory: 2g spark.executor.cores: 2 spark.executor.memory: 2g spark.executor.instances: 2 inputs: model: type: mlflow_model path: azureml:heart-classifier@latest input_data: type: uri_file path: https://azuremlexampledata.blob.core.windows.net/data/heart-disease-uci/data/heart.csv mode: direct outputs: scored_data: type: uri_folder args: >- --model ${{inputs.model}} --input_data ${{inputs.input_data}} --scored_data ${{outputs.scored_data}} identity: type: user_identity resources: instance_type: standard_e4s_v3 runtime_version: "3.2"
提示
若要使用附加的 Synapse Spark 集區,請定義上述範例 YAML 規格檔案中的
compute
屬性,而不是resources
屬性。上述 YAML 檔案可以搭配
--file
參數在az ml job create
令中使用,以建立獨立 Spark 作業,如下所示:az ml job create -f mlflow-score-spark-job.yml