Развертывание и запуск моделей MLflow в заданиях Spark
Из этой статьи вы узнаете, как развернуть и запустить модель MLflow в заданиях Spark, чтобы выполнить вывод по большим объемам данных или как часть заданий обработки данных.
Сведения об этом примере
В этом примере показано, как развернуть модель MLflow, зарегистрированную в Машинное обучение Azure задания Spark, выполняемые в управляемых кластерах Spark (предварительная версия), Azure Databricks или Azure Synapse Analytics, для вывода больших объемов данных.
Модель основана на наборе данных болезни сердца UCI. База данных содержит 76 атрибутов, но мы используем подмножество из 14 из них. Модель пытается предсказать наличие сердечно-сосудистых заболеваний у пациента. Целочисленное значение от 0 (нет присутствия) до 1 (присутствие). Он был обучен с помощью XGBBoost
классификатора, и все необходимые предварительной обработки были упакованы в виде scikit-learn
конвейера, что делает эту модель сквозным конвейером, который переходит от необработанных данных к прогнозам.
Сведения в этой статье основаны на примерах кода, имеющихся в репозитории azureml-examples. Для локального выполнения команд без необходимости копирования и вставки файлов клонируйте репозиторий, а затем измените каталоги sdk/using-mlflow/deploy
на .
git clone https://github.com/Azure/azureml-examples --depth 1
cd sdk/python/using-mlflow/deploy
Необходимые компоненты
Перед выполнением действий, описанных в этой статье, убедитесь, что выполнены следующие необходимые условия:
Установите пакет пакета SDK
mlflow
MLflow и подключаемый модуль Машинное обучение Azureazureml-mlflow
для MLflow следующим образом:pip install mlflow azureml-mlflow
Совет
Вы можете использовать
mlflow-skinny
пакет, который является упрощенным пакетом MLflow без хранилища SQL, сервера, пользовательского интерфейса или зависимостей для обработки и анализа данных. Этот пакет рекомендуется для пользователей, которые в первую очередь нуждаются в возможностях отслеживания и ведения журнала MLflow, не импортируя полный набор функций, включая развертывания.Создайте рабочую область Машинного обучения Azure. Чтобы создать рабочую область, см. статью "Создание ресурсов", которые необходимо приступить к работе. Просмотрите разрешения доступа, необходимые для выполнения операций MLflow в рабочей области.
Чтобы выполнить удаленное отслеживание или отслеживать эксперименты, выполняемые вне Машинное обучение Azure, настройте MLflow, чтобы указать URI отслеживания рабочей области Машинное обучение Azure. Дополнительные сведения о подключении MLflow к рабочей области см. в разделе "Настройка MLflow" для Машинное обучение Azure.
- В рабочей области должна быть зарегистрирована модель MLflow. В частности, в этом примере будет зарегистрирована модель, обученная для набора данных диабета.
Подключение к рабочей области
Сначала давайте подключимся к рабочей области Машинное обучение Azure, где зарегистрирована модель.
Отслеживание уже настроено для вас. Учетные данные по умолчанию также будут использоваться при работе с MLflow.
Регистрация модели
Нам нужна модель, зарегистрированная в реестре Машинное обучение Azure для выполнения вывода. В этом случае у нас уже есть локальная копия модели в репозитории, поэтому нам нужно опубликовать модель в реестре в рабочей области. Этот шаг можно пропустить, если модель, на который вы пытаетесь развернуть, уже зарегистрирована.
model_name = 'heart-classifier'
model_local_path = "model"
registered_model = mlflow_client.create_model_version(
name=model_name, source=f"file://{model_local_path}"
)
version = registered_model.version
Кроме того, если модель была зарегистрирована внутри запуска, ее можно зарегистрировать напрямую.
Совет
Чтобы зарегистрировать модель, необходимо знать расположение, в котором хранится модель. Если вы используете функцию autolog
MLflow, путь будет зависеть от типа и платформы используемой модели. Мы рекомендуем проверить выходные данные заданий, чтобы определить имя этой папки. Вы можете найти папку, содержащую файл с именем MLModel
. Если вы регистрируете модели вручную с использованием log_model
, то путем будет аргумент, который вы передаете такому методу. Например, если вы регистрируете модель с помощью mlflow.sklearn.log_model(my_model, "classifier")
, то путь, в котором хранится classifier
модель.
model_name = 'heart-classifier'
registered_model = mlflow_client.create_model_version(
name=model_name, source=f"runs://{RUN_ID}/{MODEL_PATH}"
)
version = registered_model.version
Примечание.
Путь MODEL_PATH
— это расположение, где модель хранилась при выполнении.
Получение входных данных для оценки
Для выполнения или заданий нам потребуется некоторые входные данные. В этом примере мы скачайте примеры данных из Интернета и поместим его в общее хранилище, используемое кластером Spark.
import urllib
urllib.request.urlretrieve("https://azuremlexampledata.blob.core.windows.net/data/heart-disease-uci/data/heart.csv", "/tmp/data")
Переместите данные в подключенную учетную запись хранения, доступную всему кластеру.
dbutils.fs.mv("file:/tmp/data", "dbfs:/")
Внимание
Предыдущий код использует dbutils
инструмент, доступный в кластере Azure Databricks. Используйте соответствующее средство в зависимости от используемой платформы.
Затем входные данные помещаются в следующую папку:
input_data_path = "dbfs:/data"
Запуск модели в кластерах Spark
В следующем разделе объясняется, как запускать модели MLflow, зарегистрированные в Машинное обучение Azure в заданиях Spark.
Убедитесь, что в кластере установлены следующие библиотеки:
- mlflow<3,>=2.1 - cloudpickle==2.2.0 - scikit-learn==1.2.0 - xgboost==1.7.2
Мы будем использовать записную книжку для демонстрации того, как создать подпрограмму оценки с моделью MLflow, зарегистрированной в Машинное обучение Azure. Создайте записную книжку и используйте PySpark в качестве языка по умолчанию.
Импортируйте необходимые пространства имен:
import mlflow import pyspark.sql.functions as f
Настройте универсальный код ресурса (URI) модели. Следующий универсальный код ресурса (URI) содержит модель с именем
heart-classifier
в последней версии.model_uri = "models:/heart-classifier/latest"
Загрузите модель в качестве функции UDF. Определяемая пользователем функция (UDF) — это функция, определяемая пользователем, что позволяет повторно использовать пользовательскую логику в пользовательской среде.
predict_function = mlflow.pyfunc.spark_udf(spark, model_uri, result_type='double')
Совет
Используйте аргумент
result_type
для управления типом, возвращаемым функциейpredict()
.Считывайте данные, которые вы хотите оценить:
df = spark.read.option("header", "true").option("inferSchema", "true").csv(input_data_path).drop("target")
В нашем случае входные данные помещаются в
CSV
формат и помещаются в папкуdbfs:/data/
. Мы также сбрасываем столбецtarget
, так как этот набор данных содержит целевую переменную для прогнозирования. В рабочих сценариях данные не будут иметь этот столбец.Запустите функцию
predict_function
и поместите прогнозы в новый столбец. В этом случае мы помещаем прогнозы в столбецpredictions
.df.withColumn("predictions", score_function(*df.columns))
Совет
Получает
predict_function
в качестве аргументов необходимые столбцы. В нашем случае все столбцы кадра данных ожидаются моделью и поэтомуdf.columns
используются. Если для модели требуется подмножество столбцов, их можно представить вручную. Если у модели есть сигнатура, типы должны быть совместимы между входными и ожидаемыми типами.Прогнозы можно записать обратно в хранилище:
scored_data_path = "dbfs:/scored-data" scored_data.to_csv(scored_data_path)
Запуск модели в автономном задании Spark в Машинное обучение Azure
Машинное обучение Azure поддерживает создание изолированного задания Spark и создание повторно используемого компонента Spark, который можно использовать в конвейерах Машинное обучение Azure. В этом примере мы развернем задание оценки, которое выполняется в автономном задании Spark Машинное обучение Azure и запускает модель MLflow для выполнения вывода.
Примечание.
Дополнительные сведения о заданиях Spark в Машинное обучение Azure см. в разделе "Отправка заданий Spark" в Машинное обучение Azure (предварительная версия).
Для задания Spark требуется скрипт Python, который принимает аргументы. Создайте скрипт оценки:
score.py
import argparse parser = argparse.ArgumentParser() parser.add_argument("--model") parser.add_argument("--input_data") parser.add_argument("--scored_data") args = parser.parse_args() print(args.model) print(args.input_data) # Load the model as an UDF function predict_function = mlflow.pyfunc.spark_udf(spark, args.model, env_manager="conda") # Read the data you want to score df = spark.read.option("header", "true").option("inferSchema", "true").csv(input_data).drop("target") # Run the function `predict_function` and place the predictions on a new column scored_data = df.withColumn("predictions", score_function(*df.columns)) # Save the predictions scored_data.to_csv(args.scored_data)
Приведенный выше скрипт принимает три аргумента
--model
и--scored_data
--input_data
. Первые два являются входными данными и представляют модель, которую мы хотим запустить, а входные данные — выходные данные, и это папка вывода, в которой будут размещаться прогнозы.Совет
Установка пакетов Python: предыдущий скрипт оценки загружает модель MLflow в функцию UDF, но указывает параметр
env_manager="conda"
. Если этот параметр задан, MLflow восстановит необходимые пакеты, указанные в определении модели в изолированной среде, где выполняется только функция UDF. Дополнительные сведения смmlflow.pyfunc.spark_udf
. в документации.Создайте определение задания:
mlflow-score-spark-job.yml
$schema: http://azureml/sdk-2-0/SparkJob.json type: spark code: ./src entry: file: score.py conf: spark.driver.cores: 1 spark.driver.memory: 2g spark.executor.cores: 2 spark.executor.memory: 2g spark.executor.instances: 2 inputs: model: type: mlflow_model path: azureml:heart-classifier@latest input_data: type: uri_file path: https://azuremlexampledata.blob.core.windows.net/data/heart-disease-uci/data/heart.csv mode: direct outputs: scored_data: type: uri_folder args: >- --model ${{inputs.model}} --input_data ${{inputs.input_data}} --scored_data ${{outputs.scored_data}} identity: type: user_identity resources: instance_type: standard_e4s_v3 runtime_version: "3.2"
Совет
Чтобы использовать присоединенный пул Synapse Spark, определите
compute
свойство в примере файла спецификации YAML, показанногоresources
выше вместо свойства.Приведенные выше файлы YAML можно использовать в
az ml job create
команде с--file
параметром для создания автономного задания Spark, как показано ниже.az ml job create -f mlflow-score-spark-job.yml