Tworzenie modeli za pomocą zautomatyzowanego uczenia maszynowego (wersja zapoznawcza)
Zautomatyzowane uczenie maszynowe (AutoML) obejmuje zestaw technik i narzędzi zaprojektowanych w celu usprawnienia procesu trenowania i optymalizowania modeli uczenia maszynowego przy minimalnej interwencji człowieka. Głównym celem rozwiązania AutoML jest uproszczenie i przyspieszenie wyboru najbardziej odpowiedniego modelu uczenia maszynowego i hiperparametrów dla danego zestawu danych, zadania, które zwykle wymaga znacznej wiedzy i zasobów obliczeniowych. W ramach platformy Sieć szkieletowa analitycy danych mogą korzystać z modułu flaml.AutoML
w celu zautomatyzowania różnych aspektów przepływów pracy uczenia maszynowego.
W tym artykule omówimy proces generowania wersji próbnej rozwiązania AutoML bezpośrednio z kodu przy użyciu zestawu danych Spark. Ponadto zapoznamy się z metodami konwertowania tych danych na ramkę danych Pandas i omówimy techniki równoległego realizowania prób eksperymentalnych.
Warunki wstępne
Pobierz subskrypcję usługi Microsoft Fabric . Możesz też utworzyć bezpłatne konto wersji próbnej usługi Microsoft Fabric.
Zaloguj się do usługi Microsoft Fabric.
Użyj przełącznika widoku w lewej dolnej części strony głównej, aby przełączyć się na Fabric.
- Utwórz nowe środowisko Fabric lub upewnij się, że korzystasz ze środowiska uruchomieniowego Fabric 1.2 (Spark 3.4 lub nowszy i Delta Lake 2.4).
- Utwórz nowy notatnik.
- Dołącz notatnik do lakehouse. Z lewej strony notatnika wybierz pozycję Dodaj, aby dodać istniejący lakehouse lub utworzyć nowy.
Ładowanie i przygotowywanie danych
W tej sekcji określimy ustawienia pobierania danych, a następnie zapiszemy je w lakehouse.
Pobieranie danych
Ten blok kodu pobiera dane ze zdalnego źródła i zapisuje je w lakehouse.
import os
import requests
IS_CUSTOM_DATA = False # if TRUE, dataset has to be uploaded manually
if not IS_CUSTOM_DATA:
# Specify the remote URL where the data is hosted
remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/bankcustomerchurn"
# List of data files to download
file_list = ["churn.csv"]
# Define the download path within the lakehouse
download_path = "/lakehouse/default/Files/churn/raw"
# Check if the lakehouse directory exists; if not, raise an error
if not os.path.exists("/lakehouse/default"):
raise FileNotFoundError("Default lakehouse not found. Please add a lakehouse and restart the session.")
# Create the download directory if it doesn't exist
os.makedirs(download_path, exist_ok=True)
# Download each data file if it doesn't already exist in the lakehouse
for fname in file_list:
if not os.path.exists(f"{download_path}/{fname}"):
r = requests.get(f"{remote_url}/{fname}", timeout=30)
with open(f"{download_path}/{fname}", "wb") as f:
f.write(r.content)
print("Downloaded demo data files into lakehouse.")
Ładowanie danych do ramki danych platformy Spark
Poniższy blok kodu ładuje dane z pliku CSV do ramki danych platformy Spark i buforuje je w celu wydajnego przetwarzania.
df = (
spark.read.option("header", True)
.option("inferSchema", True)
.csv("Files/churn/raw/churn.csv")
.cache()
)
Ten kod zakłada, że plik danych został pobrany i znajduje się w określonej ścieżce. Odczytuje on plik CSV do ramki danych Platformy Spark, wywnioskuje schemat i buforuje go w celu szybszego dostępu podczas kolejnych operacji.
Przygotowywanie danych
W tej sekcji wykonamy czyszczenie danych i inżynierię cech w zestawie danych.
Czyszczenie danych
Najpierw definiujemy funkcję czyszczenia danych, która obejmuje usuwanie wierszy z brakującymi danymi, usuwanie zduplikowanych wierszy na podstawie określonych kolumn i usuwanie niepotrzebnych kolumn.
# Define a function to clean the data
def clean_data(df):
# Drop rows with missing data across all columns
df = df.dropna(how="all")
# Drop duplicate rows based on 'RowNumber' and 'CustomerId'
df = df.dropDuplicates(subset=['RowNumber', 'CustomerId'])
# Drop columns: 'RowNumber', 'CustomerId', 'Surname'
df = df.drop('RowNumber', 'CustomerId', 'Surname')
return df
# Create a copy of the original dataframe by selecting all the columns
df_copy = df.select("*")
# Apply the clean_data function to the copy
df_clean = clean_data(df_copy)
Funkcja clean_data
pomaga upewnić się, że zestaw danych jest wolny od brakujących wartości i duplikatów, jednocześnie usuwając niepotrzebne kolumny.
Inżynieria cech/funkcji
Następnie przeprowadzamy inżynierię cech, tworząc kolumny pomocnicze dla kolumn "Geography" i "Gender" przy użyciu kodowania one-hot.
# Import PySpark functions
from pyspark.sql import functions as F
# Create dummy columns for 'Geography' and 'Gender' using one-hot encoding
df_clean = df_clean.select(
"*",
F.when(F.col("Geography") == "France", 1).otherwise(0).alias("Geography_France"),
F.when(F.col("Geography") == "Germany", 1).otherwise(0).alias("Geography_Germany"),
F.when(F.col("Geography") == "Spain", 1).otherwise(0).alias("Geography_Spain"),
F.when(F.col("Gender") == "Female", 1).otherwise(0).alias("Gender_Female"),
F.when(F.col("Gender") == "Male", 1).otherwise(0).alias("Gender_Male")
)
# Drop the original 'Geography' and 'Gender' columns
df_clean = df_clean.drop("Geography", "Gender")
W tym miejscu używamy kodowania jednokrotnego do konwertowania kolumn kategorii na fikcyjne kolumny binarne, dzięki czemu są one odpowiednie dla algorytmów uczenia maszynowego.
Wyświetlanie oczyszczonych danych
Na koniec wyświetlamy oczyszczony i przetworzony zestaw danych za pomocą funkcji wyświetlania.
display(df_clean)
Ten krok umożliwia sprawdzenie wynikowej ramki danych za pomocą zastosowanych przekształceń.
Zapisz w "lakehouse"
Teraz zapiszemy oczyszczony i zaprojektowany przez funkcję zestaw danych w lakehouse.
# Create PySpark DataFrame from Pandas
df_clean.write.mode("overwrite").format("delta").save(f"Tables/churn_data_clean")
print(f"Spark dataframe saved to delta table: churn_data_clean")
W tym miejscu pobieramy oczyszczoną i przekształconą ramkę danych PySpark, df_clean
, i zapisujemy ją jako tabelę Delta o nazwie "churn_data_clean" w lakehouse. Używamy formatu delta do wydajnego przechowywania wersji i zarządzania zestawem danych.
mode("overwrite")
gwarantuje, że każda istniejąca tabela o tej samej nazwie zostanie zastąpiona i zostanie utworzona nowa wersja tabeli.
Tworzenie zestawów danych testowych i szkoleniowych
Następnie utworzymy zestawy danych testowych i szkoleniowych na podstawie oczyszczonych i zaprojektowanych funkcji danych.
W podanej sekcji kodu ładujemy oczyszczony zestaw danych z przygotowanymi cechami z lakehouse'u przy użyciu formatu Delta, dzielimy go na zestawy szkoleniowe i testowe w proporcji 80–20 i przygotowujemy dane do uczenia maszynowego. To przygotowanie obejmuje zaimportowanie VectorAssembler
z usługi PySpark ML w celu połączenia kolumn funkcji w jedną kolumnę "features". Następnie użyjemy VectorAssembler
, aby przekształcić zestawy danych trenowania i testowania, co powoduje train_data
i test_data
DataFrames, które zawierają zmienną docelową "Exited" i wektory cech. Te zestawy danych są teraz gotowe do użycia w tworzeniu i ocenianiu modeli uczenia maszynowego.
# Import the necessary library for feature vectorization
from pyspark.ml.feature import VectorAssembler
# Load the cleaned and feature-engineered dataset from the lakehouse
df_final = spark.read.format("delta").load("Tables/churn_data_clean")
# Train-Test Separation
train_raw, test_raw = df_final.randomSplit([0.8, 0.2], seed=41)
# Define the feature columns (excluding the target variable 'Exited')
feature_cols = [col for col in df_final.columns if col != "Exited"]
# Create a VectorAssembler to combine feature columns into a single 'features' column
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")
# Transform the training and testing datasets using the VectorAssembler
train_data = featurizer.transform(train_raw)["Exited", "features"]
test_data = featurizer.transform(test_raw)["Exited", "features"]
Szkolenie modelu bazowego
Korzystając z danych cechowanych, wytrenujemy model bazowy uczenia maszynowego, skonfigurujemy MLflow do śledzenia eksperymentów, zdefiniujemy funkcję przewidywania dla obliczeń metryk, a na koniec wyświetlimy i zarejestrujemy wynikowy wynik ROC AUC.
Ustawianie poziomu rejestrowania
W tym miejscu konfigurujemy poziom rejestrowania, aby ograniczał niepotrzebne dane wyjściowe z biblioteki Synapse.ml, co pozwala na utrzymanie czystszych dzienników.
import logging
logging.getLogger('synapse.ml').setLevel(logging.ERROR)
Konfigurowanie biblioteki MLflow
W tej sekcji skonfigurujemy platformę MLflow na potrzeby śledzenia eksperymentów. Ustawiamy nazwę eksperymentu na "automl_sample", aby zorganizować przebiegi. Ponadto włączamy automatyczne rejestrowanie, upewniając się, że parametry modelu, metryki i artefakty są automatycznie rejestrowane w usłudze MLflow.
import mlflow
# Set the MLflow experiment to "automl_sample" and enable automatic logging
mlflow.set_experiment("automl_sample")
mlflow.autolog(exclusive=False)
Trenowanie i ocenianie modelu
Na koniec trenujemy model LightGBMClassifier na podanych danych treningowych. Model jest skonfigurowany z ustawieniami niezbędnymi do obsługi klasyfikacji binarnej i nierównowagi. Następnie użyjemy tego wytrenowanego modelu, aby przewidywać dane testowe. Wyodrębniamy przewidywane prawdopodobieństwa dla klasy dodatniej i prawdziwych etykiet z danych testowych. Następnie obliczamy wynik ROC AUC przy użyciu funkcji roc_auc_score
sklearn.
from synapse.ml.lightgbm import LightGBMClassifier
from sklearn.metrics import roc_auc_score
# Assuming you have already defined 'train_data' and 'test_data'
with mlflow.start_run(run_name="default") as run:
# Create a LightGBMClassifier model with specified settings
model = LightGBMClassifier(objective="binary", featuresCol="features", labelCol="Exited", dataTransferMode="bulk")
# Fit the model to the training data
model = model.fit(train_data)
# Get the predictions
predictions = model.transform(test_data)
# Extract the predicted probabilities for the positive class
y_pred = predictions.select("probability").rdd.map(lambda x: x[0][1]).collect()
# Extract the true labels from the 'test_data' DataFrame
y_true = test_data.select("Exited").rdd.map(lambda x: x[0]).collect()
# Compute the ROC AUC score
roc_auc = roc_auc_score(y_true, y_pred)
# Log the ROC AUC score with MLflow
mlflow.log_metric("ROC_AUC", roc_auc)
# Print or log the ROC AUC score
print("ROC AUC Score:", roc_auc)
Stąd widzimy, że nasz model wynikowy uzyskuje wynik ROC AUC równy 84%.
Tworzenie wersji próbnej rozwiązania AutoML przy użyciu języka FLAML
W tej sekcji utworzymy wersję próbną rozwiązania AutoML przy użyciu pakietu FLAML, skonfigurujemy ustawienia wersji próbnej, przekonwertujemy zestaw danych Platformy Spark na zestaw danych pandas na platformie Spark, uruchomimy wersję próbną rozwiązania AutoML i wyświetlimy wynikowe metryki.
Konfigurowanie wersji próbnej rozwiązania AutoML
Tutaj importujemy niezbędne klasy i moduły z pakietu FLAML i tworzymy instancję AutoML, która będzie używana do automatyzacji procesu uczenia maszynowego.
# Import the AutoML class from the FLAML package
from flaml import AutoML
from flaml.automl.spark.utils import to_pandas_on_spark
# Create an AutoML instance
automl = AutoML()
Konfigurowanie ustawień
W tej sekcji zdefiniujemy ustawienia konfiguracji wersji próbnej rozwiązania AutoML.
# Define AutoML settings
settings = {
"time_budget": 250, # Total running time in seconds
"metric": 'roc_auc', # Optimization metric (ROC AUC in this case)
"task": 'classification', # Task type (classification)
"log_file_name": 'flaml_experiment.log', # FLAML log file
"seed": 41, # Random seed
"force_cancel": True, # Force stop training once time_budget is used up
"mlflow_exp_name": "automl_sample" # MLflow experiment name
}
Konwertowanie do Pandas w Spark
Aby uruchomić rozwiązanie AutoML z zestawem danych opartym na platformie Spark, musimy przekonwertować go na zestaw danych biblioteki Pandas na platformie Spark przy użyciu funkcji to_pandas_on_spark
. Dzięki temu flaML może wydajnie pracować z danymi.
# Convert the Spark training dataset to a Pandas on Spark dataset
df_automl = to_pandas_on_spark(train_data)
Uruchamianie wersji próbnej rozwiązania AutoML
Teraz wykonamy wersję próbną rozwiązania AutoML. Używamy zagnieżdżonej sesji MLflow do śledzenia eksperymentu w istniejącym kontekście sesji MLflow. Wersja próbna rozwiązania AutoML jest wykonywana na zestawie danych biblioteki Pandas na platformie Spark (df_automl
) ze zmienną docelową "Exited
, a zdefiniowane ustawienia są przekazywane do funkcji fit
na potrzeby konfiguracji.
'''The main flaml automl API'''
with mlflow.start_run(nested=True):
automl.fit(dataframe=df_automl, label='Exited', isUnbalance=True, **settings)
Wyświetlanie wynikowych metryk
W tej ostatniej sekcji pobierzemy i wyświetlimy wyniki wersji próbnej rozwiązania AutoML. Te metryki zapewniają wgląd w wydajność i konfigurację modelu AutoML w danym zestawie danych.
# Retrieve and display the best hyperparameter configuration and metrics
print('Best hyperparameter config:', automl.best_config)
print('Best ROC AUC on validation data: {0:.4g}'.format(1 - automl.best_loss))
print('Training duration of the best run: {0:.4g} s'.format(automl.best_config_train_time))
Równoległe wykonywanie eksperymentu AutoML z Apache Spark
W scenariuszach, w których zestaw danych może mieścić się w jednym węźle i chcesz wykorzystać możliwości platformy Spark do jednoczesnego uruchamiania wielu równoległych wersji próbnych rozwiązania AutoML, możesz wykonać następujące kroki:
Konwertuj na ramkę danych Pandas
Aby umożliwić równoległe przetwarzanie, dane muszą zostać najpierw przekonwertowane na Pandas DataFrame.
pandas_df = train_raw.toPandas()
W tym miejscu konwertujemy ramkę danych platformy Spark train_raw
na ramkę danych biblioteki Pandas o nazwie pandas_df
, aby była odpowiednia do przetwarzania równoległego.
Konfigurowanie ustawień przetwarzania równoległego
Ustaw use_spark
na True
, aby włączyć równoległość opartą na platformie Spark. Domyślnie FLAML uruchamia jedną próbę dla każdego wykonawcy. Liczbę współbieżnych prób można dostosować przy użyciu argumentu n_concurrent_trials
.
settings = {
"time_budget": 250, # Total running time in seconds
"metric": 'roc_auc', # Optimization metric (ROC AUC in this case)
"task": 'classification', # Task type (classification)
"seed": 41, # Random seed
"use_spark": True, # Enable Spark-based parallelism
"n_concurrent_trials": 3, # Number of concurrent trials to run
"force_cancel": True, # Force stop training once time_budget is used up
"mlflow_exp_name": "automl_sample" # MLflow experiment name
}
W tych ustawieniach określamy, że chcemy używać platformy Spark do równoległego przetwarzania, ustawiając use_spark
na True
. Ustawiliśmy również liczbę równoczesnych prób na 3, co oznacza, że trzy wersje próbne będą uruchamiane równolegle na platformie Spark.
Aby dowiedzieć się więcej na temat równoległości szlaków rozwiązania AutoML, zapoznaj się z dokumentacją FLAML dotyczącą równoległych zadań platformy Spark.
Równoległe uruchamianie wersji próbnej rozwiązania AutoML
Teraz uruchomimy wersję próbną rozwiązania AutoML równolegle z określonymi ustawieniami. Użyjemy zagnieżdżonej sesji MLflow do śledzenia eksperymentu w ramach istniejącej sesji MLflow.
'''The main FLAML AutoML API'''
with mlflow.start_run(nested=True, run_name="parallel_trial"):
automl.fit(dataframe=pandas_df, label='Exited', **settings)
Spowoduje to teraz wykonanie wersji próbnej rozwiązania AutoML z włączoną przetwarzaniem równoległym. Argument dataframe
jest ustawiony na ramkę danych Pandas pandas_df
, a inne ustawienia są przekazywane do funkcji fit
do wykonywania równoległego.
Wyświetlanie metryk
Po równoległym uruchomieniu AutoML pobierz i wyświetl wyniki, w tym najlepszą konfigurację hiperparametrów, wartość ROC AUC na danych walidacyjnych oraz czas trwania najlepszego przebiegu treningowego.
''' retrieve best config'''
print('Best hyperparmeter config:', automl.best_config)
print('Best roc_auc on validation data: {0:.4g}'.format(1-automl.best_loss))
print('Training duration of best run: {0:.4g} s'.format(automl.best_config_train_time))