Partage via


Créer un modèle Machine Learning avec Apache Spark MLlib

Cet article explique comment utiliser Apache Spark MLlib pour créer une application d’apprentissage automatique qui traite l’analyse prédictive simple sur un jeu de données ouvert Azure. Spark fournit des bibliothèques d’apprentissage automatique intégrées. Cet exemple utilise un classification par régression logistique.

Les bibliothèques Spark de base SparkML et MLlib fournissent de nombreux utilitaires utiles pour les tâches d’apprentissage automatique. Ces utilitaires conviennent pour les tâches suivantes :

  • classification ;
  • Clustering
  • Hypothèse de test et de calcul des exemples de statistiques
  • régression ;
  • Décomposition de valeur singulière (SVD) et analyse des composants principaux (PCA)
  • Modélisation de rubrique

Comprendre la classification et la régression logistique

Une classification, tâche d’apprentissage automatique très courante, implique le tri de données d’entrée par catégories. Un algorithme de classification doit déterminer comment affecter des étiquettes aux données d’entrée fournies. Par exemple, un algorithme d’apprentissage automatique pourrait accepter des informations de stock en entrée et diviser le stock en deux catégories : le stock à vendre et le stock que vous devriez conserver.

L’algorithme Régression logistique est utile pour la classification. L’API de régression logistique Spark est utile pour la classification binaire des données d’entrée dans l’un des deux groupes. Pour plus d’informations sur la régression logistique, consultez Wikipedia.

La régression logistique génère une fonction logistique qui peut prédire la probabilité qu’un vecteur d’entrée appartienne à un groupe ou à l’autre.

Exemple d’analyse prédictive des données des taxis de New York

Tout d’abord, installez azureml-opendatasets. Les données sont disponibles via la ressource Azure Open Datasets. Ce sous-ensemble du jeu de données contient des informations sur les courses de taxis jaunes, notamment les heures et les lieux de départ et d’arrivée, le coût des courses et d’autres attributs.

%pip install azureml-opendatasets

Le reste de cet article s’appuie sur Apache Spark pour effectuer dans un premier temps une analyse sur les données de pourboires des taxis de New York, puis développer un modèle pour prédire si un voyage particulier inclut ou non un pourboire.

Créer un modèle d’apprentissage automatique Apache Spark

  1. Créez un notebook PySpark. Pour plus d’informations, consultez Créer un notebook.

  2. Importez les types nécessaires pour ce notebook.

    import matplotlib.pyplot as plt
    from datetime import datetime
    from dateutil import parser
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml import PipelineModel
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    
  3. Nous utiliserons MLflow pour suivre nos expériences d’apprentissage automatique et les exécutions correspondantes. Si la journalisation automatique Microsoft Fabric est activée, les métriques et paramètres correspondants sont automatiquement capturés.

    import mlflow
    

Construire le DataFrame d’entrée

Cet exemple charge les données dans un dataframe Pandas, puis le convertit en dataframe Apache Spark. Dans ce format, nous pouvons appliquer d’autres opérations Apache Spark pour nettoyer et filtrer le jeu de données.

  1. Collez ces lignes dans une nouvelle cellule et exécutez-les pour créer un DataFrame Spark. Cette étape récupère les données via l’API Open Datasets. Nous pouvons filtrer ces données pour examiner une fenêtre de données spécifique. L’exemple de code utilise start_date et end_date pour appliquer un filtre qui retourne un seul mois de données.

    from azureml.opendatasets import NycTlcYellow
    
    end_date = parser.parse('2018-06-06')
    start_date = parser.parse('2018-05-01')
    nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)
    nyc_tlc_pd = nyc_tlc.to_pandas_dataframe()
    
    nyc_tlc_df = spark.createDataFrame(nyc_tlc_pd).repartition(20)
    
    
  2. Le code réduit le jeu de données à environ 10 000 lignes. Pour accélérer le développement et l’entraînement, le code échantillonne notre jeu de données pour l’instant.

    # To make development easier, faster, and less expensive, sample down for now
    sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
    
  3. Nous devons examiner nos données à l’aide de la commande intégrée display(). Avec cette commande, nous pouvons afficher facilement un échantillon des données ou explorer les tendances dans les données par graphique.

    #sampled_taxi_df.show(10)
    display(sampled_taxi_df.limit(10))    
    

Préparer les données

La préparation des données est une étape cruciale du processus d’apprentissage automatique. Cela passe par le nettoyage, la transformation et l’organisation des données brutes pour les rendre adaptées à l’analyse et à la modélisation. Dans cet exemple de code, plusieurs étapes de préparation des données sont effectuées :

  • suppression des valeurs hors norme et des valeurs incorrectes en filtrant le jeu de données ;
  • suppression des colonnes non nécessaires à l’entraînement de modèle ;
  • création de colonnes à partir des données brutes ; et
  • génération d’une étiquette pour déterminer si un trajet donné en taxi implique un pourboire.
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
                        , 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
                        , date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
                        , date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
                        , (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
                        , (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                        )\
                .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
                        & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
                        & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
                        & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
                        & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
                        & (sampled_taxi_df.rateCodeId <= 5)
                        & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                        )

Ensuite, effectuez une deuxième passe sur les données pour ajouter les caractéristiques finales.

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
                                                , 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
                                                , when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
                                                .when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
                                                .when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
                                                .when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
                                                .otherwise(0).alias('trafficTimeBins')
                                              )\
                                       .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

Créer un modèle de régression logistique

La dernière tâche convertit les données étiquetées dans un format qui peut être analysé par régression logistique. L’entrée dans un algorithme de régression logistique doit avoir une structure de paires de vecteurs étiquette/caractéristique, où le vecteur caractéristique est un vecteur de nombres qui représente le point d’entrée.

En fonction des exigences de tâche finales, nous devons convertir les colonnes catégorielles en nombres. Plus précisément, nous devons convertir les colonnes trafficTimeBins et weekdayString en représentations entières. Nous avons de nombreuses options disponibles pour traiter cette exigence. Cet exemple implique l’approche OneHotEncoder :

# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")

# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

Cette action génère un nouveau DataFrame dont les colonnes sont toutes dans un format adapté à l’entraînement d’un modèle.

Entraîner un modèle de régression logistique

La première tâche divise le jeu de données en un jeu d’entraînement et un jeu de test ou de validation.

# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234

# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

Une fois que nous avons deux DataFrames, nous devons créer la formule du modèle et l’exécuter sur le DataFrame d’entraînement. Nous pouvons ensuite effectuer une validation par rapport au DataFrame de test. Faites des essais avec différentes versions de la formule du modèle pour voir l’effet de différentes combinaisons.

## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')

## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")

## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

La cellule produit :

Area under ROC = 0.9749430523917996

Créer une représentation visuelle de la prédiction

Nous pouvons maintenant construire une visualisation finale pour interpréter les résultats du modèle. Une courbe ROC peut certainement présenter le résultat.

## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary

plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
         modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

Graphique illustrant la courbe ROC pour la régression logistique du modèle de pourboires.