Udostępnij za pośrednictwem


Tworzenie modeli za pomocą zautomatyzowanego uczenia maszynowego (wersja zapoznawcza)

Zautomatyzowane uczenie maszynowe (AutoML) obejmuje zestaw technik i narzędzi zaprojektowanych w celu usprawnienia procesu trenowania i optymalizowania modeli uczenia maszynowego przy minimalnej interwencji człowieka. Głównym celem rozwiązania AutoML jest uproszczenie i przyspieszenie wyboru najbardziej odpowiedniego modelu uczenia maszynowego i hiperparametrów dla danego zestawu danych, zadania, które zwykle wymaga znacznej wiedzy i zasobów obliczeniowych. W ramach platformy Sieć szkieletowa analitycy danych mogą korzystać z modułu flaml.AutoML w celu zautomatyzowania różnych aspektów przepływów pracy uczenia maszynowego.

W tym artykule omówimy proces generowania wersji próbnej rozwiązania AutoML bezpośrednio z kodu przy użyciu zestawu danych Spark. Ponadto zapoznamy się z metodami konwertowania tych danych na ramkę danych Pandas i omówimy techniki równoległego realizowania prób eksperymentalnych.

Ważny

Ta funkcja jest w wersji zapoznawczej .

Warunki wstępne

  • Utwórz nowe środowisko Fabric lub upewnij się, że korzystasz ze środowiska uruchomieniowego Fabric 1.2 (Spark 3.4 lub nowszy i Delta Lake 2.4).
  • Utwórz nowy notatnik.
  • Dołącz notatnik do lakehouse. Z lewej strony notatnika wybierz pozycję Dodaj, aby dodać istniejący lakehouse lub utworzyć nowy.

Ładowanie i przygotowywanie danych

W tej sekcji określimy ustawienia pobierania danych, a następnie zapiszemy je w lakehouse.

Pobieranie danych

Ten blok kodu pobiera dane ze zdalnego źródła i zapisuje je w lakehouse.

import os
import requests

IS_CUSTOM_DATA = False  # if TRUE, dataset has to be uploaded manually

if not IS_CUSTOM_DATA:
    # Specify the remote URL where the data is hosted
    remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/bankcustomerchurn"
    
    # List of data files to download
    file_list = ["churn.csv"]
    
    # Define the download path within the lakehouse
    download_path = "/lakehouse/default/Files/churn/raw"
    
    # Check if the lakehouse directory exists; if not, raise an error
    if not os.path.exists("/lakehouse/default"):
        raise FileNotFoundError("Default lakehouse not found. Please add a lakehouse and restart the session.")
    
    # Create the download directory if it doesn't exist
    os.makedirs(download_path, exist_ok=True)
    
    # Download each data file if it doesn't already exist in the lakehouse
    for fname in file_list:
        if not os.path.exists(f"{download_path}/{fname}"):
            r = requests.get(f"{remote_url}/{fname}", timeout=30)
            with open(f"{download_path}/{fname}", "wb") as f:
                f.write(r.content)
    
    print("Downloaded demo data files into lakehouse.")

Ładowanie danych do ramki danych platformy Spark

Poniższy blok kodu ładuje dane z pliku CSV do ramki danych platformy Spark i buforuje je w celu wydajnego przetwarzania.

df = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv("Files/churn/raw/churn.csv")
    .cache()
)

Ten kod zakłada, że plik danych został pobrany i znajduje się w określonej ścieżce. Odczytuje on plik CSV do ramki danych Platformy Spark, wywnioskuje schemat i buforuje go w celu szybszego dostępu podczas kolejnych operacji.

Przygotowywanie danych

W tej sekcji wykonamy czyszczenie danych i inżynierię cech w zestawie danych.

Czyszczenie danych

Najpierw definiujemy funkcję czyszczenia danych, która obejmuje usuwanie wierszy z brakującymi danymi, usuwanie zduplikowanych wierszy na podstawie określonych kolumn i usuwanie niepotrzebnych kolumn.

# Define a function to clean the data
def clean_data(df):
    # Drop rows with missing data across all columns
    df = df.dropna(how="all")
    # Drop duplicate rows based on 'RowNumber' and 'CustomerId'
    df = df.dropDuplicates(subset=['RowNumber', 'CustomerId'])
    # Drop columns: 'RowNumber', 'CustomerId', 'Surname'
    df = df.drop('RowNumber', 'CustomerId', 'Surname')
    return df

# Create a copy of the original dataframe by selecting all the columns
df_copy = df.select("*")

# Apply the clean_data function to the copy
df_clean = clean_data(df_copy)

Funkcja clean_data pomaga upewnić się, że zestaw danych jest wolny od brakujących wartości i duplikatów, jednocześnie usuwając niepotrzebne kolumny.

Inżynieria cech/funkcji

Następnie przeprowadzamy inżynierię cech, tworząc kolumny pomocnicze dla kolumn "Geography" i "Gender" przy użyciu kodowania one-hot.

# Import PySpark functions
from pyspark.sql import functions as F

# Create dummy columns for 'Geography' and 'Gender' using one-hot encoding
df_clean = df_clean.select(
    "*",
    F.when(F.col("Geography") == "France", 1).otherwise(0).alias("Geography_France"),
    F.when(F.col("Geography") == "Germany", 1).otherwise(0).alias("Geography_Germany"),
    F.when(F.col("Geography") == "Spain", 1).otherwise(0).alias("Geography_Spain"),
    F.when(F.col("Gender") == "Female", 1).otherwise(0).alias("Gender_Female"),
    F.when(F.col("Gender") == "Male", 1).otherwise(0).alias("Gender_Male")
)

# Drop the original 'Geography' and 'Gender' columns
df_clean = df_clean.drop("Geography", "Gender")

W tym miejscu używamy kodowania jednokrotnego do konwertowania kolumn kategorii na fikcyjne kolumny binarne, dzięki czemu są one odpowiednie dla algorytmów uczenia maszynowego.

Wyświetlanie oczyszczonych danych

Na koniec wyświetlamy oczyszczony i przetworzony zestaw danych za pomocą funkcji wyświetlania.


display(df_clean)

Ten krok umożliwia sprawdzenie wynikowej ramki danych za pomocą zastosowanych przekształceń.

Zapisz w "lakehouse"

Teraz zapiszemy oczyszczony i zaprojektowany przez funkcję zestaw danych w lakehouse.

# Create PySpark DataFrame from Pandas
df_clean.write.mode("overwrite").format("delta").save(f"Tables/churn_data_clean")
print(f"Spark dataframe saved to delta table: churn_data_clean")

W tym miejscu pobieramy oczyszczoną i przekształconą ramkę danych PySpark, df_clean, i zapisujemy ją jako tabelę Delta o nazwie "churn_data_clean" w lakehouse. Używamy formatu delta do wydajnego przechowywania wersji i zarządzania zestawem danych. mode("overwrite") gwarantuje, że każda istniejąca tabela o tej samej nazwie zostanie zastąpiona i zostanie utworzona nowa wersja tabeli.

Tworzenie zestawów danych testowych i szkoleniowych

Następnie utworzymy zestawy danych testowych i szkoleniowych na podstawie oczyszczonych i zaprojektowanych funkcji danych.

W podanej sekcji kodu ładujemy oczyszczony zestaw danych z przygotowanymi cechami z lakehouse'u przy użyciu formatu Delta, dzielimy go na zestawy szkoleniowe i testowe w proporcji 80–20 i przygotowujemy dane do uczenia maszynowego. To przygotowanie obejmuje zaimportowanie VectorAssembler z usługi PySpark ML w celu połączenia kolumn funkcji w jedną kolumnę "features". Następnie użyjemy VectorAssembler, aby przekształcić zestawy danych trenowania i testowania, co powoduje train_data i test_data DataFrames, które zawierają zmienną docelową "Exited" i wektory cech. Te zestawy danych są teraz gotowe do użycia w tworzeniu i ocenianiu modeli uczenia maszynowego.

# Import the necessary library for feature vectorization
from pyspark.ml.feature import VectorAssembler

# Load the cleaned and feature-engineered dataset from the lakehouse
df_final = spark.read.format("delta").load("Tables/churn_data_clean")

# Train-Test Separation
train_raw, test_raw = df_final.randomSplit([0.8, 0.2], seed=41)

# Define the feature columns (excluding the target variable 'Exited')
feature_cols = [col for col in df_final.columns if col != "Exited"]

# Create a VectorAssembler to combine feature columns into a single 'features' column
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Transform the training and testing datasets using the VectorAssembler
train_data = featurizer.transform(train_raw)["Exited", "features"]
test_data = featurizer.transform(test_raw)["Exited", "features"]

Szkolenie modelu bazowego

Korzystając z danych cechowanych, wytrenujemy model bazowy uczenia maszynowego, skonfigurujemy MLflow do śledzenia eksperymentów, zdefiniujemy funkcję przewidywania dla obliczeń metryk, a na koniec wyświetlimy i zarejestrujemy wynikowy wynik ROC AUC.

Ustawianie poziomu rejestrowania

W tym miejscu konfigurujemy poziom rejestrowania, aby ograniczał niepotrzebne dane wyjściowe z biblioteki Synapse.ml, co pozwala na utrzymanie czystszych dzienników.

import logging
 
logging.getLogger('synapse.ml').setLevel(logging.ERROR)

Konfigurowanie biblioteki MLflow

W tej sekcji skonfigurujemy platformę MLflow na potrzeby śledzenia eksperymentów. Ustawiamy nazwę eksperymentu na "automl_sample", aby zorganizować przebiegi. Ponadto włączamy automatyczne rejestrowanie, upewniając się, że parametry modelu, metryki i artefakty są automatycznie rejestrowane w usłudze MLflow.

import mlflow

# Set the MLflow experiment to "automl_sample" and enable automatic logging
mlflow.set_experiment("automl_sample")
mlflow.autolog(exclusive=False)

Trenowanie i ocenianie modelu

Na koniec trenujemy model LightGBMClassifier na podanych danych treningowych. Model jest skonfigurowany z ustawieniami niezbędnymi do obsługi klasyfikacji binarnej i nierównowagi. Następnie użyjemy tego wytrenowanego modelu, aby przewidywać dane testowe. Wyodrębniamy przewidywane prawdopodobieństwa dla klasy dodatniej i prawdziwych etykiet z danych testowych. Następnie obliczamy wynik ROC AUC przy użyciu funkcji roc_auc_score sklearn.

from synapse.ml.lightgbm import LightGBMClassifier
from sklearn.metrics import roc_auc_score

# Assuming you have already defined 'train_data' and 'test_data'

with mlflow.start_run(run_name="default") as run:
    # Create a LightGBMClassifier model with specified settings
    model = LightGBMClassifier(objective="binary", featuresCol="features", labelCol="Exited", dataTransferMode="bulk")
    
    # Fit the model to the training data
    model = model.fit(train_data)

    # Get the predictions
    predictions = model.transform(test_data)

    # Extract the predicted probabilities for the positive class
    y_pred = predictions.select("probability").rdd.map(lambda x: x[0][1]).collect()

    # Extract the true labels from the 'test_data' DataFrame
    y_true = test_data.select("Exited").rdd.map(lambda x: x[0]).collect()

    # Compute the ROC AUC score
    roc_auc = roc_auc_score(y_true, y_pred)

    # Log the ROC AUC score with MLflow
    mlflow.log_metric("ROC_AUC", roc_auc)

    # Print or log the ROC AUC score
    print("ROC AUC Score:", roc_auc)

Stąd widzimy, że nasz model wynikowy uzyskuje wynik ROC AUC równy 84%.

Tworzenie wersji próbnej rozwiązania AutoML przy użyciu języka FLAML

W tej sekcji utworzymy wersję próbną rozwiązania AutoML przy użyciu pakietu FLAML, skonfigurujemy ustawienia wersji próbnej, przekonwertujemy zestaw danych Platformy Spark na zestaw danych pandas na platformie Spark, uruchomimy wersję próbną rozwiązania AutoML i wyświetlimy wynikowe metryki.

Konfigurowanie wersji próbnej rozwiązania AutoML

Tutaj importujemy niezbędne klasy i moduły z pakietu FLAML i tworzymy instancję AutoML, która będzie używana do automatyzacji procesu uczenia maszynowego.

# Import the AutoML class from the FLAML package
from flaml import AutoML
from flaml.automl.spark.utils import to_pandas_on_spark

# Create an AutoML instance
automl = AutoML()

Konfigurowanie ustawień

W tej sekcji zdefiniujemy ustawienia konfiguracji wersji próbnej rozwiązania AutoML.

# Define AutoML settings
settings = {
    "time_budget": 250,         # Total running time in seconds
    "metric": 'roc_auc',       # Optimization metric (ROC AUC in this case)
    "task": 'classification',  # Task type (classification)
    "log_file_name": 'flaml_experiment.log',  # FLAML log file
    "seed": 41,                # Random seed
    "force_cancel": True,      # Force stop training once time_budget is used up
    "mlflow_exp_name": "automl_sample"      # MLflow experiment name
}

Konwertowanie do Pandas w Spark

Aby uruchomić rozwiązanie AutoML z zestawem danych opartym na platformie Spark, musimy przekonwertować go na zestaw danych biblioteki Pandas na platformie Spark przy użyciu funkcji to_pandas_on_spark. Dzięki temu flaML może wydajnie pracować z danymi.

# Convert the Spark training dataset to a Pandas on Spark dataset
df_automl = to_pandas_on_spark(train_data)

Uruchamianie wersji próbnej rozwiązania AutoML

Teraz wykonamy wersję próbną rozwiązania AutoML. Używamy zagnieżdżonej sesji MLflow do śledzenia eksperymentu w istniejącym kontekście sesji MLflow. Wersja próbna rozwiązania AutoML jest wykonywana na zestawie danych biblioteki Pandas na platformie Spark (df_automl) ze zmienną docelową "Exited, a zdefiniowane ustawienia są przekazywane do funkcji fit na potrzeby konfiguracji.

'''The main flaml automl API'''

with mlflow.start_run(nested=True):
    automl.fit(dataframe=df_automl, label='Exited', isUnbalance=True, **settings)

Wyświetlanie wynikowych metryk

W tej ostatniej sekcji pobierzemy i wyświetlimy wyniki wersji próbnej rozwiązania AutoML. Te metryki zapewniają wgląd w wydajność i konfigurację modelu AutoML w danym zestawie danych.

# Retrieve and display the best hyperparameter configuration and metrics
print('Best hyperparameter config:', automl.best_config)
print('Best ROC AUC on validation data: {0:.4g}'.format(1 - automl.best_loss))
print('Training duration of the best run: {0:.4g} s'.format(automl.best_config_train_time))

Równoległe wykonywanie eksperymentu AutoML z Apache Spark

W scenariuszach, w których zestaw danych może mieścić się w jednym węźle i chcesz wykorzystać możliwości platformy Spark do jednoczesnego uruchamiania wielu równoległych wersji próbnych rozwiązania AutoML, możesz wykonać następujące kroki:

Konwertuj na ramkę danych Pandas

Aby umożliwić równoległe przetwarzanie, dane muszą zostać najpierw przekonwertowane na Pandas DataFrame.

pandas_df = train_raw.toPandas()

W tym miejscu konwertujemy ramkę danych platformy Spark train_raw na ramkę danych biblioteki Pandas o nazwie pandas_df, aby była odpowiednia do przetwarzania równoległego.

Konfigurowanie ustawień przetwarzania równoległego

Ustaw use_spark na True, aby włączyć równoległość opartą na platformie Spark. Domyślnie FLAML uruchamia jedną próbę dla każdego wykonawcy. Liczbę współbieżnych prób można dostosować przy użyciu argumentu n_concurrent_trials.

settings = {
    "time_budget": 250,           # Total running time in seconds
    "metric": 'roc_auc',         # Optimization metric (ROC AUC in this case)
    "task": 'classification',    # Task type (classification)
    "seed": 41,                  # Random seed
    "use_spark": True,           # Enable Spark-based parallelism
    "n_concurrent_trials": 3,    # Number of concurrent trials to run
    "force_cancel": True,        # Force stop training once time_budget is used up
    "mlflow_exp_name": "automl_sample"  # MLflow experiment name

}

W tych ustawieniach określamy, że chcemy używać platformy Spark do równoległego przetwarzania, ustawiając use_spark na True. Ustawiliśmy również liczbę równoczesnych prób na 3, co oznacza, że trzy wersje próbne będą uruchamiane równolegle na platformie Spark.

Aby dowiedzieć się więcej na temat równoległości szlaków rozwiązania AutoML, zapoznaj się z dokumentacją FLAML dotyczącą równoległych zadań platformy Spark.

Równoległe uruchamianie wersji próbnej rozwiązania AutoML

Teraz uruchomimy wersję próbną rozwiązania AutoML równolegle z określonymi ustawieniami. Użyjemy zagnieżdżonej sesji MLflow do śledzenia eksperymentu w ramach istniejącej sesji MLflow.

'''The main FLAML AutoML API'''
with mlflow.start_run(nested=True, run_name="parallel_trial"):
    automl.fit(dataframe=pandas_df, label='Exited', **settings)

Spowoduje to teraz wykonanie wersji próbnej rozwiązania AutoML z włączoną przetwarzaniem równoległym. Argument dataframe jest ustawiony na ramkę danych Pandas pandas_df, a inne ustawienia są przekazywane do funkcji fit do wykonywania równoległego.

Wyświetlanie metryk

Po równoległym uruchomieniu AutoML pobierz i wyświetl wyniki, w tym najlepszą konfigurację hiperparametrów, wartość ROC AUC na danych walidacyjnych oraz czas trwania najlepszego przebiegu treningowego.

''' retrieve best config'''
print('Best hyperparmeter config:', automl.best_config)
print('Best roc_auc on validation data: {0:.4g}'.format(1-automl.best_loss))
print('Training duration of best run: {0:.4g} s'.format(automl.best_config_train_time))