Nasazení a spouštění modelů MLflow v úlohách Sparku
V tomto článku se dozvíte, jak nasadit a spustit model MLflow v úlohách Sparku, abyste mohli provádět odvozování velkých objemů dat nebo jako součást úloh transformace dat.
O tomto příkladu
Tento příklad ukazuje, jak můžete nasadit model MLflow zaregistrovaný ve službě Azure Machine Learning do úloh Spark spuštěných ve spravovaných clusterech Spark (Preview), Azure Databricks nebo Azure Synapse Analytics, aby bylo možné provádět odvozování nad velkými objemy dat.
Model je založený na sadě dat UCI Srdeční onemocnění. Databáze obsahuje 76 atributů, ale používáme podmnožinu 14 z nich. Model se snaží předpovědět přítomnost onemocnění srdce u pacienta. Hodnota integer je od 0 (bez přítomnosti) do 1 (přítomnost). Byl vytrénován pomocí XGBBoost
klasifikátoru a veškeré požadované předběžné zpracování bylo zabaleno jako scikit-learn
kanál, takže tento model představuje kompletní kanál, který přechází z nezpracovaných dat na predikce.
Informace v tomto článku vycházejí z ukázek kódu obsažených v úložišti azureml-examples . Pokud chcete příkazy spustit místně, aniž byste museli kopírovat nebo vkládat soubory, naklonujte úložiště a pak změňte adresáře na sdk/using-mlflow/deploy
.
git clone https://github.com/Azure/azureml-examples --depth 1
cd sdk/python/using-mlflow/deploy
Požadavky
Než budete postupovat podle kroků v tomto článku, ujistěte se, že máte následující požadavky:
Nainstalujte balíček MLflow SDK
mlflow
a modul plug-in Azure Machine Learningazureml-mlflow
pro MLflow následujícím způsobem:pip install mlflow azureml-mlflow
Tip
Můžete použít
mlflow-skinny
balíček, což je jednoduchý balíček MLflow bez závislostí sql Storage, serveru, uživatelského rozhraní nebo datových věd. Tento balíček se doporučuje uživatelům, kteří primárně potřebují funkce sledování a protokolování MLflow bez importu celé sady funkcí, včetně nasazení.Vytvořte pracovní prostor Azure Machine Learning. Pokud chcete vytvořit pracovní prostor, přečtěte si téma Vytvoření prostředků, které potřebujete, abyste mohli začít. Zkontrolujte přístupová oprávnění , která potřebujete k provádění operací MLflow ve vašem pracovním prostoru.
Pokud chcete provádět vzdálené sledování nebo sledovat experimenty spuštěné mimo Azure Machine Learning, nakonfigurujte MLflow tak, aby odkazovali na identifikátor URI sledování pracovního prostoru Azure Machine Learning. Další informace o připojení MLflow k pracovnímu prostoru najdete v tématu Konfigurace MLflow pro Azure Machine Learning.
- V pracovním prostoru musíte mít zaregistrovaný model MLflow. Zejména tento příklad zaregistruje model natrénovaný pro datovou sadu Diabetes.
Připojení k pracovnímu prostoru
Nejprve se připojíme k pracovnímu prostoru Azure Machine Learning, ve kterém je váš model zaregistrovaný.
Sledování je už pro vás nakonfigurované. Vaše výchozí přihlašovací údaje se použijí také při práci s MLflow.
Registrace modelu
K odvozování potřebujeme model zaregistrovaný v registru služby Azure Machine Learning. V tomto případě už máme v úložišti místní kopii modelu, takže model musíme publikovat jenom do registru v pracovním prostoru. Tento krok můžete přeskočit, pokud už je model, který se pokoušíte nasadit, zaregistrovaný.
model_name = 'heart-classifier'
model_local_path = "model"
registered_model = mlflow_client.create_model_version(
name=model_name, source=f"file://{model_local_path}"
)
version = registered_model.version
Případně pokud byl váš model přihlášený uvnitř spuštění, můžete ho zaregistrovat přímo.
Tip
Pokud chcete model zaregistrovat, budete muset znát umístění, kde byl model uložen. Pokud používáte autolog
funkci MLflow, bude cesta záviset na typu a rozhraní použitého modelu. Doporučujeme zkontrolovat výstup úloh a zjistit, který název této složky je. Můžete vyhledat složku, která obsahuje soubor s názvem MLModel
. Pokud modely protokolujete ručně pomocí log_model
, pak cesta je argument, který předáte této metodě. Pokud například protokolujete model pomocí mlflow.sklearn.log_model(my_model, "classifier")
, pak cesta, kde je model uložen, je classifier
.
model_name = 'heart-classifier'
registered_model = mlflow_client.create_model_version(
name=model_name, source=f"runs://{RUN_ID}/{MODEL_PATH}"
)
version = registered_model.version
Poznámka:
Cesta MODEL_PATH
je umístění, ve kterém byl model uložen ve spuštění.
Získání vstupních dat pro určení skóre
Ke spuštění nebo úlohám budeme potřebovat nějaká vstupní data. V tomto příkladu stáhneme ukázková data z internetu a umístíme je do sdíleného úložiště používaného clusterem Spark.
import urllib
urllib.request.urlretrieve("https://azuremlexampledata.blob.core.windows.net/data/heart-disease-uci/data/heart.csv", "/tmp/data")
Přesuňte data do připojeného účtu úložiště, který je k dispozici pro celý cluster.
dbutils.fs.mv("file:/tmp/data", "dbfs:/")
Důležité
Předchozí kód používá dbutils
nástroj dostupný v clusteru Azure Databricks. Vhodný nástroj použijte v závislosti na platformě, kterou používáte.
Vstupní data se pak umístí do následující složky:
input_data_path = "dbfs:/data"
Spuštění modelu v clusterech Spark
Následující část vysvětluje, jak spouštět modely MLflow zaregistrované ve službě Azure Machine Learning v úlohách Sparku.
Ujistěte se, že jsou v clusteru nainstalované následující knihovny:
- mlflow<3,>=2.1 - cloudpickle==2.2.0 - scikit-learn==1.2.0 - xgboost==1.7.2
Pomocí poznámkového bloku si ukážeme, jak vytvořit rutinu bodování s modelem MLflow zaregistrovaným ve službě Azure Machine Learning. Vytvořte poznámkový blok a jako výchozí jazyk použijte PySpark.
Import požadovaných oborů názvů:
import mlflow import pyspark.sql.functions as f
Nakonfigurujte identifikátor URI modelu. Následující identifikátor URI přináší model pojmenovaný
heart-classifier
v nejnovější verzi.model_uri = "models:/heart-classifier/latest"
Načtěte model jako funkci UDF. Uživatelem definovaná funkce (UDF) je funkce definovaná uživatelem, která umožňuje opakované použití vlastní logiky v uživatelském prostředí.
predict_function = mlflow.pyfunc.spark_udf(spark, model_uri, result_type='double')
Tip
Pomocí argumentu
result_type
můžete řídit typ vrácenýpredict()
funkcí.Přečtěte si data, která chcete skóre:
df = spark.read.option("header", "true").option("inferSchema", "true").csv(input_data_path).drop("target")
V našem případě jsou vstupní data ve
CSV
formátu a umístěna ve složcedbfs:/data/
. Sloupec také přehazujemetarget
, protože tato datová sada obsahuje cílovou proměnnou, která se má předpovědět. V produkčních scénářích nebudou mít vaše data tento sloupec.Spusťte funkci
predict_function
a umístěte předpovědi do nového sloupce. V tomto případě umístíme předpovědi do sloupcepredictions
.df.withColumn("predictions", score_function(*df.columns))
Tip
Přijímá
predict_function
jako argumenty požadované sloupce. V našem případě se očekává, že model očekává všechny sloupce datového rámce, a protodf.columns
se použije. Pokud váš model vyžaduje podmnožinu sloupců, můžete je zavést ručně. Pokud máte podpis, musí být typy kompatibilní mezi vstupy a očekávanými typy.Predikce můžete zapsat zpět do úložiště:
scored_data_path = "dbfs:/scored-data" scored_data.to_csv(scored_data_path)
Spuštění modelu v samostatné úloze Spark ve službě Azure Machine Learning
Azure Machine Learning podporuje vytvoření samostatné úlohy Spark a vytvoření opakovaně použitelné komponenty Spark, kterou je možné použít v kanálech Azure Machine Learning. V tomto příkladu nasadíme úlohu bodování, která běží v samostatné úloze Sparku ve službě Azure Machine Learning a spustí model MLflow, který provede odvozování.
Poznámka:
Další informace o úlohách Sparku ve službě Azure Machine Learning najdete v tématu Odesílání úloh Sparku ve službě Azure Machine Learning (Preview)
Úloha Sparku vyžaduje skript Pythonu, který přebírá argumenty. Vytvořte bodovací skript:
score.py
import argparse parser = argparse.ArgumentParser() parser.add_argument("--model") parser.add_argument("--input_data") parser.add_argument("--scored_data") args = parser.parse_args() print(args.model) print(args.input_data) # Load the model as an UDF function predict_function = mlflow.pyfunc.spark_udf(spark, args.model, env_manager="conda") # Read the data you want to score df = spark.read.option("header", "true").option("inferSchema", "true").csv(input_data).drop("target") # Run the function `predict_function` and place the predictions on a new column scored_data = df.withColumn("predictions", score_function(*df.columns)) # Save the predictions scored_data.to_csv(args.scored_data)
Výše uvedený skript přebírá tři argumenty
--model
--input_data
a--scored_data
. První dva jsou vstupy a představují model, který chceme spustit, a vstupní data, poslední je výstupem a jedná se o výstupní složku, ve které budou umístěny předpovědi.Tip
Instalace balíčků Pythonu: Předchozí bodovací skript načte model MLflow do funkce UDF, ale označuje parametr
env_manager="conda"
. Pokud je tento parametr nastavený, MLflow obnoví požadované balíčky, jak je uvedeno v definici modelu v izolovaném prostředí, kde se spustí jenom funkce definovaná uživatelem. Další podrobnosti najdete vmlflow.pyfunc.spark_udf
dokumentaci.Vytvoření definice úlohy:
mlflow-score-spark-job.yml
$schema: http://azureml/sdk-2-0/SparkJob.json type: spark code: ./src entry: file: score.py conf: spark.driver.cores: 1 spark.driver.memory: 2g spark.executor.cores: 2 spark.executor.memory: 2g spark.executor.instances: 2 inputs: model: type: mlflow_model path: azureml:heart-classifier@latest input_data: type: uri_file path: https://azuremlexampledata.blob.core.windows.net/data/heart-disease-uci/data/heart.csv mode: direct outputs: scored_data: type: uri_folder args: >- --model ${{inputs.model}} --input_data ${{inputs.input_data}} --scored_data ${{outputs.scored_data}} identity: type: user_identity resources: instance_type: standard_e4s_v3 runtime_version: "3.2"
Tip
Chcete-li použít připojený fond Synapse Spark, definujte
compute
vlastnost v ukázkovém souboru specifikace YAML zobrazeném výše místoresources
vlastnosti.Soubory YAML uvedené výše lze použít v
az ml job create
příkazu s parametrem--file
k vytvoření samostatné úlohy Sparku, jak je znázorněno na následujícím obrázku:az ml job create -f mlflow-score-spark-job.yml