次の方法で共有


チュートリアル パート 4: バッチ スコアリングを実行し、予測をレイクハウスに保存する

このチュートリアルでは、Microsoft Fabric MLflow モデル レジストリを使用して、パート 3 でトレーニングした登録済みの LightGBMClassifier モデルをインポートし、レイクハウスから読み込まれたテスト データセットに対してバッチ予測を実行する方法について説明します。

Microsoft Fabric を使用すると、ユーザーは PREDICT と呼ばれるスケーラブルな関数を使用して機械学習モデルを運用化できます。これは、任意のコンピューティング エンジンでのバッチ スコアリングをサポートします。 ユーザーは、Microsoft Fabric ノートブックまたは特定のモデルの項目ページから直接バッチ予測を生成できます。 PREDICT について説明します。

テスト データセットでバッチ予測を生成するには、すべてのトレーニング済み機械学習モデルの中で最高のパフォーマンスを示したトレーニング済み LightGBM モデルのバージョン 1 を使用します。 テスト データセットを Spark DataFrame に読み込み、バッチ予測を生成するために MLFlowTransformer オブジェクトを作成します。 次に、次の 3 つの方法のいずれかを使用して、PREDICT 関数を呼び出します。

  • SynapseML のトランスフォーマー API
  • Spark SQL API
  • PySpark ユーザー定義関数 (UDF)

前提条件

これは、5 部構成チュートリアル シリーズの第 4 部です。 このチュートリアルを完了するには、最初に次の手順を完了します。

ノートブックで作業を進める

4-predict.ipynb は、このチュートリアルに付属するノートブックです。

このチュートリアルに付随するノートブックを開くには、「データ サイエンス用にシステムを準備する」チュートリアル の手順に従い、ノートブックをお使いのワークスペースにインポートします。

このページからコードをコピーして貼り付ける場合は、新しいノートブックを作成することができます。

コードの実行を開始する前に、必ずレイクハウスをノートブックにアタッチしてください。

重要

このシリーズの他の部分で使用したのと同じレイクハウスを接続します。

テストデータの読み込み

パート 3 で保存したテスト データを読み込みます。

df_test = spark.read.format("delta").load("Tables/df_test")
display(df_test)

Transformer API を使用した PREDICT

SynapseML の トランスフォーマー API を使用するには、最初に MLFlowTransformer オブジェクトを作成する必要があります。

MLFlowTransformer オブジェクトのインスタンス化

MLFlowTransformer オブジェクトは、パート 3 で登録した MLFlow モデルのラッパーです。 これにより、指定された DataFrame でバッチ予測を生成できるようになります。 MLFlowTransformer オブジェクトをインスタンス化するには、次のパラメーターを指定する必要があります。

  • モデルに対する入力として必要になるテスト DataFrame からの列 (この場合、すべての列が必要です)。
  • 新しい出力列の名前 (この場合は予測値)。
  • 予測を生成するための正しいモデル名とモデル バージョン (この場合は lgbm_sm とバージョン 1)。
from synapse.ml.predict import MLFlowTransformer

model = MLFlowTransformer(
    inputCols=list(df_test.columns),
    outputCol='predictions',
    modelName='lgbm_sm',
    modelVersion=1
)

これで MLFlowTransformer オブジェクトを入手したので、それを使用してバッチ予測を生成できます。

import pandas

predictions = model.transform(df_test)
display(predictions)

Spark SQL API を使用した PREDICT

次のコードでは、Spark SQL API を使用して PREDICT 関数を呼び出します。

from pyspark.ml.feature import SQLTransformer 

# Substitute "model_name", "model_version", and "features" below with values for your own model name, model version, and feature columns
model_name = 'lgbm_sm'
model_version = 1
features = df_test.columns

sqlt = SQLTransformer().setStatement( 
    f"SELECT PREDICT('{model_name}/{model_version}', {','.join(features)}) as predictions FROM __THIS__")

# Substitute "X_test" below with your own test dataset
display(sqlt.transform(df_test))

ユーザー定義関数を使用した PREDICT (UDF)

次のコードでは、PySpark UDF を使用して PREDICT 関数を呼び出します。

from pyspark.sql.functions import col, pandas_udf, udf, lit

# Substitute "model" and "features" below with values for your own model name and feature columns
my_udf = model.to_udf()
features = df_test.columns

display(df_test.withColumn("predictions", my_udf(*[col(f) for f in features])))

モデルのアイテム ページから PREDICT コードも生成できることに注意してください。 PREDICT について説明します。

モデル予測結果をレイクハウスに書き込む

バッチ予測を生成したら、モデル予測結果をレイクハウスに書き戻します。

# Save predictions to lakehouse to be used for generating a Power BI report
table_name = "customer_churn_test_predictions"
predictions.write.format('delta').mode("overwrite").save(f"Tables/{table_name}")
print(f"Spark DataFrame saved to delta table: {table_name}")

次のステップ

次に進みます。