Del via


Klassificeringsopgaver ved hjælp af SynapseML

I denne artikel udfører du den samme klassificeringsopgave på to forskellige måder: når du bruger almindeligt pyspark og én gang ved hjælp af synapseml biblioteket. De to metoder giver den samme ydeevne, men fremhæver enkelheden ved at pysparkbruge synapseml sammenlignet med .

Opgaven er at forudsige, om en kundes anmeldelse af en bog, der er solgt på Amazon, er god (bedømmelse > 3) eller dårlig baseret på teksten i gennemgangen. Du opnår det ved at oplære LogisticRegression-elever med forskellige hyperparametre og vælge den bedste model.

Forudsætninger

Vedhæft din notesbog til et lakehouse. I venstre side skal du vælge Tilføj for at tilføje et eksisterende lakehouse eller oprette et lakehouse.

Opsætte

Importér nødvendige Python-biblioteker, og få en spark-session.

from pyspark.sql import SparkSession

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

Læs dataene

Download og læs dataene.

rawData = spark.read.parquet(
    "wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)

Udtræk funktioner og behandl data

Reelle data er mere komplekse end ovenstående datasæt. Det er almindeligt, at et datasæt har funktioner af flere typer, f.eks. tekst, numerisk og kategorisk. Hvis du vil illustrere, hvor svært det er at arbejde med disse datasæt, skal du føje to numeriske funktioner til datasættet: antallet af ord i korrekturen og den gennemsnitlige ordlængde.

from pyspark.sql.functions import udf
from pyspark.sql.types import *


def wordCount(s):
    return len(s.split())


def wordLength(s):
    import numpy as np

    ss = [len(w) for w in s.split()]
    return round(float(np.mean(ss)), 2)


wordLengthUDF = udf(wordLength, DoubleType())
wordCountUDF = udf(wordCount, IntegerType())
from synapse.ml.stages import UDFTransformer

wordLength = "wordLength"
wordCount = "wordCount"
wordLengthTransformer = UDFTransformer(
    inputCol="text", outputCol=wordLength, udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
    inputCol="text", outputCol=wordCount, udf=wordCountUDF
)
from pyspark.ml import Pipeline

data = (
    Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
    .fit(rawData)
    .transform(rawData)
    .withColumn("label", rawData["rating"] > 3)
    .drop("rating")
)
data.show(5)

Klassificer ved hjælp af pyspark

Hvis du vil vælge den bedste LogisticRegression-klassificering ved hjælp af pyspark biblioteket, skal du eksplicit udføre følgende trin:

  1. Behandl funktionerne:
    • Tokenize tekstkolonnen
    • Hash den tokeniserede kolonne til en vektor ved hjælp af hashing
    • Flet de numeriske funktioner med vektoren
  2. Behandl etiketkolonnen: Kast den til den korrekte type.
  3. Oplær flere LogisticRegression-algoritmer på train datasættet med forskellige hyperparametre
  4. Beregn området under ROC-kurven for hver af de oplærte modeller, og vælg modellen med den højeste metrikværdi som beregnet på test datasættet
  5. Evaluer den bedste model på validation sættet
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.feature import VectorAssembler

# Featurize text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
    inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)

# Merge text and numeric features in one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)

# Select only columns of interest
# Convert rating column from boolean to int
processedData = assembledData.select("label", "features").withColumn(
    "label", assembledData.label.cast(IntegerType())
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression

# Prepare data for learning
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []

# Select the best model
for learner in logisticRegressions:
    model = learner.fit(train)
    models.append(model)
    scoredData = model.transform(test)
    metrics.append(evaluator.evaluate(scoredData))
bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]

# Get AUC on the validation dataset
scoredVal = bestModel.transform(validation)
print(evaluator.evaluate(scoredVal))

Klassificer ved hjælp af SynapseML

De trin, der skal udføres med synapseml , er enklere:

  1. TrainClassifier Estimator viser dataene internt, så længe de kolonner, der er valgt i traindatasættet , testvalidation repræsenterer funktionerne

  2. FindBestModel Estimator finder den bedste model fra en pulje af oplærte modeller ved at finde den model, der fungerer bedst på test datasættet på baggrund af den angivne metrikværdi

  3. ComputeModelStatistics Transformeren beregner de forskellige målepunkter for et scoret datasæt (i vores tilfælde validation datasættet) på samme tid

from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel

# Prepare data for learning
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
    TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
    for lrm in logisticRegressions
]

# Select the best model
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)


# Get AUC on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
    "Best model's AUC on validation set = "
    + "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)