Condividi tramite


Attività di classificazione con SynapseML

In questo articolo viene eseguita la stessa attività di classificazione in due modi diversi: una volta usando plain pyspark e una volta usando la synapseml libreria. I due metodi hanno le stesse prestazioni, ma evidenzia la semplicità di utilizzo di synapseml rispetto a pyspark.

L'attività consiste nel prevedere se la recensione di un cliente di un libro venduto su Amazon è positiva (valutazione > 3) o negativa, in base al testo della recensione. Ciò è possibile formando agli studenti LogisticRegression iperparametri diversi e scegliendo il modello migliore.

Prerequisiti

Collegare il notebook a un lakehouse. Sul lato sinistro, selezionare Aggiungi per aggiungere un lakehouse esistente o creare un lakehouse.

Configurazione

Importare le librerie Python necessarie e ottenere una sessione Spark.

from pyspark.sql import SparkSession

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

Leggere i dati

Scaricare e leggere i dati.

rawData = spark.read.parquet(
    "wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)

Estrarre funzionalità ed elaborare i dati

I dati reali sono più complessi rispetto al set di dati precedente. In genere, i set di dati hanno funzionalità di più tipi, come funzionalità di testo, funzionalità numeriche e di categoria. Per comprendere quanto sia difficile lavorare con questi set di dati, aggiungere due funzionalità numeriche al set di dati: il conteggio delle parole della recensione e la lunghezza media della parole.

from pyspark.sql.functions import udf
from pyspark.sql.types import *


def wordCount(s):
    return len(s.split())


def wordLength(s):
    import numpy as np

    ss = [len(w) for w in s.split()]
    return round(float(np.mean(ss)), 2)


wordLengthUDF = udf(wordLength, DoubleType())
wordCountUDF = udf(wordCount, IntegerType())
from synapse.ml.stages import UDFTransformer

wordLength = "wordLength"
wordCount = "wordCount"
wordLengthTransformer = UDFTransformer(
    inputCol="text", outputCol=wordLength, udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
    inputCol="text", outputCol=wordCount, udf=wordCountUDF
)
from pyspark.ml import Pipeline

data = (
    Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
    .fit(rawData)
    .transform(rawData)
    .withColumn("label", rawData["rating"] > 3)
    .drop("rating")
)
data.show(5)

Classificare usando pyspark

Per scegliere il classificatore LogisticRegression migliore usando la pyspark libreria, è necessario eseguire in modo esplicito i passaggi seguenti:

  1. Elaborazione delle funzionalità:
    • Tokenizzare la colonna di testo
    • Eseguire l'hashing della colonna tokenizzata in un vettore usando la funzione hash
    • Integrare le caratteristiche numeriche al vettore
  2. Elaborare la colonna etichetta: eseguirne il cast nel tipo corretto.
  3. Eseguire il training di più algoritmi LogisticRegression nel train set di dati con iperparametri diversi
  4. Calcolare l'area sotto la curva ROC per ognuno dei modelli sottoposti a training e selezionare il modello con la metrica più alta calcolata nel test set di dati
  5. Valutare il modello migliore nel validation set
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.feature import VectorAssembler

# Featurize text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
    inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)

# Merge text and numeric features in one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)

# Select only columns of interest
# Convert rating column from boolean to int
processedData = assembledData.select("label", "features").withColumn(
    "label", assembledData.label.cast(IntegerType())
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression

# Prepare data for learning
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []

# Select the best model
for learner in logisticRegressions:
    model = learner.fit(train)
    models.append(model)
    scoredData = model.transform(test)
    metrics.append(evaluator.evaluate(scoredData))
bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]

# Get AUC on the validation dataset
scoredVal = bestModel.transform(validation)
print(evaluator.evaluate(scoredVal))

Classificare usando SynapseML

I passaggi necessari con synapseml sono più semplici:

  1. L'oggetto TrainClassifier Stima definisce i dati internamente, purché le colonne selezionate nei set di dati train, test, validation rappresentino le funzionalità

  2. L'oggetto FindBestModel Stima indivua il modello migliore da un pool di modelli sottoposti a training, trovando il modello che offre prestazioni migliori nel test set di dati in base alla metrica specificata

  3. Il Trasformatore ComputeModelStatistics calcola contemporaneamente le diverse metriche in un set di dati con punteggio (in questo caso, il set di dati validation)

from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel

# Prepare data for learning
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
    TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
    for lrm in logisticRegressions
]

# Select the best model
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)


# Get AUC on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
    "Best model's AUC on validation set = "
    + "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)