Créer des modèles avec le ML automatisé (aperçu)
Le Machine Learning automatisé (AutoML) englobe un ensemble de techniques et d’outils conçus pour simplifier le processus d’apprentissage et d’optimisation des modèles Machine Learning avec une intervention humaine minimale. L’objectif principal d’AutoML est de simplifier et d’accélérer la sélection du modèle machine learning et des hyperparamètres les plus appropriés pour un jeu de données donné, une tâche qui demande généralement une expertise et des ressources de calcul considérables. Dans l’infrastructure Fabric, les scientifiques des données peuvent tirer parti du module flaml.AutoML
pour automatiser différents aspects de leurs flux de travail Machine Learning.
Dans cet article, nous allons examiner le processus de génération d’essais AutoML directement à partir du code à l’aide d’un jeu de données Spark. En outre, nous allons explorer les méthodes permettant de convertir ces données en trame de données Pandas et de discuter des techniques de parallélisation de vos essais d’expérimentation.
Conditions préalables
Souscrivez à un abonnement Microsoft Fabric . Vous pouvez également vous inscrire à une version d’évaluation gratuite de Microsoft Fabric .
Connectez-vous à Microsoft Fabric.
Utilisez le sélecteur d’expérience en bas à gauche de votre page d’accueil pour basculer vers Fabric.
- Créez un environnement Fabric ou vérifiez que vous êtes en cours d’exécution sur Fabric Runtime 1.2 (Spark 3.4 (ou version ultérieure) et Delta 2.4)
- Créez un nouveau bloc-notes.
- Attachez votre notebook à un lakehouse. Sur le côté gauche de votre bloc-notes, sélectionnez Ajouter pour ajouter un lakehouse existant ou en créer un.
Charger et préparer des données
Dans cette section, nous allons spécifier les paramètres de téléchargement des données, puis l’enregistrer dans la lakehouse.
Télécharger des données
Ce bloc de code télécharge les données à partir d’une source distante et l’enregistre dans la lakehouse
import os
import requests
IS_CUSTOM_DATA = False # if TRUE, dataset has to be uploaded manually
if not IS_CUSTOM_DATA:
# Specify the remote URL where the data is hosted
remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/bankcustomerchurn"
# List of data files to download
file_list = ["churn.csv"]
# Define the download path within the lakehouse
download_path = "/lakehouse/default/Files/churn/raw"
# Check if the lakehouse directory exists; if not, raise an error
if not os.path.exists("/lakehouse/default"):
raise FileNotFoundError("Default lakehouse not found. Please add a lakehouse and restart the session.")
# Create the download directory if it doesn't exist
os.makedirs(download_path, exist_ok=True)
# Download each data file if it doesn't already exist in the lakehouse
for fname in file_list:
if not os.path.exists(f"{download_path}/{fname}"):
r = requests.get(f"{remote_url}/{fname}", timeout=30)
with open(f"{download_path}/{fname}", "wb") as f:
f.write(r.content)
print("Downloaded demo data files into lakehouse.")
Charger des données dans un dataframe Spark
Le bloc de code suivant charge les données du fichier CSV dans un DataFrame Spark et les met en cache pour un traitement efficace.
df = (
spark.read.option("header", True)
.option("inferSchema", True)
.csv("Files/churn/raw/churn.csv")
.cache()
)
Ce code part du principe que le fichier de données a été téléchargé et se trouve dans le chemin d’accès spécifié. Il lit le fichier CSV dans un DataFrame Spark, déduit le schéma et le met en cache pour un accès plus rapide pendant les opérations suivantes.
Préparer les données
Dans cette section, nous allons effectuer le nettoyage des données et l’ingénierie des fonctionnalités sur le jeu de données.
Nettoyer les données
Tout d’abord, nous définissons une fonction pour nettoyer les données, ce qui inclut la suppression de lignes avec des données manquantes, la suppression de lignes dupliquées en fonction de colonnes spécifiques et la suppression de colonnes inutiles.
# Define a function to clean the data
def clean_data(df):
# Drop rows with missing data across all columns
df = df.dropna(how="all")
# Drop duplicate rows based on 'RowNumber' and 'CustomerId'
df = df.dropDuplicates(subset=['RowNumber', 'CustomerId'])
# Drop columns: 'RowNumber', 'CustomerId', 'Surname'
df = df.drop('RowNumber', 'CustomerId', 'Surname')
return df
# Create a copy of the original dataframe by selecting all the columns
df_copy = df.select("*")
# Apply the clean_data function to the copy
df_clean = clean_data(df_copy)
La fonction clean_data
permet de s’assurer que le jeu de données est exempt de valeurs manquantes et de doublons tout en supprimant les colonnes inutiles.
Ingénierie des caractéristiques
Ensuite, nous effectuons l’ingénierie des caractéristiques en créant des variables fictives pour les colonnes « Geography » et « Gender » à l’aide d’un encodage one-hot.
# Import PySpark functions
from pyspark.sql import functions as F
# Create dummy columns for 'Geography' and 'Gender' using one-hot encoding
df_clean = df_clean.select(
"*",
F.when(F.col("Geography") == "France", 1).otherwise(0).alias("Geography_France"),
F.when(F.col("Geography") == "Germany", 1).otherwise(0).alias("Geography_Germany"),
F.when(F.col("Geography") == "Spain", 1).otherwise(0).alias("Geography_Spain"),
F.when(F.col("Gender") == "Female", 1).otherwise(0).alias("Gender_Female"),
F.when(F.col("Gender") == "Male", 1).otherwise(0).alias("Gender_Male")
)
# Drop the original 'Geography' and 'Gender' columns
df_clean = df_clean.drop("Geography", "Gender")
Ici, nous utilisons un encodage à chaud pour convertir des colonnes catégorielles en colonnes factices binaires, ce qui les rend adaptés aux algorithmes de Machine Learning.
Afficher les données nettoyées
Enfin, nous affichons le jeu de données nettoyé et doté d'ingénierie des fonctionnalités à l'aide de la fonction d'affichage.
display(df_clean)
Cette étape vous permet d’inspecter le DataFrame résultant avec les transformations appliquées.
Enregistrer dans lakehouse
Nus allons maintenant enregistrer le jeu de données nettoyé et conçu pour les fonctionnalités dans le lakehouse.
# Create PySpark DataFrame from Pandas
df_clean.write.mode("overwrite").format("delta").save(f"Tables/churn_data_clean")
print(f"Spark dataframe saved to delta table: churn_data_clean")
Ici, nous prenons le DataFrame PySpark nettoyé et transformé, df_clean
, et nous le sauvegardons en tant que table Delta nommée « churn_data_clean » dans le lakehouse. Nous utilisons le format Delta pour gérer efficacement le contrôle de version et la gestion du jeu de données. La mode("overwrite")
garantit que toute table existante portant le même nom est remplacée et qu’une nouvelle version de la table est créée.
Créer des jeux de données de test et d’entraînement
Ensuite, nous allons créer les jeux de données de test et d'entraînement à partir des données nettoyées avec ingénierie des caractéristiques.
Dans la section de code fournie, nous chargeons un jeu de données nettoyé et conçu pour les fonctionnalités à partir du lakehouse à l’aide du format Delta, nous le divisions en jeux d’apprentissage et de test avec un ratio de 80-20 et nous préparons les données pour le Machine Learning. Cette préparation implique l’importation de VectorAssembler
à partir de PySpark ML pour combiner les colonnes de caractéristiques en une seule colonne « fonctionnalités ». Par la suite, nous utilisons le VectorAssembler
pour transformer les jeux de données d’entraînement et de test, ce qui entraîne train_data
et test_data
DataFrames qui contiennent la variable cible « Exited » et les vecteurs de caractéristiques. Ces jeux de données sont maintenant prêts à être utilisés pour créer et évaluer des modèles Machine Learning.
# Import the necessary library for feature vectorization
from pyspark.ml.feature import VectorAssembler
# Load the cleaned and feature-engineered dataset from the lakehouse
df_final = spark.read.format("delta").load("Tables/churn_data_clean")
# Train-Test Separation
train_raw, test_raw = df_final.randomSplit([0.8, 0.2], seed=41)
# Define the feature columns (excluding the target variable 'Exited')
feature_cols = [col for col in df_final.columns if col != "Exited"]
# Create a VectorAssembler to combine feature columns into a single 'features' column
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")
# Transform the training and testing datasets using the VectorAssembler
train_data = featurizer.transform(train_raw)["Exited", "features"]
test_data = featurizer.transform(test_raw)["Exited", "features"]
Entraîner le modèle de référence
À l’aide des données caractérisées, nous allons entraîner un modèle de base d'apprentissage automatique, configurer MLflow pour le suivi des expériences, définir une fonction de prédiction pour le calcul des indicateurs, et enfin, afficher et consigner le score AUC ROC résultant.
Définir le niveau de journalisation
Ici, nous configurons le niveau de journalisation pour supprimer les sorties inutiles de la bibliothèque Synapse.ml, en gardant les journaux plus propres.
import logging
logging.getLogger('synapse.ml').setLevel(logging.ERROR)
Configurer MLflow
Dans cette section, nous configurons MLflow pour le suivi des expériences. Nous attribuons le nom « automl_sample » à l’expérience pour organiser les essais. En outre, nous activez la journalisation automatique, en veillant à ce que les paramètres de modèle, les métriques et les artefacts soient automatiquement enregistrés dans MLflow.
import mlflow
# Set the MLflow experiment to "automl_sample" and enable automatic logging
mlflow.set_experiment("automl_sample")
mlflow.autolog(exclusive=False)
Entraîner et évaluer le modèle
Enfin, nous entraînons un modèle LightGBMClassifier sur les données d’apprentissage fournies. Le modèle est configuré avec les paramètres nécessaires pour la classification binaire et la gestion des déséquilibres. Nous utilisons ensuite ce modèle entraîné pour effectuer des prédictions sur les données de test. Nous extrayons les probabilités prédites pour la classe positive et les étiquettes vraies des données de test. Ensuite, nous calculons le score ROC AUC à l’aide de la fonction roc_auc_score
de sklearn.
from synapse.ml.lightgbm import LightGBMClassifier
from sklearn.metrics import roc_auc_score
# Assuming you have already defined 'train_data' and 'test_data'
with mlflow.start_run(run_name="default") as run:
# Create a LightGBMClassifier model with specified settings
model = LightGBMClassifier(objective="binary", featuresCol="features", labelCol="Exited", dataTransferMode="bulk")
# Fit the model to the training data
model = model.fit(train_data)
# Get the predictions
predictions = model.transform(test_data)
# Extract the predicted probabilities for the positive class
y_pred = predictions.select("probability").rdd.map(lambda x: x[0][1]).collect()
# Extract the true labels from the 'test_data' DataFrame
y_true = test_data.select("Exited").rdd.map(lambda x: x[0]).collect()
# Compute the ROC AUC score
roc_auc = roc_auc_score(y_true, y_pred)
# Log the ROC AUC score with MLflow
mlflow.log_metric("ROC_AUC", roc_auc)
# Print or log the ROC AUC score
print("ROC AUC Score:", roc_auc)
À partir de là, nous pouvons voir que notre modèle résultant atteint un score ROC AUC de 84%.
Créer une version d’évaluation AutoML avec FLAML
Dans cette section, nous allons créer une version d’évaluation AutoML à l’aide du package FLAML, configurer les paramètres d’évaluation, convertir le jeu de données Spark en jeu de données Pandas sur Spark, exécuter la version d’évaluation AutoML et afficher les métriques obtenues.
Configurer la version d’évaluation AutoML
Ici, nous importons les classes et modules nécessaires à partir du package FLAML et créons une instance d’AutoML, qui sera utilisée pour automatiser le pipeline Machine Learning.
# Import the AutoML class from the FLAML package
from flaml import AutoML
from flaml.automl.spark.utils import to_pandas_on_spark
# Create an AutoML instance
automl = AutoML()
Configurer les paramètres
Dans cette section, nous définissons les paramètres de configuration de la version d’évaluation AutoML.
# Define AutoML settings
settings = {
"time_budget": 250, # Total running time in seconds
"metric": 'roc_auc', # Optimization metric (ROC AUC in this case)
"task": 'classification', # Task type (classification)
"log_file_name": 'flaml_experiment.log', # FLAML log file
"seed": 41, # Random seed
"force_cancel": True, # Force stop training once time_budget is used up
"mlflow_exp_name": "automl_sample" # MLflow experiment name
}
Convertir en Pandas sur Spark
Pour exécuter AutoML avec un jeu de données Basé sur Spark, nous devons le convertir en jeu de données Pandas sur Spark à l’aide de la fonction to_pandas_on_spark
. Cela permet à FLAML d’utiliser efficacement les données.
# Convert the Spark training dataset to a Pandas on Spark dataset
df_automl = to_pandas_on_spark(train_data)
Exécuter la version d’évaluation AutoML
À présent, nous exécutons la version d’évaluation AutoML. Nous utilisons une exécution MLflow imbriquée pour suivre l’expérience dans le contexte d’exécution MLflow existant. La version d’évaluation AutoML est effectuée sur le jeu de données Pandas sur Spark (df_automl
) avec la variable cible «Exited
et les paramètres définis sont passés à la fonction fit
pour la configuration.
'''The main flaml automl API'''
with mlflow.start_run(nested=True):
automl.fit(dataframe=df_automl, label='Exited', isUnbalance=True, **settings)
Afficher les métriques obtenues
Dans cette dernière section, nous récupérons et affichons les résultats de la version d’évaluation AutoML. Ces métriques fournissent des insights sur les performances et la configuration du modèle AutoML sur le jeu de données donné.
# Retrieve and display the best hyperparameter configuration and metrics
print('Best hyperparameter config:', automl.best_config)
print('Best ROC AUC on validation data: {0:.4g}'.format(1 - automl.best_loss))
print('Training duration of the best run: {0:.4g} s'.format(automl.best_config_train_time))
Parallélisez votre version d’évaluation AutoML avec Apache Spark
Dans les scénarios où votre jeu de données peut s’adapter à un seul nœud et que vous souhaitez tirer parti de la puissance de Spark pour exécuter simultanément plusieurs essais AutoML parallèles, vous pouvez suivre ces étapes :
Convertir en DataFrame Pandas
Pour activer la parallélisation, vos données doivent d’abord être converties en dataFrame Pandas.
pandas_df = train_raw.toPandas()
Ici, nous convertissons le train_raw
Spark DataFrame en un DataFrame Pandas nommé pandas_df
pour le rendre adapté au traitement parallèle.
Configurer les paramètres de parallélisation
Définissez use_spark
sur True
pour activer le parallélisme basé sur Spark. Par défaut, FLAML lance un essai par exécuteur. Vous pouvez personnaliser le nombre d’essais simultanés à l’aide de l’argument n_concurrent_trials
.
settings = {
"time_budget": 250, # Total running time in seconds
"metric": 'roc_auc', # Optimization metric (ROC AUC in this case)
"task": 'classification', # Task type (classification)
"seed": 41, # Random seed
"use_spark": True, # Enable Spark-based parallelism
"n_concurrent_trials": 3, # Number of concurrent trials to run
"force_cancel": True, # Force stop training once time_budget is used up
"mlflow_exp_name": "automl_sample" # MLflow experiment name
}
Dans ces paramètres, nous spécifions que nous voulons utiliser Spark pour le parallélisme en définissant use_spark
sur True
. Nous avons également défini le nombre d’essais simultanés sur 3, ce qui signifie que trois essais s’exécuteront en parallèle sur Spark.
Pour en savoir plus sur la parallélisation de vos essais AutoML, vous pouvez consulter la documentation FLAML pour les travaux Spark parallèles.
Exécuter la version d’évaluation AutoML en parallèle
À présent, nous allons exécuter la version d’évaluation AutoML en parallèle avec les paramètres spécifiés. Nous allons utiliser une exécution MLflow imbriquée pour suivre l’expérience dans le contexte d’exécution MLflow existant.
'''The main FLAML AutoML API'''
with mlflow.start_run(nested=True, run_name="parallel_trial"):
automl.fit(dataframe=pandas_df, label='Exited', **settings)
Cette opération exécute maintenant la version d’évaluation AutoML avec la parallélisation activée. L’argument dataframe
est défini sur le dataFrame Pandas pandas_df
, et d’autres paramètres sont passés à la fonction fit
pour l’exécution parallèle.
Afficher les métriques
Après avoir exécuté l’essai AutoML parallèle, récupérez et affichez les résultats, notamment la meilleure configuration des hyperparamètres, l’AUC ROC sur les données de validation et la durée d’apprentissage de l’exécution la plus performante.
''' retrieve best config'''
print('Best hyperparmeter config:', automl.best_config)
print('Best roc_auc on validation data: {0:.4g}'.format(1-automl.best_loss))
print('Training duration of best run: {0:.4g} s'.format(automl.best_config_train_time))