チュートリアル パート 4: バッチ スコアリングを実行し、予測を lakehouse に保存する
このチュートリアルでは、Microsoft Fabric MLflow モデル レジストリを使用してパート 3 でトレーニングされた登録済みの LightGBMClassifier モデルをインポートし、Lakehouse から読み込まれたテスト データセットでバッチ予測を実行する方法について説明します。
Microsoft Fabric を使用すると、PREDICT と呼ばれるスケーラブルな関数を使用して機械学習モデルを運用化できます。これは、任意のコンピューティング エンジンでのバッチ スコアリングをサポートします。 バッチ予測は、Microsoft Fabric ノートブックまたは特定のモデルの項目ページから直接生成できます。 PREDICT について説明します。
テスト データセットでバッチ予測を生成するには、トレーニング済みの LightGBM モデルのバージョン 1 を使用して、トレーニングされたすべての機械学習モデルの中で最高のパフォーマンスを示します。 テスト データセットを Spark DataFrame に読み込み、バッチ予測を生成する MLFlowTransformer オブジェクトを作成します。 その後、次の 3 つの方法のいずれかを使用して PREDICT 関数を呼び出すことができます。
- SynapseML からの Transformer API
- Spark SQL API
- PySpark ユーザー定義関数 (UDF)
前提 条件
Microsoft Fabric サブスクリプションを取得します。 または、無料の Microsoft Fabric 試用版を登録する 。
Microsoft Fabric にサインインします。
ホーム ページの左下にあるエクスペリエンス スイッチャーを使用して、Fabric に切り替えます。
チュートリアル シリーズのこのパート 4/5。 このチュートリアルを完了するには、最初に次の手順を完了します。
- パート 1: Apache Sparkを使用して Microsoft Fabric Lakehouse にデータを取り込みます。
- パート 2: データの詳細については、Microsoft Fabric ノートブック を使用してデータを探索し、視覚化します。
- パート 3: 機械学習モデルをトレーニングし、登録します。
ノートブックで作業を進める
4-predict.ipynb は、このチュートリアルに付属するノートブックです。
このチュートリアルの付属のノートブックを開くには、「データ サイエンス用にシステムを準備する」の手順に従って、ノートブックをワークスペースにインポート します。
むしろこのページからコードをコピーして貼り付ける場合は、新しいノートブックを作成できます。
コードの実行を開始する前に、必ずレイクハウスをノートブックにアタッチしてください。
重要
このシリーズの他の部分で使用したのと同じレイクハウスを取り付けます。
テスト データを読み込む
パート 3 で保存したテスト データを読み込みます。
df_test = spark.read.format("delta").load("Tables/df_test")
display(df_test)
Transformer API を使用した PREDICT
SynapseML から Transformer API を使用するには、まず MLFlowTransformer オブジェクトを作成する必要があります。
MLFlowTransformer オブジェクトをインスタンス化する
MLFlowTransformer オブジェクトは、パート 3 で登録した MLFlow モデルのラッパーです。 これにより、特定の DataFrame でバッチ予測を生成できます。 MLFlowTransformer オブジェクトをインスタンス化するには、次のパラメーターを指定する必要があります。
- モデルへの入力として必要なテスト DataFrame の列 (この場合は、それらすべてが必要になります)。
- 新しい出力列の名前 (この場合は予測)。
- 予測を生成する正しいモデル名とモデル バージョン (この場合は
lgbm_sm
とバージョン 1)。
from synapse.ml.predict import MLFlowTransformer
model = MLFlowTransformer(
inputCols=list(df_test.columns),
outputCol='predictions',
modelName='lgbm_sm',
modelVersion=1
)
MLFlowTransformer オブジェクトが作成されたので、それを使用してバッチ予測を生成できます。
import pandas
predictions = model.transform(df_test)
display(predictions)
Spark SQL API を使用した PREDICT
次のコードは、Spark SQL API を使用して PREDICT 関数を呼び出します。
from pyspark.ml.feature import SQLTransformer
# Substitute "model_name", "model_version", and "features" below with values for your own model name, model version, and feature columns
model_name = 'lgbm_sm'
model_version = 1
features = df_test.columns
sqlt = SQLTransformer().setStatement(
f"SELECT PREDICT('{model_name}/{model_version}', {','.join(features)}) as predictions FROM __THIS__")
# Substitute "X_test" below with your own test dataset
display(sqlt.transform(df_test))
ユーザー定義関数を使用した PREDICT (UDF)
次のコードは、PySpark UDF を使用して PREDICT 関数を呼び出します。
from pyspark.sql.functions import col, pandas_udf, udf, lit
# Substitute "model" and "features" below with values for your own model name and feature columns
my_udf = model.to_udf()
features = df_test.columns
display(df_test.withColumn("predictions", my_udf(*[col(f) for f in features])))
モデルの項目ページから PREDICT コードを生成することもできます。 について PREDICTを学びましょう。
モデル予測結果をレイクハウスに書き込む
バッチ予測を生成したら、モデル予測結果を lakehouse に書き戻します。
# Save predictions to lakehouse to be used for generating a Power BI report
table_name = "customer_churn_test_predictions"
predictions.write.format('delta').mode("overwrite").save(f"Tables/{table_name}")
print(f"Spark DataFrame saved to delta table: {table_name}")
次の手順
次の手順に進みます。