Del via


Opprette modeller med automatisert ML (forhåndsversjon)

Automatisert machine Læring (AutoML) omfatter et sett med teknikker og verktøy som er utformet for å effektivisere prosessen med opplæring og optimalisering av maskinlæringsmodeller med minimal menneskelig inngripen. Det primære målet med AutoML er å forenkle og akselerere utvalget av den mest egnede maskinlæringsmodellen og hyperparameterne for et gitt datasett, en oppgave som vanligvis krever betydelig ekspertise og beregningsressurser. Innenfor Fabric-rammeverket kan dataforskere utnytte flaml.AutoML modulen til å automatisere ulike aspekter ved maskinlæringsarbeidsflytene.

I denne artikkelen vil vi fordype oss i prosessen med å generere AutoML-forsøk direkte fra kode ved hjelp av et Spark-datasett. I tillegg vil vi utforske metoder for å konvertere disse dataene til en Pandas-dataramme og diskutere teknikker for parallellisering av forsøkene.

Viktig

Denne funksjonen er i forhåndsvisning.

Forutsetning

  • Få et Microsoft Fabric-abonnement. Eller registrer deg for en gratis prøveversjon av Microsoft Fabric.

  • Logg på Microsoft Fabric.

  • Bruk opplevelsesbryteren til venstre på hjemmesiden for å bytte til Synapse Data Science-opplevelsen.

    Skjermbilde av menyen for opplevelsesbryteren, som viser hvor du velger Datavitenskap.

  • Opprett et nytt stoffmiljø , eller sørg for at du kjører på Fabric Runtime 1.2 (Spark 3.4 (eller høyere) og Delta 2.4)
  • Opprett en ny notatblokk.
  • Legg notatblokken til et lakehouse. På venstre side av notatblokken velger du Legg til for å legge til et eksisterende lakehouse eller opprette et nytt.

Laste inn og klargjøre data

I denne delen angir vi nedlastingsinnstillingene for dataene og lagrer dem deretter i lakehouse.

Last ned data

Denne kodeblokken laster ned dataene fra en ekstern kilde og lagrer dem i lakehouse

import os
import requests

IS_CUSTOM_DATA = False  # if TRUE, dataset has to be uploaded manually

if not IS_CUSTOM_DATA:
    # Specify the remote URL where the data is hosted
    remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/bankcustomerchurn"
    
    # List of data files to download
    file_list = ["churn.csv"]
    
    # Define the download path within the lakehouse
    download_path = "/lakehouse/default/Files/churn/raw"
    
    # Check if the lakehouse directory exists; if not, raise an error
    if not os.path.exists("/lakehouse/default"):
        raise FileNotFoundError("Default lakehouse not found. Please add a lakehouse and restart the session.")
    
    # Create the download directory if it doesn't exist
    os.makedirs(download_path, exist_ok=True)
    
    # Download each data file if it doesn't already exist in the lakehouse
    for fname in file_list:
        if not os.path.exists(f"{download_path}/{fname}"):
            r = requests.get(f"{remote_url}/{fname}", timeout=30)
            with open(f"{download_path}/{fname}", "wb") as f:
                f.write(r.content)
    
    print("Downloaded demo data files into lakehouse.")

Laste inn data i en Spark-dataramme

Følgende kodeblokk laster inn dataene fra CSV-filen i en Spark DataFrame og bufrer den for effektiv behandling.

df = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv("Files/churn/raw/churn.csv")
    .cache()
)

Denne koden forutsetter at datafilen er lastet ned og er plassert i den angitte banen. Den leser CSV-filen inn i en Spark DataFrame, utleder skjemaet og bufrer det for raskere tilgang under etterfølgende operasjoner.

Klargjøre dataene

I denne delen skal vi utføre datarengjøring og funksjonsteknikk på datasettet.

Rens data

Først definerer vi en funksjon for å rense dataene, som inkluderer å slippe rader med manglende data, fjerne dupliserte rader basert på bestemte kolonner og slippe unødvendige kolonner.

# Define a function to clean the data
def clean_data(df):
    # Drop rows with missing data across all columns
    df = df.dropna(how="all")
    # Drop duplicate rows based on 'RowNumber' and 'CustomerId'
    df = df.dropDuplicates(subset=['RowNumber', 'CustomerId'])
    # Drop columns: 'RowNumber', 'CustomerId', 'Surname'
    df = df.drop('RowNumber', 'CustomerId', 'Surname')
    return df

# Create a copy of the original dataframe by selecting all the columns
df_copy = df.select("*")

# Apply the clean_data function to the copy
df_clean = clean_data(df_copy)

Funksjonen clean_data bidrar til å sikre at datasettet mangler verdier og duplikater mens unødvendige kolonner fjernes.

Funksjonsteknikk

Deretter utfører vi funksjonsteknikk ved å opprette dummykolonner for kolonnene Geografi og Kjønn ved hjelp av en varm koding.

# Import PySpark functions
from pyspark.sql import functions as F

# Create dummy columns for 'Geography' and 'Gender' using one-hot encoding
df_clean = df_clean.select(
    "*",
    F.when(F.col("Geography") == "France", 1).otherwise(0).alias("Geography_France"),
    F.when(F.col("Geography") == "Germany", 1).otherwise(0).alias("Geography_Germany"),
    F.when(F.col("Geography") == "Spain", 1).otherwise(0).alias("Geography_Spain"),
    F.when(F.col("Gender") == "Female", 1).otherwise(0).alias("Gender_Female"),
    F.when(F.col("Gender") == "Male", 1).otherwise(0).alias("Gender_Male")
)

# Drop the original 'Geography' and 'Gender' columns
df_clean = df_clean.drop("Geography", "Gender")

Her bruker vi en varm koding til å konvertere kategoriske kolonner til binære dummy kolonner, noe som gjør dem egnet for maskinlæringsalgoritmer.

Vis rensede data

Til slutt viser vi det rengjorte og funksjonsutviklede datasettet ved hjelp av visningsfunksjonen.


display(df_clean)

Med dette trinnet kan du undersøke den resulterende DataFrame med de brukte transformasjonene.

Spar til lakehouse

Nå skal vi lagre det rengjorte og funksjonsutviklede datasettet til lakehouse.

# Create PySpark DataFrame from Pandas
df_clean.write.mode("overwrite").format("delta").save(f"Tables/churn_data_clean")
print(f"Spark dataframe saved to delta table: churn_data_clean")

Her tar vi den rengjorte og transformerte PySpark DataFrame, df_cleanog lagrer den som en Delta-tabell kalt "churn_data_clean" i lakehouse. Vi bruker Delta-formatet for effektiv versjonskontroll og administrasjon av datasettet. Sikrer mode("overwrite") at alle eksisterende tabeller med samme navn overskrives, og at en ny versjon av tabellen opprettes.

Opprette test- og opplæringsdatasett

Deretter skal vi opprette test- og opplæringsdatasettene fra de rengjorte og funksjonsutviklede dataene.

I den angitte kodedelen laster vi inn et rengjort og funksjonsutviklet datasett fra lakehouse ved hjelp av Delta-format, deler det inn i opplærings- og testsett med et 80-20-forhold og klargjør dataene for maskinlæring. Denne forberedelsen innebærer å VectorAssembler importere fra PySpark ML for å kombinere funksjonskolonner til én enkelt «funksjoner»-kolonne. Deretter bruker VectorAssembler vi til å transformere opplærings- og testdatasettene, noe som resulterer i train_data og test_data DataFrames som inneholder målvariabelen «Exited» og funksjonsvektorene. Disse datasettene er nå klare til bruk i bygging og evaluering av maskinlæringsmodeller.

# Import the necessary library for feature vectorization
from pyspark.ml.feature import VectorAssembler

# Load the cleaned and feature-engineered dataset from the lakehouse
df_final = spark.read.format("delta").load("Tables/churn_data_clean")

# Train-Test Separation
train_raw, test_raw = df_final.randomSplit([0.8, 0.2], seed=41)

# Define the feature columns (excluding the target variable 'Exited')
feature_cols = [col for col in df_final.columns if col != "Exited"]

# Create a VectorAssembler to combine feature columns into a single 'features' column
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Transform the training and testing datasets using the VectorAssembler
train_data = featurizer.transform(train_raw)["Exited", "features"]
test_data = featurizer.transform(test_raw)["Exited", "features"]

Kalibrer grunnlinjemodell

Ved hjelp av de medfølgende dataene skal vi lære opp en maskinlæringsmodell for grunnlinje, konfigurere MLflow for eksperimentsporing, definere en prognosefunksjon for beregning av måledata, og til slutt vise og logge den resulterende ROC AUC-poengsummen.

Angi loggingsnivå

Her konfigurerer vi loggingsnivået til å undertrykke unødvendige utdata fra biblioteket Synapse.ml, slik at loggene blir renere.

import logging
 
logging.getLogger('synapse.ml').setLevel(logging.ERROR)

Konfigurer MLflow

I denne delen konfigurerer vi MLflow for eksperimentsporing. Vi setter eksperimentnavnet til «automl_sample» for å organisere kjøringene. I tillegg aktiverer vi automatisk logging, slik at modellparametere, måledata og artefakter logges automatisk til MLflow.

import mlflow

# Set the MLflow experiment to "automl_sample" and enable automatic logging
mlflow.set_experiment("automl_sample")
mlflow.autolog(exclusive=False)

Kalibrer og evaluer modellen

Til slutt lærer vi opp en LightGBMClassifier-modell på de angitte opplæringsdataene. Modellen er konfigurert med de nødvendige innstillingene for binær klassifisering og ubalansehåndtering. Vi bruker deretter denne opplærte modellen til å lage prognoser på testdataene. Vi trekker ut de forventede sannsynlighetene for den positive klassen og de sanne etikettene fra testdataene. Etterpå beregner vi ROC AUC-poengsummen ved hjelp av sklearns roc_auc_score funksjon.

from synapse.ml.lightgbm import LightGBMClassifier
from sklearn.metrics import roc_auc_score

# Assuming you have already defined 'train_data' and 'test_data'

with mlflow.start_run(run_name="default") as run:
    # Create a LightGBMClassifier model with specified settings
    model = LightGBMClassifier(objective="binary", featuresCol="features", labelCol="Exited", dataTransferMode="bulk")
    
    # Fit the model to the training data
    model = model.fit(train_data)

    # Get the predictions
    predictions = model.transform(test_data)

    # Extract the predicted probabilities for the positive class
    y_pred = predictions.select("probability").rdd.map(lambda x: x[0][1]).collect()

    # Extract the true labels from the 'test_data' DataFrame
    y_true = test_data.select("Exited").rdd.map(lambda x: x[0]).collect()

    # Compute the ROC AUC score
    roc_auc = roc_auc_score(y_true, y_pred)

    # Log the ROC AUC score with MLflow
    mlflow.log_metric("ROC_AUC", roc_auc)

    # Print or log the ROC AUC score
    print("ROC AUC Score:", roc_auc)

Herfra kan vi se at vår resulterende modell oppnår en ROC AUC-poengsum på 84%.

Opprette en AutoML-prøveversjon med FLAML

I denne delen skal vi opprette en AutoML-prøveversjon ved hjelp av FLAML-pakken, konfigurere prøveinnstillingene, konvertere Spark-datasettet til en Pandas på Spark-datasett, kjøre AutoML-prøveversjonen og vise de resulterende måledataene.

Konfigurer AutoML-prøveversjonen

Her importerer vi de nødvendige klassene og modulene fra FLAML-pakken og oppretter en forekomst av AutoML, som vil bli brukt til å automatisere maskinlæringsforløpet.

# Import the AutoML class from the FLAML package
from flaml import AutoML
from flaml.automl.spark.utils import to_pandas_on_spark

# Create an AutoML instance
automl = AutoML()

Konfigurer innstillinger

I denne delen definerer vi konfigurasjonsinnstillingene for prøveversjonen av AutoML.

# Define AutoML settings
settings = {
    "time_budget": 250,         # Total running time in seconds
    "metric": 'roc_auc',       # Optimization metric (ROC AUC in this case)
    "task": 'classification',  # Task type (classification)
    "log_file_name": 'flaml_experiment.log',  # FLAML log file
    "seed": 41,                # Random seed
    "force_cancel": True,      # Force stop training once time_budget is used up
    "mlflow_exp_name": "automl_sample"      # MLflow experiment name
}

Konverter til pandaer på spark

Hvis du vil kjøre AutoML med et Spark-basert datasett, må vi konvertere det til en Pandas på Spark-datasett ved hjelp av to_pandas_on_spark funksjonen. Dette gjør det mulig for FLAML å arbeide effektivt med dataene.

# Convert the Spark training dataset to a Pandas on Spark dataset
df_automl = to_pandas_on_spark(train_data)

Kjør prøveversjonen av AutoML

Nå utfører vi AutoML-prøveversjonen. Vi bruker en nestet MLflow-kjøring til å spore eksperimentet i den eksisterende MLflow-kjørekonteksten. AutoML-prøveversjonen utføres på Pandas på Spark-datasettet (df_automl) med målvariabelen "Exited og de definerte innstillingene sendes til fit funksjonen for konfigurasjon.

'''The main flaml automl API'''

with mlflow.start_run(nested=True):
    automl.fit(dataframe=df_automl, label='Exited', isUnbalance=True, **settings)

Vis resulterende måledata

I denne siste delen henter og viser vi resultatene av AutoML-prøveversjonen. Disse måledataene gir innsikt i ytelsen og konfigurasjonen av AutoML-modellen på det gitte datasettet.

# Retrieve and display the best hyperparameter configuration and metrics
print('Best hyperparameter config:', automl.best_config)
print('Best ROC AUC on validation data: {0:.4g}'.format(1 - automl.best_loss))
print('Training duration of the best run: {0:.4g} s'.format(automl.best_config_train_time))

Parallelliser prøveversjonen av AutoML med Apache Spark

I scenarioer der datasettet kan passe inn i én enkelt node, og du vil utnytte kraften i Spark for å kjøre flere parallelle AutoML-prøveversjoner samtidig, kan du følge disse trinnene:

Konverter til Pandas-dataramme

Hvis du vil aktivere parallellisering, må dataene først konverteres til en Pandas DataFrame.

pandas_df = train_raw.toPandas()

Her konverterer train_raw vi Spark DataFrame til en Pandas DataFrame navngitt pandas_df for å gjøre den egnet for parallell behandling.

Konfigurer innstillinger for parallellisering

Angi use_spark at spark-basert parallellisme skal True aktiveres. Som standard starter FLAML én prøveversjon per eksekutor. Du kan tilpasse antall samtidige forsøk ved hjelp n_concurrent_trials av argumentet.

settings = {
    "time_budget": 250,           # Total running time in seconds
    "metric": 'roc_auc',         # Optimization metric (ROC AUC in this case)
    "task": 'classification',    # Task type (classification)
    "seed": 41,                  # Random seed
    "use_spark": True,           # Enable Spark-based parallelism
    "n_concurrent_trials": 3,    # Number of concurrent trials to run
    "force_cancel": True,        # Force stop training once time_budget is used up
    "mlflow_exp_name": "automl_sample"  # MLflow experiment name

}

I disse innstillingene angir vi at vi ønsker å bruke Spark for parallellisme ved å angi use_spark til True. Vi angir også antall samtidige forsøk til 3, noe som betyr at tre forsøk vil kjøre parallelt på Spark.

Hvis du vil lære mer om hvordan du parallelliserer AutoML-løyper, kan du gå til FLAML-dokumentasjonen for parallelle Spark-jobber.

Kjør AutoML-prøveversjonen parallelt

Nå kjører vi AutoML-prøveversjonen parallelt med de angitte innstillingene. Vi bruker en nestet MLflow-kjøring til å spore eksperimentet i den eksisterende MLflow-kjørekonteksten.

'''The main FLAML AutoML API'''
with mlflow.start_run(nested=True, run_name="parallel_trial"):
    automl.fit(dataframe=pandas_df, label='Exited', **settings)

Dette vil nå kjøre AutoML-prøveversjonen med parallellisering aktivert. Argumentet dataframe er satt til Pandas DataFrame pandas_df, og andre innstillinger sendes til fit funksjonen for parallell kjøring.

Vis måledata

Når du har kjørt den parallelle AutoML-prøveversjonen, henter og viser du resultatene, inkludert den beste hyperparameterkonfigurasjonen, ROC AUC på valideringsdataene og opplæringsvarigheten for kjøringen med best ytelse.

''' retrieve best config'''
print('Best hyperparmeter config:', automl.best_config)
print('Best roc_auc on validation data: {0:.4g}'.format(1-automl.best_loss))
print('Training duration of best run: {0:.4g} s'.format(automl.best_config_train_time))