Delen via


Classificatietaken met SynapseML

In dit artikel voert u dezelfde classificatietaak op twee verschillende manieren uit: eenmaal zonder opmaak pyspark en eenmaal met behulp van de synapseml bibliotheek. De twee methoden leveren dezelfde prestaties op, maar benadrukt de eenvoud van het gebruik synapseml in vergelijking met pyspark.

De taak is om te voorspellen of de beoordeling van een boek dat op Amazon is verkocht, goed is (beoordeling > 3) of slecht op basis van de tekst van de beoordeling. U doet dit door LogisticRegression-cursisten met verschillende hyperparameters te trainen en het beste model te kiezen.

Vereisten

Koppel uw notitieblok aan een lakehouse. Selecteer aan de linkerkant Toevoegen om een bestaand lakehouse toe te voegen of een lakehouse te maken.

Instellingen

Importeer de benodigde Python-bibliotheken en haal een Spark-sessie op.

from pyspark.sql import SparkSession

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

De gegevens lezen

Download en lees de gegevens.

rawData = spark.read.parquet(
    "wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)

Functies extraheren en gegevens verwerken

Echte gegevens zijn complexer dan de bovenstaande gegevensset. Het is gebruikelijk dat een gegevensset functies van meerdere typen heeft, zoals tekst, numeriek en categorisch. Als u wilt laten zien hoe moeilijk het is om met deze gegevenssets te werken, voegt u twee numerieke functies toe aan de gegevensset: het aantal woorden van de beoordeling en de gemiddelde woordlengte.

from pyspark.sql.functions import udf
from pyspark.sql.types import *


def wordCount(s):
    return len(s.split())


def wordLength(s):
    import numpy as np

    ss = [len(w) for w in s.split()]
    return round(float(np.mean(ss)), 2)


wordLengthUDF = udf(wordLength, DoubleType())
wordCountUDF = udf(wordCount, IntegerType())
from synapse.ml.stages import UDFTransformer

wordLength = "wordLength"
wordCount = "wordCount"
wordLengthTransformer = UDFTransformer(
    inputCol="text", outputCol=wordLength, udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
    inputCol="text", outputCol=wordCount, udf=wordCountUDF
)
from pyspark.ml import Pipeline

data = (
    Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
    .fit(rawData)
    .transform(rawData)
    .withColumn("label", rawData["rating"] > 3)
    .drop("rating")
)
data.show(5)

Classificeren met behulp van pyspark

Als u de beste LogisticRegression-classificatie wilt kiezen met behulp van de pyspark bibliotheek, moet u de volgende stappen expliciet uitvoeren:

  1. De functies verwerken:
    • De tekstkolom tokeniseren
    • Hash van de tokenized kolom in een vector met behulp van hashing
    • De numerieke kenmerken samenvoegen met de vector
  2. De labelkolom verwerken: cast deze naar het juiste type.
  3. Meerdere LogisticRegression-algoritmen trainen in de train gegevensset met verschillende hyperparameters
  4. Bereken het gebied onder de ROC-curve voor elk van de getrainde modellen en selecteer het model met de hoogste metrische waarde zoals berekend op de test gegevensset
  5. Het beste model op de validation set evalueren
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.feature import VectorAssembler

# Featurize text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
    inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)

# Merge text and numeric features in one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)

# Select only columns of interest
# Convert rating column from boolean to int
processedData = assembledData.select("label", "features").withColumn(
    "label", assembledData.label.cast(IntegerType())
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression

# Prepare data for learning
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []

# Select the best model
for learner in logisticRegressions:
    model = learner.fit(train)
    models.append(model)
    scoredData = model.transform(test)
    metrics.append(evaluator.evaluate(scoredData))
bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]

# Get AUC on the validation dataset
scoredVal = bestModel.transform(validation)
print(evaluator.evaluate(scoredVal))

Classificeren met SynapseML

De benodigde synapseml stappen zijn eenvoudiger:

  1. De TrainClassifier estimator bevat de gegevens intern, zolang de kolommen die zijn geselecteerd in de traingegevensset testvalidation de functies vertegenwoordigen

  2. De FindBestModel estimator vindt het beste model uit een pool met getrainde modellen door het model te vinden dat het beste presteert op de test gegevensset op basis van de opgegeven metrische gegevens

  3. De ComputeModelStatistics transformator berekent de verschillende metrische gegevens op een gescoorde gegevensset (in ons geval de validation gegevensset) tegelijkertijd

from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel

# Prepare data for learning
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
    TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
    for lrm in logisticRegressions
]

# Select the best model
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)


# Get AUC on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
    "Best model's AUC on validation set = "
    + "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)