Compartir vía


Creación de modelos con ML automatizado (versión preliminar)

El aprendizaje automático automatizado (Automated Machine Learning, AutoML) abarca un conjunto de técnicas y herramientas diseñadas para simplificar el proceso de entrenamiento y optimizar los modelos de aprendizaje automático con una intervención humana mínima. El objetivo principal de AutoML es simplificar y acelerar la selección del modelo de aprendizaje automático y los hiperparámetros más adecuados para un conjunto de datos determinado, una tarea que normalmente requiere una experiencia considerable y recursos computacionales. Dentro del marco de Fabric, los científicos de datos pueden aprovechar el módulo flaml.AutoML para automatizar varios aspectos de sus flujos de trabajo de aprendizaje automático.

En este artículo, profundizaremos en el proceso de generación de pruebas de AutoML directamente desde el código mediante un conjunto de datos de Spark. Además, exploraremos métodos para convertir estos datos en una trama de datos de Pandas y analizaremos técnicas para paralelizar las pruebas de experimentación.

Importante

Esta característica se encuentra en versión preliminar.

Requisitos previos

  • Cree un nuevo entorno de Fabric o asegúrese de que se ejecuta en Fabric Runtime 1.2 (Spark 3.4 o superior y Delta 2.4)
  • Creación de un cuaderno nuevo.
  • Adjunte el cuaderno a un almacén de lago. En la parte izquierda de su cuaderno, seleccione Añadir para añadir una casa lacustre existente o crear una nueva.

Carga y preparación de datos

En esta sección, especificaremos la configuración de descarga de los datos y, a continuación, la guardaremos en el almacén de lago.

Descarga de datos

Este bloque de código descarga los datos de un origen remoto y lo guarda en el almacén de lago.

import os
import requests

IS_CUSTOM_DATA = False  # if TRUE, dataset has to be uploaded manually

if not IS_CUSTOM_DATA:
    # Specify the remote URL where the data is hosted
    remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/bankcustomerchurn"
    
    # List of data files to download
    file_list = ["churn.csv"]
    
    # Define the download path within the lakehouse
    download_path = "/lakehouse/default/Files/churn/raw"
    
    # Check if the lakehouse directory exists; if not, raise an error
    if not os.path.exists("/lakehouse/default"):
        raise FileNotFoundError("Default lakehouse not found. Please add a lakehouse and restart the session.")
    
    # Create the download directory if it doesn't exist
    os.makedirs(download_path, exist_ok=True)
    
    # Download each data file if it doesn't already exist in the lakehouse
    for fname in file_list:
        if not os.path.exists(f"{download_path}/{fname}"):
            r = requests.get(f"{remote_url}/{fname}", timeout=30)
            with open(f"{download_path}/{fname}", "wb") as f:
                f.write(r.content)
    
    print("Downloaded demo data files into lakehouse.")

Cargar datos en una trama de datos (dataframe) de Spark

El siguiente bloque de código carga los datos del archivo CSV en un dataframe de Spark y lo almacena en caché para un procesamiento eficaz.

df = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv("Files/churn/raw/churn.csv")
    .cache()
)

Este código supone que se ha descargado el archivo de datos y se encuentra en la ruta de acceso especificada. Lee el archivo CSV en un dataframe de Spark, deduce el esquema y lo almacena en caché para un acceso más rápido durante las operaciones posteriores.

Preparación de los datos

En esta sección, realizaremos la limpieza de datos y la ingeniería de características en el conjunto de datos.

Limpiar datos

En primer lugar, definimos una función para limpiar los datos, lo que incluye quitar filas con datos que faltan, quitar filas duplicadas basadas en columnas específicas y quitar columnas innecesarias.

# Define a function to clean the data
def clean_data(df):
    # Drop rows with missing data across all columns
    df = df.dropna(how="all")
    # Drop duplicate rows based on 'RowNumber' and 'CustomerId'
    df = df.dropDuplicates(subset=['RowNumber', 'CustomerId'])
    # Drop columns: 'RowNumber', 'CustomerId', 'Surname'
    df = df.drop('RowNumber', 'CustomerId', 'Surname')
    return df

# Create a copy of the original dataframe by selecting all the columns
df_copy = df.select("*")

# Apply the clean_data function to the copy
df_clean = clean_data(df_copy)

La función clean_data ayuda a garantizar que el conjunto de datos está libre de valores que faltan y duplicados al quitar columnas innecesarias.

Ingeniería de características

A continuación, se realiza la ingeniería de características mediante la creación de columnas ficticias para las columnas "Geography" y "Gender" mediante la codificación one-hot.

# Import PySpark functions
from pyspark.sql import functions as F

# Create dummy columns for 'Geography' and 'Gender' using one-hot encoding
df_clean = df_clean.select(
    "*",
    F.when(F.col("Geography") == "France", 1).otherwise(0).alias("Geography_France"),
    F.when(F.col("Geography") == "Germany", 1).otherwise(0).alias("Geography_Germany"),
    F.when(F.col("Geography") == "Spain", 1).otherwise(0).alias("Geography_Spain"),
    F.when(F.col("Gender") == "Female", 1).otherwise(0).alias("Gender_Female"),
    F.when(F.col("Gender") == "Male", 1).otherwise(0).alias("Gender_Male")
)

# Drop the original 'Geography' and 'Gender' columns
df_clean = df_clean.drop("Geography", "Gender")

Aquí, usamos la codificación de un solo uso para convertir columnas de categorías en columnas ficticias binarias, lo que las convierte en adecuadas para algoritmos de aprendizaje automático.

Mostrar datos limpios

Por último, se muestra el conjunto de datos limpio y con ingeniería de características mediante la función de visualización.


display(df_clean)

Este paso le permite inspeccionar el dataframe resultante con las transformaciones aplicadas.

Guardar en el almacén de lago

Ahora, guardaremos el conjunto de datos limpio y con ingeniería de características en el lago.

# Create PySpark DataFrame from Pandas
df_clean.write.mode("overwrite").format("delta").save(f"Tables/churn_data_clean")
print(f"Spark dataframe saved to delta table: churn_data_clean")

Aquí, tomamos el DataFrame de PySpark limpio y transformado, df_clean, y lo guardamos como una tabla Delta denominada "churn_data_clean" en el lago. Usamos el formato Delta para una administración y control eficientes del conjunto de datos. mode("overwrite") garantiza que se sobrescribe cualquier tabla existente con el mismo nombre y se crea una nueva versión de la tabla.

Creación de conjuntos de datos de entrenamiento y prueba

A continuación, crearemos los conjuntos de datos de prueba y entrenamiento a partir de los datos limpios y con ingeniería de características.

En la sección de código proporcionada, cargamos un conjunto de datos limpio y con ingeniería de características desde el almacén de lago mediante el formato Delta, lo dividimos en conjuntos de entrenamiento y pruebas con una relación de 80 a 20 y preparamos los datos para el aprendizaje automático. Esta preparación implica importar desde VectorAssembler de PySpark ML para combinar columnas de características en una sola columna "características". Posteriormente, usamos VectorAssembler para transformar los conjuntos de datos de entrenamiento y pruebas, lo que da como resultado DataFrames train_data y test_data que contienen la variable de destino "Exited" y los vectores de características. Estos conjuntos de datos ya están listos para su uso en la creación y evaluación de modelos de aprendizaje automático.

# Import the necessary library for feature vectorization
from pyspark.ml.feature import VectorAssembler

# Load the cleaned and feature-engineered dataset from the lakehouse
df_final = spark.read.format("delta").load("Tables/churn_data_clean")

# Train-Test Separation
train_raw, test_raw = df_final.randomSplit([0.8, 0.2], seed=41)

# Define the feature columns (excluding the target variable 'Exited')
feature_cols = [col for col in df_final.columns if col != "Exited"]

# Create a VectorAssembler to combine feature columns into a single 'features' column
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Transform the training and testing datasets using the VectorAssembler
train_data = featurizer.transform(train_raw)["Exited", "features"]
test_data = featurizer.transform(test_raw)["Exited", "features"]

Entrenar un modelo de línea base

Con los datos con características, entrenaremos un modelo de aprendizaje automático de línea base, configuraremos MLflow para el seguimiento de experimentos, definiremos una función de predicción para el cálculo de métricas y, por último, veremos y registraremos la puntuación de ROC AUC resultante.

Establecimiento del nivel de registro

Aquí, se configura el nivel de registro para suprimir la salida innecesaria de la biblioteca de Synapse.ml, lo que mantiene los registros más limpios.

import logging
 
logging.getLogger('synapse.ml').setLevel(logging.ERROR)

Configurar MLflow

En esta sección, se configura MLflow para el seguimiento de experimentos. Establecemos el nombre del experimento en "automl_sample" para organizar las ejecuciones. Además, se habilita el registro automático, lo que garantiza que los parámetros del modelo, las métricas y los artefactos se registran automáticamente en MLflow.

import mlflow

# Set the MLflow experiment to "automl_sample" and enable automatic logging
mlflow.set_experiment("automl_sample")
mlflow.autolog(exclusive=False)

Entrenamiento y evaluación del modelo

Por último, entrenamos un modelo LightGBMClassifier en los datos de entrenamiento proporcionados. El modelo se configura con los valores necesarios para la clasificación binaria y el control de desequilibrios. A continuación usamos el modelo entrenado para realizar predicciones en un conjunto de datos de prueba. Extraemos las probabilidades predichas para la clase positiva y las etiquetas verdaderas de los datos de prueba. Después, calculamos la puntuación de ROC AUC mediante la función roc_auc_score de sklearn.

from synapse.ml.lightgbm import LightGBMClassifier
from sklearn.metrics import roc_auc_score

# Assuming you have already defined 'train_data' and 'test_data'

with mlflow.start_run(run_name="default") as run:
    # Create a LightGBMClassifier model with specified settings
    model = LightGBMClassifier(objective="binary", featuresCol="features", labelCol="Exited", dataTransferMode="bulk")
    
    # Fit the model to the training data
    model = model.fit(train_data)

    # Get the predictions
    predictions = model.transform(test_data)

    # Extract the predicted probabilities for the positive class
    y_pred = predictions.select("probability").rdd.map(lambda x: x[0][1]).collect()

    # Extract the true labels from the 'test_data' DataFrame
    y_true = test_data.select("Exited").rdd.map(lambda x: x[0]).collect()

    # Compute the ROC AUC score
    roc_auc = roc_auc_score(y_true, y_pred)

    # Log the ROC AUC score with MLflow
    mlflow.log_metric("ROC_AUC", roc_auc)

    # Print or log the ROC AUC score
    print("ROC AUC Score:", roc_auc)

Desde aquí, podemos ver que nuestro modelo resultante logra una puntuación ROC AUC del 84 %.

Creación de una versión de prueba de AutoML con FLAML

En esta sección, crearemos una versión de prueba de AutoML mediante el paquete FLAML, configuraremos las opciones de prueba, convertiremos el conjunto de datos de Spark en un conjunto de datos de Pandas en Spark, ejecutaremos la versión de prueba de AutoML y veremos las métricas resultantes.

Configuración de la prueba de AutoML

Aquí se importan las clases y módulos necesarios desde el paquete FLAML y se crea una instancia de AutoML, que se usará para automatizar la canalización de aprendizaje automático.

# Import the AutoML class from the FLAML package
from flaml import AutoML
from flaml.automl.spark.utils import to_pandas_on_spark

# Create an AutoML instance
automl = AutoML()

Configurar las opciones

En esta sección, definimos las opciones de configuración de la versión de prueba de AutoML.

# Define AutoML settings
settings = {
    "time_budget": 250,         # Total running time in seconds
    "metric": 'roc_auc',       # Optimization metric (ROC AUC in this case)
    "task": 'classification',  # Task type (classification)
    "log_file_name": 'flaml_experiment.log',  # FLAML log file
    "seed": 41,                # Random seed
    "force_cancel": True,      # Force stop training once time_budget is used up
    "mlflow_exp_name": "automl_sample"      # MLflow experiment name
}

Conversión a Pandas en Spark

Para ejecutar AutoML con un conjunto de datos basado en Spark, es necesario convertirlo en un conjunto de datos de Pandas en Spark mediante la función to_pandas_on_spark. Esto permite que FLAML funcione con los datos de forma eficaz.

# Convert the Spark training dataset to a Pandas on Spark dataset
df_automl = to_pandas_on_spark(train_data)

Ejecución de la versión de prueba de AutoML

Ahora, ejecutamos la versión de prueba de AutoML. Usamos una ejecución de MLflow anidada para realizar un seguimiento del experimento dentro del contexto de ejecución de MLflow existente. La prueba de AutoML se realiza en el conjunto de datos de Pandas en Spark (df_automl) con la variable de destino "Exited y la configuración definida se pasa a la función fit para la configuración.

'''The main flaml automl API'''

with mlflow.start_run(nested=True):
    automl.fit(dataframe=df_automl, label='Exited', isUnbalance=True, **settings)

Visualización de métricas resultantes

En esta sección final, se recuperan y se muestran los resultados de la prueba de AutoML. Estas métricas proporcionan información sobre el rendimiento y la configuración del modelo de AutoML en el conjunto de datos determinado.

# Retrieve and display the best hyperparameter configuration and metrics
print('Best hyperparameter config:', automl.best_config)
print('Best ROC AUC on validation data: {0:.4g}'.format(1 - automl.best_loss))
print('Training duration of the best run: {0:.4g} s'.format(automl.best_config_train_time))

Paralelización de la versión de prueba de AutoML con Apache Spark

En escenarios en los que el conjunto de datos puede caber en un solo nodo y desea aprovechar la eficacia de Spark para ejecutar varias pruebas de AutoML paralelas simultáneamente, puede seguir estos pasos:

Conversión a dataframe de Pandas

Para habilitar la paralelización, los datos deben convertirse primero en un DataFrame de Pandas.

pandas_df = train_raw.toPandas()

En este caso, convertimos el DataFrame de Spark train_raw en un DataFrame de Pandas denominado pandas_df a fin de que se adecue al procesamiento paralelo.

Configuración de las opciones de paralelización

Establezca use_spark en True para habilitar el paralelismo basado en Spark. De forma predeterminada, FLAML iniciará una prueba por ejecutor. Puede personalizar el número de pruebas simultáneas mediante el argumento n_concurrent_trials.

settings = {
    "time_budget": 250,           # Total running time in seconds
    "metric": 'roc_auc',         # Optimization metric (ROC AUC in this case)
    "task": 'classification',    # Task type (classification)
    "seed": 41,                  # Random seed
    "use_spark": True,           # Enable Spark-based parallelism
    "n_concurrent_trials": 3,    # Number of concurrent trials to run
    "force_cancel": True,        # Force stop training once time_budget is used up
    "mlflow_exp_name": "automl_sample"  # MLflow experiment name

}

En esta configuración, especificamos que queremos usar Spark para paralelismo estableciendo use_spark en True. También establecemos el número de pruebas simultáneas en 3, lo que significa que tres pruebas se ejecutarán en paralelo en Spark.

Para obtener más información sobre cómo paralelizar los recorridos de AutoML, puede visitar la documentación de FLAML para trabajos paralelos de Spark.

Ejecución de la versión de prueba de AutoML en paralelo

Ahora, ejecutaremos la versión de prueba de AutoML en paralelo con la configuración especificada. Usaremos una ejecución de MLflow anidada para realizar un seguimiento del experimento dentro del contexto de ejecución de MLflow existente.

'''The main FLAML AutoML API'''
with mlflow.start_run(nested=True, run_name="parallel_trial"):
    automl.fit(dataframe=pandas_df, label='Exited', **settings)

Esto ejecutará ahora la versión de prueba de AutoML con la paralelización habilitada. El argumento dataframe se establece en el DataFrame de Pandas pandas_df y se pasan otras configuraciones a la función fit para la ejecución en paralelo.

Visualización de métricas

Después de ejecutar la versión de prueba de AutoML paralela, recupere y muestre los resultados, incluida la mejor configuración de hiperparámetros, ROC AUC en los datos de validación y la duración del entrenamiento de la ejecución con mejor rendimiento.

''' retrieve best config'''
print('Best hyperparmeter config:', automl.best_config)
print('Best roc_auc on validation data: {0:.4g}'.format(1-automl.best_loss))
print('Training duration of best run: {0:.4g} s'.format(automl.best_config_train_time))