次の方法で共有


チュートリアル パート 4: バッチ スコアリングを実行し、予測を lakehouse に保存する

このチュートリアルでは、Microsoft Fabric MLflow モデル レジストリを使用してパート 3 でトレーニングされた登録済みの LightGBMClassifier モデルをインポートし、Lakehouse から読み込まれたテスト データセットでバッチ予測を実行する方法について説明します。

Microsoft Fabric を使用すると、PREDICT と呼ばれるスケーラブルな関数を使用して機械学習モデルを運用化できます。これは、任意のコンピューティング エンジンでのバッチ スコアリングをサポートします。 バッチ予測は、Microsoft Fabric ノートブックまたは特定のモデルの項目ページから直接生成できます。 PREDICT について説明します。

テスト データセットでバッチ予測を生成するには、トレーニング済みの LightGBM モデルのバージョン 1 を使用して、トレーニングされたすべての機械学習モデルの中で最高のパフォーマンスを示します。 テスト データセットを Spark DataFrame に読み込み、バッチ予測を生成する MLFlowTransformer オブジェクトを作成します。 その後、次の 3 つの方法のいずれかを使用して PREDICT 関数を呼び出すことができます。

  • SynapseML からの Transformer API
  • Spark SQL API
  • PySpark ユーザー定義関数 (UDF)

前提 条件

  • Microsoft Fabric サブスクリプションを取得します。 または、無料の Microsoft Fabric 試用版を登録する

  • Microsoft Fabric にサインインします。

  • ホーム ページの左下にあるエクスペリエンス スイッチャーを使用して、Fabric に切り替えます。

    データ サイエンスを選択する場所を示すエクスペリエンス スイッチャー メニューのスクリーンショット。

チュートリアル シリーズのこのパート 4/5。 このチュートリアルを完了するには、最初に次の手順を完了します。

ノートブックで作業を進める

4-predict.ipynb は、このチュートリアルに付属するノートブックです。

重要

このシリーズの他の部分で使用したのと同じレイクハウスを取り付けます。

テスト データを読み込む

パート 3 で保存したテスト データを読み込みます。

df_test = spark.read.format("delta").load("Tables/df_test")
display(df_test)

Transformer API を使用した PREDICT

SynapseML から Transformer API を使用するには、まず MLFlowTransformer オブジェクトを作成する必要があります。

MLFlowTransformer オブジェクトをインスタンス化する

MLFlowTransformer オブジェクトは、パート 3 で登録した MLFlow モデルのラッパーです。 これにより、特定の DataFrame でバッチ予測を生成できます。 MLFlowTransformer オブジェクトをインスタンス化するには、次のパラメーターを指定する必要があります。

  • モデルへの入力として必要なテスト DataFrame の列 (この場合は、それらすべてが必要になります)。
  • 新しい出力列の名前 (この場合は予測)。
  • 予測を生成する正しいモデル名とモデル バージョン (この場合は lgbm_sm とバージョン 1)。
from synapse.ml.predict import MLFlowTransformer

model = MLFlowTransformer(
    inputCols=list(df_test.columns),
    outputCol='predictions',
    modelName='lgbm_sm',
    modelVersion=1
)

MLFlowTransformer オブジェクトが作成されたので、それを使用してバッチ予測を生成できます。

import pandas

predictions = model.transform(df_test)
display(predictions)

Spark SQL API を使用した PREDICT

次のコードは、Spark SQL API を使用して PREDICT 関数を呼び出します。

from pyspark.ml.feature import SQLTransformer 

# Substitute "model_name", "model_version", and "features" below with values for your own model name, model version, and feature columns
model_name = 'lgbm_sm'
model_version = 1
features = df_test.columns

sqlt = SQLTransformer().setStatement( 
    f"SELECT PREDICT('{model_name}/{model_version}', {','.join(features)}) as predictions FROM __THIS__")

# Substitute "X_test" below with your own test dataset
display(sqlt.transform(df_test))

ユーザー定義関数を使用した PREDICT (UDF)

次のコードは、PySpark UDF を使用して PREDICT 関数を呼び出します。

from pyspark.sql.functions import col, pandas_udf, udf, lit

# Substitute "model" and "features" below with values for your own model name and feature columns
my_udf = model.to_udf()
features = df_test.columns

display(df_test.withColumn("predictions", my_udf(*[col(f) for f in features])))

モデルの項目ページから PREDICT コードを生成することもできます。 について PREDICTを学びましょう。

モデル予測結果をレイクハウスに書き込む

バッチ予測を生成したら、モデル予測結果を lakehouse に書き戻します。

# Save predictions to lakehouse to be used for generating a Power BI report
table_name = "customer_churn_test_predictions"
predictions.write.format('delta').mode("overwrite").save(f"Tables/{table_name}")
print(f"Spark DataFrame saved to delta table: {table_name}")

次の手順

次の手順に進みます。