Klassifizierungsaufgaben mit SynapseML
In diesem Artikel führen Sie dieselbe Klassifizierungsaufgabe auf zwei unterschiedliche Arten aus: einmal einfach mit pyspark
und einmal mit der synapseml
-Bibliothek. Die beiden Methoden erzielen die gleiche Leistung, jedoch wird die Einfachheit der Verwendung von synapseml
im Vergleich zu pyspark
hervorgehoben.
Die Aufgabe besteht darin, basierend auf dem Text einer Buchrezension vorherzusagen, ob die Bewertung dieses bei Amazon verkauften Buches gut (Bewertung > 3) oder schlecht ist. Sie erreichen dies, indem Sie LogisticRegression-Learner mit verschiedenen Hyperparametern trainieren und das beste Modell auswählen.
Voraussetzungen
Fügen Sie Ihr Notebook an ein Lakehouse an. Wählen Sie auf der linken Seite Hinzufügen aus, um ein vorhandenes Lakehouse hinzuzufügen oder ein Lakehouse zu erstellen.
Setup
Importieren Sie die erforderlichen Python-Bibliotheken, und rufen Sie eine Spark-Sitzung ab.
from pyspark.sql import SparkSession
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
Lesen der Daten
Laden Sie die Daten herunter, und lesen Sie sie ein.
rawData = spark.read.parquet(
"wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)
Extrahieren von Features und Verarbeiten der Daten
Echte Daten sind komplexer als das oben angegebene Dataset. Meist weist ein Dataset Features mit mehreren Typen auf, z. B. textlich, numerisch und kategorisch. Um zu veranschaulichen, wie schwierig die Arbeit mit diesen Datasets ist, fügen Sie dem Dataset zwei numerische Features hinzu: die Wortanzahl der Rezension und die mittlere Wortlänge.
from pyspark.sql.functions import udf
from pyspark.sql.types import *
def wordCount(s):
return len(s.split())
def wordLength(s):
import numpy as np
ss = [len(w) for w in s.split()]
return round(float(np.mean(ss)), 2)
wordLengthUDF = udf(wordLength, DoubleType())
wordCountUDF = udf(wordCount, IntegerType())
from synapse.ml.stages import UDFTransformer
wordLength = "wordLength"
wordCount = "wordCount"
wordLengthTransformer = UDFTransformer(
inputCol="text", outputCol=wordLength, udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
inputCol="text", outputCol=wordCount, udf=wordCountUDF
)
from pyspark.ml import Pipeline
data = (
Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
.fit(rawData)
.transform(rawData)
.withColumn("label", rawData["rating"] > 3)
.drop("rating")
)
data.show(5)
Klassifizieren mithilfe von PySpark
Um den besten LogisticRegression-Klassifizierer mithilfe der pyspark
-Bibliothek auszuwählen, müssen Sie die folgenden Schritte explizit ausführen:
- Verarbeiten der Features:
- Tokenisieren der Textspalte
- Bilden des Hashs der tokenisierten Spalte als Vektor mithilfe von Hashing
- Zusammenführen der numerischen Features mit dem Vektor
- Verarbeiten der Bezeichnungsspalte: Umwandlung in den richtigen Typ
- Trainieren mehrerer LogisticRegression-Algorithmen für das
train
-Dataset mit verschiedenen Hyperparametern - Berechnen der Fläche unter der ROC-Kurve für jedes trainierte Modell und Auswählen des Modells mit der höchsten Metrik, wie für das
test
-Dataset berechnet - Bewerten des besten Modells mit dem
validation
-Satz
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.feature import VectorAssembler
# Featurize text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)
# Merge text and numeric features in one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)
# Select only columns of interest
# Convert rating column from boolean to int
processedData = assembledData.select("label", "features").withColumn(
"label", assembledData.label.cast(IntegerType())
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
# Prepare data for learning
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)
# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []
# Select the best model
for learner in logisticRegressions:
model = learner.fit(train)
models.append(model)
scoredData = model.transform(test)
metrics.append(evaluator.evaluate(scoredData))
bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]
# Get AUC on the validation dataset
scoredVal = bestModel.transform(validation)
print(evaluator.evaluate(scoredVal))
Klassifizieren mithilfe von SynapseML
Die erforderlichen Schritte mit synapseml
sind einfacher:
Der Schätzer
TrainClassifier
wandelt die Daten intern in Features um, solange die in den Datasetstrain
,test
,validation
ausgewählten Spalten die Features darstellen.Der Schätzer
FindBestModel
sucht das beste Modell in einem Pool trainierter Modelle, indem das Modell ermittelt wird, das für dastest
-Dataset unter Berücksichtigung der angegebenen Metrik am besten funktioniert.Die Transformation
ComputeModelStatistics
berechnet zur gleichen Zeit die verschiedenen Metriken für ein bewertetes Dataset (in unserem Fall dasvalidation
-Dataset).
from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel
# Prepare data for learning
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)
# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
for lrm in logisticRegressions
]
# Select the best model
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)
# Get AUC on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
"Best model's AUC on validation set = "
+ "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)