Delen via


Modellen maken met Geautomatiseerde ML (preview)

Geautomatiseerde Machine Learning (AutoML) omvat een set technieken en hulpprogramma's die zijn ontworpen om het proces van training te stroomlijnen en machine learning-modellen te optimaliseren met minimale menselijke tussenkomst. Het primaire doel van AutoML is het vereenvoudigen en versnellen van de selectie van het meest geschikte machine learning-model en hyperparameters voor een bepaalde gegevensset, een taak die doorgaans veel expertise en rekenbronnen vereist. In het Fabric-framework kunnen gegevenswetenschappers gebruikmaken van de flaml.AutoML-module om verschillende aspecten van hun machine learning-werkstromen te automatiseren.

In dit artikel gaan we dieper in op het proces voor het genereren van AutoML-proefversies rechtstreeks vanuit code met behulp van een Spark-gegevensset. Daarnaast verkennen we methoden voor het converteren van deze gegevens naar een Pandas-dataframe en bespreken we technieken voor het parallelliseren van uw experimenten.

Belangrijk

Deze functie bevindt zich in preview-versie.

Voorwaarden

  • Maak een nieuwe Fabric-omgeving of zorg ervoor dat u werkt met Fabric Runtime 1.2 (Spark 3.4 (of hoger) en Delta 2.4)
  • Maak een nieuw notitieblok.
  • Koppel uw notitieblok aan een lakehouse. Selecteer aan de linkerkant van het notitieblok Voeg toe om een bestaand lakehouse toe te voegen of maak een nieuw lakehouse.

Gegevens laden en voorbereiden

In deze sectie geven we de downloadinstellingen voor de gegevens op en slaan deze vervolgens op in het lakehouse.

Gegevens downloaden

Met dit codeblok worden de gegevens uit een externe bron gedownload en opgeslagen in het lakehouse

import os
import requests

IS_CUSTOM_DATA = False  # if TRUE, dataset has to be uploaded manually

if not IS_CUSTOM_DATA:
    # Specify the remote URL where the data is hosted
    remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/bankcustomerchurn"
    
    # List of data files to download
    file_list = ["churn.csv"]
    
    # Define the download path within the lakehouse
    download_path = "/lakehouse/default/Files/churn/raw"
    
    # Check if the lakehouse directory exists; if not, raise an error
    if not os.path.exists("/lakehouse/default"):
        raise FileNotFoundError("Default lakehouse not found. Please add a lakehouse and restart the session.")
    
    # Create the download directory if it doesn't exist
    os.makedirs(download_path, exist_ok=True)
    
    # Download each data file if it doesn't already exist in the lakehouse
    for fname in file_list:
        if not os.path.exists(f"{download_path}/{fname}"):
            r = requests.get(f"{remote_url}/{fname}", timeout=30)
            with open(f"{download_path}/{fname}", "wb") as f:
                f.write(r.content)
    
    print("Downloaded demo data files into lakehouse.")

Gegevens laden in een Spark-dataframe

Met het volgende codeblok worden de gegevens uit het CSV-bestand in een Spark DataFrame geladen en in de cache opgeslagen voor efficiënte verwerking.

df = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv("Files/churn/raw/churn.csv")
    .cache()
)

Bij deze code wordt ervan uitgegaan dat het gegevensbestand is gedownload en zich in het opgegeven pad bevindt. Het CSV-bestand wordt in een Spark DataFrame gelezen, het schema afgeleid en in de cache opgeslagen voor snellere toegang tijdens volgende bewerkingen.

De gegevens voorbereiden

In deze sectie voeren we gegevens opschonen en functie-engineering uit op de gegevensset.

Gegevens opschonen

Eerst definiëren we een functie voor het opschonen van de gegevens, waaronder het verwijderen van rijen met ontbrekende gegevens, het verwijderen van dubbele rijen op basis van specifieke kolommen en het verwijderen van overbodige kolommen.

# Define a function to clean the data
def clean_data(df):
    # Drop rows with missing data across all columns
    df = df.dropna(how="all")
    # Drop duplicate rows based on 'RowNumber' and 'CustomerId'
    df = df.dropDuplicates(subset=['RowNumber', 'CustomerId'])
    # Drop columns: 'RowNumber', 'CustomerId', 'Surname'
    df = df.drop('RowNumber', 'CustomerId', 'Surname')
    return df

# Create a copy of the original dataframe by selecting all the columns
df_copy = df.select("*")

# Apply the clean_data function to the copy
df_clean = clean_data(df_copy)

De clean_data-functie zorgt ervoor dat de gegevensset vrij is van ontbrekende waarden en duplicaten terwijl onnodige kolommen worden verwijderd.

Kenmerkontwikkeling

Vervolgens voeren we functie-engineering uit door dummykolommen te maken voor de kolommen Geografie en Geslacht met behulp van one-hot codering.

# Import PySpark functions
from pyspark.sql import functions as F

# Create dummy columns for 'Geography' and 'Gender' using one-hot encoding
df_clean = df_clean.select(
    "*",
    F.when(F.col("Geography") == "France", 1).otherwise(0).alias("Geography_France"),
    F.when(F.col("Geography") == "Germany", 1).otherwise(0).alias("Geography_Germany"),
    F.when(F.col("Geography") == "Spain", 1).otherwise(0).alias("Geography_Spain"),
    F.when(F.col("Gender") == "Female", 1).otherwise(0).alias("Gender_Female"),
    F.when(F.col("Gender") == "Male", 1).otherwise(0).alias("Gender_Male")
)

# Drop the original 'Geography' and 'Gender' columns
df_clean = df_clean.drop("Geography", "Gender")

Hier gebruiken we one-hot codering om categorische kolommen te converteren naar binaire dummykolommen, zodat ze geschikt zijn voor machine learning-algoritmen.

Opgeschoonde gegevens weergeven

Ten slotte geven we de opgeschoonde gegevensset, voorzien van kenmerken, weer met behulp van de display-functie.


display(df_clean)

Met deze stap kunt u het resulterende DataFrame inspecteren met de toegepaste transformaties.

Opslaan in Lakehouse

Nu slaan we de opgeschoonde en met kenmerken verrijkte gegevensset op in het lakehouse.

# Create PySpark DataFrame from Pandas
df_clean.write.mode("overwrite").format("delta").save(f"Tables/churn_data_clean")
print(f"Spark dataframe saved to delta table: churn_data_clean")

Hier nemen we het opgeschoonde en getransformeerde PySpark DataFrame, df_clean, en slaan we het op als een Delta-tabel met de naam 'churn_data_clean' in het lakehouse. We gebruiken de Delta-indeling voor efficiënte versiebeheer en het beheer van de gegevensset. De mode("overwrite") zorgt ervoor dat een bestaande tabel met dezelfde naam wordt overschreven en dat er een nieuwe versie van de tabel wordt gemaakt.

Test- en trainingsgegevenssets maken

Vervolgens maken we de test- en trainingsgegevenssets op basis van de opgeschoonde en functie-engineerende gegevens.

In de opgegeven codesectie laden we een opgeschoonde en functies geëngineerd gegevensset uit de lakehouse met behulp van de Delta-indeling. We splitsen deze in trainings- en testsets met een verhouding van 80-20 en bereiden de gegevens voor op machine learning. Deze voorbereiding omvat het importeren van de VectorAssembler uit PySpark ML om functiekolommen te combineren in één kolom met functies. Vervolgens gebruiken we de VectorAssembler om de trainings- en testgegevenssets te transformeren, wat resulteert in train_data en test_data DataFrames die de doelvariabele 'Afgesloten' en de functievectoren bevatten. Deze gegevenssets zijn nu klaar voor gebruik bij het bouwen en evalueren van machine learning-modellen.

# Import the necessary library for feature vectorization
from pyspark.ml.feature import VectorAssembler

# Load the cleaned and feature-engineered dataset from the lakehouse
df_final = spark.read.format("delta").load("Tables/churn_data_clean")

# Train-Test Separation
train_raw, test_raw = df_final.randomSplit([0.8, 0.2], seed=41)

# Define the feature columns (excluding the target variable 'Exited')
feature_cols = [col for col in df_final.columns if col != "Exited"]

# Create a VectorAssembler to combine feature columns into a single 'features' column
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Transform the training and testing datasets using the VectorAssembler
train_data = featurizer.transform(train_raw)["Exited", "features"]
test_data = featurizer.transform(test_raw)["Exited", "features"]

Basislijnmodel trainen

Met behulp van de gemetriseerde gegevens trainen we een machine learning-basislijnmodel, configureren we MLflow voor het bijhouden van experimenten, definiëren we een voorspellingsfunctie voor metrische berekeningen en ten slotte bekijken en registreren we de resulterende ROC AUC-score.

Logniveau instellen

Hier configureren we het logboekregistratieniveau om onnodige uitvoer van de Synapse.ml-bibliotheek te onderdrukken, waardoor de logboeken overzichtelijker blijven.

import logging
 
logging.getLogger('synapse.ml').setLevel(logging.ERROR)

MLflow configureren

In deze sectie configureren we MLflow voor het bijhouden van experimenten. We stellen de naam van het experiment in op 'automl_sample' om de uitvoeringen te organiseren. Daarnaast schakelen we automatische logboekregistratie in, zodat modelparameters, metrische gegevens en artefacten automatisch worden geregistreerd bij MLflow.

import mlflow

# Set the MLflow experiment to "automl_sample" and enable automatic logging
mlflow.set_experiment("automl_sample")
mlflow.autolog(exclusive=False)

Het model trainen en evalueren

Ten slotte trainen we een LightGBMClassifier-model op de opgegeven trainingsgegevens. Het model wordt geconfigureerd met de benodigde instellingen voor binaire classificatie en onevenwichtige verwerking. Vervolgens gebruiken we dit getrainde model om voorspellingen te doen over de testgegevens. We halen de voorspelde waarschijnlijkheden voor de positieve klasse en de werkelijke labels uit de testgegevens op. Daarna berekenen we de ROC AUC-score met behulp van de roc_auc_score functie van sklearn.

from synapse.ml.lightgbm import LightGBMClassifier
from sklearn.metrics import roc_auc_score

# Assuming you have already defined 'train_data' and 'test_data'

with mlflow.start_run(run_name="default") as run:
    # Create a LightGBMClassifier model with specified settings
    model = LightGBMClassifier(objective="binary", featuresCol="features", labelCol="Exited", dataTransferMode="bulk")
    
    # Fit the model to the training data
    model = model.fit(train_data)

    # Get the predictions
    predictions = model.transform(test_data)

    # Extract the predicted probabilities for the positive class
    y_pred = predictions.select("probability").rdd.map(lambda x: x[0][1]).collect()

    # Extract the true labels from the 'test_data' DataFrame
    y_true = test_data.select("Exited").rdd.map(lambda x: x[0]).collect()

    # Compute the ROC AUC score
    roc_auc = roc_auc_score(y_true, y_pred)

    # Log the ROC AUC score with MLflow
    mlflow.log_metric("ROC_AUC", roc_auc)

    # Print or log the ROC AUC score
    print("ROC AUC Score:", roc_auc)

Van hieruit kunnen we zien dat ons resulterende model een ROC AUC-score van 84%bereikt.

Een AutoML-proefversie maken met FLAML

In deze sectie maken we een AutoML-proefversie met behulp van het FLAML-pakket, configureert u de proefinstellingen, converteert u de Spark-gegevensset naar een Pandas op Spark-gegevensset, voert u de AutoML-proefversie uit en bekijkt u de resulterende metrische gegevens.

De AutoML-proefversie configureren

Hier importeren we de benodigde klassen en modules uit het FLAML-pakket en maken we een exemplaar van AutoML, dat wordt gebruikt om de machine learning-pijplijn te automatiseren.

# Import the AutoML class from the FLAML package
from flaml import AutoML
from flaml.automl.spark.utils import to_pandas_on_spark

# Create an AutoML instance
automl = AutoML()

Instellingen configureren

In deze sectie definiëren we de configuratie-instellingen voor de AutoML-proefversie.

# Define AutoML settings
settings = {
    "time_budget": 250,         # Total running time in seconds
    "metric": 'roc_auc',       # Optimization metric (ROC AUC in this case)
    "task": 'classification',  # Task type (classification)
    "log_file_name": 'flaml_experiment.log',  # FLAML log file
    "seed": 41,                # Random seed
    "force_cancel": True,      # Force stop training once time_budget is used up
    "mlflow_exp_name": "automl_sample"      # MLflow experiment name
}

Converteren naar Pandas in Spark

Als u AutoML wilt uitvoeren met een Spark-gegevensset, moet u deze converteren naar een Pandas-on-Spark-gegevensset met behulp van de functie to_pandas_on_spark. Hierdoor kan FLAML efficiënt met de gegevens werken.

# Convert the Spark training dataset to a Pandas on Spark dataset
df_automl = to_pandas_on_spark(train_data)

De AutoML-proefversie uitvoeren

Nu voeren we de AutoML-proefversie uit. We gebruiken een geneste MLflow-run om het experiment te volgen binnen de bestaande MLflow-uitvoeringscontext. De AutoML-proefversie wordt uitgevoerd op de Pandas in Spark-gegevensset (df_automl) met de doelvariabeleExited en de gedefinieerde instellingen worden doorgegeven aan de fit-functie voor configuratie.

'''The main flaml automl API'''

with mlflow.start_run(nested=True):
    automl.fit(dataframe=df_automl, label='Exited', isUnbalance=True, **settings)

Resulterende metrische gegevens weergeven

In deze laatste sectie worden de resultaten van de AutoML-proefversie opgehaald en weergegeven. Deze metrische gegevens bieden inzicht in de prestaties en configuratie van het AutoML-model op de opgegeven gegevensset.

# Retrieve and display the best hyperparameter configuration and metrics
print('Best hyperparameter config:', automl.best_config)
print('Best ROC AUC on validation data: {0:.4g}'.format(1 - automl.best_loss))
print('Training duration of the best run: {0:.4g} s'.format(automl.best_config_train_time))

Uw AutoML-proefversie parallelliseren met Apache Spark

In scenario's waarin uw gegevensset in één knooppunt kan passen en u gebruik wilt maken van de kracht van Spark voor het gelijktijdig uitvoeren van meerdere parallelle AutoML-proefversies, kunt u de volgende stappen uitvoeren:

Converteren naar Pandas-dataframe

Als u parallellisatie wilt inschakelen, moeten uw gegevens eerst worden geconverteerd naar een Pandas DataFrame.

pandas_df = train_raw.toPandas()

Hier converteren we het train_raw Spark DataFrame naar een Pandas DataFrame met de naam pandas_df om het geschikt te maken voor parallelle verwerking.

Instellingen voor parallelle uitvoering configureren

Stel use_spark in op True om parallellisme op basis van Spark in te schakelen. FLAML start standaard één proefversie per uitvoerder. U kunt het aantal gelijktijdige proefversies aanpassen met behulp van het argument n_concurrent_trials.

settings = {
    "time_budget": 250,           # Total running time in seconds
    "metric": 'roc_auc',         # Optimization metric (ROC AUC in this case)
    "task": 'classification',    # Task type (classification)
    "seed": 41,                  # Random seed
    "use_spark": True,           # Enable Spark-based parallelism
    "n_concurrent_trials": 3,    # Number of concurrent trials to run
    "force_cancel": True,        # Force stop training once time_budget is used up
    "mlflow_exp_name": "automl_sample"  # MLflow experiment name

}

In deze instellingen geven we op dat we Spark willen gebruiken voor parallelle uitvoering door use_spark in te stellen op True. We stellen ook het aantal gelijktijdige proefversies in op 3, wat betekent dat er drie proefversies parallel op Spark worden uitgevoerd.

Zie de FLAML-documentatie voor parallelle Spark-takenvoor meer informatie over het parallelliseren van uw AutoML-paden.

De AutoML-proefversie parallel uitvoeren

Nu voeren we de AutoML-proefversie parallel uit met de opgegeven instellingen. We gebruiken een geneste MLflow-uitvoering om het experiment bij te houden binnen de bestaande MLflow-uitvoeringscontext.

'''The main FLAML AutoML API'''
with mlflow.start_run(nested=True, run_name="parallel_trial"):
    automl.fit(dataframe=pandas_df, label='Exited', **settings)

Hiermee wordt nu de AutoML-proefversie uitgevoerd waarvoor parallellisatie is ingeschakeld. Het argument dataframe is ingesteld op het Pandas DataFrame pandas_dfen andere instellingen worden doorgegeven aan de fit-functie voor parallelle uitvoering.

Metrische gegevens weergeven

Nadat u de parallelle AutoML-proefversie hebt uitgevoerd, haalt u de resultaten op en geeft u deze weer, inclusief de beste hyperparameterconfiguratie, ROC AUC op de validatiegegevens en de trainingsduur van de best presterende uitvoering.

''' retrieve best config'''
print('Best hyperparmeter config:', automl.best_config)
print('Best roc_auc on validation data: {0:.4g}'.format(1-automl.best_loss))
print('Training duration of best run: {0:.4g} s'.format(automl.best_config_train_time))