Del via


Opret modeller med automatiseret maskinel indlæring (prøveversion)

AutoML (Automated Machine Learning) omfatter et sæt teknikker og værktøjer, der er designet til at strømline oplæringsprocessen og optimere modeller til maskinel indlæring med minimal menneskelig indgriben. Det primære mål med AutoML er at forenkle og fremskynde udvælgelsen af den mest egnede model til maskinel indlæring og hyperparametre for et givet datasæt, en opgave, der typisk kræver betydelig ekspertise og beregningsressourcer. Inden for Fabric-strukturen kan dataeksperter udnytte flaml.AutoML-modulet til at automatisere forskellige aspekter af deres arbejdsprocesser til maskinel indlæring.

I denne artikel dykker vi ned i processen med at generere AutoML-prøveversioner direkte fra kode ved hjælp af et Spark-datasæt. Derudover vil vi udforske metoder til konvertering af disse data til en Pandas-dataramme og diskutere teknikker til parallelisering af dine eksperimenter.

Vigtig

Denne funktion er i prøveversion.

Forudsætninger

  • Opret et nyt Fabric-miljø, eller sørg for, at du kører på Fabric Runtime 1.2 (Spark 3.4 (eller nyere) og Delta 2.4)
  • Opret en ny notesbog.
  • Vedhæft din notesbog til et lakehouse. I venstre side af notesbogen skal du vælge Tilføj for at tilføje et eksisterende lakehouse eller oprette et nyt.

Indlæs og forbered data

I dette afsnit angiver vi downloadindstillingerne for dataene og gemmer dem derefter i lakehouse.

Download data

Denne kodeblok downloader dataene fra en ekstern kilde og gemmer dem i lakehouse

import os
import requests

IS_CUSTOM_DATA = False  # if TRUE, dataset has to be uploaded manually

if not IS_CUSTOM_DATA:
    # Specify the remote URL where the data is hosted
    remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/bankcustomerchurn"
    
    # List of data files to download
    file_list = ["churn.csv"]
    
    # Define the download path within the lakehouse
    download_path = "/lakehouse/default/Files/churn/raw"
    
    # Check if the lakehouse directory exists; if not, raise an error
    if not os.path.exists("/lakehouse/default"):
        raise FileNotFoundError("Default lakehouse not found. Please add a lakehouse and restart the session.")
    
    # Create the download directory if it doesn't exist
    os.makedirs(download_path, exist_ok=True)
    
    # Download each data file if it doesn't already exist in the lakehouse
    for fname in file_list:
        if not os.path.exists(f"{download_path}/{fname}"):
            r = requests.get(f"{remote_url}/{fname}", timeout=30)
            with open(f"{download_path}/{fname}", "wb") as f:
                f.write(r.content)
    
    print("Downloaded demo data files into lakehouse.")

Indlæs data i en Spark-dataramme

Følgende kodeblok indlæser dataene fra CSV-filen i en Spark DataFrame og cachelagrer dem til effektiv behandling.

df = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv("Files/churn/raw/churn.csv")
    .cache()
)

Denne kode forudsætter, at datafilen er blevet downloadet og er placeret i den angivne sti. Den læser CSV-filen i en Spark DataFrame, udleder skemaet og cachelagrer den for at få hurtigere adgang under efterfølgende handlinger.

Forbered dataene

I dette afsnit udfører vi datarensning og funktionskonstruktion på datasættet.

Ryd data

Først definerer vi en funktion til at rense dataene, hvilket omfatter at slippe rækker med manglende data, fjerne dublerede rækker baseret på bestemte kolonner og slippe unødvendige kolonner.

# Define a function to clean the data
def clean_data(df):
    # Drop rows with missing data across all columns
    df = df.dropna(how="all")
    # Drop duplicate rows based on 'RowNumber' and 'CustomerId'
    df = df.dropDuplicates(subset=['RowNumber', 'CustomerId'])
    # Drop columns: 'RowNumber', 'CustomerId', 'Surname'
    df = df.drop('RowNumber', 'CustomerId', 'Surname')
    return df

# Create a copy of the original dataframe by selecting all the columns
df_copy = df.select("*")

# Apply the clean_data function to the copy
df_clean = clean_data(df_copy)

Funktionen clean_data hjælper med at sikre, at datasættet er fri for manglende værdier og dubletter, samtidig med at unødvendige kolonner fjernes.

Funktionskonstruktion

Derefter udfører vi funktionskonstruktion ved at oprette dummy-kolonner for kolonnerne 'Geography' og 'Gender' ved hjælp af en-hot-kodning.

# Import PySpark functions
from pyspark.sql import functions as F

# Create dummy columns for 'Geography' and 'Gender' using one-hot encoding
df_clean = df_clean.select(
    "*",
    F.when(F.col("Geography") == "France", 1).otherwise(0).alias("Geography_France"),
    F.when(F.col("Geography") == "Germany", 1).otherwise(0).alias("Geography_Germany"),
    F.when(F.col("Geography") == "Spain", 1).otherwise(0).alias("Geography_Spain"),
    F.when(F.col("Gender") == "Female", 1).otherwise(0).alias("Gender_Female"),
    F.when(F.col("Gender") == "Male", 1).otherwise(0).alias("Gender_Male")
)

# Drop the original 'Geography' and 'Gender' columns
df_clean = df_clean.drop("Geography", "Gender")

Her bruger vi en-hot-kodning til at konvertere kategoriske kolonner til binære dummy-kolonner, hvilket gør dem velegnede til algoritmer til maskinel indlæring.

Vis rensede data

Endelig viser vi det rensede og funktionsudviklede datasæt ved hjælp af visningsfunktionen.


display(df_clean)

Dette trin giver dig mulighed for at undersøge den resulterende DataFrame med de anvendte transformationer.

Gem i lakehouse

Nu gemmer vi det rensede og funktionsudviklede datasæt i lakehouse.

# Create PySpark DataFrame from Pandas
df_clean.write.mode("overwrite").format("delta").save(f"Tables/churn_data_clean")
print(f"Spark dataframe saved to delta table: churn_data_clean")

Her tager vi den rensede og transformerede PySpark DataFrame, df_clean, og gemmer den som en Delta-tabel med navnet "churn_data_clean" i lakehouse. Vi bruger Delta-formatet til effektiv versionsstyring og administration af datasættet. mode("overwrite") sikrer, at alle eksisterende tabeller med samme navn overskrives, og at der oprettes en ny version af tabellen.

Opret test- og oplæringsdatasæt

Derefter opretter vi datasættene til test og oplæring ud fra de rensede og funktionsudviklede data.

I det angivne kodeafsnit indlæser vi et renset og funktionsudviklet datasæt fra lakehouse ved hjælp af Delta-format, opdeler det i trænings- og testsæt med et forhold på 80-20 og forbereder dataene til maskinel indlæring. Dette præparat omfatter import af VectorAssembler fra PySpark ML for at kombinere funktionskolonner i en enkelt "funktioner"-kolonne. Efterfølgende bruger vi VectorAssembler til at transformere oplærings- og testdatasæt, hvilket resulterer i train_data og test_data DataFrames, der indeholder målvariablen "Afsluttet" og funktionsvektorerne. Disse datasæt er nu klar til brug i forbindelse med oprettelse og evaluering af modeller til maskinel indlæring.

# Import the necessary library for feature vectorization
from pyspark.ml.feature import VectorAssembler

# Load the cleaned and feature-engineered dataset from the lakehouse
df_final = spark.read.format("delta").load("Tables/churn_data_clean")

# Train-Test Separation
train_raw, test_raw = df_final.randomSplit([0.8, 0.2], seed=41)

# Define the feature columns (excluding the target variable 'Exited')
feature_cols = [col for col in df_final.columns if col != "Exited"]

# Create a VectorAssembler to combine feature columns into a single 'features' column
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Transform the training and testing datasets using the VectorAssembler
train_data = featurizer.transform(train_raw)["Exited", "features"]
test_data = featurizer.transform(test_raw)["Exited", "features"]

Oplær oprindelig model

Ved hjælp af de featuriserede data oplærer vi en model til grundlæggende maskinel indlæring, konfigurerer MLflow til sporing af eksperimenter, definerer en forudsigelsesfunktion til beregning af målepunkter og til sidst får du vist og logfører den resulterende ROC AUC-score.

Angiv logføringsniveau

Her konfigurerer vi logføringsniveauet for at undertrykke unødvendige output fra biblioteket Synapse.ml, så loggene er renere.

import logging
 
logging.getLogger('synapse.ml').setLevel(logging.ERROR)

Konfigurer MLflow

I dette afsnit konfigurerer vi MLflow til sporing af eksperimenter. Vi angiver eksperimentnavnet til "automl_sample" for at organisere kørslerne. Derudover aktiverer vi automatisk logføring og sikrer, at modelparametre, målepunkter og artefakter automatisk logføres i MLflow.

import mlflow

# Set the MLflow experiment to "automl_sample" and enable automatic logging
mlflow.set_experiment("automl_sample")
mlflow.autolog(exclusive=False)

Oplær og evaluer modellen

Endelig oplærer vi en LightGBMClassifier-model på de angivne oplæringsdata. Modellen er konfigureret med de nødvendige indstillinger for binær klassificering og håndtering af ubalancer. Vi bruger derefter denne oplærte model til at foretage forudsigelser af testdataene. Vi udtrækker de forudsagte sandsynligheder for den positive klasse og de sande mærkater fra testdataene. Derefter beregner vi ROC AUC-scoren ved hjælp af sklearns roc_auc_score funktion.

from synapse.ml.lightgbm import LightGBMClassifier
from sklearn.metrics import roc_auc_score

# Assuming you have already defined 'train_data' and 'test_data'

with mlflow.start_run(run_name="default") as run:
    # Create a LightGBMClassifier model with specified settings
    model = LightGBMClassifier(objective="binary", featuresCol="features", labelCol="Exited", dataTransferMode="bulk")
    
    # Fit the model to the training data
    model = model.fit(train_data)

    # Get the predictions
    predictions = model.transform(test_data)

    # Extract the predicted probabilities for the positive class
    y_pred = predictions.select("probability").rdd.map(lambda x: x[0][1]).collect()

    # Extract the true labels from the 'test_data' DataFrame
    y_true = test_data.select("Exited").rdd.map(lambda x: x[0]).collect()

    # Compute the ROC AUC score
    roc_auc = roc_auc_score(y_true, y_pred)

    # Log the ROC AUC score with MLflow
    mlflow.log_metric("ROC_AUC", roc_auc)

    # Print or log the ROC AUC score
    print("ROC AUC Score:", roc_auc)

Herfra kan vi se, at vores resulterende model opnår en ROC AUC-score på 84%.

Opret en AutoML-prøveversion med FLAML

I dette afsnit opretter vi en AutoML-prøveversion ved hjælp af FLAML-pakken, konfigurerer indstillingerne for prøveversionen, konverterer Spark-datasættet til en Pandas på Spark-datasæt, kører AutoML-prøveversionen og får vist de resulterende målepunkter.

Konfigurer AutoML-prøveversionen

Her importerer vi de nødvendige klasser og moduler fra FLAML-pakken og opretter en forekomst af AutoML, som bruges til at automatisere pipelinen til maskinel indlæring.

# Import the AutoML class from the FLAML package
from flaml import AutoML
from flaml.automl.spark.utils import to_pandas_on_spark

# Create an AutoML instance
automl = AutoML()

Konfigurer indstillinger

I dette afsnit definerer vi konfigurationsindstillingerne for AutoML-prøveversionen.

# Define AutoML settings
settings = {
    "time_budget": 250,         # Total running time in seconds
    "metric": 'roc_auc',       # Optimization metric (ROC AUC in this case)
    "task": 'classification',  # Task type (classification)
    "log_file_name": 'flaml_experiment.log',  # FLAML log file
    "seed": 41,                # Random seed
    "force_cancel": True,      # Force stop training once time_budget is used up
    "mlflow_exp_name": "automl_sample"      # MLflow experiment name
}

Konvertér til Pandas på Spark

Hvis du vil køre AutoML med et Spark-baseret datasæt, skal vi konvertere det til pandas på Spark-datasæt ved hjælp af funktionen to_pandas_on_spark. Dette gør det muligt for FLAML at arbejde effektivt med dataene.

# Convert the Spark training dataset to a Pandas on Spark dataset
df_automl = to_pandas_on_spark(train_data)

Kør AutoML-prøveversionen

Nu udfører vi AutoML-prøveversionen. Vi bruger en indlejret MLflow-kørsel til at spore eksperimentet i den eksisterende MLflow-kørselskontekst. AutoML-prøveversionen udføres på Pandas on Spark-datasættet (df_automl) med målvariablen "Exited, og de definerede indstillinger overføres til fit-funktionen til konfiguration.

'''The main flaml automl API'''

with mlflow.start_run(nested=True):
    automl.fit(dataframe=df_automl, label='Exited', isUnbalance=True, **settings)

Få vist de resulterende målepunkter

I dette sidste afsnit henter og viser vi resultaterne af AutoML-prøveversionen. Disse målepunkter giver indsigt i ydeevnen og konfigurationen af AutoML-modellen på det angivne datasæt.

# Retrieve and display the best hyperparameter configuration and metrics
print('Best hyperparameter config:', automl.best_config)
print('Best ROC AUC on validation data: {0:.4g}'.format(1 - automl.best_loss))
print('Training duration of the best run: {0:.4g} s'.format(automl.best_config_train_time))

Paralleliser din AutoML-prøveversion med Apache Spark

I scenarier, hvor dit datasæt kan passe ind i en enkelt node, og du vil udnytte Styrken ved Spark til at køre flere parallelle AutoML-prøveversioner samtidigt, kan du følge disse trin:

Konvertér til Pandas-dataramme

Hvis du vil aktivere parallelisering, skal dine data først konverteres til en Pandas DataFrame.

pandas_df = train_raw.toPandas()

Her konverterer vi train_raw Spark DataFrame til en Pandas DataFrame med navnet pandas_df for at gøre den egnet til parallel behandling.

Konfigurer indstillinger for parallelisering

Angiv use_spark til True for at aktivere Spark-baseret parallelitet. FLAML starter som standard én prøveversion pr. eksekvering. Du kan tilpasse antallet af samtidige forsøg ved hjælp af argumentet n_concurrent_trials.

settings = {
    "time_budget": 250,           # Total running time in seconds
    "metric": 'roc_auc',         # Optimization metric (ROC AUC in this case)
    "task": 'classification',    # Task type (classification)
    "seed": 41,                  # Random seed
    "use_spark": True,           # Enable Spark-based parallelism
    "n_concurrent_trials": 3,    # Number of concurrent trials to run
    "force_cancel": True,        # Force stop training once time_budget is used up
    "mlflow_exp_name": "automl_sample"  # MLflow experiment name

}

I disse indstillinger angiver vi, at vi vil bruge Spark til parallelitet ved at angive use_spark til True. Vi angiver også antallet af samtidige forsøg til 3, hvilket betyder, at tre forsøg kører parallelt på Spark.

Hvis du vil vide mere om, hvordan du paralleliserer dine AutoML-stier, kan du gå til dokumentationen til FLAML for at få flere oplysninger om parallelle Spark-job.

Kør AutoML-prøveversionen parallelt

Nu kører vi AutoML-prøveversionen parallelt med de angivne indstillinger. Vi bruger en indlejret MLflow-kørsel til at spore eksperimentet i den eksisterende MLflow-kørselskontekst.

'''The main FLAML AutoML API'''
with mlflow.start_run(nested=True, run_name="parallel_trial"):
    automl.fit(dataframe=pandas_df, label='Exited', **settings)

Dette vil nu udføre AutoML-prøveversionen med parallelisering aktiveret. Argumentet dataframe er angivet til Pandas DataFrame-pandas_df, og andre indstillinger overføres til funktionen fit til parallel udførelse.

Vis målepunkter

Når du har kørt den parallelle AutoML-prøveversion, kan du hente og vise resultaterne, herunder den bedste konfiguration af hyperparameteren, ROC AUC på valideringsdataene og oplæringsvarigheden for den mest effektive kørsel.

''' retrieve best config'''
print('Best hyperparmeter config:', automl.best_config)
print('Best roc_auc on validation data: {0:.4g}'.format(1-automl.best_loss))
print('Training duration of best run: {0:.4g} s'.format(automl.best_config_train_time))