Κοινή χρήση μέσω


Εργασίες ταξινόμησης με χρήση του SynapseML

Σε αυτό το άρθρο, εκτελείτε την ίδια εργασία ταξινόμησης με δύο διαφορετικούς τρόπους: μία φορά χρησιμοποιώντας pyspark απλά και μία φορά τη synapseml βιβλιοθήκη. Οι δύο μέθοδοι αποδίδουν τις ίδιες επιδόσεις, αλλά επισημαίνουν την απλότητα χρήσης synapseml σε σύγκριση με pysparkτο .

Η εργασία είναι να προβλέψετε εάν η κριτική ενός βιβλίου που πωλείται στο Amazon από έναν πελάτη είναι καλή (αξιολόγηση > 3) ή κακή με βάση το κείμενο της κριτικής. Το επιτυγχάνετε εκπαιδεύοντας μαθητές LogisticRegression με διαφορετικούς υπερπαραμετήρες και επιλέγοντας το καλύτερο μοντέλο.

Προαπαιτούμενα στοιχεία

Επισυνάψτε το σημειωματάριό σας σε ένα lakehouse. Στην αριστερή πλευρά, επιλέξτε Προσθήκη για να προσθέσετε μια υπάρχουσα λίμνη ή να δημιουργήσετε μια λίμνη.

Ρύθμιση

Εισαγάγετε τις απαραίτητες βιβλιοθήκες Python και λάβετε μια περίοδο λειτουργίας spark.

from pyspark.sql import SparkSession

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

Ανάγνωση των δεδομένων

Κάντε λήψη και ανάγνωση στα δεδομένα.

rawData = spark.read.parquet(
    "wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)

Εξαγωγή δυνατοτήτων και επεξεργασία δεδομένων

Τα πραγματικά δεδομένα είναι πιο σύνθετα από το παραπάνω σύνολο δεδομένων. Είναι σύνηθες για ένα σύνολο δεδομένων να έχει δυνατότητες πολλών τύπων, όπως κείμενο, αριθμητικά και κατηγορικά. Για να απεικονίσετε πόσο δύσκολο είναι να εργαστείτε με αυτά τα σύνολα δεδομένων, προσθέστε δύο αριθμητικές δυνατότητες στο σύνολο δεδομένων: το πλήθος λέξεων της αναθεώρησης και το μέσο μήκος της λέξης.

from pyspark.sql.functions import udf
from pyspark.sql.types import *


def wordCount(s):
    return len(s.split())


def wordLength(s):
    import numpy as np

    ss = [len(w) for w in s.split()]
    return round(float(np.mean(ss)), 2)


wordLengthUDF = udf(wordLength, DoubleType())
wordCountUDF = udf(wordCount, IntegerType())
from synapse.ml.stages import UDFTransformer

wordLength = "wordLength"
wordCount = "wordCount"
wordLengthTransformer = UDFTransformer(
    inputCol="text", outputCol=wordLength, udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
    inputCol="text", outputCol=wordCount, udf=wordCountUDF
)
from pyspark.ml import Pipeline

data = (
    Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
    .fit(rawData)
    .transform(rawData)
    .withColumn("label", rawData["rating"] > 3)
    .drop("rating")
)
data.show(5)

Ταξινόμηση με χρήση του pyspark

Για να επιλέξετε τον καλύτερο αλγόριθμο ταξινόμησης LogisticRegression χρησιμοποιώντας τη pyspark βιβλιοθήκη, πρέπει να εκτελέσετε ρητά τα ακόλουθα βήματα:

  1. Επεξεργαστείτε τις δυνατότητες:
    • Διακριτικό της στήλης κειμένου
    • Κατακερματίζει τη στήλη με διακριτικά σε ένα διάνυσμα με χρήση κατακερματίσματος
    • Συγχώνευση των αριθμητικών δυνατοτήτων με το διάνυσμα
  2. Επεξεργασία της στήλης ετικέτας: μετατρέψτε τη στον κατάλληλο τύπο.
  3. Εκπαίδευση πολλών αλγόριθμων LogisticRegression στο train σύνολο δεδομένων με διαφορετικούς υπερπαραμετήρες
  4. Υπολογίστε την περιοχή κάτω από την καμπύλη ROC για καθένα από τα εκπαιδευμένα μοντέλα και επιλέξτε το μοντέλο με το υψηλότερο μετρικό όπως υπολογίζεται στο test σύνολο δεδομένων
  5. Αξιολόγηση του βέλτιστου μοντέλου στο validation σύνολο
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.feature import VectorAssembler

# Featurize text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
    inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)

# Merge text and numeric features in one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)

# Select only columns of interest
# Convert rating column from boolean to int
processedData = assembledData.select("label", "features").withColumn(
    "label", assembledData.label.cast(IntegerType())
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression

# Prepare data for learning
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []

# Select the best model
for learner in logisticRegressions:
    model = learner.fit(train)
    models.append(model)
    scoredData = model.transform(test)
    metrics.append(evaluator.evaluate(scoredData))
bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]

# Get AUC on the validation dataset
scoredVal = bestModel.transform(validation)
print(evaluator.evaluate(scoredVal))

Ταξινόμηση με χρήση του SynapseML

Τα βήματα που απαιτούνται είναι synapseml απλούστερα:

  1. Ο TrainClassifier Εκτιμητής παρουσιάζει τα δεδομένα εσωτερικά, εφόσον οι στήλες που έχουν επιλεγεί στο trainσύνολο δεδομένων , test, validation αντιπροσωπεύουν τις δυνατότητες

  2. Ο FindBestModel Εκτιμητής βρίσκει το καλύτερο μοντέλο από μια ομάδα εκπαιδευμένων μοντέλων, βρίσκοντας το μοντέλο που αποδίδει καλύτερα στο test σύνολο δεδομένων, με βάση το καθορισμένο μετρικό

  3. Ο ComputeModelStatistics Μετασχηματισμός υπολογίζει τα διαφορετικά μετρικά σε ένα σύνολο δεδομένων που σημειώθηκε (στην περίπτωσή μας, το validation σύνολο δεδομένων) ταυτόχρονα

from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel

# Prepare data for learning
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
    TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
    for lrm in logisticRegressions
]

# Select the best model
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)


# Get AUC on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
    "Best model's AUC on validation set = "
    + "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)