使用 SynapseML 的分类任务

在本文中,你会通过两种不同的方式执行相同的分类任务:一种是使用普通pyspark,一种是使用synapseml库。 这两种方法会产生相同的性能,但与pyspark相比,突出显示了使用synapseml的简单性。

任务是根据评论文本预测客户对亚马逊上销售的书籍的评价是好的(评级> 3)还是差的。 可以使用不同的超参数训练 LogisticRegression 学习者并选择最佳模型以实现此目的。

先决条件

将笔记本附加到湖屋。 在左侧,选择“添加”以添加现有湖屋或创建湖屋。

安装

导入必要的 Python 库并获取 Spark 会话。

from pyspark.sql import SparkSession

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

读取数据

下载并读取数据。

rawData = spark.read.parquet(
    "wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)

提取特征并处理数据

实际数据比上述数据集复杂。 数据集通常具有多种类型的特征,例如文本、数字和分类。 为了说明使用这些数据集的难度,请向数据集添加两个数字特征:评论的字数和平均字长

from pyspark.sql.functions import udf
from pyspark.sql.types import *


def wordCount(s):
    return len(s.split())


def wordLength(s):
    import numpy as np

    ss = [len(w) for w in s.split()]
    return round(float(np.mean(ss)), 2)


wordLengthUDF = udf(wordLength, DoubleType())
wordCountUDF = udf(wordCount, IntegerType())
from synapse.ml.stages import UDFTransformer

wordLength = "wordLength"
wordCount = "wordCount"
wordLengthTransformer = UDFTransformer(
    inputCol="text", outputCol=wordLength, udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
    inputCol="text", outputCol=wordCount, udf=wordCountUDF
)
from pyspark.ml import Pipeline

data = (
    Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
    .fit(rawData)
    .transform(rawData)
    .withColumn("label", rawData["rating"] > 3)
    .drop("rating")
)
data.show(5)

使用 pyspark 进行分类

要使用pyspark库选择最佳的 LogisticRegression 分类器,需要显式执行以下步骤:

  1. 处理特征:
    • 使文本列标记化
    • 使用哈希将标记化列哈希为向量
    • 将数字特征与向量合并
  2. 处理标签列:将其强制转换为正确的类型。
  3. 使用不同的超参数在train数据集上训练多个 LogisticRegression 算法
  4. 计算每个已训练模型 ROC 曲线下的面积,并选择test数据集上计算出的具有最高指标的模型
  5. 评估validation集上的最佳模型
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.feature import VectorAssembler

# Featurize text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
    inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)

# Merge text and numeric features in one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)

# Select only columns of interest
# Convert rating column from boolean to int
processedData = assembledData.select("label", "features").withColumn(
    "label", assembledData.label.cast(IntegerType())
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression

# Prepare data for learning
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []

# Select the best model
for learner in logisticRegressions:
    model = learner.fit(train)
    models.append(model)
    scoredData = model.transform(test)
    metrics.append(evaluator.evaluate(scoredData))
bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]

# Get AUC on the validation dataset
scoredVal = bestModel.transform(validation)
print(evaluator.evaluate(scoredVal))

使用 SynapseML 进行分类

使用synapseml所需的步骤更简单:

  1. TrainClassifier估算器在内部对数据进行特征化,只要traintestvalidation数据集中选择的列表示特征

  2. FindBestModel估算器根据指定的指标查找test数据集上性能最佳的模型,从而从训练的模型池中查找最佳模型

  3. 同时,ComputeModelStatistics转换器计算评分数据集上的不同指标(本例中是validation数据集)

from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel

# Prepare data for learning
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
    TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
    for lrm in logisticRegressions
]

# Select the best model
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)


# Get AUC on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
    "Best model's AUC on validation set = "
    + "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)