Partilhar via


Crie um modelo de aprendizado de máquina com o Apache Spark MLlib

Neste artigo, você aprenderá a usar o Apache Spark MLlib para criar um aplicativo de aprendizado de máquina que lida com análise preditiva simples em um conjunto de dados aberto do Azure. O Spark fornece bibliotecas de aprendizado de máquina integradas. Este exemplo usa a classificação por meio de regressão logística.

As principais bibliotecas SparkML e MLlib Spark fornecem muitos utilitários que são úteis para tarefas de aprendizado de máquina. Estes utilitários são adequados para:

  • Classificação
  • Clustering
  • Teste de hipóteses e cálculo de estatísticas amostrais
  • Regressão
  • Decomposição de valor singular (SVD) e análise de componentes principais (ACP)
  • Modelagem de tópicos

Compreender a classificação e a regressão logística

A classificação, uma tarefa popular de aprendizado de máquina, envolve classificar os dados de entrada em categorias. Um algoritmo de classificação deve descobrir como atribuir rótulos aos dados de entrada fornecidos. Por exemplo, um algoritmo de aprendizado de máquina pode aceitar informações de estoque como entrada e dividir o estoque em duas categorias: ações que você deve vender e ações que você deve manter.

O algoritmo de regressão logística é útil para a classificação. A API de regressão logística do Spark é útil para a classificação binária de dados de entrada em um dos dois grupos. Para obter mais informações sobre regressão logística, consulte Wikipedia.

A regressão logística produz uma função logística que pode prever a probabilidade de um vetor de entrada pertencer a um grupo ou outro.

Exemplo de análise preditiva de dados de táxi de Nova York

Primeiro, instale o azureml-opendatasets. Os dados estão disponíveis através do recurso Azure Open Datasets . Este subconjunto de dados hospeda informações sobre viagens de táxi amarelo, incluindo as horas de início, horas de término, locais de início, locais de término, custos de viagem e outros atributos.

%pip install azureml-opendatasets

O restante deste artigo depende do Apache Spark para primeiro realizar algumas análises nos dados de dicas de viagem de táxi de Nova York e, em seguida, desenvolver um modelo para prever se uma viagem específica inclui uma dica ou não.

Criar um modelo de aprendizado de máquina do Apache Spark

  1. Crie um bloco de anotações PySpark. Para obter mais informações, visite Criar um bloco de anotações.

  2. Importe os tipos necessários para este bloco de anotações.

    import matplotlib.pyplot as plt
    from datetime import datetime
    from dateutil import parser
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml import PipelineModel
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    
  3. Usaremos o MLflow para acompanhar nossos experimentos de aprendizado de máquina e execuções correspondentes. Se o log automático do Microsoft Fabric estiver habilitado, as métricas e os parâmetros correspondentes serão capturados automaticamente.

    import mlflow
    

Construir o DataFrame de entrada

Este exemplo carrega os dados em um dataframe Pandas e, em seguida, converte-os em um dataframe Apache Spark. Nesse formato, podemos aplicar outras operações do Apache Spark para limpar e filtrar o conjunto de dados.

  1. Cole essas linhas em uma nova célula e execute-as para criar um DataFrame do Spark. Esta etapa recupera os dados por meio da API Open Datasets. Podemos filtrar esses dados para examinar uma janela específica de dados. O exemplo de código usa start_date e end_date para aplicar um filtro que retorna um único mês de dados.

    from azureml.opendatasets import NycTlcYellow
    
    end_date = parser.parse('2018-06-06')
    start_date = parser.parse('2018-05-01')
    nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)
    nyc_tlc_pd = nyc_tlc.to_pandas_dataframe()
    
    nyc_tlc_df = spark.createDataFrame(nyc_tlc_pd).repartition(20)
    
    
  2. Esse código reduz o conjunto de dados para cerca de 10.000 linhas. Para acelerar o desenvolvimento e o treinamento, o código mostra nosso conjunto de dados por enquanto.

    # To make development easier, faster, and less expensive, sample down for now
    sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
    
  3. Queremos examinar nossos dados usando o comando integrado display() . Com este comando, podemos visualizar facilmente uma amostra de dados ou explorar graficamente as tendências nos dados.

    #sampled_taxi_df.show(10)
    display(sampled_taxi_df.limit(10))    
    

Preparar os dados

A preparação de dados é uma etapa crucial no processo de aprendizado de máquina. Envolve limpeza, transformação e organização de dados brutos, para torná-los adequados para análise e modelagem. Neste exemplo de código, você executa várias etapas de preparação de dados:

  • Filtrar o conjunto de dados para remover valores atípicos e incorretos
  • Remover colunas que não são necessárias para o treinamento do modelo
  • Criar novas colunas a partir dos dados brutos
  • Gere uma etiqueta para determinar se uma determinada viagem de táxi envolve ou não uma gorjeta
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
                        , 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
                        , date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
                        , date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
                        , (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
                        , (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                        )\
                .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
                        & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
                        & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
                        & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
                        & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
                        & (sampled_taxi_df.rateCodeId <= 5)
                        & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                        )

Em seguida, faça uma segunda passagem sobre os dados para adicionar os recursos finais.

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
                                                , 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
                                                , when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
                                                .when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
                                                .when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
                                                .when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
                                                .otherwise(0).alias('trafficTimeBins')
                                              )\
                                       .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

Criar um modelo de regressão logística

A tarefa final converte os dados rotulados em um formato que a regressão logística pode manipular. A entrada para um algoritmo de regressão logística deve ter uma estrutura de pares vetoriais label/feature, onde o vetor feature é um vetor de números que representam o ponto de entrada.

Com base nos requisitos da tarefa final, devemos converter as colunas categóricas em números. Especificamente, devemos converter as trafficTimeBins colunas e weekdayString em representações inteiras. Temos muitas opções disponíveis para lidar com este requisito. Este exemplo envolve a OneHotEncoder abordagem:

# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")

# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

Essa ação resulta em um novo DataFrame com todas as colunas no formato adequado para treinar um modelo.

Treinar um modelo de regressão logística

A primeira tarefa divide o conjunto de dados em um conjunto de treinamento e um conjunto de teste ou validação.

# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234

# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

Depois de termos dois DataFrames, devemos criar a fórmula do modelo e executá-la no DataFrame de treinamento. Em seguida, podemos validar em relação ao dataFrame de teste. Experimente diferentes versões da fórmula do modelo para ver os efeitos de diferentes combinações.

## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')

## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")

## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

As saídas da célula:

Area under ROC = 0.9749430523917996

Criar uma representação visual da previsão

Agora podemos construir uma visualização final para interpretar os resultados do modelo. Uma curva ROC certamente pode apresentar o resultado.

## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary

plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
         modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

Gráfico que mostra a curva ROC para regressão logística no modelo de ponta.