Δημιουργία μοντέλων με αυτοματοποιημένη εκμάθηση μηχανής (προεπισκόπηση)
Η αυτοματοποιημένη Εκμάθηση μηχανής (AutoML) περιλαμβάνει ένα σύνολο τεχνικών και εργαλείων που έχουν σχεδιαστεί για την απλοποίηση της διαδικασίας εκπαίδευσης και βελτιστοποίησης μοντέλων εκμάθησης μηχανής με ελάχιστη ανθρώπινη παρέμβαση. Ο κύριος στόχος του AutoML είναι η απλοποίηση και η επιτάχυνση της επιλογής του καταλληλότερου μοντέλου εκμάθησης μηχανής και των υπερπαραμετών για ένα συγκεκριμένο σύνολο δεδομένων, μια εργασία που συνήθως απαιτεί σημαντική εξειδίκευση και υπολογιστικούς πόρους. Στο πλαίσιο Fabric, οι επιστήμονες δεδομένων μπορούν να αξιοποιήσουν την flaml.AutoML
εκπαιδευτική ενότητα για να αυτοματοποιήσουν διάφορες πτυχές των ροών εργασιών εκμάθησης μηχανής.
Σε αυτό το άρθρο, θα εμβαθύνουμε στη διαδικασία δημιουργίας δοκιμών AutoML απευθείας από κώδικα χρησιμοποιώντας ένα σύνολο δεδομένων Spark. Επιπλέον, θα εξερευνήσουμε μεθόδους για τη μετατροπή αυτών των δεδομένων σε ένα πλαίσιο δεδομένων Pandas και θα συζητήσουμε τεχνικές για την παράλληλη δοκιμή πειραματισμού.
Σημαντικό
Αυτή η δυνατότητα είναι σε προεπισκόπηση.
Προαπαιτούμενα στοιχεία
Λάβετε μια συνδρομή Microsoft Fabric. Εναλλακτικά, εγγραφείτε για μια δωρεάν δοκιμαστική έκδοση του Microsoft Fabric.
Εισέλθετε στο Microsoft Fabric.
Χρησιμοποιήστε την εναλλαγή εμπειρίας στην αριστερή πλευρά της αρχικής σελίδας σας για να μεταβείτε στην εμπειρία Synapse Data Science.
- Δημιουργήστε ένα νέο περιβάλλον Fabric ή βεβαιωθείτε ότι εκτελείτε στο Fabric Runtime 1.2 (Spark 3.4 (ή νεότερη έκδοση) και Delta 2.4)
- Δημιουργήστε ένα νέο σημειωματάριο.
- Επισυνάψτε το σημειωματάριό σας σε ένα lakehouse. Στην αριστερή πλευρά του σημειωματάριού σας, επιλέξτε Προσθήκη για να προσθέσετε μια υπάρχουσα λίμνη ή να δημιουργήσετε μια νέα.
Φόρτωση και προετοιμασία δεδομένων
Σε αυτή την ενότητα, θα καθορίσουμε τις ρυθμίσεις λήψης για τα δεδομένα και, στη συνέχεια, θα τα αποθηκεύσουμε στο lakehouse.
Λήψη δεδομένων
Αυτό το μπλοκ κώδικα κάνει λήψη των δεδομένων από μια απομακρυσμένη προέλευση και τα αποθηκεύει στο lakehouse
import os
import requests
IS_CUSTOM_DATA = False # if TRUE, dataset has to be uploaded manually
if not IS_CUSTOM_DATA:
# Specify the remote URL where the data is hosted
remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/bankcustomerchurn"
# List of data files to download
file_list = ["churn.csv"]
# Define the download path within the lakehouse
download_path = "/lakehouse/default/Files/churn/raw"
# Check if the lakehouse directory exists; if not, raise an error
if not os.path.exists("/lakehouse/default"):
raise FileNotFoundError("Default lakehouse not found. Please add a lakehouse and restart the session.")
# Create the download directory if it doesn't exist
os.makedirs(download_path, exist_ok=True)
# Download each data file if it doesn't already exist in the lakehouse
for fname in file_list:
if not os.path.exists(f"{download_path}/{fname}"):
r = requests.get(f"{remote_url}/{fname}", timeout=30)
with open(f"{download_path}/{fname}", "wb") as f:
f.write(r.content)
print("Downloaded demo data files into lakehouse.")
Φόρτωση δεδομένων σε ένα πλαίσιο δεδομένων Spark
Το ακόλουθο μπλοκ κώδικα φορτώνει τα δεδομένα από το αρχείο CSV σε ένα Spark DataFrame και τα αποθηκεύει στο cache για αποτελεσματική επεξεργασία.
df = (
spark.read.option("header", True)
.option("inferSchema", True)
.csv("Files/churn/raw/churn.csv")
.cache()
)
Αυτός ο κώδικας προϋποθέτει ότι το αρχείο δεδομένων έχει ληφθεί και βρίσκεται στην καθορισμένη διαδρομή. Διαβάζει το αρχείο CSV σε ένα Spark DataFrame, το συνάγει το σχήμα και το αποθηκεύει στο cache για ταχύτερη πρόσβαση κατά τη διάρκεια των επόμενων λειτουργιών.
Προετοιμασία των δεδομένων
Σε αυτή την ενότητα, θα εκτελέσουμε εκκαθάριση δεδομένων και διαχείριση δυνατοτήτων στο σύνολο δεδομένων.
Καθαρισμός δεδομένων
Πρώτα, ορίζουμε μια συνάρτηση για την εκκαθάριση των δεδομένων, η οποία περιλαμβάνει την απόρριψη γραμμών με δεδομένα που απουσιάζουν, την κατάργηση διπλότυπων γραμμών με βάση συγκεκριμένες στήλες και την κατάργηση περιττών στηλών.
# Define a function to clean the data
def clean_data(df):
# Drop rows with missing data across all columns
df = df.dropna(how="all")
# Drop duplicate rows based on 'RowNumber' and 'CustomerId'
df = df.dropDuplicates(subset=['RowNumber', 'CustomerId'])
# Drop columns: 'RowNumber', 'CustomerId', 'Surname'
df = df.drop('RowNumber', 'CustomerId', 'Surname')
return df
# Create a copy of the original dataframe by selecting all the columns
df_copy = df.select("*")
# Apply the clean_data function to the copy
df_clean = clean_data(df_copy)
Η clean_data
συνάρτηση βοηθά να εξασφαλίσετε ότι το σύνολο δεδομένων δεν περιέχει τιμές και διπλότυπα, καταργώντας παράλληλα τις περιττές στήλες.
Μηχανική δυνατοτήτων
Στη συνέχεια, εκτελέσαμε μηχανική δυνατοτήτων δημιουργώντας εικονικές στήλες για τις στήλες "Γεωγραφία" και "Φύλο", χρησιμοποιώντας κωδικοποίηση μίας ώρας.
# Import PySpark functions
from pyspark.sql import functions as F
# Create dummy columns for 'Geography' and 'Gender' using one-hot encoding
df_clean = df_clean.select(
"*",
F.when(F.col("Geography") == "France", 1).otherwise(0).alias("Geography_France"),
F.when(F.col("Geography") == "Germany", 1).otherwise(0).alias("Geography_Germany"),
F.when(F.col("Geography") == "Spain", 1).otherwise(0).alias("Geography_Spain"),
F.when(F.col("Gender") == "Female", 1).otherwise(0).alias("Gender_Female"),
F.when(F.col("Gender") == "Male", 1).otherwise(0).alias("Gender_Male")
)
# Drop the original 'Geography' and 'Gender' columns
df_clean = df_clean.drop("Geography", "Gender")
Εδώ, χρησιμοποιούμε κωδικοποίηση μίας ώρας για τη μετατροπή κατηγορικών στηλών σε δυαδικές εικονικές στήλες, καθιστώντας τις κατάλληλες για αλγόριθμους εκμάθησης μηχανής.
Εμφάνιση καθαρισμένου δεδομένων
Τέλος, εμφανίζουμε το καθαρισμένο και σχεδιασμένο σύνολο δεδομένων με τη χρήση της συνάρτησης εμφάνισης.
display(df_clean)
Αυτό το βήμα σάς επιτρέπει να ελέγξετε το dataFrame που προκύπτει με τους εφαρμοσμένους μετασχηματισμούς.
Αποθήκευση στο lakehouse
Τώρα, θα αποθηκεύσουμε το καθαρισμένο και σχεδιασμένο σύνολο δεδομένων στο lakehouse.
# Create PySpark DataFrame from Pandas
df_clean.write.mode("overwrite").format("delta").save(f"Tables/churn_data_clean")
print(f"Spark dataframe saved to delta table: churn_data_clean")
Εδώ, παίρνουμε το καθαρισμένο και μετασχηματισμένο PySpark DataFrame df_clean
και το αποθηκεύουμε ως πίνακα Delta με το όνομα "churn_data_clean" στο lakehouse. Χρησιμοποιούμε τη μορφή Delta για αποτελεσματική διαχείριση και διαχείριση του συνόλου δεδομένων. Το mode("overwrite")
εξασφαλίζει ότι οποιοσδήποτε υπάρχων πίνακας με το ίδιο όνομα αντικαθίσταται και δημιουργείται μια νέα έκδοση του πίνακα.
Δημιουργία συνόλων δεδομένων δοκιμής και εκπαίδευσης
Στη συνέχεια, θα δημιουργήσουμε τα σύνολα δεδομένων δοκιμής και εκπαίδευσης από τα δεδομένα που έχουν καθαριστεί και σχεδιαστεί με δυνατότητα.
Στην παρεχόμενη ενότητα κώδικα, φορτώνουμε ένα καθαρισμένο και σχεδιασμένο σύνολο δεδομένων από το lakehouse χρησιμοποιώντας τη μορφή Delta, το διαιρούμε σε σύνολα εκπαίδευσης και δοκιμών με αναλογία 80-20 και προετοιμάζουμε τα δεδομένα για εκμάθηση μηχανής. Αυτή η προετοιμασία περιλαμβάνει την εισαγωγή της από την VectorAssembler
εκμάθηση μηχανής PySpark για τον συνδυασμό στηλών δυνατοτήτων σε μία μόνο στήλη "δυνατοτήτων". Στη συνέχεια, χρησιμοποιούμε την VectorAssembler
για να μετασχηματίσουμε τα σύνολα δεδομένων εκπαίδευσης και δοκιμής, με train_data
test_data
αποτέλεσμα τα DataFrames που περιέχουν τη μεταβλητή προορισμού "Exited" και τα διανύσματα δυνατοτήτων. Αυτά τα σύνολα δεδομένων είναι τώρα έτοιμα για χρήση στη δημιουργία και αξιολόγηση μοντέλων εκμάθησης μηχανής.
# Import the necessary library for feature vectorization
from pyspark.ml.feature import VectorAssembler
# Load the cleaned and feature-engineered dataset from the lakehouse
df_final = spark.read.format("delta").load("Tables/churn_data_clean")
# Train-Test Separation
train_raw, test_raw = df_final.randomSplit([0.8, 0.2], seed=41)
# Define the feature columns (excluding the target variable 'Exited')
feature_cols = [col for col in df_final.columns if col != "Exited"]
# Create a VectorAssembler to combine feature columns into a single 'features' column
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")
# Transform the training and testing datasets using the VectorAssembler
train_data = featurizer.transform(train_raw)["Exited", "features"]
test_data = featurizer.transform(test_raw)["Exited", "features"]
Μοντέλο γραμμής βάσης εκπαίδευσης
Χρησιμοποιώντας τα δεδομένα με δυνατότητα, θα εκπαιδεύσουμε ένα μοντέλο εκμάθησης μηχανής γραμμής βάσης, θα ρυθμίσουμε τις παραμέτρους MLflow για παρακολούθηση πειραμάτων, θα ορίσουμε μια συνάρτηση πρόβλεψης για υπολογισμό μετρικών και, τέλος, θα προβάλουμε και θα καταγράψουμε τη βαθμολογία ROC AUC που προκύπτει.
Ορισμός επιπέδου καταγραφής
Εδώ, διαμορφώνουμε τις παραμέτρους του επιπέδου καταγραφής για να καταστείλουν τα περιττά δεδομένα εξόδου από τη βιβλιοθήκη Synapse.ml, διατηρώντας τα αρχεία καταγραφής καθαρότερα.
import logging
logging.getLogger('synapse.ml').setLevel(logging.ERROR)
Ρύθμιση παραμέτρων ροής ML
Σε αυτή την ενότητα, θα ρυθμίσουμε τις παραμέτρους του MLflow για παρακολούθηση πειραμάτων. Ορίζουμε το όνομα του πειράματος σε "automl_sample" για να οργανώσουμε τις εκτελέσεις. Επιπλέον, ενεργοποιούμε την αυτόματη καταγραφή, εξασφαλίζοντας ότι οι παράμετροι μοντέλου, τα μετρικά και τα αντικείμενα σχεδίασης καταγράφονται αυτόματα στο MLflow.
import mlflow
# Set the MLflow experiment to "automl_sample" and enable automatic logging
mlflow.set_experiment("automl_sample")
mlflow.autolog(exclusive=False)
Εκπαίδευση και αξιολόγηση του μοντέλου
Τέλος, εκπαιδεύουμε ένα μοντέλο LightGBMClassifier στα παρεχόμενα δεδομένα εκπαίδευσης. Το μοντέλο ρυθμίζεται με τις απαραίτητες ρυθμίσεις για δυαδική ταξινόμηση και χειρισμό ανισορροπιών. Στη συνέχεια, χρησιμοποιούμε αυτό το εκπαιδευμένο μοντέλο για προβλέψεις σχετικά με τα δεδομένα δοκιμής. Εξάγουμε τις προβλεπόμενες πιθανότητες για τη θετική κλάση και τις ετικέτες true από τα δεδομένα δοκιμής. Στη συνέχεια, υπολογίζουμε τη βαθμολογία ROC AUC χρησιμοποιώντας τη συνάρτηση sklearn roc_auc_score
.
from synapse.ml.lightgbm import LightGBMClassifier
from sklearn.metrics import roc_auc_score
# Assuming you have already defined 'train_data' and 'test_data'
with mlflow.start_run(run_name="default") as run:
# Create a LightGBMClassifier model with specified settings
model = LightGBMClassifier(objective="binary", featuresCol="features", labelCol="Exited", dataTransferMode="bulk")
# Fit the model to the training data
model = model.fit(train_data)
# Get the predictions
predictions = model.transform(test_data)
# Extract the predicted probabilities for the positive class
y_pred = predictions.select("probability").rdd.map(lambda x: x[0][1]).collect()
# Extract the true labels from the 'test_data' DataFrame
y_true = test_data.select("Exited").rdd.map(lambda x: x[0]).collect()
# Compute the ROC AUC score
roc_auc = roc_auc_score(y_true, y_pred)
# Log the ROC AUC score with MLflow
mlflow.log_metric("ROC_AUC", roc_auc)
# Print or log the ROC AUC score
print("ROC AUC Score:", roc_auc)
Από εδώ, μπορούμε να δούμε ότι το μοντέλο που προκύπτει επιτυγχάνει βαθμολογία ROC AUC 84%.
Δημιουργία δοκιμαστικής έκδοσης AutoML με ΤΟ FLAML
Σε αυτή την ενότητα, θα δημιουργήσουμε μια δοκιμαστική έκδοση AutoML χρησιμοποιώντας το πακέτο FLAML, θα ρυθμίσουμε τις παραμέτρους της δοκιμαστικής έκδοσης, θα μετατρέψουμε το σύνολο δεδομένων Spark σε ένα σύνολο δεδομένων Pandas στο Spark, θα εκτελέσουμε τη δοκιμαστική έκδοση AutoML και θα προβάλουμε τα μετρικά που προκύπτουν.
Ρύθμιση παραμέτρων της δοκιμαστικής έκδοσης AutoML
Εδώ, εισάγουμε τις απαραίτητες κλάσεις και εκπαιδευτικές ενότητες από το πακέτο FLAML και δημιουργούμε μια παρουσία του AutoML, η οποία θα χρησιμοποιηθεί για την αυτοματοποίηση της διοχέτευσης εκμάθησης μηχανής.
# Import the AutoML class from the FLAML package
from flaml import AutoML
from flaml.automl.spark.utils import to_pandas_on_spark
# Create an AutoML instance
automl = AutoML()
Καθορισμός ρυθμίσεων
Σε αυτή την ενότητα, ορίζουμε τις ρυθμίσεις παραμέτρων για τη δοκιμαστική έκδοση AutoML.
# Define AutoML settings
settings = {
"time_budget": 250, # Total running time in seconds
"metric": 'roc_auc', # Optimization metric (ROC AUC in this case)
"task": 'classification', # Task type (classification)
"log_file_name": 'flaml_experiment.log', # FLAML log file
"seed": 41, # Random seed
"force_cancel": True, # Force stop training once time_budget is used up
"mlflow_exp_name": "automl_sample" # MLflow experiment name
}
Μετατροπή σε Pandas on Spark
Για να εκτελέσουμε το AutoML με ένα σύνολο δεδομένων που βασίζεται σε Spark, πρέπει να το μετατρέψουμε σε ένα σύνολο δεδομένων Pandas on Spark χρησιμοποιώντας τη to_pandas_on_spark
συνάρτηση . Αυτό επιτρέπει στο FLAML να λειτουργεί αποτελεσματικά με τα δεδομένα.
# Convert the Spark training dataset to a Pandas on Spark dataset
df_automl = to_pandas_on_spark(train_data)
Εκτέλεση της δοκιμαστικής έκδοσης AutoML
Τώρα, εκτελούμε τη δοκιμαστική έκδοση AutoML. Χρησιμοποιούμε μια ένθεση εκτέλεση MLflow για την παρακολούθηση του πειράματος εντός του υπάρχοντος περιβάλλοντος εκτέλεσης MLflow. Η δοκιμαστική έκδοση AutoML εκτελείται στο σύνολο δεδομένων Pandas στο Spark (df_automl
) με τη μεταβλητή προορισμού "Exited
και οι καθορισμένες ρυθμίσεις μεταβιβάζονται στη fit
συνάρτηση για ρύθμιση παραμέτρων.
'''The main flaml automl API'''
with mlflow.start_run(nested=True):
automl.fit(dataframe=df_automl, label='Exited', isUnbalance=True, **settings)
Προβολή μετρικών που προκύπτουν
Σε αυτή την τελική ενότητα, ανακτάμε και εμφανίζουμε τα αποτελέσματα της δοκιμαστικής έκδοσης AutoML. Αυτά τα μετρικά παρέχουν πληροφορίες σχετικά με τις επιδόσεις και τη ρύθμιση παραμέτρων του μοντέλου AutoML στο δεδομένο σύνολο δεδομένων.
# Retrieve and display the best hyperparameter configuration and metrics
print('Best hyperparameter config:', automl.best_config)
print('Best ROC AUC on validation data: {0:.4g}'.format(1 - automl.best_loss))
print('Training duration of the best run: {0:.4g} s'.format(automl.best_config_train_time))
Παραλληλοποίηση της δοκιμαστικής έκδοσης AutoML με το Apache Spark
Σε σενάρια όπου το σύνολο δεδομένων σας μπορεί να χωράει σε έναν μοναδικό κόμβο και θέλετε να αξιοποιήσετε την ισχύ του Spark για την ταυτόχρονη εκτέλεση πολλών παράλληλων δοκιμών AutoML, μπορείτε να ακολουθήσετε τα εξής βήματα:
Μετατροπή σε πλαίσιο δεδομένων Pandas
Για να ενεργοποιήσετε την παραλληλοποίηση, τα δεδομένα σας πρέπει πρώτα να μετατραπούν σε Pandas DataFrame.
pandas_df = train_raw.toPandas()
Εδώ, μετατρέπουμε το train_raw
Spark DataFrame σε ένα Pandas DataFrame με την ονομασία pandas_df
, ώστε να είναι κατάλληλο για παράλληλη επεξεργασία.
Ρύθμιση παραμέτρων παραλληλισμού
Ορίστε use_spark
την σε True
για να ενεργοποιήσετε παραλληλισμό που βασίζεται σε Spark. Από προεπιλογή, το FLAML θα εκκινήσει μία δοκιμαστική έκδοση ανά εκτελέσιμο εκτέλεση. Μπορείτε να προσαρμόσετε τον αριθμό των ταυτόχρονων δοκιμών χρησιμοποιώντας το n_concurrent_trials
όρισμα.
settings = {
"time_budget": 250, # Total running time in seconds
"metric": 'roc_auc', # Optimization metric (ROC AUC in this case)
"task": 'classification', # Task type (classification)
"seed": 41, # Random seed
"use_spark": True, # Enable Spark-based parallelism
"n_concurrent_trials": 3, # Number of concurrent trials to run
"force_cancel": True, # Force stop training once time_budget is used up
"mlflow_exp_name": "automl_sample" # MLflow experiment name
}
Σε αυτές τις ρυθμίσεις, καθορίσαμε ότι θέλουμε να χρησιμοποιήσουμε το Spark για παραλληλισμούς, ορίζοντας την σε use_spark
True
. Επίσης, ορίζουμε τον αριθμό των ταυτόχρονων δοκιμών σε 3, πράγμα που σημαίνει ότι τρεις δοκιμές θα εκτελούνται παράλληλα στο Spark.
Για να μάθετε περισσότερα σχετικά με τον τρόπο παραλληλισμού των διαδρομών σας AutoML, μπορείτε να επισκεφθείτε την τεκμηρίωση flaml για παράλληλες εργασίες Spark.
Παράλληλη εκτέλεση της δοκιμαστικής έκδοσης AutoML
Τώρα, θα εκτελέσουμε τη δοκιμαστική έκδοση AutoML παράλληλα με τις καθορισμένες ρυθμίσεις. Θα χρησιμοποιήσουμε μια ένθεση εκτέλεση MLflow για την παρακολούθηση του πειράματος εντός του υπάρχοντος περιβάλλοντος εκτέλεσης MLflow.
'''The main FLAML AutoML API'''
with mlflow.start_run(nested=True, run_name="parallel_trial"):
automl.fit(dataframe=pandas_df, label='Exited', **settings)
Αυτό θα εκτελέσει τώρα τη δοκιμαστική έκδοση AutoML με ενεργοποιημένη την παραλληλοποίηση. Το dataframe
όρισμα ορίζεται σε Pandas DataFrame pandas_df
και άλλες ρυθμίσεις μεταβιβάζονται στη fit
συνάρτηση για παράλληλη εκτέλεση.
Προβολή μετρικών
Μετά την εκτέλεση της παράλληλης δοκιμαστικής έκδοσης AutoML, ανακτήστε και εμφανίστε τα αποτελέσματα, συμπεριλαμβανομένης της καλύτερης ρύθμισης παραμέτρων υπερπαραμετέρου, του ROC AUC στα δεδομένα επικύρωσης και της διάρκειας εκπαίδευσης της εκτέλεσης με τις καλύτερες επιδόσεις.
''' retrieve best config'''
print('Best hyperparmeter config:', automl.best_config)
print('Best roc_auc on validation data: {0:.4g}'.format(1-automl.best_loss))
print('Training duration of best run: {0:.4g} s'.format(automl.best_config_train_time))