Zadania klasyfikacji przy użyciu usługi SynapseML
W tym artykule wykonasz to samo zadanie klasyfikacji na dwa różne sposoby: raz przy użyciu zwykłego pyspark
synapseml
i raz przy użyciu biblioteki. Dwie metody dają taką samą wydajność, ale podkreśla prostotę użycia synapseml
w porównaniu z pyspark
.
Zadaniem jest przewidywanie, czy recenzja książki sprzedanej przez klienta na Amazon jest dobra (ocena > 3) czy zła na podstawie tekstu przeglądu. Można to osiągnąć, szkoląc uczniów LogisticsRegression przy użyciu różnych hiperparametrów i wybierając najlepszy model.
Wymagania wstępne
Dołącz notes do magazynu lakehouse. Po lewej stronie wybierz pozycję Dodaj , aby dodać istniejący obiekt lakehouse lub utworzyć jezioro.
Konfiguracja
Zaimportuj niezbędne biblioteki języka Python i uzyskaj sesję platformy Spark.
from pyspark.sql import SparkSession
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
Odczytywanie danych
Pobierz i odczytaj dane.
rawData = spark.read.parquet(
"wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)
Wyodrębnianie funkcji i przetwarzanie danych
Rzeczywiste dane są bardziej złożone niż powyższy zestaw danych. Zestaw danych często zawiera funkcje wielu typów, takie jak tekst, numeryczne i podzielone na kategorie. Aby zilustrować, jak trudno jest pracować z tymi zestawami danych, dodaj do zestawu danych dwie funkcje liczbowe: liczbę słów recenzji i średnią długość słowa.
from pyspark.sql.functions import udf
from pyspark.sql.types import *
def wordCount(s):
return len(s.split())
def wordLength(s):
import numpy as np
ss = [len(w) for w in s.split()]
return round(float(np.mean(ss)), 2)
wordLengthUDF = udf(wordLength, DoubleType())
wordCountUDF = udf(wordCount, IntegerType())
from synapse.ml.stages import UDFTransformer
wordLength = "wordLength"
wordCount = "wordCount"
wordLengthTransformer = UDFTransformer(
inputCol="text", outputCol=wordLength, udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
inputCol="text", outputCol=wordCount, udf=wordCountUDF
)
from pyspark.ml import Pipeline
data = (
Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
.fit(rawData)
.transform(rawData)
.withColumn("label", rawData["rating"] > 3)
.drop("rating")
)
data.show(5)
Klasyfikowanie przy użyciu narzędzia pyspark
Aby wybrać najlepszy klasyfikator LogisticsRegression przy użyciu pyspark
biblioteki, należy jawnie wykonać następujące czynności:
- Przetwarzanie funkcji:
- Tokenizowanie kolumny tekstowej
- Skrót kolumny tokenizowanej do wektora przy użyciu skrótu
- Scalanie cech liczbowych z wektorem
- Przetwarzanie kolumny etykiety: rzutowanie jej do odpowiedniego typu.
- Trenowanie wielu algorytmów LogisticsRegression w zestawie danych przy
train
użyciu różnych hiperparametrów - Oblicz obszar pod krzywą ROC dla każdego z wytrenowanych modeli i wybierz model o najwyższej metryce obliczonej
test
na zestawie danych - Ocena najlepszego modelu na
validation
zestawie
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.feature import VectorAssembler
# Featurize text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)
# Merge text and numeric features in one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)
# Select only columns of interest
# Convert rating column from boolean to int
processedData = assembledData.select("label", "features").withColumn(
"label", assembledData.label.cast(IntegerType())
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
# Prepare data for learning
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)
# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []
# Select the best model
for learner in logisticRegressions:
model = learner.fit(train)
models.append(model)
scoredData = model.transform(test)
metrics.append(evaluator.evaluate(scoredData))
bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]
# Get AUC on the validation dataset
scoredVal = bestModel.transform(validation)
print(evaluator.evaluate(scoredVal))
Klasyfikowanie przy użyciu usługi SynapseML
Kroki wymagane w programie synapseml
są prostsze:
Narzędzie
TrainClassifier
do szacowania cechuje dane wewnętrznie, o ile kolumny wybrane w zestawietrain
test
validation
danych reprezentują funkcjeNarzędzie
FindBestModel
do szacowania znajduje najlepszy model z puli wytrenowanych modeli, wyszukując model, który najlepiej sprawdza się wtest
zestawie danych, biorąc pod uwagę określoną metrykęFunkcja
ComputeModelStatistics
Transformer oblicza różne metryki na ocenianym zestawie danych (w naszym przypadkuvalidation
zestaw danych) w tym samym czasie
from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel
# Prepare data for learning
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)
# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
for lrm in logisticRegressions
]
# Select the best model
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)
# Get AUC on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
"Best model's AUC on validation set = "
+ "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)