Vytváření modelů pomocí automatizovaného strojového učení (Preview)
Automatizované strojové Učení (AutoML) zahrnuje sadu technik a nástrojů navržených tak, aby zjednodušily proces trénování a optimalizace modelů strojového učení s minimálním zásahem člověka. Hlavním cílem AutoML je zjednodušit a urychlit výběr nejvhodnějšího modelu strojového učení a hyperparametrů pro danou datovou sadu, což je úkol, který obvykle vyžaduje značné znalosti a výpočetní prostředky. Datoví vědci mohou v rámci architektury Fabric využít flaml.AutoML
modul k automatizaci různých aspektů pracovních postupů strojového učení.
V tomto článku se podíváme na proces generování zkušebních verzí AutoML přímo z kódu pomocí datové sady Spark. Dále prozkoumáme metody pro převod těchto dat do datového rámce Pandas a probereme techniky paralelizace zkušebních pokusů o experimentování.
Důležité
Tato funkce je ve verzi Preview.
Požadavky
Získejte předplatné Microsoft Fabric. Nebo si zaregistrujte bezplatnou zkušební verzi Microsoft Fabricu.
Přihlaste se k Microsoft Fabric.
Pomocí přepínače prostředí na levé straně domovské stránky přepněte na prostředí Synapse Datová Věda.
- Vytvořte nové prostředí Fabric nebo se ujistěte, že používáte modul Runtime Fabric 1.2 (Spark 3.4 nebo novější) a Delta 2.4.
- Vytvořte nový poznámkový blok.
- Připojte poznámkový blok k jezeru. Na levé straně poznámkového bloku vyberte Přidat a přidejte existující jezero nebo vytvořte nový.
Načtení a příprava dat
V této části určíme nastavení stahování dat a pak je uložíme do jezera.
Stahování souborů
Tento blok kódu stáhne data ze vzdáleného zdroje a uloží je do jezera.
import os
import requests
IS_CUSTOM_DATA = False # if TRUE, dataset has to be uploaded manually
if not IS_CUSTOM_DATA:
# Specify the remote URL where the data is hosted
remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/bankcustomerchurn"
# List of data files to download
file_list = ["churn.csv"]
# Define the download path within the lakehouse
download_path = "/lakehouse/default/Files/churn/raw"
# Check if the lakehouse directory exists; if not, raise an error
if not os.path.exists("/lakehouse/default"):
raise FileNotFoundError("Default lakehouse not found. Please add a lakehouse and restart the session.")
# Create the download directory if it doesn't exist
os.makedirs(download_path, exist_ok=True)
# Download each data file if it doesn't already exist in the lakehouse
for fname in file_list:
if not os.path.exists(f"{download_path}/{fname}"):
r = requests.get(f"{remote_url}/{fname}", timeout=30)
with open(f"{download_path}/{fname}", "wb") as f:
f.write(r.content)
print("Downloaded demo data files into lakehouse.")
Načtení dat do datového rámce Sparku
Následující blok kódu načte data ze souboru CSV do datového rámce Sparku a uloží je do mezipaměti pro efektivní zpracování.
df = (
spark.read.option("header", True)
.option("inferSchema", True)
.csv("Files/churn/raw/churn.csv")
.cache()
)
Tento kód předpokládá, že se datový soubor stáhl a nachází se v zadané cestě. Načte soubor CSV do datového rámce Sparku, odvodí schéma a uloží ho do mezipaměti pro rychlejší přístup během následujících operací.
Příprava dat
V této části provedeme čištění dat a přípravu funkcí v datové sadě.
Vyčištění dat
Nejprve definujeme funkci pro vyčištění dat, která zahrnuje vyřazení řádků s chybějícími daty, odebrání duplicitních řádků na základě konkrétních sloupců a vyřazení nepotřebných sloupců.
# Define a function to clean the data
def clean_data(df):
# Drop rows with missing data across all columns
df = df.dropna(how="all")
# Drop duplicate rows based on 'RowNumber' and 'CustomerId'
df = df.dropDuplicates(subset=['RowNumber', 'CustomerId'])
# Drop columns: 'RowNumber', 'CustomerId', 'Surname'
df = df.drop('RowNumber', 'CustomerId', 'Surname')
return df
# Create a copy of the original dataframe by selecting all the columns
df_copy = df.select("*")
# Apply the clean_data function to the copy
df_clean = clean_data(df_copy)
Tato clean_data
funkce pomáhá zajistit, že datová sada neobsahuje chybějící hodnoty a duplicity a zároveň odebere nepotřebné sloupce.
Příprava atributů
Dále provedeme přípravu funkcí vytvořením fiktivních sloupců pro sloupce Geography (Zeměpis) a Gender (Pohlaví) pomocí kódování 1-hot.
# Import PySpark functions
from pyspark.sql import functions as F
# Create dummy columns for 'Geography' and 'Gender' using one-hot encoding
df_clean = df_clean.select(
"*",
F.when(F.col("Geography") == "France", 1).otherwise(0).alias("Geography_France"),
F.when(F.col("Geography") == "Germany", 1).otherwise(0).alias("Geography_Germany"),
F.when(F.col("Geography") == "Spain", 1).otherwise(0).alias("Geography_Spain"),
F.when(F.col("Gender") == "Female", 1).otherwise(0).alias("Gender_Female"),
F.when(F.col("Gender") == "Male", 1).otherwise(0).alias("Gender_Male")
)
# Drop the original 'Geography' and 'Gender' columns
df_clean = df_clean.drop("Geography", "Gender")
Tady použijeme kódování typu 1-hot k převodu sloupců kategorií na binární fiktivní sloupce, které jsou vhodné pro algoritmy strojového učení.
Zobrazení vyčištěných dat
Nakonec pomocí funkce zobrazení zobrazíme vyčištěnou a funkční datovou sadu.
display(df_clean)
Tento krok umožňuje zkontrolovat výsledný datový rámec s použitými transformacemi.
Uložit do lakehouse
Teď uložíme vyčištěnou a funkční datovou sadu do jezera.
# Create PySpark DataFrame from Pandas
df_clean.write.mode("overwrite").format("delta").save(f"Tables/churn_data_clean")
print(f"Spark dataframe saved to delta table: churn_data_clean")
Tady vezmeme vyčištěný a transformovaný datový rámec df_clean
PySpark a uložíme ho jako tabulku Delta s názvem "churn_data_clean" v jezeře. K efektivní správě verzí a správy datové sady používáme formát Delta. Zajistí mode("overwrite")
, že se přepíše jakákoli existující tabulka se stejným názvem a vytvoří se nová verze tabulky.
Vytvoření testovacích a trénovacích datových sad
Dále vytvoříme testovací a trénovací datové sady z vyčištěných a funkcí navržených dat.
V zadané části kódu načteme vyčištěnou a funkční datovou sadu z lakehouse pomocí formátu Delta, rozdělíme ji na trénovací a testovací sady s poměrem 80–20 a připravíme data pro strojové učení. Tato příprava zahrnuje import VectorAssembler
z PySpark ML, aby se sloupce funkcí zkombinují do jednoho sloupce "funkcí". Následně použijeme VectorAssembler
k transformaci trénovacích a testovacích datových sad, což vede train_data
k tomu, test_data
že datové rámce obsahují cílovou proměnnou "Exited" a vektory funkcí. Tyto datové sady jsou teď připravené k použití při sestavování a vyhodnocování modelů strojového učení.
# Import the necessary library for feature vectorization
from pyspark.ml.feature import VectorAssembler
# Load the cleaned and feature-engineered dataset from the lakehouse
df_final = spark.read.format("delta").load("Tables/churn_data_clean")
# Train-Test Separation
train_raw, test_raw = df_final.randomSplit([0.8, 0.2], seed=41)
# Define the feature columns (excluding the target variable 'Exited')
feature_cols = [col for col in df_final.columns if col != "Exited"]
# Create a VectorAssembler to combine feature columns into a single 'features' column
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")
# Transform the training and testing datasets using the VectorAssembler
train_data = featurizer.transform(train_raw)["Exited", "features"]
test_data = featurizer.transform(test_raw)["Exited", "features"]
Trénování standardního modelu
Pomocí featurizovaných dat vytrénujeme základní model strojového učení, nakonfigurujeme MLflow pro sledování experimentů, definujeme prediktivní funkci pro výpočet metrik a nakonec zobrazíme a zapíšeme výsledné skóre ROC AUC.
Nastavení úrovně protokolování
Tady nakonfigurujeme úroveň protokolování tak, aby potlačit nepotřebný výstup z knihovny Synapse.ml, aby byly protokoly přehlednější.
import logging
logging.getLogger('synapse.ml').setLevel(logging.ERROR)
Konfigurace MLflow
V této části nakonfigurujeme MLflow pro sledování experimentů. Pro uspořádání spuštění nastavíme název experimentu na "automl_sample". Kromě toho povolíme automatické protokolování a zajistíme, aby se parametry modelu, metriky a artefakty automaticky protokolovaly do MLflow.
import mlflow
# Set the MLflow experiment to "automl_sample" and enable automatic logging
mlflow.set_experiment("automl_sample")
mlflow.autolog(exclusive=False)
Trénování a vyhodnocení modelu
Nakonec vytrénujeme model LightGBMClassifier na zadaných trénovacích datech. Model je nakonfigurovaný s nezbytnými nastaveními pro binární klasifikaci a manipulaci s nevyvážeností. Tento natrénovaný model pak použijeme k předpovědím na testovacích datech. Z testovacích dat extrahujeme predikované pravděpodobnosti pro kladnou třídu a skutečné popisky. Potom vypočítáme skóre ROC AUC pomocí funkce sklearn.roc_auc_score
from synapse.ml.lightgbm import LightGBMClassifier
from sklearn.metrics import roc_auc_score
# Assuming you have already defined 'train_data' and 'test_data'
with mlflow.start_run(run_name="default") as run:
# Create a LightGBMClassifier model with specified settings
model = LightGBMClassifier(objective="binary", featuresCol="features", labelCol="Exited", dataTransferMode="bulk")
# Fit the model to the training data
model = model.fit(train_data)
# Get the predictions
predictions = model.transform(test_data)
# Extract the predicted probabilities for the positive class
y_pred = predictions.select("probability").rdd.map(lambda x: x[0][1]).collect()
# Extract the true labels from the 'test_data' DataFrame
y_true = test_data.select("Exited").rdd.map(lambda x: x[0]).collect()
# Compute the ROC AUC score
roc_auc = roc_auc_score(y_true, y_pred)
# Log the ROC AUC score with MLflow
mlflow.log_metric("ROC_AUC", roc_auc)
# Print or log the ROC AUC score
print("ROC AUC Score:", roc_auc)
Odsud vidíme, že náš výsledný model dosahuje skóre ROC AUC o 84 %.
Vytvoření zkušební verze AutoML pomocí FLAML
V této části vytvoříme zkušební verzi AutoML pomocí balíčku FLAML, nakonfigurujeme nastavení zkušební verze, převedeme datovou sadu Sparku na sadu Pandas v datové sadě Spark, spustíme zkušební verzi AutoML a zobrazíme výsledné metriky.
Konfigurace zkušební verze AutoML
Tady naimportujeme potřebné třídy a moduly z balíčku FLAML a vytvoříme instanci AutoML, která se použije k automatizaci kanálu strojového učení.
# Import the AutoML class from the FLAML package
from flaml import AutoML
from flaml.automl.spark.utils import to_pandas_on_spark
# Create an AutoML instance
automl = AutoML()
Konfigurace nastavení
V této části definujeme nastavení konfigurace pro zkušební verzi AutoML.
# Define AutoML settings
settings = {
"time_budget": 250, # Total running time in seconds
"metric": 'roc_auc', # Optimization metric (ROC AUC in this case)
"task": 'classification', # Task type (classification)
"log_file_name": 'flaml_experiment.log', # FLAML log file
"seed": 41, # Random seed
"force_cancel": True, # Force stop training once time_budget is used up
"mlflow_exp_name": "automl_sample" # MLflow experiment name
}
Převod na Pandas ve Sparku
Abychom mohli spustit AutoML s datovou sadou založenou na Sparku, musíme ji pomocí funkce převést na datovou sadu Pandas ve Sparku to_pandas_on_spark
. Díky tomu může FLAML efektivně pracovat s daty.
# Convert the Spark training dataset to a Pandas on Spark dataset
df_automl = to_pandas_on_spark(train_data)
Spuštění zkušební verze AutoML
Teď spustíme zkušební verzi AutoML. Ke sledování experimentu v existujícím kontextu spuštění MLflow používáme vnořené spuštění MLflow. Zkušební verze AutoML se provádí u datové sady Pandas ve Sparku (df_automl
) s cílovou proměnnou "Exited
a definovaná nastavení se předají fit
funkci pro konfiguraci.
'''The main flaml automl API'''
with mlflow.start_run(nested=True):
automl.fit(dataframe=df_automl, label='Exited', isUnbalance=True, **settings)
Zobrazení výsledných metrik
V této poslední části načteme a zobrazíme výsledky zkušební verze AutoML. Tyto metriky poskytují přehled o výkonu a konfiguraci modelu AutoML pro danou datovou sadu.
# Retrieve and display the best hyperparameter configuration and metrics
print('Best hyperparameter config:', automl.best_config)
print('Best ROC AUC on validation data: {0:.4g}'.format(1 - automl.best_loss))
print('Training duration of the best run: {0:.4g} s'.format(automl.best_config_train_time))
Paralelizace zkušební verze AutoML pomocí Apache Sparku
Ve scénářích, ve kterých se datová sada může vejít do jednoho uzlu a chcete využít sílu Sparku pro souběžné spouštění několika paralelních zkušebních verzí AutoML, můžete postupovat následovně:
Převod na datový rámec Pandas
Aby bylo možné povolit paralelizaci, musí být vaše data nejprve převedena na datový rámec Pandas.
pandas_df = train_raw.toPandas()
V této části převedeme datový rámec Sparku train_raw
na datový rámec Pandas s názvem pandas_df
, aby byl vhodný pro paralelní zpracování.
Konfigurace nastavení paralelizace
Nastavte use_spark
na True
povolení paralelismu založeného na Sparku. FlaML ve výchozím nastavení spustí jednu zkušební verzi na exekutor. Počet souběžných pokusů můžete přizpůsobit pomocí argumentu n_concurrent_trials
.
settings = {
"time_budget": 250, # Total running time in seconds
"metric": 'roc_auc', # Optimization metric (ROC AUC in this case)
"task": 'classification', # Task type (classification)
"seed": 41, # Random seed
"use_spark": True, # Enable Spark-based parallelism
"n_concurrent_trials": 3, # Number of concurrent trials to run
"force_cancel": True, # Force stop training once time_budget is used up
"mlflow_exp_name": "automl_sample" # MLflow experiment name
}
Vtěchtoch use_spark
True
Také jsme nastavili počet souběžných zkušebních verzí na 3, což znamená, že tři zkušební verze se budou spouštět paralelně ve Sparku.
Další informace o paralelizaci tras AutoML najdete v dokumentaci FLAML pro paralelní úlohy Sparku.
Paralelní spuštění zkušební verze AutoML
Teď spustíme zkušební verzi AutoML paralelně se zadanými nastaveními. Ke sledování experimentu v existujícím kontextu spuštění MLflow použijeme vnořené spuštění MLflow.
'''The main FLAML AutoML API'''
with mlflow.start_run(nested=True, run_name="parallel_trial"):
automl.fit(dataframe=pandas_df, label='Exited', **settings)
Tím se teď spustí zkušební verze AutoML s povolenou paralelizací. Argument dataframe
je nastavený na datový rámec pandas_df
Pandas a další nastavení jsou předány fit
funkci pro paralelní provádění.
Zobrazení metrik
Po spuštění paralelní zkušební verze AutoML načtěte a zobrazte výsledky, včetně nejlepší konfigurace hyperparametrů, ROC AUC na ověřovacích datech a doby trénování nejlepšího spuštění.
''' retrieve best config'''
print('Best hyperparmeter config:', automl.best_config)
print('Best roc_auc on validation data: {0:.4g}'.format(1-automl.best_loss))
print('Training duration of best run: {0:.4g} s'.format(automl.best_config_train_time))