Criar modelos com ML automatizado (visualização)
O Automated Machine Learning (AutoML) engloba um conjunto de técnicas e ferramentas projetadas para agilizar o processo de treinamento e otimizar modelos de aprendizado de máquina com intervenção humana mínima. O principal objetivo do AutoML é simplificar e acelerar a seleção do modelo de aprendizado de máquina e hiperparâmetros mais adequados para um determinado conjunto de dados, uma tarefa que normalmente exige experiência considerável e recursos computacionais. Dentro da estrutura do Fabric, os cientistas de dados podem aproveitar o flaml.AutoML
módulo para automatizar vários aspetos de seus fluxos de trabalho de aprendizado de máquina.
Neste artigo, vamos nos aprofundar no processo de geração de avaliações do AutoML diretamente do código usando um conjunto de dados do Spark. Além disso, exploraremos métodos para converter esses dados em um dataframe Pandas e discutiremos técnicas para paralelizar seus testes de experimentação.
Importante
Este recurso está em pré-visualização.
Pré-requisitos
Obtenha uma assinatura do Microsoft Fabric. Ou inscreva-se para uma avaliação gratuita do Microsoft Fabric.
Entre no Microsoft Fabric.
Use o seletor de experiência no lado esquerdo da sua página inicial para alternar para a experiência Synapse Data Science.
- Crie um novo ambiente de malha ou verifique se você está executando no Fabric Runtime 1.2 (Spark 3.4 (ou superior) e Delta 2.4)
- Crie um novo bloco de notas.
- Ligue o seu bloco de notas a uma casa no lago. No lado esquerdo do seu bloco de notas, selecione Adicionar para adicionar uma lakehouse existente ou criar uma nova.
Carregar e preparar dados
Nesta seção, especificaremos as configurações de download para os dados e, em seguida, salvá-los na casa do lago.
Transferir dados
Este bloco de código baixa os dados de uma fonte remota e os salva na casa do lago
import os
import requests
IS_CUSTOM_DATA = False # if TRUE, dataset has to be uploaded manually
if not IS_CUSTOM_DATA:
# Specify the remote URL where the data is hosted
remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/bankcustomerchurn"
# List of data files to download
file_list = ["churn.csv"]
# Define the download path within the lakehouse
download_path = "/lakehouse/default/Files/churn/raw"
# Check if the lakehouse directory exists; if not, raise an error
if not os.path.exists("/lakehouse/default"):
raise FileNotFoundError("Default lakehouse not found. Please add a lakehouse and restart the session.")
# Create the download directory if it doesn't exist
os.makedirs(download_path, exist_ok=True)
# Download each data file if it doesn't already exist in the lakehouse
for fname in file_list:
if not os.path.exists(f"{download_path}/{fname}"):
r = requests.get(f"{remote_url}/{fname}", timeout=30)
with open(f"{download_path}/{fname}", "wb") as f:
f.write(r.content)
print("Downloaded demo data files into lakehouse.")
Carregar dados em um dataframe do Spark
O bloco de código a seguir carrega os dados do arquivo CSV em um Spark DataFrame e os armazena em cache para um processamento eficiente.
df = (
spark.read.option("header", True)
.option("inferSchema", True)
.csv("Files/churn/raw/churn.csv")
.cache()
)
Esse código pressupõe que o arquivo de dados foi baixado e está localizado no caminho especificado. Ele lê o arquivo CSV em um Spark DataFrame, infere o esquema e o armazena em cache para acesso mais rápido durante as operações subsequentes.
Preparar os dados
Nesta seção, executaremos a limpeza de dados e a engenharia de recursos no conjunto de dados.
Limpar dados
Primeiro, definimos uma função para limpar os dados, que inclui soltar linhas com dados ausentes, remover linhas duplicadas com base em colunas específicas e descartar colunas desnecessárias.
# Define a function to clean the data
def clean_data(df):
# Drop rows with missing data across all columns
df = df.dropna(how="all")
# Drop duplicate rows based on 'RowNumber' and 'CustomerId'
df = df.dropDuplicates(subset=['RowNumber', 'CustomerId'])
# Drop columns: 'RowNumber', 'CustomerId', 'Surname'
df = df.drop('RowNumber', 'CustomerId', 'Surname')
return df
# Create a copy of the original dataframe by selecting all the columns
df_copy = df.select("*")
# Apply the clean_data function to the copy
df_clean = clean_data(df_copy)
A clean_data
função ajuda a garantir que o conjunto de dados esteja livre de valores ausentes e duplicados, removendo colunas desnecessárias.
Desenvolvimento de funcionalidades
Em seguida, executamos engenharia de recursos criando colunas fictícias para as colunas 'Geografia' e 'Gênero' usando codificação a quente.
# Import PySpark functions
from pyspark.sql import functions as F
# Create dummy columns for 'Geography' and 'Gender' using one-hot encoding
df_clean = df_clean.select(
"*",
F.when(F.col("Geography") == "France", 1).otherwise(0).alias("Geography_France"),
F.when(F.col("Geography") == "Germany", 1).otherwise(0).alias("Geography_Germany"),
F.when(F.col("Geography") == "Spain", 1).otherwise(0).alias("Geography_Spain"),
F.when(F.col("Gender") == "Female", 1).otherwise(0).alias("Gender_Female"),
F.when(F.col("Gender") == "Male", 1).otherwise(0).alias("Gender_Male")
)
# Drop the original 'Geography' and 'Gender' columns
df_clean = df_clean.drop("Geography", "Gender")
Aqui, usamos codificação one-hot para converter colunas categóricas em colunas fictícias binárias, tornando-as adequadas para algoritmos de aprendizado de máquina.
Exibir dados limpos
Finalmente, exibimos o conjunto de dados limpo e projetado por recursos usando a função de exibição.
display(df_clean)
Esta etapa permite que você inspecione o DataFrame resultante com as transformações aplicadas.
Guardar em lakehouse
Agora, vamos salvar o conjunto de dados limpo e projetado por recursos para a casa do lago.
# Create PySpark DataFrame from Pandas
df_clean.write.mode("overwrite").format("delta").save(f"Tables/churn_data_clean")
print(f"Spark dataframe saved to delta table: churn_data_clean")
Aqui, pegamos o PySpark DataFrame df_clean
limpo e transformado e o salvamos como uma tabela Delta chamada "churn_data_clean" na casa do lago. Usamos o formato Delta para controle de versão e gerenciamento eficientes do conjunto de dados. O mode("overwrite")
garante que qualquer tabela existente com o mesmo nome seja substituída e uma nova versão da tabela seja criada.
Criar conjuntos de dados de teste e treinamento
Em seguida, criaremos os conjuntos de dados de teste e treinamento a partir dos dados limpos e projetados por recursos.
Na seção de código fornecido, carregamos um conjunto de dados limpo e projetado por recursos da casa do lago usando o formato Delta, o dividimos em conjuntos de treinamento e teste com uma proporção de 80-20 e preparamos os dados para aprendizado de máquina. Essa preparação envolve a VectorAssembler
importação do PySpark ML para combinar colunas de feição em uma única coluna de "recursos". Posteriormente, usamos o VectorAssembler
para transformar os conjuntos de dados de treinamento e teste, resultando em train_data
DataFrames test_data
que contêm a variável de destino "Exited" e os vetores de recurso. Esses conjuntos de dados agora estão prontos para uso na criação e avaliação de modelos de aprendizado de máquina.
# Import the necessary library for feature vectorization
from pyspark.ml.feature import VectorAssembler
# Load the cleaned and feature-engineered dataset from the lakehouse
df_final = spark.read.format("delta").load("Tables/churn_data_clean")
# Train-Test Separation
train_raw, test_raw = df_final.randomSplit([0.8, 0.2], seed=41)
# Define the feature columns (excluding the target variable 'Exited')
feature_cols = [col for col in df_final.columns if col != "Exited"]
# Create a VectorAssembler to combine feature columns into a single 'features' column
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")
# Transform the training and testing datasets using the VectorAssembler
train_data = featurizer.transform(train_raw)["Exited", "features"]
test_data = featurizer.transform(test_raw)["Exited", "features"]
Modelo de linha de base do trem
Usando os dados featurizados, treinaremos um modelo de aprendizado de máquina de linha de base, configuraremos o MLflow para rastreamento de experimentos, definiremos uma função de previsão para cálculo de métricas e, finalmente, visualizaremos e registraremos a pontuação ROC AUC resultante.
Definir nível de registo
Aqui, configuramos o nível de log para suprimir a saída desnecessária da biblioteca de Synapse.ml, mantendo os logs mais limpos.
import logging
logging.getLogger('synapse.ml').setLevel(logging.ERROR)
Configurar MLflow
Nesta seção, configuramos o MLflow para rastreamento de experimentos. Definimos o nome do experimento como "automl_sample" para organizar as corridas. Além disso, habilitamos o registro automático, garantindo que os parâmetros, métricas e artefatos do modelo sejam registrados automaticamente no MLflow.
import mlflow
# Set the MLflow experiment to "automl_sample" and enable automatic logging
mlflow.set_experiment("automl_sample")
mlflow.autolog(exclusive=False)
Treinar e avaliar o modelo
Finalmente, treinamos um modelo LightGBMClassifier nos dados de treinamento fornecidos. O modelo é configurado com as configurações necessárias para classificação binária e manipulação de desequilíbrios. Em seguida, usamos esse modelo treinado para fazer previsões sobre os dados de teste. Extraímos as probabilidades previstas para a classe positiva e os rótulos verdadeiros dos dados do teste. Depois, calculamos a pontuação ROC AUC usando a função sklearn roc_auc_score
.
from synapse.ml.lightgbm import LightGBMClassifier
from sklearn.metrics import roc_auc_score
# Assuming you have already defined 'train_data' and 'test_data'
with mlflow.start_run(run_name="default") as run:
# Create a LightGBMClassifier model with specified settings
model = LightGBMClassifier(objective="binary", featuresCol="features", labelCol="Exited", dataTransferMode="bulk")
# Fit the model to the training data
model = model.fit(train_data)
# Get the predictions
predictions = model.transform(test_data)
# Extract the predicted probabilities for the positive class
y_pred = predictions.select("probability").rdd.map(lambda x: x[0][1]).collect()
# Extract the true labels from the 'test_data' DataFrame
y_true = test_data.select("Exited").rdd.map(lambda x: x[0]).collect()
# Compute the ROC AUC score
roc_auc = roc_auc_score(y_true, y_pred)
# Log the ROC AUC score with MLflow
mlflow.log_metric("ROC_AUC", roc_auc)
# Print or log the ROC AUC score
print("ROC AUC Score:", roc_auc)
A partir daqui, podemos ver que nosso modelo resultante alcança uma pontuação ROC AUC de 84%.
Crie uma versão de avaliação do AutoML com FLAML
Nesta seção, criaremos uma avaliação do AutoML usando o pacote FLAML, definiremos as configurações de avaliação, converteremos o conjunto de dados do Spark em um conjunto de dados do Pandas on Spark, executaremos a avaliação do AutoML e visualizaremos as métricas resultantes.
Configurar a versão de avaliação do AutoML
Aqui, importamos as classes e módulos necessários do pacote FLAML e criamos uma instância do AutoML, que será usada para automatizar o pipeline de aprendizado de máquina.
# Import the AutoML class from the FLAML package
from flaml import AutoML
from flaml.automl.spark.utils import to_pandas_on_spark
# Create an AutoML instance
automl = AutoML()
Configurar definições
Nesta seção, definimos as definições de configuração para a avaliação do AutoML.
# Define AutoML settings
settings = {
"time_budget": 250, # Total running time in seconds
"metric": 'roc_auc', # Optimization metric (ROC AUC in this case)
"task": 'classification', # Task type (classification)
"log_file_name": 'flaml_experiment.log', # FLAML log file
"seed": 41, # Random seed
"force_cancel": True, # Force stop training once time_budget is used up
"mlflow_exp_name": "automl_sample" # MLflow experiment name
}
Converter para Pandas no Spark
Para executar o AutoML com um conjunto de dados baseado no Spark, precisamos convertê-lo em um conjunto de dados Pandas on Spark usando a to_pandas_on_spark
função. Isso permite que o FLAML trabalhe com os dados de forma eficiente.
# Convert the Spark training dataset to a Pandas on Spark dataset
df_automl = to_pandas_on_spark(train_data)
Execute a versão de avaliação do AutoML
Agora, executamos a versão de avaliação do AutoML. Usamos uma execução de MLflow aninhada para acompanhar o experimento dentro do contexto de execução de MLflow existente. O teste AutoML é realizado no conjunto de dados Pandas on Spark (df_automl
) com a variável de destino "Exited
e as configurações definidas são passadas para a fit
função para configuração.
'''The main flaml automl API'''
with mlflow.start_run(nested=True):
automl.fit(dataframe=df_automl, label='Exited', isUnbalance=True, **settings)
Ver métricas resultantes
Nesta seção final, recuperamos e exibimos os resultados da avaliação do AutoML. Essas métricas fornecem informações sobre o desempenho e a configuração do modelo AutoML no conjunto de dados fornecido.
# Retrieve and display the best hyperparameter configuration and metrics
print('Best hyperparameter config:', automl.best_config)
print('Best ROC AUC on validation data: {0:.4g}'.format(1 - automl.best_loss))
print('Training duration of the best run: {0:.4g} s'.format(automl.best_config_train_time))
Paralelize sua avaliação do AutoML com o Apache Spark
Em cenários em que seu conjunto de dados pode caber em um único nó e você deseja aproveitar o poder do Spark para executar várias avaliações paralelas do AutoML simultaneamente, você pode seguir estas etapas:
Converter para dataframe Pandas
Para habilitar a paralelização, seus dados devem primeiro ser convertidos em um Pandas DataFrame.
pandas_df = train_raw.toPandas()
Aqui, convertemos o train_raw
Spark DataFrame em um Pandas DataFrame nomeado pandas_df
para torná-lo adequado para processamento paralelo.
Definir configurações de paralelização
Defina use_spark
como True
para habilitar o paralelismo baseado em faísca. Por padrão, o FLAML iniciará uma avaliação por executor. Você pode personalizar o número de avaliações simultâneas usando o n_concurrent_trials
argumento.
settings = {
"time_budget": 250, # Total running time in seconds
"metric": 'roc_auc', # Optimization metric (ROC AUC in this case)
"task": 'classification', # Task type (classification)
"seed": 41, # Random seed
"use_spark": True, # Enable Spark-based parallelism
"n_concurrent_trials": 3, # Number of concurrent trials to run
"force_cancel": True, # Force stop training once time_budget is used up
"mlflow_exp_name": "automl_sample" # MLflow experiment name
}
Nessas configurações, especificamos que queremos utilizar o Spark para paralelismo definindo use_spark
como True
. Também definimos o número de testes simultâneos para 3, o que significa que três testes serão executados em paralelo no Spark.
Para saber mais sobre como paralelizar suas trilhas AutoML, você pode visitar a documentação do FLAML para trabalhos paralelos do Spark.
Execute a versão de avaliação do AutoML em paralelo
Agora, executaremos a versão de avaliação do AutoML em paralelo com as configurações especificadas. Usaremos uma execução de MLflow aninhada para acompanhar o experimento dentro do contexto de execução de MLflow existente.
'''The main FLAML AutoML API'''
with mlflow.start_run(nested=True, run_name="parallel_trial"):
automl.fit(dataframe=pandas_df, label='Exited', **settings)
Isso agora executará a versão de avaliação do AutoML com paralelização habilitada. O dataframe
argumento é definido como o Pandas DataFrame pandas_df
, e outras configurações são passadas para a fit
função para execução paralela.
Ver métricas
Depois de executar a avaliação paralela do AutoML, recupere e exiba os resultados, incluindo a melhor configuração de hiperparâmetros, a AUC do ROC nos dados de validação e a duração do treinamento da execução com melhor desempenho.
''' retrieve best config'''
print('Best hyperparmeter config:', automl.best_config)
print('Best roc_auc on validation data: {0:.4g}'.format(1-automl.best_loss))
print('Training duration of best run: {0:.4g} s'.format(automl.best_config_train_time))