Mallien luominen automaattianalyysipalveluilla (esikatselu)
Automaattiset Automaattianalyysipalvelut (AutoML) sisältävät joukon tekniikoita ja työkaluja, joiden avulla voidaan tehostaa koneoppimismallien harjoittamista ja optimointia mahdollisimman pienellä inhimillisellä väliintulolla. Automaattianalyysipalveluiden ensisijaisena tavoitteena on yksinkertaistaa ja nopeuttaa tietyn tietojoukon sopivimman koneoppimismallin ja hyperparametrien valintaa. Tämä on tehtävä, joka vaatii yleensä huomattavaa asiantuntemusta ja laskennallisia resursseja. Fabric-kehyksessä datatieteilijät voivat automatisoida koneoppimisen työnkulkujen eri näkökohtia moduulin flaml.AutoML
avulla.
Tässä artikkelissa perehdytaan automaattianalyysipalveluiden kokeiluversioiden luomiseen suoraan koodista Spark-tietojoukon avulla. Lisäksi tutkimme menetelmiä näiden tietojen muuntamiseksi Pandas-tietokehykseksi ja keskustelemme tekniikoista kokeilujen rinnakkaistamiseksi.
Edellytykset
Hanki Microsoft Fabric -tilaus. Voit myös rekisteröityä ilmaiseen Microsoft Fabric -kokeiluversioon.
Siirry Synapse Data Science -käyttökokemukseen aloitussivun vasemmassa reunassa olevan käyttökokemuksen vaihtajan avulla.
- Luo uusi Fabric-ympäristö tai varmista, että käytössäsi ovat Fabric Runtime 1.2 (Spark 3.4 (tai uudempi) ja Delta 2.4)
- Luo uusi muistikirja.
- Liitä muistikirjasi Lakehouseen. Valitse muistikirjasi vasemmasta reunasta Lisää , jos haluat lisätä aiemmin luodun lakehousen tai luoda uuden.
Lataa ja valmistele tiedot
Tässä osiossa määritämme tietojen latausasetukset ja tallennamme ne sitten Lakehouse-tallennustilaan.
Lataa tiedot
Tämä koodilohko lataa tiedot etälähteestä ja tallentaa ne Lakehouse-järjestelmään.
import os
import requests
IS_CUSTOM_DATA = False # if TRUE, dataset has to be uploaded manually
if not IS_CUSTOM_DATA:
# Specify the remote URL where the data is hosted
remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/bankcustomerchurn"
# List of data files to download
file_list = ["churn.csv"]
# Define the download path within the lakehouse
download_path = "/lakehouse/default/Files/churn/raw"
# Check if the lakehouse directory exists; if not, raise an error
if not os.path.exists("/lakehouse/default"):
raise FileNotFoundError("Default lakehouse not found. Please add a lakehouse and restart the session.")
# Create the download directory if it doesn't exist
os.makedirs(download_path, exist_ok=True)
# Download each data file if it doesn't already exist in the lakehouse
for fname in file_list:
if not os.path.exists(f"{download_path}/{fname}"):
r = requests.get(f"{remote_url}/{fname}", timeout=30)
with open(f"{download_path}/{fname}", "wb") as f:
f.write(r.content)
print("Downloaded demo data files into lakehouse.")
Tietojen lataaminen Spark-tietokehykseen
Seuraava koodilohko lataa TIEDOT CSV-tiedostosta Spark DataFrameen ja tallentaa ne välimuistiin tehokasta käsittelyä varten.
df = (
spark.read.option("header", True)
.option("inferSchema", True)
.csv("Files/churn/raw/churn.csv")
.cache()
)
Tässä koodissa oletetaan, että datatiedosto on ladattu ja että se sijaitsee määritetyssä polussa. Se lukee CSV-tiedoston Spark DataFrame -kehykseen, muodostaa rakenteen ja tallentaa sen välimuistiin, jotta sitä on helpompi käyttää seuraavien toimintojen aikana.
Tietojen valmistelu
Tässä osiossa teemme tietojoukolle tietojen siistimisen ja ominaisuuksien suunnittelun.
Tietojen siistiminen
Ensin määritämme funktion tietojen siistimiseksi. Siihen kuuluu rivien pudottaminen puuttuvista tiedoista, kaksoiskappalerivien poistaminen tiettyjen sarakkeiden perusteella ja tarpeettomien sarakkeiden pudottaminen.
# Define a function to clean the data
def clean_data(df):
# Drop rows with missing data across all columns
df = df.dropna(how="all")
# Drop duplicate rows based on 'RowNumber' and 'CustomerId'
df = df.dropDuplicates(subset=['RowNumber', 'CustomerId'])
# Drop columns: 'RowNumber', 'CustomerId', 'Surname'
df = df.drop('RowNumber', 'CustomerId', 'Surname')
return df
# Create a copy of the original dataframe by selecting all the columns
df_copy = df.select("*")
# Apply the clean_data function to the copy
df_clean = clean_data(df_copy)
Funktion clean_data
avulla varmistetaan, että tietojoukossa ei ole puuttuvia arvoja ja kaksoiskappaleita samalla, kun poistetaan tarpeettomia sarakkeita.
Ominaisuuksien suunnittelu
Seuraavaksi teemme ominaisuussuunnittelun luomalla tarpeettomat sarakkeet Geography- ja Gender-sarakkeille käyttämällä yhtä kuumaa koodausta.
# Import PySpark functions
from pyspark.sql import functions as F
# Create dummy columns for 'Geography' and 'Gender' using one-hot encoding
df_clean = df_clean.select(
"*",
F.when(F.col("Geography") == "France", 1).otherwise(0).alias("Geography_France"),
F.when(F.col("Geography") == "Germany", 1).otherwise(0).alias("Geography_Germany"),
F.when(F.col("Geography") == "Spain", 1).otherwise(0).alias("Geography_Spain"),
F.when(F.col("Gender") == "Female", 1).otherwise(0).alias("Gender_Female"),
F.when(F.col("Gender") == "Male", 1).otherwise(0).alias("Gender_Male")
)
# Drop the original 'Geography' and 'Gender' columns
df_clean = df_clean.drop("Geography", "Gender")
Tässä käytämme yhden kuuman koodauksen avulla luokittaisia sarakkeita binaarista tyhjiä sarakkeita varten, jolloin ne sopivat koneoppimisen algoritmeja varten.
Siistittyjen tietojen näyttäminen
Lopuksi näytämme siistityn ja ominaisuuspohjaisen tietojoukon näyttöfunktion avulla.
display(df_clean)
Tämän vaiheen avulla voit tarkastaa tuloksena saatavan DataFrame-kehyksen käyttäen käytössä olevia muunnoksia.
Tallenna Lakehouseen
Nyt tallennamme siistityn ja ominaisuusteknisen tietojoukon Lakehouseen.
# Create PySpark DataFrame from Pandas
df_clean.write.mode("overwrite").format("delta").save(f"Tables/churn_data_clean")
print(f"Spark dataframe saved to delta table: churn_data_clean")
Tässä otamme siistityn ja muunnetun PySpark DataFramen, df_clean
, ja tallennamme sen Delta-taulukoksi nimeltä "churn_data_clean" lakehousessa. Delta-muotoa käytetään tietojoukon tehokkaaseen versioinniin ja hallintaan. mode("overwrite")
varmistaa, että kaikki samannimiset olemassa olevat taulukot korvataan ja että taulukosta luodaan uusi versio.
Testi- ja koulutustietojoukkojen luominen
Seuraavaksi luomme testi- ja koulutustietojoukot siistityistä ja ominaisuusteknistetyistä tiedoista.
Annetussa koodiosiossa lataamme siistityn ja ominaisuuspohjaisen tietojoukon Lakehouse-muodosta Delta-muotoa käyttäen, jaamme sen koulutus- ja testausjoukkoihin 80–20-suhteessa ja valmistelemme tiedot koneoppimista varten. Valmistelu edellyttää, että tuot PySpark ML:n VectorAssembler
ominaisuussarakkeiden yhdistämiseksi yhdeksi ominaisuussarakkeeksi. Tämän jälkeen muunnamme VectorAssembler
harjoitus- ja testaustietojoukot, minkä tuloksena train_data
saadaan sekä test_data
datakehykset, jotka sisältävät kohdemuuttujan "Exited" ja ominaisuusvektorit. Nämä tietojoukot ovat nyt valmiita käytettäväksi koneoppimismallien luomisessa ja arvioinnissa.
# Import the necessary library for feature vectorization
from pyspark.ml.feature import VectorAssembler
# Load the cleaned and feature-engineered dataset from the lakehouse
df_final = spark.read.format("delta").load("Tables/churn_data_clean")
# Train-Test Separation
train_raw, test_raw = df_final.randomSplit([0.8, 0.2], seed=41)
# Define the feature columns (excluding the target variable 'Exited')
feature_cols = [col for col in df_final.columns if col != "Exited"]
# Create a VectorAssembler to combine feature columns into a single 'features' column
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")
# Transform the training and testing datasets using the VectorAssembler
train_data = featurizer.transform(train_raw)["Exited", "features"]
test_data = featurizer.transform(test_raw)["Exited", "features"]
Junan perusmalli
Käyttämällä esimerkittyjä tietoja harjoitamme perustason koneoppimismallin, määritämme MLflow-kohteen kokeiluja varten, määritämme ennustefunktion mittarien laskemista varten ja lopuksi tarkastelemme ja kirjaamme tulokseksi saadut ROC AUC -pisteet.
Kirjaamisen tason määrittäminen
Tässä määritämme kirjaustason estääksemme Synapse.ml kirjaston tarpeettomat tulokset, mikä pitää lokit siistimpinä.
import logging
logging.getLogger('synapse.ml').setLevel(logging.ERROR)
MLflow'n määrittäminen
Tässä osiossa MLflow määritetään kokeilemisen seurantaa varten. Määritämme kokeilun nimeksi "automl_sample" suoritusten järjestämiseksi. Lisäksi automaattisen kirjaamisen avulla varmistetaan, että mallin parametrit, mittarit ja artefaktit kirjataan automaattisesti MLflow'hin.
import mlflow
# Set the MLflow experiment to "automl_sample" and enable automatic logging
mlflow.set_experiment("automl_sample")
mlflow.autolog(exclusive=False)
Mallin harjoittaminen ja arvioiminen
Lopuksi harjoitamme LightGBMClassifier-mallin annetuista harjoitustiedoista. Malliin on määritetty tarvittavat asetukset binaariluokitusta ja epätasapainon käsittelyä varten. Tämän harjoitetun mallin avulla tehdään ennusteita testitiedoista. Poimimme testitiedoista positiivisen luokan ja tosi-otsikoiden ennustetut todennäköisyydet. Sen jälkeen laskemme ROC AUC -pistemäärän sklearnin funktiolla roc_auc_score
.
from synapse.ml.lightgbm import LightGBMClassifier
from sklearn.metrics import roc_auc_score
# Assuming you have already defined 'train_data' and 'test_data'
with mlflow.start_run(run_name="default") as run:
# Create a LightGBMClassifier model with specified settings
model = LightGBMClassifier(objective="binary", featuresCol="features", labelCol="Exited", dataTransferMode="bulk")
# Fit the model to the training data
model = model.fit(train_data)
# Get the predictions
predictions = model.transform(test_data)
# Extract the predicted probabilities for the positive class
y_pred = predictions.select("probability").rdd.map(lambda x: x[0][1]).collect()
# Extract the true labels from the 'test_data' DataFrame
y_true = test_data.select("Exited").rdd.map(lambda x: x[0]).collect()
# Compute the ROC AUC score
roc_auc = roc_auc_score(y_true, y_pred)
# Log the ROC AUC score with MLflow
mlflow.log_metric("ROC_AUC", roc_auc)
# Print or log the ROC AUC score
print("ROC AUC Score:", roc_auc)
Täältä voimme nähdä, että tuloksena saatava mallimme saa ROC AUC -pistemäärän 84%.
Automaattianalyysipalveluiden kokeiluversion luominen FLAML:n avulla
Tässä osiossa luomme automaattianalyysipalveluiden kokeiluversion FLAML-paketilla, määritämme kokeiluasetukset, muunnamme Spark-tietojoukon Pandas on Spark -tietojoukoksi, suoritamme AutoML-kokeilun ja tarkastelemme tuloksena saatuja arvoja.
Automaattianalyysipalveluiden kokeiluversion määrittäminen
Tässä tuomme tarvittavat luokat ja moduulit FLAML-paketista ja luomme automaattianalyysipalveluiden esiintymän, jota käytetään koneoppimisputken automatisointeihin.
# Import the AutoML class from the FLAML package
from flaml import AutoML
from flaml.automl.spark.utils import to_pandas_on_spark
# Create an AutoML instance
automl = AutoML()
Määritä asetukset
Tässä osiossa määritämme automaattianalyysipalveluiden kokeiluversion määritysasetukset.
# Define AutoML settings
settings = {
"time_budget": 250, # Total running time in seconds
"metric": 'roc_auc', # Optimization metric (ROC AUC in this case)
"task": 'classification', # Task type (classification)
"log_file_name": 'flaml_experiment.log', # FLAML log file
"seed": 41, # Random seed
"force_cancel": True, # Force stop training once time_budget is used up
"mlflow_exp_name": "automl_sample" # MLflow experiment name
}
Muunna Pandasiksi Sparkissä
Jos haluat suorittaa automaattianalyysipalveluita Spark-pohjaisen tietojoukon kanssa, se on muunnettava Pandas on Spark -tietojoukoksi -funktion to_pandas_on_spark
avulla. Näin FLAML voi käsitellä tietoja tehokkaasti.
# Convert the Spark training dataset to a Pandas on Spark dataset
df_automl = to_pandas_on_spark(train_data)
Automaattianalyysipalveluiden kokeiluversion suorittaminen
Nyt suoritamme automaattianalyysipalveluiden kokeiluversion. Seuraamme sisäkkäisen MLflow-suorituksen avulla kokeilua olemassa olevassa MLflow-suorituskontekstissa. AutoML-kokeilu suoritetaan Pandas on Spark -tietojoukossa (df_automl
) kohdemuuttujalla "Exited
ja määritetyt asetukset välitetään funktiolle fit
määritettäväksi.
'''The main flaml automl API'''
with mlflow.start_run(nested=True):
automl.fit(dataframe=df_automl, label='Exited', isUnbalance=True, **settings)
Tarkastele tuloksena saatuja mittareita
Tässä viimeisessä osiossa noudetaan ja näytetään automaattianalyysipalveluiden kokeiluversion tulokset. Nämä mittarit tarjoavat merkityksellisiä tietoja automaattianalyysipalvelumallin suorituskyvystä ja määrityksestä kyseisessä tietojoukossa.
# Retrieve and display the best hyperparameter configuration and metrics
print('Best hyperparameter config:', automl.best_config)
print('Best ROC AUC on validation data: {0:.4g}'.format(1 - automl.best_loss))
print('Training duration of the best run: {0:.4g} s'.format(automl.best_config_train_time))
AutoML-kokeiluversion rinnakkaisottaminen Apache Sparkin avulla
Jos tietojoukkosi mahtuu yhteen solmuun ja haluat hyödyntää Sparkin tehoa useiden samanaikaisten automaattianalyysipalveluiden kokeilujen suorittamisessa samanaikaisesti, voit tehdä seuraavat toimet:
Muunna Pandas-tietokehykseksi
Rinnakkaistumisen mahdollistamiseksi tiedot on ensin muunnettava Pandas DataFrame -kehykseksi.
pandas_df = train_raw.toPandas()
Tässä spark-tietokehyksen train_raw
voi muuntaa nimetyksi Pandas DataFrame -kehykseksi pandas_df
, jotta se sopii rinnakkaiseen käsittelyyn.
Parallelisoinnin asetusten määrittäminen
True
Ota Spark-pohjainen rinnakkaisuus käyttöön määrittämällä use_spark
se. OLETUSARVOisesti FLAML käynnistää yhden kokeiluversion suoritettavaa tiedostoa kohden. Voit mukauttaa samanaikaisten kokeilujen määrää käyttämällä -argumenttia n_concurrent_trials
.
settings = {
"time_budget": 250, # Total running time in seconds
"metric": 'roc_auc', # Optimization metric (ROC AUC in this case)
"task": 'classification', # Task type (classification)
"seed": 41, # Random seed
"use_spark": True, # Enable Spark-based parallelism
"n_concurrent_trials": 3, # Number of concurrent trials to run
"force_cancel": True, # Force stop training once time_budget is used up
"mlflow_exp_name": "automl_sample" # MLflow experiment name
}
Näissä asetuksissa määritämme, että haluamme käyttää Sparkiä rinnakkaisuudessa määrittämällä asetukseksi use_spark
True
. Määritämme myös samanaikaisten kokeiden määräksi 3, mikä tarkoittaa sitä, että kolme kokeiluversiota suoritetaan rinnakkain Sparkissä.
Saat lisätietoja automaattianalyysipalveluiden jälkien rinnakkaistamisesta käymällä FLAML-dokumentaatiossa rinnakkaisten Spark-töiden osalta.
Automaattianalyysipalveluiden kokeiluversion suorittaminen rinnakkain
Nyt suoritamme AutoML-kokeiluversion rinnakkain määritettyjen asetusten kanssa. Seuraamme sisäkkäisen MLflow-suorituksen avulla kokeilua olemassa olevassa MLflow-suorituskontekstissa.
'''The main FLAML AutoML API'''
with mlflow.start_run(nested=True, run_name="parallel_trial"):
automl.fit(dataframe=pandas_df, label='Exited', **settings)
Tämä suorittaa nyt automaattianalyysipalveluiden kokeiluversion, jossa rinnakkaisuus on käytössä. Argumenttina dataframe
on Pandas DataFrame pandas_df
, ja muut asetukset välitetään funktiolle fit
rinnakkaista suoritusta varten.
Näytä mittarit
Kun olet suorittanut rinnakkaisen autoML-kokeilun, nouda ja näytä tulokset, mukaan lukien paras hyperparametrimääritys, ROC AUC vahvistustiedoissa ja parhaan suorituksen harjoittamisen kesto.
''' retrieve best config'''
print('Best hyperparmeter config:', automl.best_config)
print('Best roc_auc on validation data: {0:.4g}'.format(1-automl.best_loss))
print('Training duration of best run: {0:.4g} s'.format(automl.best_config_train_time))