Spark ジョブでの MLflow モデルのデプロイと実行
この記事では、Spark ジョブで MLflow モデルをデプロイして実行し、大量のデータに対して、またはデータ ラングリング ジョブの一部として推論を実行する方法について説明します。
この例の概要
この例では、Azure Machine Learning に登録されている MLflow モデルを、マネージド Spark クラスター (プレビュー)、Azure Databricks、または Azure Synapse Analytics で実行されている Spark ジョブにデプロイして、大量のデータに対して推論を実行する方法を示します。
このモデルは、UCI Heart Disease Data Set をベースにしています。 このデータベースには 76 個の属性が含まれていますが、そのサブセットである 14 個を使っています。 このモデルは、患者の心臓病の存在を予測しようと試みるものです。 これは 0 (存在しない) から 1 (存在する) の整数値です。 トレーニングには XGBBoost
分類器が使われ、必要な前処理はすべて scikit-learn
パイプラインとしてパッケージ化されているため、このモデルは生データから予測までを行うエンドツーエンドのパイプラインになっています。
この記事の情報は、azureml-examples リポジトリに含まれているコード サンプルを基にしています。 ファイルをコピーして貼り付けることなくコマンドをローカルで実行するには、リポジトリを複製し、ディレクトリを sdk/using-mlflow/deploy
に変更します。
git clone https://github.com/Azure/azureml-examples --depth 1
cd sdk/python/using-mlflow/deploy
前提条件
この記事の手順に従う前に、次の前提条件が満たされていることをご確認ください。
MLflow SDK
mlflow
パッケージと MLflow 用の Azure Machine Learningazureml-mlflow
プラグインを次のようにインストールします:pip install mlflow azureml-mlflow
ヒント
SQL ストレージ、サーバー、UI、またはデータ サイエンスの依存関係のない軽量 MLflow パッケージであるパッケージ
mlflow-skinny
を使用できます。 このパッケージは、デプロイを含む一連の機能をインポートせずに、MLflow の追跡とログ記録の機能を主に必要とするユーザーに推奨されます。Azure Machine Learning ワークスペースを作成します。 ワークスペースを作成するには、「開始する必要があるリソースを作成する」を参照してください。 ワークスペース内で MLflow 操作を実行するために必要なアクセス許可を確認します。
リモート追跡 (つまり、Azure Machine Learning の外部で実行されている実験の追跡) を実行する場合は、Azure Machine Learning ワークスペースの追跡 URI を指すように MLflow を構成します。 MLflow をワークスペースに接続する方法の詳細については、「Azure Machine Learning 用に MLflow を構成する」を参照してください。
- ワークスペースに MLflow モデルが登録されている必要があります。 特に、この例では、Diabetes データセット用にトレーニングされたモデルを登録します。
ワークスペースに接続する
まず、モデルが登録されている Azure Machine Learning ワークスペースに接続しましょう。
追跡は既に構成されています。 MLflow を使用する場合は、既定の資格情報も使用されます。
モデルを登録する
推論を実行するには、Azure Machine Learning レジストリに登録されたモデルが必要です。 この場合、リポジトリにモデルのローカル コピーが既にあるので、ワークスペースのレジストリにモデルを発行するだけで済みます。 デプロイ対象のモデルが既に登録されている場合は、この手順をスキップできます。
model_name = 'heart-classifier'
model_local_path = "model"
registered_model = mlflow_client.create_model_version(
name=model_name, source=f"file://{model_local_path}"
)
version = registered_model.version
または、モデルが実行内でログに記録された場合は、モデルを直接登録できます。
ヒント
モデルを登録するには、モデルが格納されている場所を把握する必要があります。 MLflow の autolog
機能を使用する場合は、使用されるモデルの種類とフレームワークによってパスが異なります。 ジョブの出力を調べて、このフォルダーの名前を特定することをお勧めします。 MLModel
という名前のファイルが含まれているフォルダーを探すことができます。 log_model
を使用して手動でモデルを記録する場合は、対象のメソッドにこのパスを引数として渡します。 たとえば、mlflow.sklearn.log_model(my_model, "classifier")
を使用してモデルを記録する場合、モデルが格納されるパスは classifier
です。
model_name = 'heart-classifier'
registered_model = mlflow_client.create_model_version(
name=model_name, source=f"runs://{RUN_ID}/{MODEL_PATH}"
)
version = registered_model.version
注意
パス MODEL_PATH
は、モデルが実行で格納された場所です。
スコア付けする入力データを取得する
ジョブで実行する入力データが必要です。 この例では、インターネットからサンプル データをダウンロードし、Spark クラスターで使用される共有ストレージに配置します。
import urllib
urllib.request.urlretrieve("https://azuremlexampledata.blob.core.windows.net/data/heart-disease-uci/data/heart.csv", "/tmp/data")
クラスター全体で使用できるマウントされたストレージ アカウントにデータを移動します。
dbutils.fs.mv("file:/tmp/data", "dbfs:/")
重要
前のコードでは dbutils
を使用しています。これは Azure Databricks クラスターで使用できるツールです。 使用しているプラットフォームに応じて、適切なツールを使用します。
その後、入力データは次のフォルダーに配置されます。
input_data_path = "dbfs:/data"
Spark クラスターでモデルを実行する
次のセクションでは、Spark ジョブで Azure Machine Learning に登録されている MLflow モデルを実行する方法について説明します。
クラスターに次のライブラリがインストールされていることを確認します。
- mlflow<3,>=2.1 - cloudpickle==2.2.0 - scikit-learn==1.2.0 - xgboost==1.7.2
ノートブックを使用して、Azure Machine Learning に登録された MLflow モデルを使用してスコアリング ルーチンを作成する方法を示します。 ノートブックを作成し、既定の言語として PySpark を使用します。
必要な名前空間をインポートします。
import mlflow import pyspark.sql.functions as f
モデル URI を構成します。 次の URI では、
heart-classifier
という名前のモデルの最新バージョンが取り込まれます。model_uri = "models:/heart-classifier/latest"
モデルを UDF 関数として読み込みます。 ユーザー定義関数 (UDF) はユーザーによって定義された関数であり、ユーザー環境でカスタム ロジックを再利用できます。
predict_function = mlflow.pyfunc.spark_udf(spark, model_uri, result_type='double')
ヒント
predict()
関数によって返される型を制御するには、 引数result_type
を使用します。スコア付けするデータを読み取ります。
df = spark.read.option("header", "true").option("inferSchema", "true").csv(input_data_path).drop("target")
この場合、入力データは
CSV
形式で、dbfs:/data/
フォルダーに配置されます。 また、このデータセットには予測するターゲット変数が含まれているため、列target
を削除します。 運用環境のシナリオでは、データにこの列はありません。関数
predict_function
を実行し、新しい列に予測を配置します。 この場合は、予測をpredictions
列に配置します。df.withColumn("predictions", score_function(*df.columns))
ヒント
predict_function
では、必要な列を引数として受け取ります。 この例では、データ フレームのすべての列がモデルによって予期されているため、df.columns
が使用されます。 モデルで列のサブセットが必要な場合は、それらを手動で導入できます。 モデルにシグネチャがある場合、入力と、予期される型の間で型の互換性がある必要があります。予測をストレージに書き戻すことができます。
scored_data_path = "dbfs:/scored-data" scored_data.to_csv(scored_data_path)
Azure Machine Learning のスタンドアロン Spark ジョブでモデルを実行する
Azure Machine Learning では、スタンドアロン Spark ジョブの作成と、Azure Machine Learning パイプラインで使用できる再利用可能な Spark コンポーネントの作成がサポートされています。 この例では、Azure Machine Learning のスタンドアロン Spark ジョブで実行され、MLflow モデルを実行して推論を実行するスコアリング ジョブをデプロイします。
Note
Azure Machine Learning の Spark ジョブの詳細については、「Azure Machine Learning で Spark ジョブを送信する (プレビュー)」を参照してください。
Spark ジョブには、引数を受け取る Python スクリプトが必要です。 次のようにスコアリング スクリプトを作成します。
score.py
import argparse parser = argparse.ArgumentParser() parser.add_argument("--model") parser.add_argument("--input_data") parser.add_argument("--scored_data") args = parser.parse_args() print(args.model) print(args.input_data) # Load the model as an UDF function predict_function = mlflow.pyfunc.spark_udf(spark, args.model, env_manager="conda") # Read the data you want to score df = spark.read.option("header", "true").option("inferSchema", "true").csv(input_data).drop("target") # Run the function `predict_function` and place the predictions on a new column scored_data = df.withColumn("predictions", score_function(*df.columns)) # Save the predictions scored_data.to_csv(args.scored_data)
上記のスクリプトでは、
--model
、--input_data
、--scored_data
の 3 つの引数を受け取ります。 最初の 2 つは入力であり、実行するモデルと入力データを表します。最後の 1 つは出力であり、予測が配置される出力フォルダーです。ヒント
Python パッケージのインストール: 前のスコアリング スクリプトでは MLflow モデルを UDF 関数に読み込みますが、 パラメーター
env_manager="conda"
を示しています。 このパラメーターを設定すると、UDF 関数のみが実行される分離環境で、モデル定義で指定されている必要なパッケージが MLflow によって復元されます。 詳細については、mlflow.pyfunc.spark_udf
のドキュメントを参照してください。ジョブ定義を作成します。
mlflow-score-spark-job.yml
$schema: http://azureml/sdk-2-0/SparkJob.json type: spark code: ./src entry: file: score.py conf: spark.driver.cores: 1 spark.driver.memory: 2g spark.executor.cores: 2 spark.executor.memory: 2g spark.executor.instances: 2 inputs: model: type: mlflow_model path: azureml:heart-classifier@latest input_data: type: uri_file path: https://azuremlexampledata.blob.core.windows.net/data/heart-disease-uci/data/heart.csv mode: direct outputs: scored_data: type: uri_folder args: >- --model ${{inputs.model}} --input_data ${{inputs.input_data}} --scored_data ${{outputs.scored_data}} identity: type: user_identity resources: instance_type: standard_e4s_v3 runtime_version: "3.2"
ヒント
アタッチされた Synapse Spark プールを使うには、前述のサンプル YAML 仕様ファイルで
resources
プロパティではなくcompute
プロパティを定義します。上の YAML ファイルを
az ml job create
コマンドの--file
パラメーターで指定して、次のようにスタンドアロン Spark ジョブを作成できます。az ml job create -f mlflow-score-spark-job.yml