Создание моделей с помощью автоматизированного машинного обучения (предварительная версия)
Автоматизированная Машинное обучение (AutoML) включает набор методов и инструментов, предназначенных для упрощения процесса обучения и оптимизации моделей машинного обучения с минимальным вмешательством человека. Основной целью AutoML является упрощение и ускорение выбора наиболее подходящей модели машинного обучения и гиперпараметров для заданного набора данных, задача, которая обычно требует значительных знаний и вычислительных ресурсов. В платформе Fabric специалисты по обработке и анализу данных могут использовать flaml.AutoML
модуль для автоматизации различных аспектов рабочих процессов машинного обучения.
В этой статье мы рассмотрим процесс создания пробных версий AutoML непосредственно из кода с помощью набора данных Spark. Кроме того, мы рассмотрим методы преобразования этих данных в кадр данных Pandas и обсудим методы параллелизации проб экспериментов.
Внимание
Эта функция доступна в предварительной версии.
Необходимые компоненты
Получение подписки Microsoft Fabric. Или зарегистрируйте бесплатную пробную версию Microsoft Fabric.
Войдите в Microsoft Fabric.
Используйте переключатель интерфейса в левой части домашней страницы, чтобы перейти на интерфейс Synapse Обработка и анализ данных.
- Создайте новую среду Fabric или убедитесь, что вы работаете в среде выполнения Fabric 1.2 (Spark 3.4 или более поздней версии) и Delta 2.4.
- Создайте записную книжку.
- Подключите записную книжку к lakehouse. В левой части записной книжки нажмите кнопку "Добавить", чтобы добавить существующее озеро или создать новую.
Загрузка и подготовка данных
В этом разделе мы укажем параметры скачивания для данных, а затем сохраните его в lakehouse.
Загрузка данных
Этот блок кода загружает данные из удаленного источника и сохраняет его в lakehouse.
import os
import requests
IS_CUSTOM_DATA = False # if TRUE, dataset has to be uploaded manually
if not IS_CUSTOM_DATA:
# Specify the remote URL where the data is hosted
remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/bankcustomerchurn"
# List of data files to download
file_list = ["churn.csv"]
# Define the download path within the lakehouse
download_path = "/lakehouse/default/Files/churn/raw"
# Check if the lakehouse directory exists; if not, raise an error
if not os.path.exists("/lakehouse/default"):
raise FileNotFoundError("Default lakehouse not found. Please add a lakehouse and restart the session.")
# Create the download directory if it doesn't exist
os.makedirs(download_path, exist_ok=True)
# Download each data file if it doesn't already exist in the lakehouse
for fname in file_list:
if not os.path.exists(f"{download_path}/{fname}"):
r = requests.get(f"{remote_url}/{fname}", timeout=30)
with open(f"{download_path}/{fname}", "wb") as f:
f.write(r.content)
print("Downloaded demo data files into lakehouse.")
Загрузка данных в кадр данных Spark
Следующий блок кода загружает данные из CSV-файла в кадр данных Spark и кэширует его для эффективной обработки.
df = (
spark.read.option("header", True)
.option("inferSchema", True)
.csv("Files/churn/raw/churn.csv")
.cache()
)
Этот код предполагает, что файл данных скачан и расположен в указанном пути. Он считывает CSV-файл в кадр данных Spark, выводит схему и кэширует его для ускорения доступа во время последующих операций.
Подготовка данных
В этом разделе мы будем выполнять очистку данных и проектирование функций в наборе данных.
Очистка данных.
Во-первых, мы определим функцию очистки данных, которая включает удаление строк с отсутствующими данными, удаление повторяющихся строк на основе определенных столбцов и удаление ненужных столбцов.
# Define a function to clean the data
def clean_data(df):
# Drop rows with missing data across all columns
df = df.dropna(how="all")
# Drop duplicate rows based on 'RowNumber' and 'CustomerId'
df = df.dropDuplicates(subset=['RowNumber', 'CustomerId'])
# Drop columns: 'RowNumber', 'CustomerId', 'Surname'
df = df.drop('RowNumber', 'CustomerId', 'Surname')
return df
# Create a copy of the original dataframe by selecting all the columns
df_copy = df.select("*")
# Apply the clean_data function to the copy
df_clean = clean_data(df_copy)
Функция clean_data
помогает убедиться, что набор данных не содержит отсутствующих значений и дубликатов при удалении ненужных столбцов.
Проектирование признаков
Затем мы выполняем проектирование функций, создавая фиктивные столбцы для столбцов Geography и Gender с помощью одно горячей кодировки.
# Import PySpark functions
from pyspark.sql import functions as F
# Create dummy columns for 'Geography' and 'Gender' using one-hot encoding
df_clean = df_clean.select(
"*",
F.when(F.col("Geography") == "France", 1).otherwise(0).alias("Geography_France"),
F.when(F.col("Geography") == "Germany", 1).otherwise(0).alias("Geography_Germany"),
F.when(F.col("Geography") == "Spain", 1).otherwise(0).alias("Geography_Spain"),
F.when(F.col("Gender") == "Female", 1).otherwise(0).alias("Gender_Female"),
F.when(F.col("Gender") == "Male", 1).otherwise(0).alias("Gender_Male")
)
# Drop the original 'Geography' and 'Gender' columns
df_clean = df_clean.drop("Geography", "Gender")
Здесь мы используем одно-горячую кодировку для преобразования категориальных столбцов в двоичные фиктивные столбцы, что делает их подходящими для алгоритмов машинного обучения.
Отображение чистых данных
Наконец, мы отображаем чистый и встроенный набор данных с помощью функции отображения.
display(df_clean)
Этот шаг позволяет проверить результирующий кадр данных с примененными преобразованиями.
Сохранение в Lakehouse
Теперь мы сохраним очищенный и встроенный набор данных в lakehouse.
# Create PySpark DataFrame from Pandas
df_clean.write.mode("overwrite").format("delta").save(f"Tables/churn_data_clean")
print(f"Spark dataframe saved to delta table: churn_data_clean")
Здесь мы берем чистый и преобразованный Кадр df_clean
данных PySpark, и сохраните его в виде таблицы Delta с именем "churn_data_clean" в лейкхаусе. Мы используем разностный формат для эффективного управления версиями и управления набором данных. Гарантирует mode("overwrite")
, что любая существующая таблица с тем же именем перезаписывается и создается новая версия таблицы.
Создание тестовых и обучающих наборов данных
Затем мы создадим тестовые и обучающие наборы данных из очищенных и встроенных данных.
В предоставленном разделе кода мы загружаем чистый и встроенный набор данных из lakehouse с помощью разностного формата, разделяем его на наборы обучения и тестирования с коэффициентом 80-20 и подготавливаем данные для машинного обучения. Эта подготовка включает импорт VectorAssembler
из PySpark ML для объединения столбцов признаков в один столбец "функции". Впоследствии мы используем VectorAssembler
для преобразования наборов данных для обучения и тестирования, что приводит к train_data
test_data
тому, что кадры данных содержат целевую переменную "Exited" и векторы признаков. Эти наборы данных теперь готовы к использованию в создании и оценке моделей машинного обучения.
# Import the necessary library for feature vectorization
from pyspark.ml.feature import VectorAssembler
# Load the cleaned and feature-engineered dataset from the lakehouse
df_final = spark.read.format("delta").load("Tables/churn_data_clean")
# Train-Test Separation
train_raw, test_raw = df_final.randomSplit([0.8, 0.2], seed=41)
# Define the feature columns (excluding the target variable 'Exited')
feature_cols = [col for col in df_final.columns if col != "Exited"]
# Create a VectorAssembler to combine feature columns into a single 'features' column
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")
# Transform the training and testing datasets using the VectorAssembler
train_data = featurizer.transform(train_raw)["Exited", "features"]
test_data = featurizer.transform(test_raw)["Exited", "features"]
Обучение базовой модели
С помощью признаков данных мы обучим базовую модель машинного обучения, настройте MLflow для отслеживания экспериментов, определите функцию прогнозирования для вычисления метрик и, наконец, просмотрите и зайдите в журнал итоговую оценку ROC AUC.
Настройка уровня ведения журнала
Здесь мы настраиваем уровень ведения журнала для подавления ненужных выходных данных из библиотеки Synapse.ml, сохраняя журналы более чистыми.
import logging
logging.getLogger('synapse.ml').setLevel(logging.ERROR)
Настройка MLflow
В этом разделе мы настроим MLflow для отслеживания экспериментов. Присвойте имени эксперимента значение "automl_sample", чтобы упорядочить запуски. Кроме того, мы включите автоматическое ведение журнала, обеспечивая автоматический вход параметров модели, метрик и артефактов в MLflow.
import mlflow
# Set the MLflow experiment to "automl_sample" and enable automatic logging
mlflow.set_experiment("automl_sample")
mlflow.autolog(exclusive=False)
Обучение и оценка модели
Наконец, мы обучаем модель LightGBMClassifier на предоставленных обучающих данных. Модель настроена с необходимыми параметрами для двоичной классификации и обработки дисбаланса. Затем мы используем обученную модель для прогнозирования на тестовых данных. Мы извлекаем прогнозируемые вероятности для положительного класса и истинные метки из тестовых данных. Затем мы вычислим оценку AUC ROC с помощью функции sklearn roc_auc_score
.
from synapse.ml.lightgbm import LightGBMClassifier
from sklearn.metrics import roc_auc_score
# Assuming you have already defined 'train_data' and 'test_data'
with mlflow.start_run(run_name="default") as run:
# Create a LightGBMClassifier model with specified settings
model = LightGBMClassifier(objective="binary", featuresCol="features", labelCol="Exited", dataTransferMode="bulk")
# Fit the model to the training data
model = model.fit(train_data)
# Get the predictions
predictions = model.transform(test_data)
# Extract the predicted probabilities for the positive class
y_pred = predictions.select("probability").rdd.map(lambda x: x[0][1]).collect()
# Extract the true labels from the 'test_data' DataFrame
y_true = test_data.select("Exited").rdd.map(lambda x: x[0]).collect()
# Compute the ROC AUC score
roc_auc = roc_auc_score(y_true, y_pred)
# Log the ROC AUC score with MLflow
mlflow.log_metric("ROC_AUC", roc_auc)
# Print or log the ROC AUC score
print("ROC AUC Score:", roc_auc)
Здесь мы видим, что полученная модель достигает оценки AUC ROC на 84%.
Создание пробной версии AutoML с помощью FLAML
В этом разделе мы создадим пробную версию AutoML с помощью пакета FLAML, настройте параметры пробной версии, преобразуем набор данных Spark в набор данных Pandas в Spark, запустите пробную версию AutoML и просмотрите полученные метрики.
Настройка пробной версии AutoML
Здесь мы импортируем необходимые классы и модули из пакета FLAML и создадим экземпляр AutoML, который будет использоваться для автоматизации конвейера машинного обучения.
# Import the AutoML class from the FLAML package
from flaml import AutoML
from flaml.automl.spark.utils import to_pandas_on_spark
# Create an AutoML instance
automl = AutoML()
Настройка параметров
В этом разделе мы определим параметры конфигурации для пробной версии AutoML.
# Define AutoML settings
settings = {
"time_budget": 250, # Total running time in seconds
"metric": 'roc_auc', # Optimization metric (ROC AUC in this case)
"task": 'classification', # Task type (classification)
"log_file_name": 'flaml_experiment.log', # FLAML log file
"seed": 41, # Random seed
"force_cancel": True, # Force stop training once time_budget is used up
"mlflow_exp_name": "automl_sample" # MLflow experiment name
}
Преобразование в Pandas в Spark
Чтобы запустить AutoML с набором данных на основе Spark, необходимо преобразовать его в набор данных Pandas в набор данных Spark с помощью to_pandas_on_spark
функции. Это позволяет FLAML эффективно работать с данными.
# Convert the Spark training dataset to a Pandas on Spark dataset
df_automl = to_pandas_on_spark(train_data)
Запуск пробной версии AutoML
Теперь мы выполняем пробную версию AutoML. Мы используем вложенный запуск MLflow для отслеживания эксперимента в существующем контексте выполнения MLflow. Пробная версия AutoML выполняется на Pandas в наборе данных Spark (df_automl
) с целевой переменной "Exited
и определенные параметры передаются fit
в функцию для настройки.
'''The main flaml automl API'''
with mlflow.start_run(nested=True):
automl.fit(dataframe=df_automl, label='Exited', isUnbalance=True, **settings)
Просмотр результирующей метрики
В этом заключительном разделе мы извлекаем и отображаем результаты пробной версии AutoML. Эти метрики предоставляют аналитические сведения о производительности и конфигурации модели AutoML в заданном наборе данных.
# Retrieve and display the best hyperparameter configuration and metrics
print('Best hyperparameter config:', automl.best_config)
print('Best ROC AUC on validation data: {0:.4g}'.format(1 - automl.best_loss))
print('Training duration of the best run: {0:.4g} s'.format(automl.best_config_train_time))
Параллелизация пробной версии AutoML с помощью Apache Spark
В сценариях, когда набор данных может входить в один узел, и вы хотите использовать возможности Spark для одновременного запуска нескольких параллельных пробных версий AutoML, выполните следующие действия.
Преобразование в кадр данных Pandas
Чтобы включить параллелизацию, данные необходимо сначала преобразовать в кадр данных Pandas.
pandas_df = train_raw.toPandas()
Здесь мы преобразуем train_raw
кадр данных Spark в кадр данных Pandas DataFrame с именем pandas_df
, чтобы сделать его подходящим для параллельной обработки.
Настройка параметров параллелизации
Установите для use_spark
True
включения параллелизма на основе Spark. По умолчанию FLAML запускает одну пробную версию для каждого исполнителя. Число одновременных пробных версий можно настроить с помощью аргумента n_concurrent_trials
.
settings = {
"time_budget": 250, # Total running time in seconds
"metric": 'roc_auc', # Optimization metric (ROC AUC in this case)
"task": 'classification', # Task type (classification)
"seed": 41, # Random seed
"use_spark": True, # Enable Spark-based parallelism
"n_concurrent_trials": 3, # Number of concurrent trials to run
"force_cancel": True, # Force stop training once time_budget is used up
"mlflow_exp_name": "automl_sample" # MLflow experiment name
}
В этих параметрах мы указываем, что мы хотим использовать Spark для параллелизма, задав для этого use_spark
значение True
. Мы также задали число параллельных пробных версий 3, что означает, что три пробные версии будут выполняться параллельно в Spark.
Дополнительные сведения о параллелизации маршрутов AutoML см. в документации FLAML для параллельных заданий Spark.
Параллельное выполнение пробной версии AutoML
Теперь мы будем запускать пробную версию AutoML параллельно с указанными параметрами. Мы будем использовать вложенный запуск MLflow для отслеживания эксперимента в существующем контексте выполнения MLflow.
'''The main FLAML AutoML API'''
with mlflow.start_run(nested=True, run_name="parallel_trial"):
automl.fit(dataframe=pandas_df, label='Exited', **settings)
Теперь будет выполнена пробная версия AutoML с включенной параллелизацией. Аргумент dataframe
имеет значение Pandas DataFrame pandas_df
, а другие параметры передаются функции для параллельного fit
выполнения.
Просмотр метрик
После выполнения параллельной пробной версии AutoML извлеките и отобразите результаты, включая лучшую конфигурацию гиперпараметров, ROC AUC для данных проверки и продолжительность обучения наилучшего выполнения.
''' retrieve best config'''
print('Best hyperparmeter config:', automl.best_config)
print('Best roc_auc on validation data: {0:.4g}'.format(1-automl.best_loss))
print('Training duration of best run: {0:.4g} s'.format(automl.best_config_train_time))