Udostępnij za pośrednictwem


Tworzenie modeli za pomocą zautomatyzowanego uczenia maszynowego (wersja zapoznawcza)

Zautomatyzowane Edukacja maszynowe (AutoML) obejmuje zestaw technik i narzędzi zaprojektowanych w celu usprawnienia procesu trenowania i optymalizowania modeli uczenia maszynowego przy minimalnej interwencji człowieka. Głównym celem rozwiązania AutoML jest uproszczenie i przyspieszenie wyboru najbardziej odpowiedniego modelu uczenia maszynowego i hiperparametrów dla danego zestawu danych, zadania, które zwykle wymaga znacznej wiedzy i zasobów obliczeniowych. W ramach platformy Sieć szkieletowa analitycy danych mogą korzystać z modułu flaml.AutoML , aby zautomatyzować różne aspekty przepływów pracy uczenia maszynowego.

W tym artykule omówimy proces generowania wersji próbnej rozwiązania AutoML bezpośrednio z kodu przy użyciu zestawu danych Spark. Ponadto zapoznamy się z metodami konwertowania tych danych na ramkę danych Biblioteki Pandas i omówimy techniki równoległego przetwarzania prób eksperymentów.

Ważne

Ta funkcja jest dostępna w wersji zapoznawczej.

Wymagania wstępne

  • Uzyskaj subskrypcję usługi Microsoft Fabric. Możesz też utworzyć konto bezpłatnej wersji próbnej usługi Microsoft Fabric.

  • Zaloguj się do usługi Microsoft Fabric.

  • Użyj przełącznika środowiska po lewej stronie głównej, aby przełączyć się na środowisko usługi Synapse Nauka o danych.

    Zrzut ekranu przedstawiający menu przełącznika środowiska pokazujące, gdzie wybrać Nauka o danych.

  • Utwórz nowe środowisko sieci szkieletowej lub upewnij się, że korzystasz z środowiska Fabric Runtime 1.2 (Spark 3.4 lub nowszego) i funkcji Delta 2.4)
  • Utwórz nowy notes.
  • Dołącz notes do magazynu lakehouse. Po lewej stronie notesu wybierz pozycję Dodaj , aby dodać istniejący magazyn lakehouse lub utworzyć nowy.

Ładowanie i przygotowywanie danych

W tej sekcji określimy ustawienia pobierania danych, a następnie zapiszemy je w magazynie lakehouse.

Pobieranie danych

Ten kod blokuje pobieranie danych ze źródła zdalnego i zapisywanie ich w lakehouse

import os
import requests

IS_CUSTOM_DATA = False  # if TRUE, dataset has to be uploaded manually

if not IS_CUSTOM_DATA:
    # Specify the remote URL where the data is hosted
    remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/bankcustomerchurn"
    
    # List of data files to download
    file_list = ["churn.csv"]
    
    # Define the download path within the lakehouse
    download_path = "/lakehouse/default/Files/churn/raw"
    
    # Check if the lakehouse directory exists; if not, raise an error
    if not os.path.exists("/lakehouse/default"):
        raise FileNotFoundError("Default lakehouse not found. Please add a lakehouse and restart the session.")
    
    # Create the download directory if it doesn't exist
    os.makedirs(download_path, exist_ok=True)
    
    # Download each data file if it doesn't already exist in the lakehouse
    for fname in file_list:
        if not os.path.exists(f"{download_path}/{fname}"):
            r = requests.get(f"{remote_url}/{fname}", timeout=30)
            with open(f"{download_path}/{fname}", "wb") as f:
                f.write(r.content)
    
    print("Downloaded demo data files into lakehouse.")

Ładowanie danych do ramki danych platformy Spark

Poniższy blok kodu ładuje dane z pliku CSV do ramki danych platformy Spark i buforuje je w celu wydajnego przetwarzania.

df = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv("Files/churn/raw/churn.csv")
    .cache()
)

Ten kod zakłada, że plik danych został pobrany i znajduje się w określonej ścieżce. Odczytuje on plik CSV do ramki danych Platformy Spark, wywnioskuje schemat i buforuje go w celu szybszego dostępu podczas kolejnych operacji.

Przygotowywanie danych

W tej sekcji wykonamy czyszczenie danych i inżynierię cech w zestawie danych.

Czyszczenie danych

Najpierw definiujemy funkcję czyszczenia danych, która obejmuje usuwanie wierszy z brakującymi danymi, usuwanie zduplikowanych wierszy na podstawie określonych kolumn i usuwanie niepotrzebnych kolumn.

# Define a function to clean the data
def clean_data(df):
    # Drop rows with missing data across all columns
    df = df.dropna(how="all")
    # Drop duplicate rows based on 'RowNumber' and 'CustomerId'
    df = df.dropDuplicates(subset=['RowNumber', 'CustomerId'])
    # Drop columns: 'RowNumber', 'CustomerId', 'Surname'
    df = df.drop('RowNumber', 'CustomerId', 'Surname')
    return df

# Create a copy of the original dataframe by selecting all the columns
df_copy = df.select("*")

# Apply the clean_data function to the copy
df_clean = clean_data(df_copy)

Funkcja clean_data pomaga zapewnić, że zestaw danych jest wolny od brakujących wartości i duplikatów, jednocześnie usuwając niepotrzebne kolumny.

Inżynieria cech

Następnie przeprowadzamy inżynierię cech, tworząc fikcyjne kolumny dla kolumn "Geography" i "Gender" przy użyciu kodowania jednorazowego.

# Import PySpark functions
from pyspark.sql import functions as F

# Create dummy columns for 'Geography' and 'Gender' using one-hot encoding
df_clean = df_clean.select(
    "*",
    F.when(F.col("Geography") == "France", 1).otherwise(0).alias("Geography_France"),
    F.when(F.col("Geography") == "Germany", 1).otherwise(0).alias("Geography_Germany"),
    F.when(F.col("Geography") == "Spain", 1).otherwise(0).alias("Geography_Spain"),
    F.when(F.col("Gender") == "Female", 1).otherwise(0).alias("Gender_Female"),
    F.when(F.col("Gender") == "Male", 1).otherwise(0).alias("Gender_Male")
)

# Drop the original 'Geography' and 'Gender' columns
df_clean = df_clean.drop("Geography", "Gender")

W tym miejscu używamy kodowania jednokrotnego do konwertowania kolumn kategorii na fikcyjne kolumny binarne, dzięki czemu są one odpowiednie dla algorytmów uczenia maszynowego.

Wyświetlanie oczyszczonych danych

Na koniec wyświetlimy oczyszczony i zaprojektowany przez funkcję wyświetlania zestaw danych.


display(df_clean)

Ten krok umożliwia sprawdzenie wynikowej ramki danych za pomocą zastosowanych przekształceń.

Zapisz w lakehouse

Teraz zapiszemy oczyszczony i zaprojektowany przez funkcję zestaw danych w lakehouse.

# Create PySpark DataFrame from Pandas
df_clean.write.mode("overwrite").format("delta").save(f"Tables/churn_data_clean")
print(f"Spark dataframe saved to delta table: churn_data_clean")

W tym miejscu pobierzemy oczyszczoną i przekształconą ramkę df_cleandanych PySpark, i zapiszemy ją jako tabelę delty o nazwie "churn_data_clean" w lakehouse. Używamy formatu delta do wydajnego przechowywania wersji i zarządzania zestawem danych. Dzięki mode("overwrite") temu wszystkie istniejące tabele o tej samej nazwie zostaną zastąpione i zostanie utworzona nowa wersja tabeli.

Tworzenie zestawów danych testowych i szkoleniowych

Następnie utworzymy zestawy danych testowych i szkoleniowych na podstawie oczyszczonych i zaprojektowanych funkcji danych.

W podanej sekcji kodu załadujemy oczyszczony i zaprojektowany przez funkcję zestaw danych z usługi Lakehouse przy użyciu formatu delta, podzielimy go na zestawy szkoleniowe i testowe z współczynnikiem 80–20 i przygotujemy dane do uczenia maszynowego. To przygotowanie obejmuje zaimportowanie elementu VectorAssembler z usługi PySpark ML w celu połączenia kolumn funkcji w jedną kolumnę "features". Następnie użyjemy elementu VectorAssembler , aby przekształcić zestawy danych trenowania i testowania, w wyniku czego train_data ramki danych i test_data zawierają zmienną docelową "Exited" i wektory funkcji. Te zestawy danych są teraz gotowe do użycia w tworzeniu i ocenianiu modeli uczenia maszynowego.

# Import the necessary library for feature vectorization
from pyspark.ml.feature import VectorAssembler

# Load the cleaned and feature-engineered dataset from the lakehouse
df_final = spark.read.format("delta").load("Tables/churn_data_clean")

# Train-Test Separation
train_raw, test_raw = df_final.randomSplit([0.8, 0.2], seed=41)

# Define the feature columns (excluding the target variable 'Exited')
feature_cols = [col for col in df_final.columns if col != "Exited"]

# Create a VectorAssembler to combine feature columns into a single 'features' column
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Transform the training and testing datasets using the VectorAssembler
train_data = featurizer.transform(train_raw)["Exited", "features"]
test_data = featurizer.transform(test_raw)["Exited", "features"]

Trenowanie modelu linii bazowej

Korzystając z danych cechowanych, wytrenujemy model uczenia maszynowego punktu odniesienia, skonfigurujemy MLflow pod kątem śledzenia eksperymentów, zdefiniuj funkcję przewidywania dla obliczeń metryk, a na koniec wyświetlimy i zarejestrujemy wynikowy wynikowy wynik ROC AUC.

Ustawianie poziomu rejestrowania

W tym miejscu skonfigurujemy poziom rejestrowania tak, aby pomijał niepotrzebne dane wyjściowe z biblioteki Synapse.ml, zachowując czyszczenie dzienników.

import logging
 
logging.getLogger('synapse.ml').setLevel(logging.ERROR)

Konfigurowanie biblioteki MLflow

W tej sekcji skonfigurujemy platformę MLflow na potrzeby śledzenia eksperymentów. Ustawiamy nazwę eksperymentu na "automl_sample", aby zorganizować przebiegi. Ponadto włączamy automatyczne rejestrowanie, upewniając się, że parametry modelu, metryki i artefakty są automatycznie rejestrowane w usłudze MLflow.

import mlflow

# Set the MLflow experiment to "automl_sample" and enable automatic logging
mlflow.set_experiment("automl_sample")
mlflow.autolog(exclusive=False)

Trenowanie i ocenianie modelu

Na koniec trenujemy model LightGBMClassifier na podanych danych treningowych. Model jest skonfigurowany z ustawieniami niezbędnymi do obsługi klasyfikacji binarnej i nierównowagi. Następnie użyjemy tego wytrenowanego modelu, aby przewidywać dane testowe. Wyodrębniamy przewidywane prawdopodobieństwa dla klasy dodatniej i prawdziwych etykiet z danych testowych. Następnie obliczamy wynik ROC AUC przy użyciu funkcji sklearn.roc_auc_score

from synapse.ml.lightgbm import LightGBMClassifier
from sklearn.metrics import roc_auc_score

# Assuming you have already defined 'train_data' and 'test_data'

with mlflow.start_run(run_name="default") as run:
    # Create a LightGBMClassifier model with specified settings
    model = LightGBMClassifier(objective="binary", featuresCol="features", labelCol="Exited", dataTransferMode="bulk")
    
    # Fit the model to the training data
    model = model.fit(train_data)

    # Get the predictions
    predictions = model.transform(test_data)

    # Extract the predicted probabilities for the positive class
    y_pred = predictions.select("probability").rdd.map(lambda x: x[0][1]).collect()

    # Extract the true labels from the 'test_data' DataFrame
    y_true = test_data.select("Exited").rdd.map(lambda x: x[0]).collect()

    # Compute the ROC AUC score
    roc_auc = roc_auc_score(y_true, y_pred)

    # Log the ROC AUC score with MLflow
    mlflow.log_metric("ROC_AUC", roc_auc)

    # Print or log the ROC AUC score
    print("ROC AUC Score:", roc_auc)

Z tego miejsca widzimy, że nasz wynikowy model osiąga wynik ROC AUC o wartości 84%.

Tworzenie wersji próbnej rozwiązania AutoML przy użyciu języka FLAML

W tej sekcji utworzymy wersję próbną rozwiązania AutoML przy użyciu pakietu FLAML, skonfigurujemy ustawienia wersji próbnej, przekonwertujemy zestaw danych Platformy Spark na zestaw danych pandas na platformie Spark, uruchomimy wersję próbną rozwiązania AutoML i wyświetlimy wynikowe metryki.

Konfigurowanie wersji próbnej rozwiązania AutoML

W tym miejscu zaimportujemy niezbędne klasy i moduły z pakietu FLAML i utworzymy wystąpienie rozwiązania AutoML, które będzie używane do automatyzacji potoku uczenia maszynowego.

# Import the AutoML class from the FLAML package
from flaml import AutoML
from flaml.automl.spark.utils import to_pandas_on_spark

# Create an AutoML instance
automl = AutoML()

Konfigurowanie ustawień

W tej sekcji zdefiniujemy ustawienia konfiguracji wersji próbnej rozwiązania AutoML.

# Define AutoML settings
settings = {
    "time_budget": 250,         # Total running time in seconds
    "metric": 'roc_auc',       # Optimization metric (ROC AUC in this case)
    "task": 'classification',  # Task type (classification)
    "log_file_name": 'flaml_experiment.log',  # FLAML log file
    "seed": 41,                # Random seed
    "force_cancel": True,      # Force stop training once time_budget is used up
    "mlflow_exp_name": "automl_sample"      # MLflow experiment name
}

Konwertowanie na bibliotekę Pandas na platformie Spark

Aby uruchomić rozwiązanie AutoML z zestawem danych opartym na platformie Spark, musimy przekonwertować go na zestaw danych biblioteki Pandas na platformie Spark przy użyciu to_pandas_on_spark funkcji . Dzięki temu flaML może wydajnie pracować z danymi.

# Convert the Spark training dataset to a Pandas on Spark dataset
df_automl = to_pandas_on_spark(train_data)

Uruchamianie wersji próbnej rozwiązania AutoML

Teraz wykonamy wersję próbną rozwiązania AutoML. Używamy zagnieżdżonego przebiegu MLflow do śledzenia eksperymentu w istniejącym kontekście przebiegu MLflow. Wersja próbna rozwiązania AutoML jest wykonywana na zestawie danych biblioteki Pandas na platformie Spark (df_automl) ze zmienną docelową "Exited i ustawienia zdefiniowane są przekazywane do fit funkcji na potrzeby konfiguracji.

'''The main flaml automl API'''

with mlflow.start_run(nested=True):
    automl.fit(dataframe=df_automl, label='Exited', isUnbalance=True, **settings)

Wyświetlanie wynikowych metryk

W tej ostatniej sekcji pobierzemy i wyświetlimy wyniki wersji próbnej rozwiązania AutoML. Te metryki zapewniają wgląd w wydajność i konfigurację modelu AutoML w danym zestawie danych.

# Retrieve and display the best hyperparameter configuration and metrics
print('Best hyperparameter config:', automl.best_config)
print('Best ROC AUC on validation data: {0:.4g}'.format(1 - automl.best_loss))
print('Training duration of the best run: {0:.4g} s'.format(automl.best_config_train_time))

Równoległe wypełnianie wersji próbnej rozwiązania AutoML przy użyciu platformy Apache Spark

W scenariuszach, w których zestaw danych może mieścić się w jednym węźle i chcesz wykorzystać możliwości platformy Spark do jednoczesnego uruchamiania wielu równoległych wersji próbnych rozwiązania AutoML, możesz wykonać następujące kroki:

Konwertowanie na ramkę danych biblioteki Pandas

Aby umożliwić równoległe przetwarzanie danych, należy najpierw przekonwertować je na ramkę danych Biblioteki Pandas.

pandas_df = train_raw.toPandas()

W tym miejscu konwertujemy ramkę danych platformy train_raw Spark na ramkę danych biblioteki Pandas o nazwie pandas_df , aby była odpowiednia do przetwarzania równoległego.

Konfigurowanie ustawień przetwarzania równoległego

Ustaw use_spark wartość na , aby True włączyć równoległość opartą na platformie Spark. Domyślnie platforma FLAML uruchamia jedną wersję próbną dla funkcji wykonawczej. Liczbę współbieżnych prób można dostosować przy użyciu argumentu n_concurrent_trials .

settings = {
    "time_budget": 250,           # Total running time in seconds
    "metric": 'roc_auc',         # Optimization metric (ROC AUC in this case)
    "task": 'classification',    # Task type (classification)
    "seed": 41,                  # Random seed
    "use_spark": True,           # Enable Spark-based parallelism
    "n_concurrent_trials": 3,    # Number of concurrent trials to run
    "force_cancel": True,        # Force stop training once time_budget is used up
    "mlflow_exp_name": "automl_sample"  # MLflow experiment name

}

W tych ustawieniach określamy, że chcemy używać platformy Spark do równoległości, ustawiając wartość use_spark .True Ustawiliśmy również liczbę równoczesnych prób na 3, co oznacza, że trzy wersje próbne będą uruchamiane równolegle na platformie Spark.

Aby dowiedzieć się więcej na temat równoległości szlaków rozwiązania AutoML, zapoznaj się z dokumentacją FLAML dotyczącą zadań równoległych platformy Spark.

Równoległe uruchamianie wersji próbnej rozwiązania AutoML

Teraz uruchomimy wersję próbną rozwiązania AutoML równolegle z określonymi ustawieniami. Użyjemy zagnieżdżonego przebiegu MLflow do śledzenia eksperymentu w istniejącym kontekście przebiegu MLflow.

'''The main FLAML AutoML API'''
with mlflow.start_run(nested=True, run_name="parallel_trial"):
    automl.fit(dataframe=pandas_df, label='Exited', **settings)

Spowoduje to teraz wykonanie wersji próbnej rozwiązania AutoML z włączoną przetwarzaniem równoległym. Argument dataframe jest ustawiony na ramkę pandas_dfdanych Biblioteki Pandas, a inne ustawienia są przekazywane do fit funkcji na potrzeby wykonywania równoległego.

Wyświetlanie metryk

Po uruchomieniu równoległej wersji próbnej rozwiązania AutoML pobierz i wyświetl wyniki, w tym najlepszą konfigurację hiperparametrów, ROC AUC na danych walidacji oraz czas trwania trenowania najlepszego przebiegu.

''' retrieve best config'''
print('Best hyperparmeter config:', automl.best_config)
print('Best roc_auc on validation data: {0:.4g}'.format(1-automl.best_loss))
print('Training duration of best run: {0:.4g} s'.format(automl.best_config_train_time))