Sdílet prostřednictvím


Vytváření modelů pomocí automatizovaného strojového učení (Preview)

Automatizované strojové Učení (AutoML) zahrnuje sadu technik a nástrojů navržených tak, aby zjednodušily proces trénování a optimalizace modelů strojového učení s minimálním zásahem člověka. Hlavním cílem AutoML je zjednodušit a urychlit výběr nejvhodnějšího modelu strojového učení a hyperparametrů pro danou datovou sadu, což je úkol, který obvykle vyžaduje značné znalosti a výpočetní prostředky. Datoví vědci mohou v rámci architektury Fabric využít flaml.AutoML modul k automatizaci různých aspektů pracovních postupů strojového učení.

V tomto článku se podíváme na proces generování zkušebních verzí AutoML přímo z kódu pomocí datové sady Spark. Dále prozkoumáme metody pro převod těchto dat do datového rámce Pandas a probereme techniky paralelizace zkušebních pokusů o experimentování.

Důležité

Tato funkce je ve verzi Preview.

Požadavky

  • Vytvořte nové prostředí Fabric nebo se ujistěte, že používáte modul Runtime Fabric 1.2 (Spark 3.4 nebo novější) a Delta 2.4.
  • Vytvořte nový poznámkový blok.
  • Připojte poznámkový blok k jezeru. Na levé straně poznámkového bloku vyberte Přidat a přidejte existující jezero nebo vytvořte nový.

Načtení a příprava dat

V této části určíme nastavení stahování dat a pak je uložíme do jezera.

Stahování souborů

Tento blok kódu stáhne data ze vzdáleného zdroje a uloží je do jezera.

import os
import requests

IS_CUSTOM_DATA = False  # if TRUE, dataset has to be uploaded manually

if not IS_CUSTOM_DATA:
    # Specify the remote URL where the data is hosted
    remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/bankcustomerchurn"
    
    # List of data files to download
    file_list = ["churn.csv"]
    
    # Define the download path within the lakehouse
    download_path = "/lakehouse/default/Files/churn/raw"
    
    # Check if the lakehouse directory exists; if not, raise an error
    if not os.path.exists("/lakehouse/default"):
        raise FileNotFoundError("Default lakehouse not found. Please add a lakehouse and restart the session.")
    
    # Create the download directory if it doesn't exist
    os.makedirs(download_path, exist_ok=True)
    
    # Download each data file if it doesn't already exist in the lakehouse
    for fname in file_list:
        if not os.path.exists(f"{download_path}/{fname}"):
            r = requests.get(f"{remote_url}/{fname}", timeout=30)
            with open(f"{download_path}/{fname}", "wb") as f:
                f.write(r.content)
    
    print("Downloaded demo data files into lakehouse.")

Načtení dat do datového rámce Sparku

Následující blok kódu načte data ze souboru CSV do datového rámce Sparku a uloží je do mezipaměti pro efektivní zpracování.

df = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv("Files/churn/raw/churn.csv")
    .cache()
)

Tento kód předpokládá, že se datový soubor stáhl a nachází se v zadané cestě. Načte soubor CSV do datového rámce Sparku, odvodí schéma a uloží ho do mezipaměti pro rychlejší přístup během následujících operací.

Příprava dat

V této části provedeme čištění dat a přípravu funkcí v datové sadě.

Vyčištění dat

Nejprve definujeme funkci pro vyčištění dat, která zahrnuje vyřazení řádků s chybějícími daty, odebrání duplicitních řádků na základě konkrétních sloupců a vyřazení nepotřebných sloupců.

# Define a function to clean the data
def clean_data(df):
    # Drop rows with missing data across all columns
    df = df.dropna(how="all")
    # Drop duplicate rows based on 'RowNumber' and 'CustomerId'
    df = df.dropDuplicates(subset=['RowNumber', 'CustomerId'])
    # Drop columns: 'RowNumber', 'CustomerId', 'Surname'
    df = df.drop('RowNumber', 'CustomerId', 'Surname')
    return df

# Create a copy of the original dataframe by selecting all the columns
df_copy = df.select("*")

# Apply the clean_data function to the copy
df_clean = clean_data(df_copy)

Tato clean_data funkce pomáhá zajistit, že datová sada neobsahuje chybějící hodnoty a duplicity a zároveň odebere nepotřebné sloupce.

Příprava atributů

Dále provedeme přípravu funkcí vytvořením fiktivních sloupců pro sloupce Geography (Zeměpis) a Gender (Pohlaví) pomocí kódování 1-hot.

# Import PySpark functions
from pyspark.sql import functions as F

# Create dummy columns for 'Geography' and 'Gender' using one-hot encoding
df_clean = df_clean.select(
    "*",
    F.when(F.col("Geography") == "France", 1).otherwise(0).alias("Geography_France"),
    F.when(F.col("Geography") == "Germany", 1).otherwise(0).alias("Geography_Germany"),
    F.when(F.col("Geography") == "Spain", 1).otherwise(0).alias("Geography_Spain"),
    F.when(F.col("Gender") == "Female", 1).otherwise(0).alias("Gender_Female"),
    F.when(F.col("Gender") == "Male", 1).otherwise(0).alias("Gender_Male")
)

# Drop the original 'Geography' and 'Gender' columns
df_clean = df_clean.drop("Geography", "Gender")

Tady použijeme kódování typu 1-hot k převodu sloupců kategorií na binární fiktivní sloupce, které jsou vhodné pro algoritmy strojového učení.

Zobrazení vyčištěných dat

Nakonec pomocí funkce zobrazení zobrazíme vyčištěnou a funkční datovou sadu.


display(df_clean)

Tento krok umožňuje zkontrolovat výsledný datový rámec s použitými transformacemi.

Uložit do lakehouse

Teď uložíme vyčištěnou a funkční datovou sadu do jezera.

# Create PySpark DataFrame from Pandas
df_clean.write.mode("overwrite").format("delta").save(f"Tables/churn_data_clean")
print(f"Spark dataframe saved to delta table: churn_data_clean")

Tady vezmeme vyčištěný a transformovaný datový rámec df_cleanPySpark a uložíme ho jako tabulku Delta s názvem "churn_data_clean" v jezeře. K efektivní správě verzí a správy datové sady používáme formát Delta. Zajistí mode("overwrite") , že se přepíše jakákoli existující tabulka se stejným názvem a vytvoří se nová verze tabulky.

Vytvoření testovacích a trénovacích datových sad

Dále vytvoříme testovací a trénovací datové sady z vyčištěných a funkcí navržených dat.

V zadané části kódu načteme vyčištěnou a funkční datovou sadu z lakehouse pomocí formátu Delta, rozdělíme ji na trénovací a testovací sady s poměrem 80–20 a připravíme data pro strojové učení. Tato příprava zahrnuje import VectorAssembler z PySpark ML, aby se sloupce funkcí zkombinují do jednoho sloupce "funkcí". Následně použijeme VectorAssembler k transformaci trénovacích a testovacích datových sad, což vede train_data k tomu, test_data že datové rámce obsahují cílovou proměnnou "Exited" a vektory funkcí. Tyto datové sady jsou teď připravené k použití při sestavování a vyhodnocování modelů strojového učení.

# Import the necessary library for feature vectorization
from pyspark.ml.feature import VectorAssembler

# Load the cleaned and feature-engineered dataset from the lakehouse
df_final = spark.read.format("delta").load("Tables/churn_data_clean")

# Train-Test Separation
train_raw, test_raw = df_final.randomSplit([0.8, 0.2], seed=41)

# Define the feature columns (excluding the target variable 'Exited')
feature_cols = [col for col in df_final.columns if col != "Exited"]

# Create a VectorAssembler to combine feature columns into a single 'features' column
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Transform the training and testing datasets using the VectorAssembler
train_data = featurizer.transform(train_raw)["Exited", "features"]
test_data = featurizer.transform(test_raw)["Exited", "features"]

Trénování standardního modelu

Pomocí featurizovaných dat vytrénujeme základní model strojového učení, nakonfigurujeme MLflow pro sledování experimentů, definujeme prediktivní funkci pro výpočet metrik a nakonec zobrazíme a zapíšeme výsledné skóre ROC AUC.

Nastavení úrovně protokolování

Tady nakonfigurujeme úroveň protokolování tak, aby potlačit nepotřebný výstup z knihovny Synapse.ml, aby byly protokoly přehlednější.

import logging
 
logging.getLogger('synapse.ml').setLevel(logging.ERROR)

Konfigurace MLflow

V této části nakonfigurujeme MLflow pro sledování experimentů. Pro uspořádání spuštění nastavíme název experimentu na "automl_sample". Kromě toho povolíme automatické protokolování a zajistíme, aby se parametry modelu, metriky a artefakty automaticky protokolovaly do MLflow.

import mlflow

# Set the MLflow experiment to "automl_sample" and enable automatic logging
mlflow.set_experiment("automl_sample")
mlflow.autolog(exclusive=False)

Trénování a vyhodnocení modelu

Nakonec vytrénujeme model LightGBMClassifier na zadaných trénovacích datech. Model je nakonfigurovaný s nezbytnými nastaveními pro binární klasifikaci a manipulaci s nevyvážeností. Tento natrénovaný model pak použijeme k předpovědím na testovacích datech. Z testovacích dat extrahujeme predikované pravděpodobnosti pro kladnou třídu a skutečné popisky. Potom vypočítáme skóre ROC AUC pomocí funkce sklearn.roc_auc_score

from synapse.ml.lightgbm import LightGBMClassifier
from sklearn.metrics import roc_auc_score

# Assuming you have already defined 'train_data' and 'test_data'

with mlflow.start_run(run_name="default") as run:
    # Create a LightGBMClassifier model with specified settings
    model = LightGBMClassifier(objective="binary", featuresCol="features", labelCol="Exited", dataTransferMode="bulk")
    
    # Fit the model to the training data
    model = model.fit(train_data)

    # Get the predictions
    predictions = model.transform(test_data)

    # Extract the predicted probabilities for the positive class
    y_pred = predictions.select("probability").rdd.map(lambda x: x[0][1]).collect()

    # Extract the true labels from the 'test_data' DataFrame
    y_true = test_data.select("Exited").rdd.map(lambda x: x[0]).collect()

    # Compute the ROC AUC score
    roc_auc = roc_auc_score(y_true, y_pred)

    # Log the ROC AUC score with MLflow
    mlflow.log_metric("ROC_AUC", roc_auc)

    # Print or log the ROC AUC score
    print("ROC AUC Score:", roc_auc)

Odsud vidíme, že náš výsledný model dosahuje skóre ROC AUC o 84 %.

Vytvoření zkušební verze AutoML pomocí FLAML

V této části vytvoříme zkušební verzi AutoML pomocí balíčku FLAML, nakonfigurujeme nastavení zkušební verze, převedeme datovou sadu Sparku na sadu Pandas v datové sadě Spark, spustíme zkušební verzi AutoML a zobrazíme výsledné metriky.

Konfigurace zkušební verze AutoML

Tady naimportujeme potřebné třídy a moduly z balíčku FLAML a vytvoříme instanci AutoML, která se použije k automatizaci kanálu strojového učení.

# Import the AutoML class from the FLAML package
from flaml import AutoML
from flaml.automl.spark.utils import to_pandas_on_spark

# Create an AutoML instance
automl = AutoML()

Konfigurace nastavení

V této části definujeme nastavení konfigurace pro zkušební verzi AutoML.

# Define AutoML settings
settings = {
    "time_budget": 250,         # Total running time in seconds
    "metric": 'roc_auc',       # Optimization metric (ROC AUC in this case)
    "task": 'classification',  # Task type (classification)
    "log_file_name": 'flaml_experiment.log',  # FLAML log file
    "seed": 41,                # Random seed
    "force_cancel": True,      # Force stop training once time_budget is used up
    "mlflow_exp_name": "automl_sample"      # MLflow experiment name
}

Převod na Pandas ve Sparku

Abychom mohli spustit AutoML s datovou sadou založenou na Sparku, musíme ji pomocí funkce převést na datovou sadu Pandas ve Sparku to_pandas_on_spark . Díky tomu může FLAML efektivně pracovat s daty.

# Convert the Spark training dataset to a Pandas on Spark dataset
df_automl = to_pandas_on_spark(train_data)

Spuštění zkušební verze AutoML

Teď spustíme zkušební verzi AutoML. Ke sledování experimentu v existujícím kontextu spuštění MLflow používáme vnořené spuštění MLflow. Zkušební verze AutoML se provádí u datové sady Pandas ve Sparku (df_automl) s cílovou proměnnou "Exited a definovaná nastavení se předají fit funkci pro konfiguraci.

'''The main flaml automl API'''

with mlflow.start_run(nested=True):
    automl.fit(dataframe=df_automl, label='Exited', isUnbalance=True, **settings)

Zobrazení výsledných metrik

V této poslední části načteme a zobrazíme výsledky zkušební verze AutoML. Tyto metriky poskytují přehled o výkonu a konfiguraci modelu AutoML pro danou datovou sadu.

# Retrieve and display the best hyperparameter configuration and metrics
print('Best hyperparameter config:', automl.best_config)
print('Best ROC AUC on validation data: {0:.4g}'.format(1 - automl.best_loss))
print('Training duration of the best run: {0:.4g} s'.format(automl.best_config_train_time))

Paralelizace zkušební verze AutoML pomocí Apache Sparku

Ve scénářích, ve kterých se datová sada může vejít do jednoho uzlu a chcete využít sílu Sparku pro souběžné spouštění několika paralelních zkušebních verzí AutoML, můžete postupovat následovně:

Převod na datový rámec Pandas

Aby bylo možné povolit paralelizaci, musí být vaše data nejprve převedena na datový rámec Pandas.

pandas_df = train_raw.toPandas()

V této části převedeme datový rámec Sparku train_raw na datový rámec Pandas s názvem pandas_df , aby byl vhodný pro paralelní zpracování.

Konfigurace nastavení paralelizace

Nastavte use_spark na True povolení paralelismu založeného na Sparku. FlaML ve výchozím nastavení spustí jednu zkušební verzi na exekutor. Počet souběžných pokusů můžete přizpůsobit pomocí argumentu n_concurrent_trials .

settings = {
    "time_budget": 250,           # Total running time in seconds
    "metric": 'roc_auc',         # Optimization metric (ROC AUC in this case)
    "task": 'classification',    # Task type (classification)
    "seed": 41,                  # Random seed
    "use_spark": True,           # Enable Spark-based parallelism
    "n_concurrent_trials": 3,    # Number of concurrent trials to run
    "force_cancel": True,        # Force stop training once time_budget is used up
    "mlflow_exp_name": "automl_sample"  # MLflow experiment name

}

Vtěchtoch use_spark True Také jsme nastavili počet souběžných zkušebních verzí na 3, což znamená, že tři zkušební verze se budou spouštět paralelně ve Sparku.

Další informace o paralelizaci tras AutoML najdete v dokumentaci FLAML pro paralelní úlohy Sparku.

Paralelní spuštění zkušební verze AutoML

Teď spustíme zkušební verzi AutoML paralelně se zadanými nastaveními. Ke sledování experimentu v existujícím kontextu spuštění MLflow použijeme vnořené spuštění MLflow.

'''The main FLAML AutoML API'''
with mlflow.start_run(nested=True, run_name="parallel_trial"):
    automl.fit(dataframe=pandas_df, label='Exited', **settings)

Tím se teď spustí zkušební verze AutoML s povolenou paralelizací. Argument dataframe je nastavený na datový rámec pandas_dfPandas a další nastavení jsou předány fit funkci pro paralelní provádění.

Zobrazení metrik

Po spuštění paralelní zkušební verze AutoML načtěte a zobrazte výsledky, včetně nejlepší konfigurace hyperparametrů, ROC AUC na ověřovacích datech a doby trénování nejlepšího spuštění.

''' retrieve best config'''
print('Best hyperparmeter config:', automl.best_config)
print('Best roc_auc on validation data: {0:.4g}'.format(1-automl.best_loss))
print('Training duration of best run: {0:.4g} s'.format(automl.best_config_train_time))