Tâches de classification avec SynapseML
Dans cet article, vous effectuez la même tâche de classification de deux manières différentes : une fois en utilisant plain pyspark
et une fois en utilisant la bibliothèque synapseml
. Les deux méthodes donnent les mêmes performances, mais mettent en évidence la simplicité d'utilisation synapseml
par rapport à pyspark
.
La tâche consiste à prédire si l'avis d'un client sur un livre vendu sur Amazon est bon (note > 3) ou mauvais en fonction du texte de l'avis. Vous y parvenez en formant les apprenants LogisticRegression avec différents hyperparamètres et en choisissant le meilleur modèle.
Prérequis
Attachez votre cahier à une cabane au bord du lac. Sur le côté gauche, sélectionnez Ajouter pour ajouter une maison de lac existante ou créer une maison de lac.
Programme d’installation
Importez les bibliothèques Python nécessaires et obtenez une session Spark.
from pyspark.sql import SparkSession
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
Lire les données
Téléchargez et lisez les données.
rawData = spark.read.parquet(
"wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)
Extraire des fonctionnalités et traiter des données
Les données réelles sont plus complexes que l'ensemble de données ci-dessus. Il est courant qu'un jeu de données ait des fonctionnalités de plusieurs types, telles que textuelles, numériques et catégorielles. Pour illustrer à quel point il est difficile de travailler avec ces ensembles de données, ajoutez deux caractéristiques numériques à l'ensemble de données : le nombre de mots de l'avis et la longueur moyenne des mots.
from pyspark.sql.functions import udf
from pyspark.sql.types import *
def wordCount(s):
return len(s.split())
def wordLength(s):
import numpy as np
ss = [len(w) for w in s.split()]
return round(float(np.mean(ss)), 2)
wordLengthUDF = udf(wordLength, DoubleType())
wordCountUDF = udf(wordCount, IntegerType())
from synapse.ml.stages import UDFTransformer
wordLength = "wordLength"
wordCount = "wordCount"
wordLengthTransformer = UDFTransformer(
inputCol="text", outputCol=wordLength, udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
inputCol="text", outputCol=wordCount, udf=wordCountUDF
)
from pyspark.ml import Pipeline
data = (
Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
.fit(rawData)
.transform(rawData)
.withColumn("label", rawData["rating"] > 3)
.drop("rating")
)
data.show(5)
Classifier à l'aide de pyspark
Pour choisir le meilleur classificateur LogisticRegression à l'aide de la bibliothèque pyspark
, vous devez explicitement effectuer les étapes suivantes :
- Traiter les fonctionnalités :
- Tokeniser la colonne de texte
- Hachez la colonne tokenisée dans un vecteur en utilisant le hachage
- Fusionner les caractéristiques numériques avec le vecteur
- Traitez la colonne d'étiquette : convertissez-la dans le type approprié.
- Former plusieurs algorithmes LogisticRegression sur l'ensemble de données
train
avec différents hyperparamètres - Calculez l'aire sous la courbe ROC pour chacun des modèles entraînés et sélectionnez le modèle avec la métrique la plus élevée calculée sur l'ensemble de données
test
- Évaluez le meilleur modèle sur le plateau
validation
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.feature import VectorAssembler
# Featurize text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)
# Merge text and numeric features in one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)
# Select only columns of interest
# Convert rating column from boolean to int
processedData = assembledData.select("label", "features").withColumn(
"label", assembledData.label.cast(IntegerType())
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
# Prepare data for learning
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)
# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []
# Select the best model
for learner in logisticRegressions:
model = learner.fit(train)
models.append(model)
scoredData = model.transform(test)
metrics.append(evaluator.evaluate(scoredData))
bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]
# Get AUC on the validation dataset
scoredVal = bestModel.transform(validation)
print(evaluator.evaluate(scoredVal))
Classifier à l'aide de SynapseML
Les étapes nécessaires avec synapseml
sont plus simples :
L'estimateur
TrainClassifier
caractérise les données en interne, tant que les colonnes sélectionnées dans l'ensemble de donnéestrain
,test
,validation
représentent les caractéristiquesL'estimateur
FindBestModel
trouve le meilleur modèle à partir d'un pool de modèles formés en trouvant le modèle qui fonctionne le mieux sur l'ensemble de donnéestest
tenu de l’indicateur de performance spécifiéLe transformateur
ComputeModelStatistics
calcule les différentes métriques sur un jeu de données noté (dans notre cas, le jeu de donnéesvalidation
) en même temps
from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel
# Prepare data for learning
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)
# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
for lrm in logisticRegressions
]
# Select the best model
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)
# Get AUC on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
"Best model's AUC on validation set = "
+ "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)