Freigeben über


Erstellen von Modellen mit automatisiertem ML (Vorschau)

Automated Machine Learning (AutoML) umfasst eine Reihe von Techniken und Tools, die entwickelt wurden, um den Schulungs- und Optimierungsprozess von Machine Learning-Modellen mit minimalem menschlichem Eingriff zu optimieren. Das Hauptziel von AutoML ist es, die Auswahl des am besten geeigneten Machine Learning-Modells und Hyperparameter für ein bestimmtes Dataset zu vereinfachen und zu beschleunigen, eine Aufgabe, die in der Regel erhebliche Kenntnisse und Rechenressourcen erfordert. Im Fabric-Framework können Data Scientists das flaml.AutoML Modul nutzen, um verschiedene Aspekte ihrer Machine Learning-Workflows zu automatisieren.

In diesem Artikel befassen wir uns mit dem Prozess der Generierung von AutoML-Testversionen direkt aus Code mithilfe eines Spark-Datasets. Darüber hinaus untersuchen wir Methoden zum Konvertieren dieser Daten in einen Pandas-Datenframe und besprechen Techniken zur Parallelisierung Ihrer Experimentierversuche.

Wichtig

Dieses Feature befindet sich in der Vorschau.

Voraussetzungen

  • Erstellen Sie eine neue Fabric-Umgebung oder stellen Sie sicher, dass Sie unter der Fabric Runtime-1.2 (Spark 3.4 (oder höher) und Delta 2.4) ausführen.
  • Erstellen Sie ein neues Notebook.
  • Fügen Sie Ihr Notizbuch an ein Seehaus an. Wählen Sie auf der linken Seite Ihres Notizbuchs Hinzufügen aus, um ein vorhandenes Seehaus hinzuzufügen oder ein neues zu erstellen.

Laden und Vorbereiten von Daten

In diesem Abschnitt geben wir die Downloadeinstellungen für die Daten an und speichern sie dann im Seehaus.

Herunterladen von Daten

Dieser Codeblock lädt die Daten aus einer Remotequelle herunter und speichert sie im Seehaus.

import os
import requests

IS_CUSTOM_DATA = False  # if TRUE, dataset has to be uploaded manually

if not IS_CUSTOM_DATA:
    # Specify the remote URL where the data is hosted
    remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/bankcustomerchurn"
    
    # List of data files to download
    file_list = ["churn.csv"]
    
    # Define the download path within the lakehouse
    download_path = "/lakehouse/default/Files/churn/raw"
    
    # Check if the lakehouse directory exists; if not, raise an error
    if not os.path.exists("/lakehouse/default"):
        raise FileNotFoundError("Default lakehouse not found. Please add a lakehouse and restart the session.")
    
    # Create the download directory if it doesn't exist
    os.makedirs(download_path, exist_ok=True)
    
    # Download each data file if it doesn't already exist in the lakehouse
    for fname in file_list:
        if not os.path.exists(f"{download_path}/{fname}"):
            r = requests.get(f"{remote_url}/{fname}", timeout=30)
            with open(f"{download_path}/{fname}", "wb") as f:
                f.write(r.content)
    
    print("Downloaded demo data files into lakehouse.")

Laden von Daten in einen Spark-Datenframe

Der folgende Codeblock lädt die Daten aus der CSV-Datei in einen Spark DataFrame und speichert sie zur effizienten Verarbeitung zwischen.

df = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv("Files/churn/raw/churn.csv")
    .cache()
)

Dieser Code geht davon aus, dass die Datendatei heruntergeladen wurde und sich im angegebenen Pfad befindet. Sie liest die CSV-Datei in einen Spark DataFrame, leitet das Schema ab und speichert sie für einen schnelleren Zugriff während nachfolgenden Vorgängen zwischen.

Vorbereiten der Daten

In diesem Abschnitt führen wir datenreinigungs- und Feature-Engineering für das Dataset durch.

Bereinigen von Daten

Zunächst definieren wir eine Funktion zum Bereinigen der Daten, die das Ablegen von Zeilen mit fehlenden Daten, das Entfernen doppelter Zeilen basierend auf bestimmten Spalten und das Ablegen unnötiger Spalten umfasst.

# Define a function to clean the data
def clean_data(df):
    # Drop rows with missing data across all columns
    df = df.dropna(how="all")
    # Drop duplicate rows based on 'RowNumber' and 'CustomerId'
    df = df.dropDuplicates(subset=['RowNumber', 'CustomerId'])
    # Drop columns: 'RowNumber', 'CustomerId', 'Surname'
    df = df.drop('RowNumber', 'CustomerId', 'Surname')
    return df

# Create a copy of the original dataframe by selecting all the columns
df_copy = df.select("*")

# Apply the clean_data function to the copy
df_clean = clean_data(df_copy)

Die funktion clean_data hilft sicherzustellen, dass das Dataset frei von fehlenden Werten und Duplikaten ist, während unnötige Spalten entfernt werden.

Featureentwicklung

Als Nächstes führen wir Feature-Engineering durch, indem wir Dummyspalten für die Spalten "Geografie" und "Geschlecht" mithilfe einer One-Hot-Codierung erstellen.

# Import PySpark functions
from pyspark.sql import functions as F

# Create dummy columns for 'Geography' and 'Gender' using one-hot encoding
df_clean = df_clean.select(
    "*",
    F.when(F.col("Geography") == "France", 1).otherwise(0).alias("Geography_France"),
    F.when(F.col("Geography") == "Germany", 1).otherwise(0).alias("Geography_Germany"),
    F.when(F.col("Geography") == "Spain", 1).otherwise(0).alias("Geography_Spain"),
    F.when(F.col("Gender") == "Female", 1).otherwise(0).alias("Gender_Female"),
    F.when(F.col("Gender") == "Male", 1).otherwise(0).alias("Gender_Male")
)

# Drop the original 'Geography' and 'Gender' columns
df_clean = df_clean.drop("Geography", "Gender")

Hier verwenden wir die einseitige Codierung, um kategorisierte Spalten in binäre Dummyspalten zu konvertieren, sodass sie für Maschinelle Lernalgorithmen geeignet sind.

Anzeigen von bereinigten Daten

Schließlich zeigen wir den bereinigten und merkmaltechnisch bearbeiteten Datensatz mithilfe der Anzeigefunktion an.


display(df_clean)

In diesem Schritt können Sie das resultierende DataFrame mit den angewendeten Transformationen untersuchen.

Speichern im Lakehouse

Jetzt speichern wir den bereinigten und mit Features ausgestatteten Datensatz im Lakehouse.

# Create PySpark DataFrame from Pandas
df_clean.write.mode("overwrite").format("delta").save(f"Tables/churn_data_clean")
print(f"Spark dataframe saved to delta table: churn_data_clean")

Hier nehmen wir den gereinigten und transformierten PySpark DataFrame, df_clean, und speichern sie als Delta-Tabelle mit dem Namen "churn_data_clean" im Seehaus. Wir verwenden das Delta-Format für eine effiziente Versionsverwaltung und -verwaltung des Datasets. Die mode("overwrite") stellt sicher, dass eine vorhandene Tabelle mit demselben Namen überschrieben wird und eine neue Version der Tabelle erstellt wird.

Erstellen von Test- und Schulungsdatensätzen

Als Nächstes erstellen wir die Test- und Trainingsdatensätze aus den bereinigten und merkmaloptimierten Daten.

Im bereitgestellten Codeabschnitt laden wir einen bereinigten und merkmalangereicherten Datensatz aus dem Lakehouse mithilfe des Delta-Formats, teilen ihn in Schulungs- und Testsätze mit einem Verhältnis von 80-20 auf und bereiten die Daten für maschinelles Lernen vor. Bei dieser Vorbereitung wird die VectorAssembler aus PySpark ML importiert, um Merkmalsspalten zu einer einzelnen 'Features'-Spalte zusammenzufassen. Anschließend verwenden wir die VectorAssembler, um die Schulungs- und Testdatensätze zu transformieren, was zu train_data und test_data DataFrames führt, die die Zielvariable "Exited" und die Featurevektoren enthalten. Diese Datasets sind jetzt für die Erstellung und Auswertung von Machine Learning-Modellen bereit.

# Import the necessary library for feature vectorization
from pyspark.ml.feature import VectorAssembler

# Load the cleaned and feature-engineered dataset from the lakehouse
df_final = spark.read.format("delta").load("Tables/churn_data_clean")

# Train-Test Separation
train_raw, test_raw = df_final.randomSplit([0.8, 0.2], seed=41)

# Define the feature columns (excluding the target variable 'Exited')
feature_cols = [col for col in df_final.columns if col != "Exited"]

# Create a VectorAssembler to combine feature columns into a single 'features' column
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Transform the training and testing datasets using the VectorAssembler
train_data = featurizer.transform(train_raw)["Exited", "features"]
test_data = featurizer.transform(test_raw)["Exited", "features"]

Trainieren des Basismodells

Mithilfe der faturisierten Daten trainieren wir ein grundlegendes Machine Learning-Modell, konfigurieren MLflow für die Experimentnachverfolgung, definieren eine Vorhersagefunktion für die Metrikberechnung und schließlich anzeigen und protokollieren die resultierende ROC AUC-Bewertung.

Festlegen der Protokollierungsebene

Hier konfigurieren wir die Protokollierungsebene, um unnötige Ausgaben aus der Synapse.ml-Bibliothek zu unterdrücken und die Protokolle sauberer zu halten.

import logging
 
logging.getLogger('synapse.ml').setLevel(logging.ERROR)

Konfigurieren von MLflow

In diesem Abschnitt konfigurieren wir MLflow für die Nachverfolgung von Experimenten. Wir legen den Experimentnamen auf "automl_sample" fest, um die Durchläufe zu organisieren. Darüber hinaus aktivieren wir die automatische Protokollierung, um sicherzustellen, dass Modellparameter, Metriken und Artefakte automatisch bei MLflow protokolliert werden.

import mlflow

# Set the MLflow experiment to "automl_sample" and enable automatic logging
mlflow.set_experiment("automl_sample")
mlflow.autolog(exclusive=False)

Trainieren und Bewerten des Modells

Schließlich trainieren wir ein LightGBMClassifier-Modell auf den bereitgestellten Schulungsdaten. Das Modell wird mit den erforderlichen Einstellungen für die binäre Klassifizierung und die Ungleichgewichtbehandlung konfiguriert. Anschließend verwenden wir dieses trainierte Modell, um Vorhersagen für die Testdaten zu erstellen. Wir extrahieren die vorhergesagten Wahrscheinlichkeiten für die positive Klasse und die tatsächlichen Beschriftungen aus den Testdaten. Anschließend berechnen wir die ROC AUC-Bewertung mithilfe der roc_auc_score-Funktion von Sklearn.

from synapse.ml.lightgbm import LightGBMClassifier
from sklearn.metrics import roc_auc_score

# Assuming you have already defined 'train_data' and 'test_data'

with mlflow.start_run(run_name="default") as run:
    # Create a LightGBMClassifier model with specified settings
    model = LightGBMClassifier(objective="binary", featuresCol="features", labelCol="Exited", dataTransferMode="bulk")
    
    # Fit the model to the training data
    model = model.fit(train_data)

    # Get the predictions
    predictions = model.transform(test_data)

    # Extract the predicted probabilities for the positive class
    y_pred = predictions.select("probability").rdd.map(lambda x: x[0][1]).collect()

    # Extract the true labels from the 'test_data' DataFrame
    y_true = test_data.select("Exited").rdd.map(lambda x: x[0]).collect()

    # Compute the ROC AUC score
    roc_auc = roc_auc_score(y_true, y_pred)

    # Log the ROC AUC score with MLflow
    mlflow.log_metric("ROC_AUC", roc_auc)

    # Print or log the ROC AUC score
    print("ROC AUC Score:", roc_auc)

Von hier aus können wir sehen, dass unser resultierendes Modell einen ROC AUC-Wert von 84%erreicht.

Erstellen einer AutoML-Testversion mit FLAML

In diesem Abschnitt erstellen wir eine AutoML-Testversion mithilfe des FLAML-Pakets, konfigurieren die Testeinstellungen, konvertieren das Spark-Dataset in ein Pandas auf Spark-Dataset, führen die AutoML-Testversion aus und zeigen die resultierenden Metriken an.

Konfigurieren der AutoML-Testversion

Hier importieren wir die erforderlichen Klassen und Module aus dem FLAML-Paket und erstellen eine Instanz von AutoML, die zum Automatisieren der Machine Learning-Pipeline verwendet wird.

# Import the AutoML class from the FLAML package
from flaml import AutoML
from flaml.automl.spark.utils import to_pandas_on_spark

# Create an AutoML instance
automl = AutoML()

Konfigurieren von Einstellungen

In diesem Abschnitt definieren wir die Konfigurationseinstellungen für die AutoML-Testversion.

# Define AutoML settings
settings = {
    "time_budget": 250,         # Total running time in seconds
    "metric": 'roc_auc',       # Optimization metric (ROC AUC in this case)
    "task": 'classification',  # Task type (classification)
    "log_file_name": 'flaml_experiment.log',  # FLAML log file
    "seed": 41,                # Random seed
    "force_cancel": True,      # Force stop training once time_budget is used up
    "mlflow_exp_name": "automl_sample"      # MLflow experiment name
}

Zu Pandas auf Spark konvertieren

Um AutoML mit einem Spark-basierten Dataset auszuführen, müssen wir es mithilfe der to_pandas_on_spark-Funktion in ein Pandas-Dataset auf Spark umwandeln. Dadurch kann FLAML effizient mit den Daten arbeiten.

# Convert the Spark training dataset to a Pandas on Spark dataset
df_automl = to_pandas_on_spark(train_data)

Ausführen der AutoML-Testversion

Jetzt führen wir die AutoML-Testversion aus. Wir verwenden einen geschachtelten MLflow-Lauf, um das Experiment im vorhandenen MLflow-Ausführungskontext nachzuverfolgen. Die AutoML-Testversion wird für die Pandas auf Spark-Dataset (df_automl) mit der Zielvariable "Exited ausgeführt, und die definierten Einstellungen werden zur Konfiguration an die fit-Funktion übergeben.

'''The main flaml automl API'''

with mlflow.start_run(nested=True):
    automl.fit(dataframe=df_automl, label='Exited', isUnbalance=True, **settings)

Anzeigen der resultierenden Metriken

In diesem letzten Abschnitt werden die Ergebnisse der AutoML-Testversion abgerufen und angezeigt. Diese Metriken liefern Einblicke in die Leistung und Konfiguration des AutoML-Modells für das angegebene Dataset.

# Retrieve and display the best hyperparameter configuration and metrics
print('Best hyperparameter config:', automl.best_config)
print('Best ROC AUC on validation data: {0:.4g}'.format(1 - automl.best_loss))
print('Training duration of the best run: {0:.4g} s'.format(automl.best_config_train_time))

Parallelisieren Ihrer AutoML-Testversion mit Apache Spark

In Szenarien, in denen Ihr Dataset in einen einzelnen Knoten passen kann und Sie die Leistungsfähigkeit von Spark für die gleichzeitige Ausführung mehrerer paralleler AutoML-Testversionen nutzen möchten, können Sie die folgenden Schritte ausführen:

In Pandas-DataFrame konvertieren

Um die Parallelisierung zu ermöglichen, müssen Ihre Daten zuerst in einen Pandas DataFrame konvertiert werden.

pandas_df = train_raw.toPandas()

Hier konvertieren wir den train_raw Spark DataFrame in einen Pandas DataFrame namens pandas_df, um ihn für die parallele Verarbeitung geeignet zu machen.

Konfigurieren von Parallelisierungseinstellungen

Legen Sie use_spark auf True fest, um Spark-basierte Parallelität zu aktivieren. Standardmäßig startet FLAML eine Testversion pro Executor. Sie können die Anzahl der gleichzeitigen Testversionen mithilfe des arguments n_concurrent_trials anpassen.

settings = {
    "time_budget": 250,           # Total running time in seconds
    "metric": 'roc_auc',         # Optimization metric (ROC AUC in this case)
    "task": 'classification',    # Task type (classification)
    "seed": 41,                  # Random seed
    "use_spark": True,           # Enable Spark-based parallelism
    "n_concurrent_trials": 3,    # Number of concurrent trials to run
    "force_cancel": True,        # Force stop training once time_budget is used up
    "mlflow_exp_name": "automl_sample"  # MLflow experiment name

}

In diesen Einstellungen geben wir an, dass wir Spark für Parallelität verwenden möchten, indem wir use_spark auf Truefestlegen. Wir legen auch die Anzahl der gleichzeitigen Versuche auf 3 fest, was bedeutet, dass drei Testversionen parallel auf Spark ausgeführt werden.

Weitere Informationen zum Parallelisieren Ihrer AutoML-Trails finden Sie in der FLAML-Dokumentation für parallele Spark-Aufträge.

Ausführen der AutoML-Testversion parallel

Jetzt führen wir die AutoML-Testversion parallel zu den angegebenen Einstellungen aus. Wir verwenden einen geschachtelten MLflow-Lauf, um das Experiment im vorhandenen MLflow-Ausführungskontext nachzuverfolgen.

'''The main FLAML AutoML API'''
with mlflow.start_run(nested=True, run_name="parallel_trial"):
    automl.fit(dataframe=pandas_df, label='Exited', **settings)

Dadurch wird nun die AutoML-Testversion mit aktivierter Parallelisierung ausgeführt. Das dataframe-Argument wird auf das Pandas DataFrame-pandas_dffestgelegt, und andere Einstellungen werden zur parallelen Ausführung an die fit-Funktion übergeben.

Anzeigen von Metriken

Nach dem Ausführen der parallelen Testversion für automatisiertes maschinelles Lernen können Sie die Ergebnisse abrufen und anzeigen, einschließlich der besten Hyperparameter-Konfiguration, der ROC-AUC für die Validierungsdaten und der Trainingsdauer der Ausführung mit der besten Leistung.

''' retrieve best config'''
print('Best hyperparmeter config:', automl.best_config)
print('Best roc_auc on validation data: {0:.4g}'.format(1-automl.best_loss))
print('Training duration of best run: {0:.4g} s'.format(automl.best_config_train_time))