Jaa


Mallien luominen automaattianalyysipalveluilla (esikatselu)

Automaattiset Automaattianalyysipalvelut (AutoML) sisältävät joukon tekniikoita ja työkaluja, joiden avulla voidaan tehostaa koneoppimismallien harjoittamista ja optimointia mahdollisimman pienellä inhimillisellä väliintulolla. Automaattianalyysipalveluiden ensisijaisena tavoitteena on yksinkertaistaa ja nopeuttaa tietyn tietojoukon sopivimman koneoppimismallin ja hyperparametrien valintaa. Tämä on tehtävä, joka vaatii yleensä huomattavaa asiantuntemusta ja laskennallisia resursseja. Fabric-kehyksessä datatieteilijät voivat automatisoida koneoppimisen työnkulkujen eri näkökohtia moduulin flaml.AutoML avulla.

Tässä artikkelissa perehdytaan automaattianalyysipalveluiden kokeiluversioiden luomiseen suoraan koodista Spark-tietojoukon avulla. Lisäksi tutkimme menetelmiä näiden tietojen muuntamiseksi Pandas-tietokehykseksi ja keskustelemme tekniikoista kokeilujen rinnakkaistamiseksi.

Tärkeä

Tämä ominaisuus on esikatselutilassa.

Edellytykset

  • Luo uusi Fabric-ympäristö tai varmista, että käytössäsi ovat Fabric Runtime 1.2 (Spark 3.4 (tai uudempi) ja Delta 2.4)
  • Luo uusi muistikirja.
  • Liitä muistikirjasi Lakehouseen. Valitse muistikirjasi vasemmasta reunasta Lisää , jos haluat lisätä aiemmin luodun lakehousen tai luoda uuden.

Lataa ja valmistele tiedot

Tässä osiossa määritämme tietojen latausasetukset ja tallennamme ne sitten Lakehouse-tallennustilaan.

Lataa tiedot

Tämä koodilohko lataa tiedot etälähteestä ja tallentaa ne Lakehouse-järjestelmään.

import os
import requests

IS_CUSTOM_DATA = False  # if TRUE, dataset has to be uploaded manually

if not IS_CUSTOM_DATA:
    # Specify the remote URL where the data is hosted
    remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/bankcustomerchurn"
    
    # List of data files to download
    file_list = ["churn.csv"]
    
    # Define the download path within the lakehouse
    download_path = "/lakehouse/default/Files/churn/raw"
    
    # Check if the lakehouse directory exists; if not, raise an error
    if not os.path.exists("/lakehouse/default"):
        raise FileNotFoundError("Default lakehouse not found. Please add a lakehouse and restart the session.")
    
    # Create the download directory if it doesn't exist
    os.makedirs(download_path, exist_ok=True)
    
    # Download each data file if it doesn't already exist in the lakehouse
    for fname in file_list:
        if not os.path.exists(f"{download_path}/{fname}"):
            r = requests.get(f"{remote_url}/{fname}", timeout=30)
            with open(f"{download_path}/{fname}", "wb") as f:
                f.write(r.content)
    
    print("Downloaded demo data files into lakehouse.")

Tietojen lataaminen Spark-tietokehykseen

Seuraava koodilohko lataa TIEDOT CSV-tiedostosta Spark DataFrameen ja tallentaa ne välimuistiin tehokasta käsittelyä varten.

df = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv("Files/churn/raw/churn.csv")
    .cache()
)

Tässä koodissa oletetaan, että datatiedosto on ladattu ja että se sijaitsee määritetyssä polussa. Se lukee CSV-tiedoston Spark DataFrame -kehykseen, muodostaa rakenteen ja tallentaa sen välimuistiin, jotta sitä on helpompi käyttää seuraavien toimintojen aikana.

Tietojen valmistelu

Tässä osiossa teemme tietojoukolle tietojen siistimisen ja ominaisuuksien suunnittelun.

Tietojen siistiminen

Ensin määritämme funktion tietojen siistimiseksi. Siihen kuuluu rivien pudottaminen puuttuvista tiedoista, kaksoiskappalerivien poistaminen tiettyjen sarakkeiden perusteella ja tarpeettomien sarakkeiden pudottaminen.

# Define a function to clean the data
def clean_data(df):
    # Drop rows with missing data across all columns
    df = df.dropna(how="all")
    # Drop duplicate rows based on 'RowNumber' and 'CustomerId'
    df = df.dropDuplicates(subset=['RowNumber', 'CustomerId'])
    # Drop columns: 'RowNumber', 'CustomerId', 'Surname'
    df = df.drop('RowNumber', 'CustomerId', 'Surname')
    return df

# Create a copy of the original dataframe by selecting all the columns
df_copy = df.select("*")

# Apply the clean_data function to the copy
df_clean = clean_data(df_copy)

Funktion clean_data avulla varmistetaan, että tietojoukossa ei ole puuttuvia arvoja ja kaksoiskappaleita samalla, kun poistetaan tarpeettomia sarakkeita.

Ominaisuuksien suunnittelu

Seuraavaksi teemme ominaisuussuunnittelun luomalla tarpeettomat sarakkeet Geography- ja Gender-sarakkeille käyttämällä yhtä kuumaa koodausta.

# Import PySpark functions
from pyspark.sql import functions as F

# Create dummy columns for 'Geography' and 'Gender' using one-hot encoding
df_clean = df_clean.select(
    "*",
    F.when(F.col("Geography") == "France", 1).otherwise(0).alias("Geography_France"),
    F.when(F.col("Geography") == "Germany", 1).otherwise(0).alias("Geography_Germany"),
    F.when(F.col("Geography") == "Spain", 1).otherwise(0).alias("Geography_Spain"),
    F.when(F.col("Gender") == "Female", 1).otherwise(0).alias("Gender_Female"),
    F.when(F.col("Gender") == "Male", 1).otherwise(0).alias("Gender_Male")
)

# Drop the original 'Geography' and 'Gender' columns
df_clean = df_clean.drop("Geography", "Gender")

Tässä käytämme yhden kuuman koodauksen avulla luokittaisia sarakkeita binaarista tyhjiä sarakkeita varten, jolloin ne sopivat koneoppimisen algoritmeja varten.

Siistittyjen tietojen näyttäminen

Lopuksi näytämme siistityn ja ominaisuuspohjaisen tietojoukon näyttöfunktion avulla.


display(df_clean)

Tämän vaiheen avulla voit tarkastaa tuloksena saatavan DataFrame-kehyksen käyttäen käytössä olevia muunnoksia.

Tallenna Lakehouseen

Nyt tallennamme siistityn ja ominaisuusteknisen tietojoukon Lakehouseen.

# Create PySpark DataFrame from Pandas
df_clean.write.mode("overwrite").format("delta").save(f"Tables/churn_data_clean")
print(f"Spark dataframe saved to delta table: churn_data_clean")

Tässä otamme siistityn ja muunnetun PySpark DataFramen, df_clean, ja tallennamme sen Delta-taulukoksi nimeltä "churn_data_clean" lakehousessa. Delta-muotoa käytetään tietojoukon tehokkaaseen versioinniin ja hallintaan. mode("overwrite") varmistaa, että kaikki samannimiset olemassa olevat taulukot korvataan ja että taulukosta luodaan uusi versio.

Testi- ja koulutustietojoukkojen luominen

Seuraavaksi luomme testi- ja koulutustietojoukot siistityistä ja ominaisuusteknistetyistä tiedoista.

Annetussa koodiosiossa lataamme siistityn ja ominaisuuspohjaisen tietojoukon Lakehouse-muodosta Delta-muotoa käyttäen, jaamme sen koulutus- ja testausjoukkoihin 80–20-suhteessa ja valmistelemme tiedot koneoppimista varten. Valmistelu edellyttää, että tuot PySpark ML:n VectorAssembler ominaisuussarakkeiden yhdistämiseksi yhdeksi ominaisuussarakkeeksi. Tämän jälkeen muunnamme VectorAssembler harjoitus- ja testaustietojoukot, minkä tuloksena train_data saadaan sekä test_data datakehykset, jotka sisältävät kohdemuuttujan "Exited" ja ominaisuusvektorit. Nämä tietojoukot ovat nyt valmiita käytettäväksi koneoppimismallien luomisessa ja arvioinnissa.

# Import the necessary library for feature vectorization
from pyspark.ml.feature import VectorAssembler

# Load the cleaned and feature-engineered dataset from the lakehouse
df_final = spark.read.format("delta").load("Tables/churn_data_clean")

# Train-Test Separation
train_raw, test_raw = df_final.randomSplit([0.8, 0.2], seed=41)

# Define the feature columns (excluding the target variable 'Exited')
feature_cols = [col for col in df_final.columns if col != "Exited"]

# Create a VectorAssembler to combine feature columns into a single 'features' column
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Transform the training and testing datasets using the VectorAssembler
train_data = featurizer.transform(train_raw)["Exited", "features"]
test_data = featurizer.transform(test_raw)["Exited", "features"]

Junan perusmalli

Käyttämällä esimerkittyjä tietoja harjoitamme perustason koneoppimismallin, määritämme MLflow-kohteen kokeiluja varten, määritämme ennustefunktion mittarien laskemista varten ja lopuksi tarkastelemme ja kirjaamme tulokseksi saadut ROC AUC -pisteet.

Kirjaamisen tason määrittäminen

Tässä määritämme kirjaustason estääksemme Synapse.ml kirjaston tarpeettomat tulokset, mikä pitää lokit siistimpinä.

import logging
 
logging.getLogger('synapse.ml').setLevel(logging.ERROR)

MLflow'n määrittäminen

Tässä osiossa MLflow määritetään kokeilemisen seurantaa varten. Määritämme kokeilun nimeksi "automl_sample" suoritusten järjestämiseksi. Lisäksi automaattisen kirjaamisen avulla varmistetaan, että mallin parametrit, mittarit ja artefaktit kirjataan automaattisesti MLflow'hin.

import mlflow

# Set the MLflow experiment to "automl_sample" and enable automatic logging
mlflow.set_experiment("automl_sample")
mlflow.autolog(exclusive=False)

Mallin harjoittaminen ja arvioiminen

Lopuksi harjoitamme LightGBMClassifier-mallin annetuista harjoitustiedoista. Malliin on määritetty tarvittavat asetukset binaariluokitusta ja epätasapainon käsittelyä varten. Tämän harjoitetun mallin avulla tehdään ennusteita testitiedoista. Poimimme testitiedoista positiivisen luokan ja tosi-otsikoiden ennustetut todennäköisyydet. Sen jälkeen laskemme ROC AUC -pistemäärän sklearnin funktiolla roc_auc_score .

from synapse.ml.lightgbm import LightGBMClassifier
from sklearn.metrics import roc_auc_score

# Assuming you have already defined 'train_data' and 'test_data'

with mlflow.start_run(run_name="default") as run:
    # Create a LightGBMClassifier model with specified settings
    model = LightGBMClassifier(objective="binary", featuresCol="features", labelCol="Exited", dataTransferMode="bulk")
    
    # Fit the model to the training data
    model = model.fit(train_data)

    # Get the predictions
    predictions = model.transform(test_data)

    # Extract the predicted probabilities for the positive class
    y_pred = predictions.select("probability").rdd.map(lambda x: x[0][1]).collect()

    # Extract the true labels from the 'test_data' DataFrame
    y_true = test_data.select("Exited").rdd.map(lambda x: x[0]).collect()

    # Compute the ROC AUC score
    roc_auc = roc_auc_score(y_true, y_pred)

    # Log the ROC AUC score with MLflow
    mlflow.log_metric("ROC_AUC", roc_auc)

    # Print or log the ROC AUC score
    print("ROC AUC Score:", roc_auc)

Täältä voimme nähdä, että tuloksena saatava mallimme saa ROC AUC -pistemäärän 84%.

Automaattianalyysipalveluiden kokeiluversion luominen FLAML:n avulla

Tässä osiossa luomme automaattianalyysipalveluiden kokeiluversion FLAML-paketilla, määritämme kokeiluasetukset, muunnamme Spark-tietojoukon Pandas on Spark -tietojoukoksi, suoritamme AutoML-kokeilun ja tarkastelemme tuloksena saatuja arvoja.

Automaattianalyysipalveluiden kokeiluversion määrittäminen

Tässä tuomme tarvittavat luokat ja moduulit FLAML-paketista ja luomme automaattianalyysipalveluiden esiintymän, jota käytetään koneoppimisputken automatisointeihin.

# Import the AutoML class from the FLAML package
from flaml import AutoML
from flaml.automl.spark.utils import to_pandas_on_spark

# Create an AutoML instance
automl = AutoML()

Määritä asetukset

Tässä osiossa määritämme automaattianalyysipalveluiden kokeiluversion määritysasetukset.

# Define AutoML settings
settings = {
    "time_budget": 250,         # Total running time in seconds
    "metric": 'roc_auc',       # Optimization metric (ROC AUC in this case)
    "task": 'classification',  # Task type (classification)
    "log_file_name": 'flaml_experiment.log',  # FLAML log file
    "seed": 41,                # Random seed
    "force_cancel": True,      # Force stop training once time_budget is used up
    "mlflow_exp_name": "automl_sample"      # MLflow experiment name
}

Muunna Pandasiksi Sparkissä

Jos haluat suorittaa automaattianalyysipalveluita Spark-pohjaisen tietojoukon kanssa, se on muunnettava Pandas on Spark -tietojoukoksi -funktion to_pandas_on_spark avulla. Näin FLAML voi käsitellä tietoja tehokkaasti.

# Convert the Spark training dataset to a Pandas on Spark dataset
df_automl = to_pandas_on_spark(train_data)

Automaattianalyysipalveluiden kokeiluversion suorittaminen

Nyt suoritamme automaattianalyysipalveluiden kokeiluversion. Seuraamme sisäkkäisen MLflow-suorituksen avulla kokeilua olemassa olevassa MLflow-suorituskontekstissa. AutoML-kokeilu suoritetaan Pandas on Spark -tietojoukossa (df_automl) kohdemuuttujalla "Exited ja määritetyt asetukset välitetään funktiolle fit määritettäväksi.

'''The main flaml automl API'''

with mlflow.start_run(nested=True):
    automl.fit(dataframe=df_automl, label='Exited', isUnbalance=True, **settings)

Tarkastele tuloksena saatuja mittareita

Tässä viimeisessä osiossa noudetaan ja näytetään automaattianalyysipalveluiden kokeiluversion tulokset. Nämä mittarit tarjoavat merkityksellisiä tietoja automaattianalyysipalvelumallin suorituskyvystä ja määrityksestä kyseisessä tietojoukossa.

# Retrieve and display the best hyperparameter configuration and metrics
print('Best hyperparameter config:', automl.best_config)
print('Best ROC AUC on validation data: {0:.4g}'.format(1 - automl.best_loss))
print('Training duration of the best run: {0:.4g} s'.format(automl.best_config_train_time))

AutoML-kokeiluversion rinnakkaisottaminen Apache Sparkin avulla

Jos tietojoukkosi mahtuu yhteen solmuun ja haluat hyödyntää Sparkin tehoa useiden samanaikaisten automaattianalyysipalveluiden kokeilujen suorittamisessa samanaikaisesti, voit tehdä seuraavat toimet:

Muunna Pandas-tietokehykseksi

Rinnakkaistumisen mahdollistamiseksi tiedot on ensin muunnettava Pandas DataFrame -kehykseksi.

pandas_df = train_raw.toPandas()

Tässä spark-tietokehyksen train_raw voi muuntaa nimetyksi Pandas DataFrame -kehykseksi pandas_df , jotta se sopii rinnakkaiseen käsittelyyn.

Parallelisoinnin asetusten määrittäminen

True Ota Spark-pohjainen rinnakkaisuus käyttöön määrittämällä use_spark se. OLETUSARVOisesti FLAML käynnistää yhden kokeiluversion suoritettavaa tiedostoa kohden. Voit mukauttaa samanaikaisten kokeilujen määrää käyttämällä -argumenttia n_concurrent_trials .

settings = {
    "time_budget": 250,           # Total running time in seconds
    "metric": 'roc_auc',         # Optimization metric (ROC AUC in this case)
    "task": 'classification',    # Task type (classification)
    "seed": 41,                  # Random seed
    "use_spark": True,           # Enable Spark-based parallelism
    "n_concurrent_trials": 3,    # Number of concurrent trials to run
    "force_cancel": True,        # Force stop training once time_budget is used up
    "mlflow_exp_name": "automl_sample"  # MLflow experiment name

}

Näissä asetuksissa määritämme, että haluamme käyttää Sparkiä rinnakkaisuudessa määrittämällä asetukseksi use_spark True. Määritämme myös samanaikaisten kokeiden määräksi 3, mikä tarkoittaa sitä, että kolme kokeiluversiota suoritetaan rinnakkain Sparkissä.

Saat lisätietoja automaattianalyysipalveluiden jälkien rinnakkaistamisesta käymällä FLAML-dokumentaatiossa rinnakkaisten Spark-töiden osalta.

Automaattianalyysipalveluiden kokeiluversion suorittaminen rinnakkain

Nyt suoritamme AutoML-kokeiluversion rinnakkain määritettyjen asetusten kanssa. Seuraamme sisäkkäisen MLflow-suorituksen avulla kokeilua olemassa olevassa MLflow-suorituskontekstissa.

'''The main FLAML AutoML API'''
with mlflow.start_run(nested=True, run_name="parallel_trial"):
    automl.fit(dataframe=pandas_df, label='Exited', **settings)

Tämä suorittaa nyt automaattianalyysipalveluiden kokeiluversion, jossa rinnakkaisuus on käytössä. Argumenttina dataframe on Pandas DataFrame pandas_df, ja muut asetukset välitetään funktiolle fit rinnakkaista suoritusta varten.

Näytä mittarit

Kun olet suorittanut rinnakkaisen autoML-kokeilun, nouda ja näytä tulokset, mukaan lukien paras hyperparametrimääritys, ROC AUC vahvistustiedoissa ja parhaan suorituksen harjoittamisen kesto.

''' retrieve best config'''
print('Best hyperparmeter config:', automl.best_config)
print('Best roc_auc on validation data: {0:.4g}'.format(1-automl.best_loss))
print('Training duration of best run: {0:.4g} s'.format(automl.best_config_train_time))