Partage via


Créer des modèles avec ML automatisé (aperçu)

Le Machine learning automatisé (AutoML) englobe un ensemble de techniques et d’outils conçus pour simplifier le processus d’entraînement et d’optimisation des modèles Machine Learning avec un minimum d’intervention humaine. L’objectif principal d’AutoML est de simplifier et d’accélérer la sélection du modèle Machine Learning et des hyperparamètres les plus appropriés pour un jeu de données donné, une tâche qui exige généralement une expertise et des ressources de calcul considérables. Dans le cadre de Fabric, les scientifiques des données peuvent exploiter le module flaml.AutoML pour automatiser divers aspects de leurs flux de travail d’apprentissage automatique.

Dans cet article, nous allons approfondir le processus de génération d’essais AutoML directement à partir du code en utilisant un jeu de données Spark. En outre, nous explorerons les méthodes de conversion de ces données en dataframe Pandas et aborderons les techniques de parallélisation de vos essais d’expérimentation.

Important

Cette fonctionnalité est en préversion.

Prérequis

  • Créez un nouvel environnement Fabric ou vérifiez que vous utilisez le runtime Fabric 1.2 (Spark 3.4 (ou supérieur) et Delta 2.4)
  • Créez un nouveau bloc-notes.
  • Attachez votre notebook à lakehouse. Sur le côté gauche de votre bloc-notes, sélectionnez Ajouter pour ajouter une maison de lac existante ou en créer une nouvelle.

Charger et préparer les données

Dans cette section, nous allons spécifier les paramètres de téléchargement des données, puis les enregistrer dans le lakehouse.

Télécharger des données

Ce bloc de code télécharge les données à partir d’une source distante et les enregistre dans le lakehouse

import os
import requests

IS_CUSTOM_DATA = False  # if TRUE, dataset has to be uploaded manually

if not IS_CUSTOM_DATA:
    # Specify the remote URL where the data is hosted
    remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/bankcustomerchurn"
    
    # List of data files to download
    file_list = ["churn.csv"]
    
    # Define the download path within the lakehouse
    download_path = "/lakehouse/default/Files/churn/raw"
    
    # Check if the lakehouse directory exists; if not, raise an error
    if not os.path.exists("/lakehouse/default"):
        raise FileNotFoundError("Default lakehouse not found. Please add a lakehouse and restart the session.")
    
    # Create the download directory if it doesn't exist
    os.makedirs(download_path, exist_ok=True)
    
    # Download each data file if it doesn't already exist in the lakehouse
    for fname in file_list:
        if not os.path.exists(f"{download_path}/{fname}"):
            r = requests.get(f"{remote_url}/{fname}", timeout=30)
            with open(f"{download_path}/{fname}", "wb") as f:
                f.write(r.content)
    
    print("Downloaded demo data files into lakehouse.")

Charger des données dans une trame de données Spark

Le bloc de code suivant charge les données du fichier CSV dans un dataframe Spark et les met en cache pour un traitement efficace.

df = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv("Files/churn/raw/churn.csv")
    .cache()
)

Ce code suppose que le fichier de données a été téléchargé et qu’il se trouve dans le chemin spécifié. Il lit le fichier CSV dans un DataFrame Spark, infère le schéma et le met en cache pour un accès plus rapide lors des opérations suivantes.

Préparer les données

Dans cette section, nous allons effectuer le nettoyage des données et l’ingénierie de caractéristiques sur le jeu de données.

Nettoyer des données

Nous définissons d’abord une fonction pour nettoyer les données, ce qui inclut l’élimination des lignes avec des données manquantes, l’élimination des lignes en double basées sur des colonnes spécifiques et l’élimination des colonnes inutiles.

# Define a function to clean the data
def clean_data(df):
    # Drop rows with missing data across all columns
    df = df.dropna(how="all")
    # Drop duplicate rows based on 'RowNumber' and 'CustomerId'
    df = df.dropDuplicates(subset=['RowNumber', 'CustomerId'])
    # Drop columns: 'RowNumber', 'CustomerId', 'Surname'
    df = df.drop('RowNumber', 'CustomerId', 'Surname')
    return df

# Create a copy of the original dataframe by selecting all the columns
df_copy = df.select("*")

# Apply the clean_data function to the copy
df_clean = clean_data(df_copy)

La fonction clean_data permet de s’assurer que le jeu de données est exempt de valeurs manquantes et de doublons tout en supprimant les colonnes inutiles.

Ingénierie des caractéristiques

Nous procédons ensuite à l’ingénierie de caractéristiques en créant des colonnes fictives pour les colonnes « Geography » et « Gender » à l’aide de l’encodage à une touche.

# Import PySpark functions
from pyspark.sql import functions as F

# Create dummy columns for 'Geography' and 'Gender' using one-hot encoding
df_clean = df_clean.select(
    "*",
    F.when(F.col("Geography") == "France", 1).otherwise(0).alias("Geography_France"),
    F.when(F.col("Geography") == "Germany", 1).otherwise(0).alias("Geography_Germany"),
    F.when(F.col("Geography") == "Spain", 1).otherwise(0).alias("Geography_Spain"),
    F.when(F.col("Gender") == "Female", 1).otherwise(0).alias("Gender_Female"),
    F.when(F.col("Gender") == "Male", 1).otherwise(0).alias("Gender_Male")
)

# Drop the original 'Geography' and 'Gender' columns
df_clean = df_clean.drop("Geography", "Gender")

Nous utilisons ici l’encodage one-hot pour convertir les colonnes catégorielles en colonnes factices binaires, ce qui les rend adaptées aux algorithmes d’apprentissage automatique.

Afficher les données nettoyées

Enfin, nous affichons le jeu de données nettoyé et soumis à l’ingénierie de caractéristiques à l’aide de la fonction d’affichage.


display(df_clean)

Cette étape vous permet d’inspecter le dataframe résultant avec les transformations appliquées.

Enregistrer dans lakehouse

Nous allons maintenant enregistrer le jeu de données nettoyé et soumis à l’ingénierie de caractéristiques dans lakehouse.

# Create PySpark DataFrame from Pandas
df_clean.write.mode("overwrite").format("delta").save(f"Tables/churn_data_clean")
print(f"Spark dataframe saved to delta table: churn_data_clean")

Nous prenons ici le DataFrame PySpark nettoyé et transformé, df_clean, et nous l’enregistrons en tant que table Delta nommée « churn_data_clean » dans lakehouse. Nous utilisons le format Delta pour un contrôle de version et une gestion efficaces du jeu de données. Le mode("overwrite") garantit que toute table existante portant le même nom est écrasée et qu’une nouvelle version de la table est créée.

Créer les jeux de données de test et d’entraînement

Nous allons ensuite créer les jeux de données de test et d’entraînement à partir des données nettoyées et de l’ingénierie de caractéristiques.

Dans la section de code fournie, nous chargeons un jeu de données nettoyé et soumis à l’ingénierie de caractéristiques à partir du lakehouse en utilisant le format Delta, nous le divisons en jeux d’entraînement et de test avec un ratio 80-20 et nous préparons les données pour l’apprentissage automatique. Cette préparation implique l’importation de VectorAssembler de PySpark ML pour combiner les colonnes de caractéristiques en une seule colonne « features ». Nous utilisons ensuite le VectorAssembler pour transformer les jeux de données d’entraînement et de test, ce qui donne des DataFrames train_data et test_data qui contiennent la variable cible «  Exited  » et les vecteurs de caractéristiques. Ces jeux de données sont maintenant prêts à être utilisés pour créer et évaluer des modèles Machine Learning.

# Import the necessary library for feature vectorization
from pyspark.ml.feature import VectorAssembler

# Load the cleaned and feature-engineered dataset from the lakehouse
df_final = spark.read.format("delta").load("Tables/churn_data_clean")

# Train-Test Separation
train_raw, test_raw = df_final.randomSplit([0.8, 0.2], seed=41)

# Define the feature columns (excluding the target variable 'Exited')
feature_cols = [col for col in df_final.columns if col != "Exited"]

# Create a VectorAssembler to combine feature columns into a single 'features' column
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Transform the training and testing datasets using the VectorAssembler
train_data = featurizer.transform(train_raw)["Exited", "features"]
test_data = featurizer.transform(test_raw)["Exited", "features"]

Effectuer l’entraînement d’un modèle de référence

En utilisant les données caractérisées, nous allons entraîner un modèle Machine Learning de référence, configurer MLflow pour le suivi des expériences, définir une fonction de prédiction pour le calcul des métriques et, enfin, afficher et journaliser le score ROC AUC résultant.

Définir le niveau de journalisation

Nous configurons ici le niveau de journalisation pour supprimer les sorties inutiles de la bibliothèque Synapse.ml, ce qui permet de garder les journaux plus propres.

import logging
 
logging.getLogger('synapse.ml').setLevel(logging.ERROR)

Configurer MLflow

Dans cette section, nous configurons MLflow pour le suivi des expériences. Nous définissons le nom de l’expérience sur « automl_sample » pour organiser les exécutions. En outre, nous activons la journalisation automatique, en veillant à ce que les paramètres du modèle, les métriques et les artefacts sont automatiquement journalisés dans MLflow.

import mlflow

# Set the MLflow experiment to "automl_sample" and enable automatic logging
mlflow.set_experiment("automl_sample")
mlflow.autolog(exclusive=False)

Former et évaluer le modèle

Enfin, nous entraînons un modèle LightGBMClassifier sur les données d’entraînement fournies. Le modèle est configuré avec les paramètres nécessaires pour la classification binaire et la gestion des déséquilibres. Nous utilisons ensuite ce modèle entraîné pour faire des prédictions sur les données de test. Nous extrayons les probabilités prédites pour la classe positive et les vraies étiquettes des données de test. Nous calculons ensuite le score ROC AUC à l’aide de la fonction roc_auc_score de sklearn.

from synapse.ml.lightgbm import LightGBMClassifier
from sklearn.metrics import roc_auc_score

# Assuming you have already defined 'train_data' and 'test_data'

with mlflow.start_run(run_name="default") as run:
    # Create a LightGBMClassifier model with specified settings
    model = LightGBMClassifier(objective="binary", featuresCol="features", labelCol="Exited", dataTransferMode="bulk")
    
    # Fit the model to the training data
    model = model.fit(train_data)

    # Get the predictions
    predictions = model.transform(test_data)

    # Extract the predicted probabilities for the positive class
    y_pred = predictions.select("probability").rdd.map(lambda x: x[0][1]).collect()

    # Extract the true labels from the 'test_data' DataFrame
    y_true = test_data.select("Exited").rdd.map(lambda x: x[0]).collect()

    # Compute the ROC AUC score
    roc_auc = roc_auc_score(y_true, y_pred)

    # Log the ROC AUC score with MLflow
    mlflow.log_metric("ROC_AUC", roc_auc)

    # Print or log the ROC AUC score
    print("ROC AUC Score:", roc_auc)

Nous pouvons voir ici que notre modèle atteint un score ROC AUC de 84 %.

Créer un essai AutoML avec FLAML

Dans cette section, nous allons créer un essai AutoML à l’aide du package FLAML, configurer les paramètres de l’essai, convertir le jeu de données Spark en un jeu de données Pandas sur Spark, exécuter l’essai AutoML et afficher les métriques résultantes.

Configurer l’essai AutoML

Nous importons ici les classes et modules nécessaires du package FLAML et créons une instance d’AutoML, qui sera utilisée pour automatiser le pipeline d’apprentissage automatique.

# Import the AutoML class from the FLAML package
from flaml import AutoML
from flaml.automl.spark.utils import to_pandas_on_spark

# Create an AutoML instance
automl = AutoML()

Configurer les paramètres

Dans cette section, nous définissons les paramètres de configuration de l’essai AutoML.

# Define AutoML settings
settings = {
    "time_budget": 250,         # Total running time in seconds
    "metric": 'roc_auc',       # Optimization metric (ROC AUC in this case)
    "task": 'classification',  # Task type (classification)
    "log_file_name": 'flaml_experiment.log',  # FLAML log file
    "seed": 41,                # Random seed
    "force_cancel": True,      # Force stop training once time_budget is used up
    "mlflow_exp_name": "automl_sample"      # MLflow experiment name
}

Convertir vers Pandas sur Spark

Pour exécuter AutoML avec un jeu de données basé sur Spark, nous devons le convertir en un jeu de données Pandas sur Spark à l’aide de la fonction to_pandas_on_spark. Cela permet à FLAML de travailler efficacement avec les données.

# Convert the Spark training dataset to a Pandas on Spark dataset
df_automl = to_pandas_on_spark(train_data)

Exécuter l’essai AutoML

Nous exécutons maintenant l’essai AutoML. Nous utilisons une exécution MLflow imbriquée pour suivre l’expérience dans le contexte de l’exécution MLflow existante. L’essai AutoML est exécuté sur le jeu de données Pandas sur Spark (df_automl) avec la variable cible « Exited » et les paramètres définis sont transmis à la fonction fit pour la configuration.

'''The main flaml automl API'''

with mlflow.start_run(nested=True):
    automl.fit(dataframe=df_automl, label='Exited', isUnbalance=True, **settings)

Afficher les métriques résultantes

Dans cette dernière section, nous récupérons et affichons les résultats de l’essai AutoML. Ces métriques donnent un aperçu des performances et de la configuration du modèle AutoML sur le jeu de données donné.

# Retrieve and display the best hyperparameter configuration and metrics
print('Best hyperparameter config:', automl.best_config)
print('Best ROC AUC on validation data: {0:.4g}'.format(1 - automl.best_loss))
print('Training duration of the best run: {0:.4g} s'.format(automl.best_config_train_time))

Paralléliser votre essai AutoML avec Apache Spark

Dans les scénarios où votre jeu de données peut tenir dans un seul nœud et où vous voulez tirer parti de la puissance de Spark pour exécuter simultanément plusieurs essais AutoML en parallèle, vous pouvez suivre les étapes suivantes :

Convertir en dataframe Pandas

Pour permettre la parallélisation, vos données doivent d’abord être converties en dataframe Pandas.

pandas_df = train_raw.toPandas()

Nous convertissons ici le dataframe Spark train_raw en un dataframe Pandas nommé pandas_df afin de le rendre adapté au traitement parallèle.

Configurer les paramètres de parallélisation

Définissez use_spark sur True pour activer le parallélisme basé sur Spark. Par défaut, FLAML lancera un essai par exécuteur. Vous pouvez personnaliser le nombre d’essais simultanés en utilisant l’argument n_concurrent_trials.

settings = {
    "time_budget": 250,           # Total running time in seconds
    "metric": 'roc_auc',         # Optimization metric (ROC AUC in this case)
    "task": 'classification',    # Task type (classification)
    "seed": 41,                  # Random seed
    "use_spark": True,           # Enable Spark-based parallelism
    "n_concurrent_trials": 3,    # Number of concurrent trials to run
    "force_cancel": True,        # Force stop training once time_budget is used up
    "mlflow_exp_name": "automl_sample"  # MLflow experiment name

}

Nous spécifions dans ces paramètres que nous voulons utiliser Spark pour le parallélisme en définissant use_spark sur True. Nous avons également défini le nombre d’essais simultanés sur 3, ce qui signifie que trois essais seront exécutés en parallèle sur Spark.

Pour en savoir plus sur la manière de paralléliser vos essais AutoML, vous pouvez consulter la documentation FLAML pour les tâches Spark parallèles.

Exécuter l’essai AutoML en parallèle

Nous allons maintenant exécuter l’essai AutoML en parallèle avec les paramètres spécifiés. Nous allons utiliser une exécution MLflow imbriquée pour suivre l’expérience dans le contexte l’exécution MLflow existante.

'''The main FLAML AutoML API'''
with mlflow.start_run(nested=True, run_name="parallel_trial"):
    automl.fit(dataframe=pandas_df, label='Exited', **settings)

Ceci va maintenant exécuter l’essai AutoML avec la parallélisation activée. L’argument dataframe est défini sur le DataFrame Pandas pandas_df, et les autres paramètres sont transmis à la fonction fit pour l’exécution parallèle.

Afficher les mesures

Après avoir exécuté l’essai AutoML parallèle, récupérez et affichez les résultats, y compris la meilleure configuration d’hyperparamètres, le ROC AUC sur les données de validation et la durée d’entraînement de l’exécution la plus performante.

''' retrieve best config'''
print('Best hyperparmeter config:', automl.best_config)
print('Best roc_auc on validation data: {0:.4g}'.format(1-automl.best_loss))
print('Training duration of best run: {0:.4g} s'.format(automl.best_config_train_time))