Compartilhar via


Criar modelos com ML automatizado (preview)

O machine learning automatizado (AutoML) engloba um conjunto de técnicas e ferramentas desenvolvidas para simplificar o processo de treinamento e otimização de modelos de machine learning com o mínimo de intervenção humana. O principal objetivo do AutoML é simplificar e acelerar a seleção do modelo de machine learning e dos hiperparâmetros mais adequados para um determinado conjunto de dados, uma tarefa que costuma demandar competências e recursos computacionais consideráveis. Na estrutura do Fabric, os cientistas de dados podem aproveitar o módulo flaml.AutoML para automatizar vários aspectos dos fluxos de trabalho de aprendizado de máquina.

Neste artigo, vamos nos aprofundar no processo de gerar avaliações de AutoML diretamente do código usando um conjunto de dados do Spark. Além disso, exploraremos métodos para converter esses dados em um DataFrame do Pandas e discutiremos técnicas para paralelizar as avaliações de experimentação.

Importante

Esse recurso está em preview.

Pré-requisitos

  • Crie um Ambiente do Fabric ou verifique se está executando no Fabric Runtime 1.2 (Spark 3.4 ou superior e Delta 2.4)
  • Criar um notebook novo
  • Anexe o notebook a um lakehouse. No lado esquerdo, selecione Adicionar para adicionar um lakehouse existente ou criar um.

Carregar e preparar dados

Nesta seção, especificaremos as configurações de download dos dados e os salvaremos no Lakehouse.

Baixar dados

Esse bloco de código baixa os dados de uma origem remota e os salva no Lakehouse

import os
import requests

IS_CUSTOM_DATA = False  # if TRUE, dataset has to be uploaded manually

if not IS_CUSTOM_DATA:
    # Specify the remote URL where the data is hosted
    remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/bankcustomerchurn"
    
    # List of data files to download
    file_list = ["churn.csv"]
    
    # Define the download path within the lakehouse
    download_path = "/lakehouse/default/Files/churn/raw"
    
    # Check if the lakehouse directory exists; if not, raise an error
    if not os.path.exists("/lakehouse/default"):
        raise FileNotFoundError("Default lakehouse not found. Please add a lakehouse and restart the session.")
    
    # Create the download directory if it doesn't exist
    os.makedirs(download_path, exist_ok=True)
    
    # Download each data file if it doesn't already exist in the lakehouse
    for fname in file_list:
        if not os.path.exists(f"{download_path}/{fname}"):
            r = requests.get(f"{remote_url}/{fname}", timeout=30)
            with open(f"{download_path}/{fname}", "wb") as f:
                f.write(r.content)
    
    print("Downloaded demo data files into lakehouse.")

Carregar dados em um DataFrame do Spark

O bloco de código a seguir carrega os dados do arquivo CSV em um DataFrame do Spark e os armazena em cache para um processamento eficiente.

df = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv("Files/churn/raw/churn.csv")
    .cache()
)

Esse código pressupõe que o arquivo de dados foi baixado e está localizado no caminho especificado. Ele lê o arquivo CSV em um DataFrame do Spark, infere o esquema e o armazena em cache para acesso mais rápido durante as operações subsequentes.

Preparar os dados

Nesta seção, executaremos a limpeza de dados e a engenharia de recursos no conjunto de dados.

Limpar dados

Primeiro, definimos uma função para limpar os dados, que inclui descartar linhas com dados ausentes, remover linhas duplicadas com base em colunas específicas e descartar colunas desnecessárias.

# Define a function to clean the data
def clean_data(df):
    # Drop rows with missing data across all columns
    df = df.dropna(how="all")
    # Drop duplicate rows based on 'RowNumber' and 'CustomerId'
    df = df.dropDuplicates(subset=['RowNumber', 'CustomerId'])
    # Drop columns: 'RowNumber', 'CustomerId', 'Surname'
    df = df.drop('RowNumber', 'CustomerId', 'Surname')
    return df

# Create a copy of the original dataframe by selecting all the columns
df_copy = df.select("*")

# Apply the clean_data function to the copy
df_clean = clean_data(df_copy)

A função clean_data ajuda a garantir que o conjunto de dados esteja livre de valores ausentes e duplicatas, removendo colunas desnecessárias.

Engenharia de recursos

Em seguida, realizamos a engenharia de recursos criando colunas fictícias para as colunas "Geografia" e "Gênero" usando codificação one-hot.

# Import PySpark functions
from pyspark.sql import functions as F

# Create dummy columns for 'Geography' and 'Gender' using one-hot encoding
df_clean = df_clean.select(
    "*",
    F.when(F.col("Geography") == "France", 1).otherwise(0).alias("Geography_France"),
    F.when(F.col("Geography") == "Germany", 1).otherwise(0).alias("Geography_Germany"),
    F.when(F.col("Geography") == "Spain", 1).otherwise(0).alias("Geography_Spain"),
    F.when(F.col("Gender") == "Female", 1).otherwise(0).alias("Gender_Female"),
    F.when(F.col("Gender") == "Male", 1).otherwise(0).alias("Gender_Male")
)

# Drop the original 'Geography' and 'Gender' columns
df_clean = df_clean.drop("Geography", "Gender")

Aqui, usamos codificação one-hot para converter colunas categóricas em colunas fictícias binárias, tornando-as adequadas para algoritmos de aprendizado de máquina.

Exibir dados limpos

Por fim, exibimos o conjunto de dados limpo e criado pela engenharia de recursos usando a função de exibição.


display(df_clean)

Esta etapa permite inspecionar o DataFrame resultante com as transformações aplicadas.

Salvar no Lakehouse

Agora, salvamos o conjunto de dados limpo e criado pela engenharia de recursos no Lakehouse.

# Create PySpark DataFrame from Pandas
df_clean.write.mode("overwrite").format("delta").save(f"Tables/churn_data_clean")
print(f"Spark dataframe saved to delta table: churn_data_clean")

Aqui, pegamos o DataFrame do PySpark limpo e transformado, df_clean, e o salvamos como uma tabela Delta chamada "churn_data_clean" no Lakehouse. Usamos o formato Delta para controle de versão e gerenciamento eficientes do conjunto de dados. O mode("overwrite") garante que qualquer tabela existente com o mesmo nome seja substituída e uma nova versão dela seja criada.

Criar conjuntos de dados de treinamento e teste

Em seguida, criamos os conjuntos de dados de teste e treinamento com os dados limpos da engenharia de recursos.

Na seção de código fornecida, carregamos um conjunto de dados limpo da engenharia de recursos do Lakehouse usando o formato Delta, o dividimos em conjuntos de treinamento e de teste com uma proporção de 80–20 e preparamos os dados para o aprendizado de máquina. Essa preparação envolve importar VectorAssembler do ML do PySpark para combinar colunas de recursos em uma única coluna de "recursos". Posteriormente, usamos VectorAssembler para transformar os conjuntos de dados de treinamento e de teste, resultando nos DataFrames train_data e test_data que contêm a variável de destino "Exited" e os vetores de recurso. Esses conjuntos de dados agora estão prontos para uso na criação e avaliação de modelos de machine learning.

# Import the necessary library for feature vectorization
from pyspark.ml.feature import VectorAssembler

# Load the cleaned and feature-engineered dataset from the lakehouse
df_final = spark.read.format("delta").load("Tables/churn_data_clean")

# Train-Test Separation
train_raw, test_raw = df_final.randomSplit([0.8, 0.2], seed=41)

# Define the feature columns (excluding the target variable 'Exited')
feature_cols = [col for col in df_final.columns if col != "Exited"]

# Create a VectorAssembler to combine feature columns into a single 'features' column
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Transform the training and testing datasets using the VectorAssembler
train_data = featurizer.transform(train_raw)["Exited", "features"]
test_data = featurizer.transform(test_raw)["Exited", "features"]

Treinar modelo de linha de base

Usando os dados apresentados, treinamos um modelo de machine learning de linha de base, configuramos o MLflow para acompanhamento de experimentos, definimos uma função de previsão para o cálculo de métricas e, por fim, visualizamos e registramos em log a pontuação ROC AUC resultante.

Definir o nível de registros em log

Aqui, configuramos o nível de log para suprimir a saída desnecessária da biblioteca Synapse.ml, mantendo os logs mais limpos.

import logging
 
logging.getLogger('synapse.ml').setLevel(logging.ERROR)

Configurar o MLflow

Nesta seção, configuramos o MLflow para acompanhamento de experimentos. Definimos o nome do experimento como "automl_sample" para organizar as execuções. Além disso, habilitamos o registro em log automático, garantindo que os parâmetros, as métricas e os artefatos do modelo sejam registrados automaticamente no MLflow.

import mlflow

# Set the MLflow experiment to "automl_sample" and enable automatic logging
mlflow.set_experiment("automl_sample")
mlflow.autolog(exclusive=False)

Treinar e avaliar o modelo

Por fim, treinamos um modelo LightGBMClassifier nos dados de treinamento fornecidos. O modelo é definido com as configurações necessárias para classificação binária e tratamento de desequilíbrio. Depois, usamos esse modelo treinado para fazer previsões nos dados de teste. Extraímos as probabilidades previstas para a classe positiva e os rótulos verdadeiros dos dados do teste. Em seguida, calculamos a pontuação ROC AUC usando a função roc_auc_score da Sklearn.

from synapse.ml.lightgbm import LightGBMClassifier
from sklearn.metrics import roc_auc_score

# Assuming you have already defined 'train_data' and 'test_data'

with mlflow.start_run(run_name="default") as run:
    # Create a LightGBMClassifier model with specified settings
    model = LightGBMClassifier(objective="binary", featuresCol="features", labelCol="Exited", dataTransferMode="bulk")
    
    # Fit the model to the training data
    model = model.fit(train_data)

    # Get the predictions
    predictions = model.transform(test_data)

    # Extract the predicted probabilities for the positive class
    y_pred = predictions.select("probability").rdd.map(lambda x: x[0][1]).collect()

    # Extract the true labels from the 'test_data' DataFrame
    y_true = test_data.select("Exited").rdd.map(lambda x: x[0]).collect()

    # Compute the ROC AUC score
    roc_auc = roc_auc_score(y_true, y_pred)

    # Log the ROC AUC score with MLflow
    mlflow.log_metric("ROC_AUC", roc_auc)

    # Print or log the ROC AUC score
    print("ROC AUC Score:", roc_auc)

A partir daqui, podemos ver que nosso modelo resultante alcança uma pontuação ROC AUC de 84%.

Criar uma avaliação de AutoML com FLAML

Nesta seção, criaremos uma avaliação de AutoML usando o pacote FLAML, definiremos as configurações de avaliação, converteremos o conjunto de dados do Spark em um conjunto de dados do Pandas no Spark, executaremos a avaliação de AutoML e visualizaremos as métricas resultantes.

Configurar a avaliação de AutoML

Aqui, importamos as classes e módulos necessários do pacote FLAML e criamos uma instância de AutoML, que será usada para automatizar o pipeline de aprendizado de máquina.

# Import the AutoML class from the FLAML package
from flaml import AutoML
from flaml.automl.spark.utils import to_pandas_on_spark

# Create an AutoML instance
automl = AutoML()

Definir configurações

Nesta seção, definimos as configurações da avaliação de AutoML.

# Define AutoML settings
settings = {
    "time_budget": 250,         # Total running time in seconds
    "metric": 'roc_auc',       # Optimization metric (ROC AUC in this case)
    "task": 'classification',  # Task type (classification)
    "log_file_name": 'flaml_experiment.log',  # FLAML log file
    "seed": 41,                # Random seed
    "force_cancel": True,      # Force stop training once time_budget is used up
    "mlflow_exp_name": "automl_sample"      # MLflow experiment name
}

Converter para o Pandas no Spark

Para executar o AutoML com um conjunto de dados baseado no Spark, precisamos convertê-lo em um conjunto de dados do Pandas no Spark usando a função to_pandas_on_spark. Isso permite que a FLAML trabalhe com os dados de forma eficiente.

# Convert the Spark training dataset to a Pandas on Spark dataset
df_automl = to_pandas_on_spark(train_data)

Executar a avaliação de AutoML

Agora, executamos a avaliação de AutoML. Usamos uma execução de MLflow aninhada para acompanhar o experimento no contexto de execução do MLflow existente. A avaliação de AutoML é executada no conjunto de dados do Pandas no Spark (df_automl) com a variável de destino "Exited e as configurações definidas são passadas à função fit para configuração.

'''The main flaml automl API'''

with mlflow.start_run(nested=True):
    automl.fit(dataframe=df_automl, label='Exited', isUnbalance=True, **settings)

Visualizar métricas resultantes

Nesta seção final, recuperamos e visualizamos os resultados da avaliação de AutoML. Essas métricas fornecem insights sobre o desempenho e a configuração do modelo de AutoML no conjunto de dados fornecido.

# Retrieve and display the best hyperparameter configuration and metrics
print('Best hyperparameter config:', automl.best_config)
print('Best ROC AUC on validation data: {0:.4g}'.format(1 - automl.best_loss))
print('Training duration of the best run: {0:.4g} s'.format(automl.best_config_train_time))

Paralelizar a avaliação de AutoML com o Apache Spark

Em cenários em que o conjunto de dados pode caber em um único nó e você deseja aproveitar o poder do Spark para executar várias avaliações paralelas de AutoML simultaneamente, siga estas etapas:

Converter ao DataFrame do Pandas

Para habilitar a paralelização, os dados devem primeiro ser convertidos em um DataFrame do Pandas.

pandas_df = train_raw.toPandas()

Aqui, convertemos o DataFrame train_raw do Spark em um DataFrame do Pandas denominado pandas_df para adequá-lo ao processamento paralelo.

Definir configurações de paralelização

Defina use_spark como True para habilitar o paralelismo baseado no Spark. Por padrão, a FLAML inicia uma avaliação por executor. Você pode personalizar o número de avaliações simultâneas usando o argumento n_concurrent_trials.

settings = {
    "time_budget": 250,           # Total running time in seconds
    "metric": 'roc_auc',         # Optimization metric (ROC AUC in this case)
    "task": 'classification',    # Task type (classification)
    "seed": 41,                  # Random seed
    "use_spark": True,           # Enable Spark-based parallelism
    "n_concurrent_trials": 3,    # Number of concurrent trials to run
    "force_cancel": True,        # Force stop training once time_budget is used up
    "mlflow_exp_name": "automl_sample"  # MLflow experiment name

}

Nessas configurações, especificamos que queremos usar o Spark para paralelismo definindo use_spark como True. Também definimos o número de avaliações simultâneas para 3, o que significa que três avaliações serão executadas em paralelo no Spark.

Para saber mais sobre como paralelizar as trilhas de AutoML, visite a documentação da FLAML para trabalhos paralelos do Spark.

Executar a avaliação de AutoML em paralelo

Agora, executamos a avaliação de AutoML em paralelo com as configurações especificadas. Usaremos uma execução de MLflow aninhada para acompanhar o experimento no contexto de execução de MLflow existente.

'''The main FLAML AutoML API'''
with mlflow.start_run(nested=True, run_name="parallel_trial"):
    automl.fit(dataframe=pandas_df, label='Exited', **settings)

Agora, isso executará a avaliação de AutoML com a paralelização habilitada. O argumento dataframe é definido como o DataFrame pandas_df do Pandas e outras configurações são passadas à função fit para execução paralela.

Métricas de exibição

Depois de executar a avaliação paralela de AutoML, recupere e exiba os resultados, incluindo a melhor configuração de hiperparâmetro, a pontuação ROC AUC nos dados de validação e a duração do treinamento da execução de melhor desempenho.

''' retrieve best config'''
print('Best hyperparmeter config:', automl.best_config)
print('Best roc_auc on validation data: {0:.4g}'.format(1-automl.best_loss))
print('Training duration of best run: {0:.4g} s'.format(automl.best_config_train_time))