次の方法で共有


自動 ML を使用してモデルを作成する (プレビュー)

自動機械学習 (AutoML) には、最小限の人間の介入で機械学習モデルのトレーニングと最適化のプロセスを合理化するように設計された一連の手法とツールが含まれています。 AutoML の主な目的は、特定のデータセットに最適な機械学習モデルとハイパーパラメーターの選択を簡素化し、高速化することです。これは、通常、かなりの専門知識と計算リソースを必要とするタスクです。 Fabric フレームワーク内では、データ サイエンティストは flaml.AutoML モジュールを利用して、機械学習ワークフローのさまざまな側面を自動化できます。

この記事では、Spark データセットを使用してコードから直接 AutoML 試用版を生成するプロセスについて詳しい説明を行います。 さらに、このデータを Pandas データフレームに変換する方法について説明し、実験の試行を並列化するための手法について説明します。

重要

この機能はプレビュー段階にあります。

前提条件

  • 新しい Fabric 環境 を作成するか、Fabric Runtime 1.2 (Spark 3.4 (以降) と Delta 2.4) で実行していることを確認します
  • 新しいノートブックを作成します。
  • ノートブックをレイクハウスにアタッチします。 ノートブックの左側で [追加] を選択して、既存のレイクハウスを追加するか、新しいレイクハウスを作成します。

データの読み込みと準備

このセクションでは、データのダウンロード設定を指定し、レイクハウスに保存します。

データをダウンロードする

このコード ブロックは、リモート ソースからデータをダウンロードし、レイクハウスに保存します

import os
import requests

IS_CUSTOM_DATA = False  # if TRUE, dataset has to be uploaded manually

if not IS_CUSTOM_DATA:
    # Specify the remote URL where the data is hosted
    remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/bankcustomerchurn"
    
    # List of data files to download
    file_list = ["churn.csv"]
    
    # Define the download path within the lakehouse
    download_path = "/lakehouse/default/Files/churn/raw"
    
    # Check if the lakehouse directory exists; if not, raise an error
    if not os.path.exists("/lakehouse/default"):
        raise FileNotFoundError("Default lakehouse not found. Please add a lakehouse and restart the session.")
    
    # Create the download directory if it doesn't exist
    os.makedirs(download_path, exist_ok=True)
    
    # Download each data file if it doesn't already exist in the lakehouse
    for fname in file_list:
        if not os.path.exists(f"{download_path}/{fname}"):
            r = requests.get(f"{remote_url}/{fname}", timeout=30)
            with open(f"{download_path}/{fname}", "wb") as f:
                f.write(r.content)
    
    print("Downloaded demo data files into lakehouse.")

Spark DataFrame にデータを読み込む

次のコード ブロックは、CSV ファイルから Spark DataFrame にデータを読み込み、効率的な処理のためにキャッシュします。

df = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv("Files/churn/raw/churn.csv")
    .cache()
)

このコードは、データ ファイルがダウンロードされ、指定されたパスに配置されていることを前提としています。 CSV ファイルを Spark DataFrame に読み取り、スキーマを推論し、後続の操作中にアクセスを高速化するためにキャッシュします。

データを準備する

このセクションでは、データセットに対してデータ クリーニングと特徴エンジニアリングを実行します。

データをクリーンアップする

まず、データをクリーニングする関数を定義します。これには、不足しているデータを含む行の削除、特定の列に基づく重複する行の削除、不要な列の削除が含まれます。

# Define a function to clean the data
def clean_data(df):
    # Drop rows with missing data across all columns
    df = df.dropna(how="all")
    # Drop duplicate rows based on 'RowNumber' and 'CustomerId'
    df = df.dropDuplicates(subset=['RowNumber', 'CustomerId'])
    # Drop columns: 'RowNumber', 'CustomerId', 'Surname'
    df = df.drop('RowNumber', 'CustomerId', 'Surname')
    return df

# Create a copy of the original dataframe by selecting all the columns
df_copy = df.select("*")

# Apply the clean_data function to the copy
df_clean = clean_data(df_copy)

clean_data 関数は、不要な列を削除しながら、データセットに欠損値や重複がないことを確認するのに役立ちます。

機能エンジニアリング

次に、ワンホット エンコードを使用して 、"Geography" 列と "Gender" 列のダミー列を作成することで、特徴エンジニアリングを実行します。

# Import PySpark functions
from pyspark.sql import functions as F

# Create dummy columns for 'Geography' and 'Gender' using one-hot encoding
df_clean = df_clean.select(
    "*",
    F.when(F.col("Geography") == "France", 1).otherwise(0).alias("Geography_France"),
    F.when(F.col("Geography") == "Germany", 1).otherwise(0).alias("Geography_Germany"),
    F.when(F.col("Geography") == "Spain", 1).otherwise(0).alias("Geography_Spain"),
    F.when(F.col("Gender") == "Female", 1).otherwise(0).alias("Gender_Female"),
    F.when(F.col("Gender") == "Male", 1).otherwise(0).alias("Gender_Male")
)

# Drop the original 'Geography' and 'Gender' columns
df_clean = df_clean.drop("Geography", "Gender")

ここでは、ワンホット エンコードを使用してカテゴリ列をバイナリ ダミー列に変換し、機械学習アルゴリズムに適した列にします。

クリーニングされたデータの表示

最後に、表示関数を使用して、クリーニングおよび特徴エンジニアリングされたデータセットを表示します。


display(df_clean)

この手順では、適用された変換で結果の DataFrame を検査できます。

レイクハウスに保存

次に、クリーニングおよび特徴エンジニアリングされたデータセットをレイクハウスに保存します。

# Create PySpark DataFrame from Pandas
df_clean.write.mode("overwrite").format("delta").save(f"Tables/churn_data_clean")
print(f"Spark dataframe saved to delta table: churn_data_clean")

ここでは、クリーニングおよび変換された PySpark DataFrame df_clean を取得し、それを "churn_data_clean" という名前の Delta テーブルとしてレイクハウスに保存します。 データセットの効率的なバージョン管理と管理には Delta 形式を使用します。 mode("overwrite") により、同じ名前の既存のテーブルが上書きされ、テーブルの新しいバージョンが作成されます。

テスト データセットとトレーニング データセットを作成する

次に、クリーニングおよび特徴エンジニアリングされたデータからテスト データセットとトレーニング データセットを作成します。

提供されたコード セクションでは、Delta 形式を使用して、クリーニングおよび特徴エンジニアリングされたデータセットをレイクハウスから読み込み、80 - 20 の比率でトレーニング セットとテスト セットに分割し、機械学習用のデータを準備します。 この準備には、PySpark ML から VectorAssembler をインポートして、機能列を単一の "特徴" 列に結合することが含まれています。 その後、VectorAssembler を使用して、トレーニングデータセットとテストデータセットを変換し、ターゲット 変数 "Exited" と特徴ベクトルを含む train_data および test_data DataFrame が生成されます。 これらのデータセットは、機械学習モデルの構築と評価に使用する準備ができました。

# Import the necessary library for feature vectorization
from pyspark.ml.feature import VectorAssembler

# Load the cleaned and feature-engineered dataset from the lakehouse
df_final = spark.read.format("delta").load("Tables/churn_data_clean")

# Train-Test Separation
train_raw, test_raw = df_final.randomSplit([0.8, 0.2], seed=41)

# Define the feature columns (excluding the target variable 'Exited')
feature_cols = [col for col in df_final.columns if col != "Exited"]

# Create a VectorAssembler to combine feature columns into a single 'features' column
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Transform the training and testing datasets using the VectorAssembler
train_data = featurizer.transform(train_raw)["Exited", "features"]
test_data = featurizer.transform(test_raw)["Exited", "features"]

ベースライン モデルをトレーニングする

特徴付けされたデータを使用して、ベースライン機械学習モデルをトレーニングし、実験追跡用に MLflow を構成し、メトリック計算用の予測関数を定義し、最後に、結果の ROC AUC スコアを表示してログに記録します。

ログ レベルの設定

ここでは、ログ レベルを構成して、Synapse.ml ライブラリからの不要な出力を抑制し、ログをクリーンな状態に保ちます。

import logging
 
logging.getLogger('synapse.ml').setLevel(logging.ERROR)

MLflow を構成する

このセクションでは、実験追跡用に MLflow を構成します。 実行を整理するために、実験名を "automl_sample" に設定します。 さらに、自動ログ記録を有効にして、モデルのパラメーター、メトリック、および成果物が MLflow に自動的に記録されるようにします。

import mlflow

# Set the MLflow experiment to "automl_sample" and enable automatic logging
mlflow.set_experiment("automl_sample")
mlflow.autolog(exclusive=False)

モデルをトレーニングして評価する

最後に、提供されたトレーニング データに対して LightGBMClassifier モデルをトレーニングします。 モデルは、二項分類と不均衡処理に必要な設定で構成されます。 次に、このトレーニングされたモデルを使用して、テスト データについて予測を行います。 テスト データから、正のクラスと真のラベルの予測確率を抽出します。 その後、sklearn の roc_auc_score 関数を使用して ROC AUC スコアを計算します。

from synapse.ml.lightgbm import LightGBMClassifier
from sklearn.metrics import roc_auc_score

# Assuming you have already defined 'train_data' and 'test_data'

with mlflow.start_run(run_name="default") as run:
    # Create a LightGBMClassifier model with specified settings
    model = LightGBMClassifier(objective="binary", featuresCol="features", labelCol="Exited", dataTransferMode="bulk")
    
    # Fit the model to the training data
    model = model.fit(train_data)

    # Get the predictions
    predictions = model.transform(test_data)

    # Extract the predicted probabilities for the positive class
    y_pred = predictions.select("probability").rdd.map(lambda x: x[0][1]).collect()

    # Extract the true labels from the 'test_data' DataFrame
    y_true = test_data.select("Exited").rdd.map(lambda x: x[0]).collect()

    # Compute the ROC AUC score
    roc_auc = roc_auc_score(y_true, y_pred)

    # Log the ROC AUC score with MLflow
    mlflow.log_metric("ROC_AUC", roc_auc)

    # Print or log the ROC AUC score
    print("ROC AUC Score:", roc_auc)

ここから、結果として得られたモデルが ROC AUC スコア 84% を達成していることがわかります。

FLAML を使用して AutoML 試用版を作成する

このセクションでは、FLAML パッケージを使用して AutoML 試用版を作成し、試用版の設定を構成し、Spark データセットを Pandas on Spark データセットに変換し、AutoML 試用版を実行して、結果のメトリックを表示します。

AutoML 試用版を構成する

ここでは、必要なクラスとモジュールを FLAML パッケージからインポートし、AutoML のインスタンスを作成します。これは、機械学習パイプラインの自動化に使用されます。

# Import the AutoML class from the FLAML package
from flaml import AutoML
from flaml.automl.spark.utils import to_pandas_on_spark

# Create an AutoML instance
automl = AutoML()

設定の構成

このセクションでは、AutoML 試用版の構成設定を定義します。

# Define AutoML settings
settings = {
    "time_budget": 250,         # Total running time in seconds
    "metric": 'roc_auc',       # Optimization metric (ROC AUC in this case)
    "task": 'classification',  # Task type (classification)
    "log_file_name": 'flaml_experiment.log',  # FLAML log file
    "seed": 41,                # Random seed
    "force_cancel": True,      # Force stop training once time_budget is used up
    "mlflow_exp_name": "automl_sample"      # MLflow experiment name
}

Pandas on Spark に変換する

Spark ベースのデータセットで AutoML を実行するには、to_pandas_on_spark 関数を使用して Pandas on Spark データセットに変換する必要があります。 これにより、FLAML はデータを効率的に操作できます。

# Convert the Spark training dataset to a Pandas on Spark dataset
df_automl = to_pandas_on_spark(train_data)

AutoML 試用版を実行する

次に、AutoML 試用版を実行します。 入れ子になった MLflow 実行を使用して、既存の MLflow 実行コンテキスト内の実験を追跡します。 AutoML 試用版は、ターゲット変数 "Exited を使用して Pandas on Spark データセット (df_automl) に対して実行され、定義された設定が構成のために fit 関数に渡されます。

'''The main flaml automl API'''

with mlflow.start_run(nested=True):
    automl.fit(dataframe=df_automl, label='Exited', isUnbalance=True, **settings)

結果のメトリックを表示する

この最後のセクションでは、AutoML 試用版の結果を取得して表示します。 これらのメトリックは、特定のデータセットの AutoML モデルのパフォーマンスと構成に関する分析情報を提供します。

# Retrieve and display the best hyperparameter configuration and metrics
print('Best hyperparameter config:', automl.best_config)
print('Best ROC AUC on validation data: {0:.4g}'.format(1 - automl.best_loss))
print('Training duration of the best run: {0:.4g} s'.format(automl.best_config_train_time))

Apache Spark を使用して AutoML 試用版を並列化する

データセットを 1 つのノードに収めることができるシナリオで、複数の並列 AutoML 試用版を同時に実行するために Spark の機能を活用する場合は、次の手順に従います。

Pandas データフレームへの変換

並列処理を有効にするには、まずデータを Pandas DataFrame に変換する必要があります。

pandas_df = train_raw.toPandas()

ここでは、train_raw Spark DataFrame を pandas_df という名前の Pandas DataFrame に変換して、並列処理に適したものにします。

並列化設定を構成する

Spark ベースの並列処理を有効にするように use_sparkTrue に設定します。 既定では、FLAML は Executor ごとに 1 つの試用版を起動します。 n_concurrent_trials 引数を使用して、同時試行回数をカスタマイズできます。

settings = {
    "time_budget": 250,           # Total running time in seconds
    "metric": 'roc_auc',         # Optimization metric (ROC AUC in this case)
    "task": 'classification',    # Task type (classification)
    "seed": 41,                  # Random seed
    "use_spark": True,           # Enable Spark-based parallelism
    "n_concurrent_trials": 3,    # Number of concurrent trials to run
    "force_cancel": True,        # Force stop training once time_budget is used up
    "mlflow_exp_name": "automl_sample"  # MLflow experiment name

}

これらの設定では、Spark を並列処理に使用するように use_sparkTrue に指定します。 また、同時試行回数を 3 に設定しています。つまり、Spark で 3 つの試用版が並列に実行されます。

AutoML の軌跡を並列化する方法の詳細については、並列 Spark ジョブの FLAML ドキュメントを参照してください。

AutoML 試用版を並列で実行する

次に、指定した設定と並行して AutoML 試用版を実行します。 入れ子になった MLflow 実行を使用して、既存の MLflow 実行コンテキスト内の実験を追跡します。

'''The main FLAML AutoML API'''
with mlflow.start_run(nested=True, run_name="parallel_trial"):
    automl.fit(dataframe=pandas_df, label='Exited', **settings)

これで、並列化が有効になっている AutoML 試用版が実行されます。 dataframe 引数は Pandas DataFrame pandas_dfに設定され、その他の設定は並列実行のために fit 関数に渡されます。

メトリックを表示する

並列 AutoML 試用版を実行した後、最適なハイパーパラメーター構成、検証データの ROC AUC、最適な実行のトレーニング期間など、結果を取得して表示します。

''' retrieve best config'''
print('Best hyperparmeter config:', automl.best_config)
print('Best roc_auc on validation data: {0:.4g}'.format(1-automl.best_loss))
print('Training duration of best run: {0:.4g} s'.format(automl.best_config_train_time))